%% leveled/src/leveled_codec.erl

%% -------- Key Codec ---------
%%
%% Functions for manipulating keys and values within leveled.
%%
%%
%% Within the LEDGER:
%% Keys are of the form -
%% {Tag, Bucket, Key, SubKey|null}
%% Values are of the form
%% {SQN, Status, MD}
%%
%% Within the JOURNAL:
%% Keys are of the form -
%% {SQN, LedgerKey}
%% Values are of the form
%% {Object, IndexSpecs} (as a binary)
%%
%% IndexSpecs are of the form of a Ledger Key/Value
%%
%% Tags need to be set during PUT operations and each Tag used must be
%% supported in an extract_metadata and a build_metadata_object function clause
%%
%% Currently the only tags supported are:
%% - o (standard objects)
%% - o_rkv (riak objects)
%% - i (index entries)
-module(leveled_codec).
-include("include/leveled.hrl").
-include_lib("eunit/include/eunit.hrl").
-export([
inker_reload_strategy/1,
strip_to_seqonly/1,
strip_to_statusonly/1,
strip_to_keyseqonly/1,
strip_to_seqnhashonly/1,
striphead_to_details/1,
is_active/3,
endkey_passed/2,
key_dominates/2,
maybe_reap_expiredkey/2,
to_ledgerkey/3,
to_ledgerkey/5,
from_ledgerkey/1,
from_ledgerkey/2,
to_inkerkey/2,
to_inkerkv/6,
from_inkerkv/1,
from_inkerkv/2,
from_journalkey/1,
compact_inkerkvc/2,
split_inkvalue/1,
check_forinkertype/2,
maybe_compress/2,
create_value_for_journal/3,
build_metadata_object/2,
generate_ledgerkv/5,
get_size/2,
get_keyandobjhash/2,
idx_indexspecs/5,
obj_objectspecs/3,
aae_indexspecs/6,
riak_extract_metadata/2,
segment_hash/1,
to_lookup/1,
riak_metadata_to_binary/2]).
-define(V1_VERS, 1).
-define(MAGIC, 53). % riak_kv -> riak_object
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
-define(NRT_IDX, "$aae.").
-define(ALL_BUCKETS, <<"$all">>).
-type recent_aae() :: #recent_aae{}.
-type riak_metadata() :: {binary()|delete, % Sibling Metadata
binary()|null, % Vclock Metadata
integer()|null, % Hash of vclock - non-exportable
integer()}. % Size in bytes of real object
%% 2018-05-03 18:26:02 +01:00 (stray revision timestamp, not code)
-type tag() ::
?STD_TAG|?RIAK_TAG|?IDX_TAG|?HEAD_TAG.
-type segment_hash() ::
{integer(), integer()}.
-type ledger_status() ::
tomb|{active, non_neg_integer()|infinity}.
-type ledger_key() ::
{tag(), any(), any(), any()}.
-type ledger_value() ::
{integer(), ledger_status(), segment_hash(), tuple()}.
-type ledger_kv() ::
{ledger_key(), ledger_value()}.
-type compaction_strategy() ::
list({tag(), retain|skip|recalc}).
-type journal_key_tag() ::
?INKT_STND|?INKT_TOMB|?INKT_MPUT|?INKT_KEYD.
-type journal_key() ::
{integer(), journal_key_tag(), ledger_key()}.
-type compression_method() ::
lz4|native.
%%%============================================================================
%%% Ledger Key Manipulation
%%%============================================================================
-spec segment_hash(ledger_key()|binary()) -> {integer(), integer()}.
%% @doc
%% Hash a key into a pair of integers - the segment ID and an extra hash
%% carrying spare entropy.  The pair is intended for use in blooms or indexes,
%% so that some speed can be gained when only the segment ID is known, and
%% more can be gained when the extra hash is known as well.
%%
%% Binaries are hashed directly.  Riak-tagged keys (with a null sub-key) and
%% head-tagged keys whose bucket and key are binaries are hashed over the
%% concatenated binaries (including the sub-key for head-tagged keys, but only
%% when that sub-key is itself a binary).  Any other term is converted with
%% term_to_binary/1 and then hashed.
segment_hash(KeyBin) when is_binary(KeyBin) ->
    {segment_hash, SegID, Extra} = leveled_tictac:keyto_segment48(KeyBin),
    {SegID, Extra};
segment_hash({?RIAK_TAG, B, K, null}) when is_binary(B), is_binary(K) ->
    Joined = <<B/binary, K/binary>>,
    segment_hash(Joined);
segment_hash({?HEAD_TAG, B, K, SK})
        when is_binary(B), is_binary(K), is_binary(SK) ->
    Joined = <<B/binary, K/binary, SK/binary>>,
    segment_hash(Joined);
segment_hash({?HEAD_TAG, B, K, _NonBinarySubKey})
        when is_binary(B), is_binary(K) ->
    % Sub-key is not a binary, so it does not contribute to the hash
    Joined = <<B/binary, K/binary>>,
    segment_hash(Joined);
segment_hash(AnyTerm) ->
    segment_hash(term_to_binary(AnyTerm)).
-spec to_lookup(ledger_key()) -> lookup|no_lookup.
%% @doc
%% Decide whether a key may be looked up directly in the merge tree.  Index
%% entries (keys whose first element is the index tag) should only be read
%% through range queries and so return no_lookup; every other key returns
%% lookup, meaning it will have presence in bloom filters and other lookup
%% accelerators.
to_lookup(Key) ->
    Tag = element(1, Key),
    case Tag =:= ?IDX_TAG of
        true ->
            no_lookup;
        false ->
            lookup
    end.
%% @doc
%% Helper functions to extract individual sub-components of a ledger key/value
-spec strip_to_statusonly(ledger_kv()) -> ledger_status().
%% Return only the status (second element of the value) of a ledger KV pair.
strip_to_statusonly({_LedgerKey, {_SQN, Status, _Hash, _MD}}) ->
    Status.
-spec strip_to_seqonly(ledger_kv()) -> non_neg_integer().
%% Return only the sequence number (first element of the value) of a ledger
%% KV pair.
strip_to_seqonly({_LedgerKey, {SQN, _Status, _Hash, _MD}}) ->
    SQN.
