From 5fab9e2d62d54d0723787c9ff397793c0c8ad4e2 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 25 Jan 2019 10:25:55 +0000 Subject: [PATCH 1/3] Update .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 8031a86..01c0c33 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ .DS_Store rebar.lock test/test_area/* +leveled_data/* From e349774167831a369229bc55a561169a2fcb75a3 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 25 Jan 2019 12:11:42 +0000 Subject: [PATCH 2/3] Allow clerk to be stopped during compaction scoring This will stop needless compaction work from being completed when the iclerk is sent a close at this stage. --- src/leveled_iclerk.erl | 101 ++++++++++++++++++++--------- test/end_to_end/recovery_SUITE.erl | 24 ++++++- 2 files changed, 91 insertions(+), 34 deletions(-) diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index cd8c547..0e2b2ae 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -115,15 +115,24 @@ reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(), singlefile_compactionperc = ?SINGLEFILE_COMPACTION_TARGET :: float(), maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(), - compression_method = native :: lz4|native}). + compression_method = native :: lz4|native, + scored_files = [] :: list(candidate()), + scoring_state :: scoring_state()|undefined}). -record(candidate, {low_sqn :: integer() | undefined, filename :: string() | undefined, journal :: pid() | undefined, compaction_perc :: float() | undefined}). +-record(scoring_state, {filter_fun :: fun(), + filter_server :: pid(), + max_sqn :: non_neg_integer(), + close_fun :: fun(), + start_time :: erlang:timestamp()}). + -type iclerk_options() :: #iclerk_options{}. -type candidate() :: #candidate{}. +-type scoring_state() :: #scoring_state{}. -type score_parameters() :: {integer(), float(), float()}. 
% Score parameters are a tuple % - of maximum run length; how long a run of consecutive files can be for @@ -188,7 +197,7 @@ clerk_hashtablecalc(HashTree, StartPos, CDBpid) -> %% @doc %% Stop the clerk clerk_stop(Pid) -> - gen_server:call(Pid, stop, 60000). + gen_server:call(Pid, stop, infinity). -spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok. %% @doc @@ -208,6 +217,16 @@ clerk_addlogs(Pid, ForcedLogs) -> clerk_removelogs(Pid, ForcedLogs) -> gen_server:cast(Pid, {remove_logs, ForcedLogs}). + +-spec clerk_scorefilelist(pid(), list(candidate())) -> ok. +%% @doc +%% Score the file at the head of the list and then send the tail of the list to +%% be scored +clerk_scorefilelist(Pid, []) -> + gen_server:cast(Pid, scoring_complete); +clerk_scorefilelist(Pid, CandidateList) -> + gen_server:cast(Pid, {score_filelist, CandidateList}). + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ @@ -254,6 +273,16 @@ init([LogOpts, IClerkOpts]) -> IClerkOpts#iclerk_options.compression_method}}. handle_call(stop, _From, State) -> + case State#state.scoring_state of + undefined -> + ok; + ScoringState -> + % Closed when scoring files, and so need to shutdown FilterServer + % to close down neatly + CloseFun = ScoringState#scoring_state.close_fun, + FilterServer = ScoringState#scoring_state.filter_server, + CloseFun(FilterServer) + end, {stop, normal, ok, State}. 
handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0}, @@ -267,12 +296,42 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0}, % Need to fetch manifest at start rather than have it be passed in % Don't want to process a queued call waiting on an old manifest [_Active|Manifest] = Manifest0, - Inker = State#state.inker, - MaxRunLength = State#state.max_run_length, {FilterServer, MaxSQN} = InitiateFun(Checker), + ok = clerk_scorefilelist(self(), Manifest), + ScoringState = + #scoring_state{filter_fun = FilterFun, + filter_server = FilterServer, + max_sqn = MaxSQN, + close_fun = CloseFun, + start_time = SW}, + {noreply, State#state{scored_files = [], scoring_state = ScoringState}}; +handle_cast({score_filelist, [Entry|Tail]}, State) -> + Candidates = State#state.scored_files, + {LowSQN, FN, JournalP, _LK} = Entry, + ScoringState = State#state.scoring_state, + CpctPerc = check_single_file(JournalP, + ScoringState#scoring_state.filter_fun, + ScoringState#scoring_state.filter_server, + ScoringState#scoring_state.max_sqn, + ?SAMPLE_SIZE, + ?BATCH_SIZE), + Candidate = + #candidate{low_sqn = LowSQN, + filename = FN, + journal = JournalP, + compaction_perc = CpctPerc}, + ok = clerk_scorefilelist(self(), Tail), + {noreply, State#state{scored_files = [Candidate|Candidates]}}; +handle_cast(scoring_complete, State) -> + MaxRunLength = State#state.max_run_length, CDBopts = State#state.cdb_options, - - Candidates = scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN), + Candidates = lists:reverse(State#state.scored_files), + ScoringState = State#state.scoring_state, + FilterFun = ScoringState#scoring_state.filter_fun, + FilterServer = ScoringState#scoring_state.filter_server, + MaxSQN = ScoringState#scoring_state.max_sqn, + CloseFun = ScoringState#scoring_state.close_fun, + SW = ScoringState#scoring_state.start_time, ScoreParams = {MaxRunLength, State#state.maxrunlength_compactionperc, @@ -298,15 +357,15 @@ handle_cast({compact, 
Checker, InitiateFun, CloseFun, FilterFun, Manifest0}, end, BestRun1), leveled_log:log("IC002", [length(FilesToDelete)]), - ok = leveled_inker:ink_clerkcomplete(Inker, + ok = leveled_inker:ink_clerkcomplete(State#state.inker, ManifestSlice, FilesToDelete), ok = CloseFun(FilterServer), - {noreply, State}; + {noreply, State#state{scoring_state = undefined}}; false -> - ok = leveled_inker:ink_clerkcomplete(Inker, [], []), + ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []), ok = CloseFun(FilterServer), - {noreply, State} + {noreply, State#state{scoring_state = undefined}} end; handle_cast({trim, PersistedSQN, ManifestAsList}, State) -> FilesToDelete = @@ -487,28 +546,6 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) -> 100 * ActiveSize / (ActiveSize + ReplacedSize) end. -scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN) -> - scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN, []). - -scan_all_files([], _FilterFun, _FilterServer, _MaxSQN, CandidateList) -> - CandidateList; -scan_all_files([Entry|Tail], FilterFun, FilterServer, MaxSQN, CandidateList) -> - {LowSQN, FN, JournalP, _LK} = Entry, - CpctPerc = check_single_file(JournalP, - FilterFun, - FilterServer, - MaxSQN, - ?SAMPLE_SIZE, - ?BATCH_SIZE), - scan_all_files(Tail, - FilterFun, - FilterServer, - MaxSQN, - CandidateList ++ - [#candidate{low_sqn = LowSQN, - filename = FN, - journal = JournalP, - compaction_perc = CpctPerc}]). fetch_inbatches([], _BatchSize, _CDB, CheckedList) -> CheckedList; diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index 304b1d7..fb4a9db 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -10,7 +10,8 @@ recovr_strategy/1, aae_missingjournal/1, aae_bustedjournal/1, - journal_compaction_bustedjournal/1 + journal_compaction_bustedjournal/1, + close_duringcompaction/1 ]). 
all() -> [ @@ -21,10 +22,29 @@ all() -> [ recovr_strategy, aae_missingjournal, aae_bustedjournal, - journal_compaction_bustedjournal + journal_compaction_bustedjournal, + close_duringcompaction ]. +close_duringcompaction(_Config) -> + % Prompt a compaction, and close immediately - confirm that the close + % happens without error. + % This should trigger the iclerk to receive a close during the file + % scoring stage + RootPath = testutil:reset_filestructure(), + BookOpts = [{root_path, RootPath}, + {cache_size, 2000}, + {max_journalsize, 2000000}, + {sync_strategy, testutil:sync_strategy()}], + {ok, Spcl1, LastV1} = rotating_object_check(BookOpts, "Bucket1", 6400), + {ok, Book1} = leveled_bookie:book_start(BookOpts), + ok = leveled_bookie:book_compactjournal(Book1, 30000), + ok = leveled_bookie:book_close(Book1), + {ok, Book2} = leveled_bookie:book_start(BookOpts), + ok = testutil:check_indexed_objects(Book2, "Bucket1", Spcl1, LastV1), + ok = leveled_bookie:book_close(Book2). + recovery_with_samekeyupdates(_Config) -> % Setup to prove https://github.com/martinsumner/leveled/issues/229 % run a test that involves many updates to the same key, and check that From 5b54affbf0ee9b1e0f3bb488d315ff1221a275d4 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 25 Jan 2019 14:32:41 +0000 Subject: [PATCH 3/3] Have inker reopen compacted files The inker clerk will now close compacted files before prompting the inker to update the manifest. The inker should reopen those files, so that the file processes are linked to it and not the clerk. This also stops a stopped clerk leading to orphaned cdb files. 
--- src/leveled_iclerk.erl | 28 +++++++++++++++++----------- src/leveled_imanifest.erl | 3 +++ src/leveled_inker.erl | 10 ++++++++-- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 0e2b2ae..076d73b 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -227,6 +227,7 @@ clerk_scorefilelist(Pid, []) -> clerk_scorefilelist(Pid, CandidateList) -> gen_server:cast(Pid, {score_filelist, CandidateList}). + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ @@ -357,14 +358,14 @@ handle_cast(scoring_complete, State) -> end, BestRun1), leveled_log:log("IC002", [length(FilesToDelete)]), + ok = CloseFun(FilterServer), ok = leveled_inker:ink_clerkcomplete(State#state.inker, ManifestSlice, FilesToDelete), - ok = CloseFun(FilterServer), {noreply, State#state{scoring_state = undefined}}; false -> - ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []), ok = CloseFun(FilterServer), + ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []), {noreply, State#state{scoring_state = undefined}} end; handle_cast({trim, PersistedSQN, ManifestAsList}, State) -> @@ -794,8 +795,7 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) -> SQN, compact_journal), leveled_log:log("IC009", [FN]), - leveled_cdb:cdb_open_writer(FN, - CDBopts); + leveled_cdb:cdb_open_writer(FN, CDBopts); _ -> {ok, Journal0} end, @@ -1018,9 +1018,10 @@ compact_single_file_recovr_test() -> LedgerFun1, CompactFP, CDB} = compact_single_file_setup(), - [{LowSQN, FN, PidR, _LastKey}] = + CDBOpts = #cdb_options{binary_mode=true}, + [{LowSQN, FN, _PidOldR, LastKey}] = compact_files([Candidate], - #cdb_options{file_path=CompactFP, binary_mode=true}, + CDBOpts#cdb_options{file_path=CompactFP}, LedgerFun1, LedgerSrv1, 9, @@ -1028,6 +1029,7 @@ compact_single_file_recovr_test() -> 
native), io:format("FN of ~s~n", [FN]), ?assertMatch(2, LowSQN), + {ok, PidR} = leveled_cdb:cdb_reopen_reader(FN, LastKey, CDBOpts), ?assertMatch(probably, leveled_cdb:cdb_keycheck(PidR, {8, @@ -1047,6 +1049,7 @@ compact_single_file_recovr_test() -> test_ledgerkey("Key2")}), ?assertMatch({{_, _}, {"Value2", {[], infinity}}}, leveled_codec:from_inkerkv(RKV1)), + ok = leveled_cdb:cdb_close(PidR), ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). @@ -1057,9 +1060,10 @@ compact_single_file_retain_test() -> LedgerFun1, CompactFP, CDB} = compact_single_file_setup(), - [{LowSQN, FN, PidR, _LK}] = + CDBOpts = #cdb_options{binary_mode=true}, + [{LowSQN, FN, _PidOldR, LastKey}] = compact_files([Candidate], - #cdb_options{file_path=CompactFP, binary_mode=true}, + CDBOpts#cdb_options{file_path=CompactFP}, LedgerFun1, LedgerSrv1, 9, @@ -1067,6 +1071,7 @@ compact_single_file_retain_test() -> native), io:format("FN of ~s~n", [FN]), ?assertMatch(1, LowSQN), + {ok, PidR} = leveled_cdb:cdb_reopen_reader(FN, LastKey, CDBOpts), ?assertMatch(probably, leveled_cdb:cdb_keycheck(PidR, {8, @@ -1081,11 +1086,12 @@ compact_single_file_retain_test() -> stnd, test_ledgerkey("Key1")})), RKV1 = leveled_cdb:cdb_get(PidR, - {2, - stnd, - test_ledgerkey("Key2")}), + {2, + stnd, + test_ledgerkey("Key2")}), ?assertMatch({{_, _}, {"Value2", {[], infinity}}}, leveled_codec:from_inkerkv(RKV1)), + ok = leveled_cdb:cdb_close(PidR), ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). diff --git a/src/leveled_imanifest.erl b/src/leveled_imanifest.erl index fb72e84..24da2b8 100644 --- a/src/leveled_imanifest.erl +++ b/src/leveled_imanifest.erl @@ -52,6 +52,9 @@ generate_entry(Journal) -> case leveled_cdb:cdb_firstkey(PidR) of {StartSQN, _Type, _PK} -> LastKey = leveled_cdb:cdb_lastkey(PidR), + % close the file here. 
This will then be re-opened by the inker + % and so will be correctly linked to the inker not to the iclerk + ok = leveled_cdb:cdb_close(PidR), [{StartSQN, NewFN, PidR, LastKey}]; empty -> leveled_log:log("IC013", [NewFN]), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index d25f6d9..22266e4 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -698,14 +698,20 @@ handle_call(doom, _From, State) -> handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) -> + CDBOpts = State#state.cdb_options, DropFun = fun(E, Acc) -> leveled_imanifest:remove_entry(Acc, E) end, Man0 = lists:foldl(DropFun, State#state.manifest, FilesToDelete), AddFun = - fun(E, Acc) -> - leveled_imanifest:add_entry(Acc, E, false) + fun(ManEntry, Acc) -> + {LowSQN, FN, _, LK_RO} = ManEntry, + % At this stage the FN has a .cdb extension, which will be + % stripped during add_entry - so need to add the .cdb here + {ok, Pid} = leveled_cdb:cdb_reopen_reader(FN, LK_RO, CDBOpts), + UpdEntry = {LowSQN, FN, Pid, LK_RO}, + leveled_imanifest:add_entry(Acc, UpdEntry, false) end, Man1 = lists:foldl(AddFun, Man0, ManifestSnippet), NewManifestSQN = State#state.manifest_sqn + 1,