leveled_codec spec/doc

Try and make this code a little bit more organised andd easier to follow
This commit is contained in:
Martin Sumner 2018-05-03 17:18:13 +01:00
parent 931129d25f
commit f88f511df3
8 changed files with 274 additions and 186 deletions

View file

@ -47,7 +47,7 @@
to_ledgerkey/5,
from_ledgerkey/1,
from_ledgerkey/2,
to_inkerkv/3,
to_inkerkey/2,
to_inkerkv/6,
from_inkerkv/1,
from_inkerkv/2,
@ -64,10 +64,7 @@
idx_indexspecs/5,
obj_objectspecs/3,
aae_indexspecs/6,
generate_uuid/0,
integer_now/0,
riak_extract_metadata/2,
magic_hash/1,
segment_hash/1,
to_lookup/1,
riak_metadata_to_binary/2]).
@ -84,8 +81,31 @@
integer()|null, % Hash of vclock - non-exportable
integer()}. % Size in bytes of real object
-type tag() :: ?STD_TAG|?RIAK_TAG|?IDX_TAG|?HEAD_TAG.
-type segment_hash() ::
{integer(), integer()}.
-type ledger_status() ::
tomb|{active, non_neg_integer()|infinity}.
-type ledger_key() ::
{tag(), any(), any(), any()}.
-type ledger_value() ::
{integer(), ledger_status(), segment_hash(), tuple()}.
-type ledger_kv() ::
{ledger_key(), ledger_value()}.
-type compaction_strategy() ::
list({tag(), retain|skip|recalc}).
-type journal_key_tag() ::
?INKT_STND|?INKT_TOMB|?INKT_MPUT|?INKT_KEYD.
-type journal_key() ::
{integer(), journal_key_tag(), ledger_key()}.
-type compression_method() ::
lz4|native.
-spec segment_hash(any()) -> {integer(), integer()}.
%%%============================================================================
%%% Ledger Key Manipulation
%%%============================================================================
-spec segment_hash(ledger_key()|binary()) -> {integer(), integer()}.
%% @doc
%% Return two 16 bit integers - the segment ID and a second integer for spare
%% entropy. The hashed should be used in blooms or indexes such that some
@ -107,28 +127,7 @@ segment_hash(Key) ->
segment_hash(term_to_binary(Key)).
-spec magic_hash(any()) -> integer().
%% @doc
%% Use DJ Bernstein magic hash function. Note, this is more expensive than
%% phash2 but provides a much more balanced result.
%%
%% Hash function contains mysterious constants, some explanation here as to
%% what they are -
%% http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
magic_hash({binary, BinaryKey}) ->
H = 5381,
hash1(H, BinaryKey) band 16#FFFFFFFF;
magic_hash(AnyKey) ->
BK = term_to_binary(AnyKey),
magic_hash({binary, BK}).
hash1(H, <<>>) ->
H;
hash1(H, <<B:8/integer, Rest/bytes>>) ->
H1 = H * 33,
H2 = H1 bxor B,
hash1(H2, Rest).
-spec to_lookup(ledger_key()) -> lookup|no_lookup.
%% @doc
%% Should it be possible to lookup a key in the merge tree. This is not true
%% For keys that should only be read through range queries. Direct lookup
@ -141,36 +140,30 @@ to_lookup(Key) ->
lookup
end.
-spec generate_uuid() -> list().
%% @doc
%% Generate a new globally unique ID as a string.
%% Credit to
%% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl
generate_uuid() ->
<<A:32, B:16, C:16, D:16, E:48>> = leveled_rand:rand_bytes(16),
L = io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b",
[A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]),
binary_to_list(list_to_binary(L)).
inker_reload_strategy(AltList) ->
ReloadStrategy0 = [{?RIAK_TAG, retain}, {?STD_TAG, retain}],
lists:foldl(fun({X, Y}, SList) ->
lists:keyreplace(X, 1, SList, {X, Y})
end,
ReloadStrategy0,
AltList).
%% Some helper functions to get a sub_components of the key/value
-spec strip_to_statusonly(ledger_kv()) -> ledger_status().
strip_to_statusonly({_, {_, St, _, _}}) -> St.
-spec strip_to_seqonly(ledger_kv()) -> non_neg_integer().
strip_to_seqonly({_, {SeqN, _, _, _}}) -> SeqN.
-spec strip_to_keyseqonly(ledger_kv()) -> {ledger_key(), integer()}.
strip_to_keyseqonly({LK, {SeqN, _, _, _}}) -> {LK, SeqN}.
-spec strip_to_seqnhashonly(ledger_kv()) -> {integer(), segment_hash()}.
strip_to_seqnhashonly({_, {SeqN, _, MH, _}}) -> {SeqN, MH}.
-spec striphead_to_details(ledger_value()) -> ledger_value().
striphead_to_details({SeqN, St, MH, MD}) -> {SeqN, St, MH, MD}.
-spec key_dominates(ledger_kv(), ledger_kv()) ->
left_hand_first|right_hand_first|left_hand_dominant|right_hand_dominant.
%% @doc
%% When comparing two keys in the ledger need to find if one key comes before
%% the other, or if the match, which key is "better" and should be the winner
key_dominates(LeftKey, RightKey) ->
case {LeftKey, RightKey} of
{{LK, _LVAL}, {RK, _RVAL}} when LK < RK ->
@ -185,7 +178,11 @@ key_dominates(LeftKey, RightKey) ->
right_hand_dominant
end.
-spec maybe_reap_expiredkey(ledger_kv(), {boolean(), integer()}) -> boolean().
%% @doc
%% Make a reap decision based on the level in the ledger (needs to be expired
%% and in the basement). the level is a tuple of the is_basement boolean, and
%% a timestamp passed into the calling function
maybe_reap_expiredkey(KV, LevelD) ->
Status = strip_to_statusonly(KV),
maybe_reap(Status, LevelD).
@ -199,6 +196,9 @@ maybe_reap(tomb, {true, _CurrTS}) ->
maybe_reap(_, _) ->
false.
-spec is_active(ledger_key(), ledger_value(), non_neg_integer()) -> boolean().
%% @doc
%% Is this an active KV pair or has the timestamp expired
is_active(Key, Value, Now) ->
case strip_to_statusonly({Key, Value}) of
{active, infinity} ->
@ -231,21 +231,116 @@ from_ledgerkey({?IDX_TAG, Bucket, {_IdxFld, IdxVal}, Key}) ->
from_ledgerkey({_Tag, Bucket, Key, _SubKey}) ->
{Bucket, Key}.
-spec to_ledgerkey(any(), any(), tag(), any(), any()) -> ledger_key().
%% @doc
%% Convert something into a ledger key
to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
{?IDX_TAG, Bucket, {Field, Value}, Key}.
-spec to_ledgerkey(any(), any(), tag()) -> ledger_key().
%% @doc
%% Convert something into a ledger key
to_ledgerkey(Bucket, {Key, SubKey}, ?HEAD_TAG) ->
{?HEAD_TAG, Bucket, Key, SubKey};
to_ledgerkey(Bucket, Key, Tag) ->
{Tag, Bucket, Key, null}.
-spec endkey_passed(ledger_key(), ledger_key()) -> boolean().
%% @oc
%% Compare a key against a query key, only comparing elements that are non-null
%% in the Query key. This is used for comparing against end keys in queries.
endkey_passed(all, _) ->
false;
endkey_passed({EK1, null, null, null}, {CK1, _, _, _}) ->
EK1 < CK1;
endkey_passed({EK1, EK2, null, null}, {CK1, CK2, _, _}) ->
{EK1, EK2} < {CK1, CK2};
endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
{EK1, EK2, EK3} < {CK1, CK2, CK3};
endkey_passed(EndKey, CheckingKey) ->
EndKey < CheckingKey.
%% Return the Key, Value and Hash Option for this object. The hash option
%% indicates whether the key would ever be looked up directly, and so if it
%% requires an entry in the hash table
to_inkerkv(LedgerKey, SQN, to_fetch) ->
{{SQN, ?INKT_STND, LedgerKey}, null, true}.
%%%============================================================================
%%% Journal Compaction functions
%%%============================================================================
-spec inker_reload_strategy(compaction_strategy()) -> compaction_strategy().
%% @doc
%% Take the default startegy for compaction, and override the approach for any
%% tags passed in
inker_reload_strategy(AltList) ->
ReloadStrategy0 = [{?RIAK_TAG, retain}, {?STD_TAG, retain}],
lists:foldl(fun({X, Y}, SList) ->
lists:keyreplace(X, 1, SList, {X, Y})
end,
ReloadStrategy0,
AltList).
-spec compact_inkerkvc({journal_key(), any(), boolean()},
compaction_strategy()) ->
skip|{retain, any()}|{recalc, null}.
%% @doc
%% Decide whether a superceded object should be replicated in the compacted
%% file and in what format.
compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) ->
skip;
compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) ->
skip;
compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) ->
case get_tagstrategy(LK, Strategy) of
skip ->
skip;
retain ->
{retain, {{SQN, ?INKT_KEYD, LK}, V, CrcCheck}};
TagStrat ->
{TagStrat, null}
end;
compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
case get_tagstrategy(LK, Strategy) of
skip ->
skip;
retain ->
{_V, KeyDeltas} = revert_value_from_journal(V),
{retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
TagStrat ->
{TagStrat, null}
end.
-spec get_tagstrategy(ledger_key(), compaction_strategy())
-> skip|retain|recalc.
%% @doc
%% Work out the compaction startegy for the key
get_tagstrategy(LK, Strategy) ->
case LK of
{Tag, _, _, _} ->
case lists:keyfind(Tag, 1, Strategy) of
{Tag, TagStrat} ->
TagStrat;
false ->
leveled_log:log("IC012", [Tag, Strategy]),
skip
end;
_ ->
skip
end.
%%%============================================================================
%%% Manipulate Journal Key and Value
%%%============================================================================
-spec to_inkerkey(ledger_key(), non_neg_integer()) -> journal_key().
%% @doc
%% convertion from ledger_key to journal_key to allow for the key to be fetched
to_inkerkey(LedgerKey, SQN) ->
{SQN, ?INKT_STND, LedgerKey}.
-spec to_inkerkv(ledger_key(), non_neg_integer(), any(), list(),
compression_method(), boolean()) -> {journal_key(), any()}.
%% @doc
%% Convert to the correct format of a Journal key and value
to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) ->
InkerType = check_forinkertype(LedgerKey, Object),
Value =
@ -368,45 +463,6 @@ decode_valuetype(TypeInt) ->
from_journalkey({SQN, _Type, LedgerKey}) ->
{SQN, LedgerKey}.
compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) ->
skip;
compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) ->
skip;
compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) ->
case get_tagstrategy(LK, Strategy) of
skip ->
skip;
retain ->
{retain, {{SQN, ?INKT_KEYD, LK}, V, CrcCheck}};
TagStrat ->
{TagStrat, null}
end;
compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
case get_tagstrategy(LK, Strategy) of
skip ->
skip;
retain ->
{_V, KeyDeltas} = revert_value_from_journal(V),
{retain, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
TagStrat ->
{TagStrat, null}
end;
compact_inkerkvc(_KVC, _Strategy) ->
skip.
get_tagstrategy(LK, Strategy) ->
case LK of
{Tag, _, _, _} ->
case lists:keyfind(Tag, 1, Strategy) of
{Tag, TagStrat} ->
TagStrat;
false ->
leveled_log:log("IC012", [Tag, Strategy]),
skip
end;
_ ->
skip
end.
split_inkvalue(VBin) when is_binary(VBin) ->
revert_value_from_journal(VBin).
@ -422,18 +478,10 @@ hash(Obj) ->
erlang:phash2(term_to_binary(Obj)).
% Compare a key against a query key, only comparing elements that are non-null
% in the Query key. This is used for comparing against end keys in queries.
endkey_passed(all, _) ->
false;
endkey_passed({EK1, null, null, null}, {CK1, _, _, _}) ->
EK1 < CK1;
endkey_passed({EK1, EK2, null, null}, {CK1, CK2, _, _}) ->
{EK1, EK2} < {CK1, CK2};
endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
{EK1, EK2, EK3} < {CK1, CK2, CK3};
endkey_passed(EndKey, CheckingKey) ->
EndKey < CheckingKey.
%%%============================================================================
%%% Other functions
%%%============================================================================
obj_objectspecs(ObjectSpecs, SQN, TTL) ->
@ -528,7 +576,7 @@ aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) ->
Dates = parse_date(LMD0,
AAE#recent_aae.unit_minutes,
AAE#recent_aae.limit_minutes,
integer_now()),
leveled_util:integer_now()),
case Dates of
no_index ->
Acc;
@ -557,12 +605,12 @@ aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) ->
-spec parse_date(tuple(), integer(), integer(), integer()) ->
no_index|{binary(), integer()}.
%% @doc
%% Parse the lat modified date and the AAE date configuration to return a
%% Parse the last modified date and the AAE date configuration to return a
%% binary to be used as the last modified date part of the index, and an
%% integer to be used as the TTL of the index entry.
%% Return no_index if the change is not recent.
parse_date(LMD, UnitMins, LimitMins, Now) ->
LMDsecs = integer_time(LMD),
LMDsecs = leveled_util:integer_time(LMD),
Recent = (LMDsecs + LimitMins * 60) > Now,
case Recent of
false ->
@ -614,13 +662,6 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
{Bucket, Key, Value, {Hash, ObjHash}, LastMods}.
integer_now() ->
integer_time(os:timestamp()).
integer_time(TS) ->
DT = calendar:now_to_universal_time(TS),
calendar:datetime_to_gregorian_seconds(DT).
extract_metadata(Obj, Size, ?RIAK_TAG) ->
riak_extract_metadata(Obj, Size);
extract_metadata(Obj, Size, ?STD_TAG) ->
@ -782,37 +823,18 @@ endkey_passed_test() ->
?assertMatch(true, endkey_passed(TestKey, K2)).
corrupted_ledgerkey_test() ->
% When testing for compacted journal which has been corrupted, there may
% be a corruptes ledger key. Always skip these keys
% Key has become a 3-tuple not a 4-tuple
TagStrat1 = compact_inkerkvc({{1,
?INKT_STND,
{?STD_TAG, "B1", "K1andSK"}},
{},
true},
[{?STD_TAG, retain}]),
?assertMatch(skip, TagStrat1),
TagStrat2 = compact_inkerkvc({{1,
?INKT_KEYD,
{?STD_TAG, "B1", "K1andSK"}},
{},
true},
[{?STD_TAG, retain}]),
?assertMatch(skip, TagStrat2).
general_skip_strategy_test() ->
% Confirm that we will skip if the strategy says so
TagStrat1 = compact_inkerkvc({{1,
?INKT_STND,
{?STD_TAG, "B1", "K1andSK"}},
{?STD_TAG, "B1", "K1andSK", null}},
{},
true},
[{?STD_TAG, skip}]),
?assertMatch(skip, TagStrat1),
TagStrat2 = compact_inkerkvc({{1,
?INKT_KEYD,
{?STD_TAG, "B1", "K1andSK"}},
{?STD_TAG, "B1", "K1andSK", null}},
{},
true},
[{?STD_TAG, skip}]),
@ -839,15 +861,6 @@ general_skip_strategy_test() ->
[{?STD_TAG, skip}, {?IDX_TAG, recalc}]),
?assertMatch(skip, TagStrat5).
corrupted_inker_tag_test() ->
% Confirm that we will skip on unknown inker tag
TagStrat1 = compact_inkerkvc({{1,
foo,
{?STD_TAG, "B1", "K1andSK"}},
{},
true},
[{?STD_TAG, retain}]),
?assertMatch(skip, TagStrat1).
%% Test below proved that the overhead of performing hashes was trivial
%% Maybe 5 microseconds per hash
@ -859,24 +872,10 @@ hashperf_test() ->
io:format(user, "1000 object hashes in ~w microseconds~n",
[timer:now_diff(os:timestamp(), SW)]).
magichashperf_test() ->
KeyFun =
fun(X) ->
K = {o, "Bucket", "Key" ++ integer_to_list(X), null},
{K, X}
end,
KL = lists:map(KeyFun, lists:seq(1, 1000)),
{TimeMH, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]),
io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH]),
{TimePH, _Hl2} = timer:tc(lists, map, [fun(K) -> erlang:phash2(K) end, KL]),
io:format(user, "1000 keys phash2 hashed in ~w microseconds~n", [TimePH]),
{TimeMH2, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]),
io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH2]).
parsedate_test() ->
{MeS, S, MiS} = os:timestamp(),
timer:sleep(100),
Now = integer_now(),
Now = leveled_util:integer_now(),
UnitMins = 5,
LimitMins = 60,
PD = parse_date({MeS, S, MiS}, UnitMins, LimitMins, Now),
@ -898,7 +897,7 @@ check_pd(PD, UnitMins) ->
parseolddate_test() ->
LMD = os:timestamp(),
timer:sleep(100),
Now = integer_now() + 60 * 60,
Now = leveled_util:integer_now() + 60 * 60,
UnitMins = 5,
LimitMins = 60,
PD = parse_date(LMD, UnitMins, LimitMins, Now),