-spec strip_to_keyseqonly(ledger_kv()) -> {ledger_key(), integer()}.
%% Return the ledger key paired with the sequence number from its value.
strip_to_keyseqonly({LedgerKey, {SQN, _Status, _Hash, _MD}}) ->
    {LedgerKey, SQN}.
-spec strip_to_seqnhashonly(ledger_kv()) -> {integer(), segment_hash()}.
%% Return the sequence number and the hash (third element of the value) of a
%% ledger KV pair.
strip_to_seqnhashonly({_LedgerKey, {SQN, _Status, Hash, _MD}}) ->
    {SQN, Hash}.
-spec striphead_to_details(ledger_value()) -> ledger_value().
%% Return the ledger value unchanged, after asserting it is a four-element
%% tuple (any other shape fails to match the function head).
striphead_to_details({_SQN, _Status, _Hash, _MD} = LedgerValue) ->
    LedgerValue.
-spec key_dominates(ledger_kv(), ledger_kv()) ->
    left_hand_first|right_hand_first|left_hand_dominant|right_hand_dominant.
%% @doc
%% Compare two ledger key/value pairs.  If the keys differ, report which one
%% sorts first.  If the keys are equal, report which pair should win - the one
%% with the higher sequence number, with the left-hand pair winning a tie.
key_dominates({LK, _LVal}, {RK, _RVal}) when LK < RK ->
    left_hand_first;
key_dominates({LK, _LVal}, {RK, _RVal}) when RK < LK ->
    right_hand_first;
key_dominates({LK, {LSQN, _LSt, _LHash, _LMD}},
                {RK, {RSQN, _RSt, _RHash, _RMD}})
        when LK == RK, LSQN >= RSQN ->
    left_hand_dominant;
key_dominates({LK, {LSQN, _LSt, _LHash, _LMD}},
                {RK, {RSQN, _RSt, _RHash, _RMD}})
        when LK == RK, LSQN < RSQN ->
    right_hand_dominant.
-spec maybe_reap_expiredkey(ledger_kv(), {boolean(), integer()}) -> boolean().
%% @doc
%% Decide whether a ledger entry can be reaped.  LevelD is a tuple of the
%% is_basement boolean for the level in the ledger, and a timestamp passed
%% into the calling function.  The decision is made on the status of the entry
%% only (see maybe_reap/2).
maybe_reap_expiredkey(KV, LevelD) ->
    maybe_reap(strip_to_statusonly(KV), LevelD).
%% Reap decision for a status, given {IsBasement, CurrentTimestamp}.
%% Nothing is reaped outside of the basement, or when set never to expire.
maybe_reap({_, infinity}, _) ->
    false; % no expiry has been set for this key
maybe_reap(tomb, {true, _Now}) ->
    true; % tombstones are always reaped in the basement
maybe_reap({_, Expiry}, {true, Now}) when Expiry < Now ->
    true; % in the basement and the expiry time has passed
maybe_reap(_, _) ->
    false.
-spec is_active(ledger_key(), ledger_value(), non_neg_integer()) -> boolean().
%% @doc
%% Is this KV pair still active at time Now?  Tombstones are never active,
%% entries with an infinite expiry are always active, and any other active
%% entry is active only while its timestamp has not fallen behind Now.
is_active(Key, Value, Now) ->
    case strip_to_statusonly({Key, Value}) of
        tomb ->
            false;
        {active, infinity} ->
            true;
        {active, Expiry} ->
            Expiry >= Now
    end.
-spec from_ledgerkey(atom(), tuple()) -> false|tuple().
%% @doc
%% Return the "significant information" from the Ledger Key (see
%% from_ledgerkey/1) if and only if the key is a four-element tuple whose tag
%% matches ExpectedTag - otherwise return false.
from_ledgerkey(ExpectedTag, {ExpectedTag, _B, _K, _SK} = LedgerKey) ->
    from_ledgerkey(LedgerKey);
from_ledgerkey(_ExpectedTag, _NotMatching) ->
    false.
-spec from_ledgerkey(tuple()) -> tuple().
%% @doc
%% Return identifying information from the LedgerKey.  For index entries this
%% is {Bucket, Key, IndexValue} - where an entry filed under the all-buckets
%% marker carries its real {Bucket, Key} in the final element of the ledger
%% key.  For all other keys it is {Bucket, Key}.
from_ledgerkey({?IDX_TAG, IdxBucket, {_IdxFld, IdxVal}, KeyPart}) ->
    case {IdxBucket, KeyPart} of
        {?ALL_BUCKETS, {RealBucket, RealKey}} ->
            {RealBucket, RealKey, IdxVal};
        _ ->
            {IdxBucket, KeyPart, IdxVal}
    end;
from_ledgerkey({_Tag, Bucket, Key, _SubKey}) ->
    {Bucket, Key}.
-spec to_ledgerkey(any(), any(), tag(), any(), any()) -> ledger_key().
%% @doc
%% Build the ledger key for an index entry.  Only the index tag is accepted:
%% the {Field, Value} pair takes the key position of the ledger key, and the
%% object key takes the sub-key position.
to_ledgerkey(Bucket, Key, ?IDX_TAG, Field, Value) ->
    IndexTerm = {Field, Value},
    {?IDX_TAG, Bucket, IndexTerm, Key}.
-spec to_ledgerkey(any(), any(), tag()) -> ledger_key().
%% @doc
%% Build a ledger key from a bucket, key and tag.  For the head tag a
%% {Key, SubKey} pair is split across the key and sub-key positions; in every
%% other case the key is used as given and the sub-key is null.
to_ledgerkey(Bucket, Key, Tag) ->
    case {Tag, Key} of
        {?HEAD_TAG, {HeadKey, SubKey}} ->
            {?HEAD_TAG, Bucket, HeadKey, SubKey};
        _ ->
            {Tag, Bucket, Key, null}
    end.
-spec endkey_passed(all|ledger_key(), ledger_key()) -> boolean().
%% @doc
%% Has the key being checked gone beyond the end key of a query?
%%
%% The end key is either the atom `all` (in which case the end is never
%% passed) or a ledger key.  Only the leading elements of the end key are
%% compared when its trailing elements are null, so that a partially specified
%% end key covers every key sharing that prefix.  A fully specified end key is
%% compared with the checked key as a whole term.
endkey_passed(all, _) ->
    false;
endkey_passed({EK1, null, null, null}, {CK1, _, _, _}) ->
    EK1 < CK1;
endkey_passed({EK1, EK2, null, null}, {CK1, CK2, _, _}) ->
    {EK1, EK2} < {CK1, CK2};
endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
    {EK1, EK2, EK3} < {CK1, CK2, CK3};
endkey_passed(EndKey, CheckingKey) ->
    EndKey < CheckingKey.
%%%============================================================================
%%% Journal Compaction functions
%%%============================================================================
-spec inker_reload_strategy(compaction_strategy()) -> compaction_strategy().
%% @doc
%% Take the default strategy for compaction (retain for both the riak and
%% standard tags), and override the approach for any tags passed in.
%%
%% Overrides are applied with lists:keyreplace/4, so an entry in AltList for a
%% tag which has no default entry leaves the strategy unchanged.
inker_reload_strategy(AltList) ->
    Defaults = [{?RIAK_TAG, retain}, {?STD_TAG, retain}],
    ApplyOverride =
        fun({Tag, Strat}, StrategyAcc) ->
            lists:keyreplace(Tag, 1, StrategyAcc, {Tag, Strat})
        end,
    lists:foldl(ApplyOverride, Defaults, AltList).
-spec compact_inkerkvc({journal_key(), any(), boolean()},
                        compaction_strategy()) ->
                            skip|{retain, any()}|{recalc, null}.
%% @doc
%% Decide whether a superseded object should be replicated in the compacted
%% file and in what format.
%%
%% Entries flagged as crc_wonky with a false check, and tombstones, are always
%% skipped.  For key-delta and standard entries the strategy for the tag of
%% the ledger key decides:
%% - skip: the entry is dropped;
%% - retain: a key-delta entry is kept as is, whereas a standard entry is
%% converted to a key-delta entry holding a null object and the key changes
%% recovered from the journal value;
%% - any other strategy is returned paired with null.
compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) ->
    skip;
compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) ->
    skip;
compact_inkerkvc({{_SQN, ?INKT_KEYD, LK}, _V, _CrcCheck} = KVC, Strategy) ->
    case get_tagstrategy(LK, Strategy) of
        retain ->
            {retain, KVC};
        skip ->
            skip;
        OtherStrat ->
            {OtherStrat, null}
    end;
compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
    case get_tagstrategy(LK, Strategy) of
        retain ->
            % Only the key changes survive, the object itself is dropped
            {_Object, KeyDeltas} = revert_value_from_journal(V),
            DeltaKey = {SQN, ?INKT_KEYD, LK},
            {retain, {DeltaKey, {null, KeyDeltas}, CrcCheck}};
        skip ->
            skip;
        OtherStrat ->
            {OtherStrat, null}
    end.
-spec get_tagstrategy(ledger_key(), compaction_strategy())
                                                    -> skip|retain|recalc.
%% @doc
%% Work out the compaction strategy for the key, by finding the tag of the
%% ledger key in the strategy list.  If the tag has no entry in the list this
%% is logged (IC012) and the key is skipped.
get_tagstrategy({Tag, _, _, _}, Strategy) ->
    case lists:keyfind(Tag, 1, Strategy) of
        false ->
            leveled_log:log("IC012", [Tag, Strategy]),
            skip;
        {Tag, TagStrat} ->
            TagStrat
    end.
%%%============================================================================
%%% Manipulate Journal Key and Value
%%%============================================================================
-spec to_inkerkey(ledger_key(), non_neg_integer()) -> journal_key().
%% @doc
%% Conversion from ledger_key to journal_key to allow for the key to be
%% fetched.  The journal key is always built with the standard inker type.
to_inkerkey(LedgerKey, SQN) ->
    InkerType = ?INKT_STND,
    {SQN, InkerType, LedgerKey}.
-spec to_inkerkv(ledger_key(), non_neg_integer(), any(), list(),
                    compression_method(), boolean()) -> {journal_key(), any()}.
%% @doc
%% Convert to the correct format of a Journal key and value.  The inker type
%% in the key is derived from the object, and the value is the serialised
%% combination of the object and its key changes.
to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) ->
    JournalKey = {SQN, check_forinkertype(LedgerKey, Object), LedgerKey},
    JournalValue =
        create_value_for_journal({Object, KeyChanges}, Compress, PressMethod),
    {JournalKey, JournalValue}.
%% Used when fetching objects, so only handles standard, hashable entries.
%% Key changes are included in the result by default.
from_inkerkv(Object) ->
    from_inkerkv(Object, false).

%% A standard journal entry with a binary value is converted to
%% {{SQN, LedgerKey}, {Object, KeyChanges}} - where the key changes are
%% replaced by an empty list if ToIgnoreKeyChanges is true.  Anything else is
%% returned unchanged.
from_inkerkv({{SQN, ?INKT_STND, PK}, Bin}, ToIgnoreKeyChanges)
        when is_binary(Bin) ->
    {{SQN, PK}, revert_value_from_journal(Bin, ToIgnoreKeyChanges)};
from_inkerkv(Other, _ToIgnoreKeyChanges) ->
    Other.
%% Build the binary journal value for an {Object, KeyChanges} pair.
%% The layout of the result is:
%% - the serialised object;
%% - the key changes as a binary (a compressed term_to_binary/2 conversion
%% unless the key changes were passed in as a binary already);
%% - the length in bytes of the key changes binary (32 bits);
%% - a type code recording how the object was serialised (8 bits).
create_value_for_journal({Object, KeyChanges}, Compress, Method) ->
    KeyChangeBin =
        case is_binary(KeyChanges) of
            true ->
                KeyChanges;
            false ->
                term_to_binary(KeyChanges, [compressed])
        end,
    ObjectBin = serialise_object(Object, Compress, Method),
    TypeCode = encode_valuetype(is_binary(Object), Compress, Method),
    KeyChangeLen = byte_size(KeyChangeBin),
    <<ObjectBin/binary,
        KeyChangeBin/binary,
        KeyChangeLen:32/integer,
        TypeCode:8/integer>>.
