Merge pull request #251 from martinsumner/mas-i249-iclerkfsm

Mas i249 iclerkfsm
This commit is contained in:
Martin Sumner 2019-01-25 15:16:08 +00:00 committed by GitHub
commit a41183b8cb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 118 additions and 46 deletions

View file

@ -115,15 +115,24 @@
reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(), reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(),
singlefile_compactionperc = ?SINGLEFILE_COMPACTION_TARGET :: float(), singlefile_compactionperc = ?SINGLEFILE_COMPACTION_TARGET :: float(),
maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(), maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(),
compression_method = native :: lz4|native}). compression_method = native :: lz4|native,
scored_files = [] :: list(candidate()),
scoring_state :: scoring_state()|undefined}).
-record(candidate, {low_sqn :: integer() | undefined, -record(candidate, {low_sqn :: integer() | undefined,
filename :: string() | undefined, filename :: string() | undefined,
journal :: pid() | undefined, journal :: pid() | undefined,
compaction_perc :: float() | undefined}). compaction_perc :: float() | undefined}).
-record(scoring_state, {filter_fun :: fun(),
filter_server :: pid(),
max_sqn :: non_neg_integer(),
close_fun :: fun(),
start_time :: erlang:timestamp()}).
-type iclerk_options() :: #iclerk_options{}. -type iclerk_options() :: #iclerk_options{}.
-type candidate() :: #candidate{}. -type candidate() :: #candidate{}.
-type scoring_state() :: #scoring_state{}.
-type score_parameters() :: {integer(), float(), float()}. -type score_parameters() :: {integer(), float(), float()}.
% Score parameters are a tuple % Score parameters are a tuple
% - of maximum run length; how long a run of consecutive files can be for % - of maximum run length; how long a run of consecutive files can be for
@ -188,7 +197,7 @@ clerk_hashtablecalc(HashTree, StartPos, CDBpid) ->
%% @doc %% @doc
%% Stop the clerk %% Stop the clerk
clerk_stop(Pid) -> clerk_stop(Pid) ->
gen_server:call(Pid, stop, 60000). gen_server:call(Pid, stop, infinity).
-spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok. -spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok.
%% @doc %% @doc
@ -208,6 +217,17 @@ clerk_addlogs(Pid, ForcedLogs) ->
clerk_removelogs(Pid, ForcedLogs) -> clerk_removelogs(Pid, ForcedLogs) ->
gen_server:cast(Pid, {remove_logs, ForcedLogs}). gen_server:cast(Pid, {remove_logs, ForcedLogs}).
-spec clerk_scorefilelist(pid(), list(candidate())) -> ok.
%% @doc
%% Score the file at the head of the list and then send the tail of the list to
%% be scored
clerk_scorefilelist(Pid, []) ->
gen_server:cast(Pid, scoring_complete);
clerk_scorefilelist(Pid, CandidateList) ->
gen_server:cast(Pid, {score_filelist, CandidateList}).
%%%============================================================================ %%%============================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%============================================================================ %%%============================================================================
@ -254,6 +274,16 @@ init([LogOpts, IClerkOpts]) ->
IClerkOpts#iclerk_options.compression_method}}. IClerkOpts#iclerk_options.compression_method}}.
handle_call(stop, _From, State) -> handle_call(stop, _From, State) ->
case State#state.scoring_state of
undefined ->
ok;
ScoringState ->
% Closed when scoring files, and so need to shutdown FilterServer
% to close down neatly
CloseFun = ScoringState#scoring_state.close_fun,
FilterServer = ScoringState#scoring_state.filter_server,
CloseFun(FilterServer)
end,
{stop, normal, ok, State}. {stop, normal, ok, State}.
handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0}, handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
@ -267,12 +297,42 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
% Need to fetch manifest at start rather than have it be passed in % Need to fetch manifest at start rather than have it be passed in
% Don't want to process a queued call waiting on an old manifest % Don't want to process a queued call waiting on an old manifest
[_Active|Manifest] = Manifest0, [_Active|Manifest] = Manifest0,
Inker = State#state.inker,
MaxRunLength = State#state.max_run_length,
{FilterServer, MaxSQN} = InitiateFun(Checker), {FilterServer, MaxSQN} = InitiateFun(Checker),
ok = clerk_scorefilelist(self(), Manifest),
ScoringState =
#scoring_state{filter_fun = FilterFun,
filter_server = FilterServer,
max_sqn = MaxSQN,
close_fun = CloseFun,
start_time = SW},
{noreply, State#state{scored_files = [], scoring_state = ScoringState}};
handle_cast({score_filelist, [Entry|Tail]}, State) ->
Candidates = State#state.scored_files,
{LowSQN, FN, JournalP, _LK} = Entry,
ScoringState = State#state.scoring_state,
CpctPerc = check_single_file(JournalP,
ScoringState#scoring_state.filter_fun,
ScoringState#scoring_state.filter_server,
ScoringState#scoring_state.max_sqn,
?SAMPLE_SIZE,
?BATCH_SIZE),
Candidate =
#candidate{low_sqn = LowSQN,
filename = FN,
journal = JournalP,
compaction_perc = CpctPerc},
ok = clerk_scorefilelist(self(), Tail),
{noreply, State#state{scored_files = [Candidate|Candidates]}};
handle_cast(scoring_complete, State) ->
MaxRunLength = State#state.max_run_length,
CDBopts = State#state.cdb_options, CDBopts = State#state.cdb_options,
Candidates = lists:reverse(State#state.scored_files),
Candidates = scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN), ScoringState = State#state.scoring_state,
FilterFun = ScoringState#scoring_state.filter_fun,
FilterServer = ScoringState#scoring_state.filter_server,
MaxSQN = ScoringState#scoring_state.max_sqn,
CloseFun = ScoringState#scoring_state.close_fun,
SW = ScoringState#scoring_state.start_time,
ScoreParams = ScoreParams =
{MaxRunLength, {MaxRunLength,
State#state.maxrunlength_compactionperc, State#state.maxrunlength_compactionperc,
@ -298,15 +358,15 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
end, end,
BestRun1), BestRun1),
leveled_log:log("IC002", [length(FilesToDelete)]), leveled_log:log("IC002", [length(FilesToDelete)]),
ok = leveled_inker:ink_clerkcomplete(Inker, ok = CloseFun(FilterServer),
ok = leveled_inker:ink_clerkcomplete(State#state.inker,
ManifestSlice, ManifestSlice,
FilesToDelete), FilesToDelete),
ok = CloseFun(FilterServer), {noreply, State#state{scoring_state = undefined}};
{noreply, State};
false -> false ->
ok = leveled_inker:ink_clerkcomplete(Inker, [], []),
ok = CloseFun(FilterServer), ok = CloseFun(FilterServer),
{noreply, State} ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []),
{noreply, State#state{scoring_state = undefined}}
end; end;
handle_cast({trim, PersistedSQN, ManifestAsList}, State) -> handle_cast({trim, PersistedSQN, ManifestAsList}, State) ->
FilesToDelete = FilesToDelete =
@ -487,28 +547,6 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) ->
100 * ActiveSize / (ActiveSize + ReplacedSize) 100 * ActiveSize / (ActiveSize + ReplacedSize)
end. end.
scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN) ->
scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN, []).
scan_all_files([], _FilterFun, _FilterServer, _MaxSQN, CandidateList) ->
CandidateList;
scan_all_files([Entry|Tail], FilterFun, FilterServer, MaxSQN, CandidateList) ->
{LowSQN, FN, JournalP, _LK} = Entry,
CpctPerc = check_single_file(JournalP,
FilterFun,
FilterServer,
MaxSQN,
?SAMPLE_SIZE,
?BATCH_SIZE),
scan_all_files(Tail,
FilterFun,
FilterServer,
MaxSQN,
CandidateList ++
[#candidate{low_sqn = LowSQN,
filename = FN,
journal = JournalP,
compaction_perc = CpctPerc}]).
fetch_inbatches([], _BatchSize, _CDB, CheckedList) -> fetch_inbatches([], _BatchSize, _CDB, CheckedList) ->
CheckedList; CheckedList;
@ -757,8 +795,7 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) ->
SQN, SQN,
compact_journal), compact_journal),
leveled_log:log("IC009", [FN]), leveled_log:log("IC009", [FN]),
leveled_cdb:cdb_open_writer(FN, leveled_cdb:cdb_open_writer(FN, CDBopts);
CDBopts);
_ -> _ ->
{ok, Journal0} {ok, Journal0}
end, end,
@ -981,9 +1018,10 @@ compact_single_file_recovr_test() ->
LedgerFun1, LedgerFun1,
CompactFP, CompactFP,
CDB} = compact_single_file_setup(), CDB} = compact_single_file_setup(),
[{LowSQN, FN, PidR, _LastKey}] = CDBOpts = #cdb_options{binary_mode=true},
[{LowSQN, FN, _PidOldR, LastKey}] =
compact_files([Candidate], compact_files([Candidate],
#cdb_options{file_path=CompactFP, binary_mode=true}, CDBOpts#cdb_options{file_path=CompactFP},
LedgerFun1, LedgerFun1,
LedgerSrv1, LedgerSrv1,
9, 9,
@ -991,6 +1029,7 @@ compact_single_file_recovr_test() ->
native), native),
io:format("FN of ~s~n", [FN]), io:format("FN of ~s~n", [FN]),
?assertMatch(2, LowSQN), ?assertMatch(2, LowSQN),
{ok, PidR} = leveled_cdb:cdb_reopen_reader(FN, LastKey, CDBOpts),
?assertMatch(probably, ?assertMatch(probably,
leveled_cdb:cdb_keycheck(PidR, leveled_cdb:cdb_keycheck(PidR,
{8, {8,
@ -1010,6 +1049,7 @@ compact_single_file_recovr_test() ->
test_ledgerkey("Key2")}), test_ledgerkey("Key2")}),
?assertMatch({{_, _}, {"Value2", {[], infinity}}}, ?assertMatch({{_, _}, {"Value2", {[], infinity}}},
leveled_codec:from_inkerkv(RKV1)), leveled_codec:from_inkerkv(RKV1)),
ok = leveled_cdb:cdb_close(PidR),
ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB). ok = leveled_cdb:cdb_destroy(CDB).
@ -1020,9 +1060,10 @@ compact_single_file_retain_test() ->
LedgerFun1, LedgerFun1,
CompactFP, CompactFP,
CDB} = compact_single_file_setup(), CDB} = compact_single_file_setup(),
[{LowSQN, FN, PidR, _LK}] = CDBOpts = #cdb_options{binary_mode=true},
[{LowSQN, FN, _PidOldR, LastKey}] =
compact_files([Candidate], compact_files([Candidate],
#cdb_options{file_path=CompactFP, binary_mode=true}, CDBOpts#cdb_options{file_path=CompactFP},
LedgerFun1, LedgerFun1,
LedgerSrv1, LedgerSrv1,
9, 9,
@ -1030,6 +1071,7 @@ compact_single_file_retain_test() ->
native), native),
io:format("FN of ~s~n", [FN]), io:format("FN of ~s~n", [FN]),
?assertMatch(1, LowSQN), ?assertMatch(1, LowSQN),
{ok, PidR} = leveled_cdb:cdb_reopen_reader(FN, LastKey, CDBOpts),
?assertMatch(probably, ?assertMatch(probably,
leveled_cdb:cdb_keycheck(PidR, leveled_cdb:cdb_keycheck(PidR,
{8, {8,
@ -1044,11 +1086,12 @@ compact_single_file_retain_test() ->
stnd, stnd,
test_ledgerkey("Key1")})), test_ledgerkey("Key1")})),
RKV1 = leveled_cdb:cdb_get(PidR, RKV1 = leveled_cdb:cdb_get(PidR,
{2, {2,
stnd, stnd,
test_ledgerkey("Key2")}), test_ledgerkey("Key2")}),
?assertMatch({{_, _}, {"Value2", {[], infinity}}}, ?assertMatch({{_, _}, {"Value2", {[], infinity}}},
leveled_codec:from_inkerkv(RKV1)), leveled_codec:from_inkerkv(RKV1)),
ok = leveled_cdb:cdb_close(PidR),
ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB). ok = leveled_cdb:cdb_destroy(CDB).

View file

@ -52,6 +52,9 @@ generate_entry(Journal) ->
case leveled_cdb:cdb_firstkey(PidR) of case leveled_cdb:cdb_firstkey(PidR) of
{StartSQN, _Type, _PK} -> {StartSQN, _Type, _PK} ->
LastKey = leveled_cdb:cdb_lastkey(PidR), LastKey = leveled_cdb:cdb_lastkey(PidR),
% close the file here. This will then be re-opened by the inker
% and so will be correctly linked to the inker not to the iclerk
ok = leveled_cdb:cdb_close(PidR),
[{StartSQN, NewFN, PidR, LastKey}]; [{StartSQN, NewFN, PidR, LastKey}];
empty -> empty ->
leveled_log:log("IC013", [NewFN]), leveled_log:log("IC013", [NewFN]),

View file

@ -699,14 +699,20 @@ handle_call(doom, _From, State) ->
handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) -> handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) ->
CDBOpts = State#state.cdb_options,
DropFun = DropFun =
fun(E, Acc) -> fun(E, Acc) ->
leveled_imanifest:remove_entry(Acc, E) leveled_imanifest:remove_entry(Acc, E)
end, end,
Man0 = lists:foldl(DropFun, State#state.manifest, FilesToDelete), Man0 = lists:foldl(DropFun, State#state.manifest, FilesToDelete),
AddFun = AddFun =
fun(E, Acc) -> fun(ManEntry, Acc) ->
leveled_imanifest:add_entry(Acc, E, false) {LowSQN, FN, _, LK_RO} = ManEntry,
% At this stage the FN has a .cdb extension, which will be
% stripped during add_entry - so need to add the .cdb here
{ok, Pid} = leveled_cdb:cdb_reopen_reader(FN, LK_RO, CDBOpts),
UpdEntry = {LowSQN, FN, Pid, LK_RO},
leveled_imanifest:add_entry(Acc, UpdEntry, false)
end, end,
Man1 = lists:foldl(AddFun, Man0, ManifestSnippet), Man1 = lists:foldl(AddFun, Man0, ManifestSnippet),
NewManifestSQN = State#state.manifest_sqn + 1, NewManifestSQN = State#state.manifest_sqn + 1,

View file

@ -10,7 +10,8 @@
recovr_strategy/1, recovr_strategy/1,
aae_missingjournal/1, aae_missingjournal/1,
aae_bustedjournal/1, aae_bustedjournal/1,
journal_compaction_bustedjournal/1 journal_compaction_bustedjournal/1,
close_duringcompaction/1
]). ]).
all() -> [ all() -> [
@ -21,10 +22,29 @@ all() -> [
recovr_strategy, recovr_strategy,
aae_missingjournal, aae_missingjournal,
aae_bustedjournal, aae_bustedjournal,
journal_compaction_bustedjournal journal_compaction_bustedjournal,
close_duringcompaction
]. ].
close_duringcompaction(_Config) ->
% Prompt a compaction, and close immedately - confirm that the close
% happens without error.
% This should trigger the iclerk to receive a close during the file
% scoring stage
RootPath = testutil:reset_filestructure(),
BookOpts = [{root_path, RootPath},
{cache_size, 2000},
{max_journalsize, 2000000},
{sync_strategy, testutil:sync_strategy()}],
{ok, Spcl1, LastV1} = rotating_object_check(BookOpts, "Bucket1", 6400),
{ok, Book1} = leveled_bookie:book_start(BookOpts),
ok = leveled_bookie:book_compactjournal(Book1, 30000),
ok = leveled_bookie:book_close(Book1),
{ok, Book2} = leveled_bookie:book_start(BookOpts),
ok = testutil:check_indexed_objects(Book2, "Bucket1", Spcl1, LastV1),
ok = leveled_bookie:book_close(Book2).
recovery_with_samekeyupdates(_Config) -> recovery_with_samekeyupdates(_Config) ->
% Setup to prove https://github.com/martinsumner/leveled/issues/229 % Setup to prove https://github.com/martinsumner/leveled/issues/229
% run a test that involves many updates to the same key, and check that % run a test that involves many updates to the same key, and check that