diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 30d2caf..1333eed 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -1033,10 +1033,8 @@ book_destroy(Pid) -> %% The function will be 1-arity, and can be passed the absolute folder name %% to store the backup. %% -%% Backup files are hard-linked. Does not work in head_only mode -%% -%% TODO: Can extend to head_only mode, and also support another parameter -%% which would backup persisted part of ledger (to make restart faster) +%% Backup files are hard-linked. Does not work in head_only mode, or if +%% index changes are used with a `skip` compaction/reload strategy book_hotbackup(Pid) -> gen_server:call(Pid, hot_backup, infinity). diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 6e0ce63..977bcaa 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -33,9 +33,10 @@ from_inkerkv/1, from_inkerkv/2, from_journalkey/1, - compact_inkerkvc/2, + revert_to_keydeltas/2, split_inkvalue/1, check_forinkertype/2, + get_tagstrategy/2, maybe_compress/2, create_value_for_journal/3, generate_ledgerkv/5, @@ -170,7 +171,32 @@ segment_hash(Key) when is_binary(Key) -> = leveled_tictac:keyto_segment48(Key), {SegmentID, ExtraHash}; segment_hash(KeyTuple) when is_tuple(KeyTuple) -> - segment_hash(leveled_head:key_to_canonicalbinary(KeyTuple)). + BinKey = + case element(1, KeyTuple) of + ?HEAD_TAG -> + headkey_to_canonicalbinary(KeyTuple); + _ -> + leveled_head:key_to_canonicalbinary(KeyTuple) + end, + segment_hash(BinKey). 
+ + +headkey_to_canonicalbinary({?HEAD_TAG, Bucket, Key, SubK}) + when is_binary(Bucket), is_binary(Key), is_binary(SubK) -> + <<Bucket/binary, Key/binary, SubK/binary>>; +headkey_to_canonicalbinary({?HEAD_TAG, Bucket, Key, null}) + when is_binary(Bucket), is_binary(Key) -> + <<Bucket/binary, Key/binary>>; +headkey_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey}) + when is_binary(BucketType), is_binary(Bucket) -> + headkey_to_canonicalbinary({?HEAD_TAG, + <<BucketType/binary, Bucket/binary>>, + Key, + SubKey}); +headkey_to_canonicalbinary(Key) when element(1, Key) == ?HEAD_TAG -> + % In unit tests head specs can have non-binary keys, so handle + % this through hashing the whole key + term_to_binary(Key). -spec to_lookup(ledger_key()) -> maybe_lookup(). @@ -334,36 +360,6 @@ inker_reload_strategy(AltList) -> AltList). --spec compact_inkerkvc({journal_key(), any(), boolean()}, - compaction_strategy()) -> - skip|{retain, any()}|{recalc, null}. -%% @doc -%% Decide whether a superceded object should be replicated in the compacted -%% file and in what format. -compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) -> - skip; -compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) -> - skip; -compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) -> - case get_tagstrategy(LK, Strategy) of - skip -> - skip; - retain -> - {retain, {{SQN, ?INKT_KEYD, LK}, V, CrcCheck}}; - TagStrat -> - {TagStrat, null} - end; -compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) -> - case get_tagstrategy(LK, Strategy) of - skip -> - skip; - retain -> - {_V, KeyDeltas} = revert_value_from_journal(V), - {retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}}; - TagStrat -> - {TagStrat, null} - end. - -spec get_tagstrategy(ledger_key(), compaction_strategy()) -> skip|retain|recalc. %% @doc @@ -398,6 +394,17 @@ to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) -> create_value_for_journal({Object, KeyChanges}, Compress, PressMethod), {{SQN, InkerType, LedgerKey}, Value}. 
+-spec revert_to_keydeltas(journal_key(), any()) -> {journal_key(), any()}. +%% @doc +%% If we wish to retain key deltas when an object in the Journal has been +%% replaced - then this converts a Journal Key and Value into one which has no +%% object body just the key deltas. +revert_to_keydeltas({SQN, ?INKT_STND, LedgerKey}, InkerV) -> + {_V, KeyDeltas} = revert_value_from_journal(InkerV), + {{SQN, ?INKT_KEYD, LedgerKey}, {null, KeyDeltas}}; +revert_to_keydeltas(JournalKey, InkerV) -> + {JournalKey, InkerV}. + %% Used when fetching objects, so only handles standard, hashable entries from_inkerkv(Object) -> from_inkerkv(Object, false). @@ -730,44 +737,6 @@ endkey_passed_test() -> ?assertMatch(true, endkey_passed(TestKey, K2)). -general_skip_strategy_test() -> - % Confirm that we will skip if the strategy says so - TagStrat1 = compact_inkerkvc({{1, - ?INKT_STND, - {?STD_TAG, "B1", "K1andSK", null}}, - {}, - true}, - [{?STD_TAG, skip}]), - ?assertMatch(skip, TagStrat1), - TagStrat2 = compact_inkerkvc({{1, - ?INKT_KEYD, - {?STD_TAG, "B1", "K1andSK", null}}, - {}, - true}, - [{?STD_TAG, skip}]), - ?assertMatch(skip, TagStrat2), - TagStrat3 = compact_inkerkvc({{1, - ?INKT_KEYD, - {?IDX_TAG, "B1", "K1", "SK"}}, - {}, - true}, - [{?STD_TAG, skip}]), - ?assertMatch(skip, TagStrat3), - TagStrat4 = compact_inkerkvc({{1, - ?INKT_KEYD, - {?IDX_TAG, "B1", "K1", "SK"}}, - {}, - true}, - [{?STD_TAG, skip}, {?IDX_TAG, recalc}]), - ?assertMatch({recalc, null}, TagStrat4), - TagStrat5 = compact_inkerkvc({{1, - ?INKT_TOMB, - {?IDX_TAG, "B1", "K1", "SK"}}, - {}, - true}, - [{?STD_TAG, skip}, {?IDX_TAG, recalc}]), - ?assertMatch(skip, TagStrat5). 
- %% Test below proved that the overhead of performing hashes was trivial %% Maybe 5 microseconds per hash diff --git a/src/leveled_head.erl b/src/leveled_head.erl index 93d0330..d6ffd4f 100644 --- a/src/leveled_head.erl +++ b/src/leveled_head.erl @@ -93,22 +93,6 @@ %%% Mutable External Functions %%%============================================================================ --spec get_appdefined_function(appdefinable_function(), - fun(), - non_neg_integer()) -> fun(). -%% @doc -%% If a keylist of [{function_name, fun()}] has been set as an environment -%% variable for a tag, then this FunctionName can be used instead of the -%% default -get_appdefined_function(FunctionName, DefaultFun, RequiredArity) -> - case application:get_env(leveled, FunctionName) of - undefined -> - DefaultFun; - {ok, Fun} when is_function(Fun, RequiredArity) -> - Fun - end. - - -spec key_to_canonicalbinary(tuple()) -> binary(). %% @doc %% Convert a key to a binary in a consistent way for the tag. The binary will @@ -122,22 +106,6 @@ key_to_canonicalbinary({?RIAK_TAG, {BucketType, Bucket}, Key, SubKey}) <<BucketType/binary, Bucket/binary>>, Key, SubKey}); -key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, SubK}) - when is_binary(Bucket), is_binary(Key), is_binary(SubK) -> - <<Bucket/binary, Key/binary, SubK/binary>>; -key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, null}) - when is_binary(Bucket), is_binary(Key) -> - <<Bucket/binary, Key/binary>>; -key_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey}) - when is_binary(BucketType), is_binary(Bucket) -> - key_to_canonicalbinary({?HEAD_TAG, - <<BucketType/binary, Bucket/binary>>, - Key, - SubKey}); -key_to_canonicalbinary(Key) when element(1, Key) == ?HEAD_TAG -> - % In unit tests head specs can have non-binary keys, so handle - % this through hashing the whole key - default_key_to_canonicalbinary(Key); key_to_canonicalbinary(Key) when element(1, Key) == ?STD_TAG -> default_key_to_canonicalbinary(Key); key_to_canonicalbinary(Key) -> @@ -154,12 +122,13 @@ default_key_to_canonicalbinary(Key) -> -spec build_head(object_tag()|headonly_tag(), object_metadata()) -> 
head(). %% @doc %% Return the object metadata as a binary to be the "head" of the object +build_head(?HEAD_TAG, Value) -> + % Metadata is not extracted with head objects, the head response is + % just the unfiltered value that was input. + default_build_head(?HEAD_TAG, Value); build_head(?RIAK_TAG, Metadata) -> {SibData, Vclock, _Hash, _Size} = Metadata, riak_metadata_to_binary(Vclock, SibData); -build_head(?HEAD_TAG, Metadata) -> - % term_to_binary(Metadata). - default_build_head(?HEAD_TAG, Metadata); build_head(?STD_TAG, Metadata) -> default_build_head(?STD_TAG, Metadata); build_head(Tag, Metadata) -> @@ -173,7 +142,7 @@ default_build_head(_Tag, Metadata) -> Metadata. --spec extract_metadata(object_tag()|headonly_tag(), non_neg_integer(), any()) +-spec extract_metadata(object_tag(), non_neg_integer(), any()) -> {object_metadata(), list(erlang:timestamp())}. %% @doc %% Take the inbound object and extract from it the metadata to be stored within @@ -187,10 +156,10 @@ default_build_head(_Tag, Metadata) -> %% The Object Size passed in to this function is as calculated when writing %% the object to the Journal. It may be recalculated here, if an alternative %% view of size is required within the header +%% +%% Note objects with a ?HEAD_TAG should never be passed, as there is no extract_metadata(?RIAK_TAG, SizeAsStoredInJournal, RiakObj) -> riak_extract_metadata(RiakObj, SizeAsStoredInJournal); -extract_metadata(?HEAD_TAG, SizeAsStoredInJournal, Obj) -> - {{standard_hash(Obj), SizeAsStoredInJournal}, []}; extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj) -> default_extract_metadata(?STD_TAG, SizeAsStoredInJournal, Obj); extract_metadata(Tag, SizeAsStoredInJournal, Obj) -> @@ -251,6 +220,26 @@ get_hash(_Tag, ObjectMetadata) -> standard_hash(Obj) -> erlang:phash2(term_to_binary(Obj)). 
+ +%%%============================================================================ +%%% Handling Override Functions +%%%============================================================================ + +-spec get_appdefined_function(appdefinable_function(), + fun(), + non_neg_integer()) -> fun(). +%% @doc +%% If a keylist of [{function_name, fun()}] has been set as an environment +%% variable for a tag, then this FunctionName can be used instead of the +%% default +get_appdefined_function(FunctionName, DefaultFun, RequiredArity) -> + case application:get_env(leveled, FunctionName) of + undefined -> + DefaultFun; + {ok, Fun} when is_function(Fun, RequiredArity) -> + Fun + end. + %%%============================================================================ %%% Tag-specific Internal Functions %%%============================================================================ diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 11ca1ba..957f3cb 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -654,29 +654,56 @@ split_positions_into_batches(Positions, Journal, Batches) -> Batches ++ [{Journal, ThisBatch}]). +%% @doc +%% For the Keys and values taken from the Journal file, which are required +%% in the compacted journal file. To be required, they must still be active +%% (i.e. be the current SQN for that LedgerKey in the Ledger). However, if +%% it is not active, we still need to retain some information if for this +%% object tag we want to be able to rebuild the KeyStore by reloading the +%% KeyDeltas (the retain reload strategy) +%% +%% If the reload strategy is recalc, we assume that we can reload by +%% recalculating the KeyChanges by looking at the object when we reload. So +%% old objects can be discarded. +%% +%% If the strategy is skip, we don't care about KeyDeltas. Note though, that +%% if the ledger is deleted it may not be possible to safely rebuild a KeyStore +%% if it contains index entries.
The hot_backup approach is also not safe with +%% a `skip` strategy. filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> - lists:foldl(fun(KVC0, Acc) -> - R = leveled_codec:compact_inkerkvc(KVC0, ReloadStrategy), - case R of - skip -> - Acc; - {TStrat, KVC1} -> - {K, _V, CrcCheck} = KVC0, - {SQN, LedgerKey} = leveled_codec:from_journalkey(K), - KeyValid = FilterFun(FilterServer, LedgerKey, SQN), - case {KeyValid, CrcCheck, SQN > MaxSQN, TStrat} of - {false, true, false, retain} -> - Acc ++ [KVC1]; - {false, true, false, _} -> - Acc; - _ -> - Acc ++ [KVC0] - end + FoldFun = + fun(KVC0, Acc) -> + case KVC0 of + {_InkKey, crc_wonky, false} -> + % Bad entry, disregard, don't check + Acc; + {JK, JV, _Check} -> + {SQN, LK} = + leveled_codec:from_journalkey(JK), + CompactStrategy = + leveled_codec:get_tagstrategy(LK, ReloadStrategy), + KeyValid = FilterFun(FilterServer, LK, SQN), + IsInMemory = SQN > MaxSQN, + case {KeyValid or IsInMemory, CompactStrategy} of + {true, _} -> + % This entry may still be required regardless of + % strategy + [KVC0|Acc]; + {false, retain} -> + % If we have a retain strategy, it can't be + % discarded - but the value part is no longer + % required as this version has been replaced + {JK0, JV0} = + leveled_codec:revert_to_keydeltas(JK, JV), + [{JK0, JV0, null}|Acc]; + {false, _} -> + % This is out of date and not retained - discard + Acc end - end, - [], - KVCs). - + end + end, + lists:reverse(lists:foldl(FoldFun, [], KVCs)). + write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) -> {Journal0, ManSlice0};