diff --git a/include/leveled.hrl b/include/leveled.hrl index fc6a7f0..64f0dfe 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -58,7 +58,9 @@ waste_retention_period :: integer() | undefined, compression_method = native :: lz4|native, compress_on_receipt = false :: boolean(), - max_run_length}). + max_run_length, + singlefile_compactionperc :: float()|undefined, + maxrunlength_compactionperc :: float()|undefined}). -record(penciller_options, {root_path :: string() | undefined, @@ -77,6 +79,8 @@ cdb_options = #cdb_options{} :: #cdb_options{}, waste_retention_period :: integer() | undefined, compression_method = native :: lz4|native, + singlefile_compactionperc :: float()|undefined, + maxrunlength_compactionperc :: float()|undefined, reload_strategy = [] :: list()}). -record(recent_aae, {filter :: whitelist|blacklist, diff --git a/priv/leveled.schema b/priv/leveled.schema index c061898..198a9b4 100644 --- a/priv/leveled.schema +++ b/priv/leveled.schema @@ -89,11 +89,35 @@ %% In a single compaction run, what is the maximum number of consecutive files %% which may be compacted. {mapping, "leveled.max_run_length", "leveled.max_run_length", [ - {default, 8}, - {datatype, integer}, + {default, 6}, + {datatype, integer} +]}. + +%% @doc Target Percentage for Max Run +%% What is the target score for a maximum run of files, to qualify for +%% compaction. If less than this percentage would be retained after compaction +%% then it is a candidate (e.g. in default case if 25% of space would be +%% recovered) +{mapping, "leveled.maxrunlength_compactionpercentage", "leveled.maxrunlength_compactionpercentage", [ + {default, 75.0}, + {datatype, float}, + hidden +]}. + +%% @doc Target Percentage for Single File +%% What is the target score for a run of a single file, to qualify for +%% compaction. If less than this percentage would be retained after compaction +%% then it is a candidate (e.g. 
in default case if 50% of space would be +%% recovered) +{mapping, "leveled.singlefile_compactionpercentage", "leveled.singlefile_compactionpercentage", [ + {default, 50.0}, + {datatype, float}, hidden ]}. + + + diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 0b0124e..9359f09 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -99,6 +99,8 @@ {head_only, false}, {waste_retention_period, undefined}, {max_run_length, undefined}, + {singlefile_compactionpercentage, 50.0}, + {maxrunlength_compactionpercentage, 70.0}, {reload_strategy, []}, {max_pencillercachesize, undefined}, {compression_method, ?COMPRESSION_METHOD}, @@ -225,6 +227,15 @@ % The maximum number of consecutive files that can be compacted in % one compaction operation. % Defaults to leveled_iclerk:?MAX_COMPACTION_RUN (if undefined) + {singlefile_compactionpercentage, float()} | + % The target score for a single file: the file is a compaction + % candidate in a run of length 1 only if less than this percentage + % of the file would be retained after compaction + {maxrunlength_compactionpercentage, float()} | + % The target score for a run of max_run_length: the run is a + % compaction candidate only if less than this percentage of the run + % would be retained.
For runs between 1 and max_run_length, a + % proportionate score is calculated {reload_strategy, list()} | % The reload_strategy is exposed as an option as currently no firm % decision has been made about how recovery from failure should @@ -1008,6 +1019,14 @@ set_options(Opts) -> ok = filelib:ensure_dir(JournalFP), ok = filelib:ensure_dir(LedgerFP), + SFL_CompPerc = + proplists:get_value(singlefile_compactionpercentage, Opts), + MRL_CompPerc = + proplists:get_value(maxrunlength_compactionpercentage, Opts), + true = MRL_CompPerc >= SFL_CompPerc, + true = 100.0 >= MRL_CompPerc, + true = SFL_CompPerc >= 0.0, + CompressionMethod = proplists:get_value(compression_method, Opts), CompressOnReceipt = case proplists:get_value(compression_point, Opts) of @@ -1023,6 +1042,8 @@ set_options(Opts) -> {#inker_options{root_path = JournalFP, reload_strategy = ReloadStrategy, max_run_length = proplists:get_value(max_run_length, Opts), + singlefile_compactionperc = SFL_CompPerc, + maxrunlength_compactionperc = MRL_CompPerc, waste_retention_period = WRP, compression_method = CompressionMethod, compress_on_receipt = CompressOnReceipt, diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index fc92e70..be03a12 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -95,14 +95,12 @@ -define(SAMPLE_SIZE, 100). -define(BATCH_SIZE, 32). -define(BATCHES_TO_CHECK, 8). -%% How many consecutive files to compact in one run --define(MAX_COMPACTION_RUN, 8). -%% Sliding scale to allow preference of longer runs up to maximum --define(SINGLEFILE_COMPACTION_TARGET, 40.0). --define(MAXRUN_COMPACTION_TARGET, 70.0). -define(CRC_SIZE, 4). -define(DEFAULT_RELOAD_STRATEGY, leveled_codec:inker_reload_strategy([])). -define(INTERVALS_PER_HOUR, 4). +-define(MAX_COMPACTION_RUN, 8). +-define(SINGLEFILE_COMPACTION_TARGET, 50.0). +-define(MAXRUNLENGTH_COMPACTION_TARGET, 75.0). 
-record(state, {inker :: pid() | undefined, max_run_length :: integer() | undefined, @@ -110,6 +108,8 @@ waste_retention_period :: integer() | undefined, waste_path :: string() | undefined, reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(), + singlefile_compactionperc = ?SINGLEFILE_COMPACTION_TARGET :: float(), + maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(), compression_method = native :: lz4|native}). -record(candidate, {low_sqn :: integer() | undefined, @@ -183,13 +183,30 @@ init([IClerkOpts]) -> MRL0 -> MRL0 end, - + + SFL_CompPerc = + case IClerkOpts#iclerk_options.singlefile_compactionperc of + undefined -> + ?SINGLEFILE_COMPACTION_TARGET; + SFLCP when is_float(SFLCP) -> + SFLCP + end, + MRL_CompPerc = + case IClerkOpts#iclerk_options.maxrunlength_compactionperc of + undefined -> + ?MAXRUNLENGTH_COMPACTION_TARGET; + MRLCP when is_float(MRLCP) -> + MRLCP + end, + {ok, #state{max_run_length = MRL, inker = IClerkOpts#iclerk_options.inker, cdb_options = CDBopts, reload_strategy = ReloadStrategy, waste_path = WP, waste_retention_period = WRP, + singlefile_compactionperc = SFL_CompPerc, + maxrunlength_compactionperc = MRL_CompPerc, compression_method = IClerkOpts#iclerk_options.compression_method}}. 
@@ -208,11 +225,15 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO}, CDBopts = State#state.cdb_options, Candidates = scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN), - BestRun0 = assess_candidates(Candidates, MaxRunLength), - case score_run(BestRun0, MaxRunLength) of + ScoreParams = + {MaxRunLength, + State#state.maxrunlength_compactionperc, + State#state.singlefile_compactionperc}, + BestRun0 = assess_candidates(Candidates, ScoreParams), + case score_run(BestRun0, ScoreParams) of Score when Score > 0.0 -> BestRun1 = sort_run(BestRun0), - print_compaction_run(BestRun1, MaxRunLength), + print_compaction_run(BestRun1, ScoreParams), ManifestSlice = compact_files(BestRun1, CDBopts, FilterFun, @@ -436,13 +457,6 @@ fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) -> fetch_inbatches(Tail, BatchSize, CDB, CheckedList ++ KL_List). -assess_candidates(AllCandidates, MaxRunLength) when is_integer(MaxRunLength) -> - % This will take the defaults for other params. - % Unit tests should pass tuple as params including tested defaults - assess_candidates(AllCandidates, - {MaxRunLength, - ?MAXRUN_COMPACTION_TARGET, - ?SINGLEFILE_COMPACTION_TARGET}); assess_candidates(AllCandidates, Params) -> NaiveBestRun = assess_candidates(AllCandidates, Params, [], []), MaxRunLength = element(1, Params), @@ -492,11 +506,7 @@ choose_best_assessment(RunToAssess, BestRun, Params) -> end end. -score_run(Run, MaxRunLength) when is_integer(MaxRunLength) -> - Params = {MaxRunLength, - ?MAXRUN_COMPACTION_TARGET, - ?SINGLEFILE_COMPACTION_TARGET}, - score_run(Run, Params); + score_run([], _Params) -> 0.0; score_run(Run, {MaxRunLength, MR_CT, SF_CT}) -> @@ -515,9 +525,9 @@ score_run(Run, {MaxRunLength, MR_CT, SF_CT}) -> Target - RunTotal / length(Run). 
-print_compaction_run(BestRun, MaxRunLength) -> +print_compaction_run(BestRun, ScoreParams) -> leveled_log:log("IC005", [length(BestRun), - score_run(BestRun, MaxRunLength)]), + score_run(BestRun, ScoreParams)]), lists:foreach(fun(File) -> leveled_log:log("IC006", [File#candidate.filename]) end, @@ -716,19 +726,19 @@ simple_score_test() -> #candidate{compaction_perc = 75.0}, #candidate{compaction_perc = 76.0}, #candidate{compaction_perc = 70.0}], - ?assertMatch(-4.0, score_run(Run1, 4)), + ?assertMatch(-4.0, score_run(Run1, {4, 70.0, 40.0})), Run2 = [#candidate{compaction_perc = 75.0}], - ?assertMatch(-35.0, score_run(Run2, 4)), - ?assertMatch(0.0, score_run([], 4)), + ?assertMatch(-35.0, score_run(Run2, {4, 70.0, 40.0})), + ?assertMatch(0.0, score_run([], {4, 40.0, 70.0})), Run3 = [#candidate{compaction_perc = 100.0}], - ?assertMatch(-60.0, score_run(Run3, 4)). + ?assertMatch(-60.0, score_run(Run3, {4, 70.0, 40.0})). score_compare_test() -> Run1 = [#candidate{compaction_perc = 55.0}, #candidate{compaction_perc = 55.0}, #candidate{compaction_perc = 56.0}, #candidate{compaction_perc = 50.0}], - ?assertMatch(16.0, score_run(Run1, 4)), + ?assertMatch(16.0, score_run(Run1, {4, 70.0, 40.0})), Run2 = [#candidate{compaction_perc = 55.0}], ?assertMatch(Run1, choose_best_assessment(Run1, @@ -758,7 +768,7 @@ find_bestrun_test() -> %% Tests dependent on these defaults %% -define(MAX_COMPACTION_RUN, 4). %% -define(SINGLEFILE_COMPACTION_TARGET, 40.0). -%% -define(MAXRUN_COMPACTION_TARGET, 60.0). +%% -define(MAXRUNLENGTH_COMPACTION_TARGET, 60.0). 
%% Tested first with blocks significant as no back-tracking Params = {4, 60.0, 40.0}, Block1 = [#candidate{compaction_perc = 55.0}, diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 631db81..e8c6abc 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -601,14 +601,20 @@ start_from_file(InkOpts) -> WRP = InkOpts#inker_options.waste_retention_period, ReloadStrategy = InkOpts#inker_options.reload_strategy, MRL = InkOpts#inker_options.max_run_length, + SFL_CompactPerc = InkOpts#inker_options.singlefile_compactionperc, + MRL_CompactPerc = InkOpts#inker_options.maxrunlength_compactionperc, PressMethod = InkOpts#inker_options.compression_method, PressOnReceipt = InkOpts#inker_options.compress_on_receipt, - IClerkOpts = #iclerk_options{inker = self(), - cdb_options=IClerkCDBOpts, - waste_retention_period = WRP, - reload_strategy = ReloadStrategy, - compression_method = PressMethod, - max_run_length = MRL}, + IClerkOpts = + #iclerk_options{inker = self(), + cdb_options=IClerkCDBOpts, + waste_retention_period = WRP, + reload_strategy = ReloadStrategy, + compression_method = PressMethod, + max_run_length = MRL, + singlefile_compactionperc = SFL_CompactPerc, + maxrunlength_compactionperc = MRL_CompactPerc + }, {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts), @@ -1175,6 +1181,8 @@ compact_journal_testto(WRP, ExpectedFiles) -> cdb_options=CDBopts, reload_strategy=RStrategy, waste_retention_period=WRP, + singlefile_compactionperc=40.0, + maxrunlength_compactionperc=70.0, compression_method=native, compress_on_receipt=false},