diff --git a/include/leveled.hrl b/include/leveled.hrl index 6e1b603..bfb0593 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -5,8 +5,6 @@ -define(STD_TAG, o). %% Tag used for secondary index keys -define(IDX_TAG, i). -%% Tag used for near real-time anti-entropy index keys --define(AAE_TAG, i_aae). %% Inker key type used for 'normal' objects -define(INKT_STND, stnd). diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 8b46d81..f2b76db 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -1246,7 +1246,7 @@ get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) -> fun(LK, V, Acc) -> case leveled_codec:is_active(LK, V, Now) of true -> - {B, K, H} = leveled_codec:get_keyandhash(LK, V), + {B, K, H} = leveled_codec:get_keyandobjhash(LK, V), Check = random:uniform() < ?CHECKJOURNAL_PROB, case {JournalCheck, Check} of {check_presence, true} -> @@ -1408,13 +1408,13 @@ preparefor_ledgercache(?INKT_KEYD, preparefor_ledgercache(_InkTag, LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL}, AAE) -> - {Bucket, Key, MetaValue, H, LastMods} = + {Bucket, Key, MetaValue, {KeyH, ObjH}, LastMods} = leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL), KeyChanges = [{LedgerKey, MetaValue}] ++ leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL) ++ - leveled_codec:aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods), - {H, SQN, KeyChanges}. + leveled_codec:aae_indexspecs(AAE, Bucket, Key, SQN, ObjH, LastMods), + {KeyH, SQN, KeyChanges}. addto_ledgercache({H, SQN, KeyChanges}, Cache) -> diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 621f997..485a497 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -58,7 +58,7 @@ build_metadata_object/2, generate_ledgerkv/5, get_size/2, - get_keyandhash/2, + get_keyandobjhash/2, idx_indexspecs/5, aae_indexspecs/6, generate_uuid/0, @@ -70,7 +70,8 @@ -define(V1_VERS, 1). -define(MAGIC, 53). % riak_kv -> riak_object -define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w"). - +-define(NRT_IDX, "$aae."). +-define(ALL_BUCKETS, list_to_binary("$all")). -type recent_aae() :: #recent_aae{}. @@ -106,8 +107,6 @@ to_lookup(Key) -> case element(1, Key) of ?IDX_TAG -> no_lookup; - ?AAE_TAG -> - no_lookup; _ -> lookup end. @@ -378,19 +377,24 @@ endkey_passed(EndKey, CheckingKey) -> EndKey < CheckingKey. idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) -> - lists:map(fun({IndexOp, IdxField, IdxValue}) -> - Status = case IndexOp of - add -> - {active, TTL}; - remove -> - %% TODO: timestamps for delayed reaping - tomb - end, - {to_ledgerkey(Bucket, Key, ?IDX_TAG, - IdxField, IdxValue), - {SQN, Status, no_lookup, null}} - end, - IndexSpecs). + lists:map( + fun({IdxOp, IdxFld, IdxTrm}) -> + gen_indexspec(Bucket, Key, IdxOp, IdxFld, IdxTrm, SQN, TTL) + end, + IndexSpecs + ). + +gen_indexspec(Bucket, Key, IdxOp, IdxField, IdxTerm, SQN, TTL) -> + Status = + case IdxOp of + add -> + {active, TTL}; + remove -> + %% TODO: timestamps for delayed reaping + tomb + end, + {to_ledgerkey(Bucket, Key, ?IDX_TAG, IdxField, IdxTerm), + {SQN, Status, no_lookup, null}}. -spec aae_indexspecs(false|recent_aae(), any(), any(), @@ -411,16 +415,23 @@ aae_indexspecs(false, _Bucket, _Key, _SQN, _H, _LastMods) -> aae_indexspecs(_AAE, _Bucket, _Key, _SQN, _H, []) -> []; aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) -> - InBucket = + Bucket0 = case AAE#recent_aae.buckets of all -> - true; + ?ALL_BUCKETS; ListB -> - lists:member(Bucket, ListB) + case lists:member(Bucket, ListB) of + true -> + Bucket; + false -> + false + end end, - case InBucket of - true -> - GenTagFun = + case Bucket0 of + false -> + []; + Bucket0 -> + GenIdxFun = fun(LMD0, Acc) -> Dates = parse_date(LMD0, AAE#recent_aae.unit_minutes, @@ -431,19 +442,23 @@ aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) -> Acc; {LMD1, TTL} -> TreeSize = AAE#recent_aae.tree_size, - SegmentID = + SegID = leveled_tictac:get_segment(Key, TreeSize), - IdxK = {?AAE_TAG, - LMD1, - {{SegmentID, H}, Bucket}, - Key}, - IdxV = {SQN, {active, TTL}, no_lookup, null}, + IdxFldStr = ?NRT_IDX ++ LMD1 ++ "_bin", + IdxTrmStr = + string:right(integer_to_list(SegID), 24, $0) ++ + "." ++ + string:right(integer_to_list(H), 24, $0), + {IdxK, IdxV} = + gen_indexspec(Bucket0, Key, + add, + list_to_binary(IdxFldStr), + list_to_binary(IdxTrmStr), + SQN, TTL), [{IdxK, IdxV}|Acc] end end, - lists:foldl(GenTagFun, [], LastMods); - false -> - [] + lists:foldl(GenIdxFun, [], LastMods) end. -spec parse_date(tuple(), integer(), integer(), integer()) -> @@ -468,12 +483,12 @@ parse_date(LMD, UnitMins, LimitMins, Now) -> lists:flatten(io_lib:format(?LMD_FORMAT, [Y, M, D, Hour, RoundMins])), TTL = min(Now, LMDsecs) + (LimitMins + UnitMins) * 60, - {list_to_binary(StrTime), TTL} + {StrTime, TTL} end. --spec generate_ledgerkv(tuple(), integer(), any(), - integer(), tuple()|infinity) -> - {any(), any(), any(), integer()|no_lookup, list()}. +-spec generate_ledgerkv( + tuple(), integer(), any(), integer(), tuple()|infinity) -> + {any(), any(), any(), {integer()|no_lookup, integer()}, list()}. %% @doc %% Function to extract from an object the information necessary to populate %% the Penciller's ledger. @@ -482,7 +497,8 @@ parse_date(LMD, UnitMins, LimitMins, Now) -> %% Key - original Key extracted from the PrimaryKey %% Value - the value to be used in the Ledger (essentially the extracted %% metadata) -%% Hash - A magic hash of the key to be used in lookups and filters +%% {Hash, ObjHash} - A magic hash of the key to accelerate lookups, and a hash +%% of the value to be used for equality checking between objects %% LastMods - the last modified dates for the object (may be multiple due to %% siblings) generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) -> @@ -495,11 +511,12 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) -> end, Hash = magic_hash(PrimaryKey), {MD, LastMods} = extract_metadata(Obj, Size, Tag), + ObjHash = get_objhash(Tag, MD), Value = {SQN, Status, Hash, MD}, - {Bucket, Key, Value, Hash, LastMods}. + {Bucket, Key, Value, {Hash, ObjHash}, LastMods}. integer_now() -> @@ -525,22 +542,33 @@ get_size(PK, Value) -> {_Hash, Size} = MD, Size end. - -get_keyandhash(LK, Value) -> + +-spec get_keyandobjhash(tuple(), tuple()) -> tuple(). +%% @doc +%% Return a tucple of {Bucket, Key, Hash} where hash is a has of the object +%% not the key (for example with Riak tagged objects this will be a hash of +%% the sorted vclock) +get_keyandobjhash(LK, Value) -> {Tag, Bucket, Key, _} = LK, {_, _, _, MD} = Value, case Tag of - ?RIAK_TAG -> - {_RMD, _VC, Hash, _Size} = MD, - {Bucket, Key, Hash}; - ?STD_TAG -> - {Hash, _Size} = MD, - {Bucket, Key, Hash}; ?IDX_TAG -> - from_ledgerkey(LK) % returns {Bucket, Key, IdxValue} + from_ledgerkey(LK); % returns {Bucket, Key, IdxValue} + _ -> + {Bucket, Key, get_objhash(Tag, MD)} end. - +get_objhash(Tag, ObjMetaData) -> + case Tag of + ?RIAK_TAG -> + {_RMD, _VC, Hash, _Size} = ObjMetaData, + Hash; + ?STD_TAG -> + {Hash, _Size} = ObjMetaData, + Hash + end. + + build_metadata_object(PrimaryKey, MD) -> {Tag, _Bucket, _Key, null} = PrimaryKey, case Tag of @@ -753,8 +781,7 @@ parsedate_test() -> lists:foreach(CheckFun, lists:seq(1, 60)). check_pd(PD, UnitMins) -> - {LMDbin, _TTL} = PD, - LMDstr = binary_to_list(LMDbin), + {LMDstr, _TTL} = PD, Minutes = list_to_integer(lists:nthtail(10, LMDstr)), ?assertMatch(0, Minutes rem UnitMins). @@ -782,6 +809,9 @@ genaaeidx_test() -> LastMods1 = [os:timestamp()], AAESpecs1 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods1), ?assertMatch(1, length(AAESpecs1)), + IdxB = element(2, element(1, lists:nth(1, AAESpecs1))), + io:format(user, "AAE IDXSpecs1 ~w~n", [AAESpecs1]), + ?assertMatch(<<"$all">>, IdxB), LastMods0 = [], AAESpecs0 = aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods0), @@ -793,9 +823,10 @@ genaaeidx_test() -> AAESpecsB1 = aae_indexspecs(AAE0, <<"Bucket0">>, Key, SQN, H, LastMods1), ?assertMatch(1, length(AAESpecsB1)), - [{{?AAE_TAG, _LMD, {{SegID, H}, <<"Bucket0">>}, <<"Key1">>}, + [{{?IDX_TAG, <<"Bucket0">>, {Fld, Term}, <<"Key1">>}, {SQN, {active, TS}, no_lookup, null}}] = AAESpecsB1, - ?assertMatch(true, is_integer(SegID)), - ?assertMatch(true, is_integer(TS)). + ?assertMatch(true, is_integer(TS)), + ?assertMatch(49, length(binary_to_list(Term))), + ?assertMatch("$aae.", lists:sublist(binary_to_list(Fld), 5)). -endif. \ No newline at end of file diff --git a/src/leveled_tictac.erl b/src/leveled_tictac.erl index 88d1744..bd7f591 100644 --- a/src/leveled_tictac.erl +++ b/src/leveled_tictac.erl @@ -63,7 +63,8 @@ fetch_root/1, fetch_leaves/2, merge_trees/2, - get_segment/2 + get_segment/2, + tictac_hash/2 ]). @@ -117,7 +118,7 @@ new_tree(TreeID, Size) -> %% based on that key and value add_kv(TicTacTree, Key, Value, HashFun) -> HashV = HashFun(Key, Value), - SegChangeHash = erlang:phash2(Key, HashV), + SegChangeHash = tictac_hash(Key, HashV), Segment = get_segment(Key, TicTacTree#tictactree.segment_count), Level2Pos = @@ -235,6 +236,13 @@ get_segment(Key, SegmentCount) when is_integer(SegmentCount) -> get_segment(Key, TreeSize) -> get_segment(Key, element(3, get_size(TreeSize))). + +-spec tictac_hash(tuple(), any()) -> integer(). +%% @doc +%% Hash the key and term +tictac_hash(Key, Term) -> + erlang:phash2({Key, Term}). + %%%============================================================================ %%% Internal functions %%%============================================================================ diff --git a/test/end_to_end/tictac_SUITE.erl b/test/end_to_end/tictac_SUITE.erl index 2c2b3f8..33ace07 100644 --- a/test/end_to_end/tictac_SUITE.erl +++ b/test/end_to_end/tictac_SUITE.erl @@ -4,14 +4,19 @@ -export([all/0]). -export([ many_put_compare/1, - index_compare/1 + index_compare/1, + recent_aae_noaae/1, + recent_aae_allaae/1 ]). all() -> [ - many_put_compare, - index_compare + % many_put_compare, + % index_compare, + % recent_aae_noaae, + recent_aae_allaae ]. +-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w"). many_put_compare(_Config) -> TreeSize = small, @@ -419,3 +424,243 @@ index_compare(_Config) -> ok = leveled_bookie:book_close(Book2B), ok = leveled_bookie:book_close(Book2C), ok = leveled_bookie:book_close(Book2D). + + +recent_aae_noaae(_Config) -> + TreeSize = small, + % SegmentCount = 256 * 256, + UnitMins = 2, + + % Test requires multiple different databases, so want to mount them all + % on individual file paths + RootPathA = testutil:reset_filestructure("testA"), + RootPathB = testutil:reset_filestructure("testB"), + RootPathC = testutil:reset_filestructure("testC"), + RootPathD = testutil:reset_filestructure("testD"), + StartOptsA = aae_startopts(RootPathA, false), + StartOptsB = aae_startopts(RootPathB, false), + StartOptsC = aae_startopts(RootPathC, false), + StartOptsD = aae_startopts(RootPathD, false), + + % Book1A to get all objects + {ok, Book1A} = leveled_bookie:book_start(StartOptsA), + % Book1B/C/D will have objects partitioned across it + {ok, Book1B} = leveled_bookie:book_start(StartOptsB), + {ok, Book1C} = leveled_bookie:book_start(StartOptsC), + {ok, Book1D} = leveled_bookie:book_start(StartOptsD), + + {B1, K1, V1, S1, MD} = {"Bucket", + "Key1.1.4567.4321", + "Value1", + [], + [{"MDK1", "MDV1"}]}, + {TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD), + + SW_StartLoad = os:timestamp(), + + ok = testutil:book_riakput(Book1A, TestObject, TestSpec), + ok = testutil:book_riakput(Book1B, TestObject, TestSpec), + testutil:check_forobject(Book1A, TestObject), + testutil:check_forobject(Book1B, TestObject), + + {TicTacTreeJoined, TicTacTreeFull, EmptyTree, _LMDIndexes} = + load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D, + SW_StartLoad, TreeSize, UnitMins, + false), + % Go compare! Also confirm we're not comparing empty trees + DL1_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, + TicTacTreeJoined), + + DL1_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree), + true = DL1_0 == [], + true = length(DL1_1) == 0, + + ok = leveled_bookie:book_close(Book1A), + ok = leveled_bookie:book_close(Book1B), + ok = leveled_bookie:book_close(Book1C), + ok = leveled_bookie:book_close(Book1D). + + +recent_aae_allaae(_Config) -> + TreeSize = small, + % SegmentCount = 256 * 256, + UnitMins = 2, + AAE = {all, 60, UnitMins}, + + % Test requires multiple different databases, so want to mount them all + % on individual file paths + RootPathA = testutil:reset_filestructure("testA"), + RootPathB = testutil:reset_filestructure("testB"), + RootPathC = testutil:reset_filestructure("testC"), + RootPathD = testutil:reset_filestructure("testD"), + StartOptsA = aae_startopts(RootPathA, AAE), + StartOptsB = aae_startopts(RootPathB, AAE), + StartOptsC = aae_startopts(RootPathC, AAE), + StartOptsD = aae_startopts(RootPathD, AAE), + + % Book1A to get all objects + {ok, Book1A} = leveled_bookie:book_start(StartOptsA), + % Book1B/C/D will have objects partitioned across it + {ok, Book1B} = leveled_bookie:book_start(StartOptsB), + {ok, Book1C} = leveled_bookie:book_start(StartOptsC), + {ok, Book1D} = leveled_bookie:book_start(StartOptsD), + + {B1, K1, V1, S1, MD} = {"Bucket", + "Key1.1.4567.4321", + "Value1", + [], + [{"MDK1", "MDV1"}]}, + {TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD), + + SW_StartLoad = os:timestamp(), + + ok = testutil:book_riakput(Book1A, TestObject, TestSpec), + ok = testutil:book_riakput(Book1B, TestObject, TestSpec), + testutil:check_forobject(Book1A, TestObject), + testutil:check_forobject(Book1B, TestObject), + + {TicTacTreeJoined, TicTacTreeFull, EmptyTree, _LMDIndexes} = + load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D, + SW_StartLoad, TreeSize, UnitMins, + false), + % Go compare! Also confirm we're not comparing empty trees + DL1_0 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, + TicTacTreeJoined), + + DL1_1 = leveled_tictac:find_dirtyleaves(TicTacTreeFull, EmptyTree), + true = DL1_0 == [], + true = length(DL1_1) > 100, + + ok = leveled_bookie:book_close(Book1A), + ok = leveled_bookie:book_close(Book1B), + ok = leveled_bookie:book_close(Book1C), + ok = leveled_bookie:book_close(Book1D). + + +load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D, + SW_StartLoad, TreeSize, UnitMins, + LMDIndexes_Loaded) -> + + LMDIndexes = + case LMDIndexes_Loaded of + false -> + % Generate nine lists of objects + % BucketBin = list_to_binary("Bucket"), + GenMapFun = + fun(_X) -> + V = testutil:get_compressiblevalue(), + Indexes = testutil:get_randomindexes_generator(8), + testutil:generate_objects(5000, + binary_uuid, + [], + V, + Indexes) + end, + + ObjLists = lists:map(GenMapFun, lists:seq(1, 9)), + + % Load all nine lists into Book1A + lists:foreach(fun(ObjL) -> testutil:riakload(Book1A, ObjL) end, + ObjLists), + + % Split nine lists across Book1B to Book1D, three object lists + % in each + lists:foreach(fun(ObjL) -> testutil:riakload(Book1B, ObjL) end, + lists:sublist(ObjLists, 1, 3)), + lists:foreach(fun(ObjL) -> testutil:riakload(Book1C, ObjL) end, + lists:sublist(ObjLists, 4, 3)), + lists:foreach(fun(ObjL) -> testutil:riakload(Book1D, ObjL) end, + lists:sublist(ObjLists, 7, 3)), + + SW_EndLoad = os:timestamp(), + determine_lmd_indexes(SW_StartLoad, SW_EndLoad, UnitMins); + _ -> + LMDIndexes_Loaded + end, + + EmptyTree = leveled_tictac:new_tree(empty, TreeSize), + + GetTicTacTreeFun = + fun(Bookie) -> + fun(LMD, Acc) -> + SW = os:timestamp(), + ST = <<"0">>, + ET = <<"A">>, + Q = {tictactree_idx, + {<<"$all">>, + list_to_binary("$aae." ++ LMD ++ "_bin"), + ST, + ET}, + TreeSize, + fun(_B, _K) -> accumulate end}, + {async, Folder} = leveled_bookie:book_returnfolder(Bookie, Q), + R = Folder(), + io:format("TicTac Tree for index ~w took " ++ + "~w microseconds~n", + [LMD, timer:now_diff(os:timestamp(), SW)]), + leveled_tictac:merge_trees(R, Acc) + end + end, + + % Get a TicTac tree representing one of the indexes in Bucket A + TicTacTree1_Full = + lists:foldl(GetTicTacTreeFun(Book1A), EmptyTree, LMDIndexes), + + TicTacTree1_P1 = + lists:foldl(GetTicTacTreeFun(Book1B), EmptyTree, LMDIndexes), + TicTacTree1_P2 = + lists:foldl(GetTicTacTreeFun(Book1C), EmptyTree, LMDIndexes), + TicTacTree1_P3 = + lists:foldl(GetTicTacTreeFun(Book1D), EmptyTree, LMDIndexes), + + % Merge the tree across the partitions + TicTacTree1_Joined = lists:foldl(fun leveled_tictac:merge_trees/2, + TicTacTree1_P1, + [TicTacTree1_P2, TicTacTree1_P3]), + + {TicTacTree1_Full, TicTacTree1_Joined, EmptyTree, LMDIndexes}. + + +aae_startopts(RootPath, AAE) -> + LS = 2000, + JS = 50000000, + SS = testutil:sync_strategy(), + [{root_path, RootPath}, + {sync_strategy, SS}, + {cache_size, LS}, + {max_journalsize, JS}, + {recent_aae, AAE}]. + + +determine_lmd_indexes(StartTS, EndTS, UnitMins) -> + StartDT = calendar:now_to_datetime(StartTS), + EndDT = calendar:now_to_datetime(EndTS), + StartTimeStr = get_strtime(StartDT, UnitMins), + EndTimeStr = get_strtime(EndDT, UnitMins), + + AddTimeFun = + fun(X, Acc) -> + case lists:member(EndTimeStr, Acc) of + true -> + Acc; + false -> + NextTime = + 300 * X + + calendar:datetime_to_gregorian_seconds(StartDT), + NextDT = + calendar:gregorian_seconds_to_datetime(NextTime), + Acc ++ [get_strtime(NextDT, UnitMins)] + end + end, + + lists:foldl(AddTimeFun, [StartTimeStr], lists:seq(1, 5)). + + +get_strtime(DateTime, UnitMins) -> + {{Y, M, D}, {Hour, Minute, _Second}} = DateTime, + RoundMins = + UnitMins * (Minute div UnitMins), + StrTime = + lists:flatten(io_lib:format(?LMD_FORMAT, + [Y, M, D, Hour, RoundMins])), + StrTime. \ No newline at end of file