From 954995e23fb43f9412063d7768cdb6e1c182e6b0 Mon Sep 17 00:00:00 2001
From: martinsumner
Date: Fri, 30 Jun 2017 16:31:22 +0100
Subject: [PATCH] Support for recent AAE index

With basic ct test. Doesn't currently prove expiry of index. Doesn't prove
ability to find segments.

Assumes that either "all" buckets or a special list of buckets require
indexing this way. Will lead to unexpected results if the same bucket name
is used across different Tags.

The format of the index has been chosen so that hopefully standard index
features can be used (e.g. return_terms).
---
 include/leveled.hrl              |   2 -
 src/leveled_bookie.erl           |   8 +-
 src/leveled_codec.erl            | 137 ++++++++++-------
 src/leveled_tictac.erl           |  12 +-
 test/end_to_end/tictac_SUITE.erl | 251 ++++++++++++++++++++++++++++++-
 5 files changed, 346 insertions(+), 64 deletions(-)

diff --git a/include/leveled.hrl b/include/leveled.hrl
index 6e1b603..bfb0593 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -5,8 +5,6 @@
 -define(STD_TAG, o).
 %% Tag used for secondary index keys
 -define(IDX_TAG, i).
-%% Tag used for near real-time anti-entropy index keys
--define(AAE_TAG, i_aae).
 
 %% Inker key type used for 'normal' objects
 -define(INKT_STND, stnd).
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 8b46d81..f2b76db 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -1246,7 +1246,7 @@ get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) ->
         fun(LK, V, Acc) ->
             case leveled_codec:is_active(LK, V, Now) of
                 true ->
-                    {B, K, H} = leveled_codec:get_keyandhash(LK, V),
+                    {B, K, H} = leveled_codec:get_keyandobjhash(LK, V),
                     Check = random:uniform() < ?CHECKJOURNAL_PROB,
                     case {JournalCheck, Check} of
                         {check_presence, true} ->
@@ -1408,13 +1408,13 @@ preparefor_ledgercache(?INKT_KEYD,
 preparefor_ledgercache(_InkTag,
                        LedgerKey, SQN, Obj, Size,
                        {IdxSpecs, TTL},
                        AAE) ->
-    {Bucket, Key, MetaValue, H, LastMods} =
+    {Bucket, Key, MetaValue, {KeyH, ObjH}, LastMods} =
         leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL),
     KeyChanges =
         [{LedgerKey, MetaValue}] ++
             leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL) ++
-            leveled_codec:aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods),
-    {H, SQN, KeyChanges}.
+            leveled_codec:aae_indexspecs(AAE, Bucket, Key, SQN, ObjH, LastMods),
+    {KeyH, SQN, KeyChanges}.
 
 addto_ledgercache({H, SQN, KeyChanges}, Cache) ->
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index 621f997..485a497 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -58,7 +58,7 @@
         build_metadata_object/2,
         generate_ledgerkv/5,
         get_size/2,
-        get_keyandhash/2,
+        get_keyandobjhash/2,
         idx_indexspecs/5,
         aae_indexspecs/6,
         generate_uuid/0,
@@ -70,7 +70,8 @@
 -define(V1_VERS, 1).
 -define(MAGIC, 53). % riak_kv -> riak_object
 -define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
-
+-define(NRT_IDX, "$aae.").
+-define(ALL_BUCKETS, list_to_binary("$all")).
 
 -type recent_aae() :: #recent_aae{}.
 
@@ -106,8 +107,6 @@ to_lookup(Key) ->
     case element(1, Key) of
         ?IDX_TAG ->
             no_lookup;
-        ?AAE_TAG ->
-            no_lookup;
         _ ->
             lookup
     end.
@@ -378,19 +377,24 @@ endkey_passed(EndKey, CheckingKey) ->
     EndKey < CheckingKey.
 
 idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
-    lists:map(fun({IndexOp, IdxField, IdxValue}) ->
-                        Status = case IndexOp of
-                                    add ->
-                                        {active, TTL};
-                                    remove ->
-                                        %% TODO: timestamps for delayed reaping
-                                        tomb
-                                end,
-                        {to_ledgerkey(Bucket, Key, ?IDX_TAG,
-                                        IdxField, IdxValue),
-                            {SQN, Status, no_lookup, null}}
-                end,
-                IndexSpecs).
+ lists:map( + fun({IdxOp, IdxFld, IdxTrm}) -> + gen_indexspec(Bucket, Key, IdxOp, IdxFld, IdxTrm, SQN, TTL) + end, + IndexSpecs + ). + +gen_indexspec(Bucket, Key, IdxOp, IdxField, IdxTerm, SQN, TTL) -> + Status = + case IdxOp of + add -> + {active, TTL}; + remove -> + %% TODO: timestamps for delayed reaping + tomb + end, + {to_ledgerkey(Bucket, Key, ?IDX_TAG, IdxField, IdxTerm), + {SQN, Status, no_lookup, null}}. -spec aae_indexspecs(false|recent_aae(), any(), any(), @@ -411,16 +415,23 @@ aae_indexspecs(false, _Bucket, _Key, _SQN, _H, _LastMods) -> aae_indexspecs(_AAE, _Bucket, _Key, _SQN, _H, []) -> []; aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) -> - InBucket = + Bucket0 = case AAE#recent_aae.buckets of all -> - true; + ?ALL_BUCKETS; ListB -> - lists:member(Bucket, ListB) + case lists:member(Bucket, ListB) of + true -> + Bucket; + false -> + false + end end, - case InBucket of - true -> - GenTagFun = + case Bucket0 of + false -> + []; + Bucket0 -> + GenIdxFun = fun(LMD0, Acc) -> Dates = parse_date(LMD0, AAE#recent_aae.unit_minutes, @@ -431,19 +442,23 @@ aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) -> Acc; {LMD1, TTL} -> TreeSize = AAE#recent_aae.tree_size, - SegmentID = + SegID = leveled_tictac:get_segment(Key, TreeSize), - IdxK = {?AAE_TAG, - LMD1, - {{SegmentID, H}, Bucket}, - Key}, - IdxV = {SQN, {active, TTL}, no_lookup, null}, + IdxFldStr = ?NRT_IDX ++ LMD1 ++ "_bin", + IdxTrmStr = + string:right(integer_to_list(SegID), 24, $0) ++ + "." ++ + string:right(integer_to_list(H), 24, $0), + {IdxK, IdxV} = + gen_indexspec(Bucket0, Key, + add, + list_to_binary(IdxFldStr), + list_to_binary(IdxTrmStr), + SQN, TTL), [{IdxK, IdxV}|Acc] end end, - lists:foldl(GenTagFun, [], LastMods); - false -> - [] + lists:foldl(GenIdxFun, [], LastMods) end. -spec parse_date(tuple(), integer(), integer(), integer()) -> @@ -468,12 +483,12 @@ parse_date(LMD, UnitMins, LimitMins, Now) -> lists:flatten(io_lib:format(?LMD_FORMAT, [Y, M, D, Hour, RoundMins])), TTL = min(Now, LMDsecs) + (LimitMins + UnitMins) * 60, - {list_to_binary(StrTime), TTL} + {StrTime, TTL} end. --spec generate_ledgerkv(tuple(), integer(), any(), - integer(), tuple()|infinity) -> - {any(), any(), any(), integer()|no_lookup, list()}. +-spec generate_ledgerkv( + tuple(), integer(), any(), integer(), tuple()|infinity) -> + {any(), any(), any(), {integer()|no_lookup, integer()}, list()}. %% @doc %% Function to extract from an object the information necessary to populate %% the Penciller's ledger. @@ -482,7 +497,8 @@ parse_date(LMD, UnitMins, LimitMins, Now) -> %% Key - original Key extracted from the PrimaryKey %% Value - the value to be used in the Ledger (essentially the extracted %% metadata) -%% Hash - A magic hash of the key to be used in lookups and filters +%% {Hash, ObjHash} - A magic hash of the key to accelerate lookups, and a hash +%% of the value to be used for equality checking between objects %% LastMods - the last modified dates for the object (may be multiple due to %% siblings) generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) -> @@ -495,11 +511,12 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) -> end, Hash = magic_hash(PrimaryKey), {MD, LastMods} = extract_metadata(Obj, Size, Tag), + ObjHash = get_objhash(Tag, MD), Value = {SQN, Status, Hash, MD}, - {Bucket, Key, Value, Hash, LastMods}. + {Bucket, Key, Value, {Hash, ObjHash}, LastMods}. integer_now() -> @@ -525,22 +542,33 @@ get_size(PK, Value) -> {_Hash, Size} = MD, Size end. 
-
-get_keyandhash(LK, Value) ->
+
+-spec get_keyandobjhash(tuple(), tuple()) -> tuple().
+%% @doc
+%% Return a tuple of {Bucket, Key, Hash} where the hash is a hash of the
+%% object, not the key (for example with Riak tagged objects this will be a
+%% hash of the sorted vclock)
+get_keyandobjhash(LK, Value) ->
     {Tag, Bucket, Key, _} = LK,
     {_, _, _, MD} = Value,
     case Tag of
-        ?RIAK_TAG ->
-            {_RMD, _VC, Hash, _Size} = MD,
-            {Bucket, Key, Hash};
-        ?STD_TAG ->
-            {Hash, _Size} = MD,
-            {Bucket, Key, Hash};
         ?IDX_TAG ->
-            from_ledgerkey(LK) % returns {Bucket, Key, IdxValue}
+            from_ledgerkey(LK); % returns {Bucket, Key, IdxValue}
+        _ ->
+            {Bucket, Key, get_objhash(Tag, MD)}
     end.
 
-
+get_objhash(Tag, ObjMetaData) ->
+    case Tag of
+        ?RIAK_TAG ->
+            {_RMD, _VC, Hash, _Size} = ObjMetaData,
+            Hash;
+        ?STD_TAG ->
+            {Hash, _Size} = ObjMetaData,
+            Hash
+    end.
+
+
 build_metadata_object(PrimaryKey, MD) ->
     {Tag, _Bucket, _Key, null} = PrimaryKey,
     case Tag of
@@ -753,8 +781,7 @@ parsedate_test() ->
     lists:foreach(CheckFun, lists:seq(1, 60)).
 
 check_pd(PD, UnitMins) ->
-    {LMDbin, _TTL} = PD,
-    LMDstr = binary_to_list(LMDbin),
+    {LMDstr, _TTL} = PD,
     Minutes = list_to_integer(lists:nthtail(10, LMDstr)),
     ?assertMatch(0, Minutes rem UnitMins).
 
@@ -782,6 +809,9 @@ genaaeidx_test() ->
     LastMods1 = [os:timestamp()],
     AAESpecs1 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods1),
     ?assertMatch(1, length(AAESpecs1)),
+    IdxB = element(2, element(1, lists:nth(1, AAESpecs1))),
+    io:format(user, "AAE IDXSpecs1 ~w~n", [AAESpecs1]),
+    ?assertMatch(<<"$all">>, IdxB),
 
     LastMods0 = [],
     AAESpecs0 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods0),
@@ -793,9 +823,10 @@ genaaeidx_test() ->
 
     AAESpecsB1 = aae_indexspecs(AAE0, <<"Bucket0">>, Key, SQN, H, LastMods1),
     ?assertMatch(1, length(AAESpecsB1)),
-    [{{?AAE_TAG, _LMD, {{SegID, H}, <<"Bucket0">>}, <<"Key1">>},
+    [{{?IDX_TAG, <<"Bucket0">>, {Fld, Term}, <<"Key1">>},
         {SQN, {active, TS}, no_lookup, null}}] = AAESpecsB1,
-    ?assertMatch(true, is_integer(SegID)),
-    ?assertMatch(true, is_integer(TS)).
+    ?assertMatch(true, is_integer(TS)),
+    ?assertMatch(49, length(binary_to_list(Term))),
+    ?assertMatch("$aae.", lists:sublist(binary_to_list(Fld), 5)).
 
 -endif.
\ No newline at end of file
diff --git a/src/leveled_tictac.erl b/src/leveled_tictac.erl
index 88d1744..bd7f591 100644
--- a/src/leveled_tictac.erl
+++ b/src/leveled_tictac.erl
@@ -63,7 +63,8 @@
             fetch_root/1,
             fetch_leaves/2,
             merge_trees/2,
-            get_segment/2
+            get_segment/2,
+            tictac_hash/2
         ]).
 
 
@@ -117,7 +118,7 @@ new_tree(TreeID, Size) ->
 %% based on that key and value
 add_kv(TicTacTree, Key, Value, HashFun) ->
     HashV = HashFun(Key, Value),
-    SegChangeHash = erlang:phash2(Key, HashV),
+    SegChangeHash = tictac_hash(Key, HashV),
     Segment = get_segment(Key, TicTacTree#tictactree.segment_count),
 
     Level2Pos =
@@ -235,6 +236,13 @@ get_segment(Key, SegmentCount) when is_integer(SegmentCount) ->
 get_segment(Key, TreeSize) ->
     get_segment(Key, element(3, get_size(TreeSize))).
+
+-spec tictac_hash(tuple(), any()) -> integer().
+%% @doc
+%% Hash the key and term
+tictac_hash(Key, Term) ->
+    erlang:phash2({Key, Term}).
+
 
 %%%============================================================================
 %%% Internal functions
 %%%============================================================================
diff --git a/test/end_to_end/tictac_SUITE.erl b/test/end_to_end/tictac_SUITE.erl
index 2c2b3f8..33ace07 100644
--- a/test/end_to_end/tictac_SUITE.erl
+++ b/test/end_to_end/tictac_SUITE.erl
@@ -4,14 +4,19 @@
 -export([all/0]).
 -export([
             many_put_compare/1,
-            index_compare/1
+            index_compare/1,
+            recent_aae_noaae/1,
+            recent_aae_allaae/1
             ]).
 
 all() -> [
-            many_put_compare,
-            index_compare
+            % many_put_compare,
+            % index_compare,
+            % recent_aae_noaae,
+            recent_aae_allaae
             ].
 
+-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
 
 many_put_compare(_Config) ->
     TreeSize = small,
@@ -419,3 +424,243 @@ index_compare(_Config) ->
     ok = leveled_bookie:book_close(Book2B),
     ok = leveled_bookie:book_close(Book2C),
     ok = leveled_bookie:book_close(Book2D).
+
+
+recent_aae_noaae(_Config) ->
+    TreeSize = small,
+    % SegmentCount = 256 * 256,
+    UnitMins = 2,
+
+    % Test requires multiple different databases, so want to mount them all
+    % on individual file paths
+    RootPathA = testutil:reset_filestructure("testA"),
+    RootPathB = testutil:reset_filestructure("testB"),
+    RootPathC = testutil:reset_filestructure("testC"),
+    RootPathD = testutil:reset_filestructure("testD"),
+    StartOptsA = aae_startopts(RootPathA, false),
+    StartOptsB = aae_startopts(RootPathB, false),
+    StartOptsC = aae_startopts(RootPathC, false),
+    StartOptsD = aae_startopts(RootPathD, false),
+
+    % Book1A to get all objects
+    {ok, Book1A} = leveled_bookie:book_start(StartOptsA),
+    % Book1B/C/D will have objects partitioned across them
+    {ok, Book1B} = leveled_bookie:book_start(StartOptsB),
+    {ok, Book1C} = leveled_bookie:book_start(StartOptsC),
+    {ok, Book1D} = leveled_bookie:book_start(StartOptsD),
+
+    {B1, K1, V1, S1, MD} = {"Bucket",
+                            "Key1.1.4567.4321",
+                            "Value1",
+                            [],
+                            [{"MDK1", "MDV1"}]},
+    {TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD),
+
+    SW_StartLoad = os:timestamp(),
+
+    ok = testutil:book_riakput(Book1A, TestObject, TestSpec),
+    ok = testutil:book_riakput(Book1B, TestObject, TestSpec),
+    testutil:check_forobject(Book1A, TestObject),
+    testutil:check_forobject(Book1B, TestObject),
+
+    {TicTacTreeJoined, TicTacTreeFull, EmptyTree, _LMDIndexes} =
+        load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
+                                    SW_StartLoad, TreeSize, UnitMins,
+                                    false),
+    % Go compare! Confirm the trees are empty, as no AAE index was created
+    DL1_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull,
+                                            TicTacTreeJoined),
+
+    DL1_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree),
+    true = DL1_0 == [],
+    true = length(DL1_1) == 0,
+
+    ok = leveled_bookie:book_close(Book1A),
+    ok = leveled_bookie:book_close(Book1B),
+    ok = leveled_bookie:book_close(Book1C),
+    ok = leveled_bookie:book_close(Book1D).
+
+
+recent_aae_allaae(_Config) ->
+    TreeSize = small,
+    % SegmentCount = 256 * 256,
+    UnitMins = 2,
+    AAE = {all, 60, UnitMins},
+
+    % Test requires multiple different databases, so want to mount them all
+    % on individual file paths
+    RootPathA = testutil:reset_filestructure("testA"),
+    RootPathB = testutil:reset_filestructure("testB"),
+    RootPathC = testutil:reset_filestructure("testC"),
+    RootPathD = testutil:reset_filestructure("testD"),
+    StartOptsA = aae_startopts(RootPathA, AAE),
+    StartOptsB = aae_startopts(RootPathB, AAE),
+    StartOptsC = aae_startopts(RootPathC, AAE),
+    StartOptsD = aae_startopts(RootPathD, AAE),
+
+    % Book1A to get all objects
+    {ok, Book1A} = leveled_bookie:book_start(StartOptsA),
+    % Book1B/C/D will have objects partitioned across them
+    {ok, Book1B} = leveled_bookie:book_start(StartOptsB),
+    {ok, Book1C} = leveled_bookie:book_start(StartOptsC),
+    {ok, Book1D} = leveled_bookie:book_start(StartOptsD),
+
+    {B1, K1, V1, S1, MD} = {"Bucket",
+                            "Key1.1.4567.4321",
+                            "Value1",
+                            [],
+                            [{"MDK1", "MDV1"}]},
+    {TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD),
+
+    SW_StartLoad = os:timestamp(),
+
+    ok = testutil:book_riakput(Book1A, TestObject, TestSpec),
+    ok = testutil:book_riakput(Book1B, TestObject, TestSpec),
+    testutil:check_forobject(Book1A, TestObject),
+    testutil:check_forobject(Book1B, TestObject),
+
+    {TicTacTreeJoined, TicTacTreeFull, EmptyTree, _LMDIndexes} =
+        load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
+                                    SW_StartLoad, TreeSize, UnitMins,
+                                    false),
+    % Go compare! Also confirm we're not comparing empty trees
+    DL1_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull,
+                                            TicTacTreeJoined),
+
+    DL1_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree),
+    true = DL1_0 == [],
+    true = length(DL1_1) > 100,
+
+    ok = leveled_bookie:book_close(Book1A),
+    ok = leveled_bookie:book_close(Book1B),
+    ok = leveled_bookie:book_close(Book1C),
+    ok = leveled_bookie:book_close(Book1D).
+
+
+load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
+                            SW_StartLoad, TreeSize, UnitMins,
+                            LMDIndexes_Loaded) ->
+
+    LMDIndexes =
+        case LMDIndexes_Loaded of
+            false ->
+                % Generate nine lists of objects
+                % BucketBin = list_to_binary("Bucket"),
+                GenMapFun =
+                    fun(_X) ->
+                        V = testutil:get_compressiblevalue(),
+                        Indexes = testutil:get_randomindexes_generator(8),
+                        testutil:generate_objects(5000,
+                                                    binary_uuid,
+                                                    [],
+                                                    V,
+                                                    Indexes)
+                    end,
+
+                ObjLists = lists:map(GenMapFun, lists:seq(1, 9)),
+
+                % Load all nine lists into Book1A
+                lists:foreach(fun(ObjL) -> testutil:riakload(Book1A, ObjL) end,
+                                ObjLists),
+
+                % Split nine lists across Book1B to Book1D, three object lists
+                % in each
+                lists:foreach(fun(ObjL) -> testutil:riakload(Book1B, ObjL) end,
+                                lists:sublist(ObjLists, 1, 3)),
+                lists:foreach(fun(ObjL) -> testutil:riakload(Book1C, ObjL) end,
+                                lists:sublist(ObjLists, 4, 3)),
+                lists:foreach(fun(ObjL) -> testutil:riakload(Book1D, ObjL) end,
+                                lists:sublist(ObjLists, 7, 3)),
+
+                SW_EndLoad = os:timestamp(),
+                determine_lmd_indexes(SW_StartLoad, SW_EndLoad, UnitMins);
+            _ ->
+                LMDIndexes_Loaded
+        end,
+
+    EmptyTree = leveled_tictac:new_tree(empty, TreeSize),
+
+    GetTicTacTreeFun =
+        fun(Bookie) ->
+            fun(LMD, Acc) ->
+                SW = os:timestamp(),
+                ST = <<"0">>,
+                ET = <<"A">>,
+                Q = {tictactree_idx,
+                        {<<"$all">>,
+                            list_to_binary("$aae."
++ LMD ++ "_bin"), + ST, + ET}, + TreeSize, + fun(_B, _K) -> accumulate end}, + {async, Folder} = leveled_bookie:book_returnfolder(Bookie, Q), + R = Folder(), + io:format("TicTac Tree for index ~w took " ++ + "~w microseconds~n", + [LMD, timer:now_diff(os:timestamp(), SW)]), + leveled_tictac:merge_trees(R, Acc) + end + end, + + % Get a TicTac tree representing one of the indexes in Bucket A + TicTacTree1_Full = + lists:foldl(GetTicTacTreeFun(Book1A), EmptyTree, LMDIndexes), + + TicTacTree1_P1 = + lists:foldl(GetTicTacTreeFun(Book1B), EmptyTree, LMDIndexes), + TicTacTree1_P2 = + lists:foldl(GetTicTacTreeFun(Book1C), EmptyTree, LMDIndexes), + TicTacTree1_P3 = + lists:foldl(GetTicTacTreeFun(Book1D), EmptyTree, LMDIndexes), + + % Merge the tree across the partitions + TicTacTree1_Joined = lists:foldl(fun leveled_tictac:merge_trees/2, + TicTacTree1_P1, + [TicTacTree1_P2, TicTacTree1_P3]), + + {TicTacTree1_Full, TicTacTree1_Joined, EmptyTree, LMDIndexes}. + + +aae_startopts(RootPath, AAE) -> + LS = 2000, + JS = 50000000, + SS = testutil:sync_strategy(), + [{root_path, RootPath}, + {sync_strategy, SS}, + {cache_size, LS}, + {max_journalsize, JS}, + {recent_aae, AAE}]. + + +determine_lmd_indexes(StartTS, EndTS, UnitMins) -> + StartDT = calendar:now_to_datetime(StartTS), + EndDT = calendar:now_to_datetime(EndTS), + StartTimeStr = get_strtime(StartDT, UnitMins), + EndTimeStr = get_strtime(EndDT, UnitMins), + + AddTimeFun = + fun(X, Acc) -> + case lists:member(EndTimeStr, Acc) of + true -> + Acc; + false -> + NextTime = + 300 * X + + calendar:datetime_to_gregorian_seconds(StartDT), + NextDT = + calendar:gregorian_seconds_to_datetime(NextTime), + Acc ++ [get_strtime(NextDT, UnitMins)] + end + end, + + lists:foldl(AddTimeFun, [StartTimeStr], lists:seq(1, 5)). + + +get_strtime(DateTime, UnitMins) -> + {{Y, M, D}, {Hour, Minute, _Second}} = DateTime, + RoundMins = + UnitMins * (Minute div UnitMins), + StrTime = + lists:flatten(io_lib:format(?LMD_FORMAT, + [Y, M, D, Hour, RoundMins])), + StrTime. \ No newline at end of file
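
For reference, a minimal sketch (not part of the patch) of how an entry in the
new recent-AAE index is composed, following aae_indexspecs/6 and the ?NRT_IDX
and ?LMD_FORMAT definitions above. The module name and the example date,
segment ID and hash values are hypothetical; the point is that the field is an
ordinary "_bin" secondary index field, which is what should allow standard
index features such as return_terms to be reused.

-module(aae_index_sketch).
-export([example_entry/0]).

example_entry() ->
    LMD = "201706301630",       % last-modified date, rounded to the unit minutes
    SegID = 12345,              % from leveled_tictac:get_segment(Key, TreeSize)
    ObjHash = 987654321,        % object hash passed in from generate_ledgerkv
    %% Field name is "$aae." ++ LMD ++ "_bin", i.e. a standard binary 2i field
    IdxField = list_to_binary("$aae." ++ LMD ++ "_bin"),
    %% Term is the zero-padded segment ID and object hash, 49 characters long
    IdxTerm = list_to_binary(
                string:right(integer_to_list(SegID), 24, $0) ++ "." ++
                string:right(integer_to_list(ObjHash), 24, $0)),
    {IdxField, IdxTerm}.
    %% => {<<"$aae.201706301630_bin">>,
    %%     <<"000000000000000000012345.000000000000000987654321">>}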