Isolate better changes needed to support changes to metadata extraction
More obvious how to extend the code as it is all in one module. Also add a new field to the standard object metadata tuple that may hold in the future other object metadata base don user-defined functions.
This commit is contained in:
parent
bfddb53e31
commit
881b93229b
7 changed files with 322 additions and 213 deletions
|
@ -1270,7 +1270,7 @@ handle_call({head, Bucket, Key, Tag}, _From, State)
|
||||||
not_found ->
|
not_found ->
|
||||||
not_found;
|
not_found;
|
||||||
_ ->
|
_ ->
|
||||||
{ok, leveled_codec:build_metadata_object(LK, LedgerMD)}
|
{ok, leveled_head:build_head(Tag, LedgerMD)}
|
||||||
end,
|
end,
|
||||||
{_SW, UpdTimingsR} =
|
{_SW, UpdTimingsR} =
|
||||||
update_timings(SWr, {head, rsp}, UpdTimingsP),
|
update_timings(SWr, {head, rsp}, UpdTimingsP),
|
||||||
|
@ -1437,7 +1437,7 @@ snapshot_store(State, SnapType, Query, LongRunning) ->
|
||||||
Query,
|
Query,
|
||||||
LongRunning).
|
LongRunning).
|
||||||
|
|
||||||
-spec fetch_value(pid(), {any(), integer()}) -> not_present|any().
|
-spec fetch_value(pid(), leveled_codec:journal_ref()) -> not_present|any().
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Fetch a value from the Journal
|
%% Fetch a value from the Journal
|
||||||
fetch_value(Inker, {Key, SQN}) ->
|
fetch_value(Inker, {Key, SQN}) ->
|
||||||
|
@ -2517,7 +2517,7 @@ foldobjects_vs_hashtree_testto() ->
|
||||||
MD,
|
MD,
|
||||||
_Size,
|
_Size,
|
||||||
_Fetcher} = binary_to_term(ProxyV),
|
_Fetcher} = binary_to_term(ProxyV),
|
||||||
{Hash, _Size} = MD,
|
{Hash, _Size, _UserDefinedMD} = MD,
|
||||||
[{B, K, Hash}|Acc]
|
[{B, K, Hash}|Acc]
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
|
|
@ -2,28 +2,8 @@
|
||||||
%%
|
%%
|
||||||
%% Functions for manipulating keys and values within leveled.
|
%% Functions for manipulating keys and values within leveled.
|
||||||
%%
|
%%
|
||||||
%%
|
%% Any thing specific to handling of a given tag should be encapsulated
|
||||||
%% Within the LEDGER:
|
%% within the leveled_head module
|
||||||
%% Keys are of the form -
|
|
||||||
%% {Tag, Bucket, Key, SubKey|null}
|
|
||||||
%% Values are of the form
|
|
||||||
%% {SQN, Status, MD}
|
|
||||||
%%
|
|
||||||
%% Within the JOURNAL:
|
|
||||||
%% Keys are of the form -
|
|
||||||
%% {SQN, LedgerKey}
|
|
||||||
%% Values are of the form
|
|
||||||
%% {Object, IndexSpecs} (as a binary)
|
|
||||||
%%
|
|
||||||
%% IndexSpecs are of the form of a Ledger Key/Value
|
|
||||||
%%
|
|
||||||
%% Tags need to be set during PUT operations and each Tag used must be
|
|
||||||
%% supported in an extract_metadata and a build_metadata_object function clause
|
|
||||||
%%
|
|
||||||
%% Currently the only tags supported are:
|
|
||||||
%% - o (standard objects)
|
|
||||||
%% - o_rkv (riak objects)
|
|
||||||
%% - i (index entries)
|
|
||||||
|
|
||||||
|
|
||||||
-module(leveled_codec).
|
-module(leveled_codec).
|
||||||
|
@ -58,30 +38,20 @@
|
||||||
check_forinkertype/2,
|
check_forinkertype/2,
|
||||||
maybe_compress/2,
|
maybe_compress/2,
|
||||||
create_value_for_journal/3,
|
create_value_for_journal/3,
|
||||||
build_metadata_object/2,
|
|
||||||
generate_ledgerkv/5,
|
generate_ledgerkv/5,
|
||||||
get_size/2,
|
get_size/2,
|
||||||
get_keyandobjhash/2,
|
get_keyandobjhash/2,
|
||||||
idx_indexspecs/5,
|
idx_indexspecs/5,
|
||||||
obj_objectspecs/3,
|
obj_objectspecs/3,
|
||||||
riak_extract_metadata/2,
|
|
||||||
segment_hash/1,
|
segment_hash/1,
|
||||||
to_lookup/1,
|
to_lookup/1,
|
||||||
riak_metadata_to_binary/2,
|
|
||||||
next_key/1]).
|
next_key/1]).
|
||||||
|
|
||||||
-define(V1_VERS, 1).
|
|
||||||
-define(MAGIC, 53). % riak_kv -> riak_object
|
|
||||||
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
|
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
|
||||||
-define(NRT_IDX, "$aae.").
|
-define(NRT_IDX, "$aae.").
|
||||||
|
|
||||||
-type riak_metadata() :: {binary()|delete, % Sibling Metadata
|
|
||||||
binary()|null, % Vclock Metadata
|
|
||||||
integer()|null, % Hash of vclock - non-exportable
|
|
||||||
integer()}. % Size in bytes of real object
|
|
||||||
|
|
||||||
-type tag() ::
|
-type tag() ::
|
||||||
?STD_TAG|?RIAK_TAG|?IDX_TAG|?HEAD_TAG.
|
leveled_head:object_tag()|?IDX_TAG|?HEAD_TAG.
|
||||||
-type key() ::
|
-type key() ::
|
||||||
binary()|string()|{binary(), binary()}.
|
binary()|string()|{binary(), binary()}.
|
||||||
% Keys SHOULD be binary()
|
% Keys SHOULD be binary()
|
||||||
|
@ -113,12 +83,16 @@
|
||||||
{sqn(), ledger_status(), segment_hash(), metadata(), last_moddate()}.
|
{sqn(), ledger_status(), segment_hash(), metadata(), last_moddate()}.
|
||||||
-type ledger_kv() ::
|
-type ledger_kv() ::
|
||||||
{ledger_key(), ledger_value()}.
|
{ledger_key(), ledger_value()}.
|
||||||
|
-type compaction_method() ::
|
||||||
|
retain|skip|recalc.
|
||||||
-type compaction_strategy() ::
|
-type compaction_strategy() ::
|
||||||
list({tag(), retain|skip|recalc}).
|
list({tag(), compaction_method()}).
|
||||||
-type journal_key_tag() ::
|
-type journal_key_tag() ::
|
||||||
?INKT_STND|?INKT_TOMB|?INKT_MPUT|?INKT_KEYD.
|
?INKT_STND|?INKT_TOMB|?INKT_MPUT|?INKT_KEYD.
|
||||||
-type journal_key() ::
|
-type journal_key() ::
|
||||||
{integer(), journal_key_tag(), ledger_key()}.
|
{sqn(), journal_key_tag(), ledger_key()}.
|
||||||
|
-type journal_ref() ::
|
||||||
|
{ledger_key(), sqn()}.
|
||||||
-type object_spec_v0() ::
|
-type object_spec_v0() ::
|
||||||
{add|remove, key(), key(), key()|null, any()}.
|
{add|remove, key(), key(), key()|null, any()}.
|
||||||
-type object_spec_v1() ::
|
-type object_spec_v1() ::
|
||||||
|
@ -153,8 +127,10 @@
|
||||||
ledger_value/0,
|
ledger_value/0,
|
||||||
ledger_kv/0,
|
ledger_kv/0,
|
||||||
compaction_strategy/0,
|
compaction_strategy/0,
|
||||||
|
compaction_method/0,
|
||||||
journal_key_tag/0,
|
journal_key_tag/0,
|
||||||
journal_key/0,
|
journal_key/0,
|
||||||
|
journal_ref/0,
|
||||||
compression_method/0,
|
compression_method/0,
|
||||||
journal_keychanges/0,
|
journal_keychanges/0,
|
||||||
index_specs/0,
|
index_specs/0,
|
||||||
|
@ -179,29 +155,8 @@ segment_hash(Key) when is_binary(Key) ->
|
||||||
{segment_hash, SegmentID, ExtraHash, _AltHash}
|
{segment_hash, SegmentID, ExtraHash, _AltHash}
|
||||||
= leveled_tictac:keyto_segment48(Key),
|
= leveled_tictac:keyto_segment48(Key),
|
||||||
{SegmentID, ExtraHash};
|
{SegmentID, ExtraHash};
|
||||||
segment_hash({?RIAK_TAG, Bucket, Key, null})
|
segment_hash(KeyTuple) when is_tuple(KeyTuple) ->
|
||||||
when is_binary(Bucket), is_binary(Key) ->
|
segment_hash(leveled_head:key_to_canonicalbinary(KeyTuple)).
|
||||||
segment_hash(<<Bucket/binary, Key/binary>>);
|
|
||||||
segment_hash({?RIAK_TAG, {BucketType, Bucket}, Key, SubKey})
|
|
||||||
when is_binary(BucketType), is_binary(Bucket) ->
|
|
||||||
segment_hash({?RIAK_TAG,
|
|
||||||
<<BucketType/binary, Bucket/binary>>,
|
|
||||||
Key,
|
|
||||||
SubKey});
|
|
||||||
segment_hash({?HEAD_TAG, Bucket, Key, SubK})
|
|
||||||
when is_binary(Bucket), is_binary(Key), is_binary(SubK) ->
|
|
||||||
segment_hash(<<Bucket/binary, Key/binary, SubK/binary>>);
|
|
||||||
segment_hash({?HEAD_TAG, Bucket, Key, _SubK})
|
|
||||||
when is_binary(Bucket), is_binary(Key) ->
|
|
||||||
segment_hash(<<Bucket/binary, Key/binary>>);
|
|
||||||
segment_hash({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey})
|
|
||||||
when is_binary(BucketType), is_binary(Bucket) ->
|
|
||||||
segment_hash({?HEAD_TAG,
|
|
||||||
<<BucketType/binary, Bucket/binary>>,
|
|
||||||
Key,
|
|
||||||
SubKey});
|
|
||||||
segment_hash(Key) ->
|
|
||||||
segment_hash(term_to_binary(Key)).
|
|
||||||
|
|
||||||
|
|
||||||
-spec to_lookup(ledger_key()) -> maybe_lookup().
|
-spec to_lookup(ledger_key()) -> maybe_lookup().
|
||||||
|
@ -355,7 +310,9 @@ endkey_passed(EndKey, CheckingKey) ->
|
||||||
%% Take the default startegy for compaction, and override the approach for any
|
%% Take the default startegy for compaction, and override the approach for any
|
||||||
%% tags passed in
|
%% tags passed in
|
||||||
inker_reload_strategy(AltList) ->
|
inker_reload_strategy(AltList) ->
|
||||||
ReloadStrategy0 = [{?RIAK_TAG, retain}, {?STD_TAG, retain}],
|
ReloadStrategy0 =
|
||||||
|
lists:map(fun leveled_head:default_reload_strategy/1,
|
||||||
|
leveled_head:defined_objecttags()),
|
||||||
lists:foldl(fun({X, Y}, SList) ->
|
lists:foldl(fun({X, Y}, SList) ->
|
||||||
lists:keyreplace(X, 1, SList, {X, Y})
|
lists:keyreplace(X, 1, SList, {X, Y})
|
||||||
end,
|
end,
|
||||||
|
@ -571,8 +528,6 @@ check_forinkertype(_LedgerKey, head_only) ->
|
||||||
check_forinkertype(_LedgerKey, _Object) ->
|
check_forinkertype(_LedgerKey, _Object) ->
|
||||||
?INKT_STND.
|
?INKT_STND.
|
||||||
|
|
||||||
hash(Obj) ->
|
|
||||||
erlang:phash2(term_to_binary(Obj)).
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -657,8 +612,8 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
|
||||||
{active, TS}
|
{active, TS}
|
||||||
end,
|
end,
|
||||||
Hash = segment_hash(PrimaryKey),
|
Hash = segment_hash(PrimaryKey),
|
||||||
{MD, LastMods} = extract_metadata(Obj, Size, Tag),
|
{MD, LastMods} = leveled_head:extract_metadata(Tag, Size, Obj),
|
||||||
ObjHash = get_objhash(Tag, MD),
|
ObjHash = leveled_head:get_hash(Tag, MD),
|
||||||
Value = {SQN,
|
Value = {SQN,
|
||||||
Status,
|
Status,
|
||||||
Hash,
|
Hash,
|
||||||
|
@ -679,23 +634,10 @@ get_last_lastmodification(LastMods) ->
|
||||||
{Mega, Sec, _Micro} = lists:max(LastMods),
|
{Mega, Sec, _Micro} = lists:max(LastMods),
|
||||||
Mega * 1000000 + Sec.
|
Mega * 1000000 + Sec.
|
||||||
|
|
||||||
|
|
||||||
extract_metadata(Obj, Size, ?RIAK_TAG) ->
|
|
||||||
riak_extract_metadata(Obj, Size);
|
|
||||||
extract_metadata(Obj, Size, ?STD_TAG) ->
|
|
||||||
{{hash(Obj), Size}, []}.
|
|
||||||
|
|
||||||
get_size(PK, Value) ->
|
get_size(PK, Value) ->
|
||||||
{Tag, _Bucket, _Key, _} = PK,
|
{Tag, _Bucket, _Key, _} = PK,
|
||||||
MD = element(4, Value),
|
MD = element(4, Value),
|
||||||
case Tag of
|
leveled_head:get_size(Tag, MD).
|
||||||
?RIAK_TAG ->
|
|
||||||
{_RMD, _VC, _Hash, Size} = MD,
|
|
||||||
Size;
|
|
||||||
?STD_TAG ->
|
|
||||||
{_Hash, Size} = MD,
|
|
||||||
Size
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec get_keyandobjhash(tuple(), tuple()) -> tuple().
|
-spec get_keyandobjhash(tuple(), tuple()) -> tuple().
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -709,105 +651,9 @@ get_keyandobjhash(LK, Value) ->
|
||||||
?IDX_TAG ->
|
?IDX_TAG ->
|
||||||
from_ledgerkey(LK); % returns {Bucket, Key, IdxValue}
|
from_ledgerkey(LK); % returns {Bucket, Key, IdxValue}
|
||||||
_ ->
|
_ ->
|
||||||
{Bucket, Key, get_objhash(Tag, MD)}
|
{Bucket, Key, leveled_head:get_hash(Tag, MD)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_objhash(Tag, ObjMetaData) ->
|
|
||||||
case Tag of
|
|
||||||
?RIAK_TAG ->
|
|
||||||
{_RMD, _VC, Hash, _Size} = ObjMetaData,
|
|
||||||
Hash;
|
|
||||||
?STD_TAG ->
|
|
||||||
{Hash, _Size} = ObjMetaData,
|
|
||||||
Hash
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
|
||||||
build_metadata_object(PrimaryKey, MD) ->
|
|
||||||
{Tag, _Bucket, _Key, _SubKey} = PrimaryKey,
|
|
||||||
case Tag of
|
|
||||||
?RIAK_TAG ->
|
|
||||||
{SibData, Vclock, _Hash, _Size} = MD,
|
|
||||||
riak_metadata_to_binary(Vclock, SibData);
|
|
||||||
?STD_TAG ->
|
|
||||||
MD;
|
|
||||||
?HEAD_TAG ->
|
|
||||||
MD
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
|
||||||
-spec riak_extract_metadata(binary()|delete, non_neg_integer()) ->
|
|
||||||
{riak_metadata(), list()}.
|
|
||||||
%% @doc
|
|
||||||
%% Riak extract metadata should extract a metadata object which is a
|
|
||||||
%% five-tuple of:
|
|
||||||
%% - Binary of sibling Metadata
|
|
||||||
%% - Binary of vector clock metadata
|
|
||||||
%% - Non-exportable hash of the vector clock metadata
|
|
||||||
%% - The largest last modified date of the object
|
|
||||||
%% - Size of the object
|
|
||||||
%%
|
|
||||||
%% The metadata object should be returned with the full list of last
|
|
||||||
%% modified dates (which will be used for recent anti-entropy index creation)
|
|
||||||
riak_extract_metadata(delete, Size) ->
|
|
||||||
{{delete, null, null, Size}, []};
|
|
||||||
riak_extract_metadata(ObjBin, Size) ->
|
|
||||||
{VclockBin, SibBin, LastMods} = riak_metadata_from_binary(ObjBin),
|
|
||||||
{{SibBin,
|
|
||||||
VclockBin,
|
|
||||||
erlang:phash2(lists:sort(binary_to_term(VclockBin))),
|
|
||||||
Size},
|
|
||||||
LastMods}.
|
|
||||||
|
|
||||||
%% <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
|
||||||
%%% VclockBin/binary, SibCount:32/integer, SibsBin/binary>>.
|
|
||||||
|
|
||||||
riak_metadata_to_binary(VclockBin, SibMetaBin) ->
|
|
||||||
VclockLen = byte_size(VclockBin),
|
|
||||||
<<?MAGIC:8/integer, ?V1_VERS:8/integer,
|
|
||||||
VclockLen:32/integer, VclockBin/binary,
|
|
||||||
SibMetaBin/binary>>.
|
|
||||||
|
|
||||||
riak_metadata_from_binary(V1Binary) ->
|
|
||||||
<<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
|
||||||
Rest/binary>> = V1Binary,
|
|
||||||
<<VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest,
|
|
||||||
{SibMetaBin, LastMods} =
|
|
||||||
case SibCount of
|
|
||||||
SC when is_integer(SC) ->
|
|
||||||
get_metadata_from_siblings(SibsBin,
|
|
||||||
SibCount,
|
|
||||||
<<SibCount:32/integer>>,
|
|
||||||
[])
|
|
||||||
end,
|
|
||||||
{VclockBin, SibMetaBin, LastMods}.
|
|
||||||
|
|
||||||
get_metadata_from_siblings(<<>>, 0, SibMetaBin, LastMods) ->
|
|
||||||
{SibMetaBin, LastMods};
|
|
||||||
get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>,
|
|
||||||
SibCount,
|
|
||||||
SibMetaBin,
|
|
||||||
LastMods) ->
|
|
||||||
<<_ValBin:ValLen/binary, MetaLen:32/integer, Rest1/binary>> = Rest0,
|
|
||||||
<<MetaBin:MetaLen/binary, Rest2/binary>> = Rest1,
|
|
||||||
LastMod =
|
|
||||||
case MetaBin of
|
|
||||||
<<MegaSec:32/integer,
|
|
||||||
Sec:32/integer,
|
|
||||||
MicroSec:32/integer,
|
|
||||||
_Rest/binary>> ->
|
|
||||||
{MegaSec, Sec, MicroSec};
|
|
||||||
_ ->
|
|
||||||
{0, 0, 0}
|
|
||||||
end,
|
|
||||||
get_metadata_from_siblings(Rest2,
|
|
||||||
SibCount - 1,
|
|
||||||
<<SibMetaBin/binary,
|
|
||||||
0:32/integer,
|
|
||||||
MetaLen:32/integer,
|
|
||||||
MetaBin:MetaLen/binary>>,
|
|
||||||
[LastMod|LastMods]).
|
|
||||||
|
|
||||||
-spec next_key(key()) -> key().
|
-spec next_key(key()) -> key().
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Get the next key to iterate from a given point
|
%% Get the next key to iterate from a given point
|
||||||
|
|
274
src/leveled_head.erl
Normal file
274
src/leveled_head.erl
Normal file
|
@ -0,0 +1,274 @@
|
||||||
|
%% -------- Metadata Seperation - Head and Body ---------
|
||||||
|
%%
|
||||||
|
%% The definition of the part of the object that belongs to the HEAD, and
|
||||||
|
%% the part which belongs to the body.
|
||||||
|
%%
|
||||||
|
%% For the ?RIAK tag this is pre-defined. For the ?STD_TAG there is minimal
|
||||||
|
%% definition. For best use of Riak define a new tag and use pattern matching
|
||||||
|
%% to extend these exported functions.
|
||||||
|
|
||||||
|
-module(leveled_head).
|
||||||
|
|
||||||
|
-include("include/leveled.hrl").
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-export([build_head/2,
|
||||||
|
maybe_build_proxy/4,
|
||||||
|
extract_metadata/3,
|
||||||
|
get_size/2,
|
||||||
|
get_hash/2,
|
||||||
|
default_reload_strategy/1,
|
||||||
|
defined_objecttags/0,
|
||||||
|
key_to_canonicalbinary/1]).
|
||||||
|
|
||||||
|
%% Exported for testing purposes
|
||||||
|
-export([riak_metadata_to_binary/2,
|
||||||
|
riak_extract_metadata/2]).
|
||||||
|
|
||||||
|
|
||||||
|
-define(MAGIC, 53). % riak_kv -> riak_object
|
||||||
|
-define(V1_VERS, 1).
|
||||||
|
|
||||||
|
-type riak_metadata() :: {binary()|delete, % Sibling Metadata
|
||||||
|
binary()|null, % Vclock Metadata
|
||||||
|
non_neg_integer()|null, % Hash of vclock - non-exportable
|
||||||
|
non_neg_integer()}. % Size in bytes of real object
|
||||||
|
-type std_metadata() :: {non_neg_integer()|null, % Hash of value
|
||||||
|
non_neg_integer(), % Size in bytes of real object
|
||||||
|
list(tuple())|undefined}.
|
||||||
|
|
||||||
|
|
||||||
|
-type object_tag() :: ?STD_TAG|?RIAK_TAG.
|
||||||
|
% tags assigned to objects
|
||||||
|
% (not other special entities such as ?HEAD or ?IDX)
|
||||||
|
-type headonly_tag() :: ?HEAD_TAG.
|
||||||
|
% Tag assigned to head_only objects. Behaviour cannot be changed
|
||||||
|
|
||||||
|
-type object_metadata() :: riak_metadata()|std_metadata().
|
||||||
|
-type head_bin() ::
|
||||||
|
binary()|tuple().
|
||||||
|
% TODO:
|
||||||
|
% This is currently not always a binary. Wish is to migrate this so that
|
||||||
|
% it is predictably a binary
|
||||||
|
-type value_fetcher() ::
|
||||||
|
{fun((pid(), leveled_codec:journal_key()) -> any()),
|
||||||
|
pid(), leveled_codec:journal_key()}.
|
||||||
|
% A 2-arity function, which when passed the other two elements of the tuple
|
||||||
|
% will return the value
|
||||||
|
-type proxy_object() ::
|
||||||
|
{proxy_object, head_bin(), non_neg_integer(), value_fetcher()}.
|
||||||
|
% Returns the head, size and a tuple for accessing the value
|
||||||
|
-type proxy_objectbin() ::
|
||||||
|
binary().
|
||||||
|
% using term_to_binary(proxy_object())
|
||||||
|
|
||||||
|
|
||||||
|
-export_type([object_tag/0,
|
||||||
|
proxy_object/0,
|
||||||
|
value_fetcher/0,
|
||||||
|
head_bin/0,
|
||||||
|
proxy_objectbin/0]).
|
||||||
|
|
||||||
|
%%%============================================================================
|
||||||
|
%%% External Functions
|
||||||
|
%%%============================================================================
|
||||||
|
|
||||||
|
-spec defined_objecttags() -> list(object_tag()).
|
||||||
|
%% @doc
|
||||||
|
%% Return the list of object tags
|
||||||
|
defined_objecttags() ->
|
||||||
|
[?STD_TAG, ?RIAK_TAG].
|
||||||
|
|
||||||
|
|
||||||
|
-spec key_to_canonicalbinary(tuple()) -> binary().
|
||||||
|
%% @doc
|
||||||
|
%% Convert a key to a binary in a consistent way for the tag. The binary will
|
||||||
|
%% then be used to create the hash
|
||||||
|
key_to_canonicalbinary({?RIAK_TAG, Bucket, Key, null})
|
||||||
|
when is_binary(Bucket), is_binary(Key) ->
|
||||||
|
<<Bucket/binary, Key/binary>>;
|
||||||
|
key_to_canonicalbinary({?RIAK_TAG, {BucketType, Bucket}, Key, SubKey})
|
||||||
|
when is_binary(BucketType), is_binary(Bucket) ->
|
||||||
|
key_to_canonicalbinary({?RIAK_TAG,
|
||||||
|
<<BucketType/binary, Bucket/binary>>,
|
||||||
|
Key,
|
||||||
|
SubKey});
|
||||||
|
key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, SubK})
|
||||||
|
when is_binary(Bucket), is_binary(Key), is_binary(SubK) ->
|
||||||
|
<<Bucket/binary, Key/binary, SubK/binary>>;
|
||||||
|
key_to_canonicalbinary({?HEAD_TAG, Bucket, Key, _SubK})
|
||||||
|
when is_binary(Bucket), is_binary(Key) ->
|
||||||
|
<<Bucket/binary, Key/binary>>;
|
||||||
|
key_to_canonicalbinary({?HEAD_TAG, {BucketType, Bucket}, Key, SubKey})
|
||||||
|
when is_binary(BucketType), is_binary(Bucket) ->
|
||||||
|
key_to_canonicalbinary({?HEAD_TAG,
|
||||||
|
<<BucketType/binary, Bucket/binary>>,
|
||||||
|
Key,
|
||||||
|
SubKey});
|
||||||
|
key_to_canonicalbinary(Key) ->
|
||||||
|
term_to_binary(Key).
|
||||||
|
|
||||||
|
-spec build_head(object_tag()|headonly_tag(), object_metadata()) -> head_bin().
|
||||||
|
%% @doc
|
||||||
|
%% Return the object metadata as a binary to be the "head" of the object
|
||||||
|
build_head(?RIAK_TAG, Metadata) ->
|
||||||
|
{SibData, Vclock, _Hash, _Size} = Metadata,
|
||||||
|
riak_metadata_to_binary(Vclock, SibData);
|
||||||
|
build_head(_Tag, Metadata) ->
|
||||||
|
% term_to_binary(Metadata).
|
||||||
|
Metadata.
|
||||||
|
|
||||||
|
|
||||||
|
-spec maybe_build_proxy(object_tag()|headonly_tag(), object_metadata(),
|
||||||
|
pid(), leveled_codec:journal_ref())
|
||||||
|
-> proxy_objectbin()|object_metadata().
|
||||||
|
%% @doc
|
||||||
|
%% Return a proxyObject (e.g. form a head fold, so that the potential fetching
|
||||||
|
%% of an object can be deferred (e.g. it can be make dependent on the
|
||||||
|
%% applictaion making a decision on the contents of the object_metadata
|
||||||
|
maybe_build_proxy(?HEAD_TAG, ObjectMetadata, _InkerClone, _JR) ->
|
||||||
|
% Object has no value - so proxy object makese no sense, just return the
|
||||||
|
% metadata as is
|
||||||
|
ObjectMetadata;
|
||||||
|
maybe_build_proxy(Tag, ObjMetadata, InkerClone, JournalRef) ->
|
||||||
|
Size = get_size(Tag, ObjMetadata),
|
||||||
|
HeadBin = build_head(Tag, ObjMetadata),
|
||||||
|
term_to_binary({proxy_object,
|
||||||
|
HeadBin,
|
||||||
|
Size,
|
||||||
|
{fun leveled_bookie:fetch_value/2,
|
||||||
|
InkerClone,
|
||||||
|
JournalRef}}).
|
||||||
|
|
||||||
|
|
||||||
|
-spec extract_metadata(object_tag()|headonly_tag(), non_neg_integer(), any())
|
||||||
|
-> {object_metadata(), list(erlang:timestamp())}.
|
||||||
|
%% @doc
|
||||||
|
%% Take the inbound object and extract from it the metadata to be stored within
|
||||||
|
%% the ledger (and ultimately returned from a leveled_boookie:book_head/4
|
||||||
|
%% request (after conversion using build_head/2).
|
||||||
|
%%
|
||||||
|
%% As part of the response also return a list of last_modification_dates
|
||||||
|
%% associated with the object - with those dates being expressed as erlang
|
||||||
|
%% timestamps.
|
||||||
|
%%
|
||||||
|
%% The Object Size passed in to this function is as calculated when writing
|
||||||
|
%% the object to the Journal. It may be recalculated here, if an alternative
|
||||||
|
%% view of size is required within the header
|
||||||
|
extract_metadata(?RIAK_TAG, SizeAsStoredInJournal, RiakObj) ->
|
||||||
|
riak_extract_metadata(RiakObj, SizeAsStoredInJournal);
|
||||||
|
extract_metadata(_Tag, SizeAsStoredInJournal, Obj) ->
|
||||||
|
{{standard_hash(Obj), SizeAsStoredInJournal, undefined}, []}.
|
||||||
|
|
||||||
|
|
||||||
|
-spec get_size(object_tag()|headonly_tag(), object_metadata())
|
||||||
|
-> non_neg_integer().
|
||||||
|
%% @doc
|
||||||
|
%% Fetch the size from the metadata
|
||||||
|
get_size(?RIAK_TAG, RiakObjectMetadata) ->
|
||||||
|
element(4, RiakObjectMetadata);
|
||||||
|
get_size(_Tag, ObjectMetadata) ->
|
||||||
|
element(2, ObjectMetadata).
|
||||||
|
|
||||||
|
|
||||||
|
-spec get_hash(object_tag()|headonly_tag(), object_metadata())
|
||||||
|
-> non_neg_integer().
|
||||||
|
%% @doc
|
||||||
|
%% Fetch the hash from the metadata
|
||||||
|
get_hash(?RIAK_TAG, RiakObjectMetadata) ->
|
||||||
|
element(3, RiakObjectMetadata);
|
||||||
|
get_hash(_Tag, ObjectMetadata) ->
|
||||||
|
element(1, ObjectMetadata).
|
||||||
|
|
||||||
|
|
||||||
|
-spec default_reload_strategy(object_tag())
|
||||||
|
-> {object_tag(),
|
||||||
|
leveled_codec:compaction_method()}.
|
||||||
|
%% @doc
|
||||||
|
%% State the compaction_method to be used when reloading the Ledger from the
|
||||||
|
%% journal for each object tag. Note, no compaction startegy required for
|
||||||
|
%% head_only tag
|
||||||
|
default_reload_strategy(Tag) ->
|
||||||
|
{Tag, retain}.
|
||||||
|
|
||||||
|
|
||||||
|
%%%============================================================================
|
||||||
|
%%% Tag-specific Functions
|
||||||
|
%%%============================================================================
|
||||||
|
|
||||||
|
standard_hash(Obj) ->
|
||||||
|
erlang:phash2(term_to_binary(Obj)).
|
||||||
|
|
||||||
|
|
||||||
|
-spec riak_extract_metadata(binary()|delete, non_neg_integer()) ->
|
||||||
|
{riak_metadata(), list()}.
|
||||||
|
%% @doc
|
||||||
|
%% Riak extract metadata should extract a metadata object which is a
|
||||||
|
%% five-tuple of:
|
||||||
|
%% - Binary of sibling Metadata
|
||||||
|
%% - Binary of vector clock metadata
|
||||||
|
%% - Non-exportable hash of the vector clock metadata
|
||||||
|
%% - The largest last modified date of the object
|
||||||
|
%% - Size of the object
|
||||||
|
%%
|
||||||
|
%% The metadata object should be returned with the full list of last
|
||||||
|
%% modified dates (which will be used for recent anti-entropy index creation)
|
||||||
|
riak_extract_metadata(delete, Size) ->
|
||||||
|
{{delete, null, null, Size}, []};
|
||||||
|
riak_extract_metadata(ObjBin, Size) ->
|
||||||
|
{VclockBin, SibBin, LastMods} = riak_metadata_from_binary(ObjBin),
|
||||||
|
{{SibBin,
|
||||||
|
VclockBin,
|
||||||
|
erlang:phash2(lists:sort(binary_to_term(VclockBin))),
|
||||||
|
Size},
|
||||||
|
LastMods}.
|
||||||
|
|
||||||
|
%% <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||||
|
%%% VclockBin/binary, SibCount:32/integer, SibsBin/binary>>.
|
||||||
|
|
||||||
|
riak_metadata_to_binary(VclockBin, SibMetaBin) ->
|
||||||
|
VclockLen = byte_size(VclockBin),
|
||||||
|
<<?MAGIC:8/integer, ?V1_VERS:8/integer,
|
||||||
|
VclockLen:32/integer, VclockBin/binary,
|
||||||
|
SibMetaBin/binary>>.
|
||||||
|
|
||||||
|
riak_metadata_from_binary(V1Binary) ->
|
||||||
|
<<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||||
|
Rest/binary>> = V1Binary,
|
||||||
|
<<VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest,
|
||||||
|
{SibMetaBin, LastMods} =
|
||||||
|
case SibCount of
|
||||||
|
SC when is_integer(SC) ->
|
||||||
|
get_metadata_from_siblings(SibsBin,
|
||||||
|
SibCount,
|
||||||
|
<<SibCount:32/integer>>,
|
||||||
|
[])
|
||||||
|
end,
|
||||||
|
{VclockBin, SibMetaBin, LastMods}.
|
||||||
|
|
||||||
|
get_metadata_from_siblings(<<>>, 0, SibMetaBin, LastMods) ->
|
||||||
|
{SibMetaBin, LastMods};
|
||||||
|
get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>,
|
||||||
|
SibCount,
|
||||||
|
SibMetaBin,
|
||||||
|
LastMods) ->
|
||||||
|
<<_ValBin:ValLen/binary, MetaLen:32/integer, Rest1/binary>> = Rest0,
|
||||||
|
<<MetaBin:MetaLen/binary, Rest2/binary>> = Rest1,
|
||||||
|
LastMod =
|
||||||
|
case MetaBin of
|
||||||
|
<<MegaSec:32/integer,
|
||||||
|
Sec:32/integer,
|
||||||
|
MicroSec:32/integer,
|
||||||
|
_Rest/binary>> ->
|
||||||
|
{MegaSec, Sec, MicroSec};
|
||||||
|
_ ->
|
||||||
|
{0, 0, 0}
|
||||||
|
end,
|
||||||
|
get_metadata_from_siblings(Rest2,
|
||||||
|
SibCount - 1,
|
||||||
|
<<SibMetaBin/binary,
|
||||||
|
0:32/integer,
|
||||||
|
MetaLen:32/integer,
|
||||||
|
MetaBin:MetaLen/binary>>,
|
||||||
|
[LastMod|LastMods]).
|
|
@ -678,27 +678,27 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
|
||||||
end,
|
end,
|
||||||
JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN},
|
JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN},
|
||||||
case DeferredFetch of
|
case DeferredFetch of
|
||||||
{true, true} ->
|
{true, JournalCheck} ->
|
||||||
InJournal =
|
|
||||||
leveled_inker:ink_keycheck(InkerClone,
|
|
||||||
LK,
|
|
||||||
SQN),
|
|
||||||
case InJournal of
|
|
||||||
probably ->
|
|
||||||
ProxyObj =
|
|
||||||
make_proxy_object(Tag,
|
|
||||||
LK, JK, MD, V,
|
|
||||||
InkerClone),
|
|
||||||
FoldObjectsFun(B, K, ProxyObj, Acc);
|
|
||||||
missing ->
|
|
||||||
Acc
|
|
||||||
end;
|
|
||||||
{true, false} ->
|
|
||||||
ProxyObj =
|
ProxyObj =
|
||||||
make_proxy_object(Tag,
|
leveled_head:maybe_build_proxy(Tag,
|
||||||
LK, JK, MD, V,
|
MD,
|
||||||
InkerClone),
|
InkerClone,
|
||||||
FoldObjectsFun(B, K, ProxyObj, Acc);
|
JK),
|
||||||
|
case JournalCheck of
|
||||||
|
true ->
|
||||||
|
InJournal =
|
||||||
|
leveled_inker:ink_keycheck(InkerClone,
|
||||||
|
LK,
|
||||||
|
SQN),
|
||||||
|
case InJournal of
|
||||||
|
probably ->
|
||||||
|
FoldObjectsFun(B, K, ProxyObj, Acc);
|
||||||
|
missing ->
|
||||||
|
Acc
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
FoldObjectsFun(B, K, ProxyObj, Acc)
|
||||||
|
end;
|
||||||
false ->
|
false ->
|
||||||
R = leveled_bookie:fetch_value(InkerClone, JK),
|
R = leveled_bookie:fetch_value(InkerClone, JK),
|
||||||
case R of
|
case R of
|
||||||
|
@ -706,7 +706,6 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
|
||||||
Acc;
|
Acc;
|
||||||
Value ->
|
Value ->
|
||||||
FoldObjectsFun(B, K, Value, Acc)
|
FoldObjectsFun(B, K, Value, Acc)
|
||||||
|
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
false ->
|
false ->
|
||||||
|
@ -716,16 +715,6 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
|
||||||
AccFun.
|
AccFun.
|
||||||
|
|
||||||
|
|
||||||
make_proxy_object(?HEAD_TAG, _LK, _JK, MD, _V, _InkerClone) ->
|
|
||||||
MD;
|
|
||||||
make_proxy_object(_Tag, LK, JK, MD, V, InkerClone) ->
|
|
||||||
Size = leveled_codec:get_size(LK, V),
|
|
||||||
MDBin = leveled_codec:build_metadata_object(LK, MD),
|
|
||||||
term_to_binary({proxy_object,
|
|
||||||
MDBin,
|
|
||||||
Size,
|
|
||||||
{fun leveled_bookie:fetch_value/2, InkerClone, JK}}).
|
|
||||||
|
|
||||||
check_presence(Key, Value, InkerClone) ->
|
check_presence(Key, Value, InkerClone) ->
|
||||||
{LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}),
|
{LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}),
|
||||||
case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of
|
case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of
|
||||||
|
|
|
@ -3259,7 +3259,7 @@ nonsense_coverage_test() ->
|
||||||
|
|
||||||
hashmatching_bytreesize_test() ->
|
hashmatching_bytreesize_test() ->
|
||||||
B = <<"Bucket">>,
|
B = <<"Bucket">>,
|
||||||
V = leveled_codec:riak_metadata_to_binary(term_to_binary([{"actor1", 1}]),
|
V = leveled_head:riak_metadata_to_binary(term_to_binary([{"actor1", 1}]),
|
||||||
<<1:32/integer,
|
<<1:32/integer,
|
||||||
0:32/integer,
|
0:32/integer,
|
||||||
0:32/integer>>),
|
0:32/integer>>),
|
||||||
|
|
|
@ -69,9 +69,9 @@ simple_test_withlog(LogLevel, ForcedLogs) ->
|
||||||
ok = leveled_bookie:book_put(Bookie2, "Bucket1", "Key2", "Value2",
|
ok = leveled_bookie:book_put(Bookie2, "Bucket1", "Key2", "Value2",
|
||||||
[{add, "Index1", "Term1"}]),
|
[{add, "Index1", "Term1"}]),
|
||||||
{ok, "Value2"} = leveled_bookie:book_get(Bookie2, "Bucket1", "Key2"),
|
{ok, "Value2"} = leveled_bookie:book_get(Bookie2, "Bucket1", "Key2"),
|
||||||
{ok, {62888926, 60}} = leveled_bookie:book_head(Bookie2,
|
{ok, {62888926, 60, undefined}} = leveled_bookie:book_head(Bookie2,
|
||||||
"Bucket1",
|
"Bucket1",
|
||||||
"Key2"),
|
"Key2"),
|
||||||
testutil:check_formissingobject(Bookie2, "Bucket1", "Key2"),
|
testutil:check_formissingobject(Bookie2, "Bucket1", "Key2"),
|
||||||
ok = leveled_bookie:book_put(Bookie2, "Bucket1", "Key2", <<"Value2">>,
|
ok = leveled_bookie:book_put(Bookie2, "Bucket1", "Key2", <<"Value2">>,
|
||||||
[{remove, "Index1", "Term1"},
|
[{remove, "Index1", "Term1"},
|
||||||
|
|
|
@ -363,7 +363,7 @@ check_forobject(Bookie, TestObject) ->
|
||||||
TestObject#r_object.bucket,
|
TestObject#r_object.bucket,
|
||||||
TestObject#r_object.key),
|
TestObject#r_object.key),
|
||||||
{{_SibMetaBin, Vclock, _Hash, size}, _LMS}
|
{{_SibMetaBin, Vclock, _Hash, size}, _LMS}
|
||||||
= leveled_codec:riak_extract_metadata(HeadBinary, size),
|
= leveled_head:riak_extract_metadata(HeadBinary, size),
|
||||||
true = binary_to_term(Vclock) == TestObject#r_object.vclock.
|
true = binary_to_term(Vclock) == TestObject#r_object.vclock.
|
||||||
|
|
||||||
check_formissingobject(Bookie, Bucket, Key) ->
|
check_formissingobject(Bookie, Bucket, Key) ->
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue