diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 05744df..d076dcc 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -173,7 +173,7 @@ pcl_snapstart/1, pcl_start/1, pcl_pushmem/2, - pcl_fetchlevelzero/2, + pcl_fetchlevelzero/3, pcl_fetch/4, pcl_fetchkeys/5, pcl_fetchkeys/6, @@ -306,6 +306,8 @@ -type iterator() :: list(iterator_entry()). -type bad_ledgerkey() :: list(). +-export_type([levelzero_cacheentry/0]). + %%%============================================================================ %%% API %%%============================================================================ @@ -348,7 +350,7 @@ pcl_pushmem(Pid, LedgerCache) -> %% Bookie to dump memory onto penciller gen_server:call(Pid, {push_mem, LedgerCache}, infinity). --spec pcl_fetchlevelzero(pid(), integer()) -> tuple(). +-spec pcl_fetchlevelzero(pid(), non_neg_integer(), fun()) -> ok. %% @doc %% Allows a single slot of the penciller's levelzero cache to be fetched. The %% levelzero cache can be up to 40K keys - sending this to the process that is @@ -358,13 +360,13 @@ pcl_pushmem(Pid, LedgerCache) -> %% %% The return value will be a leveled_skiplist that forms that part of the %% cache -pcl_fetchlevelzero(Pid, Slot) -> +pcl_fetchlevelzero(Pid, Slot, ReturnFun) -> % Timeout to cause crash of L0 file when it can't get the close signal % as it is deadlocked making this call. % % If the timeout gets hit outside of close scenario the Penciller will % be stuck in L0 pending - gen_server:call(Pid, {fetch_levelzero, Slot}, 60000). + gen_server:cast(Pid, {fetch_levelzero, Slot, ReturnFun}). -spec pcl_fetch(pid(), leveled_codec:ledger_key()) -> leveled_codec:ledger_kv()|not_present. @@ -889,8 +891,6 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning}, CloneState#state{snapshot_fully_loaded=true, manifest=ManifestClone}}, State#state{manifest = Manifest0}}; -handle_call({fetch_levelzero, Slot}, _From, State) -> - {reply, lists:nth(Slot, State#state.levelzero_cache), State}; handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true -> ok = pcl_releasesnapshot(State#state.source_penciller, self()), {stop, normal, ok, State}; @@ -1049,6 +1049,9 @@ handle_cast(work_for_clerk, State) -> _ -> {noreply, State} end; +handle_cast({fetch_levelzero, Slot, ReturnFun}, State) -> + ReturnFun(lists:nth(Slot, State#state.levelzero_cache)), + {noreply, State}; handle_cast({log_level, LogLevel}, State) -> PC = State#state.clerk, ok = leveled_pclerk:clerk_loglevel(PC, LogLevel), @@ -1352,7 +1355,8 @@ roll_memory(State, false) -> FileName = sst_filename(ManSQN, 0, 0), leveled_log:log("P0019", [FileName, State#state.ledger_sqn]), PCL = self(), - FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end, + FetchFun = + fun(Slot, ReturnFun) -> pcl_fetchlevelzero(PCL, Slot, ReturnFun) end, R = leveled_sst:sst_newlevelzero(RootPath, FileName, length(State#state.levelzero_cache), @@ -2047,7 +2051,7 @@ simple_server_test() -> false = pcl_checkbloomtest(PCL, {o,"Bucket9999", "Key9999", null}), ok = shutdown_when_compact(PCL), - + {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, max_inmemory_tablesize=1000, @@ -2314,12 +2318,12 @@ create_file_test() -> ok = file:write_file(filename:join(RP, Filename), term_to_binary("hello")), KVL = lists:usort(generate_randomkeys({50000, 0})), Tree = leveled_tree:from_orderedlist(KVL, ?CACHE_TYPE), - FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end, + {ok, SP, noreply} = leveled_sst:sst_newlevelzero(RP, Filename, 1, - FetchFun, + [Tree], undefined, 50000, #sst_options{press_method = native}), diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 50dab3e..a753dc0 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -189,7 +189,9 @@ timings = no_timing :: sst_timings(), timings_countdown = 0 :: integer(), starting_pid :: pid()|undefined, - fetch_cache = array:new([{size, ?CACHE_SIZE}])}). + fetch_cache = array:new([{size, ?CACHE_SIZE}]), + new_slots :: list()|undefined, + deferred_startup_tuple :: tuple()|undefined}). -record(sst_timings, {sample_count = 0 :: integer(), @@ -336,31 +338,42 @@ sst_new(RootPath, Filename, end. -spec sst_newlevelzero(string(), string(), - integer(), fun(), pid()|undefined, integer(), + integer(), fun()|list(), pid()|undefined, integer(), sst_options()) -> - {ok, pid(), noreply}. + {ok, pid(), noreply}. %% @doc %% Start a new file at level zero. At this level the file size is not fixed - %% it will be as big as the input. Also the KVList is not passed in, it is %% fetched slot by slot using the FetchFun sst_newlevelzero(RootPath, Filename, - Slots, FetchFun, Penciller, + Slots, Fetcher, Penciller, MaxSQN, OptsSST) -> PressMethod0 = compress_level(0, OptsSST#sst_options.press_method), OptsSST0 = OptsSST#sst_options{press_method = PressMethod0}, {ok, Pid} = gen_fsm:start_link(?MODULE, [], []), - gen_fsm:send_event(Pid, - {sst_newlevelzero, - RootPath, - Filename, - Slots, - FetchFun, - Penciller, - MaxSQN, - OptsSST0, - ?INDEX_MODDATE}), + % Initiate the file into the "starting" state + ok = gen_fsm:sync_send_event(Pid, + {sst_newlevelzero, + RootPath, + Filename, + Penciller, + MaxSQN, + OptsSST0, + ?INDEX_MODDATE}), + ok = + case is_list(Fetcher) of + true -> + gen_fsm:send_event(Pid, {complete_l0startup, Fetcher}); + false -> + % Fetcher is a function + gen_fsm:send_event(Pid, {sst_returnslot, none, Fetcher, Slots}) + % Start the fetch loop (async). Having the fetch loop running + % on async message passing means that the SST file can now be + % closed while the fetch loop is still completing + end, {ok, Pid, noreply}. + -spec sst_get(pid(), leveled_codec:ledger_key()) -> leveled_codec:ledger_kv()|not_present. %% @doc @@ -493,15 +506,29 @@ starting({sst_new, reader, UpdState#state{blockindex_cache = BlockIndex, starting_pid = StartingPID}, - ?STARTUP_TIMEOUT}. - + ?STARTUP_TIMEOUT}; starting({sst_newlevelzero, RootPath, Filename, - Slots, FetchFun, Penciller, MaxSQN, - OptsSST, IdxModDate}, State) -> + Penciller, MaxSQN, + OptsSST, IdxModDate}, _From, State) -> + DeferredStartupTuple = + {RootPath, Filename, Penciller, MaxSQN, OptsSST, IdxModDate}, + {reply, ok, starting, + State#state{deferred_startup_tuple = DeferredStartupTuple}}; +starting(close, _From, State) -> + % No file should have been created, so nothing to close. + {stop, normal, ok, State}. + +starting({complete_l0startup, Slots}, State) -> + starting(complete_l0startup, State#state{new_slots = Slots}); +starting(complete_l0startup, State) -> + {RootPath, Filename, Penciller, MaxSQN, OptsSST, IdxModDate} = + State#state.deferred_startup_tuple, SW0 = os:timestamp(), + FetchedSlots = State#state.new_slots, leveled_log:save(OptsSST#sst_options.log_options), PressMethod = OptsSST#sst_options.press_method, - KVList = leveled_pmem:to_list(Slots, FetchFun), + FetchFun = fun(Slot) -> lists:nth(Slot, FetchedSlots) end, + KVList = leveled_pmem:to_list(length(FetchedSlots), FetchFun), Time0 = timer:now_diff(os:timestamp(), SW0), SW1 = os:timestamp(), @@ -525,7 +552,12 @@ starting({sst_newlevelzero, RootPath, Filename, PressMethod, IdxModDate), {UpdState, Bloom} = read_file(ActualFilename, - State#state{root_path=RootPath, yield_blockquery=true}), + State#state{root_path=RootPath, + yield_blockquery=true, + % Important to empty this from state rather + % than carry it through to the next stage + new_slots=undefined, + deferred_startup_tuple=undefined}), Summary = UpdState#state.summary, Time4 = timer:now_diff(os:timestamp(), SW4), @@ -548,9 +580,37 @@ starting({sst_newlevelzero, RootPath, Filename, {next_state, reader, UpdState#state{blockindex_cache = BlockIndex}} + end; +starting({sst_returnslot, FetchedSlot, FetchFun, SlotCount}, State) -> + Self = self(), + FetchedSlots = + case FetchedSlot of + none -> + []; + _ -> + [FetchedSlot|State#state.new_slots] + end, + case length(FetchedSlots) == SlotCount of + true -> + gen_fsm:send_event(Self, complete_l0startup), + {next_state, + starting, + % Reverse the slots so that they are back in the expected + % order + State#state{new_slots = lists:reverse(FetchedSlots)}}; + false -> + ReturnFun = + fun(NextSlot) -> + gen_fsm:send_event(Self, + {sst_returnslot, NextSlot, + FetchFun, SlotCount}) + end, + FetchFun(length(FetchedSlots) + 1, ReturnFun), + {next_state, + starting, + State#state{new_slots = FetchedSlots}} end. - reader({get_kv, LedgerKey, Hash}, _From, State) -> % Get a KV value and potentially take sample timings {Result, UpdState, UpdTimings} = @@ -3352,6 +3412,10 @@ take_max_lastmoddate_test() -> % modified dates ?assertMatch(1, take_max_lastmoddate(0, 1)). - +stopstart_test() -> + {ok, Pid} = gen_fsm:start_link(?MODULE, [], []), + % check we can close in the starting state. This may happen due to the + % fetcher on new level zero files working in a loop + ok = sst_close(Pid). -endif.