From 61c62692003dd4a3ed454a375037752c1a98598c Mon Sep 17 00:00:00 2001 From: martinsumner Date: Sat, 5 Nov 2016 11:22:27 +0000 Subject: [PATCH] Penciller back-pressure - Phase 1 There were issues with how the Penciller behaves under ehavy write pressure - most particularly where there are a large number of keys per update (i.e. 2i heavy objects). Most immediately the attempt to chekc whether the l0 file was ready slowed down the process of producing the L0 file - so back-pressure created more back-pressure. Going forward want to alter this most significantly as also the work queue can build up unsustainably. there needs to be some pausing prompted by the bookie on 'returned', and the use of 'returend when the work queue exceeds a threshold. --- include/leveled.hrl | 3 +- src/leveled_log.erl | 2 ++ src/leveled_penciller.erl | 73 +++++++++++++++------------------------ src/leveled_sft.erl | 16 +++++++-- 4 files changed, 44 insertions(+), 50 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index 028eb95..e685a39 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -17,7 +17,8 @@ -record(sft_options, {wait = true :: boolean(), - expire_tombstones = false :: boolean()}). + expire_tombstones = false :: boolean(), + penciller :: pid()}). -record(penciller_work, {next_sqn :: integer(), diff --git a/src/leveled_log.erl b/src/leveled_log.erl index d0ab7f5..581e02c 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -93,6 +93,8 @@ {info, "Rename of manifest from ~s ~w to ~s ~w"}}, {"P0028", {info, "Adding cleared file ~s to deletion list"}}, + {"P0029", + {info, "L0 completion confirmed and will transition to not pending"}}, {"PC001", {info, "Penciller's clerk ~w started with owner ~w"}}, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 81216e4..5c1ebce 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -229,6 +229,7 @@ pcl_checksequencenumber/3, pcl_workforclerk/1, pcl_promptmanifestchange/2, + pcl_confirml0complete/4, pcl_confirmdelete/2, pcl_close/1, pcl_registersnapshot/2, @@ -314,6 +315,9 @@ pcl_workforclerk(Pid) -> pcl_promptmanifestchange(Pid, WI) -> gen_server:cast(Pid, {manifest_change, WI}). +pcl_confirml0complete(Pid, FN, StartKey, EndKey) -> + gen_server:cast(Pid, {levelzero_complete, FN, StartKey, EndKey}). + pcl_confirmdelete(Pid, FileName) -> gen_server:cast(Pid, {confirm_delete, FileName}). @@ -360,13 +364,10 @@ handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap}) % we mean value from the perspective of the Ledger, not the full value % stored in the Inker) % - % 2 - Check to see if there is a levelzero file pending. If so check if - % the levelzero file is complete. If it is complete, the levelzero tree - % can be flushed, the in-memory manifest updated, and the new tree can - % be accepted as the new levelzero cache. If not, the update must be - % returned. + % 2 - Check to see if there is a levelzero file pending. If so, the + % update must be returned. If not the update can be accepted % - % 3 - The Penciller can now reply to the Bookie to show that the push has + % 3 - The Penciller can now reply to the Bookie to show if the push has % been accepted % % 4 - Update the cache: @@ -375,47 +376,12 @@ handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap}) % % Check the approximate size of the cache. If it is over the maximum size, % trigger a backgroun L0 file write and update state of levelzero_pending. - - SW = os:timestamp(), + SW = os:timestamp(), S = case State#state.levelzero_pending of true -> - L0Pid = State#state.levelzero_constructor, - case checkready(L0Pid) of - timeout -> - log_pushmem_reply(From, - {returned, - "L-0 persist pending"}, - SW), - State; - {ok, SrcFN, StartKey, EndKey} -> - log_pushmem_reply(From, - {ok, - "L-0 persist completed"}, - SW), - ManEntry = #manifest_entry{start_key=StartKey, - end_key=EndKey, - owner=L0Pid, - filename=SrcFN}, - UpdMan = lists:keystore(0, - 1, - State#state.manifest, - {0, [ManEntry]}), - LedgerSQN = State#state.ledger_sqn, - UpdState = State#state{manifest=UpdMan, - levelzero_pending=false, - persisted_sqn=LedgerSQN}, - % Prompt clerk to ask about work - do this for - % every L0 roll - ok = leveled_pclerk:clerk_prompt(State#state.clerk), - NewL0Index = leveled_pmem:new_index(), - update_levelzero(NewL0Index, - 0, - PushedTree, - LedgerSQN, - [], - UpdState) - end; + log_pushmem_reply(From, {returned, "L-0 persist pending"}, SW), + State; false -> log_pushmem_reply(From, {ok, "L0 memory updated"}, SW), update_levelzero(State#state.levelzero_index, @@ -506,7 +472,22 @@ handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap}) {noreply, State#state{unreferenced_files=UF1}}; _ -> {noreply, State} - end. + end; +handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> + leveled_log:log("P0029", []), + ManEntry = #manifest_entry{start_key=StartKey, + end_key=EndKey, + owner=State#state.levelzero_constructor, + filename=FN}, + UpdMan = lists:keystore(0, 1, State#state.manifest, {0, [ManEntry]}), + % Prompt clerk to ask about work - do this for every L0 roll + ok = leveled_pclerk:clerk_prompt(State#state.clerk), + {noreply, State#state{levelzero_cache=[], + levelzero_pending=false, + levelzero_constructor=undefined, + levelzero_index=leveled_pmem:new_index(), + levelzero_size=0, + manifest=UpdMan}}. handle_info({_Ref, {ok, SrcFN, _StartKey, _EndKey}}, State) -> @@ -724,7 +705,7 @@ checkready(Pid) -> roll_memory(State, false) -> FileName = levelzero_filename(State), leveled_log:log("P0019", [FileName]), - Opts = #sft_options{wait=false}, + Opts = #sft_options{wait=false, penciller=self()}, PCL = self(), FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end, % FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end, diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 4833015..f6cdeb1 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -245,7 +245,8 @@ sft_newfroml0cache(Filename, Slots, FetchFun, Options) -> {sft_newfroml0cache, Filename, Slots, - FetchFun}), + FetchFun, + Options#sft_options.penciller}), {ok, Pid, noreply} end. @@ -352,12 +353,21 @@ handle_call({set_for_delete, Penciller}, _From, State) -> handle_call(get_maxsqn, _From, State) -> statecheck_onreply(State#state.highest_sqn, State). -handle_cast({sft_newfroml0cache, Filename, Slots, FetchFun}, _State) -> +handle_cast({sft_newfroml0cache, Filename, Slots, FetchFun, PCL}, _State) -> SW = os:timestamp(), Inp1 = leveled_pmem:to_list(Slots, FetchFun), {ok, State} = create_levelzero(Inp1, Filename), leveled_log:log_timer("SFT03", [Filename], SW), - {noreply, State}; + case PCL of + undefined -> + {noreply, State}; + _ -> + ok = leveled_penciller:pcl_confirml0complete(PCL, + Filename, + State#state.smallest_key, + State#state.highest_key), + {noreply, State} + end; handle_cast(close, State) -> {stop, normal, State}.