Merge pull request #250 from martinsumner/mas-i249-iclerkshutdown

Mas i249 iclerkshutdown
This commit is contained in:
Martin Sumner 2019-01-25 09:54:14 +00:00 committed by GitHub
commit 2c1503b6b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 102 additions and 116 deletions

View file

@ -1005,7 +1005,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) ->
gen_server:call(Pid, {snapshot, SnapType, Query, LongRunning}, infinity).
-spec book_compactjournal(pid(), integer()) -> ok.
-spec book_compactjournal(pid(), integer()) -> ok|busy.
-spec book_eqccompactjournal(pid(), integer()) -> {ok, pid()}.
-spec book_islastcompactionpending(pid()) -> boolean().
-spec book_trimjournal(pid()) -> ok.
@ -1016,11 +1016,12 @@ book_snapshot(Pid, SnapType, Query, LongRunning) ->
%% in Riak it will be triggered by a vnode callback.
book_eqccompactjournal(Pid, Timeout) ->
gen_server:call(Pid, {compact_journal, Timeout}, infinity).
{_R, P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity),
{ok, P}.
book_compactjournal(Pid, Timeout) ->
{ok, _P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity),
ok.
{R, _P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity),
R.
%% @doc Check on progress of the last compaction

View file

@ -82,9 +82,10 @@
code_change/3]).
-export([clerk_new/1,
clerk_compact/7,
clerk_compact/6,
clerk_hashtablecalc/3,
clerk_trim/3,
clerk_promptdeletions/3,
clerk_stop/1,
clerk_loglevel/2,
clerk_addlogs/2,
@ -148,25 +149,30 @@ clerk_new(InkerClerkOpts) ->
-spec clerk_compact(pid(), pid(),
fun(), fun(), fun(),
pid(), integer()) -> ok.
list()) -> ok.
%% @doc
%% Trigger a compaction for this clerk if the threshold of data recovery has
%% been met
clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Inker, TimeO) ->
clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Manifest) ->
gen_server:cast(Pid,
{compact,
Checker,
InitiateFun,
CloseFun,
FilterFun,
Inker,
TimeO}).
Manifest}).
-spec clerk_trim(pid(), pid(), integer()) -> ok.
-spec clerk_trim(pid(), integer(), list()) -> ok.
%% @doc
%% Trim the Inker back to the persisted SQN
clerk_trim(Pid, Inker, PersistedSQN) ->
gen_server:cast(Pid, {trim, Inker, PersistedSQN}).
clerk_trim(Pid, PersistedSQN, ManifestAsList) ->
gen_server:cast(Pid, {trim, PersistedSQN, ManifestAsList}).
-spec clerk_promptdeletions(pid(), pos_integer(), list()) -> ok.
%% @doc
%%
clerk_promptdeletions(Pid, ManifestSQN, DeletedFiles) ->
gen_server:cast(Pid, {prompt_deletions, ManifestSQN, DeletedFiles}).
-spec clerk_hashtablecalc(ets:tid(), integer(), pid()) -> ok.
%% @doc
@ -182,8 +188,7 @@ clerk_hashtablecalc(HashTree, StartPos, CDBpid) ->
%% @doc
%% Stop the clerk
clerk_stop(Pid) ->
unlink(Pid),
gen_server:cast(Pid, stop).
gen_server:call(Pid, stop, 60000).
-spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok.
%% @doc
@ -248,10 +253,10 @@ init([LogOpts, IClerkOpts]) ->
compression_method =
IClerkOpts#iclerk_options.compression_method}}.
handle_call(_Msg, _From, State) ->
{reply, not_supported, State}.
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
State) ->
% Empty the waste folder
clear_waste(State),
@ -261,7 +266,8 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
% Need to fetch manifest at start rather than have it be passed in
% Don't want to process a queued call waiting on an old manifest
[_Active|Manifest] = leveled_inker:ink_getmanifest(Inker),
[_Active|Manifest] = Manifest0,
Inker = State#state.inker,
MaxRunLength = State#state.max_run_length,
{FilterServer, MaxSQN} = InitiateFun(Checker),
CDBopts = State#state.cdb_options,
@ -292,24 +298,29 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
end,
BestRun1),
leveled_log:log("IC002", [length(FilesToDelete)]),
case is_process_alive(Inker) of
true ->
update_inker(Inker,
ok = leveled_inker:ink_clerkcomplete(Inker,
ManifestSlice,
FilesToDelete),
ok = CloseFun(FilterServer),
{noreply, State}
end;
{noreply, State};
false ->
ok = leveled_inker:ink_compactioncomplete(Inker),
ok = leveled_inker:ink_clerkcomplete(Inker, [], []),
ok = CloseFun(FilterServer),
{noreply, State}
end;
handle_cast({trim, Inker, PersistedSQN}, State) ->
ManifestAsList = leveled_inker:ink_getmanifest(Inker),
handle_cast({trim, PersistedSQN, ManifestAsList}, State) ->
FilesToDelete =
leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList),
ok = update_inker(Inker, [], FilesToDelete),
leveled_log:log("IC007", []),
ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], FilesToDelete),
{noreply, State};
handle_cast({prompt_deletions, ManifestSQN, FilesToDelete}, State) ->
lists:foreach(fun({_SQN, _FN, J2D, _LK}) ->
leveled_cdb:cdb_deletepending(J2D,
ManifestSQN,
State#state.inker)
end,
FilesToDelete),
{noreply, State};
handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) ->
{IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos),
@ -329,9 +340,7 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
ok = leveled_log:remove_forcedlogs(ForcedLogs),
CDBopts = State#state.cdb_options,
CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
{noreply, State#state{cdb_options = CDBopts0}};
handle_cast(stop, State) ->
{stop, normal, State}.
{noreply, State#state{cdb_options = CDBopts0}}.
handle_info(_Info, State) ->
{noreply, State}.
@ -614,20 +623,6 @@ sort_run(RunOfFiles) ->
Cand1#candidate.low_sqn =< Cand2#candidate.low_sqn end,
lists:sort(CompareFun, RunOfFiles).
update_inker(Inker, ManifestSlice, FilesToDelete) ->
{ok, ManSQN} = leveled_inker:ink_updatemanifest(Inker,
ManifestSlice,
FilesToDelete),
ok = leveled_inker:ink_compactioncomplete(Inker),
leveled_log:log("IC007", []),
lists:foreach(fun({_SQN, _FN, J2D, _LK}) ->
leveled_cdb:cdb_deletepending(J2D,
ManSQN,
Inker)
end,
FilesToDelete),
ok.
compact_files(BestRun, CDBopts, FilterFun, FilterServer,
MaxSQN, RStrategy, PressMethod) ->
BatchesOfPositions = get_all_positions(BestRun, []),
@ -1148,7 +1143,6 @@ size_score_test() ->
coverage_cheat_test() ->
{noreply, _State0} = handle_info(timeout, #state{}),
{ok, _State1} = code_change(null, #state{}, null),
{reply, not_supported, _State2} = handle_call(null, null, #state{}),
terminate(error, #state{}).
-endif.

View file

@ -105,11 +105,10 @@
ink_registersnapshot/2,
ink_confirmdelete/2,
ink_compactjournal/3,
ink_compactioncomplete/1,
ink_clerkcomplete/3,
ink_compactionpending/1,
ink_trim/2,
ink_getmanifest/1,
ink_updatemanifest/3,
ink_printmanifest/1,
ink_close/1,
ink_doom/1,
@ -142,7 +141,7 @@
journal_sqn = 0 :: integer(),
active_journaldb :: pid() | undefined,
pending_removals = [] :: list(),
registered_snapshots = [] :: list(),
registered_snapshots = [] :: list(registered_snapshot()),
root_path :: string() | undefined,
cdb_options :: #cdb_options{} | undefined,
clerk :: pid() | undefined,
@ -157,7 +156,7 @@
-type inker_options() :: #inker_options{}.
-type ink_state() :: #state{}.
-type registered_snapshot() :: {pid(), os:timestamp(), integer()}.
%%%============================================================================
%%% API
@ -348,7 +347,7 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
as_ink},
infinity).
-spec ink_compactjournal(pid(), pid(), integer()) -> {ok, pid()}.
-spec ink_compactjournal(pid(), pid(), integer()) -> {ok|busy, pid()}.
%% @doc
%% Trigger a compaction event. the compaction event will use a sqn check
%% against the Ledger to see if a value can be compacted - if the penciller
@ -359,7 +358,7 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
%% that any value that was written more recently than the last flush to disk
%% of the Ledger will not be considered for compaction (as this may be
%% required to reload the Ledger on startup).
ink_compactjournal(Pid, Bookie, Timeout) ->
ink_compactjournal(Pid, Bookie, _Timeout) ->
CheckerInitiateFun = fun initiate_penciller_snapshot/1,
CheckerCloseFun = fun leveled_penciller:pcl_close/1,
CheckerFilterFun =
@ -369,28 +368,26 @@ ink_compactjournal(Pid, Bookie, Timeout) ->
Bookie,
CheckerInitiateFun,
CheckerCloseFun,
CheckerFilterFun,
Timeout},
CheckerFilterFun},
infinity).
%% Allows the Checker to be overriden in test, use something other than a
%% penciller
ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, Timeout) ->
ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, _Timeout) ->
gen_server:call(Pid,
{compact,
Checker,
InitiateFun,
CloseFun,
FilterFun,
Timeout},
FilterFun},
infinity).
-spec ink_compactioncomplete(pid()) -> ok.
-spec ink_clerkcomplete(pid(), list(), list()) -> ok.
%% @doc
%% Used by a clerk to state that a compaction process is over, only change
%% is to unlock the Inker for further compactions.
ink_compactioncomplete(Pid) ->
gen_server:call(Pid, compaction_complete, infinity).
ink_clerkcomplete(Pid, ManifestSnippet, FilesToDelete) ->
gen_server:cast(Pid, {clerk_complete, ManifestSnippet, FilesToDelete}).
-spec ink_compactionpending(pid()) -> boolean().
%% @doc
@ -425,21 +422,6 @@ ink_backup(Pid, BackupPath) ->
ink_getmanifest(Pid) ->
gen_server:call(Pid, get_manifest, infinity).
-spec ink_updatemanifest(pid(), list(), list()) -> {ok, integer()}.
%% @doc
%% Add a section of new entries into the manifest, and drop a bunch of deleted
%% files out of the manifest. Used to update the manifest after a compaction
%% job.
%%
%% Returns {ok, ManSQN} with the ManSQN being the sequence number of the
%% updated manifest
ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) ->
gen_server:call(Pid,
{update_manifest,
ManifestSnippet,
DeletedFiles},
infinity).
-spec ink_printmanifest(pid()) -> ok.
%% @doc
%% Used in tests to print out the manifest
@ -574,27 +556,6 @@ handle_call({confirm_delete, ManSQN}, _From, State) ->
State#state{registered_snapshots = RegisteredSnapshots0}};
handle_call(get_manifest, _From, State) ->
{reply, leveled_imanifest:to_list(State#state.manifest), State};
handle_call({update_manifest,
ManifestSnippet,
DeletedFiles}, _From, State) ->
DropFun =
fun(E, Acc) ->
leveled_imanifest:remove_entry(Acc, E)
end,
Man0 = lists:foldl(DropFun, State#state.manifest, DeletedFiles),
AddFun =
fun(E, Acc) ->
leveled_imanifest:add_entry(Acc, E, false)
end,
Man1 = lists:foldl(AddFun, Man0, ManifestSnippet),
NewManifestSQN = State#state.manifest_sqn + 1,
leveled_imanifest:printer(Man1),
leveled_imanifest:writer(Man1, NewManifestSQN, State#state.root_path),
{reply,
{ok, NewManifestSQN},
State#state{manifest=Man1,
manifest_sqn=NewManifestSQN,
pending_removals=DeletedFiles}};
handle_call(print_manifest, _From, State) ->
leveled_imanifest:printer(State#state.manifest),
{reply, ok, State};
@ -602,23 +563,27 @@ handle_call({compact,
Checker,
InitiateFun,
CloseFun,
FilterFun,
Timeout},
FilterFun},
_From, State) ->
Clerk = State#state.clerk,
case State#state.compaction_pending of
true ->
{reply, {busy, Clerk}, State};
false ->
Manifest = leveled_imanifest:to_list(State#state.manifest),
leveled_iclerk:clerk_compact(State#state.clerk,
Checker,
InitiateFun,
CloseFun,
FilterFun,
self(),
Timeout),
{reply, {ok, State#state.clerk}, State#state{compaction_pending=true}};
handle_call(compaction_complete, _From, State) ->
{reply, ok, State#state{compaction_pending=false}};
Manifest),
{reply, {ok, Clerk}, State#state{compaction_pending=true}}
end;
handle_call(compaction_pending, _From, State) ->
{reply, State#state.compaction_pending, State};
handle_call({trim, PersistedSQN}, _From, State) ->
ok = leveled_iclerk:clerk_trim(State#state.clerk, self(), PersistedSQN),
Manifest = leveled_imanifest:to_list(State#state.manifest),
ok = leveled_iclerk:clerk_trim(State#state.clerk, PersistedSQN, Manifest),
{reply, ok, State};
handle_call(roll, _From, State) ->
case leveled_cdb:cdb_lastkey(State#state.active_journaldb) of
@ -712,7 +677,7 @@ handle_call(close, _From, State) ->
leveled_log:log("I0005", [close]),
leveled_log:log("I0006", [State#state.journal_sqn,
State#state.manifest_sqn]),
leveled_iclerk:clerk_stop(State#state.clerk),
ok = leveled_iclerk:clerk_stop(State#state.clerk),
shutdown_snapshots(State#state.registered_snapshots),
shutdown_manifest(State#state.manifest)
end,
@ -727,12 +692,33 @@ handle_call(doom, _From, State) ->
leveled_log:log("I0005", [doom]),
leveled_log:log("I0006", [State#state.journal_sqn,
State#state.manifest_sqn]),
leveled_iclerk:clerk_stop(State#state.clerk),
ok = leveled_iclerk:clerk_stop(State#state.clerk),
shutdown_snapshots(State#state.registered_snapshots),
shutdown_manifest(State#state.manifest),
{stop, normal, {ok, FPs}, State}.
handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) ->
DropFun =
fun(E, Acc) ->
leveled_imanifest:remove_entry(Acc, E)
end,
Man0 = lists:foldl(DropFun, State#state.manifest, FilesToDelete),
AddFun =
fun(E, Acc) ->
leveled_imanifest:add_entry(Acc, E, false)
end,
Man1 = lists:foldl(AddFun, Man0, ManifestSnippet),
NewManifestSQN = State#state.manifest_sqn + 1,
leveled_imanifest:printer(Man1),
leveled_imanifest:writer(Man1, NewManifestSQN, State#state.root_path),
ok = leveled_iclerk:clerk_promptdeletions(State#state.clerk,
NewManifestSQN,
FilesToDelete),
{noreply, State#state{manifest=Man1,
manifest_sqn=NewManifestSQN,
pending_removals=FilesToDelete,
compaction_pending=false}};
handle_cast({release_snapshot, Snapshot}, State) ->
Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots),
leveled_log:log("I0003", [Snapshot]),
@ -843,11 +829,12 @@ start_from_file(InkOpts) ->
clerk = Clerk}}.
-spec shutdown_snapshots(list(tuple())) -> ok.
-spec shutdown_snapshots(list(registered_snapshot())) -> ok.
%% @doc
%% Shutdown any snapshots before closing the store
shutdown_snapshots(Snapshots) ->
lists:foreach(fun({Snap, _TS, _SQN}) -> ok = ink_close(Snap) end, Snapshots).
lists:foreach(fun({Snap, _TS, _SQN}) -> ok = ink_close(Snap) end,
Snapshots).
-spec shutdown_manifest(leveled_imanifest:manifest()) -> ok.
%% @doc

View file

@ -300,6 +300,7 @@ journal_compaction_tester(Restart, WRP) ->
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie3} = leveled_bookie:book_start(StartOpts2),
ok = leveled_bookie:book_compactjournal(Bookie3, 30000),
busy = leveled_bookie:book_compactjournal(Bookie3, 30000),
testutil:wait_for_compaction(Bookie3),
ok = leveled_bookie:book_close(Bookie3),

View file

@ -724,6 +724,9 @@ basic_headonly_test(ObjectCount, RemoveCount, HeadOnly) ->
{ok, FinalFNs} = file:list_dir(JFP),
ok = leveled_bookie:book_trimjournal(Bookie1),
% CCheck a second trim is still OK
[{add, SegmentID0, Bucket0, Key0, Hash0}|_Rest] = ObjectSpecL,
case HeadOnly of
with_lookup ->