Extract Last Modified Date from Riak Object
As part of process to supporting a recent changes index for near-real-time anti-entropy
This commit is contained in:
parent
f81a4bca0d
commit
ebef27f021
4 changed files with 64 additions and 51 deletions
|
@ -1383,25 +1383,17 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) ->
|
||||||
|
|
||||||
|
|
||||||
preparefor_ledgercache(?INKT_KEYD,
|
preparefor_ledgercache(?INKT_KEYD,
|
||||||
LedgerKey, SQN, _Obj, _Size, {IndexSpecs, TTL}) ->
|
LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL}) ->
|
||||||
{Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
|
{Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
|
||||||
KeyChanges = leveled_codec:convert_indexspecs(IndexSpecs,
|
KeyChanges =
|
||||||
Bucket,
|
leveled_codec:convert_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
|
||||||
Key,
|
|
||||||
SQN,
|
|
||||||
TTL),
|
|
||||||
{no_lookup, SQN, KeyChanges};
|
{no_lookup, SQN, KeyChanges};
|
||||||
preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) ->
|
preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL}) ->
|
||||||
{Bucket, Key, ObjKeyChange, H} = leveled_codec:generate_ledgerkv(LedgerKey,
|
{Bucket, Key, MetaValue, H, _LastMods} =
|
||||||
SQN,
|
leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL),
|
||||||
Obj,
|
KeyChanges =
|
||||||
Size,
|
[{LedgerKey, MetaValue}] ++
|
||||||
TTL),
|
leveled_codec:convert_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
|
||||||
KeyChanges = [ObjKeyChange] ++ leveled_codec:convert_indexspecs(IndexSpecs,
|
|
||||||
Bucket,
|
|
||||||
Key,
|
|
||||||
SQN,
|
|
||||||
TTL),
|
|
||||||
{H, SQN, KeyChanges}.
|
{H, SQN, KeyChanges}.
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -378,6 +378,20 @@ convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
|
||||||
end,
|
end,
|
||||||
IndexSpecs).
|
IndexSpecs).
|
||||||
|
|
||||||
|
-spec generate_ledgerkv(tuple(), integer(), any(),
|
||||||
|
integer(), tuple()|infinity) ->
|
||||||
|
{any(), any(), any(), integer()|no_lookup, list()}.
|
||||||
|
%% @doc
|
||||||
|
%% Function to extract from an object the information necessary to populate
|
||||||
|
%% the Penciller's ledger.
|
||||||
|
%% Outputs -
|
||||||
|
%% Bucket - original Bucket extracted from the PrimaryKey
|
||||||
|
%% Key - original Key extracted from the PrimaryKey
|
||||||
|
%% Value - the value to be used in the Ledger (essentially the extracted
|
||||||
|
%% metadata)
|
||||||
|
%% Hash - A magic hash of the key to be used in lookups and filters
|
||||||
|
%% LastMods - the last modified dates for the object (may be multiple due to
|
||||||
|
%% siblings)
|
||||||
generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
|
generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
|
||||||
{Tag, Bucket, Key, _} = PrimaryKey,
|
{Tag, Bucket, Key, _} = PrimaryKey,
|
||||||
Status = case Obj of
|
Status = case Obj of
|
||||||
|
@ -387,11 +401,12 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
|
||||||
{active, TS}
|
{active, TS}
|
||||||
end,
|
end,
|
||||||
Hash = magic_hash(PrimaryKey),
|
Hash = magic_hash(PrimaryKey),
|
||||||
|
{MD, LastMods} = extract_metadata(Obj, Size, Tag),
|
||||||
Value = {SQN,
|
Value = {SQN,
|
||||||
Status,
|
Status,
|
||||||
Hash,
|
Hash,
|
||||||
extract_metadata(Obj, Size, Tag)},
|
MD},
|
||||||
{Bucket, Key, {PrimaryKey, Value}, Hash}.
|
{Bucket, Key, Value, Hash, LastMods}.
|
||||||
|
|
||||||
|
|
||||||
integer_now() ->
|
integer_now() ->
|
||||||
|
@ -404,7 +419,7 @@ integer_time(TS) ->
|
||||||
extract_metadata(Obj, Size, ?RIAK_TAG) ->
|
extract_metadata(Obj, Size, ?RIAK_TAG) ->
|
||||||
riak_extract_metadata(Obj, Size);
|
riak_extract_metadata(Obj, Size);
|
||||||
extract_metadata(Obj, Size, ?STD_TAG) ->
|
extract_metadata(Obj, Size, ?STD_TAG) ->
|
||||||
{hash(Obj), Size}.
|
{{hash(Obj), Size}, []}.
|
||||||
|
|
||||||
get_size(PK, Value) ->
|
get_size(PK, Value) ->
|
||||||
{Tag, _Bucket, _Key, _} = PK,
|
{Tag, _Bucket, _Key, _} = PK,
|
||||||
|
@ -445,13 +460,14 @@ build_metadata_object(PrimaryKey, MD) ->
|
||||||
|
|
||||||
|
|
||||||
riak_extract_metadata(delete, Size) ->
|
riak_extract_metadata(delete, Size) ->
|
||||||
{delete, null, null, Size};
|
{{delete, null, null, Size}, []};
|
||||||
riak_extract_metadata(ObjBin, Size) ->
|
riak_extract_metadata(ObjBin, Size) ->
|
||||||
{VclockBin, SibBin} = riak_metadata_from_binary(ObjBin),
|
{VclockBin, SibBin, LastMods} = riak_metadata_from_binary(ObjBin),
|
||||||
{SibBin,
|
{{SibBin,
|
||||||
VclockBin,
|
VclockBin,
|
||||||
erlang:phash2(lists:sort(binary_to_term(VclockBin))),
|
erlang:phash2(lists:sort(binary_to_term(VclockBin))),
|
||||||
Size}.
|
Size},
|
||||||
|
LastMods}.
|
||||||
|
|
||||||
%% <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
%% <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||||
%%% VclockBin/binary, SibCount:32/integer, SibsBin/binary>>.
|
%%% VclockBin/binary, SibCount:32/integer, SibsBin/binary>>.
|
||||||
|
@ -466,28 +482,41 @@ riak_metadata_from_binary(V1Binary) ->
|
||||||
<<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
<<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||||
Rest/binary>> = V1Binary,
|
Rest/binary>> = V1Binary,
|
||||||
<<VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest,
|
<<VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest,
|
||||||
SibMetaBin =
|
{SibMetaBin, LastMods} =
|
||||||
case SibCount of
|
case SibCount of
|
||||||
SC when is_integer(SC) ->
|
SC when is_integer(SC) ->
|
||||||
get_metadata_from_siblings(SibsBin,
|
get_metadata_from_siblings(SibsBin,
|
||||||
SibCount,
|
SibCount,
|
||||||
<<SibCount:32/integer>>)
|
<<SibCount:32/integer>>,
|
||||||
|
[])
|
||||||
end,
|
end,
|
||||||
{VclockBin, SibMetaBin}.
|
{VclockBin, SibMetaBin, LastMods}.
|
||||||
|
|
||||||
get_metadata_from_siblings(<<>>, 0, SibMetaBin) ->
|
get_metadata_from_siblings(<<>>, 0, SibMetaBin, LastMods) ->
|
||||||
SibMetaBin;
|
{SibMetaBin, LastMods};
|
||||||
get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>,
|
get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>,
|
||||||
SibCount,
|
SibCount,
|
||||||
SibMetaBin) ->
|
SibMetaBin,
|
||||||
|
LastMods) ->
|
||||||
<<_ValBin:ValLen/binary, MetaLen:32/integer, Rest1/binary>> = Rest0,
|
<<_ValBin:ValLen/binary, MetaLen:32/integer, Rest1/binary>> = Rest0,
|
||||||
<<MetaBin:MetaLen/binary, Rest2/binary>> = Rest1,
|
<<MetaBin:MetaLen/binary, Rest2/binary>> = Rest1,
|
||||||
|
LastMod =
|
||||||
|
case MetaBin of
|
||||||
|
<<MegaSec:32/integer,
|
||||||
|
Sec:32/integer,
|
||||||
|
MicroSec:32/integer,
|
||||||
|
_Rest/binary>> ->
|
||||||
|
{MegaSec, Sec, MicroSec};
|
||||||
|
_ ->
|
||||||
|
{0, 0, 0}
|
||||||
|
end,
|
||||||
get_metadata_from_siblings(Rest2,
|
get_metadata_from_siblings(Rest2,
|
||||||
SibCount - 1,
|
SibCount - 1,
|
||||||
<<SibMetaBin/binary,
|
<<SibMetaBin/binary,
|
||||||
0:32/integer,
|
0:32/integer,
|
||||||
MetaLen:32/integer,
|
MetaLen:32/integer,
|
||||||
MetaBin:MetaLen/binary>>).
|
MetaBin:MetaLen/binary>>,
|
||||||
|
[LastMod|LastMods]).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1481,17 +1481,13 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
|
||||||
BRand = random:uniform(BRange),
|
BRand = random:uniform(BRange),
|
||||||
BNumber = string:right(integer_to_list(BucketLow + BRand), 4, $0),
|
BNumber = string:right(integer_to_list(BucketLow + BRand), 4, $0),
|
||||||
KNumber = string:right(integer_to_list(random:uniform(1000)), 6, $0),
|
KNumber = string:right(integer_to_list(random:uniform(1000)), 6, $0),
|
||||||
LedgerKey = leveled_codec:to_ledgerkey("Bucket" ++ BNumber,
|
LK = leveled_codec:to_ledgerkey("Bucket" ++ BNumber, "Key" ++ KNumber, o),
|
||||||
"Key" ++ KNumber,
|
Chunk = crypto:rand_bytes(64),
|
||||||
o),
|
{_B, _K, MV, _H, _LMs} =
|
||||||
{_B, _K, KV, _H} = leveled_codec:generate_ledgerkv(LedgerKey,
|
leveled_codec:generate_ledgerkv(LK, Seqn, Chunk, 64, infinity),
|
||||||
Seqn,
|
|
||||||
crypto:rand_bytes(64),
|
|
||||||
64,
|
|
||||||
infinity),
|
|
||||||
generate_randomkeys(Seqn + 1,
|
generate_randomkeys(Seqn + 1,
|
||||||
Count - 1,
|
Count - 1,
|
||||||
[KV|Acc],
|
[{LK, MV}|Acc],
|
||||||
BucketLow,
|
BucketLow,
|
||||||
BRange).
|
BRange).
|
||||||
|
|
||||||
|
|
|
@ -238,17 +238,13 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
|
||||||
BRand = random:uniform(BRange),
|
BRand = random:uniform(BRange),
|
||||||
BNumber = string:right(integer_to_list(BucketLow + BRand), 4, $0),
|
BNumber = string:right(integer_to_list(BucketLow + BRand), 4, $0),
|
||||||
KNumber = string:right(integer_to_list(random:uniform(10000)), 6, $0),
|
KNumber = string:right(integer_to_list(random:uniform(10000)), 6, $0),
|
||||||
LedgerKey = leveled_codec:to_ledgerkey("Bucket" ++ BNumber,
|
LK = leveled_codec:to_ledgerkey("Bucket" ++ BNumber, "Key" ++ KNumber, o),
|
||||||
"Key" ++ KNumber,
|
Chunk = crypto:rand_bytes(64),
|
||||||
o),
|
{_B, _K, MV, _H, _LMs} =
|
||||||
{_B, _K, KV, _H} = leveled_codec:generate_ledgerkv(LedgerKey,
|
leveled_codec:generate_ledgerkv(LK, Seqn, Chunk, 64, infinity),
|
||||||
Seqn,
|
|
||||||
crypto:rand_bytes(64),
|
|
||||||
64,
|
|
||||||
infinity),
|
|
||||||
generate_randomkeys(Seqn + 1,
|
generate_randomkeys(Seqn + 1,
|
||||||
Count - 1,
|
Count - 1,
|
||||||
[KV|Acc],
|
[{LK, MV}|Acc],
|
||||||
BucketLow,
|
BucketLow,
|
||||||
BRange).
|
BRange).
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue