Add back-pressure on work queue limit
Previously under heavy load, as long as L0 was being cleared, the ledger woud keep accapting. Now there is a formla limit on how far behind the work queue (of compactions required at other levels) before the break is applied on new updates coming in).
This commit is contained in:
parent
4556573a5c
commit
7f456fa993
1 changed files with 13 additions and 7 deletions
|
@ -252,6 +252,7 @@
|
||||||
-define(MEMTABLE, mem).
|
-define(MEMTABLE, mem).
|
||||||
-define(MAX_TABLESIZE, 32000).
|
-define(MAX_TABLESIZE, 32000).
|
||||||
-define(PROMPT_WAIT_ONL0, 5).
|
-define(PROMPT_WAIT_ONL0, 5).
|
||||||
|
-define(WORKQUEUE_BACKLOG_TOLERANCE, 4).
|
||||||
|
|
||||||
|
|
||||||
-record(state, {manifest = [] :: list(),
|
-record(state, {manifest = [] :: list(),
|
||||||
|
@ -275,7 +276,8 @@
|
||||||
snapshot_fully_loaded = false :: boolean(),
|
snapshot_fully_loaded = false :: boolean(),
|
||||||
source_penciller :: pid(),
|
source_penciller :: pid(),
|
||||||
|
|
||||||
ongoing_work = [] :: list()}).
|
ongoing_work = [] :: list(),
|
||||||
|
work_backlog = false :: boolean()}).
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -376,11 +378,14 @@ handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap})
|
||||||
%
|
%
|
||||||
% Check the approximate size of the cache. If it is over the maximum size,
|
% Check the approximate size of the cache. If it is over the maximum size,
|
||||||
% trigger a backgroun L0 file write and update state of levelzero_pending.
|
% trigger a backgroun L0 file write and update state of levelzero_pending.
|
||||||
case State#state.levelzero_pending of
|
case {State#state.levelzero_pending, State#state.work_backlog} of
|
||||||
true ->
|
{true, _} ->
|
||||||
leveled_log:log("P0018", [returned, "L-0 persist pending"]),
|
leveled_log:log("P0018", [returned, "L-0 persist pending"]),
|
||||||
{reply, returned, State};
|
{reply, returned, State};
|
||||||
false ->
|
{false, true} ->
|
||||||
|
leveled_log:log("P0018", [returned, "Merge tree work backlog"]),
|
||||||
|
{reply, returned, State};
|
||||||
|
{false, false} ->
|
||||||
leveled_log:log("P0018", [ok, "L0 memory updated"]),
|
leveled_log:log("P0018", [ok, "L0 memory updated"]),
|
||||||
gen_server:reply(From, ok),
|
gen_server:reply(From, ok),
|
||||||
{noreply, update_levelzero(State#state.levelzero_index,
|
{noreply, update_levelzero(State#state.levelzero_index,
|
||||||
|
@ -791,12 +796,13 @@ return_work(State, From) ->
|
||||||
true ->
|
true ->
|
||||||
false
|
false
|
||||||
end,
|
end,
|
||||||
|
Backlog = L >= ?WORKQUEUE_BACKLOG_TOLERANCE,
|
||||||
case State#state.levelzero_pending of
|
case State#state.levelzero_pending of
|
||||||
true ->
|
true ->
|
||||||
% Once the L0 file is completed there will be more work
|
% Once the L0 file is completed there will be more work
|
||||||
% - so don't be busy doing other work now
|
% - so don't be busy doing other work now
|
||||||
leveled_log:log("P0021", []),
|
leveled_log:log("P0021", []),
|
||||||
{State, none};
|
{State#state{work_backlog=Backlog}, none};
|
||||||
false ->
|
false ->
|
||||||
%% No work currently outstanding
|
%% No work currently outstanding
|
||||||
%% Can allocate work
|
%% Can allocate work
|
||||||
|
@ -815,10 +821,10 @@ return_work(State, From) ->
|
||||||
ledger_filepath = FP,
|
ledger_filepath = FP,
|
||||||
manifest_file = ManFile,
|
manifest_file = ManFile,
|
||||||
target_is_basement = IsBasement},
|
target_is_basement = IsBasement},
|
||||||
{State#state{ongoing_work=[WI]}, WI}
|
{State#state{ongoing_work=[WI], work_backlog=Backlog}, WI}
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
{State, none}
|
{State#state{work_backlog=false}, none}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue