Standardise retention decision

Use the same function to decide for both scoring and compaction - and avoid the situation where something is scored for compaction, but doesn't change (which was the case previously with tombstones that were still in the ledger).
This commit is contained in:
Martin Sumner 2020-11-29 15:43:29 +00:00
parent 00823584ec
commit 80e6920d6c
2 changed files with 86 additions and 67 deletions

View file

@ -125,10 +125,10 @@
journal :: pid() | undefined, journal :: pid() | undefined,
compaction_perc :: float() | undefined}). compaction_perc :: float() | undefined}).
-record(scoring_state, {filter_fun :: fun(), -record(scoring_state, {filter_fun :: leveled_inker:filterfun(),
filter_server :: pid(), filter_server :: leveled_inker:filterserver(),
max_sqn :: non_neg_integer(), max_sqn :: non_neg_integer(),
close_fun :: fun(), close_fun :: leveled_inker:filterclosefun(),
start_time :: erlang:timestamp()}). start_time :: erlang:timestamp()}).
-type iclerk_options() :: #iclerk_options{}. -type iclerk_options() :: #iclerk_options{}.
@ -166,8 +166,11 @@
clerk_new(InkerClerkOpts) -> clerk_new(InkerClerkOpts) ->
gen_server:start_link(?MODULE, [leveled_log:get_opts(), InkerClerkOpts], []). gen_server:start_link(?MODULE, [leveled_log:get_opts(), InkerClerkOpts], []).
-spec clerk_compact(pid(), pid(), -spec clerk_compact(pid(),
fun(), fun(), fun(), pid(),
leveled_inker:filterinitfun(),
leveled_inker:filterclosefun(),
leveled_inker:filterfun(),
list()) -> ok. list()) -> ok.
%% @doc %% @doc
%% Trigger a compaction for this clerk if the threshold of data recovery has %% Trigger a compaction for this clerk if the threshold of data recovery has
@ -538,7 +541,10 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
%%% Internal functions %%% Internal functions
%%%============================================================================ %%%============================================================================
-spec check_single_file(pid(), fun(), any(), non_neg_integer(), -spec check_single_file(pid(),
leveled_inker:filterfun(),
leveled_inker:filterserver(),
leveled_codec:sqn(),
non_neg_integer(), non_neg_integer(), non_neg_integer(), non_neg_integer(),
leveled_codec:compaction_strategy()) -> leveled_codec:compaction_strategy()) ->
float(). float().
@ -578,44 +584,31 @@ safely_log_filescore(PositionList, FN, Score, SW) ->
leveled_log:log_timer("IC004", [Score, AvgJump, FN], SW). leveled_log:log_timer("IC004", [Score, AvgJump, FN], SW).
-spec size_comparison_score(list(key_size() | corrupted_test_key_size()), -spec size_comparison_score(list(key_size() | corrupted_test_key_size()),
fun(), leveled_inker:filterfun(),
any(), leveled_inker:filterserver(),
non_neg_integer(), leveled_codec:sqn(),
leveled_codec:compaction_strategy()) -> leveled_codec:compaction_strategy()) ->
float(). float().
size_comparison_score(KeySizeList, size_comparison_score(KeySizeList,
FilterFun, FilterServer, MaxSQN, FilterFun, FilterServer, MaxSQN,
RS) -> ReloadStrategy) ->
FoldFunForSizeCompare = FoldFunForSizeCompare =
fun(KS, {ActSize, RplSize}) -> fun(KS, {ActSize, RplSize}) ->
case KS of case KS of
{{SQN, Type, PK}, Size} -> {{SQN, Type, PK}, Size} ->
IsJournalEntry = ToRetain =
leveled_codec:is_full_journalentry({SQN, Type, PK}), to_retain({SQN, Type, PK},
case IsJournalEntry of FilterFun,
false -> FilterServer,
TS = leveled_codec:get_tagstrategy(PK, RS), MaxSQN,
% If the strategy is to retain key deltas, then ReloadStrategy),
% scoring must reflect that. Key deltas are case ToRetain of
% possible even if strategy does not allow as
% there is support for changing strategy from
% retain to recalc
case TS of
retain ->
{ActSize + Size - ?CRC_SIZE, RplSize};
_ ->
{ActSize, RplSize + Size - ?CRC_SIZE}
end;
true -> true ->
Check = FilterFun(FilterServer, PK, SQN), {ActSize + Size - ?CRC_SIZE, RplSize};
case {Check, SQN > MaxSQN} of convert ->
{current, _} -> {ActSize, RplSize + Size - ?CRC_SIZE};
{ActSize + Size - ?CRC_SIZE, RplSize}; false ->
{_, true} -> {ActSize, RplSize + Size - ?CRC_SIZE}
{ActSize + Size - ?CRC_SIZE, RplSize};
_ ->
{ActSize, RplSize + Size - ?CRC_SIZE}
end
end; end;
_ -> _ ->
% There is a key which is not in expected format % There is a key which is not in expected format
@ -839,53 +832,71 @@ split_positions_into_batches(Positions, Journal, Batches) ->
%% if it contains index entries. The hot_backup approach is also not safe with %% if it contains index entries. The hot_backup approach is also not safe with
%% a `recovr` strategy. The recovr strategy assumes faults in the ledger will %% a `recovr` strategy. The recovr strategy assumes faults in the ledger will
%% be resolved via application-level anti-entropy %% be resolved via application-level anti-entropy
filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> filter_output(KVCs, FilterFun, FilterServer, MaxSQN, Strategy) ->
FoldFun = FoldFun =
filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy), filter_output_fun(FilterFun, FilterServer, MaxSQN, Strategy),
lists:reverse(lists:foldl(FoldFun, [], KVCs)). lists:reverse(lists:foldl(FoldFun, [], KVCs)).
filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> filter_output_fun(FilterFun, FilterServer, MaxSQN, Strategy) ->
fun(KVC0, Acc) -> fun(KVC0, Acc) ->
case KVC0 of case KVC0 of
{_InkKey, crc_wonky, false} -> {_InkKey, crc_wonky, false} ->
% Bad entry, disregard, don't check % Bad entry, disregard, don't check
Acc; Acc;
{JK, JV, _Check} -> {JK, JV, _Check} ->
{SQN, LK} = ToRetain =
leveled_codec:from_journalkey(JK), to_retain(JK, FilterFun, FilterServer, MaxSQN, Strategy),
CompactStrategy = case ToRetain of
leveled_codec:get_tagstrategy(LK, ReloadStrategy), true ->
IsJournalEntry =
leveled_codec:is_full_journalentry(JK),
case {CompactStrategy, IsJournalEntry} of
{retain, false} ->
[KVC0|Acc]; [KVC0|Acc];
_ -> convert ->
KeyCurrent = FilterFun(FilterServer, LK, SQN), {JK0, JV0} =
IsInMemory = SQN > MaxSQN, leveled_codec:revert_to_keydeltas(JK, JV),
case {KeyCurrent, IsInMemory, CompactStrategy} of [{JK0, JV0, null}|Acc];
{KC, InMem, _} when KC == current; InMem -> false ->
% This entry may still be required Acc
% regardless of strategy
[KVC0|Acc];
{_, _, retain} ->
% If we have a retain strategy, it can't be
% discarded - but the value part is no
% longer required as this version has been
% replaced
{JK0, JV0} =
leveled_codec:revert_to_keydeltas(JK, JV),
[{JK0, JV0, null}|Acc];
{_, _, _} ->
% This is out of date and not retained so
% discard
Acc
end
end end
end end
end. end.
-spec to_retain(leveled_codec:journal_key(),
                leveled_inker:filterfun(),
                leveled_inker:filterserver(),
                leveled_codec:sqn(),
                leveled_codec:compaction_strategy()) -> boolean()|convert.
%% @doc
%% Decide the fate of a single journal entry.  This is the one decision point
%% used both when scoring a file (size_comparison_score) and when compacting
%% it (filter_output_fun), so that a file is only scored as compactable for
%% entries which compaction would actually change.
%%
%% Returns:
%% true    - the entry must be kept unchanged
%% convert - the entry must be kept, but may be reduced to its key deltas
%% false   - the entry can be discarded
to_retain(JournalKey, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
    {SQN, LK} =
        leveled_codec:from_journalkey(JournalKey),
    CompactStrategy =
        leveled_codec:get_tagstrategy(LK, ReloadStrategy),
    IsJournalEntry =
        leveled_codec:is_full_journalentry(JournalKey),
    case {CompactStrategy, IsJournalEntry} of
        {retain, false} ->
            % Not a full journal entry, and the strategy for this tag is
            % retain - so it is always kept, without consulting the filter
            true;
        _ ->
            KeyCurrent = FilterFun(FilterServer, LK, SQN),
            IsInMemory = SQN > MaxSQN,
            case {KeyCurrent, IsInMemory, CompactStrategy} of
                {KC, InMem, _} when KC == current; InMem ->
                    % This entry may still be required
                    % regardless of strategy
                    true;
                {_, _, retain} ->
                    % If we have a retain strategy, it can't be
                    % discarded - but the value part is no
                    % longer required as this version has been
                    % replaced
                    convert;
                {_, _, _} ->
                    % This is out of date and not retained so
                    % discard
                    false
            end
    end.
write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) -> write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) ->
{Journal0, ManSlice0}; {Journal0, ManSlice0};
write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) -> write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) ->

View file

@ -157,6 +157,14 @@
-type inker_options() :: #inker_options{}. -type inker_options() :: #inker_options{}.
-type ink_state() :: #state{}. -type ink_state() :: #state{}.
-type registered_snapshot() :: {pid(), os:timestamp(), integer()}. -type registered_snapshot() :: {pid(), os:timestamp(), integer()}.
%% The state handed to a filterfun() and a filterclosefun(): either a process
%% or a list of tuples.
-type filterserver() :: pid()|list(tuple()).
%% Given a filterserver(), a ledger key and a sequence number, reports the
%% entry as current, replaced or missing.
%% NOTE(review): presumably this reflects whether that SQN is still the live
%% version of the key in the ledger - confirm against the implementations.
-type filterfun() ::
fun((filterserver(), leveled_codec:ledger_key(), leveled_codec:sqn()) ->
current|replaced|missing).
%% Called with a filterserver() and returns ok.
%% NOTE(review): presumably releases the filterserver once filtering is
%% complete - confirm.
-type filterclosefun() :: fun((filterserver()) -> ok).
%% Given a pid, returns the filterserver() to use together with a sequence
%% number.
%% NOTE(review): presumably the pid is the inker and the SQN is the maximum
%% SQN covered by the filterserver - confirm against the caller.
-type filterinitfun() :: fun((pid()) -> {filterserver(), leveled_codec:sqn()}).

%% Exported so that other modules can refer to these types in their specs.
-export_type([filterserver/0, filterfun/0, filterclosefun/0, filterinitfun/0]).
%%%============================================================================ %%%============================================================================
%%% API %%% API