diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index f7cec00..760914e 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -125,10 +125,10 @@ journal :: pid() | undefined, compaction_perc :: float() | undefined}). --record(scoring_state, {filter_fun :: fun(), - filter_server :: pid(), +-record(scoring_state, {filter_fun :: leveled_inker:filterfun(), + filter_server :: leveled_inker:filterserver(), max_sqn :: non_neg_integer(), - close_fun :: fun(), + close_fun :: leveled_inker:filterclosefun(), start_time :: erlang:timestamp()}). -type iclerk_options() :: #iclerk_options{}. @@ -166,8 +166,11 @@ clerk_new(InkerClerkOpts) -> gen_server:start_link(?MODULE, [leveled_log:get_opts(), InkerClerkOpts], []). --spec clerk_compact(pid(), pid(), - fun(), fun(), fun(), +-spec clerk_compact(pid(), + pid(), + leveled_inker:filterinitfun(), + leveled_inker:filterclosefun(), + leveled_inker:filterfun(), list()) -> ok. %% @doc %% Trigger a compaction for this clerk if the threshold of data recovery has @@ -538,7 +541,10 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) -> %%% Internal functions %%%============================================================================ --spec check_single_file(pid(), fun(), any(), non_neg_integer(), +-spec check_single_file(pid(), + leveled_inker:filterfun(), + leveled_inker:filterserver(), + leveled_codec:sqn(), non_neg_integer(), non_neg_integer(), leveled_codec:compaction_strategy()) -> float(). @@ -578,44 +584,31 @@ safely_log_filescore(PositionList, FN, Score, SW) -> leveled_log:log_timer("IC004", [Score, AvgJump, FN], SW). -spec size_comparison_score(list(key_size() | corrupted_test_key_size()), - fun(), - any(), - non_neg_integer(), + leveled_inker:filterfun(), + leveled_inker:filterserver(), + leveled_codec:sqn(), leveled_codec:compaction_strategy()) -> float(). 
size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN, - RS) -> + ReloadStrategy) -> FoldFunForSizeCompare = fun(KS, {ActSize, RplSize}) -> case KS of {{SQN, Type, PK}, Size} -> - IsJournalEntry = - leveled_codec:is_full_journalentry({SQN, Type, PK}), - case IsJournalEntry of - false -> - TS = leveled_codec:get_tagstrategy(PK, RS), - % If the strategy is to retain key deltas, then - % scoring must reflect that. Key deltas are - % possible even if strategy does not allow as - % there is support for changing strategy from - % retain to recalc - case TS of - retain -> - {ActSize + Size - ?CRC_SIZE, RplSize}; - _ -> - {ActSize, RplSize + Size - ?CRC_SIZE} - end; + ToRetain = + to_retain({SQN, Type, PK}, + FilterFun, + FilterServer, + MaxSQN, + ReloadStrategy), + case ToRetain of true -> - Check = FilterFun(FilterServer, PK, SQN), - case {Check, SQN > MaxSQN} of - {current, _} -> - {ActSize + Size - ?CRC_SIZE, RplSize}; - {_, true} -> - {ActSize + Size - ?CRC_SIZE, RplSize}; - _ -> - {ActSize, RplSize + Size - ?CRC_SIZE} - end + {ActSize + Size - ?CRC_SIZE, RplSize}; + convert -> + {ActSize, RplSize + Size - ?CRC_SIZE}; + false -> + {ActSize, RplSize + Size - ?CRC_SIZE} end; _ -> % There is a key which is not in expected format @@ -839,53 +832,71 @@ split_positions_into_batches(Positions, Journal, Batches) -> %% if it contains index entries. The hot_backup approach is also not safe with %% a `recovr` strategy. The recovr strategy assumes faults in the ledger will %% be resolved via application-level anti-entropy -filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> +filter_output(KVCs, FilterFun, FilterServer, MaxSQN, Strategy) -> FoldFun = - filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy), + filter_output_fun(FilterFun, FilterServer, MaxSQN, Strategy), lists:reverse(lists:foldl(FoldFun, [], KVCs)). 
-filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> +filter_output_fun(FilterFun, FilterServer, MaxSQN, Strategy) -> fun(KVC0, Acc) -> case KVC0 of {_InkKey, crc_wonky, false} -> % Bad entry, disregard, don't check Acc; {JK, JV, _Check} -> - {SQN, LK} = - leveled_codec:from_journalkey(JK), - CompactStrategy = - leveled_codec:get_tagstrategy(LK, ReloadStrategy), - IsJournalEntry = - leveled_codec:is_full_journalentry(JK), - case {CompactStrategy, IsJournalEntry} of - {retain, false} -> + ToRetain = + to_retain(JK, FilterFun, FilterServer, MaxSQN, Strategy), + case ToRetain of + true -> [KVC0|Acc]; - _ -> - KeyCurrent = FilterFun(FilterServer, LK, SQN), - IsInMemory = SQN > MaxSQN, - case {KeyCurrent, IsInMemory, CompactStrategy} of - {KC, InMem, _} when KC == current; InMem -> - % This entry may still be required - % regardless of strategy - [KVC0|Acc]; - {_, _, retain} -> - % If we have a retain strategy, it can't be - % discarded - but the value part is no - % longer required as this version has been - % replaced - {JK0, JV0} = - leveled_codec:revert_to_keydeltas(JK, JV), - [{JK0, JV0, null}|Acc]; - {_, _, _} -> - % This is out of date and not retained so - % discard - Acc - end + convert -> + {JK0, JV0} = + leveled_codec:revert_to_keydeltas(JK, JV), + [{JK0, JV0, null}|Acc]; + false -> + Acc end end end. +-spec to_retain(leveled_codec:journal_key(), + leveled_inker:filterfun(), + leveled_inker:filterserver(), + leveled_codec:sqn(), + leveled_codec:compaction_strategy()) -> boolean()|convert.
+to_retain(JournalKey, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> + {SQN, LK} = + leveled_codec:from_journalkey(JournalKey), + CompactStrategy = + leveled_codec:get_tagstrategy(LK, ReloadStrategy), + IsJournalEntry = + leveled_codec:is_full_journalentry(JournalKey), + case {CompactStrategy, IsJournalEntry} of + {retain, false} -> + true; + _ -> + KeyCurrent = FilterFun(FilterServer, LK, SQN), + IsInMemory = SQN > MaxSQN, + case {KeyCurrent, IsInMemory, CompactStrategy} of + {KC, InMem, _} when KC == current; InMem -> + % This entry may still be required + % regardless of strategy + true; + {_, _, retain} -> + % If we have a retain strategy, it can't be + % discarded - but the value part is no + % longer required as this version has been + % replaced + convert; + {_, _, _} -> + % This is out of date and not retained so + % discard + false + end + end. + + write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) -> {Journal0, ManSlice0}; write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 8391007..4eb34c6 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -157,6 +157,14 @@ -type inker_options() :: #inker_options{}. -type ink_state() :: #state{}. -type registered_snapshot() :: {pid(), os:timestamp(), integer()}. +-type filterserver() :: pid()|list(tuple()). +-type filterfun() :: + fun((filterserver(), leveled_codec:ledger_key(), leveled_codec:sqn()) -> + current|replaced|missing). +-type filterclosefun() :: fun((filterserver()) -> ok). +-type filterinitfun() :: fun((pid()) -> {filterserver(), leveled_codec:sqn()}). + +-export_type([filterserver/0, filterfun/0, filterclosefun/0, filterinitfun/0]). %%%============================================================================ %%% API