Allow for caching of compaction scores

Potentially reduce the overheads of scoring each file on every run.

The change also alters the default thresholds for compaction to favour longer runs (which will tend towards greater storage efficiency).
This commit is contained in:
Martin Sumner 2020-11-27 02:35:27 +00:00
parent e3bcd7eaec
commit b4c79caf7a
9 changed files with 153 additions and 17 deletions

View file

@ -140,8 +140,9 @@
{head_only, false},
{waste_retention_period, undefined},
{max_run_length, undefined},
{singlefile_compactionpercentage, 50.0},
{singlefile_compactionpercentage, 30.0},
{maxrunlength_compactionpercentage, 70.0},
{journalcompaction_scoreonein, 1},
{reload_strategy, []},
{max_pencillercachesize, ?MAX_PCL_CACHE_SIZE},
{ledger_preloadpagecache_level, ?SST_PAGECACHELEVEL_LOOKUP},
@ -292,6 +293,11 @@
% a run of max_run_length, before that run can be a compaction
% candidate. For runs between 1 and max_run_length, a
% proportionate score is calculated
{journalcompaction_scoreonein, pos_integer()} |
% When scoring for compaction run a probability (1 in x) of whether
% any file will be scored this run. If not scored a cached score
% will be used, and the cached score is the average of the latest
% score and the rolling average of previous scores
{reload_strategy, list()} |
% The reload_strategy is exposed as an option as currently no firm
% decision has been made about how recovery from failure should
@ -1757,6 +1763,8 @@ set_options(Opts) ->
MaxSSTSlots = proplists:get_value(max_sstslots, Opts),
ScoreOneIn = proplists:get_value(journalcompaction_scoreonein, Opts),
{#inker_options{root_path = JournalFP,
reload_strategy = ReloadStrategy,
max_run_length = proplists:get_value(max_run_length, Opts),
@ -1766,6 +1774,7 @@ set_options(Opts) ->
snaptimeout_long = SnapTimeoutLong,
compression_method = CompressionMethod,
compress_on_receipt = CompressOnReceipt,
score_onein = ScoreOneIn,
cdb_options =
#cdb_options{max_size=MaxJournalSize,
max_count=MaxJournalCount,

View file

@ -113,7 +113,9 @@
cdb_deletepending/1,
cdb_deletepending/3,
cdb_isrolling/1,
cdb_clerkcomplete/1]).
cdb_clerkcomplete/1,
cdb_getcachedscore/1,
cdb_putcachedscore/2]).
-export([finished_rolling/1,
hashtable_calc/2]).
@ -152,7 +154,8 @@
timings = no_timing :: cdb_timings(),
timings_countdown = 0 :: integer(),
log_options = leveled_log:get_opts()
:: leveled_log:log_options()}).
:: leveled_log:log_options(),
cached_score :: float()|undefined}).
-record(cdb_timings, {sample_count = 0 :: integer(),
sample_cyclecount = 0 :: integer(),
@ -164,6 +167,9 @@
-type cdb_timings() :: no_timing|#cdb_timings{}.
-type hashtable_index() :: tuple().
-type file_location() :: integer()|eof.
-type filter_fun() ::
fun((any(), binary(), integer(), any(), fun((binary()) -> any())) ->
{stop|loop, any()}).
@ -369,7 +375,7 @@ cdb_deletepending(Pid) ->
cdb_deletepending(Pid, ManSQN, Inker) ->
gen_fsm:send_event(Pid, {delete_pending, ManSQN, Inker}).
-spec cdb_scan(pid(), fun(), any(), integer()|undefined) ->
-spec cdb_scan(pid(), filter_fun(), any(), integer()|undefined) ->
{integer()|eof, any()}.
%% @doc
%% cdb_scan returns {LastPosition, Acc}. Use LastPosition as StartPosition to
@ -424,6 +430,20 @@ cdb_isrolling(Pid) ->
cdb_clerkcomplete(Pid) ->
gen_fsm:send_all_state_event(Pid, clerk_complete).
-spec cdb_getcachedscore(pid()) -> undefined|float().
%% @doc
%% Return the cached compaction score for a CDB file, or undefined if no
%% score has been cached (e.g. the file has not yet been scored)
cdb_getcachedscore(Pid) ->
gen_fsm:sync_send_all_state_event(Pid, get_cachedscore, infinity).
-spec cdb_putcachedscore(pid(), float()) -> ok.
%% @doc
%% Store a compaction score against the CDB file's server state, so that it
%% can be reused on runs where the file is not re-scored
cdb_putcachedscore(Pid, Score) ->
gen_fsm:sync_send_all_state_event(Pid, {put_cachedscore, Score}, infinity).
%%%============================================================================
%%% gen_server callbacks
@ -829,6 +849,10 @@ handle_sync_event(cdb_filename, _From, StateName, State) ->
{reply, State#state.filename, StateName, State};
handle_sync_event(cdb_isrolling, _From, StateName, State) ->
{reply, StateName == rolling, StateName, State};
handle_sync_event(get_cachedscore, _From, StateName, State) ->
{reply, State#state.cached_score, StateName, State};
handle_sync_event({put_cachedscore, Score}, _From, StateName, State) ->
{reply, ok, StateName, State#state{cached_score = Score}};
handle_sync_event(cdb_close, _From, delete_pending, State) ->
leveled_log:log("CDB05",
[State#state.filename, delete_pending, cdb_close]),
@ -836,8 +860,7 @@ handle_sync_event(cdb_close, _From, delete_pending, State) ->
State#state.filename,
State#state.waste_path),
{stop, normal, ok, State};
handle_sync_event(cdb_close, _From, StateName, State) ->
leveled_log:log("CDB05", [State#state.filename, StateName, cdb_close]),
handle_sync_event(cdb_close, _From, _StateName, State) ->
file:close(State#state.handle),
{stop, normal, ok, State}.
@ -2396,6 +2419,10 @@ get_keys_byposition_manykeys_test_to() ->
SampleList3 = cdb_getpositions(P2, KeyCount + 1),
?assertMatch(KeyCount, length(SampleList3)),
?assertMatch(undefined, cdb_getcachedscore(P2)),
ok = cdb_putcachedscore(P2, 80.0),
?assertMatch(80.0, cdb_getcachedscore(P2)),
ok = cdb_close(P2),
ok = file:delete(F2).

View file

@ -117,7 +117,8 @@
maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(),
compression_method = native :: lz4|native,
scored_files = [] :: list(candidate()),
scoring_state :: scoring_state()|undefined}).
scoring_state :: scoring_state()|undefined,
score_onein = 1 :: pos_integer()}).
-record(candidate, {low_sqn :: integer() | undefined,
filename :: string() | undefined,
@ -270,7 +271,7 @@ init([LogOpts, IClerkOpts]) ->
MRLCP when is_float(MRLCP) ->
MRLCP
end,
{ok, #state{max_run_length = MRL,
inker = IClerkOpts#iclerk_options.inker,
cdb_options = CDBopts,
@ -280,7 +281,10 @@ init([LogOpts, IClerkOpts]) ->
singlefile_compactionperc = SFL_CompPerc,
maxrunlength_compactionperc = MRL_CompPerc,
compression_method =
IClerkOpts#iclerk_options.compression_method}}.
IClerkOpts#iclerk_options.compression_method,
score_onein =
IClerkOpts#iclerk_options.score_onein
}}.
handle_call(stop, _From, State) ->
case State#state.scoring_state of
@ -325,13 +329,22 @@ handle_cast({score_filelist, [Entry|Tail]}, State) ->
Candidates = State#state.scored_files,
{LowSQN, FN, JournalP, _LK} = Entry,
ScoringState = State#state.scoring_state,
CpctPerc = check_single_file(JournalP,
CpctPerc =
case {leveled_cdb:cdb_getcachedscore(JournalP),
leveled_rand:uniform(State#state.score_onein) == 1} of
{CachedScore, UseNewScore}
when CachedScore == undefined; UseNewScore ->
check_single_file(JournalP,
ScoringState#scoring_state.filter_fun,
ScoringState#scoring_state.filter_server,
ScoringState#scoring_state.max_sqn,
?SAMPLE_SIZE,
?BATCH_SIZE,
State#state.reload_strategy),
State#state.reload_strategy);
{CachedScore, false} ->
CachedScore
end,
ok = leveled_cdb:cdb_putcachedscore(JournalP, CpctPerc),
Candidate =
#candidate{low_sqn = LowSQN,
filename = FN,

View file

@ -806,6 +806,7 @@ start_from_file(InkOpts) ->
PressMethod = InkOpts#inker_options.compression_method,
PressOnReceipt = InkOpts#inker_options.compress_on_receipt,
SnapTimeout = InkOpts#inker_options.snaptimeout_long,
ScoreOneIn = InkOpts#inker_options.score_onein,
IClerkOpts =
#iclerk_options{inker = self(),
@ -815,7 +816,8 @@ start_from_file(InkOpts) ->
compression_method = PressMethod,
max_run_length = MRL,
singlefile_compactionperc = SFL_CompactPerc,
maxrunlength_compactionperc = MRL_CompactPerc},
maxrunlength_compactionperc = MRL_CompactPerc,
score_onein = ScoreOneIn},
{ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts),