From 01f731dbc955b82e53ea1513f66feb268a5b45d8 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 26 Feb 2019 18:16:47 +0000 Subject: [PATCH] Refactor fetching of level zero cache entries This is now done on an async message passing loop between the penciller and the new SST file. This way, when the penciller shuts down, it can call close on an L0 file that is awaiting a fetch - rather than be trapped in deadlock. The deadlock otherwise occurs if a penciller is sent a close immediately after it has prompted a new level zero. --- src/leveled_penciller.erl | 24 +++++---- src/leveled_sst.erl | 108 ++++++++++++++++++++++++++++++-------- 2 files changed, 100 insertions(+), 32 deletions(-) diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 05744df..d076dcc 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -173,7 +173,7 @@ pcl_snapstart/1, pcl_start/1, pcl_pushmem/2, - pcl_fetchlevelzero/2, + pcl_fetchlevelzero/3, pcl_fetch/4, pcl_fetchkeys/5, pcl_fetchkeys/6, @@ -306,6 +306,8 @@ -type iterator() :: list(iterator_entry()). -type bad_ledgerkey() :: list(). +-export_type([levelzero_cacheentry/0]). + %%%============================================================================ %%% API %%%============================================================================ @@ -348,7 +350,7 @@ pcl_pushmem(Pid, LedgerCache) -> %% Bookie to dump memory onto penciller gen_server:call(Pid, {push_mem, LedgerCache}, infinity). --spec pcl_fetchlevelzero(pid(), integer()) -> tuple(). +-spec pcl_fetchlevelzero(pid(), non_neg_integer(), fun()) -> ok. %% @doc %% Allows a single slot of the penciller's levelzero cache to be fetched. 
The %% levelzero cache can be up to 40K keys - sending this to the process that is @@ -358,13 +360,13 @@ pcl_pushmem(Pid, LedgerCache) -> %% %% The return value will be a leveled_skiplist that forms that part of the %% cache -pcl_fetchlevelzero(Pid, Slot) -> +pcl_fetchlevelzero(Pid, Slot, ReturnFun) -> % Timeout to cause crash of L0 file when it can't get the close signal % as it is deadlocked making this call. % % If the timeout gets hit outside of close scenario the Penciller will % be stuck in L0 pending - gen_server:call(Pid, {fetch_levelzero, Slot}, 60000). + gen_server:cast(Pid, {fetch_levelzero, Slot, ReturnFun}). -spec pcl_fetch(pid(), leveled_codec:ledger_key()) -> leveled_codec:ledger_kv()|not_present. @@ -889,8 +891,6 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning}, CloneState#state{snapshot_fully_loaded=true, manifest=ManifestClone}}, State#state{manifest = Manifest0}}; -handle_call({fetch_levelzero, Slot}, _From, State) -> - {reply, lists:nth(Slot, State#state.levelzero_cache), State}; handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true -> ok = pcl_releasesnapshot(State#state.source_penciller, self()), {stop, normal, ok, State}; @@ -1049,6 +1049,9 @@ handle_cast(work_for_clerk, State) -> _ -> {noreply, State} end; +handle_cast({fetch_levelzero, Slot, ReturnFun}, State) -> + ReturnFun(lists:nth(Slot, State#state.levelzero_cache)), + {noreply, State}; handle_cast({log_level, LogLevel}, State) -> PC = State#state.clerk, ok = leveled_pclerk:clerk_loglevel(PC, LogLevel), @@ -1352,7 +1355,8 @@ roll_memory(State, false) -> FileName = sst_filename(ManSQN, 0, 0), leveled_log:log("P0019", [FileName, State#state.ledger_sqn]), PCL = self(), - FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end, + FetchFun = + fun(Slot, ReturnFun) -> pcl_fetchlevelzero(PCL, Slot, ReturnFun) end, R = leveled_sst:sst_newlevelzero(RootPath, FileName, length(State#state.levelzero_cache), @@ -2047,7 +2051,7 @@ 
simple_server_test() -> false = pcl_checkbloomtest(PCL, {o,"Bucket9999", "Key9999", null}), ok = shutdown_when_compact(PCL), - + {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, max_inmemory_tablesize=1000, @@ -2314,12 +2318,12 @@ create_file_test() -> ok = file:write_file(filename:join(RP, Filename), term_to_binary("hello")), KVL = lists:usort(generate_randomkeys({50000, 0})), Tree = leveled_tree:from_orderedlist(KVL, ?CACHE_TYPE), - FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end, + {ok, SP, noreply} = leveled_sst:sst_newlevelzero(RP, Filename, 1, - FetchFun, + [Tree], undefined, 50000, #sst_options{press_method = native}), diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 50dab3e..a753dc0 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -189,7 +189,9 @@ timings = no_timing :: sst_timings(), timings_countdown = 0 :: integer(), starting_pid :: pid()|undefined, - fetch_cache = array:new([{size, ?CACHE_SIZE}])}). + fetch_cache = array:new([{size, ?CACHE_SIZE}]), + new_slots :: list()|undefined, + deferred_startup_tuple :: tuple()|undefined}). -record(sst_timings, {sample_count = 0 :: integer(), @@ -336,31 +338,42 @@ sst_new(RootPath, Filename, end. -spec sst_newlevelzero(string(), string(), - integer(), fun(), pid()|undefined, integer(), + integer(), fun()|list(), pid()|undefined, integer(), sst_options()) -> - {ok, pid(), noreply}. + {ok, pid(), noreply}. %% @doc %% Start a new file at level zero. At this level the file size is not fixed - %% it will be as big as the input. 
Also the KVList is not passed in, it is %% fetched slot by slot using the FetchFun sst_newlevelzero(RootPath, Filename, - Slots, FetchFun, Penciller, + Slots, Fetcher, Penciller, MaxSQN, OptsSST) -> PressMethod0 = compress_level(0, OptsSST#sst_options.press_method), OptsSST0 = OptsSST#sst_options{press_method = PressMethod0}, {ok, Pid} = gen_fsm:start_link(?MODULE, [], []), - gen_fsm:send_event(Pid, - {sst_newlevelzero, - RootPath, - Filename, - Slots, - FetchFun, - Penciller, - MaxSQN, - OptsSST0, - ?INDEX_MODDATE}), + % Initiate the file into the "starting" state + ok = gen_fsm:sync_send_event(Pid, + {sst_newlevelzero, + RootPath, + Filename, + Penciller, + MaxSQN, + OptsSST0, + ?INDEX_MODDATE}), + ok = + case is_list(Fetcher) of + true -> + gen_fsm:send_event(Pid, {complete_l0startup, Fetcher}); + false -> + % Fetcher is a function + gen_fsm:send_event(Pid, {sst_returnslot, none, Fetcher, Slots}) + % Start the fetch loop (async). Having the fetch loop running + % on async message passing means that the SST file can now be + % closed while the fetch loop is still completing + end, {ok, Pid, noreply}. + -spec sst_get(pid(), leveled_codec:ledger_key()) -> leveled_codec:ledger_kv()|not_present. %% @doc @@ -493,15 +506,29 @@ starting({sst_new, reader, UpdState#state{blockindex_cache = BlockIndex, starting_pid = StartingPID}, - ?STARTUP_TIMEOUT}. - + ?STARTUP_TIMEOUT}; starting({sst_newlevelzero, RootPath, Filename, - Slots, FetchFun, Penciller, MaxSQN, - OptsSST, IdxModDate}, State) -> + Penciller, MaxSQN, + OptsSST, IdxModDate}, _From, State) -> + DeferredStartupTuple = + {RootPath, Filename, Penciller, MaxSQN, OptsSST, IdxModDate}, + {reply, ok, starting, + State#state{deferred_startup_tuple = DeferredStartupTuple}}; +starting(close, _From, State) -> + % No file should have been created, so nothing to close. + {stop, normal, ok, State}. 
+ +starting({complete_l0startup, Slots}, State) -> + starting(complete_l0startup, State#state{new_slots = Slots}); +starting(complete_l0startup, State) -> + {RootPath, Filename, Penciller, MaxSQN, OptsSST, IdxModDate} = + State#state.deferred_startup_tuple, SW0 = os:timestamp(), + FetchedSlots = State#state.new_slots, leveled_log:save(OptsSST#sst_options.log_options), PressMethod = OptsSST#sst_options.press_method, - KVList = leveled_pmem:to_list(Slots, FetchFun), + FetchFun = fun(Slot) -> lists:nth(Slot, FetchedSlots) end, + KVList = leveled_pmem:to_list(length(FetchedSlots), FetchFun), Time0 = timer:now_diff(os:timestamp(), SW0), SW1 = os:timestamp(), @@ -525,7 +552,12 @@ starting({sst_newlevelzero, RootPath, Filename, PressMethod, IdxModDate), {UpdState, Bloom} = read_file(ActualFilename, - State#state{root_path=RootPath, yield_blockquery=true}), + State#state{root_path=RootPath, + yield_blockquery=true, + % Important to empty this from state rather + % than carry it through to the next stage + new_slots=undefined, + deferred_startup_tuple=undefined}), Summary = UpdState#state.summary, Time4 = timer:now_diff(os:timestamp(), SW4), @@ -548,9 +580,37 @@ starting({sst_newlevelzero, RootPath, Filename, {next_state, reader, UpdState#state{blockindex_cache = BlockIndex}} + end; +starting({sst_returnslot, FetchedSlot, FetchFun, SlotCount}, State) -> + Self = self(), + FetchedSlots = + case FetchedSlot of + none -> + []; + _ -> + [FetchedSlot|State#state.new_slots] + end, + case length(FetchedSlots) == SlotCount of + true -> + gen_fsm:send_event(Self, complete_l0startup), + {next_state, + starting, + % Reverse the slots so that they are back in the expected + % order + State#state{new_slots = lists:reverse(FetchedSlots)}}; + false -> + ReturnFun = + fun(NextSlot) -> + gen_fsm:send_event(Self, + {sst_returnslot, NextSlot, + FetchFun, SlotCount}) + end, + FetchFun(length(FetchedSlots) + 1, ReturnFun), + {next_state, + starting, + State#state{new_slots = FetchedSlots}} end. 
- reader({get_kv, LedgerKey, Hash}, _From, State) -> % Get a KV value and potentially take sample timings {Result, UpdState, UpdTimings} = @@ -3352,6 +3412,10 @@ take_max_lastmoddate_test() -> % modified dates ?assertMatch(1, take_max_lastmoddate(0, 1)). - +stopstart_test() -> + {ok, Pid} = gen_fsm:start_link(?MODULE, [], []), + % check we can close in the starting state. This may happen due to the + % fetcher on new level zero files working in a loop + ok = sst_close(Pid). -endif.