iClerk refactor

the skip/retain/recalc handling was confusing.  This removes the switcheroo between leveled_codec and leveled_iclerk when making the decision.

Also now the building of the accumulator is handled efficiently (not using ++ on the list).

Tried to remove as much of ?HEAD tag handling from leveled_head - as we want leveled_head to be only concerned with the head manipulation for object tags (?STD, ?RIAK and user-defined).
This commit is contained in:
Martin Sumner 2018-12-06 22:45:05 +00:00
parent 8e687ee7c8
commit e0352414f2
4 changed files with 116 additions and 133 deletions

View file

@ -1033,10 +1033,8 @@ book_destroy(Pid) ->
%% The function will be 1-arity, and can be passed the absolute folder name %% The function will be 1-arity, and can be passed the absolute folder name
%% to store the backup. %% to store the backup.
%% %%
%% Backup files are hard-linked. Does not work in head_only mode %% Backup files are hard-linked. Does not work in head_only mode, or if
%% %% index changes are used with a `skip` compaction/reload strategy
%% TODO: Can extend to head_only mode, and also support another parameter
%% which would backup persisted part of ledger (to make restart faster)
book_hotbackup(Pid) -> book_hotbackup(Pid) ->
gen_server:call(Pid, hot_backup, infinity). gen_server:call(Pid, hot_backup, infinity).

View file

@ -33,9 +33,10 @@
from_inkerkv/1, from_inkerkv/1,
from_inkerkv/2, from_inkerkv/2,
from_journalkey/1, from_journalkey/1,
compact_inkerkvc/2, revert_to_keydeltas/2,
split_inkvalue/1, split_inkvalue/1,
check_forinkertype/2, check_forinkertype/2,
get_tagstrategy/2,
maybe_compress/2, maybe_compress/2,
create_value_for_journal/3, create_value_for_journal/3,
generate_ledgerkv/5, generate_ledgerkv/5,
@ -170,7 +171,32 @@ segment_hash(Key) when is_binary(Key) ->
= leveled_tictac:keyto_segment48(Key), = leveled_tictac:keyto_segment48(Key),
{SegmentID, ExtraHash}; {SegmentID, ExtraHash};
segment_hash(KeyTuple) when is_tuple(KeyTuple) -> segment_hash(KeyTuple) when is_tuple(KeyTuple) ->
segment_hash(leveled_head:key_to_canonicalbinary(KeyTuple)). BinKey =
        case element(1, KeyTuple) of
?HEAD_TAG ->
headkey_to_canonicalbinary(KeyTuple);
_ ->
leveled_head:key_to_canonicalbinary(KeyTuple)
end,
segment_hash(BinKey).
headkey_to_canonicalbinary({?HEAD_TAG, Bucket, Key, SubK})
when is_binary(Bucket), is_binary(Key), is_binary(SubK) ->
<<Bucket/binary, Key/binary, SubK/binary>>;
headkey_to_canonicalbinary({?HEAD_TAG, Bucket, Key, null})
when is_binary(Bucket), is_binary(Key) ->
<<Bucket/binary, Key/binary>>;
headkey_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey})
when is_binary(BucketType), is_binary(Bucket) ->
headkey_to_canonicalbinary({?HEAD_TAG,
<<BucketType/binary, Bucket/binary>>,
Key,
SubKey});
headkey_to_canonicalbinary(Key) when element(1, Key) == ?HEAD_TAG ->
% In unit tests head specs can have non-binary keys, so handle
% this through hashing the whole key
term_to_binary(Key).
-spec to_lookup(ledger_key()) -> maybe_lookup(). -spec to_lookup(ledger_key()) -> maybe_lookup().
@ -334,36 +360,6 @@ inker_reload_strategy(AltList) ->
AltList). AltList).
-spec compact_inkerkvc({journal_key(), any(), boolean()},
compaction_strategy()) ->
skip|{retain, any()}|{recalc, null}.
%% @doc
%% Decide whether a superseded object should be replicated in the compacted
%% file and in what format.
compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) ->
skip;
compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) ->
skip;
compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) ->
case get_tagstrategy(LK, Strategy) of
skip ->
skip;
retain ->
{retain, {{SQN, ?INKT_KEYD, LK}, V, CrcCheck}};
TagStrat ->
{TagStrat, null}
end;
compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
case get_tagstrategy(LK, Strategy) of
skip ->
skip;
retain ->
{_V, KeyDeltas} = revert_value_from_journal(V),
{retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
TagStrat ->
{TagStrat, null}
end.
-spec get_tagstrategy(ledger_key(), compaction_strategy()) -spec get_tagstrategy(ledger_key(), compaction_strategy())
-> skip|retain|recalc. -> skip|retain|recalc.
%% @doc %% @doc
@ -398,6 +394,17 @@ to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) ->
create_value_for_journal({Object, KeyChanges}, Compress, PressMethod), create_value_for_journal({Object, KeyChanges}, Compress, PressMethod),
{{SQN, InkerType, LedgerKey}, Value}. {{SQN, InkerType, LedgerKey}, Value}.
-spec revert_to_keydeltas(journal_key(), any()) -> {journal_key(), any()}.
%% @doc
%% If we wish to retain key deltas when an object in the Journal has been
%% replaced - then this converts a Journal Key and Value into one which has no
%% object body just the key deltas.
revert_to_keydeltas({SQN, ?INKT_STND, LedgerKey}, InkerV) ->
{_V, KeyDeltas} = revert_value_from_journal(InkerV),
{{SQN, ?INKT_KEYD, LedgerKey}, {null, KeyDeltas}};
revert_to_keydeltas(JournalKey, InkerV) ->
{JournalKey, InkerV}.
%% Used when fetching objects, so only handles standard, hashable entries %% Used when fetching objects, so only handles standard, hashable entries
from_inkerkv(Object) -> from_inkerkv(Object) ->
from_inkerkv(Object, false). from_inkerkv(Object, false).
@ -730,44 +737,6 @@ endkey_passed_test() ->
?assertMatch(true, endkey_passed(TestKey, K2)). ?assertMatch(true, endkey_passed(TestKey, K2)).
general_skip_strategy_test() ->
% Confirm that we will skip if the strategy says so
TagStrat1 = compact_inkerkvc({{1,
?INKT_STND,
{?STD_TAG, "B1", "K1andSK", null}},
{},
true},
[{?STD_TAG, skip}]),
?assertMatch(skip, TagStrat1),
TagStrat2 = compact_inkerkvc({{1,
?INKT_KEYD,
{?STD_TAG, "B1", "K1andSK", null}},
{},
true},
[{?STD_TAG, skip}]),
?assertMatch(skip, TagStrat2),
TagStrat3 = compact_inkerkvc({{1,
?INKT_KEYD,
{?IDX_TAG, "B1", "K1", "SK"}},
{},
true},
[{?STD_TAG, skip}]),
?assertMatch(skip, TagStrat3),
TagStrat4 = compact_inkerkvc({{1,
?INKT_KEYD,
{?IDX_TAG, "B1", "K1", "SK"}},
{},
true},
[{?STD_TAG, skip}, {?IDX_TAG, recalc}]),
?assertMatch({recalc, null}, TagStrat4),
TagStrat5 = compact_inkerkvc({{1,
?INKT_TOMB,
{?IDX_TAG, "B1", "K1", "SK"}},
{},
true},
[{?STD_TAG, skip}, {?IDX_TAG, recalc}]),
?assertMatch(skip, TagStrat5).
%% Test below proved that the overhead of performing hashes was trivial %% Test below proved that the overhead of performing hashes was trivial
%% Maybe 5 microseconds per hash %% Maybe 5 microseconds per hash

View file

@ -93,22 +93,6 @@
%%% Mutable External Functions %%% Mutable External Functions
%%%============================================================================ %%%============================================================================
-spec get_appdefined_function(appdefinable_function(),
fun(),
non_neg_integer()) -> fun().
%% @doc
%% If a keylist of [{function_name, fun()}] has been set as an environment
%% variable for a tag, then this FunctionName can be used instead of the
%% default
get_appdefined_function(FunctionName, DefaultFun, RequiredArity) ->
case application:get_env(leveled, FunctionName) of
undefined ->
DefaultFun;
{ok, Fun} when is_function(Fun, RequiredArity) ->
Fun
end.
-spec key_to_canonicalbinary(tuple()) -> binary(). -spec key_to_canonicalbinary(tuple()) -> binary().
%% @doc %% @doc
%% Convert a key to a binary in a consistent way for the tag. The binary will %% Convert a key to a binary in a consistent way for the tag. The binary will
@ -122,22 +106,6 @@ key_to_canonicalbinary({?RIAK_TAG, {BucketType, Bucket}, Key, SubKey})
<<BucketType/binary, Bucket/binary>>, <<BucketType/binary, Bucket/binary>>,
Key, Key,
SubKey}); SubKey});
key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, SubK})
when is_binary(Bucket), is_binary(Key), is_binary(SubK) ->
<<Bucket/binary, Key/binary, SubK/binary>>;
key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, null})
when is_binary(Bucket), is_binary(Key) ->
<<Bucket/binary, Key/binary>>;
key_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey})
when is_binary(BucketType), is_binary(Bucket) ->
key_to_canonicalbinary({?HEAD_TAG,
<<BucketType/binary, Bucket/binary>>,
Key,
SubKey});
key_to_canonicalbinary(Key) when element(1, Key) == ?HEAD_TAG ->
% In unit tests head specs can have non-binary keys, so handle
% this through hashing the whole key
default_key_to_canonicalbinary(Key);
key_to_canonicalbinary(Key) when element(1, Key) == ?STD_TAG -> key_to_canonicalbinary(Key) when element(1, Key) == ?STD_TAG ->
default_key_to_canonicalbinary(Key); default_key_to_canonicalbinary(Key);
key_to_canonicalbinary(Key) -> key_to_canonicalbinary(Key) ->
@ -154,12 +122,13 @@ default_key_to_canonicalbinary(Key) ->
-spec build_head(object_tag()|headonly_tag(), object_metadata()) -> head(). -spec build_head(object_tag()|headonly_tag(), object_metadata()) -> head().
%% @doc %% @doc
%% Return the object metadata as a binary to be the "head" of the object %% Return the object metadata as a binary to be the "head" of the object
build_head(?HEAD_TAG, Value) ->
% Metadata is not extracted with head objects, the head response is
% just the unfiltered value that was input.
default_build_head(?HEAD_TAG, Value);
build_head(?RIAK_TAG, Metadata) -> build_head(?RIAK_TAG, Metadata) ->
{SibData, Vclock, _Hash, _Size} = Metadata, {SibData, Vclock, _Hash, _Size} = Metadata,
riak_metadata_to_binary(Vclock, SibData); riak_metadata_to_binary(Vclock, SibData);
build_head(?HEAD_TAG, Metadata) ->
% term_to_binary(Metadata).
default_build_head(?HEAD_TAG, Metadata);
build_head(?STD_TAG, Metadata) -> build_head(?STD_TAG, Metadata) ->
default_build_head(?STD_TAG, Metadata); default_build_head(?STD_TAG, Metadata);
build_head(Tag, Metadata) -> build_head(Tag, Metadata) ->
@ -173,7 +142,7 @@ default_build_head(_Tag, Metadata) ->
Metadata. Metadata.
-spec extract_metadata(object_tag()|headonly_tag(), non_neg_integer(), any()) -spec extract_metadata(object_tag(), non_neg_integer(), any())
-> {object_metadata(), list(erlang:timestamp())}. -> {object_metadata(), list(erlang:timestamp())}.
%% @doc %% @doc
%% Take the inbound object and extract from it the metadata to be stored within %% Take the inbound object and extract from it the metadata to be stored within
@ -187,10 +156,10 @@ default_build_head(_Tag, Metadata) ->
%% The Object Size passed in to this function is as calculated when writing %% The Object Size passed in to this function is as calculated when writing
%% the object to the Journal. It may be recalculated here, if an alternative %% the object to the Journal. It may be recalculated here, if an alternative
%% view of size is required within the header %% view of size is required within the header
%%
%% Note objects with a ?HEAD_TAG should never be passed, as there is no
extract_metadata(?RIAK_TAG, SizeAsStoredInJournal, RiakObj) -> extract_metadata(?RIAK_TAG, SizeAsStoredInJournal, RiakObj) ->
riak_extract_metadata(RiakObj, SizeAsStoredInJournal); riak_extract_metadata(RiakObj, SizeAsStoredInJournal);
extract_metadata(?HEAD_TAG, SizeAsStoredInJournal, Obj) ->
{{standard_hash(Obj), SizeAsStoredInJournal}, []};
extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj) -> extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj) ->
default_extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj); default_extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj);
extract_metadata(Tag, SizeAsStoredInJournal, Obj) -> extract_metadata(Tag, SizeAsStoredInJournal, Obj) ->
@ -251,6 +220,26 @@ get_hash(_Tag, ObjectMetadata) ->
standard_hash(Obj) -> standard_hash(Obj) ->
erlang:phash2(term_to_binary(Obj)). erlang:phash2(term_to_binary(Obj)).
%%%============================================================================
%%% Handling Override Functions
%%%============================================================================
-spec get_appdefined_function(appdefinable_function(),
fun(),
non_neg_integer()) -> fun().
%% @doc
%% If a keylist of [{function_name, fun()}] has been set as an environment
%% variable for a tag, then this FunctionName can be used instead of the
%% default
get_appdefined_function(FunctionName, DefaultFun, RequiredArity) ->
case application:get_env(leveled, FunctionName) of
undefined ->
DefaultFun;
{ok, Fun} when is_function(Fun, RequiredArity) ->
Fun
end.
%%%============================================================================ %%%============================================================================
%%% Tag-specific Internal Functions %%% Tag-specific Internal Functions
%%%============================================================================ %%%============================================================================

View file

@ -654,29 +654,56 @@ split_positions_into_batches(Positions, Journal, Batches) ->
Batches ++ [{Journal, ThisBatch}]). Batches ++ [{Journal, ThisBatch}]).
%% @doc
%% For the Keys and values taken from the Journal file, which are required
%% in the compacted journal file. To be required, they must still be active
%% (i.e. be the current SQN for that LedgerKey in the Ledger). However, if
%% it is not active, we still need to retain some information if for this
%% object tag we want to be able to rebuild the KeyStore by reloading the
%% KeyDeltas (the retain reload strategy)
%%
%% If the reload strategy is recalc, we assume that we can reload by
%% recalculating the KeyChanges by looking at the object when we reload. So
%% old objects can be discarded.
%%
%% If the strategy is skip, we don't care about KeyDeltas. Note though, that
%% if the ledger is deleted it may not be possible to safely rebuild a KeyStore
%% if it contains index entries. The hot_backup approach is also not safe with
%% a `skip` strategy.
filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
lists:foldl(fun(KVC0, Acc) -> FoldFun =
R = leveled_codec:compact_inkerkvc(KVC0, ReloadStrategy), fun(KVC0, Acc) ->
case R of case KVC0 of
skip -> {_InkKey, crc_wonky, false} ->
Acc; % Bad entry, disregard, don't check
{TStrat, KVC1} -> Acc;
{K, _V, CrcCheck} = KVC0, {JK, JV, _Check} ->
{SQN, LedgerKey} = leveled_codec:from_journalkey(K), {SQN, LK} =
KeyValid = FilterFun(FilterServer, LedgerKey, SQN), leveled_codec:from_journalkey(JK),
case {KeyValid, CrcCheck, SQN > MaxSQN, TStrat} of CompactStrategy =
{false, true, false, retain} -> leveled_codec:get_tagstrategy(LK, ReloadStrategy),
Acc ++ [KVC1]; KeyValid = FilterFun(FilterServer, LK, SQN),
{false, true, false, _} -> IsInMemory = SQN > MaxSQN,
Acc; case {KeyValid or IsInMemory, CompactStrategy} of
_ -> {true, _} ->
Acc ++ [KVC0] % This entry may still be required regardless of
end % strategy
[KVC0|Acc];
{false, retain} ->
% If we have a retain strategy, it can't be
% discarded - but the value part is no longer
% required as this version has been replaced
{JK0, JV0} =
leveled_codec:revert_to_keydeltas(JK, JV),
[{JK0, JV0, null}|Acc];
{false, _} ->
% This is out of date and not retained - discard
Acc
end end
end, end
[], end,
KVCs). lists:reverse(lists:foldl(FoldFun, [], KVCs)).
write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) -> write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) ->
{Journal0, ManSlice0}; {Journal0, ManSlice0};