From c586b78f45254a30cfbf2f164df88e351e4b659c Mon Sep 17 00:00:00 2001
From: martinsumner
Date: Mon, 19 Jun 2017 11:36:57 +0100
Subject: [PATCH] Initial code with busted ct test

Initial comparison made between trees externally - but ct test is bust.

---
 src/leveled_bookie.erl           | 166 +++++++++++++++++++++++++++----
 src/leveled_codec.erl            |   4 +-
 src/leveled_tictac.erl           |  17 +---
 test/end_to_end/testutil.erl     |  21 ++--
 test/end_to_end/tictac_SUITE.erl |  69 +++++++++++++
 5 files changed, 238 insertions(+), 39 deletions(-)
 create mode 100644 test/end_to_end/tictac_SUITE.erl

diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 919c970..f1bca92 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -310,6 +310,18 @@ book_head(Pid, Bucket, Key, Tag) ->
 %% bucket
 %% {hashlist_query, Tag, JournalCheck} -> return keys and hashes for all
 %% objects with a given tag
+%% {tictactree_idx, {Bucket, IdxField, StartValue, EndValue}, PartitionFilter}
+%% -> compile a hashtree for the items on the index. A partition filter is
+%% required to avoid including index entries added to this vnode as a fallback.
+%% There is no de-duplication of results; duplicate results corrupt the tree.
+%% {tictactree_obj, {Tag, Bucket, StartKey, EndKey, CheckPresence}, PartitionFilter}
+%% -> compile a hashtree for all the objects in the range. A partition filter
+%% may be passed to restrict the query to a given partition on this vnode. The
+%% filter should be a function that takes (Bucket, Key) as inputs and outputs
+%% one of the atoms accumulate or pass. There is no de-duplication of
+%% results; duplicate results corrupt the tree.
+%% CheckPresence can be used if there is a need to do a deeper check to ensure
+%% that the object is in the Journal (or at least indexed within the Journal).
 %% {foldobjects_bybucket, Tag, Bucket, FoldObjectsFun} -> fold over all objects
 %% in a given bucket
 %% {foldobjects_byindex,
@@ -535,6 +547,28 @@ handle_call({return_folder, FolderType}, _From, State) ->
             {reply,
                 hashlist_query(State, Tag, JournalCheck),
                 State};
+        {tictactree_obj,
+            {Tag, Bucket, StartKey, EndKey, CheckPresence},
+            PartitionFilter} ->
+            {reply,
+                tictactree(State,
+                            Tag,
+                            Bucket,
+                            {StartKey, EndKey},
+                            CheckPresence,
+                            PartitionFilter),
+                State};
+        {tictactree_idx,
+            {Bucket, IdxField, StartValue, EndValue},
+            PartitionFilter} ->
+            {reply,
+                tictactree(State,
+                            ?IDX_TAG,
+                            Bucket,
+                            {IdxField, StartValue, EndValue},
+                            false,
+                            PartitionFilter),
+                State};
         {foldheads_allkeys, Tag, FoldHeadsFun} ->
             {reply,
                 foldheads_allkeys(State, Tag, FoldHeadsFun),
@@ -848,6 +882,76 @@ hashlist_query(State, Tag, JournalCheck) ->
     end,
     {async, Folder}.
 
+tictactree(State, Tag, Bucket, Query, JournalCheck, Filter) ->
+    % Journal check can be used for object key folds to confirm that the
+    % object is still indexed within the journal
+    SnapType = case JournalCheck of
+                    false ->
+                        ledger;
+                    check_presence ->
+                        store
+                end,
+    {ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State,
+                                                            SnapType,
+                                                            no_lookup),
+    Tree = leveled_tictac:new_tree(temp),
+    Folder =
+        fun() ->
+            % The start key and end key will vary depending on whether the
+            % fold is to fold over an index or a key range
+            {StartKey, EndKey, HashFun} =
+                case Tag of
+                    ?IDX_TAG ->
+                        {IdxField, StartIdx, EndIdx} = Query,
+                        HashIdxValFun =
+                            fun(_Key, IdxValue) ->
+                                erlang:phash2(IdxValue)
+                            end,
+                        {leveled_codec:to_ledgerkey(Bucket,
+                                                    null,
+                                                    ?IDX_TAG,
+                                                    IdxField,
+                                                    StartIdx),
+                            leveled_codec:to_ledgerkey(Bucket,
+                                                        null,
+                                                        ?IDX_TAG,
+                                                        IdxField,
+                                                        EndIdx),
+                            HashIdxValFun};
+                    _ ->
+                        {StartObjKey, EndObjKey} = Query,
+                        PassHashFun = fun(_Key, Hash) -> Hash end,
+                        {leveled_codec:to_ledgerkey(Bucket,
+                                                    StartObjKey,
+                                                    Tag),
+                            leveled_codec:to_ledgerkey(Bucket,
+                                                        EndObjKey,
+                                                        Tag),
+                            PassHashFun}
+                end,
+
+            AccFun = accumulate_tree(Filter,
+                                        JournalCheck,
+                                        JournalSnapshot,
+                                        HashFun),
+            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
+                                                    StartKey,
+                                                    EndKey,
+                                                    AccFun,
+                                                    Tree),
+
+            % Close down snapshot when complete so as not to hold removed
+            % files open
+            ok = leveled_penciller:pcl_close(LedgerSnapshot),
+            case JournalCheck of
+                false ->
+                    ok;
+                check_presence ->
+                    leveled_inker:ink_close(JournalSnapshot)
+            end,
+            Acc
+        end,
+    {async, Folder}.
 
 foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
     StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
@@ -1088,27 +1192,51 @@ accumulate_size() ->
     AccFun.
 
 accumulate_hashes(JournalCheck, InkerClone) ->
+    AddKeyFun =
+        fun(B, K, H, Acc) ->
+            [{B, K, H}|Acc]
+        end,
+    get_hashaccumulator(JournalCheck,
+                        InkerClone,
+                        AddKeyFun).
+
+accumulate_tree(FilterFun, JournalCheck, InkerClone, HashFun) ->
+    AddKeyFun =
+        fun(B, K, H, Tree) ->
+            case FilterFun(B, K) of
+                accumulate ->
+                    leveled_tictac:add_kv(Tree, K, H, HashFun);
+                pass ->
+                    Tree
+            end
+        end,
+    get_hashaccumulator(JournalCheck,
+                        InkerClone,
+                        AddKeyFun).
+
+get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) ->
     Now = leveled_codec:integer_now(),
-    AccFun = fun(LK, V, KHList) ->
-                    case leveled_codec:is_active(LK, V, Now) of
-                        true ->
-                            {B, K, H} = leveled_codec:get_keyandhash(LK, V),
-                            Check = random:uniform() < ?CHECKJOURNAL_PROB,
-                            case {JournalCheck, Check} of
-                                {check_presence, true} ->
-                                    case check_presence(LK, V, InkerClone) of
-                                        true ->
-                                            [{B, K, H}|KHList];
-                                        false ->
-                                            KHList
-                                    end;
-                                _ ->
-                                    [{B, K, H}|KHList]
+    AccFun =
+        fun(LK, V, Acc) ->
+            case leveled_codec:is_active(LK, V, Now) of
+                true ->
+                    {B, K, H} = leveled_codec:get_keyandhash(LK, V),
+                    Check = random:uniform() < ?CHECKJOURNAL_PROB,
+                    case {JournalCheck, Check} of
+                        {check_presence, true} ->
+                            case check_presence(LK, V, InkerClone) of
+                                true ->
+                                    AddKeyFun(B, K, H, Acc);
+                                false ->
+                                    Acc
                             end;
-                        false ->
-                            KHList
-                    end
-                end,
+                        _ ->
+                            AddKeyFun(B, K, H, Acc)
+                    end;
+                false ->
+                    Acc
+            end
+        end,
     AccFun.
 
 
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index 9179687..42dcb28 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -427,7 +427,9 @@ get_keyandhash(LK, Value) ->
             {Bucket, Key, Hash};
         ?STD_TAG ->
             {Hash, _Size} = MD,
-            {Bucket, Key, Hash}
+            {Bucket, Key, Hash};
+        ?IDX_TAG ->
+            from_ledgerkey(LK) % returns {Bucket, Key, IdxValue}
     end.
 
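
A note on usage: both new queries are issued through leveled_bookie:book_returnfolder/2 and, like the other folder types, hand back {async, Folder}, where running Folder() builds and returns the tree. A minimal caller sketch, assuming a bucket named "Bucket", an illustrative index field "idx1_bin" with an arbitrary term range, and a pass-through partition filter (build_example_trees/1 is an illustrative helper, not part of this patch):

    build_example_trees(Bookie) ->
        %% Accept every key - a real vnode would filter to its own partition
        PartitionFilter = fun(_Bucket, _Key) -> accumulate end,
        %% Hash every object in the bucket, without the journal presence check
        ObjQuery = {tictactree_obj,
                    {o_rkv, "Bucket", null, null, false},
                    PartitionFilter},
        %% Hash every entry of one index field across a term range
        IdxQuery = {tictactree_idx,
                    {"Bucket", "idx1_bin", "value000", "value999"},
                    PartitionFilter},
        {async, ObjTreeFolder} = leveled_bookie:book_returnfolder(Bookie, ObjQuery),
        {async, IdxTreeFolder} = leveled_bookie:book_returnfolder(Bookie, IdxQuery),
        %% Each folder returns a leveled_tictac tree when run
        {ObjTreeFolder(), IdxTreeFolder()}.
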
diff --git a/src/leveled_tictac.erl b/src/leveled_tictac.erl
index 4155807..3463a81 100644
--- a/src/leveled_tictac.erl
+++ b/src/leveled_tictac.erl
@@ -52,8 +52,6 @@
 
 -module(leveled_tictac).
 
-% -behaviour(gen_server).
-
 -include("include/leveled.hrl").
 
 -export([
@@ -67,7 +65,6 @@
             ]).
 
 
-
 -include_lib("eunit/include/eunit.hrl").
 
 -define(LEVEL1_WIDTH, 256).
@@ -78,17 +75,11 @@
 
 -record(tictactree, {treeID :: any(),
                         level1 :: binary(),
-                        level2 :: array:array()}).
-
+                        level2 :: any() % an array, typed any() for OTP compatibility
+                        }).
 
 -type tictactree() :: #tictactree{}.
 
-%%%============================================================================
-%%% API
-%%%============================================================================
-
-
-
 %%%============================================================================
 %%% External functions
 %%%============================================================================
@@ -192,8 +183,8 @@ fetch_leaves(TicTacTree, BranchList) ->
 
 merge_trees(TreeA, TreeB) ->
     MergedTree = new_tree(merge),
-    L1A = TreeA#tictactree.level1,
-    L1B = TreeB#tictactree.level1,
+    L1A = fetch_root(TreeA),
+    L1B = fetch_root(TreeB),
     NewLevel1 = merge_binaries(L1A, L1B),
 
     MergeFun =
diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl
index 923b81e..01bd26d 100644
--- a/test/end_to_end/testutil.erl
+++ b/test/end_to_end/testutil.erl
@@ -31,6 +31,7 @@
             get_randomindexes_generator/1,
             name_list/0,
             load_objects/5,
+            load_objects/6,
             put_indexed_objects/3,
             put_altered_indexed_objects/3,
             put_altered_indexed_objects/4,
@@ -52,6 +53,7 @@
 -define(MD_LASTMOD, <<"X-Riak-Last-Modified">>).
 -define(MD_DELETED, <<"X-Riak-Deleted">>).
 -define(EMPTY_VTAG_BIN, <<"e">>).
+-define(ROOT_PATH, "test").
 
 %% =================================================
 %% From riak_object
@@ -169,13 +171,17 @@ riakload(Bookie, ObjectList) ->
 
 
 reset_filestructure() ->
-    reset_filestructure(0).
+    reset_filestructure(0, ?ROOT_PATH).
 
-reset_filestructure(Wait) ->
-    io:format("Waiting ~w ms to give a chance for all file closes " ++
+reset_filestructure(Wait) when is_integer(Wait) ->
+    reset_filestructure(Wait, ?ROOT_PATH);
+reset_filestructure(RootPath) when is_list(RootPath) ->
+    reset_filestructure(0, RootPath).
+
+reset_filestructure(Wait, RootPath) ->
+    io:format("Waiting ~w ms to give a chance for all file closes " ++
                     "to complete~n", [Wait]),
-    timer:sleep(Wait),
-    RootPath = "test",
+    timer:sleep(Wait),
     filelib:ensure_dir(RootPath ++ "/journal/"),
     filelib:ensure_dir(RootPath ++ "/ledger/"),
     leveled_inker:clean_testdir(RootPath ++ "/journal"),
@@ -420,6 +426,9 @@ get_vclock(ObjectBin) ->
     binary_to_term(VclockBin).
 
 load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) ->
+    load_objects(ChunkSize, GenList, Bookie, TestObject, Generator, 1000).
+
+load_objects(ChunkSize, GenList, Bookie, TestObject, Generator, SubListL) ->
     lists:map(fun(KN) ->
                     ObjListA = Generator(ChunkSize, KN),
                     StartWatchA = os:timestamp(),
@@ -433,7 +442,7 @@
                         true ->
                             check_forobject(Bookie, TestObject)
                     end,
-                    lists:sublist(ObjListA, 1000) end,
+                    lists:sublist(ObjListA, SubListL) end,
                 GenList).
 
 
diff --git a/test/end_to_end/tictac_SUITE.erl b/test/end_to_end/tictac_SUITE.erl
new file mode 100644
index 0000000..88ff214
--- /dev/null
+++ b/test/end_to_end/tictac_SUITE.erl
@@ -0,0 +1,69 @@
+-module(tictac_SUITE).
+-include_lib("common_test/include/ct.hrl").
+-include("include/leveled.hrl").
+-export([all/0]).
+-export([
+            many_put_compare/1
+            ]).
+
+all() -> [
+            many_put_compare
+            ].
+
+
+many_put_compare(_Config) ->
+    RootPathA = testutil:reset_filestructure("testA"),
+    StartOpts1 = [{root_path, RootPathA},
+                    {max_pencillercachesize, 16000},
+                    {sync_strategy, riak_sync}],
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+    {TestObject, TestSpec} = testutil:generate_testobject(),
+    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
+    testutil:check_forobject(Bookie1, TestObject),
+    ok = leveled_bookie:book_close(Bookie1),
+    StartOpts2 = [{root_path, RootPathA},
+                    {max_journalsize, 500000000},
+                    {max_pencillercachesize, 32000},
+                    {sync_strategy, testutil:sync_strategy()}],
+    {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
+    testutil:check_forobject(Bookie2, TestObject),
+    GenList = [2, 20002, 40002, 60002, 80002,
+                100002, 120002, 140002, 160002, 180002],
+    CLs = testutil:load_objects(20000,
+                                GenList,
+                                Bookie2,
+                                TestObject,
+                                fun testutil:generate_smallobjects/2,
+                                20000),
+
+    RootPathB = testutil:reset_filestructure("testB"),
+    StartOpts3 = [{root_path, RootPathB},
+                    {max_journalsize, 200000000},
+                    {max_pencillercachesize, 16000},
+                    {sync_strategy, testutil:sync_strategy()}],
+    {ok, Bookie3} = leveled_bookie:book_start(StartOpts3),
+    lists:foreach(fun(ObjL) -> testutil:riakload(Bookie3, ObjL) end, CLs),
+
+    TicTacQ = {tictactree_obj,
+                {o_rkv, "Bucket", null, null, false},
+                fun(_B, _K) -> accumulate end},
+    {async, TreeAFolder} = leveled_bookie:book_returnfolder(Bookie2, TicTacQ),
+    {async, TreeBFolder} = leveled_bookie:book_returnfolder(Bookie3, TicTacQ),
+    SWA0 = os:timestamp(),
+    TreeA = TreeAFolder(),
+    io:format("Build tictac tree with 200K objects in ~w~n",
+                [timer:now_diff(os:timestamp(), SWA0)]),
+    SWB0 = os:timestamp(),
+    TreeB = TreeBFolder(),
+    io:format("Build tictac tree with 200K objects in ~w~n",
+                [timer:now_diff(os:timestamp(), SWB0)]),
+    SWC0 = os:timestamp(),
+    SegList = leveled_tictac:find_dirtyleaves(TreeA, TreeB),
+    io:format("Compare tictac trees with 200K objects in ~w~n",
+                [timer:now_diff(os:timestamp(), SWC0)]),
+    io:format("Tree comparison shows ~w different leaves~n",
+                [length(SegList)]),
+    true = length(SegList) == 1,
+
+    ok = leveled_bookie:book_destroy(Bookie2),
+    ok = leveled_bookie:book_destroy(Bookie3).
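
The suite above compares one whole store against another. The merge_trees/2 change in leveled_tictac.erl hints at the fuller coverage-style flow: build one tree per partition (via the partition filter), merge the per-partition trees on each side, then diff the merged trees. A hedged sketch of that flow, assuming merge_trees/2 is exported and that each argument is a non-empty list of folder funs returned by book_returnfolder/2 (compare_coverage/2 is an illustrative helper, not part of this patch):

    compare_coverage(TreeFoldersA, TreeFoldersB) ->
        %% Run each async folder to get a per-partition tictac tree, then
        %% merge the partition trees into a single tree for each side
        MergeSide =
            fun(Folders) ->
                [FirstTree | RestTrees] = [Folder() || Folder <- Folders],
                lists:foldl(fun leveled_tictac:merge_trees/2, FirstTree, RestTrees)
            end,
        TreeA = MergeSide(TreeFoldersA),
        TreeB = MergeSide(TreeFoldersB),
        %% IDs of the leaf segments whose hashes differ between the two sides
        leveled_tictac:find_dirtyleaves(TreeA, TreeB).
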