%% Compress a journal value if it is not compressed already.
%% A {null, KeyChanges} tuple is converted to an uncompressed journal binary.
%% Otherwise the input must be a journal binary: it is returned untouched if
%% its type code shows it to be compressed, and if not it is unpacked and
%% rebuilt with compression using PressMethod.
maybe_compress({null, KeyChanges}, _PressMethod) ->
    create_value_for_journal({null, KeyChanges}, false, native);
maybe_compress(JournalBin, PressMethod) ->
    % The final 5 bytes are the key change length (4) and the type code (1)
    BodyLen = byte_size(JournalBin) - 5,
    <<Body:BodyLen/binary,
        KeyChangeLen:32/integer,
        TypeCode:8/integer>> = JournalBin,
    case decode_valuetype(TypeCode) of
        {_IsBinary, true, _IsLz4} ->
            JournalBin;
        {IsBinary, false, IsLz4} ->
            ObjectLen = BodyLen - KeyChangeLen,
            <<ObjectBin:ObjectLen/binary,
                KeyChangeBin:KeyChangeLen/binary>> = Body,
            Object = deserialise_object(ObjectBin, IsBinary, false, IsLz4),
            KeyChanges = binary_to_term(KeyChangeBin),
            create_value_for_journal({Object, KeyChanges}, true, PressMethod)
    end.
%% Convert an object to a binary for the journal.
%% Binary objects are left as they are unless compression is requested, in
%% which case they are compressed with lz4 or zlib according to Method.
%% Non-binary objects are always converted with term_to_binary, using its
%% own compressed option when compression is requested (whatever the Method).
serialise_object(Object, Compress, Method) when is_binary(Object) ->
    case {Compress, Method} of
        {false, _} ->
            Object;
        {true, lz4} ->
            {ok, Packed} = lz4:pack(Object),
            Packed;
        {true, native} ->
            zlib:compress(Object)
    end;
serialise_object(Object, false, _Method) ->
    term_to_binary(Object);
serialise_object(Object, true, _Method) ->
    term_to_binary(Object, [compressed]).
%% Unpack a journal value binary into {Object, KeyChanges}.
revert_value_from_journal(JournalBin) ->
    revert_value_from_journal(JournalBin, false).

%% Unpack a journal value binary into {Object, KeyChanges}.  When
%% ToIgnoreKeyChanges is true the key changes are not decoded, and an empty
%% list is returned in their place.
revert_value_from_journal(JournalBin, ToIgnoreKeyChanges) ->
    % The final 5 bytes are the key change length (4) and the type code (1)
    BodyLen = byte_size(JournalBin) - 5,
    <<Body:BodyLen/binary,
        KeyChangeLen:32/integer,
        TypeCode:8/integer>> = JournalBin,
    {IsBinary, IsCompressed, IsLz4} = decode_valuetype(TypeCode),
    ObjectLen = BodyLen - KeyChangeLen,
    DecodeKeyChanges =
        case ToIgnoreKeyChanges of
            true ->
                fun(_KeyChangeBin) -> [] end;
            false ->
                fun erlang:binary_to_term/1
        end,
    <<ObjectBin:ObjectLen/binary, KeyChangeBin:KeyChangeLen/binary>> = Body,
    Object = deserialise_object(ObjectBin, IsBinary, IsCompressed, IsLz4),
    {Object, DecodeKeyChanges(KeyChangeBin)}.
%% Reverse serialise_object/3, given the (IsBinary, IsCompressed, IsLz4)
%% flags decoded from the type code of the journal value.
deserialise_object(Binary, false, _IsCompressed, _IsLz4) ->
    % Not stored as a raw binary, so it must be in external term format
    binary_to_term(Binary);
deserialise_object(Binary, true, false, _IsLz4) ->
    Binary;
deserialise_object(Binary, true, true, false) ->
    zlib:uncompress(Binary);
deserialise_object(Binary, true, true, true) ->
    {ok, Unpacked} = lz4:unpack(Binary),
    Unpacked.
%% Encode how an object was serialised as a small integer of bit flags:
%% 1 - the object was compressed;
%% 2 - the object was a binary;
%% 4 - the compression method is lz4 (set from the Method, whether or not
%% compression was actually applied).
encode_valuetype(IsBinary, IsCompressed, Method) ->
    MethodFlag = case Method of lz4 -> 4; native -> 0 end,
    BinaryFlag = case IsBinary of true -> 2; false -> 0 end,
    CompressedFlag = case IsCompressed of true -> 1; false -> 0 end,
    CompressedFlag bor BinaryFlag bor MethodFlag.
%% Decode the bit flags of a type code into {IsBinary, IsCompressed, IsLz4}
%% (bit values 2, 1 and 4 respectively).
decode_valuetype(TypeInt) ->
    IsBinary = (TypeInt band 2) =/= 0,
    IsCompressed = (TypeInt band 1) =/= 0,
    IsLz4 = (TypeInt band 4) =/= 0,
    {IsBinary, IsCompressed, IsLz4}.
%% Drop the inker type from a journal key, leaving {SQN, LedgerKey}.
from_journalkey({SeqNo, _InkerType, LK}) ->
    {SeqNo, LK}.
%% 2017-11-07 13:43:29 +00:00 (stray revision timestamp, not code)
%% Split a binary journal value into {Object, KeyChanges}.
split_inkvalue(ValueBin) when is_binary(ValueBin) ->
    revert_value_from_journal(ValueBin).
%% Choose the inker type for a journal key from the object being stored:
%% the tombstone type for the atom delete, the multi-put type for the atom
%% head_only, and the standard type for anything else.  The ledger key plays
%% no part in the decision.
check_forinkertype(_LedgerKey, Object) ->
    case Object of
        delete ->
            ?INKT_TOMB;
        head_only ->
            ?INKT_MPUT;
        _ ->
            ?INKT_STND
    end.
%% Hash any term, by hashing its external term format.
hash(Obj) ->
    ObjBin = term_to_binary(Obj),
    erlang:phash2(ObjBin).
%%%============================================================================
%%% Other functions
%%%============================================================================
%% Convert a list of {Op, Bucket, Key, SubKey, Value} object specs into
%% head-tagged ledger key/value pairs, all sharing the same SQN and TTL.
obj_objectspecs(ObjectSpecs, SQN, TTL) ->
    ToHeadSpec =
        fun({IdxOp, Bucket, Key, SubKey, Value}) ->
            gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL)
        end,
    lists:map(ToHeadSpec, ObjectSpecs).
%% Convert a list of {Op, IndexField, IndexTerm} specs for a single object
%% (Bucket and Key) into index ledger key/value pairs, all sharing the same
%% SQN and TTL.
idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
    ToIndexSpec =
        fun({IdxOp, IdxFld, IdxTrm}) ->
            gen_indexspec(Bucket, Key, IdxOp, IdxFld, IdxTrm, SQN, TTL)
        end,
    lists:map(ToIndexSpec, IndexSpecs).
%% Generate the ledger key/value pair for a single index change.
%% A bucket of the form {all, RealBucket} is filed under the all-buckets
%% marker, with {RealBucket, Key} used in place of the key.  The value never
%% has a hash (no_lookup) or metadata (null).
gen_indexspec(Bucket, Key, IdxOp, IdxField, IdxTerm, SQN, TTL) ->
    Status = set_status(IdxOp, TTL),
    {LedgerBucket, LedgerKeyPart} =
        case Bucket of
            {all, RealBucket} ->
                {?ALL_BUCKETS, {RealBucket, Key}};
            _ ->
                {Bucket, Key}
        end,
    LedgerKey =
        to_ledgerkey(LedgerBucket, LedgerKeyPart, ?IDX_TAG, IdxField, IdxTerm),
    {LedgerKey, {SQN, Status, no_lookup, null}}.
%% Generate the head-tagged ledger key/value pair for a single object spec.
%% The value carries the segment hash of the ledger key, and the Value passed
%% in takes the metadata position.
gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL) ->
    Status = set_status(IdxOp, TTL),
    LedgerKey = to_ledgerkey(Bucket, {Key, SubKey}, ?HEAD_TAG),
    LedgerValue = {SQN, Status, segment_hash(LedgerKey), Value},
    {LedgerKey, LedgerValue}.
%% 2018-02-16 20:56:12 +00:00 (stray revision timestamp, not code)
%% Translate an add/remove operation into a ledger status.  The TTL is only
%% relevant when adding - a removal is always a tombstone.
set_status(remove, _TTL) ->
    %% TODO: timestamps for delayed reaping
    tomb;
set_status(add, TTL) ->
    {active, TTL}.
-spec aae_indexspecs(false|recent_aae(),
                        any(), any(),
                        integer(), integer(),
                        list())
                            -> list().
%% @doc
%% Generate an additional index term representing the change, if the last
%% modified date for the change is within the definition of recency.
%%
%% The object may have multiple last modified dates (siblings), and in this
%% case index entries for all dates within the range are added.
%%
%% No entries are generated if recent AAE is disabled (false), if there are
%% no last modified dates, or if the bucket is excluded by the filter (listed
%% under a blacklist, or not listed under a whitelist).  Under a blacklist the
%% entries are filed under the all-buckets marker; under a whitelist they are
%% filed under the bucket itself.
%%
%% Each index entry is given a TTL by parse_date/4, so that it will expire in
%% the future (when it is no longer relevant to assessing recent changes).
aae_indexspecs(false, _Bucket, _Key, _SQN, _H, _LastMods) ->
    [];
aae_indexspecs(_AAE, _Bucket, _Key, _SQN, _H, []) ->
    [];
aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) ->
    InList = lists:member(Bucket, AAE#recent_aae.buckets),
    IdxBucket =
        case {AAE#recent_aae.filter, InList} of
            {blacklist, true} ->
                false;
            {blacklist, false} ->
                {all, Bucket};
            {whitelist, true} ->
                Bucket;
            {whitelist, false} ->
                false
        end,
    case IdxBucket of
        false ->
            [];
        _ ->
            AddSpecFun =
                fun(LastMod, SpecAcc) ->
                    ParsedDate =
                        parse_date(LastMod,
                                    AAE#recent_aae.unit_minutes,
                                    AAE#recent_aae.limit_minutes,
                                    leveled_util:integer_now()),
                    case ParsedDate of
                        no_index ->
                            SpecAcc;
                        {DateStr, TTL} ->
                            TreeSize = AAE#recent_aae.tree_size,
                            SegID =
                                leveled_tictac:get_segment(
                                    leveled_tictac:keyto_segment32(Key),
                                    TreeSize),
                            % Field is named after the (rounded) date, the
                            % term combines the segment ID and the hash
                            IdxFld = list_to_binary(?NRT_IDX ++ DateStr ++ "_bin"),
                            SegStr =
                                string:right(integer_to_list(SegID), 8, $0),
                            HashStr =
                                string:right(integer_to_list(H), 8, $0),
                            IdxTrm = list_to_binary(SegStr ++ "." ++ HashStr),
                            IdxSpec =
                                gen_indexspec(IdxBucket, Key,
                                                add,
                                                IdxFld, IdxTrm,
                                                SQN, TTL),
                            [IdxSpec|SpecAcc]
                    end
                end,
            lists:foldl(AddSpecFun, [], LastMods)
    end.
-spec parse_date(tuple(), integer(), integer(), integer()) ->
                    no_index|{string(), integer()}.
%% @doc
%% Parse the last modified date and the AAE date configuration to return a
%% string to be used as the last modified date part of the index, and an
%% integer to be used as the TTL of the index entry.
%%
%% The string is the date and time formatted as year, month, day, hour and
%% minute, with the minute rounded down to a multiple of UnitMins.  The TTL
%% is LimitMins plus UnitMins (in seconds) after the earlier of Now and the
%% last modified time.
%%
%% Return no_index if the change is not recent - that is, if the last modified
%% time plus LimitMins does not fall after Now.
parse_date(LMD, UnitMins, LimitMins, Now) ->
    LMDsecs = leveled_util:integer_time(LMD),
    Recent = (LMDsecs + LimitMins * 60) > Now,
    case Recent of
        false ->
            no_index;
        true ->
            {{Y, M, D}, {Hour, Minute, _Second}} =
                calendar:now_to_datetime(LMD),
            RoundMins =
                UnitMins * (Minute div UnitMins),
            % Flattened so callers can use ++ and list functions on the result
            StrTime =
                lists:flatten(io_lib:format(?LMD_FORMAT,
                                            [Y, M, D, Hour, RoundMins])),
            TTL = min(Now, LMDsecs) + (LimitMins + UnitMins) * 60,
            {StrTime, TTL}
    end.
-spec generate_ledgerkv(
            tuple(), integer(), any(), integer(), tuple()|infinity) ->
            {any(), any(), any(),
                {{integer(), integer()}|no_lookup, integer()},
                list()}.
%% @doc
%% Function to extract from an object the information necessary to populate
%% the Penciller's ledger.
%% Outputs -
%% Bucket - original Bucket extracted from the PrimaryKey
%% Key - original Key extracted from the PrimaryKey
%% Value - the value to be used in the Ledger (the SQN, the status, the hash
%% of the key and the extracted metadata)
%% {Hash, ObjHash} - the segment hash of the key, and a hash of the value
%% to be used for equality checking between objects
%% LastMods - the last modified dates for the object (may be multiple due to
%% siblings)
%%
%% The status is a tombstone if the object is the atom delete, and otherwise
%% it is active with the TS passed in.
generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
    {Tag, Bucket, Key, _SubKey} = PrimaryKey,
    Status =
        if
            Obj =:= delete ->
                tomb;
            true ->
                {active, TS}
        end,
    KeyHash = segment_hash(PrimaryKey),
    {MD, LastMods} = extract_metadata(Obj, Size, Tag),
    ObjHash = get_objhash(Tag, MD),
    LedgerValue = {SQN, Status, KeyHash, MD},
    {Bucket, Key, LedgerValue, {KeyHash, ObjHash}, LastMods}.
%% Extract {Metadata, LastModifiedDates} from an object, according to its tag.
%% For standard objects the metadata is just a hash of the object and its
%% size, and there are no last modified dates.
extract_metadata(Obj, Size, ?STD_TAG) ->
    StdMetadata = {hash(Obj), Size},
    {StdMetadata, []};
extract_metadata(Obj, Size, ?RIAK_TAG) ->
    riak_extract_metadata(Obj, Size).
%% Return the object size recorded in the metadata of a ledger value.  Only
%% the riak and standard tags are supported.
get_size(PK, Value) ->
    {Tag, _Bucket, _Key, _SubKey} = PK,
    {_SQN, _Status, _KeyHash, MD} = Value,
    case Tag of
        ?STD_TAG ->
            {_ObjHash, StdSize} = MD,
            StdSize;
        ?RIAK_TAG ->
            {_SibMD, _Vclock, _VclockHash, RiakSize} = MD,
            RiakSize
    end.
-spec get_keyandobjhash(tuple(), tuple()) -> tuple().
%% @doc
%% Return a tuple of {Bucket, Key, Hash} where hash is a hash of the object
%% not the key (for example with Riak tagged objects this will be a hash of
%% the sorted vclock).  For index entries there is no object hash, and the
%% result of from_ledgerkey/1 is returned instead.
get_keyandobjhash(LK, Value) ->
    {Tag, Bucket, Key, _SubKey} = LK,
    {_SQN, _Status, _KeyHash, MD} = Value,
    case Tag of
        ?IDX_TAG ->
            % returns {Bucket, Key, IdxValue}
            from_ledgerkey(LK);
        _ ->
            ObjHash = get_objhash(Tag, MD),
            {Bucket, Key, ObjHash}
    end.
%% Return the object hash held within the metadata - the third element for
%% riak metadata, the first element for standard metadata.  Other tags are
%% not supported.
get_objhash(Tag, ObjMetaData) ->
    case Tag of
        ?STD_TAG ->
            {StdHash, _StdSize} = ObjMetaData,
            StdHash;
        ?RIAK_TAG ->
            {_SibMD, _Vclock, RiakHash, _RiakSize} = ObjMetaData,
            RiakHash
    end.
%% Build the object to be returned in response to a request for metadata.
%% For riak objects this is a binary rebuilt from the vclock and sibling
%% metadata; for standard and head objects it is the metadata as stored.
build_metadata_object(PrimaryKey, MD) ->
    {Tag, _Bucket, _Key, _SubKey} = PrimaryKey,
    case Tag of
        ?RIAK_TAG ->
            {SibData, Vclock, _Hash, _Size} = MD,
            riak_metadata_to_binary(Vclock, SibData);
        PlainTag when PlainTag =:= ?STD_TAG; PlainTag =:= ?HEAD_TAG ->
            MD
    end.
-spec riak_extract_metadata(binary()|delete, non_neg_integer()) ->
                                                {riak_metadata(), list()}.
%% @doc
%% Riak extract metadata should extract a metadata object which is a
%% four-tuple of:
%% - Binary of sibling Metadata
%% - Binary of vector clock metadata
%% - Non-exportable hash of the (sorted) vector clock metadata
%% - Size of the object
%%
%% The metadata object should be returned with the full list of last
%% modified dates (which will be used for recent anti-entropy index creation)
%%
%% For a delete there is no binary to extract from - the sibling metadata is
%% the atom delete, the vector clock and hash are null, and there are no last
%% modified dates.
riak_extract_metadata(delete, Size) ->
    {{delete, null, null, Size}, []};
riak_extract_metadata(ObjBin, Size) ->
    {VclockBin, SibBin, LastMods} = riak_metadata_from_binary(ObjBin),
    SortedVclock = lists:sort(binary_to_term(VclockBin)),
    VclockHash = erlang:phash2(SortedVclock),
    {{SibBin, VclockBin, VclockHash, Size}, LastMods}.
%% Rebuild a binary from the vector clock binary and sibling metadata binary.
%% The result is laid out as:
%% <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
%%% VclockBin/binary, SibMetaBin/binary>>
%% where the sibling metadata binary is appended as given.
riak_metadata_to_binary(VclockBin, SibMetaBin) ->
    <<?MAGIC:8/integer,
        ?V1_VERS:8/integer,
        (byte_size(VclockBin)):32/integer,
        VclockBin/binary,
        SibMetaBin/binary>>.
%% Split a version 1 riak object binary into
%% {VclockBin, SibMetaBin, LastMods}.
%% The binary must start with the magic byte and version, followed by the
%% length-prefixed vector clock, a 32-bit sibling count and then the siblings.
%% The sibling metadata binary returned starts with the sibling count, and is
%% followed by the metadata gathered from each sibling (the sibling values are
%% not retained).
riak_metadata_from_binary(V1Binary) ->
    <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
        Rest/binary>> = V1Binary,
    <<VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest,
    % SibCount comes from an integer segment of a binary match, so there is
    % no need to check its type before use
    {SibMetaBin, LastMods} =
        get_metadata_from_siblings(SibsBin,
                                    SibCount,
                                    <<SibCount:32/integer>>,
                                    []),
    {VclockBin, SibMetaBin, LastMods}.
%% Walk the siblings binary, accumulating the metadata of each sibling and
%% its last modified date.  Each sibling is a length-prefixed value followed
%% by length-prefixed metadata.
%%
%% For each sibling the accumulated binary gains a zero value length (the
%% value is dropped), then the metadata length and the metadata.  The last
%% modified date is read from the first three 32-bit integers of the metadata,
%% or is {0, 0, 0} if the metadata is too short to hold them.  The dates are
%% accumulated by prepending, so are returned in reverse order of the siblings.
%%
%% The walk only ends successfully when the binary is exhausted at the same
%% time as the sibling count reaches zero.
get_metadata_from_siblings(<<>>, 0, SibMetaBin, LastMods) ->
    {SibMetaBin, LastMods};
get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>,
                            SibCount,
                            SibMetaBin,
                            LastMods) ->
    <<_ValBin:ValLen/binary,
        MetaLen:32/integer,
        MetaBin:MetaLen/binary,
        Remainder/binary>> = Rest0,
    LastMod =
        case byte_size(MetaBin) >= 12 of
            true ->
                <<Mega:32/integer,
                    Sec:32/integer,
                    Micro:32/integer,
                    _/binary>> = MetaBin,
                {Mega, Sec, Micro};
            false ->
                {0, 0, 0}
        end,
    SibMetaBin0 =
        <<SibMetaBin/binary,
            0:32/integer,
            MetaLen:32/integer,
            MetaBin/binary>>,
    get_metadata_from_siblings(Remainder,
                                SibCount - 1,
                                SibMetaBin0,
                                [LastMod|LastMods]).
%%%============================================================================
%%% Test
%%%============================================================================
-ifdef(TEST).
%% Index specs are converted to index ledger key/values in order, with adds
%% becoming active entries and removes becoming tombstones.
indexspecs_test() ->
    IndexSpecs = [{add, "t1_int", 456},
                    {add, "t1_bin", "adbc123"},
                    {remove, "t1_bin", "abdc456"}],
    [Change1, Change2, Change3|_] =
        idx_indexspecs(IndexSpecs, "Bucket", "Key2", 1, infinity),
    ?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"},
                        {1, {active, infinity}, no_lookup, null}},
                    Change1),
    ?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"},
                        {1, {active, infinity}, no_lookup, null}},
                    Change2),
    ?assertMatch({{i, "Bucket", {"t1_bin", "abdc456"}, "Key2"},
                        {1, tomb, no_lookup, null}},
                    Change3).
