2016-10-13 21:02:15 +01:00
|
|
|
%% -------- Key Codec ---------
|
|
|
|
%%
|
2016-10-14 18:43:16 +01:00
|
|
|
%% Functions for manipulating keys and values within leveled.
|
|
|
|
%%
|
|
|
|
%%
|
|
|
|
%% Within the LEDGER:
|
|
|
|
%% Keys are of the form -
|
|
|
|
%% {Tag, Bucket, Key, SubKey|null}
|
|
|
|
%% Values are of the form
|
|
|
|
%% {SQN, Status, MagicHash, MD}
|
|
|
|
%%
|
|
|
|
%% Within the JOURNAL:
|
|
|
|
%% Keys are of the form -
|
|
|
|
%% {SQN, InkerType, LedgerKey}
|
|
|
|
%% Values are of the form
|
|
|
|
%% {Object, IndexSpecs} (as a binary)
|
|
|
|
%%
|
|
|
|
%% IndexSpecs are of the form of a Ledger Key/Value
|
|
|
|
%%
|
|
|
|
%% Tags need to be set during PUT operations and each Tag used must be
|
|
|
|
%% supported in an extract_metadata and a build_metadata_object function clause
|
|
|
|
%%
|
|
|
|
%% Currently the only tags supported are:
|
|
|
|
%% - o (standard objects)
|
2016-10-16 15:41:09 +01:00
|
|
|
%% - o_rkv (riak objects)
|
2016-10-14 18:43:16 +01:00
|
|
|
%% - i (index entries)
|
|
|
|
|
2016-10-13 21:02:15 +01:00
|
|
|
|
|
|
|
-module(leveled_codec).
|
|
|
|
|
2016-10-18 01:59:03 +01:00
|
|
|
-include("include/leveled.hrl").
|
2016-10-13 21:02:15 +01:00
|
|
|
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
|
2016-10-25 23:13:14 +01:00
|
|
|
-export([
    %% Journal reload / compaction strategy
    inker_reload_strategy/1,
    compact_inkerkvc/2,
    %% Ledger key/value accessors and comparisons
    strip_to_seqonly/1,
    strip_to_statusonly/1,
    strip_to_keyseqonly/1,
    strip_to_seqnhashonly/1,
    striphead_to_details/1,
    is_active/3,
    endkey_passed/2,
    key_dominates/2,
    maybe_reap_expiredkey/2,
    %% Ledger key construction
    to_ledgerkey/3,
    to_ledgerkey/5,
    from_ledgerkey/1,
    %% Journal (inker) keys and values
    to_inkerkv/4,
    from_inkerkv/1,
    from_inkerkv/2,
    from_journalkey/1,
    split_inkvalue/1,
    check_forinkertype/2,
    maybe_compress/1,
    create_value_for_journal/2,
    %% Object metadata
    build_metadata_object/2,
    generate_ledgerkv/5,
    get_size/2,
    get_keyandhash/2,
    convert_indexspecs/5,
    riak_extract_metadata/2,
    %% Utilities
    generate_uuid/0,
    integer_now/0,
    magic_hash/1,
    to_lookup/1
]).

%% Magic number and version byte that prefix a version-1 Riak object binary
%% (written by riak_metadata_to_binary/2, matched by
%% riak_metadata_from_binary/1).
-define(V1_VERS, 1).
-define(MAGIC, 53). % riak_kv -> riak_object
|
2016-10-14 18:43:16 +01:00
|
|
|
|
2016-12-11 01:02:56 +00:00
|
|
|
%% Indicate whether a ledger key may be looked up directly.
%% Keys tagged ?IDX_TAG (index entries) return no_lookup; every other key
%% returns lookup.  Crashes if Key is not a tuple.
to_lookup(Key) ->
    case element(1, Key) of
        ?IDX_TAG ->
            no_lookup;
        _ ->
            lookup
    end.

%% Use DJ Bernstein magic hash function. Note, this is more expensive than
%% phash2 but provides a much more balanced result.
%%
%% Hash function contains mysterious constants, some explanation here as to
%% what they are -
%% http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
%%
%% Riak (?RIAK_TAG) and standard (?STD_TAG) ledger keys are hashed on
%% {Bucket, Key} only, so the SubKey does not affect the result.  Any other
%% term is hashed over its external term format.  The result is always a
%% 32-bit unsigned integer.
magic_hash({?RIAK_TAG, Bucket, Key, _SubKey}) ->
    magic_hash({Bucket, Key});
magic_hash({?STD_TAG, Bucket, Key, _SubKey}) ->
    magic_hash({Bucket, Key});
magic_hash(AnyKey) ->
    BK = term_to_binary(AnyKey),
    H = 5381,
    hash1(H, BK) band 16#FFFFFFFF.
|
|
|
|
|
|
|
|
%% Fold a binary, one byte at a time, into the DJB hash accumulator:
%%   H' = (H * 33) bxor Byte
%% The accumulator is truncated to 32 bits after every byte.  Multiplication
%% and bxor never move information from high bits into low bits, so the low
%% 32 bits match the untruncated computation exactly.  The result after the
%% final mask in magic_hash/1 is therefore unchanged, but the intermediate
%% values no longer grow into bignums for long keys.
hash1(H, <<>>) ->
    H;
hash1(H, <<B:8/integer, Rest/bytes>>) ->
    H1 = (H * 33) bxor B,
    hash1(H1 band 16#FFFFFFFF, Rest).
|
|
|
|
|
2016-10-14 18:43:16 +01:00
|
|
|
|
2016-10-18 01:59:03 +01:00
|
|
|
%% Generate a random (version 4) UUID as a 36-character lowercase hex string
%% of the form xxxxxxxx-xxxx-4xxx-Yxxx-xxxxxxxxxxxx, where Y is 8, 9, a or b.
%%
%% Credit to
%% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl
%%
%% crypto:rand_bytes/1 was deprecated and then removed from OTP;
%% crypto:strong_rand_bytes/1 is the supported replacement.
generate_uuid() ->
    <<A:32, B:16, C:16, D:16, E:48>> = crypto:strong_rand_bytes(16),
    %% The literal "4" sets the version nibble.  The masks clear the bits it
    %% replaces and force the variant bits of D to 10xx.
    L = io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b",
                        [A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]),
    binary_to_list(list_to_binary(L)).
|
2016-10-18 01:59:03 +01:00
|
|
|
|
2016-10-25 23:13:14 +01:00
|
|
|
%% Build the journal reload strategy as a list of {Tag, Strategy}.
%% It starts from the defaults (retain for Riak and standard objects).
%% Each {Tag, Strategy} in AltList replaces the default for that tag.
%% Later entries in AltList win.  Tags with no default entry are ignored,
%% because keyreplace leaves the list unchanged when the tag is absent.
inker_reload_strategy(AltList) ->
    Defaults = [{?RIAK_TAG, retain}, {?STD_TAG, retain}],
    Override =
        fun({Tag, Strategy}, Acc) ->
            lists:keyreplace(Tag, 1, Acc, {Tag, Strategy})
        end,
    lists:foldl(Override, Defaults, AltList).
|
2016-10-13 21:02:15 +01:00
|
|
|
|
2016-12-11 01:02:56 +00:00
|
|
|
%% Accessors for ledger {Key, Value} pairs, where Value is the 4-tuple
%% {SQN, Status, MagicHash, MetaData}.

%% Status only (e.g. tomb or {active, TTL}).
strip_to_statusonly({_LK, {_SQN, Status, _MH, _MD}}) ->
    Status.

%% Sequence number only.
strip_to_seqonly({_LK, {SQN, _Status, _MH, _MD}}) ->
    SQN.

%% {LedgerKey, SQN}.
strip_to_keyseqonly({LK, {SQN, _Status, _MH, _MD}}) ->
    {LK, SQN}.

%% {SQN, MagicHash}.
strip_to_seqnhashonly({_LK, {SQN, _Status, MH, _MD}}) ->
    {SQN, MH}.

%% The value 4-tuple itself (crashes on anything else).
striphead_to_details({SQN, Status, MH, MD}) ->
    {SQN, Status, MH, MD}.
|
2016-10-31 17:26:28 +00:00
|
|
|
|
2016-10-13 21:02:15 +01:00
|
|
|
|
|
|
|
%% Order two ledger {Key, Value} pairs when merging.
%% If the keys differ, the smaller key goes first (left_hand_first or
%% right_hand_first).  If the keys are equal, the higher sequence number
%% dominates, with ties going to the left (left_hand_dominant or
%% right_hand_dominant).
key_dominates(LeftKey, RightKey) ->
    case {LeftKey, RightKey} of
        {{LK, _}, {RK, _}} when LK < RK ->
            left_hand_first;
        {{LK, _}, {RK, _}} when LK > RK ->
            right_hand_first;
        {{LK, {LSN, _, _, _}}, {RK, {RSN, _, _, _}}}
                when LK == RK andalso LSN >= RSN ->
            left_hand_dominant;
        {{LK, {LSN, _, _, _}}, {RK, {RSN, _, _, _}}}
                when LK == RK andalso LSN < RSN ->
            right_hand_dominant
    end.
|
2016-10-16 15:41:09 +01:00
|
|
|
|
2016-10-21 16:08:41 +01:00
|
|
|
|
2016-10-21 21:26:28 +01:00
|
|
|
%% Decide whether a ledger {Key, Value} may be dropped during a merge.
%% Only a LevelD of the form {true, CurrTS} (the basement, per the clause
%% comments below) can ever reap.
maybe_reap_expiredkey(KV, LevelD) ->
    maybe_reap(strip_to_statusonly(KV), LevelD).

%% Decision by status:
%% - expiry of infinity: never reaped;
%% - tomb: always reaped in the basement;
%% - {_, TS}: reaped in the basement once CurrTS > TS;
%% - anything else: kept.
maybe_reap({_, infinity}, _LevelD) ->
    false; % key is not set to expire
maybe_reap(tomb, {true, _CurrTS}) ->
    true; % always expire in basement
maybe_reap({_, TS}, {true, CurrTS}) when CurrTS > TS ->
    true; % basement and ready to expire
maybe_reap(_Status, _LevelD) ->
    false.
|
|
|
|
|
2016-10-31 12:12:06 +00:00
|
|
|
%% Is a ledger entry live at time Now?
%% Tombstones never are.  {active, infinity} always is.  {active, TS} is
%% live while TS >= Now.  Key is not inspected.
is_active(_Key, {_SQN, Status, _MH, _MD}, Now) ->
    case Status of
        tomb ->
            false;
        {active, infinity} ->
            true;
        {active, TS} ->
            TS >= Now
    end.
|
|
|
|
|
2016-10-18 01:59:03 +01:00
|
|
|
%% Convert a ledger key back to its external form.
%% Index keys {?IDX_TAG, Bucket, {IdxField, IdxValue}, Key} give
%% {Bucket, Key, IdxValue}.  Keys with a null SubKey give {Bucket, Key}.
from_ledgerkey({?IDX_TAG, Bucket, {_IdxField, IdxValue}, Key}) ->
    {Bucket, Key, IdxValue};
from_ledgerkey({_Tag, Bucket, Key, null}) ->
    {Bucket, Key}.
|
2016-10-18 01:59:03 +01:00
|
|
|
|
|
|
|
%% Build an index ledger key.  Only defined when Tag is ?IDX_TAG.
to_ledgerkey(Bucket, Key, ?IDX_TAG, Field, Value) ->
    {?IDX_TAG, Bucket, {Field, Value}, Key}.

%% Build a ledger key with no SubKey (null) for the given Tag.
to_ledgerkey(Bucket, Key, Tag) ->
    {Tag, Bucket, Key, null}.
|
2016-10-13 21:02:15 +01:00
|
|
|
|
2016-10-25 23:13:14 +01:00
|
|
|
%% Build a Journal (inker) key, and value where there is one.
%%
%% to_inkerkv(LedgerKey, SQN, to_fetch, null) builds only the key needed to
%% fetch the object at SQN and returns {InkerKey, null, true}.  The original
%% comment describes the trailing true as the "hash option": whether the key
%% would ever be looked up directly and so needs a hash-table entry.
%%
%% Otherwise it returns the 2-tuple {InkerKey, ValueBin}, where:
%% - InkerKey is {SQN, InkerType, LedgerKey}, with InkerType from
%%   check_forinkertype/2;
%% - ValueBin is {Object, KeyChanges} serialised without compression by
%%   create_value_for_journal/2.
to_inkerkv(LedgerKey, SQN, to_fetch, null) ->
    {{SQN, ?INKT_STND, LedgerKey}, null, true};
to_inkerkv(LedgerKey, SQN, Object, KeyChanges) ->
    InkerKey = {SQN, check_forinkertype(LedgerKey, Object), LedgerKey},
    {InkerKey, create_value_for_journal({Object, KeyChanges}, false)}.
|
2016-10-25 23:13:14 +01:00
|
|
|
|
|
|
|
%% Used when fetching objects, so only handles standard, hashable entries.
%% Same as from_inkerkv(Object, false).
from_inkerkv(Object) ->
    from_inkerkv(Object, false).

%% Convert a standard (?INKT_STND) Journal {InkerKey, Value} into
%% {{SQN, LedgerKey}, Value}.
%% A binary value is decoded with revert_value_from_journal/2; when
%% ToIgnoreKeyChanges is true, the key changes are returned as [].
%% A non-binary value is passed through.  Any other input, including other
%% inker types, is returned unchanged.
from_inkerkv({{SQN, ?INKT_STND, PK}, Bin}, ToIgnoreKeyChanges)
        when is_binary(Bin) ->
    {{SQN, PK}, revert_value_from_journal(Bin, ToIgnoreKeyChanges)};
from_inkerkv({{SQN, ?INKT_STND, PK}, Term}, _ToIgnoreKeyChanges) ->
    {{SQN, PK}, Term};
from_inkerkv(Object, _ToIgnoreKeyChanges) ->
    Object.
|
|
|
|
|
2017-03-29 15:37:04 +01:00
|
|
|
%% Serialise {Object, KeyChanges} into a Journal value binary laid out as
%%   <<ObjectBin, KeyChangeBin, KeyChangeBinLen:32, TypeCode:8>>
%% KeyChanges that are not already a binary are encoded with
%% term_to_binary(_, [compressed]).  The object is compressed only when
%% Compress is true.  TypeCode (see encode_valuetype/2) records whether
%% Object was a binary and whether it was compressed.
create_value_for_journal({Object, KeyChanges}, Compress)
        when not is_binary(KeyChanges) ->
    KeyChangeBin = term_to_binary(KeyChanges, [compressed]),
    create_value_for_journal({Object, KeyChangeBin}, Compress);
create_value_for_journal({Object, KeyChangeBin}, Compress) ->
    ObjectBin = serialise_object(Object, Compress),
    TypeCode = encode_valuetype(is_binary(Object), Compress),
    KCLen = byte_size(KeyChangeBin),
    <<ObjectBin/binary,
        KeyChangeBin/binary,
        KCLen:32/integer,
        TypeCode:8/integer>>.
|
|
|
|
|
|
|
|
%% Ensure a Journal value is stored compressed.
%% - {null, KeyChanges} (a key-changes-only value) is serialised without
%%   compression.
%% - A binary already flagged as compressed is returned as is.
%% - Any other binary is decoded and re-serialised with compression.
maybe_compress({null, KeyChanges}) ->
    create_value_for_journal({null, KeyChanges}, false);
maybe_compress(JournalBin) ->
    HeadLen = byte_size(JournalBin) - 5,
    %% The last byte is the type code written by create_value_for_journal/2.
    <<_Head:HeadLen/binary, _KCLen:32/integer, Type:8/integer>> = JournalBin,
    case decode_valuetype(Type) of
        {_IsBinary, true} ->
            JournalBin;
        {_IsBinary, false} ->
            create_value_for_journal(revert_value_from_journal(JournalBin),
                                        true)
    end.
|
|
|
|
|
|
|
|
%% Serialise the object part of a Journal value.
%% Binaries are stored raw or zlib-compressed.  Other terms go through
%% term_to_binary, with the [compressed] option when requested.
serialise_object(Obj, true) when is_binary(Obj) ->
    zlib:compress(Obj);
serialise_object(Obj, false) when is_binary(Obj) ->
    Obj;
serialise_object(Obj, true) ->
    term_to_binary(Obj, [compressed]);
serialise_object(Obj, false) ->
    term_to_binary(Obj).
|
|
|
|
|
|
|
|
%% Decode a Journal value binary (see create_value_for_journal/2) into
%% {Object, KeyChanges}, keeping the key changes.
revert_value_from_journal(JournalBin) ->
    revert_value_from_journal(JournalBin, false).

%% As above.  When ToIgnoreKeyChanges is true, the key-change bytes are
%% not decoded and [] is returned in their place.
revert_value_from_journal(JournalBin, ToIgnoreKeyChanges) ->
    BodyLen = byte_size(JournalBin) - 5,
    <<Body:BodyLen/binary,
        KCLen:32/integer,
        Type:8/integer>> = JournalBin,
    {IsBinary, IsCompressed} = decode_valuetype(Type),
    ObjLen = BodyLen - KCLen,
    <<ObjBin:ObjLen/binary, KCBin:KCLen/binary>> = Body,
    KeyChanges =
        case ToIgnoreKeyChanges of
            true -> [];
            false -> binary_to_term(KCBin)
        end,
    {deserialise_object(ObjBin, IsBinary, IsCompressed), KeyChanges}.
|
2017-03-20 15:43:54 +00:00
|
|
|
|
|
|
|
%% Inverse of serialise_object/2; the flags are IsBinary and IsCompressed.
%% Non-binary objects always go through binary_to_term, which also handles
%% term_to_binary(_, [compressed]) output.
deserialise_object(Bin, false, _IsCompressed) ->
    binary_to_term(Bin);
deserialise_object(Bin, true, false) ->
    Bin;
deserialise_object(Bin, true, true) ->
    zlib:uncompress(Bin).
|
|
|
|
|
|
|
|
%% Pack the Journal value type flags into one integer.
%% Bit 1 (value 2) is set when the object was a binary.
%% Bit 0 (value 1) is set when it was compressed.
encode_valuetype(IsBinary, IsCompressed) ->
    BinaryFlag =
        case IsBinary of
            true -> 2#10;
            false -> 2#00
        end,
    CompressFlag =
        case IsCompressed of
            true -> 2#01;
            false -> 2#00
        end,
    BinaryFlag bor CompressFlag.
|
|
|
|
|
|
|
|
%% Inverse of encode_valuetype/2: returns {IsBinary, IsCompressed}.
decode_valuetype(TypeInt) ->
    IsBinary = (TypeInt band 2#10) =/= 0,
    IsCompressed = (TypeInt band 2#01) =/= 0,
    {IsBinary, IsCompressed}.
|
|
|
|
|
2016-10-25 23:13:14 +01:00
|
|
|
%% Drop the inker type from a Journal key, giving {SQN, LedgerKey}.
from_journalkey({SeqNum, _InkerType, LK}) ->
    {SeqNum, LK}.
|
|
|
|
|
2016-11-01 00:46:14 +00:00
|
|
|
%% Decide what journal compaction does with a Journal
%% {InkerKey, Value, CrcCheck} entry, given Strategy (a list of
%% {Tag, TagStrategy}).  Returns one of:
%% - skip;
%% - {retain, KVC};
%% - {TagStrat, null}.
%% The following are skipped: crc_wonky values with a failed CRC check,
%% tombstones, unrecognised inker types, and keys with no usable strategy
%% (see get_tagstrategy/2).
%% A retained standard entry is reduced to a key-deltas-only ?INKT_KEYD
%% entry holding {null, KeyDeltas}.
compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) ->
    skip;
compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) ->
    skip;
compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) ->
    apply_tagstrategy(get_tagstrategy(LK, Strategy),
                        fun() -> {{SQN, ?INKT_KEYD, LK}, V, CrcCheck} end);
compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
    apply_tagstrategy(get_tagstrategy(LK, Strategy),
                        fun() ->
                            {_V, KeyDeltas} = revert_value_from_journal(V),
                            {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}
                        end);
compact_inkerkvc(_KVC, _Strategy) ->
    skip.

%% Turn a tag strategy into the compaction result.  RetainFun builds the
%% retained entry and is only called when the strategy is retain.
apply_tagstrategy(skip, _RetainFun) ->
    skip;
apply_tagstrategy(retain, RetainFun) ->
    {retain, RetainFun()};
apply_tagstrategy(TagStrat, _RetainFun) ->
    {TagStrat, null}.
|
2016-10-25 23:13:14 +01:00
|
|
|
|
2017-03-13 14:32:46 +00:00
|
|
|
%% Look up the compaction strategy for the tag of ledger key LK.
%% A key that is not a 4-tuple (e.g. corrupted) gives skip.  A tag missing
%% from Strategy is logged (IC012) and also gives skip.
get_tagstrategy({Tag, _, _, _}, Strategy) ->
    case lists:keyfind(Tag, 1, Strategy) of
        {Tag, TagStrat} ->
            TagStrat;
        false ->
            leveled_log:log("IC012", [Tag, Strategy]),
            skip
    end;
get_tagstrategy(_LK, _Strategy) ->
    skip.
|
|
|
|
|
2016-10-25 23:13:14 +01:00
|
|
|
%% Decode a Journal value into {Object, KeyChanges} if it is a binary.
%% Any non-binary value is returned unchanged.
split_inkvalue(VBin) when is_binary(VBin) ->
    revert_value_from_journal(VBin);
split_inkvalue(VBin) ->
    VBin.
|
|
|
|
|
|
|
|
%% Journal entry type for an object.  A delete is written as a tombstone
%% (?INKT_TOMB); anything else is a standard entry (?INKT_STND).
check_forinkertype(_LedgerKey, Object) ->
    case Object of
        delete -> ?INKT_TOMB;
        _ -> ?INKT_STND
    end.
|
2016-10-25 23:13:14 +01:00
|
|
|
|
2016-10-14 18:43:16 +01:00
|
|
|
%% phash2 of the object's external term format.  extract_metadata/3 uses
%% this as the ?STD_TAG metadata hash.
hash(Obj) ->
    ObjBin = term_to_binary(Obj),
    erlang:phash2(ObjBin).
|
2016-10-13 21:02:15 +01:00
|
|
|
|
|
|
|
|
|
|
|
% Compare a key against a query end key, only comparing the leading elements
% that are non-null in the end key.  Returns true once CheckingKey sorts
% after EndKey, i.e. the end of the query range has been passed.  An end key
% of all is never passed.
endkey_passed(all, _CheckingKey) ->
    false;
endkey_passed({End1, null, null, null}, {Chk1, _, _, _}) ->
    End1 < Chk1;
endkey_passed({End1, End2, null, null}, {Chk1, Chk2, _, _}) ->
    {End1, End2} < {Chk1, Chk2};
endkey_passed({End1, End2, End3, null}, {Chk1, Chk2, Chk3, _}) ->
    {End1, End2, End3} < {Chk1, Chk2, Chk3};
endkey_passed(EndKey, CheckingKey) ->
    EndKey < CheckingKey.
|
|
|
|
|
2016-10-31 12:12:06 +00:00
|
|
|
%% Convert {add | remove, IdxField, IdxValue} index specs for Bucket/Key into
%% ledger {IndexKey, Value} pairs at sequence number SQN.
%% An add becomes {active, TTL}; a remove becomes a tombstone.  The hash slot
%% holds no_lookup (to_lookup/1 likewise treats ?IDX_TAG keys as no_lookup).
convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
    ToLedgerKV =
        fun({IndexOp, IdxField, IdxValue}) ->
            Status =
                case IndexOp of
                    add ->
                        {active, TTL};
                    remove ->
                        %% TODO: timestamps for delayed reaping
                        tomb
                end,
            LedgerKey = to_ledgerkey(Bucket, Key, ?IDX_TAG, IdxField, IdxValue),
            {LedgerKey, {SQN, Status, no_lookup, null}}
        end,
    lists:map(ToLedgerKV, IndexSpecs).
|
|
|
|
|
|
|
|
%% Build the ledger entry for a PUT (or delete) of PrimaryKey at SQN.
%% Returns {Bucket, Key, {PrimaryKey, Value}, Hash}, where:
%% - Value is {SQN, Status, Hash, MetaData};
%% - Status is tomb for a delete, otherwise {active, TS};
%% - Hash is magic_hash(PrimaryKey).
generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
    {Tag, Bucket, Key, _SubKey} = PrimaryKey,
    Hash = magic_hash(PrimaryKey),
    Status =
        case Obj of
            delete -> tomb;
            _ -> {active, TS}
        end,
    MetaData = extract_metadata(Obj, Size, Tag),
    {Bucket, Key, {PrimaryKey, {SQN, Status, Hash, MetaData}}, Hash}.
|
2016-10-16 15:41:09 +01:00
|
|
|
|
|
|
|
|
2016-10-31 12:12:06 +00:00
|
|
|
%% Current UTC time as whole Gregorian seconds.
integer_now() ->
    integer_time(os:timestamp()).

%% Convert an erlang:timestamp() ({MegaSecs, Secs, MicroSecs}) to UTC
%% Gregorian seconds, discarding the microseconds.
integer_time(TS) ->
    calendar:datetime_to_gregorian_seconds(
        calendar:now_to_universal_time(TS)).
|
2016-10-14 18:43:16 +01:00
|
|
|
|
2016-10-16 15:41:09 +01:00
|
|
|
%% Ledger metadata for an object, by tag.
%% - Standard objects: {hash(Obj), Size}.
%% - Riak objects: riak_extract_metadata/2.
%% There is no clause for any other tag.
extract_metadata(Obj, Size, ?STD_TAG) ->
    {hash(Obj), Size};
extract_metadata(Obj, Size, ?RIAK_TAG) ->
    riak_extract_metadata(Obj, Size).
|
|
|
|
|
|
|
|
%% Object size recorded in a ledger entry's metadata.  It is the last
%% element of the Riak metadata 4-tuple and the second element of the
%% standard {Hash, Size} metadata.  Only Riak and standard tags are handled.
get_size(PK, Value) ->
    {Tag, _Bucket, _Key, _SubKey} = PK,
    {_SQN, _Status, _MH, MetaData} = Value,
    case Tag of
        ?STD_TAG ->
            {_ObjHash, ObjSize} = MetaData,
            ObjSize;
        ?RIAK_TAG ->
            {_SibMeta, _VClock, _VCHash, ObjSize} = MetaData,
            ObjSize
    end.
|
|
|
|
|
2016-10-31 16:02:32 +00:00
|
|
|
%% Return {Bucket, Key, Hash} for a ledger entry, using the hash stored in
%% its metadata: the vector-clock hash for Riak objects, the object hash for
%% standard objects.  Index entries instead return {Bucket, Key, IdxValue}
%% via from_ledgerkey/1.
get_keyandhash(LK, Value) ->
    {Tag, Bucket, Key, _SubKey} = LK,
    {_SQN, _Status, _MH, MetaData} = Value,
    case Tag of
        ?IDX_TAG ->
            from_ledgerkey(LK); % returns {Bucket, Key, IdxValue}
        ?STD_TAG ->
            {ObjHash, _Size} = MetaData,
            {Bucket, Key, ObjHash};
        ?RIAK_TAG ->
            {_SibMeta, _VClock, VCHash, _Size} = MetaData,
            {Bucket, Key, VCHash}
    end.
|
|
|
|
|
2016-10-14 18:43:16 +01:00
|
|
|
|
|
|
|
%% Rebuild a metadata-only object from stored ledger metadata MD.
%% PrimaryKey must have a null SubKey.  For Riak objects, a v1 binary is
%% rebuilt from the vector clock and sibling metadata.  Standard-object
%% metadata is returned unchanged.
build_metadata_object(PrimaryKey, MD) ->
    {Tag, _Bucket, _Key, null} = PrimaryKey,
    case Tag of
        ?STD_TAG ->
            MD;
        ?RIAK_TAG ->
            {SibData, Vclock, _Hash, _Size} = MD,
            riak_metadata_to_binary(Vclock, SibData)
    end.
|
|
|
|
|
|
|
|
|
2016-10-16 15:41:09 +01:00
|
|
|
%% Ledger metadata for a Riak object: {SibMetaBin, VclockBin, VclockHash, Size}.
%% VclockHash is phash2 of the decoded vector clock after sorting, so it does
%% not depend on the order of clock entries.  A delete yields
%% {delete, null, null, Size}.
riak_extract_metadata(delete, Size) ->
    {delete, null, null, Size};
riak_extract_metadata(ObjBin, Size) ->
    {VclockBin, SibBin} = riak_metadata_from_binary(ObjBin),
    SortedVclock = lists:sort(binary_to_term(VclockBin)),
    {SibBin, VclockBin, erlang:phash2(SortedVclock), Size}.
|
2016-11-28 22:26:09 +00:00
|
|
|
|
|
|
|
%% Riak object binary format (version 1):
%%   <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
%%     VclockBin/binary, SibCount:32/integer, SibsBin/binary>>.
%%
%% Rebuild such a binary from a vector clock binary and sibling metadata.
%% SibMetaBin is appended as is, so it must already start with SibCount:32,
%% as the SibMetaBin produced by riak_metadata_from_binary/1 does.
riak_metadata_to_binary(VclockBin, SibMetaBin) ->
    Header = <<?MAGIC:8/integer, ?V1_VERS:8/integer>>,
    VclockLen = byte_size(VclockBin),
    <<Header/binary,
        VclockLen:32/integer,
        VclockBin/binary,
        SibMetaBin/binary>>.
|
2016-11-28 22:26:09 +00:00
|
|
|
|
|
|
|
%% Split a version-1 Riak object binary into {VclockBin, SibMetaBin}.
%% SibMetaBin starts with SibCount:32.  For each sibling it then holds a
%% zero-length value placeholder followed by that sibling's metadata (see
%% get_metadata_from_siblings/3), so sibling values are not retained.
%% Crashes with badmatch if the magic/version header does not match.
riak_metadata_from_binary(V1Binary) ->
    <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
        Rest/binary>> = V1Binary,
    <<VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest,
    %% SibCount comes from a :32/integer segment, so it is always an integer.
    %% The previous single-branch `case ... when is_integer(SC)` was vacuous.
    SibMetaBin =
        get_metadata_from_siblings(SibsBin, SibCount, <<SibCount:32/integer>>),
    {VclockBin, SibMetaBin}.
|
2016-12-14 10:27:11 +00:00
|
|
|
|
2017-04-04 10:02:35 +00:00
|
|
|
%% Walk the sibling entries, each encoded as
%%   <<ValLen:32, ValBin:ValLen/binary, MetaLen:32, MetaBin:MetaLen/binary>>
%% For each one, append <<0:32, MetaLen:32, MetaBin/binary>> to the
%% accumulator.  The value length is written as zero and the value bytes are
%% dropped, so the per-sibling layout is kept without the values.  The input
%% must run out exactly when the remaining count reaches zero.
get_metadata_from_siblings(<<>>, 0, Acc) ->
    Acc;
get_metadata_from_siblings(<<ValLen:32/integer, AfterLen/binary>>,
                            Remaining,
                            Acc) ->
    <<_Value:ValLen/binary, MetaLen:32/integer, AfterValue/binary>> = AfterLen,
    <<Meta:MetaLen/binary, Next/binary>> = AfterValue,
    NewAcc = <<Acc/binary, 0:32/integer, MetaLen:32/integer, Meta:MetaLen/binary>>,
    get_metadata_from_siblings(Next, Remaining - 1, NewAcc).
|
2016-12-14 10:27:11 +00:00
|
|
|
|
2016-10-14 18:43:16 +01:00
|
|
|
|
2016-10-13 21:02:15 +01:00
|
|
|
|
|
|
|
|
|
|
|
%%%============================================================================
|
|
|
|
%%% Test
|
|
|
|
%%%============================================================================
|
|
|
|
|
|
|
|
-ifdef(TEST).
|
|
|
|
|
|
|
|
|
|
|
|
indexspecs_test() ->
    IndexSpecs = [{add, "t1_int", 456},
                    {add, "t1_bin", "adbc123"},
                    {remove, "t1_bin", "abdc456"}],
    Changes = convert_indexspecs(IndexSpecs, "Bucket", "Key2", 1, infinity),
    [First, Second, Third | _] = Changes,
    % Additions are active with the supplied TTL; removals become tombstones
    ?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"},
                    {1, {active, infinity}, no_lookup, null}},
                    First),
    ?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"},
                    {1, {active, infinity}, no_lookup, null}},
                    Second),
    ?assertMatch({{i, "Bucket", {"t1_bin", "abdc456"}, "Key2"},
                    {1, tomb, no_lookup, null}},
                    Third).
