From cf66431c8e18b9bb23116fd2ae58483dad029aa0 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Thu, 20 Oct 2016 02:23:45 +0100 Subject: [PATCH] Smoother handling of back-pressure The Penciller had two problems in previous commits: - If it had a push_mem soon after a L0 file had been created, the push_mem would stall waiting for the L0 file to complete - and this could take 100-200ms - The penciller's clerk favoured L0 work, but was lazy about asking for other work in-between, so often the L1 layer was bursting over capacity and the clerk was doing nothing but merging more L0 files in (with those merges getting more and more expensive as they had to cover more and more files) There are some partial resolutions to this. There is now an aggressive timeout when checking whether the L0 file is ready on a push_mem, and if the timeout is breached the error is caught and a 'returned' message goes back to the Bookie. The Bookie now doesn't empty its cache, it carries on filling it, but with some probability it will keep trying to push_mem on future pushes. This increases jitter around the expensive operation and splits out the L0 delay into defined chunks. The penciller's clerk is now more aggressive in asking for work. There is also some simplification of the relationship between clerk timeouts and penciller back-pressure. Also resolved is an issue of inconsistency between the loader on startup (replaying the transaction log) and the standard push_mem process. The loader was not correctly de-duplicating by adding first (in order) to a tree before outputting the list from the tree. Some thought will be given later as to whether non-L0 work can be safely prioritised if the merge process still keeps getting behind. 
--- src/leveled_bookie.erl | 35 ++++++-- src/leveled_inker.erl | 12 ++- src/leveled_pclerk.erl | 23 ++---- src/leveled_penciller.erl | 167 ++++++++++++++++++++++---------------- src/leveled_sft.erl | 2 +- 5 files changed, 142 insertions(+), 97 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index f95b37c..9d61661 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -156,6 +156,7 @@ -define(SHUTDOWN_WAITS, 60). -define(SHUTDOWN_PAUSE, 10000). -define(SNAPSHOT_TIMEOUT, 300000). +-define(JITTER_PROBABILITY, 0.1). -record(state, {inker :: pid(), penciller :: pid(), @@ -582,23 +583,41 @@ addto_ledgercache(Changes, Cache) -> maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> CacheSize = gb_trees:size(Cache), + TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize), if - CacheSize > MaxCacheSize -> - case leveled_penciller:pcl_pushmem(Penciller, - gb_trees:to_list(Cache)) of + TimeToPush -> + Dump = gb_trees:to_list(Cache), + case leveled_penciller:pcl_pushmem(Penciller, Dump) of ok -> {ok, gb_trees:empty()}; pause -> {pause, gb_trees:empty()}; - refused -> + returned -> {ok, Cache} end; true -> - {ok, Cache} + {ok, Cache} + end. + + +maybe_withjitter(CacheSize, MaxCacheSize) -> + if + CacheSize > 2 * MaxCacheSize -> + true; + CacheSize > MaxCacheSize -> + R = random:uniform(), + if + R < ?JITTER_PROBABILITY -> + true; + true -> + false + end; + true -> + false end. 
load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> - {MinSQN, MaxSQN, Output} = Acc0, + {MinSQN, MaxSQN, OutputTree} = Acc0, {SQN, PK} = KeyInLedger, % VBin may already be a term {VBin, VSize} = ExtractFun(ValueInLedger), @@ -613,11 +632,11 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> {loop, Acc0}; SQN when SQN < MaxSQN -> Changes = preparefor_ledgercache(PK, SQN, Obj, VSize, IndexSpecs), - {loop, {MinSQN, MaxSQN, Output ++ Changes}}; + {loop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}}; MaxSQN -> io:format("Reached end of load batch with SQN ~w~n", [SQN]), Changes = preparefor_ledgercache(PK, SQN, Obj, VSize, IndexSpecs), - {stop, {MinSQN, MaxSQN, Output ++ Changes}}; + {stop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}}; SQN when SQN > MaxSQN -> io:format("Skipping as exceeded MaxSQN ~w with SQN ~w~n", [MaxSQN, SQN]), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 5be590b..79a0cd8 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -609,7 +609,7 @@ load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) -> load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos, FN, Rest) -> io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]), - InitAcc = {MinSQN, MaxSQN, []}, + InitAcc = {MinSQN, MaxSQN, gb_trees:empty()}, Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of {eof, {AccMinSQN, _AccMaxSQN, AccKL}} -> ok = push_to_penciller(Penciller, AccKL), @@ -633,12 +633,18 @@ load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, ok end. 
-push_to_penciller(Penciller, KeyList) -> +push_to_penciller(Penciller, KeyTree) -> + % The push to penciller must start as a tree to correctly de-duplicate + % the list by order before becoming a de-duplicated list for loading + KeyList = gb_trees:to_list(KeyTree), R = leveled_penciller:pcl_pushmem(Penciller, KeyList), if R == pause -> timer:sleep(?LOADING_PAUSE); - true -> + R == returned -> + timer:sleep(?LOADING_PAUSE), + push_to_penciller(Penciller, KeyTree); + R == ok -> ok end. diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index cac5d6e..2a915ae 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -67,9 +67,8 @@ -include_lib("eunit/include/eunit.hrl"). --define(INACTIVITY_TIMEOUT, 5000). --define(QUICK_TIMEOUT, 500). --define(HAPPYTIME_MULTIPLIER, 2). +-define(MAX_TIMEOUT, 2000). +-define(MIN_TIMEOUT, 200). -record(state, {owner :: pid(), change_pending=false :: boolean(), @@ -98,7 +97,7 @@ init([]) -> {ok, #state{}}. handle_call({register, Owner}, _From, State) -> - {reply, ok, State#state{owner=Owner}, ?INACTIVITY_TIMEOUT}; + {reply, ok, State#state{owner=Owner}, ?MIN_TIMEOUT}; handle_call({manifest_change, return, true}, _From, State) -> io:format("Request for manifest change from clerk on closing~n"), case State#state.change_pending of @@ -124,12 +123,11 @@ handle_call({manifest_change, confirm, Closing}, From, State) -> State#state.owner), {noreply, State#state{work_item=null, change_pending=false}, - ?QUICK_TIMEOUT} + ?MIN_TIMEOUT} end. handle_cast(prompt, State) -> - io:format("Clerk reducing timeout due to prompt~n"), - {noreply, State, ?QUICK_TIMEOUT}. + {noreply, State, ?MIN_TIMEOUT}. 
handle_info(timeout, State=#state{change_pending=Pnd}) when Pnd == false -> case requestandhandle_work(State) of @@ -155,15 +153,10 @@ code_change(_OldVsn, State, _Extra) -> requestandhandle_work(State) -> case leveled_penciller:pcl_workforclerk(State#state.owner) of - {none, Backlog} -> + none -> io:format("Work prompted but none needed~n"), - case Backlog of - false -> - {false, ?INACTIVITY_TIMEOUT * ?HAPPYTIME_MULTIPLIER}; - _ -> - {false, ?INACTIVITY_TIMEOUT} - end; - {WI, _} -> + {false, ?MAX_TIMEOUT}; + WI -> {NewManifest, FilesToDelete} = merge(WI), UpdWI = WI#penciller_work{new_manifest=NewManifest, unreferenced_files=FilesToDelete}, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 0ea83c8..39c2626 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -306,7 +306,6 @@ memtable_copy = #l0snapshot{} :: #l0snapshot{}, levelzero_snapshot = gb_trees:empty() :: gb_trees:tree(), memtable, - backlog = false :: boolean(), memtable_maxsize :: integer(), is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), @@ -412,58 +411,25 @@ handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap}) case assess_sqn(DumpList) of {MinSQN, MaxSQN} when MaxSQN >= MinSQN, MinSQN >= State#state.ledger_sqn -> - MaxTableSize = State#state.memtable_maxsize, - {TableSize0, State1} = checkready_pushtomem(State), - case quickcheck_pushtomem(DumpList, - TableSize0, - MaxTableSize) of - {twist, TableSize1} -> - gen_server:reply(From, ok), - io:format("Reply made on push in ~w microseconds~n", + case checkready_pushtomem(State) of + {ok, TableSize0, State1} -> + push_and_roll(DumpList, + TableSize0, + State#state.memtable_maxsize, + MaxSQN, + StartWatch, + From, + State1); + timeout -> + io:format("Timeout of ~w microseconds awaiting " ++ + "L0 SFT write~n", [timer:now_diff(os:timestamp(), StartWatch)]), - L0Snap = do_pushtomem(DumpList, - State1#state.memtable, - State1#state.memtable_copy, - MaxSQN), - 
io:format("Push completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(), StartWatch)]), - {noreply, - State1#state{memtable_copy=L0Snap, - table_size=TableSize1, - ledger_sqn=MaxSQN}}; - {maybe_roll, TableSize1} -> - L0Snap = do_pushtomem(DumpList, - State1#state.memtable, - State1#state.memtable_copy, - MaxSQN), - - case roll_memory(State1, MaxTableSize, L0Snap) of - {ok, L0Pend, ManSN, TableSize2} -> - io:format("Push completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(), StartWatch)]), - {reply, - ok, - State1#state{levelzero_pending=L0Pend, - table_size=TableSize2, - manifest_sqn=ManSN, - memtable_copy=L0Snap, - ledger_sqn=MaxSQN, - backlog=false}}; - {pause, Reason, Details} -> - io:format("Excess work due to - " ++ Reason, - Details), - {reply, - pause, - State1#state{backlog=true, - memtable_copy=L0Snap, - table_size=TableSize1, - ledger_sqn=MaxSQN}} - end + {reply, returned, State} end; {MinSQN, MaxSQN} -> io:format("Mismatch of sequence number expectations with push " - ++ "having sequence numbers between ~w and ~w " - ++ "but current sequence number is ~w~n", + ++ "having sequence numbers between ~w and ~w " + ++ "but current sequence number is ~w~n", [MinSQN, MaxSQN, State#state.ledger_sqn]), {reply, refused, State}; empty -> @@ -520,7 +486,7 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc}, {reply, Acc, State}; handle_call(work_for_clerk, From, State) -> {UpdState, Work} = return_work(State, From), - {reply, {Work, UpdState#state.backlog}, UpdState}; + {reply, Work, UpdState}; handle_call(get_startup_sqn, _From, State) -> {reply, State#state.ledger_sqn, State}; handle_call({register_snapshot, Snapshot}, _From, State) -> @@ -568,7 +534,8 @@ handle_cast({release_snapshot, Snapshot}, State) -> io:format("Penciller snapshot ~w released~n", [Snapshot]), {noreply, State#state{registered_snapshots=Rs}}. 
-handle_info(_Info, State) -> +handle_info({_Ref, {ok, SrcFN, _StartKey, _EndKey}}, State) -> + io:format("Orphaned reply after timeout on L0 file write ~s~n", [SrcFN]), {noreply, State}. terminate(Reason, State=#state{is_snapshot=Snap}) when Snap == true -> @@ -749,28 +716,87 @@ start_from_file(PCLopts) -> checkready_pushtomem(State) -> - {TableSize, UpdState} = case State#state.levelzero_pending of + case State#state.levelzero_pending of {true, Pid, _TS} -> - % N.B. Sync call - so will be ready - {ok, SrcFN, StartKey, EndKey} = leveled_sft:sft_checkready(Pid), - true = ets:delete_all_objects(State#state.memtable), - ManifestEntry = #manifest_entry{start_key=StartKey, - end_key=EndKey, - owner=Pid, - filename=SrcFN}, - % Prompt clerk to ask about work - do this for every L0 roll - ok = leveled_pclerk:clerk_prompt(State#state.clerk), - {0, - State#state{manifest=lists:keystore(0, + case checkready(Pid) of + timeout -> + timeout; + {ok, SrcFN, StartKey, EndKey} -> + true = ets:delete_all_objects(State#state.memtable), + ManifestEntry = #manifest_entry{start_key=StartKey, + end_key=EndKey, + owner=Pid, + filename=SrcFN}, + % Prompt clerk to ask about work - do this for every + % L0 roll + ok = leveled_pclerk:clerk_prompt(State#state.clerk), + UpdManifest = lists:keystore(0, 1, State#state.manifest, {0, [ManifestEntry]}), - levelzero_pending=?L0PEND_RESET, - memtable_copy=#l0snapshot{}}}; + {ok, + 0, + State#state{manifest=UpdManifest, + levelzero_pending=?L0PEND_RESET, + memtable_copy=#l0snapshot{}}} + end; ?L0PEND_RESET -> - {State#state.table_size, State} - end, - {TableSize, UpdState}. + {ok, State#state.table_size, State} + end. + + +checkready(Pid) -> + try + leveled_sft:sft_checkready(Pid) + catch + exit:{timeout, _} -> + timeout + end. 
+ + +push_and_roll(DumpList, TableSize, MaxTableSize, MaxSQN, StartWatch, From, State) -> + case quickcheck_pushtomem(DumpList, TableSize, MaxTableSize) of + {twist, TableSize1} -> + gen_server:reply(From, ok), + io:format("Reply made on push in ~w microseconds~n", + [timer:now_diff(os:timestamp(), StartWatch)]), + L0Snap = do_pushtomem(DumpList, + State#state.memtable, + State#state.memtable_copy, + MaxSQN), + io:format("Push completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(), StartWatch)]), + {noreply, + State#state{memtable_copy=L0Snap, + table_size=TableSize1, + ledger_sqn=MaxSQN}}; + {maybe_roll, TableSize1} -> + L0Snap = do_pushtomem(DumpList, + State#state.memtable, + State#state.memtable_copy, + MaxSQN), + + case roll_memory(State, MaxTableSize, L0Snap) of + {ok, L0Pend, ManSN, TableSize2} -> + io:format("Push completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(), StartWatch)]), + {reply, + ok, + State#state{levelzero_pending=L0Pend, + table_size=TableSize2, + manifest_sqn=ManSN, + memtable_copy=L0Snap, + ledger_sqn=MaxSQN}}; + {pause, Reason, Details} -> + io:format("Excess work due to - " ++ Reason, + Details), + {reply, + pause, + State#state{memtable_copy=L0Snap, + table_size=TableSize1, + ledger_sqn=MaxSQN}} + end + end. 
quickcheck_pushtomem(DumpList, TableSize, MaxSize) -> case TableSize + length(DumpList) of @@ -894,9 +920,10 @@ return_work(State, From) -> case length(WorkQueue) of L when L > 0 -> [{SrcLevel, Manifest}|OtherWork] = WorkQueue, + Backlog = length(OtherWork), io:format("Work at Level ~w to be scheduled for ~w with ~w " ++ "queue items outstanding~n", - [SrcLevel, From, length(OtherWork)]), + [SrcLevel, From, Backlog]), case element(1, State#state.levelzero_pending) of true -> % Once the L0 file is completed there will be more work @@ -1557,7 +1584,7 @@ simple_server_test() -> "Key0004", null}, 3002)), - % Add some more keys and confirm that chekc sequence number still + % Add some more keys and confirm that check sequence number still % sees the old version in the previous snapshot, but will see the new version % in a new snapshot Key1A = {{o,"Bucket0001", "Key0001", null}, {4002, {active, infinity}, null}}, diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 7669660..f292339 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -252,7 +252,7 @@ sft_close(Pid) -> gen_server:call(Pid, close, infinity). sft_checkready(Pid) -> - gen_server:call(Pid, background_complete, infinity). + gen_server:call(Pid, background_complete, 50). sft_getmaxsequencenumber(Pid) -> gen_server:call(Pid, get_maxsqn, infinity).