%% An end key with only the tag set is passed by a key with a greater tag,
%% but not by a key sharing the tag.
endkey_passed_test() ->
    EndKey = {i, null, null, null},
    SameTagKey = {i, 123, {"a", "b"}, <<>>},
    LaterTagKey = {o, 123, {"a", "b"}, <<>>},
    ?assertEqual(false, endkey_passed(EndKey, SameTagKey)),
    ?assertEqual(true, endkey_passed(EndKey, LaterTagKey)).
general_skip_strategy_test() ->
    % Confirm that we will skip if the strategy says so
    StdKey = {?STD_TAG, "B1", "K1andSK", null},
    IdxKey = {?IDX_TAG, "B1", "K1", "SK"},
    SkipStd = [{?STD_TAG, skip}],
    SkipStdRecalcIdx = [{?STD_TAG, skip}, {?IDX_TAG, recalc}],
    % Standard and key-delta entries are skipped when the tag says skip
    TagStrat1 = compact_inkerkvc({{1, ?INKT_STND, StdKey}, {}, true}, SkipStd),
    ?assertMatch(skip, TagStrat1),
    TagStrat2 = compact_inkerkvc({{1, ?INKT_KEYD, StdKey}, {}, true}, SkipStd),
    ?assertMatch(skip, TagStrat2),
    % A tag with no strategy defined is skipped
    TagStrat3 = compact_inkerkvc({{1, ?INKT_KEYD, IdxKey}, {}, true}, SkipStd),
    ?assertMatch(skip, TagStrat3),
    % A strategy other than skip or retain is returned with null
    TagStrat4 =
        compact_inkerkvc({{1, ?INKT_KEYD, IdxKey}, {}, true}, SkipStdRecalcIdx),
    ?assertMatch({recalc, null}, TagStrat4),
    % Tombstones are skipped whatever the strategy
    TagStrat5 =
        compact_inkerkvc({{1, ?INKT_TOMB, IdxKey}, {}, true}, SkipStdRecalcIdx),
    ?assertMatch(skip, TagStrat5).
%% Test below proved that the overhead of performing hashes was trivial
%% Maybe 5 microseconds per hash
hashperf_test() ->
    Objects = [leveled_rand:rand_bytes(8192) || _X <- lists:seq(1, 1000)],
    StartTime = os:timestamp(),
    lists:foreach(fun(Obj) -> erlang:phash2(Obj) end, Objects),
    Elapsed = timer:now_diff(os:timestamp(), StartTime),
    io:format(user, "1000 object hashes in ~w microseconds~n", [Elapsed]).
