diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl
index cd8c547..0e2b2ae 100644
--- a/src/leveled_iclerk.erl
+++ b/src/leveled_iclerk.erl
@@ -115,15 +115,24 @@
                 reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(),
                 singlefile_compactionperc = ?SINGLEFILE_COMPACTION_TARGET :: float(),
                 maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(),
-                compression_method = native :: lz4|native}).
+                compression_method = native :: lz4|native,
+                scored_files = [] :: list(candidate()),
+                scoring_state :: scoring_state()|undefined}).
 
 -record(candidate, {low_sqn :: integer() | undefined,
                     filename :: string() | undefined,
                     journal :: pid() | undefined,
                     compaction_perc :: float() | undefined}).
 
+-record(scoring_state, {filter_fun :: fun(),
+                        filter_server :: pid(),
+                        max_sqn :: non_neg_integer(),
+                        close_fun :: fun(),
+                        start_time :: erlang:timestamp()}).
+
 -type iclerk_options() :: #iclerk_options{}.
 -type candidate() :: #candidate{}.
+-type scoring_state() :: #scoring_state{}.
 -type score_parameters() :: {integer(), float(), float()}.
         % Score parameters are a tuple
         % - of maximum run length; how long a run of consecutive files can be for
@@ -188,7 +197,7 @@ clerk_hashtablecalc(HashTree, StartPos, CDBpid) ->
 %% @doc
 %% Stop the clerk
 clerk_stop(Pid) ->
-    gen_server:call(Pid, stop, 60000).
+    gen_server:call(Pid, stop, infinity).
 
 -spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok.
 %% @doc
@@ -208,6 +217,16 @@ clerk_addlogs(Pid, ForcedLogs) ->
 clerk_removelogs(Pid, ForcedLogs) ->
     gen_server:cast(Pid, {remove_logs, ForcedLogs}).
 
+
+-spec clerk_scorefilelist(pid(), list(candidate())) -> ok.
+%% @doc
+%% Score the file at the head of the list and then send the tail of the list to
+%% be scored
+clerk_scorefilelist(Pid, []) ->
+    gen_server:cast(Pid, scoring_complete);
+clerk_scorefilelist(Pid, CandidateList) ->
+    gen_server:cast(Pid, {score_filelist, CandidateList}).
+
 %%%============================================================================
 %%% gen_server callbacks
 %%%============================================================================
@@ -254,6 +273,16 @@ init([LogOpts, IClerkOpts]) ->
                     IClerkOpts#iclerk_options.compression_method}}.
 
 handle_call(stop, _From, State) ->
+    case State#state.scoring_state of
+        undefined ->
+            ok;
+        ScoringState ->
+            % Closed when scoring files, and so need to shutdown FilterServer
+            % to close down neatly
+            CloseFun = ScoringState#scoring_state.close_fun,
+            FilterServer = ScoringState#scoring_state.filter_server,
+            CloseFun(FilterServer)
+    end,
     {stop, normal, ok, State}.
 
 handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
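The mechanism here is that each cast scores one file and then re-casts the remaining list back to the clerk itself, so a queued stop request is serviced between files rather than after the whole scan. A minimal, self-contained sketch of that hand-off, not part of the patch and with hypothetical module and function names (workqueue_sketch, process_list, stop):

-module(workqueue_sketch).
-behaviour(gen_server).

-export([start_link/0, process_list/2, stop/1]).
-export([init/1, handle_call/3, handle_cast/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% Mirrors clerk_scorefilelist/2 above: an empty list signals
%% completion, otherwise the remaining work is forwarded as a cast
process_list(Pid, []) ->
    gen_server:cast(Pid, work_complete);
process_list(Pid, Items) ->
    gen_server:cast(Pid, {work, Items}).

stop(Pid) ->
    gen_server:call(Pid, stop, infinity).

init([]) ->
    {ok, []}.

handle_call(stop, _From, State) ->
    % Any cleanup owed for half-finished work would go here, as the
    % iclerk does for the FilterServer held in its scoring_state
    {stop, normal, ok, State}.

handle_cast({work, [Item|Tail]}, Acc) ->
    % Handle one item, then re-cast the tail to self().  A stop call
    % that arrived while this item was being processed is already
    % ahead of the re-cast in the mailbox, so it is serviced next
    ok = process_list(self(), Tail),
    {noreply, [Item * 2|Acc]};
handle_cast(work_complete, Acc) ->
    {noreply, lists:reverse(Acc)}.

Because a stop request reaches the mailbox ahead of the next self-cast, handle_call(stop, ...) can run mid-scan, which is why the new stop clause above must also close the FilterServer captured in scoring_state.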
@@ -267,12 +296,42 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
     % Need to fetch manifest at start rather than have it be passed in
     % Don't want to process a queued call waiting on an old manifest
     [_Active|Manifest] = Manifest0,
-    Inker = State#state.inker,
-    MaxRunLength = State#state.max_run_length,
     {FilterServer, MaxSQN} = InitiateFun(Checker),
+    ok = clerk_scorefilelist(self(), Manifest),
+    ScoringState =
+        #scoring_state{filter_fun = FilterFun,
+                       filter_server = FilterServer,
+                       max_sqn = MaxSQN,
+                       close_fun = CloseFun,
+                       start_time = SW},
+    {noreply, State#state{scored_files = [], scoring_state = ScoringState}};
+handle_cast({score_filelist, [Entry|Tail]}, State) ->
+    Candidates = State#state.scored_files,
+    {LowSQN, FN, JournalP, _LK} = Entry,
+    ScoringState = State#state.scoring_state,
+    CpctPerc = check_single_file(JournalP,
+                                 ScoringState#scoring_state.filter_fun,
+                                 ScoringState#scoring_state.filter_server,
+                                 ScoringState#scoring_state.max_sqn,
+                                 ?SAMPLE_SIZE,
+                                 ?BATCH_SIZE),
+    Candidate =
+        #candidate{low_sqn = LowSQN,
+                   filename = FN,
+                   journal = JournalP,
+                   compaction_perc = CpctPerc},
+    ok = clerk_scorefilelist(self(), Tail),
+    {noreply, State#state{scored_files = [Candidate|Candidates]}};
+handle_cast(scoring_complete, State) ->
+    MaxRunLength = State#state.max_run_length,
     CDBopts = State#state.cdb_options,
-
-    Candidates = scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN),
+    Candidates = lists:reverse(State#state.scored_files),
+    ScoringState = State#state.scoring_state,
+    FilterFun = ScoringState#scoring_state.filter_fun,
+    FilterServer = ScoringState#scoring_state.filter_server,
+    MaxSQN = ScoringState#scoring_state.max_sqn,
+    CloseFun = ScoringState#scoring_state.close_fun,
+    SW = ScoringState#scoring_state.start_time,
     ScoreParams =
         {MaxRunLength,
          State#state.maxrunlength_compactionperc,
@@ -298,15 +357,15 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
                         end,
                         BestRun1),
             leveled_log:log("IC002", [length(FilesToDelete)]),
-            ok = leveled_inker:ink_clerkcomplete(Inker,
+            ok = leveled_inker:ink_clerkcomplete(State#state.inker,
                                                  ManifestSlice,
                                                  FilesToDelete),
             ok = CloseFun(FilterServer),
-            {noreply, State};
+            {noreply, State#state{scoring_state = undefined}};
         false ->
-            ok = leveled_inker:ink_clerkcomplete(Inker, [], []),
+            ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []),
             ok = CloseFun(FilterServer),
-            {noreply, State}
+            {noreply, State#state{scoring_state = undefined}}
     end;
 handle_cast({trim, PersistedSQN, ManifestAsList}, State) ->
     FilesToDelete =
@@ -487,28 +546,6 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) ->
             100 * ActiveSize / (ActiveSize + ReplacedSize)
     end.
 
-scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN) ->
-    scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN, []).
-
-scan_all_files([], _FilterFun, _FilterServer, _MaxSQN, CandidateList) ->
-    CandidateList;
-scan_all_files([Entry|Tail], FilterFun, FilterServer, MaxSQN, CandidateList) ->
-    {LowSQN, FN, JournalP, _LK} = Entry,
-    CpctPerc = check_single_file(JournalP,
-                                 FilterFun,
-                                 FilterServer,
-                                 MaxSQN,
-                                 ?SAMPLE_SIZE,
-                                 ?BATCH_SIZE),
-    scan_all_files(Tail,
-                   FilterFun,
-                   FilterServer,
-                   MaxSQN,
-                   CandidateList ++
-                       [#candidate{low_sqn = LowSQN,
-                                   filename = FN,
-                                   journal = JournalP,
-                                   compaction_perc = CpctPerc}]).
 
 fetch_inbatches([], _BatchSize, _CDB, CheckedList) ->
     CheckedList;
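A side-effect of the rewrite worth noting: the removed scan_all_files/5 appended each candidate with ++, copying the accumulator on every file, while the cast-based loop prepends to scored_files and reverses once in scoring_complete. A sketch of that accumulator pattern in isolation, not from the patch (build_in_order and score are hypothetical helper names):

%% Prepend-then-reverse keeps per-item cost O(1) and restores
%% manifest order with a single pass at the end
build_in_order(Entries) ->
    build_in_order(Entries, []).

build_in_order([], Acc) ->
    lists:reverse(Acc);
build_in_order([Entry|Tail], Acc) ->
    % score/1 stands in for check_single_file/6
    build_in_order(Tail, [score(Entry)|Acc]).

score(Entry) ->
    Entry.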
diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl
index 304b1d7..fb4a9db 100644
--- a/test/end_to_end/recovery_SUITE.erl
+++ b/test/end_to_end/recovery_SUITE.erl
@@ -10,7 +10,8 @@
             recovr_strategy/1,
             aae_missingjournal/1,
             aae_bustedjournal/1,
-            journal_compaction_bustedjournal/1
+            journal_compaction_bustedjournal/1,
+            close_duringcompaction/1
             ]).
 
 all() -> [
@@ -21,10 +22,29 @@
             recovr_strategy,
             aae_missingjournal,
             aae_bustedjournal,
-            journal_compaction_bustedjournal
+            journal_compaction_bustedjournal,
+            close_duringcompaction
             ].
 
+close_duringcompaction(_Config) ->
+    % Prompt a compaction, and close immediately - confirm that the close
+    % happens without error.
+    % This should trigger the iclerk to receive a close during the file
+    % scoring stage
+    RootPath = testutil:reset_filestructure(),
+    BookOpts = [{root_path, RootPath},
+                {cache_size, 2000},
+                {max_journalsize, 2000000},
+                {sync_strategy, testutil:sync_strategy()}],
+    {ok, Spcl1, LastV1} = rotating_object_check(BookOpts, "Bucket1", 6400),
+    {ok, Book1} = leveled_bookie:book_start(BookOpts),
+    ok = leveled_bookie:book_compactjournal(Book1, 30000),
+    ok = leveled_bookie:book_close(Book1),
+    {ok, Book2} = leveled_bookie:book_start(BookOpts),
+    ok = testutil:check_indexed_objects(Book2, "Bucket1", Spcl1, LastV1),
+    ok = leveled_bookie:book_close(Book2).
+
 
 recovery_with_samekeyupdates(_Config) ->
     % Setup to prove https://github.com/martinsumner/leveled/issues/229
    % run a test that involves many updates to the same key, and check that
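The new test exercises the interleaving end-to-end: compaction is prompted and the bookie is closed before scoring can complete, and the store must still reopen cleanly. The same effect can be seen in isolation with the workqueue_sketch module above; stop returns promptly between items instead of waiting for the whole list to drain (a hypothetical shell session, not real leveled output):

1> {ok, Pid} = workqueue_sketch:start_link().
{ok,<0.87.0>}
2> workqueue_sketch:process_list(Pid, lists:seq(1, 10000000)).
ok
3> workqueue_sketch:stop(Pid).
ok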