diff --git a/priv/leveled.schema b/priv/leveled.schema index 4498573..241ee33 100644 --- a/priv/leveled.schema +++ b/priv/leveled.schema @@ -21,7 +21,7 @@ %% @doc The key size of the Bookie's in-memory cache {mapping, "leveled.cache_size", "leveled.cache_size", [ - {default, 4000}, + {default, 2500}, {datatype, integer}, hidden ]}. diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index d7e5d4a..5b57e19 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -2187,21 +2187,13 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> -spec maybe_withjitter(integer(), integer()) -> boolean(). %% @doc -%% Push down randomly, but the closer to the maximum size, the more likely a -%% push should be -maybe_withjitter(CacheSize, MaxCacheSize) -> - if - CacheSize > MaxCacheSize -> - R = leveled_rand:uniform(7 * MaxCacheSize), - if - (CacheSize - MaxCacheSize) > R -> - true; - true -> - false - end; - true -> - false - end. +%% Push down randomly, but the closer to 4 * the maximum size, the more likely +%% a push should be +maybe_withjitter(CacheSize, MaxCacheSize) when CacheSize > MaxCacheSize -> + R = leveled_rand:uniform(4 * MaxCacheSize), + (CacheSize - MaxCacheSize) > R; +maybe_withjitter(_CacheSize, _MaxCacheSize) -> + false. -spec get_loadfun(book_state()) -> fun(). diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 4c1f061..bcb8900 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -98,7 +98,7 @@ {"P0008", {info, "Penciller closing for reason ~w"}}, {"P0010", - {info, "No level zero action on close of Penciller ~w"}}, + {info, "discarded=~w level zero on close of Penciller"}}, {"P0011", {info, "Shutdown complete for Penciller for reason ~w"}}, {"P0012", diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index e60a7b6..98b849b 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -173,7 +173,7 @@ pcl_snapstart/1, pcl_start/1, pcl_pushmem/2, - pcl_fetchlevelzero/2, + pcl_fetchlevelzero/3, pcl_fetch/4, pcl_fetchkeys/5, pcl_fetchkeys/6, @@ -306,6 +306,8 @@ -type iterator() :: list(iterator_entry()). -type bad_ledgerkey() :: list(). +-export_type([levelzero_cacheentry/0]). + %%%============================================================================ %%% API %%%============================================================================ @@ -348,7 +350,7 @@ pcl_pushmem(Pid, LedgerCache) -> %% Bookie to dump memory onto penciller gen_server:call(Pid, {push_mem, LedgerCache}, infinity). --spec pcl_fetchlevelzero(pid(), integer()) -> tuple(). +-spec pcl_fetchlevelzero(pid(), non_neg_integer(), fun()) -> ok. %% @doc %% Allows a single slot of the penciller's levelzero cache to be fetched. The %% levelzero cache can be up to 40K keys - sending this to the process that is @@ -358,13 +360,13 @@ pcl_pushmem(Pid, LedgerCache) -> %% %% The return value will be a leveled_skiplist that forms that part of the %% cache -pcl_fetchlevelzero(Pid, Slot) -> +pcl_fetchlevelzero(Pid, Slot, ReturnFun) -> % Timeout to cause crash of L0 file when it can't get the close signal % as it is deadlocked making this call. % % If the timeout gets hit outside of close scenario the Penciller will % be stuck in L0 pending - gen_server:call(Pid, {fetch_levelzero, Slot}, 60000). + gen_server:cast(Pid, {fetch_levelzero, Slot, ReturnFun}). -spec pcl_fetch(pid(), leveled_codec:ledger_key()) -> leveled_codec:ledger_kv()|not_present. @@ -683,15 +685,12 @@ handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}}, {reply, returned, State}; {false, true} -> leveled_log:log("P0042", [State#state.levelzero_size]), - % The cache is full (there are 127 items already in it), so - % can't accept any more. However, we need to try and roll - % memory otherwise cache may be permanently full. + % The cache is full (the maximum line items have been reached), so + % can't accept any more. However, we need to try and roll memory + % otherwise cache may be permanently full. gen_server:reply(From, returned), - {L0Pend, L0Constructor, none} = - maybe_roll_memory(State, false), - {noreply, - State#state{levelzero_pending=L0Pend, - levelzero_constructor=L0Constructor}}; + {UpdState, none} = maybe_roll_memory(State, true, false), + {noreply, UpdState}; {false, false} -> % leveled_log:log("P0018", [ok, false, false]), PushedTree = @@ -892,8 +891,6 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning}, CloneState#state{snapshot_fully_loaded=true, manifest=ManifestClone}}, State#state{manifest = Manifest0}}; -handle_call({fetch_levelzero, Slot}, _From, State) -> - {reply, lists:nth(Slot, State#state.levelzero_cache), State}; handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true -> ok = pcl_releasesnapshot(State#state.source_penciller, self()), {stop, normal, ok, State}; @@ -909,20 +906,21 @@ handle_call(close, _From, State) -> % on the clerk. ok = leveled_pclerk:clerk_close(State#state.clerk), leveled_log:log("P0008", [close]), - L0_Left = State#state.levelzero_size > 0, - case {State#state.levelzero_pending, L0_Left} of - {false, true} -> - {_L0Pend, L0Pid, _L0Bloom} = maybe_roll_memory(State, true), + L0Empty = State#state.levelzero_size == 0, + case (not State#state.levelzero_pending and not L0Empty) of + true -> + L0_Left = State#state.levelzero_size > 0, + {UpdState, _L0Bloom} = maybe_roll_memory(State, L0_Left, true), + L0Pid = UpdState#state.levelzero_constructor, case is_pid(L0Pid) of true -> ok = leveled_sst:sst_close(L0Pid); false -> - ok + leveled_log:log("P0010", [State#state.levelzero_size]) end; - StatusTuple -> - leveled_log:log("P0010", [StatusTuple]) + false -> + leveled_log:log("P0010", [State#state.levelzero_size]) end, - shutdown_manifest(State#state.manifest, State#state.levelzero_constructor), {stop, normal, ok, State}; handle_call(doom, _From, State) -> @@ -958,14 +956,21 @@ handle_call(check_for_work, _From, State) -> handle_call(persisted_sqn, _From, State) -> {reply, State#state.persisted_sqn, State}. -handle_cast({manifest_change, NewManifest}, State) -> - NewManSQN = leveled_pmanifest:get_manifest_sqn(NewManifest), +handle_cast({manifest_change, Manifest}, State) -> + NewManSQN = leveled_pmanifest:get_manifest_sqn(Manifest), OldManSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest), leveled_log:log("P0041", [OldManSQN, NewManSQN]), - ok = leveled_pclerk:clerk_promptdeletions(State#state.clerk, NewManSQN), - UpdManifest = leveled_pmanifest:merge_snapshot(State#state.manifest, - NewManifest), - {noreply, State#state{manifest = UpdManifest, work_ongoing=false}}; + % Only safe to update the manifest if the SQN increments + if NewManSQN > OldManSQN -> + ok = + leveled_pclerk:clerk_promptdeletions(State#state.clerk, NewManSQN), + % This is accepted as the new manifest, files may be deleted + UpdManifest = + leveled_pmanifest:merge_snapshot(State#state.manifest, Manifest), + % Need to preserve the penciller's view of snapshots stored in + % the manifest + {noreply, State#state{manifest=UpdManifest, work_ongoing=false}} + end; handle_cast({release_snapshot, Snapshot}, State) -> Manifest0 = leveled_pmanifest:release_snapshot(State#state.manifest, Snapshot), @@ -1044,6 +1049,9 @@ handle_cast(work_for_clerk, State) -> _ -> {noreply, State} end; +handle_cast({fetch_levelzero, Slot, ReturnFun}, State) -> + ReturnFun(lists:nth(Slot, State#state.levelzero_cache)), + {noreply, State}; handle_cast({log_level, LogLevel}, State) -> PC = State#state.clerk, ok = leveled_pclerk:clerk_loglevel(PC, LogLevel), @@ -1293,37 +1301,37 @@ update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN}, false -> true end, - NoPendingManifestChange = not State#state.work_ongoing, JitterCheck = RandomFactor or CacheMuchTooBig, Due = CacheTooBig and JitterCheck, - case {Due, NoPendingManifestChange} of - {true, true} -> - {L0Pend, L0Constructor, none} = - maybe_roll_memory(UpdState, false), - LogSubs = [NewL0Size, true, true], - leveled_log:log_timer("P0031", LogSubs, SW), - UpdState#state{levelzero_pending=L0Pend, - levelzero_constructor=L0Constructor}; - _ -> - LogSubs = [NewL0Size, Due, NoPendingManifestChange], - leveled_log:log_timer("P0031", LogSubs, SW), - UpdState - end + {UpdState0, _L0Bloom} = maybe_roll_memory(UpdState, Due, false), + LogSubs = [NewL0Size, Due, State#state.work_ongoing], + leveled_log:log_timer("P0031", LogSubs, SW), + UpdState0 end. --spec maybe_roll_memory(pcl_state(), boolean()) - -> {boolean(), pid()|undefined, leveled_ebloom:bloom()|none}. +-spec maybe_roll_memory(pcl_state(), boolean(), boolean()) + -> {pcl_state(), leveled_ebloom:bloom()|none}. %% @doc -%% Check that no L0 file is present before rolling memory -maybe_roll_memory(State, SyncRoll) -> +%% Check that no L0 file is present before rolling memory. Returns a boolean +%% to indicate if memory has been rolled, the Pid of the L0 constructor and +%% The bloom of the L0 file (or none) +maybe_roll_memory(State, false, _SyncRoll) -> + {State, none}; +maybe_roll_memory(State, true, SyncRoll) -> BlockedByL0 = leveled_pmanifest:levelzero_present(State#state.manifest), - case BlockedByL0 of + PendingManifestChange = State#state.work_ongoing, + % It is critical that memory is not rolled if the manifest is due to be + % updated by a change by the clerk. When that manifest change is made it + % will override the addition of L0 and data will be lost. + case (BlockedByL0 or PendingManifestChange) of true -> - {false, undefined, none}; + {State, none}; false -> {L0Constructor, Bloom} = roll_memory(State, SyncRoll), - {true, L0Constructor, Bloom} + {State#state{levelzero_pending=true, + levelzero_constructor=L0Constructor}, + Bloom} end. -spec roll_memory(pcl_state(), boolean()) @@ -1347,7 +1355,8 @@ roll_memory(State, false) -> FileName = sst_filename(ManSQN, 0, 0), leveled_log:log("P0019", [FileName, State#state.ledger_sqn]), PCL = self(), - FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end, + FetchFun = + fun(Slot, ReturnFun) -> pcl_fetchlevelzero(PCL, Slot, ReturnFun) end, R = leveled_sst:sst_newlevelzero(RootPath, FileName, length(State#state.levelzero_cache), @@ -2042,7 +2051,7 @@ simple_server_test() -> false = pcl_checkbloomtest(PCL, {o,"Bucket9999", "Key9999", null}), ok = shutdown_when_compact(PCL), - + {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, max_inmemory_tablesize=1000, @@ -2309,24 +2318,16 @@ create_file_test() -> ok = file:write_file(filename:join(RP, Filename), term_to_binary("hello")), KVL = lists:usort(generate_randomkeys({50000, 0})), Tree = leveled_tree:from_orderedlist(KVL, ?CACHE_TYPE), - FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end, + {ok, SP, noreply} = leveled_sst:sst_newlevelzero(RP, Filename, 1, - FetchFun, + [Tree], undefined, 50000, #sst_options{press_method = native}), - lists:foreach(fun(X) -> - case checkready(SP) of - timeout -> - timer:sleep(X); - _ -> - ok - end end, - [50, 100, 200, 400, 800]), - {ok, SrcFN, StartKey, EndKey} = checkready(SP), + {ok, SrcFN, StartKey, EndKey} = leveled_sst:sst_checkready(SP), io:format("StartKey ~w EndKey ~w~n", [StartKey, EndKey]), ?assertMatch({o, _, _, _}, StartKey), ?assertMatch({o, _, _, _}, EndKey), @@ -2339,14 +2340,6 @@ slow_fetch_test() -> ?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 0, 1)), ?assertMatch("value", log_slowfetch(2, "value", "fake", 0, 1)). -checkready(Pid) -> - try - leveled_sst:sst_checkready(Pid) - catch - exit:{timeout, _} -> - timeout - end. - timings_test() -> SW = os:timestamp(), timer:sleep(1), diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl index e2de638..17b9277 100644 --- a/src/leveled_pmem.erl +++ b/src/leveled_pmem.erl @@ -44,6 +44,8 @@ -include_lib("eunit/include/eunit.hrl"). +-define(MAX_CACHE_LINES, 31). % Must be less than 128 + % -type index_array() :: array:array(). -type index_array() :: any()|none. % To live with OTP16 @@ -55,9 +57,9 @@ -spec cache_full(list()) -> boolean(). %% @doc -%% If there are already 127 entries in the cache then the cache is full +%% If there are already 31 entries in the cache then the cache is full cache_full(L0Cache) -> - length(L0Cache) == 127. + length(L0Cache) == ?MAX_CACHE_LINES. -spec prepare_for_index(index_array(), leveled_codec:segment_hash()) -> index_array(). diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 36ae60a..a753dc0 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -65,6 +65,7 @@ -ifdef(fsm_deprecated). -compile({nowarn_deprecated_function, [{gen_fsm, start_link, 3}, + {gen_fsm, sync_send_event, 2}, {gen_fsm, sync_send_event, 3}, {gen_fsm, send_event, 2}, {gen_fsm, send_all_state_event, 2}]}). @@ -188,7 +189,9 @@ timings = no_timing :: sst_timings(), timings_countdown = 0 :: integer(), starting_pid :: pid()|undefined, - fetch_cache = array:new([{size, ?CACHE_SIZE}])}). + fetch_cache = array:new([{size, ?CACHE_SIZE}]), + new_slots :: list()|undefined, + deferred_startup_tuple :: tuple()|undefined}). -record(sst_timings, {sample_count = 0 :: integer(), @@ -335,31 +338,42 @@ sst_new(RootPath, Filename, end. -spec sst_newlevelzero(string(), string(), - integer(), fun(), pid()|undefined, integer(), + integer(), fun()|list(), pid()|undefined, integer(), sst_options()) -> - {ok, pid(), noreply}. + {ok, pid(), noreply}. %% @doc %% Start a new file at level zero. At this level the file size is not fixed - %% it will be as big as the input. Also the KVList is not passed in, it is %% fetched slot by slot using the FetchFun sst_newlevelzero(RootPath, Filename, - Slots, FetchFun, Penciller, + Slots, Fetcher, Penciller, MaxSQN, OptsSST) -> PressMethod0 = compress_level(0, OptsSST#sst_options.press_method), OptsSST0 = OptsSST#sst_options{press_method = PressMethod0}, {ok, Pid} = gen_fsm:start_link(?MODULE, [], []), - gen_fsm:send_event(Pid, - {sst_newlevelzero, - RootPath, - Filename, - Slots, - FetchFun, - Penciller, - MaxSQN, - OptsSST0, - ?INDEX_MODDATE}), + % Initiate the file into the "starting" state + ok = gen_fsm:sync_send_event(Pid, + {sst_newlevelzero, + RootPath, + Filename, + Penciller, + MaxSQN, + OptsSST0, + ?INDEX_MODDATE}), + ok = + case is_list(Fetcher) of + true -> + gen_fsm:send_event(Pid, {complete_l0startup, Fetcher}); + false -> + % Fetcher is a function + gen_fsm:send_event(Pid, {sst_returnslot, none, Fetcher, Slots}) + % Start the fetch loop (async). Having the fetch loop running + % on async message passing means that the SST file can now be + % closed while the fetch loop is still completing + end, {ok, Pid, noreply}. + -spec sst_get(pid(), leveled_codec:ledger_key()) -> leveled_codec:ledger_kv()|not_present. %% @doc @@ -415,7 +429,7 @@ sst_setfordelete(Pid, Penciller) -> %% For this file to be closed and deleted sst_clear(Pid) -> gen_fsm:sync_send_event(Pid, {set_for_delete, false}, infinity), - gen_fsm:sync_send_event(Pid, close, 1000). + gen_fsm:sync_send_event(Pid, close). -spec sst_deleteconfirmed(pid()) -> ok. %% @doc @@ -432,13 +446,13 @@ sst_deleteconfirmed(Pid) -> %% the filename and the {startKey, EndKey} for the manifest. sst_checkready(Pid) -> %% Only used in test - gen_fsm:sync_send_event(Pid, background_complete, 100). + gen_fsm:sync_send_event(Pid, background_complete). -spec sst_close(pid()) -> ok. %% @doc %% Close the file sst_close(Pid) -> - gen_fsm:sync_send_event(Pid, close, 2000). + gen_fsm:sync_send_event(Pid, close). -spec sst_printtimings(pid()) -> ok. %% @doc @@ -446,7 +460,7 @@ sst_close(Pid) -> %% forced to be printed. %% Used in unit tests to force the printing of timings sst_printtimings(Pid) -> - gen_fsm:sync_send_event(Pid, print_timings, 1000). + gen_fsm:sync_send_event(Pid, print_timings). %%%============================================================================ @@ -492,15 +506,29 @@ starting({sst_new, reader, UpdState#state{blockindex_cache = BlockIndex, starting_pid = StartingPID}, - ?STARTUP_TIMEOUT}. - + ?STARTUP_TIMEOUT}; starting({sst_newlevelzero, RootPath, Filename, - Slots, FetchFun, Penciller, MaxSQN, - OptsSST, IdxModDate}, State) -> + Penciller, MaxSQN, + OptsSST, IdxModDate}, _From, State) -> + DeferredStartupTuple = + {RootPath, Filename, Penciller, MaxSQN, OptsSST, IdxModDate}, + {reply, ok, starting, + State#state{deferred_startup_tuple = DeferredStartupTuple}}; +starting(close, _From, State) -> + % No file should have been created, so nothing to close. + {stop, normal, ok, State}. + +starting({complete_l0startup, Slots}, State) -> + starting(complete_l0startup, State#state{new_slots = Slots}); +starting(complete_l0startup, State) -> + {RootPath, Filename, Penciller, MaxSQN, OptsSST, IdxModDate} = + State#state.deferred_startup_tuple, SW0 = os:timestamp(), + FetchedSlots = State#state.new_slots, leveled_log:save(OptsSST#sst_options.log_options), PressMethod = OptsSST#sst_options.press_method, - KVList = leveled_pmem:to_list(Slots, FetchFun), + FetchFun = fun(Slot) -> lists:nth(Slot, FetchedSlots) end, + KVList = leveled_pmem:to_list(length(FetchedSlots), FetchFun), Time0 = timer:now_diff(os:timestamp(), SW0), SW1 = os:timestamp(), @@ -524,7 +552,12 @@ starting({sst_newlevelzero, RootPath, Filename, PressMethod, IdxModDate), {UpdState, Bloom} = read_file(ActualFilename, - State#state{root_path=RootPath, yield_blockquery=true}), + State#state{root_path=RootPath, + yield_blockquery=true, + % Important to empty this from state rather + % than carry it through to the next stage + new_slots=undefined, + deferred_startup_tuple=undefined}), Summary = UpdState#state.summary, Time4 = timer:now_diff(os:timestamp(), SW4), @@ -547,9 +580,37 @@ starting({sst_newlevelzero, RootPath, Filename, {next_state, reader, UpdState#state{blockindex_cache = BlockIndex}} + end; +starting({sst_returnslot, FetchedSlot, FetchFun, SlotCount}, State) -> + Self = self(), + FetchedSlots = + case FetchedSlot of + none -> + []; + _ -> + [FetchedSlot|State#state.new_slots] + end, + case length(FetchedSlots) == SlotCount of + true -> + gen_fsm:send_event(Self, complete_l0startup), + {next_state, + starting, + % Reverse the slots so that they are back in the expected + % order + State#state{new_slots = lists:reverse(FetchedSlots)}}; + false -> + ReturnFun = + fun(NextSlot) -> + gen_fsm:send_event(Self, + {sst_returnslot, NextSlot, + FetchFun, SlotCount}) + end, + FetchFun(length(FetchedSlots) + 1, ReturnFun), + {next_state, + starting, + State#state{new_slots = FetchedSlots}} end. - reader({get_kv, LedgerKey, Hash}, _From, State) -> % Get a KV value and potentially take sample timings {Result, UpdState, UpdTimings} = @@ -3351,6 +3412,10 @@ take_max_lastmoddate_test() -> % modified dates ?assertMatch(1, take_max_lastmoddate(0, 1)). - +stopstart_test() -> + {ok, Pid} = gen_fsm:start_link(?MODULE, [], []), + % check we can close in the starting state. This may happen due to the + % fetcher on new level zero files working in a loop + ok = sst_close(Pid). -endif.