%% A date from just before Now is recent, and is rounded to the unit of
%% minutes - as are dates offset by each of the next 60 minutes.
parsedate_test() ->
    {MeS, S, MiS} = os:timestamp(),
    timer:sleep(100),
    Now = leveled_util:integer_now(),
    UnitMins = 5,
    LimitMins = 60,
    ParsedDate = parse_date({MeS, S, MiS}, UnitMins, LimitMins, Now),
    io:format("Parsed Date ~w~n", [ParsedDate]),
    ?assertEqual(true, is_tuple(ParsedDate)),
    check_pd(ParsedDate, UnitMins),
    CheckOffsetFun =
        fun(OffsetMins) ->
            OffsetDate = {MeS, S + OffsetMins * 60, MiS},
            OffsetParsed = parse_date(OffsetDate, UnitMins, LimitMins, Now),
            check_pd(OffsetParsed, UnitMins)
        end,
    lists:foreach(CheckOffsetFun, lists:seq(1, 60)).
%% Assert that the minutes at the end of a parsed date string (everything
%% after the first 10 characters) are a multiple of UnitMins.
check_pd(PD, UnitMins) ->
    {DateStr, _TTL} = PD,
    MinuteStr = lists:nthtail(10, DateStr),
    ?assertEqual(0, list_to_integer(MinuteStr) rem UnitMins).
%% A date which is an hour behind Now, with a limit of 60 minutes, is not
%% recent and so should not be indexed.
parseolddate_test() ->
    LastMod = os:timestamp(),
    timer:sleep(100),
    HourAhead = leveled_util:integer_now() + 60 * 60,
    UnitMins = 5,
    LimitMins = 60,
    ParsedDate = parse_date(LastMod, UnitMins, LimitMins, HourAhead),
    io:format("Parsed Date ~w~n", [ParsedDate]),
    ?assertEqual(no_index, ParsedDate).
%% Check the number and shape of the index specs generated for recent
%% changes, under both blacklist and whitelist filters.
genaaeidx_test() ->
    AAE = #recent_aae{filter=blacklist,
                        buckets=[],
                        limit_minutes=60,
                        unit_minutes=5},
    Bucket = <<"Bucket1">>,
    Key = <<"Key1">>,
    SQN = 1,
    H = erlang:phash2(null),

    % One spec per last modified date
    TwoMods = [os:timestamp(), os:timestamp()],
    TwoSpecs = aae_indexspecs(AAE, Bucket, Key, SQN, H, TwoMods),
    ?assertEqual(2, length(TwoSpecs)),

    % An empty blacklist files the entry under all buckets
    OneMod = [os:timestamp()],
    OneSpec = aae_indexspecs(AAE, Bucket, Key, SQN, H, OneMod),
    ?assertEqual(1, length(OneSpec)),
    [{FirstLedgerKey, _FirstLedgerValue}|_] = OneSpec,
    IdxBucket = element(2, FirstLedgerKey),
    io:format(user, "AAE IDXSpecs1 ~w~n", [OneSpec]),
    ?assertEqual(<<"$all">>, IdxBucket),

    % No dates, no specs
    NoSpecs = aae_indexspecs(AAE, Bucket, Key, SQN, H, []),
    ?assertEqual(0, length(NoSpecs)),

    % Whitelist - only the listed bucket generates specs
    WhiteAAE = AAE#recent_aae{filter=whitelist,
                                buckets=[<<"Bucket0">>]},
    UnlistedSpecs = aae_indexspecs(WhiteAAE, Bucket, Key, SQN, H, OneMod),
    ?assertEqual(0, length(UnlistedSpecs)),
    ListedSpecs =
        aae_indexspecs(WhiteAAE, <<"Bucket0">>, Key, SQN, H, OneMod),
    ?assertEqual(1, length(ListedSpecs)),
    [{{?IDX_TAG, <<"Bucket0">>, {Fld, Term}, <<"Key1">>},
        {SQN, {active, TS}, no_lookup, null}}] = ListedSpecs,
    ?assertEqual(true, is_integer(TS)),
    ?assertEqual(17, length(binary_to_list(Term))),
    ?assertEqual("$aae.", lists:sublist(binary_to_list(Fld), 5)),

    % Blacklist - the listed bucket generates no specs
    BlackAAE = AAE#recent_aae{filter=blacklist,
                                buckets=[<<"Bucket0">>]},
    BlackSpecs =
        aae_indexspecs(BlackAAE, <<"Bucket0">>, Key, SQN, H, OneMod),
    ?assertEqual(0, length(BlackSpecs)).
%% A last modified date a whole megasecond in the past is well outside the
%% 60 minute limit, so no index specs should be generated.
delayedupdate_aaeidx_test() ->
    AAE = #recent_aae{filter=blacklist,
                        buckets=[],
                        limit_minutes=60,
                        unit_minutes=5},
    Bucket = <<"Bucket1">>,
    Key = <<"Key1">>,
    SQN = 1,
    H = erlang:phash2(null),
    {Mega, Sec, MSec} = os:timestamp(),
    OldMods = [{Mega - 1, Sec, MSec}],
    OldSpecs = aae_indexspecs(AAE, Bucket, Key, SQN, H, OldMods),
    ?assertEqual(0, length(OldSpecs)).
head_segment_compare_test() ->
    % Reminder to align native and parallel(leveled_ko) key stores for
    % kv_index_tictactree
    Bucket = <<"B1">>,
    Key = <<"K1">>,
    HeadNullHash = segment_hash({?HEAD_TAG, Bucket, Key, null}),
    RiakHash = segment_hash({?RIAK_TAG, Bucket, Key, null}),
    HeadEmptyHash = segment_hash({?HEAD_TAG, Bucket, Key, <<>>}),
    ?assertMatch(HeadNullHash, RiakHash),
    ?assertMatch(HeadNullHash, HeadEmptyHash).
-endif.