Penciller back-pressure - Phase 1
There were issues with how the Penciller behaves under ehavy write pressure - most particularly where there are a large number of keys per update (i.e. 2i heavy objects). Most immediately the attempt to chekc whether the l0 file was ready slowed down the process of producing the L0 file - so back-pressure created more back-pressure. Going forward want to alter this most significantly as also the work queue can build up unsustainably. there needs to be some pausing prompted by the bookie on 'returned', and the use of 'returend when the work queue exceeds a threshold.
This commit is contained in:
parent
34a90231e0
commit
61c6269200
4 changed files with 44 additions and 50 deletions
|
@ -17,7 +17,8 @@
|
||||||
|
|
||||||
-record(sft_options,
|
-record(sft_options,
|
||||||
{wait = true :: boolean(),
|
{wait = true :: boolean(),
|
||||||
expire_tombstones = false :: boolean()}).
|
expire_tombstones = false :: boolean(),
|
||||||
|
penciller :: pid()}).
|
||||||
|
|
||||||
-record(penciller_work,
|
-record(penciller_work,
|
||||||
{next_sqn :: integer(),
|
{next_sqn :: integer(),
|
||||||
|
|
|
@ -93,6 +93,8 @@
|
||||||
{info, "Rename of manifest from ~s ~w to ~s ~w"}},
|
{info, "Rename of manifest from ~s ~w to ~s ~w"}},
|
||||||
{"P0028",
|
{"P0028",
|
||||||
{info, "Adding cleared file ~s to deletion list"}},
|
{info, "Adding cleared file ~s to deletion list"}},
|
||||||
|
{"P0029",
|
||||||
|
{info, "L0 completion confirmed and will transition to not pending"}},
|
||||||
|
|
||||||
{"PC001",
|
{"PC001",
|
||||||
{info, "Penciller's clerk ~w started with owner ~w"}},
|
{info, "Penciller's clerk ~w started with owner ~w"}},
|
||||||
|
|
|
@ -229,6 +229,7 @@
|
||||||
pcl_checksequencenumber/3,
|
pcl_checksequencenumber/3,
|
||||||
pcl_workforclerk/1,
|
pcl_workforclerk/1,
|
||||||
pcl_promptmanifestchange/2,
|
pcl_promptmanifestchange/2,
|
||||||
|
pcl_confirml0complete/4,
|
||||||
pcl_confirmdelete/2,
|
pcl_confirmdelete/2,
|
||||||
pcl_close/1,
|
pcl_close/1,
|
||||||
pcl_registersnapshot/2,
|
pcl_registersnapshot/2,
|
||||||
|
@ -314,6 +315,9 @@ pcl_workforclerk(Pid) ->
|
||||||
pcl_promptmanifestchange(Pid, WI) ->
|
pcl_promptmanifestchange(Pid, WI) ->
|
||||||
gen_server:cast(Pid, {manifest_change, WI}).
|
gen_server:cast(Pid, {manifest_change, WI}).
|
||||||
|
|
||||||
|
pcl_confirml0complete(Pid, FN, StartKey, EndKey) ->
|
||||||
|
gen_server:cast(Pid, {levelzero_complete, FN, StartKey, EndKey}).
|
||||||
|
|
||||||
pcl_confirmdelete(Pid, FileName) ->
|
pcl_confirmdelete(Pid, FileName) ->
|
||||||
gen_server:cast(Pid, {confirm_delete, FileName}).
|
gen_server:cast(Pid, {confirm_delete, FileName}).
|
||||||
|
|
||||||
|
@ -360,13 +364,10 @@ handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap})
|
||||||
% we mean value from the perspective of the Ledger, not the full value
|
% we mean value from the perspective of the Ledger, not the full value
|
||||||
% stored in the Inker)
|
% stored in the Inker)
|
||||||
%
|
%
|
||||||
% 2 - Check to see if there is a levelzero file pending. If so check if
|
% 2 - Check to see if there is a levelzero file pending. If so, the
|
||||||
% the levelzero file is complete. If it is complete, the levelzero tree
|
% update must be returned. If not the update can be accepted
|
||||||
% can be flushed, the in-memory manifest updated, and the new tree can
|
|
||||||
% be accepted as the new levelzero cache. If not, the update must be
|
|
||||||
% returned.
|
|
||||||
%
|
%
|
||||||
% 3 - The Penciller can now reply to the Bookie to show that the push has
|
% 3 - The Penciller can now reply to the Bookie to show if the push has
|
||||||
% been accepted
|
% been accepted
|
||||||
%
|
%
|
||||||
% 4 - Update the cache:
|
% 4 - Update the cache:
|
||||||
|
@ -377,45 +378,10 @@ handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap})
|
||||||
% trigger a backgroun L0 file write and update state of levelzero_pending.
|
% trigger a backgroun L0 file write and update state of levelzero_pending.
|
||||||
|
|
||||||
SW = os:timestamp(),
|
SW = os:timestamp(),
|
||||||
|
|
||||||
S = case State#state.levelzero_pending of
|
S = case State#state.levelzero_pending of
|
||||||
true ->
|
true ->
|
||||||
L0Pid = State#state.levelzero_constructor,
|
log_pushmem_reply(From, {returned, "L-0 persist pending"}, SW),
|
||||||
case checkready(L0Pid) of
|
State;
|
||||||
timeout ->
|
|
||||||
log_pushmem_reply(From,
|
|
||||||
{returned,
|
|
||||||
"L-0 persist pending"},
|
|
||||||
SW),
|
|
||||||
State;
|
|
||||||
{ok, SrcFN, StartKey, EndKey} ->
|
|
||||||
log_pushmem_reply(From,
|
|
||||||
{ok,
|
|
||||||
"L-0 persist completed"},
|
|
||||||
SW),
|
|
||||||
ManEntry = #manifest_entry{start_key=StartKey,
|
|
||||||
end_key=EndKey,
|
|
||||||
owner=L0Pid,
|
|
||||||
filename=SrcFN},
|
|
||||||
UpdMan = lists:keystore(0,
|
|
||||||
1,
|
|
||||||
State#state.manifest,
|
|
||||||
{0, [ManEntry]}),
|
|
||||||
LedgerSQN = State#state.ledger_sqn,
|
|
||||||
UpdState = State#state{manifest=UpdMan,
|
|
||||||
levelzero_pending=false,
|
|
||||||
persisted_sqn=LedgerSQN},
|
|
||||||
% Prompt clerk to ask about work - do this for
|
|
||||||
% every L0 roll
|
|
||||||
ok = leveled_pclerk:clerk_prompt(State#state.clerk),
|
|
||||||
NewL0Index = leveled_pmem:new_index(),
|
|
||||||
update_levelzero(NewL0Index,
|
|
||||||
0,
|
|
||||||
PushedTree,
|
|
||||||
LedgerSQN,
|
|
||||||
[],
|
|
||||||
UpdState)
|
|
||||||
end;
|
|
||||||
false ->
|
false ->
|
||||||
log_pushmem_reply(From, {ok, "L0 memory updated"}, SW),
|
log_pushmem_reply(From, {ok, "L0 memory updated"}, SW),
|
||||||
update_levelzero(State#state.levelzero_index,
|
update_levelzero(State#state.levelzero_index,
|
||||||
|
@ -506,7 +472,22 @@ handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap})
|
||||||
{noreply, State#state{unreferenced_files=UF1}};
|
{noreply, State#state{unreferenced_files=UF1}};
|
||||||
_ ->
|
_ ->
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end.
|
end;
|
||||||
|
handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) ->
|
||||||
|
leveled_log:log("P0029", []),
|
||||||
|
ManEntry = #manifest_entry{start_key=StartKey,
|
||||||
|
end_key=EndKey,
|
||||||
|
owner=State#state.levelzero_constructor,
|
||||||
|
filename=FN},
|
||||||
|
UpdMan = lists:keystore(0, 1, State#state.manifest, {0, [ManEntry]}),
|
||||||
|
% Prompt clerk to ask about work - do this for every L0 roll
|
||||||
|
ok = leveled_pclerk:clerk_prompt(State#state.clerk),
|
||||||
|
{noreply, State#state{levelzero_cache=[],
|
||||||
|
levelzero_pending=false,
|
||||||
|
levelzero_constructor=undefined,
|
||||||
|
levelzero_index=leveled_pmem:new_index(),
|
||||||
|
levelzero_size=0,
|
||||||
|
manifest=UpdMan}}.
|
||||||
|
|
||||||
|
|
||||||
handle_info({_Ref, {ok, SrcFN, _StartKey, _EndKey}}, State) ->
|
handle_info({_Ref, {ok, SrcFN, _StartKey, _EndKey}}, State) ->
|
||||||
|
@ -724,7 +705,7 @@ checkready(Pid) ->
|
||||||
roll_memory(State, false) ->
|
roll_memory(State, false) ->
|
||||||
FileName = levelzero_filename(State),
|
FileName = levelzero_filename(State),
|
||||||
leveled_log:log("P0019", [FileName]),
|
leveled_log:log("P0019", [FileName]),
|
||||||
Opts = #sft_options{wait=false},
|
Opts = #sft_options{wait=false, penciller=self()},
|
||||||
PCL = self(),
|
PCL = self(),
|
||||||
FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end,
|
FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end,
|
||||||
% FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end,
|
% FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end,
|
||||||
|
|
|
@ -245,7 +245,8 @@ sft_newfroml0cache(Filename, Slots, FetchFun, Options) ->
|
||||||
{sft_newfroml0cache,
|
{sft_newfroml0cache,
|
||||||
Filename,
|
Filename,
|
||||||
Slots,
|
Slots,
|
||||||
FetchFun}),
|
FetchFun,
|
||||||
|
Options#sft_options.penciller}),
|
||||||
{ok, Pid, noreply}
|
{ok, Pid, noreply}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -352,12 +353,21 @@ handle_call({set_for_delete, Penciller}, _From, State) ->
|
||||||
handle_call(get_maxsqn, _From, State) ->
|
handle_call(get_maxsqn, _From, State) ->
|
||||||
statecheck_onreply(State#state.highest_sqn, State).
|
statecheck_onreply(State#state.highest_sqn, State).
|
||||||
|
|
||||||
handle_cast({sft_newfroml0cache, Filename, Slots, FetchFun}, _State) ->
|
handle_cast({sft_newfroml0cache, Filename, Slots, FetchFun, PCL}, _State) ->
|
||||||
SW = os:timestamp(),
|
SW = os:timestamp(),
|
||||||
Inp1 = leveled_pmem:to_list(Slots, FetchFun),
|
Inp1 = leveled_pmem:to_list(Slots, FetchFun),
|
||||||
{ok, State} = create_levelzero(Inp1, Filename),
|
{ok, State} = create_levelzero(Inp1, Filename),
|
||||||
leveled_log:log_timer("SFT03", [Filename], SW),
|
leveled_log:log_timer("SFT03", [Filename], SW),
|
||||||
{noreply, State};
|
case PCL of
|
||||||
|
undefined ->
|
||||||
|
{noreply, State};
|
||||||
|
_ ->
|
||||||
|
ok = leveled_penciller:pcl_confirml0complete(PCL,
|
||||||
|
Filename,
|
||||||
|
State#state.smallest_key,
|
||||||
|
State#state.highest_key),
|
||||||
|
{noreply, State}
|
||||||
|
end;
|
||||||
handle_cast(close, State) ->
|
handle_cast(close, State) ->
|
||||||
{stop, normal, State}.
|
{stop, normal, State}.
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue