Support for recent AAE index
With basic ct test. Doesn't currently prove expiry of index. Doesn't prove ability to find segments. Assumes that either "all" buckets or a special list of buckets require indexing this way. Will lead to unexpected results if the same bucket name is used across different Tags. The format of the index has been chosen so that hopeully standard index features can be used (e.g. return_terms).
This commit is contained in:
parent
8da8722b9e
commit
954995e23f
5 changed files with 346 additions and 64 deletions
|
@ -58,7 +58,7 @@
|
|||
build_metadata_object/2,
|
||||
generate_ledgerkv/5,
|
||||
get_size/2,
|
||||
get_keyandhash/2,
|
||||
get_keyandobjhash/2,
|
||||
idx_indexspecs/5,
|
||||
aae_indexspecs/6,
|
||||
generate_uuid/0,
|
||||
|
@ -70,7 +70,8 @@
|
|||
-define(V1_VERS, 1).
|
||||
-define(MAGIC, 53). % riak_kv -> riak_object
|
||||
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
|
||||
|
||||
-define(NRT_IDX, "$aae.").
|
||||
-define(ALL_BUCKETS, list_to_binary("$all")).
|
||||
|
||||
-type recent_aae() :: #recent_aae{}.
|
||||
|
||||
|
@ -106,8 +107,6 @@ to_lookup(Key) ->
|
|||
case element(1, Key) of
|
||||
?IDX_TAG ->
|
||||
no_lookup;
|
||||
?AAE_TAG ->
|
||||
no_lookup;
|
||||
_ ->
|
||||
lookup
|
||||
end.
|
||||
|
@ -378,19 +377,24 @@ endkey_passed(EndKey, CheckingKey) ->
|
|||
EndKey < CheckingKey.
|
||||
|
||||
idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
|
||||
lists:map(fun({IndexOp, IdxField, IdxValue}) ->
|
||||
Status = case IndexOp of
|
||||
add ->
|
||||
{active, TTL};
|
||||
remove ->
|
||||
%% TODO: timestamps for delayed reaping
|
||||
tomb
|
||||
end,
|
||||
{to_ledgerkey(Bucket, Key, ?IDX_TAG,
|
||||
IdxField, IdxValue),
|
||||
{SQN, Status, no_lookup, null}}
|
||||
end,
|
||||
IndexSpecs).
|
||||
lists:map(
|
||||
fun({IdxOp, IdxFld, IdxTrm}) ->
|
||||
gen_indexspec(Bucket, Key, IdxOp, IdxFld, IdxTrm, SQN, TTL)
|
||||
end,
|
||||
IndexSpecs
|
||||
).
|
||||
|
||||
gen_indexspec(Bucket, Key, IdxOp, IdxField, IdxTerm, SQN, TTL) ->
|
||||
Status =
|
||||
case IdxOp of
|
||||
add ->
|
||||
{active, TTL};
|
||||
remove ->
|
||||
%% TODO: timestamps for delayed reaping
|
||||
tomb
|
||||
end,
|
||||
{to_ledgerkey(Bucket, Key, ?IDX_TAG, IdxField, IdxTerm),
|
||||
{SQN, Status, no_lookup, null}}.
|
||||
|
||||
-spec aae_indexspecs(false|recent_aae(),
|
||||
any(), any(),
|
||||
|
@ -411,16 +415,23 @@ aae_indexspecs(false, _Bucket, _Key, _SQN, _H, _LastMods) ->
|
|||
aae_indexspecs(_AAE, _Bucket, _Key, _SQN, _H, []) ->
|
||||
[];
|
||||
aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) ->
|
||||
InBucket =
|
||||
Bucket0 =
|
||||
case AAE#recent_aae.buckets of
|
||||
all ->
|
||||
true;
|
||||
?ALL_BUCKETS;
|
||||
ListB ->
|
||||
lists:member(Bucket, ListB)
|
||||
case lists:member(Bucket, ListB) of
|
||||
true ->
|
||||
Bucket;
|
||||
false ->
|
||||
false
|
||||
end
|
||||
end,
|
||||
case InBucket of
|
||||
true ->
|
||||
GenTagFun =
|
||||
case Bucket0 of
|
||||
false ->
|
||||
[];
|
||||
Bucket0 ->
|
||||
GenIdxFun =
|
||||
fun(LMD0, Acc) ->
|
||||
Dates = parse_date(LMD0,
|
||||
AAE#recent_aae.unit_minutes,
|
||||
|
@ -431,19 +442,23 @@ aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) ->
|
|||
Acc;
|
||||
{LMD1, TTL} ->
|
||||
TreeSize = AAE#recent_aae.tree_size,
|
||||
SegmentID =
|
||||
SegID =
|
||||
leveled_tictac:get_segment(Key, TreeSize),
|
||||
IdxK = {?AAE_TAG,
|
||||
LMD1,
|
||||
{{SegmentID, H}, Bucket},
|
||||
Key},
|
||||
IdxV = {SQN, {active, TTL}, no_lookup, null},
|
||||
IdxFldStr = ?NRT_IDX ++ LMD1 ++ "_bin",
|
||||
IdxTrmStr =
|
||||
string:right(integer_to_list(SegID), 24, $0) ++
|
||||
"." ++
|
||||
string:right(integer_to_list(H), 24, $0),
|
||||
{IdxK, IdxV} =
|
||||
gen_indexspec(Bucket0, Key,
|
||||
add,
|
||||
list_to_binary(IdxFldStr),
|
||||
list_to_binary(IdxTrmStr),
|
||||
SQN, TTL),
|
||||
[{IdxK, IdxV}|Acc]
|
||||
end
|
||||
end,
|
||||
lists:foldl(GenTagFun, [], LastMods);
|
||||
false ->
|
||||
[]
|
||||
lists:foldl(GenIdxFun, [], LastMods)
|
||||
end.
|
||||
|
||||
-spec parse_date(tuple(), integer(), integer(), integer()) ->
|
||||
|
@ -468,12 +483,12 @@ parse_date(LMD, UnitMins, LimitMins, Now) ->
|
|||
lists:flatten(io_lib:format(?LMD_FORMAT,
|
||||
[Y, M, D, Hour, RoundMins])),
|
||||
TTL = min(Now, LMDsecs) + (LimitMins + UnitMins) * 60,
|
||||
{list_to_binary(StrTime), TTL}
|
||||
{StrTime, TTL}
|
||||
end.
|
||||
|
||||
-spec generate_ledgerkv(tuple(), integer(), any(),
|
||||
integer(), tuple()|infinity) ->
|
||||
{any(), any(), any(), integer()|no_lookup, list()}.
|
||||
-spec generate_ledgerkv(
|
||||
tuple(), integer(), any(), integer(), tuple()|infinity) ->
|
||||
{any(), any(), any(), {integer()|no_lookup, integer()}, list()}.
|
||||
%% @doc
|
||||
%% Function to extract from an object the information necessary to populate
|
||||
%% the Penciller's ledger.
|
||||
|
@ -482,7 +497,8 @@ parse_date(LMD, UnitMins, LimitMins, Now) ->
|
|||
%% Key - original Key extracted from the PrimaryKey
|
||||
%% Value - the value to be used in the Ledger (essentially the extracted
|
||||
%% metadata)
|
||||
%% Hash - A magic hash of the key to be used in lookups and filters
|
||||
%% {Hash, ObjHash} - A magic hash of the key to accelerate lookups, and a hash
|
||||
%% of the value to be used for equality checking between objects
|
||||
%% LastMods - the last modified dates for the object (may be multiple due to
|
||||
%% siblings)
|
||||
generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
|
||||
|
@ -495,11 +511,12 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
|
|||
end,
|
||||
Hash = magic_hash(PrimaryKey),
|
||||
{MD, LastMods} = extract_metadata(Obj, Size, Tag),
|
||||
ObjHash = get_objhash(Tag, MD),
|
||||
Value = {SQN,
|
||||
Status,
|
||||
Hash,
|
||||
MD},
|
||||
{Bucket, Key, Value, Hash, LastMods}.
|
||||
{Bucket, Key, Value, {Hash, ObjHash}, LastMods}.
|
||||
|
||||
|
||||
integer_now() ->
|
||||
|
@ -525,22 +542,33 @@ get_size(PK, Value) ->
|
|||
{_Hash, Size} = MD,
|
||||
Size
|
||||
end.
|
||||
|
||||
get_keyandhash(LK, Value) ->
|
||||
|
||||
-spec get_keyandobjhash(tuple(), tuple()) -> tuple().
|
||||
%% @doc
|
||||
%% Return a tucple of {Bucket, Key, Hash} where hash is a has of the object
|
||||
%% not the key (for example with Riak tagged objects this will be a hash of
|
||||
%% the sorted vclock)
|
||||
get_keyandobjhash(LK, Value) ->
|
||||
{Tag, Bucket, Key, _} = LK,
|
||||
{_, _, _, MD} = Value,
|
||||
case Tag of
|
||||
?RIAK_TAG ->
|
||||
{_RMD, _VC, Hash, _Size} = MD,
|
||||
{Bucket, Key, Hash};
|
||||
?STD_TAG ->
|
||||
{Hash, _Size} = MD,
|
||||
{Bucket, Key, Hash};
|
||||
?IDX_TAG ->
|
||||
from_ledgerkey(LK) % returns {Bucket, Key, IdxValue}
|
||||
from_ledgerkey(LK); % returns {Bucket, Key, IdxValue}
|
||||
_ ->
|
||||
{Bucket, Key, get_objhash(Tag, MD)}
|
||||
end.
|
||||
|
||||
|
||||
get_objhash(Tag, ObjMetaData) ->
|
||||
case Tag of
|
||||
?RIAK_TAG ->
|
||||
{_RMD, _VC, Hash, _Size} = ObjMetaData,
|
||||
Hash;
|
||||
?STD_TAG ->
|
||||
{Hash, _Size} = ObjMetaData,
|
||||
Hash
|
||||
end.
|
||||
|
||||
|
||||
build_metadata_object(PrimaryKey, MD) ->
|
||||
{Tag, _Bucket, _Key, null} = PrimaryKey,
|
||||
case Tag of
|
||||
|
@ -753,8 +781,7 @@ parsedate_test() ->
|
|||
lists:foreach(CheckFun, lists:seq(1, 60)).
|
||||
|
||||
check_pd(PD, UnitMins) ->
|
||||
{LMDbin, _TTL} = PD,
|
||||
LMDstr = binary_to_list(LMDbin),
|
||||
{LMDstr, _TTL} = PD,
|
||||
Minutes = list_to_integer(lists:nthtail(10, LMDstr)),
|
||||
?assertMatch(0, Minutes rem UnitMins).
|
||||
|
||||
|
@ -782,6 +809,9 @@ genaaeidx_test() ->
|
|||
LastMods1 = [os:timestamp()],
|
||||
AAESpecs1 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods1),
|
||||
?assertMatch(1, length(AAESpecs1)),
|
||||
IdxB = element(2, element(1, lists:nth(1, AAESpecs1))),
|
||||
io:format(user, "AAE IDXSpecs1 ~w~n", [AAESpecs1]),
|
||||
?assertMatch(<<"$all">>, IdxB),
|
||||
|
||||
LastMods0 = [],
|
||||
AAESpecs0 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods0),
|
||||
|
@ -793,9 +823,10 @@ genaaeidx_test() ->
|
|||
AAESpecsB1 = aae_indexspecs(AAE0, <<"Bucket0">>, Key, SQN, H, LastMods1),
|
||||
|
||||
?assertMatch(1, length(AAESpecsB1)),
|
||||
[{{?AAE_TAG, _LMD, {{SegID, H}, <<"Bucket0">>}, <<"Key1">>},
|
||||
[{{?IDX_TAG, <<"Bucket0">>, {Fld, Term}, <<"Key1">>},
|
||||
{SQN, {active, TS}, no_lookup, null}}] = AAESpecsB1,
|
||||
?assertMatch(true, is_integer(SegID)),
|
||||
?assertMatch(true, is_integer(TS)).
|
||||
?assertMatch(true, is_integer(TS)),
|
||||
?assertMatch(49, length(binary_to_list(Term))),
|
||||
?assertMatch("$aae.", lists:sublist(binary_to_list(Fld), 5)).
|
||||
|
||||
-endif.
|
Loading…
Add table
Add a link
Reference in a new issue