Change to cast in inker/iclerk interaction

This allows for leveled_iclerk:clerk_stop to be a sync call, so that files will only be closed once the iclerk has stopped.  This is designed to prevent iclerk crashes during shutdowns when files it is depending on are closed mid shutdown.
This commit is contained in:
Martin Sumner 2019-01-24 21:32:54 +00:00
parent 28d0aef5fe
commit 0333604fd9
3 changed files with 78 additions and 100 deletions

View file

@ -82,9 +82,10 @@
code_change/3]).
-export([clerk_new/1,
clerk_compact/7,
clerk_compact/6,
clerk_hashtablecalc/3,
clerk_trim/3,
clerk_promptdeletions/3,
clerk_stop/1,
clerk_loglevel/2,
clerk_addlogs/2,
@ -148,25 +149,30 @@ clerk_new(InkerClerkOpts) ->
-spec clerk_compact(pid(), pid(),
fun(), fun(), fun(),
pid(), integer()) -> ok.
list()) -> ok.
%% @doc
%% Trigger a compaction for this clerk if the threshold of data recovery has
%% been met
clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Inker, TimeO) ->
clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Manifest) ->
gen_server:cast(Pid,
{compact,
Checker,
InitiateFun,
CloseFun,
FilterFun,
Inker,
TimeO}).
Manifest}).
-spec clerk_trim(pid(), pid(), integer()) -> ok.
-spec clerk_trim(pid(), integer(), list()) -> ok.
%% @doc
%% Trim the Inker back to the persisted SQN
clerk_trim(Pid, Inker, PersistedSQN) ->
gen_server:cast(Pid, {trim, Inker, PersistedSQN}).
clerk_trim(Pid, PersistedSQN, ManifestAsList) ->
gen_server:cast(Pid, {trim, PersistedSQN, ManifestAsList}).
-spec clerk_promptdeletions(pid(), pos_integer(), list()) -> ok.
%% @doc
%% Asynchronously (via a cast) ask the clerk to prompt the deletion of a list
%% of files.  The clerk handles the resulting prompt_deletions message by
%% calling leveled_cdb:cdb_deletepending/3 for each entry in DeletedFiles,
%% passing the ManifestSQN and the inker held in the clerk's state.
clerk_promptdeletions(Pid, ManifestSQN, DeletedFiles) ->
gen_server:cast(Pid, {prompt_deletions, ManifestSQN, DeletedFiles}).
-spec clerk_hashtablecalc(ets:tid(), integer(), pid()) -> ok.
%% @doc
@ -182,7 +188,7 @@ clerk_hashtablecalc(HashTree, StartPos, CDBpid) ->
%% @doc
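%% Stop the clerk.  This is a synchronous call (with a 60 second timeout), so
%% the caller only continues once the clerk has replied to the stop request.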
clerk_stop(Pid) ->
gen_server:cast(Pid, stop).
gen_server:call(Pid, stop, 60000).
-spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok.
%% @doc
@ -247,10 +253,10 @@ init([LogOpts, IClerkOpts]) ->
compression_method =
IClerkOpts#iclerk_options.compression_method}}.
handle_call(_Msg, _From, State) ->
{reply, not_supported, State}.
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
State) ->
% Empty the waste folder
clear_waste(State),
@ -260,7 +266,8 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
% The manifest is passed in with the request, as captured by the Inker when
% the compaction was prompted - it is no longer fetched from the Inker here
[_Active|Manifest] = leveled_inker:ink_getmanifest(Inker),
[_Active|Manifest] = Manifest0,
Inker = State#state.inker,
MaxRunLength = State#state.max_run_length,
{FilterServer, MaxSQN} = InitiateFun(Checker),
CDBopts = State#state.cdb_options,
@ -291,24 +298,29 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
end,
BestRun1),
leveled_log:log("IC002", [length(FilesToDelete)]),
case is_process_alive(Inker) of
true ->
update_inker(Inker,
ok = leveled_inker:ink_clerkcomplete(Inker,
ManifestSlice,
FilesToDelete),
ok = CloseFun(FilterServer),
{noreply, State}
end;
{noreply, State};
false ->
ok = leveled_inker:ink_compactioncomplete(Inker),
ok = leveled_inker:ink_clerkcomplete(Inker, [], []),
ok = CloseFun(FilterServer),
{noreply, State}
end;
handle_cast({trim, Inker, PersistedSQN}, State) ->
ManifestAsList = leveled_inker:ink_getmanifest(Inker),
handle_cast({trim, PersistedSQN, ManifestAsList}, State) ->
FilesToDelete =
leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList),
ok = update_inker(Inker, [], FilesToDelete),
leveled_log:log("IC007", []),
ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], FilesToDelete),
{noreply, State};
% Prompt the deletion of each file in FilesToDelete.  Each entry is a 4-tuple
% whose third element (J2D) is handed to leveled_cdb:cdb_deletepending/3
% along with the ManifestSQN and the inker held in the clerk's state.
% NOTE(review): J2D is presumably the pid of the journal file to be deleted -
% confirm against leveled_cdb.
handle_cast({prompt_deletions, ManifestSQN, FilesToDelete}, State) ->
lists:foreach(fun({_SQN, _FN, J2D, _LK}) ->
leveled_cdb:cdb_deletepending(J2D,
ManifestSQN,
State#state.inker)
end,
FilesToDelete),
{noreply, State};
handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) ->
{IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos),
@ -328,9 +340,7 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
ok = leveled_log:remove_forcedlogs(ForcedLogs),
CDBopts = State#state.cdb_options,
CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
{noreply, State#state{cdb_options = CDBopts0}};
handle_cast(stop, State) ->
{stop, normal, State}.
{noreply, State#state{cdb_options = CDBopts0}}.
handle_info(_Info, State) ->
{noreply, State}.
@ -613,20 +623,6 @@ sort_run(RunOfFiles) ->
Cand1#candidate.low_sqn =< Cand2#candidate.low_sqn end,
lists:sort(CompareFun, RunOfFiles).
update_inker(Inker, ManifestSlice, FilesToDelete) ->
{ok, ManSQN} = leveled_inker:ink_updatemanifest(Inker,
ManifestSlice,
FilesToDelete),
ok = leveled_inker:ink_compactioncomplete(Inker),
leveled_log:log("IC007", []),
lists:foreach(fun({_SQN, _FN, J2D, _LK}) ->
leveled_cdb:cdb_deletepending(J2D,
ManSQN,
Inker)
end,
FilesToDelete),
ok.
compact_files(BestRun, CDBopts, FilterFun, FilterServer,
MaxSQN, RStrategy, PressMethod) ->
BatchesOfPositions = get_all_positions(BestRun, []),
@ -1147,7 +1143,6 @@ size_score_test() ->
coverage_cheat_test() ->
{noreply, _State0} = handle_info(timeout, #state{}),
{ok, _State1} = code_change(null, #state{}, null),
{reply, not_supported, _State2} = handle_call(null, null, #state{}),
terminate(error, #state{}).
-endif.

View file

@ -105,11 +105,10 @@
ink_registersnapshot/2,
ink_confirmdelete/2,
ink_compactjournal/3,
ink_compactioncomplete/1,
ink_clerkcomplete/3,
ink_compactionpending/1,
ink_trim/2,
ink_getmanifest/1,
ink_updatemanifest/3,
ink_printmanifest/1,
ink_close/1,
ink_doom/1,
@ -359,7 +358,7 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
%% that any value that was written more recently than the last flush to disk
%% of the Ledger will not be considered for compaction (as this may be
%% required to reload the Ledger on startup).
ink_compactjournal(Pid, Bookie, Timeout) ->
ink_compactjournal(Pid, Bookie, _Timeout) ->
CheckerInitiateFun = fun initiate_penciller_snapshot/1,
CheckerCloseFun = fun leveled_penciller:pcl_close/1,
CheckerFilterFun =
@ -369,28 +368,26 @@ ink_compactjournal(Pid, Bookie, Timeout) ->
Bookie,
CheckerInitiateFun,
CheckerCloseFun,
CheckerFilterFun,
Timeout},
CheckerFilterFun},
infinity).
%% Allows the Checker to be overriden in test, use something other than a
%% penciller
ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, Timeout) ->
ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, _Timeout) ->
gen_server:call(Pid,
{compact,
Checker,
InitiateFun,
CloseFun,
FilterFun,
Timeout},
FilterFun},
infinity).
-spec ink_compactioncomplete(pid()) -> ok.
-spec ink_clerkcomplete(pid(), list(), list()) -> ok.
%% @doc
%% Used by a clerk to state that a job (e.g. a compaction or a trim) is over.
%% The Inker removes FilesToDelete from the manifest, adds the entries in
%% ManifestSnippet, and is unlocked for further compactions.
ink_compactioncomplete(Pid) ->
gen_server:call(Pid, compaction_complete, infinity).
ink_clerkcomplete(Pid, ManifestSnippet, FilesToDelete) ->
gen_server:cast(Pid, {clerk_complete, ManifestSnippet, FilesToDelete}).
-spec ink_compactionpending(pid()) -> boolean().
%% @doc
@ -425,21 +422,6 @@ ink_backup(Pid, BackupPath) ->
ink_getmanifest(Pid) ->
gen_server:call(Pid, get_manifest, infinity).
-spec ink_updatemanifest(pid(), list(), list()) -> {ok, integer()}.
%% @doc
%% Add a section of new entries into the manifest, and drop a bunch of deleted
%% files out of the manifest. Used to update the manifest after a compaction
%% job.
%%
%% Returns {ok, ManSQN} with the ManSQN being the sequence number of the
%% updated manifest
ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) ->
gen_server:call(Pid,
{update_manifest,
ManifestSnippet,
DeletedFiles},
infinity).
-spec ink_printmanifest(pid()) -> ok.
%% @doc
%% Used in tests to print out the manifest
@ -574,27 +556,6 @@ handle_call({confirm_delete, ManSQN}, _From, State) ->
State#state{registered_snapshots = RegisteredSnapshots0}};
handle_call(get_manifest, _From, State) ->
{reply, leveled_imanifest:to_list(State#state.manifest), State};
handle_call({update_manifest,
ManifestSnippet,
DeletedFiles}, _From, State) ->
DropFun =
fun(E, Acc) ->
leveled_imanifest:remove_entry(Acc, E)
end,
Man0 = lists:foldl(DropFun, State#state.manifest, DeletedFiles),
AddFun =
fun(E, Acc) ->
leveled_imanifest:add_entry(Acc, E, false)
end,
Man1 = lists:foldl(AddFun, Man0, ManifestSnippet),
NewManifestSQN = State#state.manifest_sqn + 1,
leveled_imanifest:printer(Man1),
leveled_imanifest:writer(Man1, NewManifestSQN, State#state.root_path),
{reply,
{ok, NewManifestSQN},
State#state{manifest=Man1,
manifest_sqn=NewManifestSQN,
pending_removals=DeletedFiles}};
handle_call(print_manifest, _From, State) ->
leveled_imanifest:printer(State#state.manifest),
{reply, ok, State};
@ -602,28 +563,26 @@ handle_call({compact,
Checker,
InitiateFun,
CloseFun,
FilterFun,
Timeout},
FilterFun},
_From, State) ->
case State#state.compaction_pending of
true ->
{reply, busy, State};
false ->
Manifest = leveled_imanifest:to_list(State#state.manifest),
leveled_iclerk:clerk_compact(State#state.clerk,
Checker,
InitiateFun,
CloseFun,
FilterFun,
self(),
Timeout),
Manifest),
{reply, ok, State#state{compaction_pending=true}}
end;
handle_call(compaction_complete, _From, State) ->
{reply, ok, State#state{compaction_pending=false}};
handle_call(compaction_pending, _From, State) ->
{reply, State#state.compaction_pending, State};
handle_call({trim, PersistedSQN}, _From, State) ->
ok = leveled_iclerk:clerk_trim(State#state.clerk, self(), PersistedSQN),
Manifest = leveled_imanifest:to_list(State#state.manifest),
ok = leveled_iclerk:clerk_trim(State#state.clerk, PersistedSQN, Manifest),
{reply, ok, State};
handle_call(roll, _From, State) ->
case leveled_cdb:cdb_lastkey(State#state.active_journaldb) of
@ -717,7 +676,7 @@ handle_call(close, _From, State) ->
leveled_log:log("I0005", [close]),
leveled_log:log("I0006", [State#state.journal_sqn,
State#state.manifest_sqn]),
leveled_iclerk:clerk_stop(State#state.clerk),
ok = leveled_iclerk:clerk_stop(State#state.clerk),
shutdown_snapshots(State#state.registered_snapshots),
shutdown_manifest(State#state.manifest)
end,
@ -732,12 +691,33 @@ handle_call(doom, _From, State) ->
leveled_log:log("I0005", [doom]),
leveled_log:log("I0006", [State#state.journal_sqn,
State#state.manifest_sqn]),
leveled_iclerk:clerk_stop(State#state.clerk),
ok = leveled_iclerk:clerk_stop(State#state.clerk),
shutdown_snapshots(State#state.registered_snapshots),
shutdown_manifest(State#state.manifest),
{stop, normal, {ok, FPs}, State}.
% Sent by the clerk (via ink_clerkcomplete/3) when it has finished a job.
% Drops FilesToDelete from the manifest, adds the entries in ManifestSnippet,
% hands the result to leveled_imanifest:writer/3 under the next manifest SQN,
% then asks the clerk to prompt the deletions and clears compaction_pending.
handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) ->
DropFun =
fun(E, Acc) ->
leveled_imanifest:remove_entry(Acc, E)
end,
% Removals are folded in first, then the new entries are added
Man0 = lists:foldl(DropFun, State#state.manifest, FilesToDelete),
AddFun =
fun(E, Acc) ->
leveled_imanifest:add_entry(Acc, E, false)
end,
Man1 = lists:foldl(AddFun, Man0, ManifestSnippet),
% The manifest SQN is incremented unconditionally - i.e. also when both
% ManifestSnippet and FilesToDelete are empty
NewManifestSQN = State#state.manifest_sqn + 1,
leveled_imanifest:printer(Man1),
leveled_imanifest:writer(Man1, NewManifestSQN, State#state.root_path),
% The deletions are prompted with a cast to the clerk, so this callback
% does not wait on the clerk
ok = leveled_iclerk:clerk_promptdeletions(State#state.clerk,
NewManifestSQN,
FilesToDelete),
{noreply, State#state{manifest=Man1,
manifest_sqn=NewManifestSQN,
pending_removals=FilesToDelete,
compaction_pending=false}};
handle_cast({release_snapshot, Snapshot}, State) ->
Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots),
leveled_log:log("I0003", [Snapshot]),

View file

@ -724,6 +724,9 @@ basic_headonly_test(ObjectCount, RemoveCount, HeadOnly) ->
{ok, FinalFNs} = file:list_dir(JFP),
ok = leveled_bookie:book_trimjournal(Bookie1),
% Check a second trim is still OK
[{add, SegmentID0, Bucket0, Key0, Hash0}|_Rest] = ObjectSpecL,
case HeadOnly of
with_lookup ->