Smoother handling of back-pressure

The Penciller had two problems in previous commits:
- If it had a push_mem soon after a L0 file had been created, the
push_mem would stall waiting for the L0 file to complete - and this
could take 100-200ms
- The penciller's clerk favoured L0 work, but was lazy about asking for
other work in-between, so often the L1 layer was bursting over capacity
and the clerk was doing nothing but merging more L0 files in (with those
merges getting more and more expensive as they had to cover more and
more files)

There are some partial resolutions to this.  There is now an aggressive
timeout when checking whether the L0 file is ready on a push_mem, and if
the timeout is breached the error is caught and a 'returned' message
goes back to the Bookie.  The Bookie doesn't now empty its cache; it
carries on filling it, but with some probability it will keep trying to
push_mem on future pushes.  This increases Jitter around the expensive
operation and splits out the L0 delay into defined chunks.

The penciller's clerk is now more aggressive in asking for work.  There
is also some simplification of the relationship between clerk timeouts
and penciller back-pressure.

Also resolved is an issue of inconsistency between the loader used on
startup (replaying the transaction log) and the standard push_mem
process.  The loader was not correctly de-duplicating by adding first
(in order) to a tree before outputting the list from the tree.

Some thought will be given later as to whether non-L0 work can be safely
prioritised if the merge process still keeps getting behind.
This commit is contained in:
martinsumner 2016-10-20 02:23:45 +01:00
parent 7319b8f415
commit cf66431c8e
5 changed files with 142 additions and 97 deletions

View file

@ -156,6 +156,7 @@
-define(SHUTDOWN_WAITS, 60).
-define(SHUTDOWN_PAUSE, 10000).
-define(SNAPSHOT_TIMEOUT, 300000).
-define(JITTER_PROBABILITY, 0.1).
-record(state, {inker :: pid(),
penciller :: pid(),
@ -582,23 +583,41 @@ addto_ledgercache(Changes, Cache) ->
maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
CacheSize = gb_trees:size(Cache),
TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize),
if
CacheSize > MaxCacheSize ->
case leveled_penciller:pcl_pushmem(Penciller,
gb_trees:to_list(Cache)) of
TimeToPush ->
Dump = gb_trees:to_list(Cache),
case leveled_penciller:pcl_pushmem(Penciller, Dump) of
ok ->
{ok, gb_trees:empty()};
pause ->
{pause, gb_trees:empty()};
refused ->
returned ->
{ok, Cache}
end;
true ->
{ok, Cache}
end.
%% Decide whether the ledger cache should be pushed to the Penciller.
%% - Above twice the maximum size: always push (hard limit).
%% - Between the maximum and twice the maximum: push only with probability
%%   ?JITTER_PROBABILITY, spreading the expensive push_mem operation over
%%   multiple future pushes rather than attempting it on every call.
%% - At or below the maximum: never push.
maybe_withjitter(CacheSize, MaxCacheSize) ->
    if
        CacheSize > 2 * MaxCacheSize ->
            true;
        CacheSize > MaxCacheSize ->
            %% Fix: the deprecated 'random' module is unseeded by default,
            %% so every process draws the identical sequence - defeating
            %% the jitter.  'rand' (OTP 18+) is auto-seeded per process.
            rand:uniform() < ?JITTER_PROBABILITY;
        true ->
            false
    end.
load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
{MinSQN, MaxSQN, Output} = Acc0,
{MinSQN, MaxSQN, OutputTree} = Acc0,
{SQN, PK} = KeyInLedger,
% VBin may already be a term
{VBin, VSize} = ExtractFun(ValueInLedger),
@ -613,11 +632,11 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
{loop, Acc0};
SQN when SQN < MaxSQN ->
Changes = preparefor_ledgercache(PK, SQN, Obj, VSize, IndexSpecs),
{loop, {MinSQN, MaxSQN, Output ++ Changes}};
{loop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}};
MaxSQN ->
io:format("Reached end of load batch with SQN ~w~n", [SQN]),
Changes = preparefor_ledgercache(PK, SQN, Obj, VSize, IndexSpecs),
{stop, {MinSQN, MaxSQN, Output ++ Changes}};
{stop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}};
SQN when SQN > MaxSQN ->
io:format("Skipping as exceeded MaxSQN ~w with SQN ~w~n",
[MaxSQN, SQN]),

