diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index cfa4084..a0fd287 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -1005,7 +1005,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) -> gen_server:call(Pid, {snapshot, SnapType, Query, LongRunning}, infinity). --spec book_compactjournal(pid(), integer()) -> ok. +-spec book_compactjournal(pid(), integer()) -> ok|busy. -spec book_eqccompactjournal(pid(), integer()) -> {ok, pid()}. -spec book_islastcompactionpending(pid()) -> boolean(). -spec book_trimjournal(pid()) -> ok. @@ -1016,11 +1016,12 @@ book_snapshot(Pid, SnapType, Query, LongRunning) -> %% in Riak it will be triggered by a vnode callback. book_eqccompactjournal(Pid, Timeout) -> - gen_server:call(Pid, {compact_journal, Timeout}, infinity). + {_R, P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity), + {ok, P}. book_compactjournal(Pid, Timeout) -> - {ok, _P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity), - ok. + {R, _P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity), + R. %% @doc Check on progress of the last compaction diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index f0a2318..cd8c547 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -82,9 +82,10 @@ code_change/3]). -export([clerk_new/1, - clerk_compact/7, + clerk_compact/6, clerk_hashtablecalc/3, clerk_trim/3, + clerk_promptdeletions/3, clerk_stop/1, clerk_loglevel/2, clerk_addlogs/2, @@ -148,25 +149,30 @@ clerk_new(InkerClerkOpts) -> -spec clerk_compact(pid(), pid(), fun(), fun(), fun(), - pid(), integer()) -> ok. + list()) -> ok. %% @doc %% Trigger a compaction for this clerk if the threshold of data recovery has %% been met -clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Inker, TimeO) -> +clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Manifest) -> gen_server:cast(Pid, {compact, Checker, InitiateFun, CloseFun, FilterFun, - Inker, - TimeO}). + Manifest}). --spec clerk_trim(pid(), pid(), integer()) -> ok. +-spec clerk_trim(pid(), integer(), list()) -> ok. %% @doc %% Trim the Inker back to the persisted SQN -clerk_trim(Pid, Inker, PersistedSQN) -> - gen_server:cast(Pid, {trim, Inker, PersistedSQN}). +clerk_trim(Pid, PersistedSQN, ManifestAsList) -> + gen_server:cast(Pid, {trim, PersistedSQN, ManifestAsList}). + +-spec clerk_promptdeletions(pid(), pos_integer(), list()) -> ok. +%% @doc +%% +clerk_promptdeletions(Pid, ManifestSQN, DeletedFiles) -> + gen_server:cast(Pid, {prompt_deletions, ManifestSQN, DeletedFiles}). -spec clerk_hashtablecalc(ets:tid(), integer(), pid()) -> ok. %% @doc @@ -182,8 +188,7 @@ clerk_hashtablecalc(HashTree, StartPos, CDBpid) -> %% @doc %% Stop the clerk clerk_stop(Pid) -> - unlink(Pid), - gen_server:cast(Pid, stop). + gen_server:call(Pid, stop, 60000). -spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok. %% @doc @@ -248,10 +253,10 @@ init([LogOpts, IClerkOpts]) -> compression_method = IClerkOpts#iclerk_options.compression_method}}. -handle_call(_Msg, _From, State) -> - {reply, not_supported, State}. +handle_call(stop, _From, State) -> + {stop, normal, ok, State}. -handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO}, +handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0}, State) -> % Empty the waste folder clear_waste(State), @@ -261,7 +266,8 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO}, % Need to fetch manifest at start rather than have it be passed in % Don't want to process a queued call waiting on an old manifest - [_Active|Manifest] = leveled_inker:ink_getmanifest(Inker), + [_Active|Manifest] = Manifest0, + Inker = State#state.inker, MaxRunLength = State#state.max_run_length, {FilterServer, MaxSQN} = InitiateFun(Checker), CDBopts = State#state.cdb_options, @@ -292,24 +298,29 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO}, end, BestRun1), leveled_log:log("IC002", [length(FilesToDelete)]), - case is_process_alive(Inker) of - true -> - update_inker(Inker, - ManifestSlice, - FilesToDelete), - ok = CloseFun(FilterServer), - {noreply, State} - end; + ok = leveled_inker:ink_clerkcomplete(Inker, + ManifestSlice, + FilesToDelete), + ok = CloseFun(FilterServer), + {noreply, State}; false -> - ok = leveled_inker:ink_compactioncomplete(Inker), + ok = leveled_inker:ink_clerkcomplete(Inker, [], []), ok = CloseFun(FilterServer), {noreply, State} end; -handle_cast({trim, Inker, PersistedSQN}, State) -> - ManifestAsList = leveled_inker:ink_getmanifest(Inker), +handle_cast({trim, PersistedSQN, ManifestAsList}, State) -> FilesToDelete = leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList), - ok = update_inker(Inker, [], FilesToDelete), + leveled_log:log("IC007", []), + ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], FilesToDelete), + {noreply, State}; +handle_cast({prompt_deletions, ManifestSQN, FilesToDelete}, State) -> + lists:foreach(fun({_SQN, _FN, J2D, _LK}) -> + leveled_cdb:cdb_deletepending(J2D, + ManifestSQN, + State#state.inker) + end, + FilesToDelete), {noreply, State}; handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) -> {IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos), @@ -329,9 +340,7 @@ handle_cast({remove_logs, ForcedLogs}, State) -> ok = leveled_log:remove_forcedlogs(ForcedLogs), CDBopts = State#state.cdb_options, CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()}, - {noreply, State#state{cdb_options = CDBopts0}}; -handle_cast(stop, State) -> - {stop, normal, State}. + {noreply, State#state{cdb_options = CDBopts0}}. handle_info(_Info, State) -> {noreply, State}. @@ -614,20 +623,6 @@ sort_run(RunOfFiles) -> Cand1#candidate.low_sqn =< Cand2#candidate.low_sqn end, lists:sort(CompareFun, RunOfFiles). -update_inker(Inker, ManifestSlice, FilesToDelete) -> - {ok, ManSQN} = leveled_inker:ink_updatemanifest(Inker, - ManifestSlice, - FilesToDelete), - ok = leveled_inker:ink_compactioncomplete(Inker), - leveled_log:log("IC007", []), - lists:foreach(fun({_SQN, _FN, J2D, _LK}) -> - leveled_cdb:cdb_deletepending(J2D, - ManSQN, - Inker) - end, - FilesToDelete), - ok. - compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy, PressMethod) -> BatchesOfPositions = get_all_positions(BestRun, []), @@ -1148,7 +1143,6 @@ size_score_test() -> coverage_cheat_test() -> {noreply, _State0} = handle_info(timeout, #state{}), {ok, _State1} = code_change(null, #state{}, null), - {reply, not_supported, _State2} = handle_call(null, null, #state{}), terminate(error, #state{}). -endif. diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index cee237a..858a5e9 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -105,11 +105,10 @@ ink_registersnapshot/2, ink_confirmdelete/2, ink_compactjournal/3, - ink_compactioncomplete/1, + ink_clerkcomplete/3, ink_compactionpending/1, ink_trim/2, ink_getmanifest/1, - ink_updatemanifest/3, ink_printmanifest/1, ink_close/1, ink_doom/1, @@ -142,7 +141,7 @@ journal_sqn = 0 :: integer(), active_journaldb :: pid() | undefined, pending_removals = [] :: list(), - registered_snapshots = [] :: list(), + registered_snapshots = [] :: list(registered_snapshot()), root_path :: string() | undefined, cdb_options :: #cdb_options{} | undefined, clerk :: pid() | undefined, @@ -157,7 +156,7 @@ -type inker_options() :: #inker_options{}. -type ink_state() :: #state{}. - +-type registered_snapshot() :: {pid(), os:timestamp(), integer()}. %%%============================================================================ %%% API @@ -348,7 +347,7 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> as_ink}, infinity). --spec ink_compactjournal(pid(), pid(), integer()) -> {ok, pid()}. +-spec ink_compactjournal(pid(), pid(), integer()) -> {ok|busy, pid()}. %% @doc %% Trigger a compaction event. the compaction event will use a sqn check %% against the Ledger to see if a value can be compacted - if the penciller @@ -359,7 +358,7 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> %% that any value that was written more recently than the last flush to disk %% of the Ledger will not be considered for compaction (as this may be %% required to reload the Ledger on startup). -ink_compactjournal(Pid, Bookie, Timeout) -> +ink_compactjournal(Pid, Bookie, _Timeout) -> CheckerInitiateFun = fun initiate_penciller_snapshot/1, CheckerCloseFun = fun leveled_penciller:pcl_close/1, CheckerFilterFun = @@ -369,28 +368,26 @@ ink_compactjournal(Pid, Bookie, Timeout) -> Bookie, CheckerInitiateFun, CheckerCloseFun, - CheckerFilterFun, - Timeout}, + CheckerFilterFun}, infinity). %% Allows the Checker to be overriden in test, use something other than a %% penciller -ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, Timeout) -> +ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, _Timeout) -> gen_server:call(Pid, {compact, Checker, InitiateFun, CloseFun, - FilterFun, - Timeout}, + FilterFun}, infinity). --spec ink_compactioncomplete(pid()) -> ok. +-spec ink_clerkcomplete(pid(), list(), list()) -> ok. %% @doc %% Used by a clerk to state that a compaction process is over, only change %% is to unlock the Inker for further compactions. -ink_compactioncomplete(Pid) -> - gen_server:call(Pid, compaction_complete, infinity). +ink_clerkcomplete(Pid, ManifestSnippet, FilesToDelete) -> + gen_server:cast(Pid, {clerk_complete, ManifestSnippet, FilesToDelete}). -spec ink_compactionpending(pid()) -> boolean(). %% @doc @@ -425,21 +422,6 @@ ink_backup(Pid, BackupPath) -> ink_getmanifest(Pid) -> gen_server:call(Pid, get_manifest, infinity). --spec ink_updatemanifest(pid(), list(), list()) -> {ok, integer()}. -%% @doc -%% Add a section of new entries into the manifest, and drop a bunch of deleted -%% files out of the manifest. Used to update the manifest after a compaction -%% job. -%% -%% Returns {ok, ManSQN} with the ManSQN being the sequence number of the -%% updated manifest -ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) -> - gen_server:call(Pid, - {update_manifest, - ManifestSnippet, - DeletedFiles}, - infinity). - -spec ink_printmanifest(pid()) -> ok. %% @doc %% Used in tests to print out the manifest @@ -574,27 +556,6 @@ handle_call({confirm_delete, ManSQN}, _From, State) -> State#state{registered_snapshots = RegisteredSnapshots0}}; handle_call(get_manifest, _From, State) -> {reply, leveled_imanifest:to_list(State#state.manifest), State}; -handle_call({update_manifest, - ManifestSnippet, - DeletedFiles}, _From, State) -> - DropFun = - fun(E, Acc) -> - leveled_imanifest:remove_entry(Acc, E) - end, - Man0 = lists:foldl(DropFun, State#state.manifest, DeletedFiles), - AddFun = - fun(E, Acc) -> - leveled_imanifest:add_entry(Acc, E, false) - end, - Man1 = lists:foldl(AddFun, Man0, ManifestSnippet), - NewManifestSQN = State#state.manifest_sqn + 1, - leveled_imanifest:printer(Man1), - leveled_imanifest:writer(Man1, NewManifestSQN, State#state.root_path), - {reply, - {ok, NewManifestSQN}, - State#state{manifest=Man1, - manifest_sqn=NewManifestSQN, - pending_removals=DeletedFiles}}; handle_call(print_manifest, _From, State) -> leveled_imanifest:printer(State#state.manifest), {reply, ok, State}; @@ -602,23 +563,27 @@ handle_call({compact, Checker, InitiateFun, CloseFun, - FilterFun, - Timeout}, + FilterFun}, _From, State) -> - leveled_iclerk:clerk_compact(State#state.clerk, - Checker, - InitiateFun, - CloseFun, - FilterFun, - self(), - Timeout), - {reply, {ok, State#state.clerk}, State#state{compaction_pending=true}}; -handle_call(compaction_complete, _From, State) -> - {reply, ok, State#state{compaction_pending=false}}; + Clerk = State#state.clerk, + case State#state.compaction_pending of + true -> + {reply, {busy, Clerk}, State}; + false -> + Manifest = leveled_imanifest:to_list(State#state.manifest), + leveled_iclerk:clerk_compact(State#state.clerk, + Checker, + InitiateFun, + CloseFun, + FilterFun, + Manifest), + {reply, {ok, Clerk}, State#state{compaction_pending=true}} + end; handle_call(compaction_pending, _From, State) -> {reply, State#state.compaction_pending, State}; handle_call({trim, PersistedSQN}, _From, State) -> - ok = leveled_iclerk:clerk_trim(State#state.clerk, self(), PersistedSQN), + Manifest = leveled_imanifest:to_list(State#state.manifest), + ok = leveled_iclerk:clerk_trim(State#state.clerk, PersistedSQN, Manifest), {reply, ok, State}; handle_call(roll, _From, State) -> case leveled_cdb:cdb_lastkey(State#state.active_journaldb) of @@ -712,7 +677,7 @@ handle_call(close, _From, State) -> leveled_log:log("I0005", [close]), leveled_log:log("I0006", [State#state.journal_sqn, State#state.manifest_sqn]), - leveled_iclerk:clerk_stop(State#state.clerk), + ok = leveled_iclerk:clerk_stop(State#state.clerk), shutdown_snapshots(State#state.registered_snapshots), shutdown_manifest(State#state.manifest) end, @@ -727,12 +692,33 @@ handle_call(doom, _From, State) -> leveled_log:log("I0005", [doom]), leveled_log:log("I0006", [State#state.journal_sqn, State#state.manifest_sqn]), - leveled_iclerk:clerk_stop(State#state.clerk), + ok = leveled_iclerk:clerk_stop(State#state.clerk), shutdown_snapshots(State#state.registered_snapshots), shutdown_manifest(State#state.manifest), - {stop, normal, {ok, FPs}, State}. + +handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) -> + DropFun = + fun(E, Acc) -> + leveled_imanifest:remove_entry(Acc, E) + end, + Man0 = lists:foldl(DropFun, State#state.manifest, FilesToDelete), + AddFun = + fun(E, Acc) -> + leveled_imanifest:add_entry(Acc, E, false) + end, + Man1 = lists:foldl(AddFun, Man0, ManifestSnippet), + NewManifestSQN = State#state.manifest_sqn + 1, + leveled_imanifest:printer(Man1), + leveled_imanifest:writer(Man1, NewManifestSQN, State#state.root_path), + ok = leveled_iclerk:clerk_promptdeletions(State#state.clerk, + NewManifestSQN, + FilesToDelete), + {noreply, State#state{manifest=Man1, + manifest_sqn=NewManifestSQN, + pending_removals=FilesToDelete, + compaction_pending=false}}; handle_cast({release_snapshot, Snapshot}, State) -> Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots), leveled_log:log("I0003", [Snapshot]), @@ -843,11 +829,12 @@ start_from_file(InkOpts) -> clerk = Clerk}}. --spec shutdown_snapshots(list(tuple())) -> ok. +-spec shutdown_snapshots(list(registered_snapshot())) -> ok. %% @doc %% Shutdown any snapshots before closing the store shutdown_snapshots(Snapshots) -> - lists:foreach(fun({Snap, _TS, _SQN}) -> ok = ink_close(Snap) end, Snapshots). + lists:foreach(fun({Snap, _TS, _SQN}) -> ok = ink_close(Snap) end, + Snapshots). -spec shutdown_manifest(leveled_imanifest:manifest()) -> ok. %% @doc diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 5aef2e8..a17108a 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -300,6 +300,7 @@ journal_compaction_tester(Restart, WRP) -> {sync_strategy, testutil:sync_strategy()}], {ok, Bookie3} = leveled_bookie:book_start(StartOpts2), ok = leveled_bookie:book_compactjournal(Bookie3, 30000), + busy = leveled_bookie:book_compactjournal(Bookie3, 30000), testutil:wait_for_compaction(Bookie3), ok = leveled_bookie:book_close(Bookie3), diff --git a/test/end_to_end/tictac_SUITE.erl b/test/end_to_end/tictac_SUITE.erl index e0f5f01..ba422b0 100644 --- a/test/end_to_end/tictac_SUITE.erl +++ b/test/end_to_end/tictac_SUITE.erl @@ -724,6 +724,9 @@ basic_headonly_test(ObjectCount, RemoveCount, HeadOnly) -> {ok, FinalFNs} = file:list_dir(JFP), + ok = leveled_bookie:book_trimjournal(Bookie1), + % CCheck a second trim is still OK + [{add, SegmentID0, Bucket0, Key0, Hash0}|_Rest] = ObjectSpecL, case HeadOnly of with_lookup ->