From c9bf43953b5488a45ee7e9d54a45cbe27036ab16 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 24 Jan 2019 14:32:01 +0000 Subject: [PATCH 1/3] Expect TS in snapshot references from manifest Add type to prevent re-occurence. This is also detected by failure in eqc tests. --- src/leveled_inker.erl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 2faadf6..d2ad02b 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -142,7 +142,7 @@ journal_sqn = 0 :: integer(), active_journaldb :: pid() | undefined, pending_removals = [] :: list(), - registered_snapshots = [] :: list(), + registered_snapshots = [] :: list(registered_snapshot()), root_path :: string() | undefined, cdb_options :: #cdb_options{} | undefined, clerk :: pid() | undefined, @@ -157,7 +157,7 @@ -type inker_options() :: #inker_options{}. -type ink_state() :: #state{}. - +-type registered_snapshot() :: {pid(), os:timestamp(), integer()}. %%%============================================================================ %%% API @@ -843,11 +843,12 @@ start_from_file(InkOpts) -> clerk = Clerk}}. --spec shutdown_snapshots(list(tuple())) -> ok. +-spec shutdown_snapshots(list(registered_snapshot())) -> ok. %% @doc %% Shutdown any snapshots before closing the store shutdown_snapshots(Snapshots) -> - lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end, Snapshots). + lists:foreach(fun({Snap, _TS, _SQN}) -> ok = ink_close(Snap) end, + Snapshots). -spec shutdown_manifest(leveled_imanifest:manifest()) -> ok. %% @doc From 28d0aef5fec893deea752da895594627cb1a3b83 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 24 Jan 2019 15:46:17 +0000 Subject: [PATCH 2/3] Make check that compaction not ongoing before accepting new compaction Respond 'busy' if compaction is ongoing --- src/leveled_bookie.erl | 6 +++--- src/leveled_inker.erl | 23 ++++++++++++++--------- test/end_to_end/basic_SUITE.erl | 1 + 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 73c0c33..9e547b4 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -1004,7 +1004,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) -> gen_server:call(Pid, {snapshot, SnapType, Query, LongRunning}, infinity). --spec book_compactjournal(pid(), integer()) -> ok. +-spec book_compactjournal(pid(), integer()) -> ok|busy. -spec book_islastcompactionpending(pid()) -> boolean(). -spec book_trimjournal(pid()) -> ok. @@ -1371,10 +1371,10 @@ handle_call({return_runner, QueryType}, _From, State) -> fold_countdown = CountDown}}; handle_call({compact_journal, Timeout}, _From, State) when State#state.head_only == false -> - ok = leveled_inker:ink_compactjournal(State#state.inker, + R = leveled_inker:ink_compactjournal(State#state.inker, self(), Timeout), - {reply, ok, State}; + {reply, R, State}; handle_call(confirm_compact, _From, State) when State#state.head_only == false -> {reply, leveled_inker:ink_compactionpending(State#state.inker), State}; diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index d2ad02b..dd6b86d 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -348,7 +348,7 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> as_ink}, infinity). --spec ink_compactjournal(pid(), pid(), integer()) -> ok. +-spec ink_compactjournal(pid(), pid(), integer()) -> ok|busy. %% @doc %% Trigger a compaction event. the compaction event will use a sqn check %% against the Ledger to see if a value can be compacted - if the penciller @@ -605,14 +605,19 @@ handle_call({compact, FilterFun, Timeout}, _From, State) -> - leveled_iclerk:clerk_compact(State#state.clerk, - Checker, - InitiateFun, - CloseFun, - FilterFun, - self(), - Timeout), - {reply, ok, State#state{compaction_pending=true}}; + case State#state.compaction_pending of + true -> + {reply, busy, State}; + false -> + leveled_iclerk:clerk_compact(State#state.clerk, + Checker, + InitiateFun, + CloseFun, + FilterFun, + self(), + Timeout), + {reply, ok, State#state{compaction_pending=true}} + end; handle_call(compaction_complete, _From, State) -> {reply, ok, State#state{compaction_pending=false}}; handle_call(compaction_pending, _From, State) -> diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 5aef2e8..a17108a 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -300,6 +300,7 @@ journal_compaction_tester(Restart, WRP) -> {sync_strategy, testutil:sync_strategy()}], {ok, Bookie3} = leveled_bookie:book_start(StartOpts2), ok = leveled_bookie:book_compactjournal(Bookie3, 30000), + busy = leveled_bookie:book_compactjournal(Bookie3, 30000), testutil:wait_for_compaction(Bookie3), ok = leveled_bookie:book_close(Bookie3), From 0333604fd93278e06e35ed4d9e2d699e010e59b8 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 24 Jan 2019 21:32:54 +0000 Subject: [PATCH 3/3] Change to cast in inker/iclerk interaction This allows for leveled_iclerk:clerk_stop to be a sync call, so that files will only be closed once the iclerk has stopped. This is designed ot prevent iclerk crashes during shutdowns when files it is depnding on are closed mid shutdown. --- src/leveled_iclerk.erl | 81 +++++++++++++-------------- src/leveled_inker.erl | 94 +++++++++++++------------------- test/end_to_end/tictac_SUITE.erl | 3 + 3 files changed, 78 insertions(+), 100 deletions(-) diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 5f6b09a..cd8c547 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -82,9 +82,10 @@ code_change/3]). -export([clerk_new/1, - clerk_compact/7, + clerk_compact/6, clerk_hashtablecalc/3, clerk_trim/3, + clerk_promptdeletions/3, clerk_stop/1, clerk_loglevel/2, clerk_addlogs/2, @@ -148,25 +149,30 @@ clerk_new(InkerClerkOpts) -> -spec clerk_compact(pid(), pid(), fun(), fun(), fun(), - pid(), integer()) -> ok. + list()) -> ok. %% @doc %% Trigger a compaction for this clerk if the threshold of data recovery has %% been met -clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Inker, TimeO) -> +clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Manifest) -> gen_server:cast(Pid, {compact, Checker, InitiateFun, CloseFun, FilterFun, - Inker, - TimeO}). + Manifest}). --spec clerk_trim(pid(), pid(), integer()) -> ok. +-spec clerk_trim(pid(), integer(), list()) -> ok. %% @doc %% Trim the Inker back to the persisted SQN -clerk_trim(Pid, Inker, PersistedSQN) -> - gen_server:cast(Pid, {trim, Inker, PersistedSQN}). +clerk_trim(Pid, PersistedSQN, ManifestAsList) -> + gen_server:cast(Pid, {trim, PersistedSQN, ManifestAsList}). + +-spec clerk_promptdeletions(pid(), pos_integer(), list()) -> ok. +%% @doc +%% +clerk_promptdeletions(Pid, ManifestSQN, DeletedFiles) -> + gen_server:cast(Pid, {prompt_deletions, ManifestSQN, DeletedFiles}). -spec clerk_hashtablecalc(ets:tid(), integer(), pid()) -> ok. %% @doc @@ -182,7 +188,7 @@ clerk_hashtablecalc(HashTree, StartPos, CDBpid) -> %% @doc %% Stop the clerk clerk_stop(Pid) -> - gen_server:cast(Pid, stop). + gen_server:call(Pid, stop, 60000). -spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok. %% @doc @@ -247,10 +253,10 @@ init([LogOpts, IClerkOpts]) -> compression_method = IClerkOpts#iclerk_options.compression_method}}. -handle_call(_Msg, _From, State) -> - {reply, not_supported, State}. +handle_call(stop, _From, State) -> + {stop, normal, ok, State}. -handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO}, +handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0}, State) -> % Empty the waste folder clear_waste(State), @@ -260,7 +266,8 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO}, % Need to fetch manifest at start rather than have it be passed in % Don't want to process a queued call waiting on an old manifest - [_Active|Manifest] = leveled_inker:ink_getmanifest(Inker), + [_Active|Manifest] = Manifest0, + Inker = State#state.inker, MaxRunLength = State#state.max_run_length, {FilterServer, MaxSQN} = InitiateFun(Checker), CDBopts = State#state.cdb_options, @@ -291,24 +298,29 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO}, end, BestRun1), leveled_log:log("IC002", [length(FilesToDelete)]), - case is_process_alive(Inker) of - true -> - update_inker(Inker, - ManifestSlice, - FilesToDelete), - ok = CloseFun(FilterServer), - {noreply, State} - end; + ok = leveled_inker:ink_clerkcomplete(Inker, + ManifestSlice, + FilesToDelete), + ok = CloseFun(FilterServer), + {noreply, State}; false -> - ok = leveled_inker:ink_compactioncomplete(Inker), + ok = leveled_inker:ink_clerkcomplete(Inker, [], []), ok = CloseFun(FilterServer), {noreply, State} end; -handle_cast({trim, Inker, PersistedSQN}, State) -> - ManifestAsList = leveled_inker:ink_getmanifest(Inker), +handle_cast({trim, PersistedSQN, ManifestAsList}, State) -> FilesToDelete = leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList), - ok = update_inker(Inker, [], FilesToDelete), + leveled_log:log("IC007", []), + ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], FilesToDelete), + {noreply, State}; +handle_cast({prompt_deletions, ManifestSQN, FilesToDelete}, State) -> + lists:foreach(fun({_SQN, _FN, J2D, _LK}) -> + leveled_cdb:cdb_deletepending(J2D, + ManifestSQN, + State#state.inker) + end, + FilesToDelete), {noreply, State}; handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) -> {IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos), @@ -328,9 +340,7 @@ handle_cast({remove_logs, ForcedLogs}, State) -> ok = leveled_log:remove_forcedlogs(ForcedLogs), CDBopts = State#state.cdb_options, CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()}, - {noreply, State#state{cdb_options = CDBopts0}}; -handle_cast(stop, State) -> - {stop, normal, State}. + {noreply, State#state{cdb_options = CDBopts0}}. handle_info(_Info, State) -> {noreply, State}. @@ -613,20 +623,6 @@ sort_run(RunOfFiles) -> Cand1#candidate.low_sqn =< Cand2#candidate.low_sqn end, lists:sort(CompareFun, RunOfFiles). -update_inker(Inker, ManifestSlice, FilesToDelete) -> - {ok, ManSQN} = leveled_inker:ink_updatemanifest(Inker, - ManifestSlice, - FilesToDelete), - ok = leveled_inker:ink_compactioncomplete(Inker), - leveled_log:log("IC007", []), - lists:foreach(fun({_SQN, _FN, J2D, _LK}) -> - leveled_cdb:cdb_deletepending(J2D, - ManSQN, - Inker) - end, - FilesToDelete), - ok. - compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy, PressMethod) -> BatchesOfPositions = get_all_positions(BestRun, []), @@ -1147,7 +1143,6 @@ size_score_test() -> coverage_cheat_test() -> {noreply, _State0} = handle_info(timeout, #state{}), {ok, _State1} = code_change(null, #state{}, null), - {reply, not_supported, _State2} = handle_call(null, null, #state{}), terminate(error, #state{}). -endif. diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index dd6b86d..d25f6d9 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -105,11 +105,10 @@ ink_registersnapshot/2, ink_confirmdelete/2, ink_compactjournal/3, - ink_compactioncomplete/1, + ink_clerkcomplete/3, ink_compactionpending/1, ink_trim/2, ink_getmanifest/1, - ink_updatemanifest/3, ink_printmanifest/1, ink_close/1, ink_doom/1, @@ -359,7 +358,7 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> %% that any value that was written more recently than the last flush to disk %% of the Ledger will not be considered for compaction (as this may be %% required to reload the Ledger on startup). -ink_compactjournal(Pid, Bookie, Timeout) -> +ink_compactjournal(Pid, Bookie, _Timeout) -> CheckerInitiateFun = fun initiate_penciller_snapshot/1, CheckerCloseFun = fun leveled_penciller:pcl_close/1, CheckerFilterFun = @@ -369,28 +368,26 @@ ink_compactjournal(Pid, Bookie, Timeout) -> Bookie, CheckerInitiateFun, CheckerCloseFun, - CheckerFilterFun, - Timeout}, + CheckerFilterFun}, infinity). %% Allows the Checker to be overriden in test, use something other than a %% penciller -ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, Timeout) -> +ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, _Timeout) -> gen_server:call(Pid, {compact, Checker, InitiateFun, CloseFun, - FilterFun, - Timeout}, + FilterFun}, infinity). --spec ink_compactioncomplete(pid()) -> ok. +-spec ink_clerkcomplete(pid(), list(), list()) -> ok. %% @doc %% Used by a clerk to state that a compaction process is over, only change %% is to unlock the Inker for further compactions. -ink_compactioncomplete(Pid) -> - gen_server:call(Pid, compaction_complete, infinity). +ink_clerkcomplete(Pid, ManifestSnippet, FilesToDelete) -> + gen_server:cast(Pid, {clerk_complete, ManifestSnippet, FilesToDelete}). -spec ink_compactionpending(pid()) -> boolean(). %% @doc @@ -425,21 +422,6 @@ ink_backup(Pid, BackupPath) -> ink_getmanifest(Pid) -> gen_server:call(Pid, get_manifest, infinity). --spec ink_updatemanifest(pid(), list(), list()) -> {ok, integer()}. -%% @doc -%% Add a section of new entries into the manifest, and drop a bunch of deleted -%% files out of the manifest. Used to update the manifest after a compaction -%% job. -%% -%% Returns {ok, ManSQN} with the ManSQN being the sequence number of the -%% updated manifest -ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) -> - gen_server:call(Pid, - {update_manifest, - ManifestSnippet, - DeletedFiles}, - infinity). - -spec ink_printmanifest(pid()) -> ok. %% @doc %% Used in tests to print out the manifest @@ -574,27 +556,6 @@ handle_call({confirm_delete, ManSQN}, _From, State) -> State#state{registered_snapshots = RegisteredSnapshots0}}; handle_call(get_manifest, _From, State) -> {reply, leveled_imanifest:to_list(State#state.manifest), State}; -handle_call({update_manifest, - ManifestSnippet, - DeletedFiles}, _From, State) -> - DropFun = - fun(E, Acc) -> - leveled_imanifest:remove_entry(Acc, E) - end, - Man0 = lists:foldl(DropFun, State#state.manifest, DeletedFiles), - AddFun = - fun(E, Acc) -> - leveled_imanifest:add_entry(Acc, E, false) - end, - Man1 = lists:foldl(AddFun, Man0, ManifestSnippet), - NewManifestSQN = State#state.manifest_sqn + 1, - leveled_imanifest:printer(Man1), - leveled_imanifest:writer(Man1, NewManifestSQN, State#state.root_path), - {reply, - {ok, NewManifestSQN}, - State#state{manifest=Man1, - manifest_sqn=NewManifestSQN, - pending_removals=DeletedFiles}}; handle_call(print_manifest, _From, State) -> leveled_imanifest:printer(State#state.manifest), {reply, ok, State}; @@ -602,28 +563,26 @@ handle_call({compact, Checker, InitiateFun, CloseFun, - FilterFun, - Timeout}, + FilterFun}, _From, State) -> case State#state.compaction_pending of true -> {reply, busy, State}; false -> + Manifest = leveled_imanifest:to_list(State#state.manifest), leveled_iclerk:clerk_compact(State#state.clerk, Checker, InitiateFun, CloseFun, FilterFun, - self(), - Timeout), + Manifest), {reply, ok, State#state{compaction_pending=true}} end; -handle_call(compaction_complete, _From, State) -> - {reply, ok, State#state{compaction_pending=false}}; handle_call(compaction_pending, _From, State) -> {reply, State#state.compaction_pending, State}; handle_call({trim, PersistedSQN}, _From, State) -> - ok = leveled_iclerk:clerk_trim(State#state.clerk, self(), PersistedSQN), + Manifest = leveled_imanifest:to_list(State#state.manifest), + ok = leveled_iclerk:clerk_trim(State#state.clerk, PersistedSQN, Manifest), {reply, ok, State}; handle_call(roll, _From, State) -> case leveled_cdb:cdb_lastkey(State#state.active_journaldb) of @@ -717,7 +676,7 @@ handle_call(close, _From, State) -> leveled_log:log("I0005", [close]), leveled_log:log("I0006", [State#state.journal_sqn, State#state.manifest_sqn]), - leveled_iclerk:clerk_stop(State#state.clerk), + ok = leveled_iclerk:clerk_stop(State#state.clerk), shutdown_snapshots(State#state.registered_snapshots), shutdown_manifest(State#state.manifest) end, @@ -732,12 +691,33 @@ handle_call(doom, _From, State) -> leveled_log:log("I0005", [doom]), leveled_log:log("I0006", [State#state.journal_sqn, State#state.manifest_sqn]), - leveled_iclerk:clerk_stop(State#state.clerk), + ok = leveled_iclerk:clerk_stop(State#state.clerk), shutdown_snapshots(State#state.registered_snapshots), shutdown_manifest(State#state.manifest), - {stop, normal, {ok, FPs}, State}. + +handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) -> + DropFun = + fun(E, Acc) -> + leveled_imanifest:remove_entry(Acc, E) + end, + Man0 = lists:foldl(DropFun, State#state.manifest, FilesToDelete), + AddFun = + fun(E, Acc) -> + leveled_imanifest:add_entry(Acc, E, false) + end, + Man1 = lists:foldl(AddFun, Man0, ManifestSnippet), + NewManifestSQN = State#state.manifest_sqn + 1, + leveled_imanifest:printer(Man1), + leveled_imanifest:writer(Man1, NewManifestSQN, State#state.root_path), + ok = leveled_iclerk:clerk_promptdeletions(State#state.clerk, + NewManifestSQN, + FilesToDelete), + {noreply, State#state{manifest=Man1, + manifest_sqn=NewManifestSQN, + pending_removals=FilesToDelete, + compaction_pending=false}}; handle_cast({release_snapshot, Snapshot}, State) -> Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots), leveled_log:log("I0003", [Snapshot]), diff --git a/test/end_to_end/tictac_SUITE.erl b/test/end_to_end/tictac_SUITE.erl index e0f5f01..ba422b0 100644 --- a/test/end_to_end/tictac_SUITE.erl +++ b/test/end_to_end/tictac_SUITE.erl @@ -724,6 +724,9 @@ basic_headonly_test(ObjectCount, RemoveCount, HeadOnly) -> {ok, FinalFNs} = file:list_dir(JFP), + ok = leveled_bookie:book_trimjournal(Bookie1), + % CCheck a second trim is still OK + [{add, SegmentID0, Bucket0, Key0, Hash0}|_Rest] = ObjectSpecL, case HeadOnly of with_lookup ->