View file

@ -609,7 +609,7 @@ load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) ->
load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller,
CDBpid, StartPos, FN, Rest) ->
io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]),
InitAcc = {MinSQN, MaxSQN, []},
InitAcc = {MinSQN, MaxSQN, gb_trees:empty()},
Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
{eof, {AccMinSQN, _AccMaxSQN, AccKL}} ->
ok = push_to_penciller(Penciller, AccKL),
@ -633,12 +633,18 @@ load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller,
ok
end.
push_to_penciller(Penciller, KeyList) ->
push_to_penciller(Penciller, KeyTree) ->
% The push to penciller must start as a tree to correctly de-duplicate
% the list by order before becoming a de-duplicated list for loading
KeyList = gb_trees:to_list(KeyTree),
R = leveled_penciller:pcl_pushmem(Penciller, KeyList),
if
R == pause ->
timer:sleep(?LOADING_PAUSE);
true ->
R == returned ->
timer:sleep(?LOADING_PAUSE),
push_to_penciller(Penciller, KeyTree);
R == ok ->
ok
end.

View file

@ -67,9 +67,8 @@
-include_lib("eunit/include/eunit.hrl").
-define(INACTIVITY_TIMEOUT, 5000).
-define(QUICK_TIMEOUT, 500).
-define(HAPPYTIME_MULTIPLIER, 2).
-define(MAX_TIMEOUT, 2000).
-define(MIN_TIMEOUT, 200).
-record(state, {owner :: pid(),
change_pending=false :: boolean(),
@ -98,7 +97,7 @@ init([]) ->
{ok, #state{}}.
handle_call({register, Owner}, _From, State) ->
{reply, ok, State#state{owner=Owner}, ?INACTIVITY_TIMEOUT};
{reply, ok, State#state{owner=Owner}, ?MIN_TIMEOUT};
handle_call({manifest_change, return, true}, _From, State) ->
io:format("Request for manifest change from clerk on closing~n"),
case State#state.change_pending of
@ -124,12 +123,11 @@ handle_call({manifest_change, confirm, Closing}, From, State) ->
State#state.owner),
{noreply,
State#state{work_item=null, change_pending=false},
?QUICK_TIMEOUT}
?MIN_TIMEOUT}
end.
handle_cast(prompt, State) ->
io:format("Clerk reducing timeout due to prompt~n"),
{noreply, State, ?QUICK_TIMEOUT}.
{noreply, State, ?MIN_TIMEOUT}.
handle_info(timeout, State=#state{change_pending=Pnd}) when Pnd == false ->
case requestandhandle_work(State) of
@ -155,15 +153,10 @@ code_change(_OldVsn, State, _Extra) ->
requestandhandle_work(State) ->
case leveled_penciller:pcl_workforclerk(State#state.owner) of
{none, Backlog} ->
none ->
io:format("Work prompted but none needed~n"),
case Backlog of
false ->
{false, ?INACTIVITY_TIMEOUT * ?HAPPYTIME_MULTIPLIER};
_ ->
{false, ?INACTIVITY_TIMEOUT}
end;
{WI, _} ->
{false, ?MAX_TIMEOUT};
WI ->
{NewManifest, FilesToDelete} = merge(WI),
UpdWI = WI#penciller_work{new_manifest=NewManifest,
unreferenced_files=FilesToDelete},

View file

@ -306,7 +306,6 @@
memtable_copy = #l0snapshot{} :: #l0snapshot{},
levelzero_snapshot = gb_trees:empty() :: gb_trees:tree(),
memtable,
backlog = false :: boolean(),
memtable_maxsize :: integer(),
is_snapshot = false :: boolean(),
snapshot_fully_loaded = false :: boolean(),
@ -412,53 +411,20 @@ handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap})
case assess_sqn(DumpList) of
{MinSQN, MaxSQN} when MaxSQN >= MinSQN,
MinSQN >= State#state.ledger_sqn ->
MaxTableSize = State#state.memtable_maxsize,
{TableSize0, State1} = checkready_pushtomem(State),
case quickcheck_pushtomem(DumpList,
case checkready_pushtomem(State) of
{ok, TableSize0, State1} ->
push_and_roll(DumpList,
TableSize0,
MaxTableSize) of
{twist, TableSize1} ->
gen_server:reply(From, ok),
io:format("Reply made on push in ~w microseconds~n",
State#state.memtable_maxsize,
MaxSQN,
StartWatch,
From,
State1);
timeout ->
io:format("Timeout of ~w microseconds awaiting " ++
"L0 SFT write~n",
[timer:now_diff(os:timestamp(), StartWatch)]),
L0Snap = do_pushtomem(DumpList,
State1#state.memtable,
State1#state.memtable_copy,
MaxSQN),
io:format("Push completed in ~w microseconds~n",
[timer:now_diff(os:timestamp(), StartWatch)]),
{noreply,
State1#state{memtable_copy=L0Snap,
table_size=TableSize1,
ledger_sqn=MaxSQN}};
{maybe_roll, TableSize1} ->
L0Snap = do_pushtomem(DumpList,
State1#state.memtable,
State1#state.memtable_copy,
MaxSQN),
case roll_memory(State1, MaxTableSize, L0Snap) of
{ok, L0Pend, ManSN, TableSize2} ->
io:format("Push completed in ~w microseconds~n",
[timer:now_diff(os:timestamp(), StartWatch)]),
{reply,
ok,
State1#state{levelzero_pending=L0Pend,
table_size=TableSize2,
manifest_sqn=ManSN,
memtable_copy=L0Snap,
ledger_sqn=MaxSQN,
backlog=false}};
{pause, Reason, Details} ->
io:format("Excess work due to - " ++ Reason,
Details),
{reply,
pause,
State1#state{backlog=true,
memtable_copy=L0Snap,
table_size=TableSize1,
ledger_sqn=MaxSQN}}
end
{reply, returned, State}
end;
{MinSQN, MaxSQN} ->
io:format("Mismatch of sequence number expectations with push "
@ -520,7 +486,7 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc},
{reply, Acc, State};
handle_call(work_for_clerk, From, State) ->
{UpdState, Work} = return_work(State, From),
{reply, {Work, UpdState#state.backlog}, UpdState};
{reply, Work, UpdState};
handle_call(get_startup_sqn, _From, State) ->
{reply, State#state.ledger_sqn, State};
handle_call({register_snapshot, Snapshot}, _From, State) ->
@ -568,7 +534,8 @@ handle_cast({release_snapshot, Snapshot}, State) ->
io:format("Penciller snapshot ~w released~n", [Snapshot]),
{noreply, State#state{registered_snapshots=Rs}}.
handle_info(_Info, State) ->
handle_info({_Ref, {ok, SrcFN, _StartKey, _EndKey}}, State) ->
io:format("Orphaned reply after timeout on L0 file write ~s~n", [SrcFN]),
{noreply, State}.
terminate(Reason, State=#state{is_snapshot=Snap}) when Snap == true ->
@ -749,28 +716,87 @@ start_from_file(PCLopts) ->
checkready_pushtomem(State) ->
{TableSize, UpdState} = case State#state.levelzero_pending of
case State#state.levelzero_pending of
{true, Pid, _TS} ->
% N.B. Sync call - so will be ready
{ok, SrcFN, StartKey, EndKey} = leveled_sft:sft_checkready(Pid),
case checkready(Pid) of
timeout ->
timeout;
{ok, SrcFN, StartKey, EndKey} ->
true = ets:delete_all_objects(State#state.memtable),
ManifestEntry = #manifest_entry{start_key=StartKey,
end_key=EndKey,
owner=Pid,
filename=SrcFN},
% Prompt clerk to ask about work - do this for every L0 roll
% Prompt clerk to ask about work - do this for every
% L0 roll
ok = leveled_pclerk:clerk_prompt(State#state.clerk),
{0,
State#state{manifest=lists:keystore(0,
UpdManifest = lists:keystore(0,
1,
State#state.manifest,
{0, [ManifestEntry]}),
{ok,
0,
State#state{manifest=UpdManifest,
levelzero_pending=?L0PEND_RESET,
memtable_copy=#l0snapshot{}}};
memtable_copy=#l0snapshot{}}}
end;
?L0PEND_RESET ->
{State#state.table_size, State}
end,
{TableSize, UpdState}.
{ok, State#state.table_size, State}
end.
%% Ask the SFT file process whether its background L0 write has completed.
%% The underlying sft_checkready call uses a short gen_server timeout, so
%% a slow write surfaces here as an exit - caught and converted into the
%% atom 'timeout' so the caller can hand back-pressure to the Bookie
%% instead of crashing the Penciller.
checkready(Pid) ->
    try leveled_sft:sft_checkready(Pid) of
        Reply ->
            Reply
    catch
        exit:{timeout, _} ->
            timeout
    end.
%% Push a dump of ledger changes into the in-memory table, rolling the
%% memtable out to an L0 file if it has grown past MaxTableSize.
%% Called from the push_mem handle_call; replies to From itself (via
%% gen_server:reply/2 or the {reply, ...} return) so the caller is
%% unblocked as early as possible.
push_and_roll(DumpList, TableSize, MaxTableSize, MaxSQN, StartWatch, From, State) ->
case quickcheck_pushtomem(DumpList, TableSize, MaxTableSize) of
{twist, TableSize1} ->
%% Plenty of headroom: reply ok immediately, then do the actual
%% push after the caller has already been released.
gen_server:reply(From, ok),
io:format("Reply made on push in ~w microseconds~n",
[timer:now_diff(os:timestamp(), StartWatch)]),
L0Snap = do_pushtomem(DumpList,
State#state.memtable,
State#state.memtable_copy,
MaxSQN),
io:format("Push completed in ~w microseconds~n",
[timer:now_diff(os:timestamp(), StartWatch)]),
%% noreply - the reply was already sent above.
{noreply,
State#state{memtable_copy=L0Snap,
table_size=TableSize1,
ledger_sqn=MaxSQN}};
{maybe_roll, TableSize1} ->
%% Near capacity: push first, then attempt to roll the memtable
%% out to a new L0 SFT file before replying.
L0Snap = do_pushtomem(DumpList,
State#state.memtable,
State#state.memtable_copy,
MaxSQN),
case roll_memory(State, MaxTableSize, L0Snap) of
{ok, L0Pend, ManSN, TableSize2} ->
io:format("Push completed in ~w microseconds~n",
[timer:now_diff(os:timestamp(), StartWatch)]),
{reply,
ok,
State#state{levelzero_pending=L0Pend,
table_size=TableSize2,
manifest_sqn=ManSN,
memtable_copy=L0Snap,
ledger_sqn=MaxSQN}};
{pause, Reason, Details} ->
%% Roll could not proceed (excess merge work outstanding);
%% reply 'pause' so the Bookie applies back-pressure.
io:format("Excess work due to - " ++ Reason,
Details),
{reply,
pause,
State#state{memtable_copy=L0Snap,
table_size=TableSize1,
ledger_sqn=MaxSQN}}
end
end.
quickcheck_pushtomem(DumpList, TableSize, MaxSize) ->
case TableSize + length(DumpList) of
@ -894,9 +920,10 @@ return_work(State, From) ->
case length(WorkQueue) of
L when L > 0 ->
[{SrcLevel, Manifest}|OtherWork] = WorkQueue,
Backlog = length(OtherWork),
io:format("Work at Level ~w to be scheduled for ~w with ~w " ++
"queue items outstanding~n",
[SrcLevel, From, length(OtherWork)]),
[SrcLevel, From, Backlog]),
case element(1, State#state.levelzero_pending) of
true ->
% Once the L0 file is completed there will be more work
@ -1557,7 +1584,7 @@ simple_server_test() ->
"Key0004",
null},
3002)),
% Add some more keys and confirm that chekc sequence number still
% Add some more keys and confirm that check sequence number still
% sees the old version in the previous snapshot, but will see the new version
% in a new snapshot
Key1A = {{o,"Bucket0001", "Key0001", null}, {4002, {active, infinity}, null}},

View file

@ -252,7 +252,7 @@ sft_close(Pid) ->
gen_server:call(Pid, close, infinity).
sft_checkready(Pid) ->
gen_server:call(Pid, background_complete, infinity).
gen_server:call(Pid, background_complete, 50).
sft_getmaxsequencenumber(Pid) ->
gen_server:call(Pid, get_maxsqn, infinity).