Allow clerk to be stopped during compaction scoring

This will stop needless compaction work from being completed when the iclerk is sent a close at this stage.
This commit is contained in:
Martin Sumner 2019-01-25 12:11:42 +00:00
parent 5fab9e2d62
commit e349774167
2 changed files with 91 additions and 34 deletions

View file

@ -115,15 +115,24 @@
reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(),
singlefile_compactionperc = ?SINGLEFILE_COMPACTION_TARGET :: float(),
maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(),
compression_method = native :: lz4|native}).
compression_method = native :: lz4|native,
scored_files = [] :: list(candidate()),
scoring_state :: scoring_state()|undefined}).
-record(candidate, {low_sqn :: integer() | undefined,
filename :: string() | undefined,
journal :: pid() | undefined,
compaction_perc :: float() | undefined}).
-record(scoring_state, {filter_fun :: fun(),
filter_server :: pid(),
max_sqn :: non_neg_integer(),
close_fun :: fun(),
start_time :: erlang:timestamp()}).
-type iclerk_options() :: #iclerk_options{}.
-type candidate() :: #candidate{}.
-type scoring_state() :: #scoring_state{}.
-type score_parameters() :: {integer(), float(), float()}.
% Score parameters are a tuple
% - of maximum run length; how long a run of consecutive files can be for
@ -188,7 +197,7 @@ clerk_hashtablecalc(HashTree, StartPos, CDBpid) ->
%% @doc
%% Stop the clerk
clerk_stop(Pid) ->
gen_server:call(Pid, stop, 60000).
gen_server:call(Pid, stop, infinity).
-spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok.
%% @doc
@ -208,6 +217,16 @@ clerk_addlogs(Pid, ForcedLogs) ->
clerk_removelogs(Pid, ForcedLogs) ->
gen_server:cast(Pid, {remove_logs, ForcedLogs}).
-spec clerk_scorefilelist(pid(), list(candidate())) -> ok.
%% @doc
%% Score the file at the head of the list and then send the tail of the list to
%% be scored
clerk_scorefilelist(Pid, []) ->
gen_server:cast(Pid, scoring_complete);
clerk_scorefilelist(Pid, CandidateList) ->
gen_server:cast(Pid, {score_filelist, CandidateList}).
%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
@ -254,6 +273,16 @@ init([LogOpts, IClerkOpts]) ->
IClerkOpts#iclerk_options.compression_method}}.
handle_call(stop, _From, State) ->
case State#state.scoring_state of
undefined ->
ok;
ScoringState ->
% Closed when scoring files, and so need to shutdown FilterServer
% to close down neatly
CloseFun = ScoringState#scoring_state.close_fun,
FilterServer = ScoringState#scoring_state.filter_server,
CloseFun(FilterServer)
end,
{stop, normal, ok, State}.
handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
@ -267,12 +296,42 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
% Need to fetch manifest at start rather than have it be passed in
% Don't want to process a queued call waiting on an old manifest
[_Active|Manifest] = Manifest0,
Inker = State#state.inker,
MaxRunLength = State#state.max_run_length,
{FilterServer, MaxSQN} = InitiateFun(Checker),
ok = clerk_scorefilelist(self(), Manifest),
ScoringState =
#scoring_state{filter_fun = FilterFun,
filter_server = FilterServer,
max_sqn = MaxSQN,
close_fun = CloseFun,
start_time = SW},
{noreply, State#state{scored_files = [], scoring_state = ScoringState}};
handle_cast({score_filelist, [Entry|Tail]}, State) ->
Candidates = State#state.scored_files,
{LowSQN, FN, JournalP, _LK} = Entry,
ScoringState = State#state.scoring_state,
CpctPerc = check_single_file(JournalP,
ScoringState#scoring_state.filter_fun,
ScoringState#scoring_state.filter_server,
ScoringState#scoring_state.max_sqn,
?SAMPLE_SIZE,
?BATCH_SIZE),
Candidate =
#candidate{low_sqn = LowSQN,
filename = FN,
journal = JournalP,
compaction_perc = CpctPerc},
ok = clerk_scorefilelist(self(), Tail),
{noreply, State#state{scored_files = [Candidate|Candidates]}};
handle_cast(scoring_complete, State) ->
MaxRunLength = State#state.max_run_length,
CDBopts = State#state.cdb_options,
Candidates = scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN),
Candidates = lists:reverse(State#state.scored_files),
ScoringState = State#state.scoring_state,
FilterFun = ScoringState#scoring_state.filter_fun,
FilterServer = ScoringState#scoring_state.filter_server,
MaxSQN = ScoringState#scoring_state.max_sqn,
CloseFun = ScoringState#scoring_state.close_fun,
SW = ScoringState#scoring_state.start_time,
ScoreParams =
{MaxRunLength,
State#state.maxrunlength_compactionperc,
@ -298,15 +357,15 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
end,
BestRun1),
leveled_log:log("IC002", [length(FilesToDelete)]),
ok = leveled_inker:ink_clerkcomplete(Inker,
ok = leveled_inker:ink_clerkcomplete(State#state.inker,
ManifestSlice,
FilesToDelete),
ok = CloseFun(FilterServer),
{noreply, State};
{noreply, State#state{scoring_state = undefined}};
false ->
ok = leveled_inker:ink_clerkcomplete(Inker, [], []),
ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []),
ok = CloseFun(FilterServer),
{noreply, State}
{noreply, State#state{scoring_state = undefined}}
end;
handle_cast({trim, PersistedSQN, ManifestAsList}, State) ->
FilesToDelete =
@ -487,28 +546,6 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) ->
100 * ActiveSize / (ActiveSize + ReplacedSize)
end.
scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN) ->
scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN, []).
scan_all_files([], _FilterFun, _FilterServer, _MaxSQN, CandidateList) ->
CandidateList;
scan_all_files([Entry|Tail], FilterFun, FilterServer, MaxSQN, CandidateList) ->
{LowSQN, FN, JournalP, _LK} = Entry,
CpctPerc = check_single_file(JournalP,
FilterFun,
FilterServer,
MaxSQN,
?SAMPLE_SIZE,
?BATCH_SIZE),
scan_all_files(Tail,
FilterFun,
FilterServer,
MaxSQN,
CandidateList ++
[#candidate{low_sqn = LowSQN,
filename = FN,
journal = JournalP,
compaction_perc = CpctPerc}]).
fetch_inbatches([], _BatchSize, _CDB, CheckedList) ->
CheckedList;