|
2016-10-14 22:58:01 +01:00
|
|
|
|
|
|
|
endkey_passed_test() ->
    EndKey = {i, null, null, null},
    IdxKey = {i, 123, {"a", "b"}, <<>>},
    ObjKey = {o, 123, {"a", "b"}, <<>>},
    % Only the tag is set in the end key: an i key is still in range, while
    % an o key (o sorts after i) has passed the end
    ?assertMatch(false, endkey_passed(EndKey, IdxKey)),
    ?assertMatch(true, endkey_passed(EndKey, ObjKey)).
|
|
|
|
|
|
|
|
|
2017-03-13 14:32:46 +00:00
|
|
|
corrupted_ledgerkey_test() ->
    % When testing for compacted journal which has been corrupted, there may
    % be a corrupted ledger key.  Always skip these keys.
    % Here the key has become a 3-tuple, not a 4-tuple.
    BadLK = {?STD_TAG, "B1", "K1andSK"},
    Strategy = [{?STD_TAG, retain}],
    TagStrat1 = compact_inkerkvc({{1, ?INKT_STND, BadLK}, {}, true}, Strategy),
    ?assertMatch(skip, TagStrat1),
    TagStrat2 = compact_inkerkvc({{1, ?INKT_KEYD, BadLK}, {}, true}, Strategy),
    ?assertMatch(skip, TagStrat2).
|
2017-03-14 17:26:39 +00:00
|
|
|
|
|
|
|
general_skip_strategy_test() ->
    % Confirm that we will skip if the strategy says so
    KVC = fun(InkTag, LK) -> {{1, InkTag, LK}, {}, true} end,
    StdLK = {?STD_TAG, "B1", "K1andSK"},
    IdxLK = {?IDX_TAG, "B1", "K1", "SK"},
    SkipStd = [{?STD_TAG, skip}],
    SkipStdRecalcIdx = [{?STD_TAG, skip}, {?IDX_TAG, recalc}],
    ?assertMatch(skip, compact_inkerkvc(KVC(?INKT_STND, StdLK), SkipStd)),
    ?assertMatch(skip, compact_inkerkvc(KVC(?INKT_KEYD, StdLK), SkipStd)),
    % No strategy for the index tag, so skip
    ?assertMatch(skip, compact_inkerkvc(KVC(?INKT_KEYD, IdxLK), SkipStd)),
    ?assertMatch({recalc, null},
                    compact_inkerkvc(KVC(?INKT_KEYD, IdxLK), SkipStdRecalcIdx)),
    % Tombstones are always skipped, whatever the strategy
    ?assertMatch(skip,
                    compact_inkerkvc(KVC(?INKT_TOMB, IdxLK), SkipStdRecalcIdx)).
|
2017-03-13 14:32:46 +00:00
|
|
|
|
2017-03-14 17:26:39 +00:00
|
|
|
corrupted_inker_tag_test() ->
    % Confirm that we will skip on unknown inker tag
    CorruptKVC = {{1, foo, {?STD_TAG, "B1", "K1andSK"}}, {}, true},
    ?assertMatch(skip, compact_inkerkvc(CorruptKVC, [{?STD_TAG, retain}])).
|
2017-03-13 14:32:46 +00:00
|
|
|
|
2016-12-20 20:55:56 +00:00
|
|
|
%% Test below proved that the overhead of performing hashes was trivial
%% Maybe 5 microseconds per hash
%%
%% The random objects are generated with crypto:strong_rand_bytes/1, because
%% crypto:rand_bytes/1 has been removed from OTP.
hashperf_test() ->
    OL = lists:map(fun(_X) -> crypto:strong_rand_bytes(8192) end,
                    lists:seq(1, 1000)),
    SW = os:timestamp(),
    _HL = lists:map(fun(Obj) -> erlang:phash2(Obj) end, OL),
    io:format(user, "1000 object hashes in ~w microseconds~n",
                [timer:now_diff(os:timestamp(), SW)]).
|
|
|
|
|
|
|
|
%% Compare the cost of magic_hash/1 and erlang:phash2/1 over 1000 keys.
%% Each hashed term is {LedgerKey, X}, a 2-tuple, so this exercises the
%% generic-term clause of magic_hash/1.
magichashperf_test() ->
    KeyFun =
        fun(X) ->
            K = {o, "Bucket", "Key" ++ integer_to_list(X), null},
            {K, X}
        end,
    KL = lists:map(KeyFun, lists:seq(1, 1000)),
    {TimeMH, HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]),
    io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH]),
    {TimePH, _HL2} = timer:tc(lists, map, [fun(K) -> erlang:phash2(K) end, KL]),
    io:format(user, "1000 keys phash2 hashed in ~w microseconds~n", [TimePH]),
    {TimeMH2, HL3} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]),
    io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH2]),
    % The second pass previously re-matched the already-bound _HL1, which
    % was a hidden determinism check.  Make it explicit.
    ?assertEqual(HL1, HL3).
|
|
|
|
|
2016-12-20 20:55:56 +00:00
|
|
|
|
2016-10-13 21:02:15 +01:00
|
|
|
-endif.
|