Refactor fetching of level zero cache entries

This is now done via an async message-passing loop between the penciller and the new SST file. This way, when the penciller shuts down, it can call close on an L0 file that is awaiting a fetch, rather than be trapped in deadlock.

The deadlock otherwise occurs if a penciller is sent a close immediately after it has prompted a new level zero file.
This commit is contained in:
Martin Sumner 2019-02-26 18:16:47 +00:00
parent a589c9ca63
commit 01f731dbc9
2 changed files with 100 additions and 32 deletions

View file

@ -173,7 +173,7 @@
pcl_snapstart/1,
pcl_start/1,
pcl_pushmem/2,
pcl_fetchlevelzero/2,
pcl_fetchlevelzero/3,
pcl_fetch/4,
pcl_fetchkeys/5,
pcl_fetchkeys/6,
@ -306,6 +306,8 @@
-type iterator() :: list(iterator_entry()).
-type bad_ledgerkey() :: list().
-export_type([levelzero_cacheentry/0]).
%%%============================================================================
%%% API
%%%============================================================================
@ -348,7 +350,7 @@ pcl_pushmem(Pid, LedgerCache) ->
%% Bookie to dump memory onto penciller
gen_server:call(Pid, {push_mem, LedgerCache}, infinity).
-spec pcl_fetchlevelzero(pid(), integer()) -> tuple().
-spec pcl_fetchlevelzero(pid(), non_neg_integer(), fun()) -> ok.
%% @doc
%% Allows a single slot of the penciller's levelzero cache to be fetched. The
%% levelzero cache can be up to 40K keys - sending this to the process that is
@ -358,13 +360,13 @@ pcl_pushmem(Pid, LedgerCache) ->
%%
%% The return value will be a leveled_skiplist that forms that part of the
%% cache
pcl_fetchlevelzero(Pid, Slot) ->
pcl_fetchlevelzero(Pid, Slot, ReturnFun) ->
% Timeout to cause crash of L0 file when it can't get the close signal
% as it is deadlocked making this call.
%
% If the timeout gets hit outside of close scenario the Penciller will
% be stuck in L0 pending
gen_server:call(Pid, {fetch_levelzero, Slot}, 60000).
gen_server:cast(Pid, {fetch_levelzero, Slot, ReturnFun}).
-spec pcl_fetch(pid(), leveled_codec:ledger_key())
-> leveled_codec:ledger_kv()|not_present.
@ -889,8 +891,6 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning},
CloneState#state{snapshot_fully_loaded=true,
manifest=ManifestClone}},
State#state{manifest = Manifest0}};
handle_call({fetch_levelzero, Slot}, _From, State) ->
{reply, lists:nth(Slot, State#state.levelzero_cache), State};
handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true ->
ok = pcl_releasesnapshot(State#state.source_penciller, self()),
{stop, normal, ok, State};
@ -1049,6 +1049,9 @@ handle_cast(work_for_clerk, State) ->
_ ->
{noreply, State}
end;
% Asynchronous fetch of a single level zero cache slot. The entry at position
% Slot (1-based, as used by lists:nth/2) is passed to ReturnFun instead of
% being sent back as a call reply; the penciller state is not modified.
handle_cast({fetch_levelzero, Slot, ReturnFun}, State) ->
ReturnFun(lists:nth(Slot, State#state.levelzero_cache)),
{noreply, State};
handle_cast({log_level, LogLevel}, State) ->
PC = State#state.clerk,
ok = leveled_pclerk:clerk_loglevel(PC, LogLevel),
@ -1352,7 +1355,8 @@ roll_memory(State, false) ->
FileName = sst_filename(ManSQN, 0, 0),
leveled_log:log("P0019", [FileName, State#state.ledger_sqn]),
PCL = self(),
FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end,
FetchFun =
fun(Slot, ReturnFun) -> pcl_fetchlevelzero(PCL, Slot, ReturnFun) end,
R = leveled_sst:sst_newlevelzero(RootPath,
FileName,
length(State#state.levelzero_cache),
@ -2314,12 +2318,12 @@ create_file_test() ->
ok = file:write_file(filename:join(RP, Filename), term_to_binary("hello")),
KVL = lists:usort(generate_randomkeys({50000, 0})),
Tree = leveled_tree:from_orderedlist(KVL, ?CACHE_TYPE),
FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end,
{ok, SP, noreply} =
leveled_sst:sst_newlevelzero(RP,
Filename,
1,
FetchFun,
[Tree],
undefined,
50000,
#sst_options{press_method = native}),

View file

@ -189,7 +189,9 @@
timings = no_timing :: sst_timings(),
timings_countdown = 0 :: integer(),
starting_pid :: pid()|undefined,
fetch_cache = array:new([{size, ?CACHE_SIZE}])}).
fetch_cache = array:new([{size, ?CACHE_SIZE}]),
new_slots :: list()|undefined,
deferred_startup_tuple :: tuple()|undefined}).
-record(sst_timings,
{sample_count = 0 :: integer(),
@ -336,31 +338,42 @@ sst_new(RootPath, Filename,
end.
-spec sst_newlevelzero(string(), string(),
integer(), fun(), pid()|undefined, integer(),
integer(), fun()|list(), pid()|undefined, integer(),
sst_options()) ->
{ok, pid(), noreply}.
{ok, pid(), noreply}.
%% @doc
%% Start a new file at level zero. At this level the file size is not fixed -
%% it will be as big as the input. Also the KVList is not passed in, it is
%% fetched slot by slot using the FetchFun
sst_newlevelzero(RootPath, Filename,
Slots, FetchFun, Penciller,
Slots, Fetcher, Penciller,
MaxSQN, OptsSST) ->
PressMethod0 = compress_level(0, OptsSST#sst_options.press_method),
OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
{ok, Pid} = gen_fsm:start_link(?MODULE, [], []),
gen_fsm:send_event(Pid,
{sst_newlevelzero,
RootPath,
Filename,
Slots,
FetchFun,
Penciller,
MaxSQN,
OptsSST0,
?INDEX_MODDATE}),
% Initiate the file into the "starting" state
ok = gen_fsm:sync_send_event(Pid,
{sst_newlevelzero,
RootPath,
Filename,
Penciller,
MaxSQN,
OptsSST0,
?INDEX_MODDATE}),
ok =
case is_list(Fetcher) of
true ->
gen_fsm:send_event(Pid, {complete_l0startup, Fetcher});
false ->
% Fetcher is a function
gen_fsm:send_event(Pid, {sst_returnslot, none, Fetcher, Slots})
% Start the fetch loop (async). Having the fetch loop running
% on async message passing means that the SST file can now be
% closed while the fetch loop is still completing
end,
{ok, Pid, noreply}.
-spec sst_get(pid(), leveled_codec:ledger_key())
-> leveled_codec:ledger_kv()|not_present.
%% @doc
@ -493,15 +506,29 @@ starting({sst_new,
reader,
UpdState#state{blockindex_cache = BlockIndex,
starting_pid = StartingPID},
?STARTUP_TIMEOUT}.
?STARTUP_TIMEOUT};
starting({sst_newlevelzero, RootPath, Filename,
Slots, FetchFun, Penciller, MaxSQN,
OptsSST, IdxModDate}, State) ->
Penciller, MaxSQN,
OptsSST, IdxModDate}, _From, State) ->
DeferredStartupTuple =
{RootPath, Filename, Penciller, MaxSQN, OptsSST, IdxModDate},
{reply, ok, starting,
State#state{deferred_startup_tuple = DeferredStartupTuple}};
starting(close, _From, State) ->
% No file should have been created, so nothing to close.
{stop, normal, ok, State}.
starting({complete_l0startup, Slots}, State) ->
starting(complete_l0startup, State#state{new_slots = Slots});
starting(complete_l0startup, State) ->
{RootPath, Filename, Penciller, MaxSQN, OptsSST, IdxModDate} =
State#state.deferred_startup_tuple,
SW0 = os:timestamp(),
FetchedSlots = State#state.new_slots,
leveled_log:save(OptsSST#sst_options.log_options),
PressMethod = OptsSST#sst_options.press_method,
KVList = leveled_pmem:to_list(Slots, FetchFun),
FetchFun = fun(Slot) -> lists:nth(Slot, FetchedSlots) end,
KVList = leveled_pmem:to_list(length(FetchedSlots), FetchFun),
Time0 = timer:now_diff(os:timestamp(), SW0),
SW1 = os:timestamp(),
@ -525,7 +552,12 @@ starting({sst_newlevelzero, RootPath, Filename,
PressMethod, IdxModDate),
{UpdState, Bloom} =
read_file(ActualFilename,
State#state{root_path=RootPath, yield_blockquery=true}),
State#state{root_path=RootPath,
yield_blockquery=true,
% Important to empty this from state rather
% than carry it through to the next stage
new_slots=undefined,
deferred_startup_tuple=undefined}),
Summary = UpdState#state.summary,
Time4 = timer:now_diff(os:timestamp(), SW4),
@ -548,9 +580,37 @@ starting({sst_newlevelzero, RootPath, Filename,
{next_state,
reader,
UpdState#state{blockindex_cache = BlockIndex}}
end;
% One step of the asynchronous level zero fetch loop.
% FetchedSlot is the slot delivered by the previous step, or the atom none on
% the initial event (which resets the accumulator to an empty list).
% FetchFun(SlotNumber, ReturnFun) requests a slot; SlotCount is the number of
% slots to collect before the file can be built.
% The FSM remains in the starting state throughout.
starting({sst_returnslot, FetchedSlot, FetchFun, SlotCount}, State) ->
% Capture the pid now: ReturnFun may be evaluated by another process, where
% a call to self() would not identify this SST process.
Self = self(),
FetchedSlots =
case FetchedSlot of
none ->
[];
_ ->
% Prepend, so the accumulator is held in reverse slot order
[FetchedSlot|State#state.new_slots]
end,
case length(FetchedSlots) == SlotCount of
true ->
% All slots received - trigger the build via an event to ourselves
gen_fsm:send_event(Self, complete_l0startup),
{next_state,
starting,
% Reverse the slots so that they are back in the expected
% order
State#state{new_slots = lists:reverse(FetchedSlots)}};
false ->
% ReturnFun feeds the fetched slot back into this clause as the next
% sst_returnslot event
ReturnFun =
fun(NextSlot) ->
gen_fsm:send_event(Self,
{sst_returnslot, NextSlot,
FetchFun, SlotCount})
end,
% Request the next slot (slot numbers start at 1)
FetchFun(length(FetchedSlots) + 1, ReturnFun),
{next_state,
starting,
State#state{new_slots = FetchedSlots}}
end.
reader({get_kv, LedgerKey, Hash}, _From, State) ->
% Get a KV value and potentially take sample timings
{Result, UpdState, UpdTimings} =
@ -3352,6 +3412,10 @@ take_max_lastmoddate_test() ->
% modified dates
?assertMatch(1, take_max_lastmoddate(0, 1)).
% Start an SST process without sending it any startup event, then assert that
% sst_close/1 returns ok.
stopstart_test() ->
{ok, Pid} = gen_fsm:start_link(?MODULE, [], []),
% check we can close in the starting state. This may happen due to the
% fetcher on new level zero files working in a loop
ok = sst_close(Pid).
-endif.