Initial code with busted ct test
Initial comparison made between trees externally - but the ct test is bust.
This commit is contained in:
parent 6ad98d77c5
commit c586b78f45
5 changed files with 238 additions and 39 deletions
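The external comparison flow this commit introduces, as exercised by the new ct test at the end of this diff (a sketch; Bookie2 and Bookie3 are assumed to be two started bookies loaded with nominally identical data):

    Query = {tictactree_obj,
                {o_rkv, "Bucket", null, null, false},
                fun(_B, _K) -> accumulate end},
    {async, FolderA} = leveled_bookie:book_returnfolder(Bookie2, Query),
    {async, FolderB} = leveled_bookie:book_returnfolder(Bookie3, Query),
    TreeA = FolderA(),    % folders are run outside the store, one tree per store
    TreeB = FolderB(),
    % dirty leaves identify the segments where the two stores differ
    DirtySegments = leveled_tictac:find_dirtyleaves(TreeA, TreeB).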
@@ -310,6 +310,18 @@ book_head(Pid, Bucket, Key, Tag) ->
 %% bucket
 %% {hashlist_query, Tag, JournalCheck} -> return keys and hashes for all
 %% objects with a given tag
+%% {tictactree_idx, {Bucket, IdxField, StartValue, EndValue}, PartitionFilter}
+%% -> compile a hashtree for the items on the index. A partition filter is
+%% required to avoid adding an index entry in this vnode as a fallback.
+%% There is no de-duplication of results; duplicate results corrupt the tree.
+%% {tictactree_obj, {Bucket, StartKey, EndKey, CheckPresence}, PartitionFilter}
+%% -> compile a hashtree for all the objects in the range. A partition filter
+%% may be passed to restrict the query to a given partition on this vnode. The
+%% filter should be a function that takes (Bucket, Key) as inputs and outputs
+%% one of the atoms accumulate or pass. There is no de-duplication of results;
+%% duplicate results corrupt the tree.
+%% CheckPresence can be used if there is a need to do a deeper check to ensure
+%% that the object is in the Journal (or at least indexed within the Journal).
 %% {foldobjects_bybucket, Tag, Bucket, FoldObjectsFun} -> fold over all objects
 %% in a given bucket
 %% {foldobjects_byindex,
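The PartitionFilter described above is a two-argument fun returning one of the atoms accumulate or pass. A sketch of a query combining it with the deeper journal check (the owned_by_this_vnode/1 helper is hypothetical; the tag and bucket mirror the ct test):

    PartitionFilter =
        fun(_Bucket, Key) ->
            % hypothetical ownership rule for this vnode
            case owned_by_this_vnode(Key) of
                true -> accumulate;
                false -> pass
            end
        end,
    Query = {tictactree_obj,
                {o_rkv, "Bucket", null, null, check_presence},
                PartitionFilter}.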
@@ -535,6 +547,28 @@ handle_call({return_folder, FolderType}, _From, State) ->
             {reply,
                 hashlist_query(State, Tag, JournalCheck),
                 State};
+        {tictactree_obj,
+                {Tag, Bucket, StartKey, EndKey, CheckPresence},
+                PartitionFilter} ->
+            {reply,
+                tictactree(State,
+                            Tag,
+                            Bucket,
+                            {StartKey, EndKey},
+                            CheckPresence,
+                            PartitionFilter),
+                State};
+        {tictactree_idx,
+                {Bucket, IdxField, StartValue, EndValue},
+                PartitionFilter} ->
+            {reply,
+                tictactree(State,
+                            ?IDX_TAG,
+                            Bucket,
+                            {IdxField, StartValue, EndValue},
+                            false,
+                            PartitionFilter),
+                State};
         {foldheads_allkeys, Tag, FoldHeadsFun} ->
             {reply,
                 foldheads_allkeys(State, Tag, FoldHeadsFun),
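The new tictactree_idx clause follows the same shape but always runs without the journal check, as the clause above hardcodes false. A sketch of issuing it (the index field name and value range are hypothetical; Bookie is assumed to be a started bookie pid):

    IdxQ = {tictactree_idx,
                {"Bucket", "idx1_bin", "value_start", "value_end"},
                fun(_B, _K) -> accumulate end},
    {async, IdxFolder} = leveled_bookie:book_returnfolder(Bookie, IdxQ),
    IdxTree = IdxFolder().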
@@ -848,6 +882,76 @@ hashlist_query(State, Tag, JournalCheck) ->
         end,
     {async, Folder}.
 
+tictactree(State, Tag, Bucket, Query, JournalCheck, Filter) ->
+    % Journal check can be used for object key folds to confirm that the
+    % object is still indexed within the journal
+    SnapType = case JournalCheck of
+                    false ->
+                        ledger;
+                    check_presence ->
+                        store
+                end,
+    {ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State,
+                                                            SnapType,
+                                                            no_lookup),
+    Tree = leveled_tictac:new_tree(temp),
+    Folder =
+        fun() ->
+            % The start key and end key will vary depending on whether the
+            % fold is to fold over an index or a key range
+            {StartKey, EndKey, HashFun} =
+                case Tag of
+                    ?IDX_TAG ->
+                        {IdxField, StartIdx, EndIdx} = Query,
+                        HashIdxValFun =
+                            fun(_Key, IdxValue) ->
+                                erlang:phash2(IdxValue)
+                            end,
+                        {leveled_codec:to_ledgerkey(Bucket,
+                                                    null,
+                                                    ?IDX_TAG,
+                                                    IdxField,
+                                                    StartIdx),
+                            leveled_codec:to_ledgerkey(Bucket,
+                                                        null,
+                                                        ?IDX_TAG,
+                                                        IdxField,
+                                                        EndIdx),
+                            HashIdxValFun};
+                    _ ->
+                        {StartObjKey, EndObjKey} = Query,
+                        PassHashFun = fun(_Key, Hash) -> Hash end,
+                        {leveled_codec:to_ledgerkey(Bucket,
+                                                    StartObjKey,
+                                                    Tag),
+                            leveled_codec:to_ledgerkey(Bucket,
+                                                        EndObjKey,
+                                                        Tag),
+                            PassHashFun}
+                end,
+
+            AccFun = accumulate_tree(Filter,
+                                        JournalCheck,
+                                        JournalSnapshot,
+                                        HashFun),
+            Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
+                                                    StartKey,
+                                                    EndKey,
+                                                    AccFun,
+                                                    Tree),
+
+            % Close down snapshot when complete so as not to hold removed
+            % files open
+            ok = leveled_penciller:pcl_close(LedgerSnapshot),
+            case JournalCheck of
+                false ->
+                    ok;
+                check_presence ->
+                    leveled_inker:ink_close(JournalSnapshot)
+            end,
+            Acc
+        end,
+    {async, Folder}.
+
 
 foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
     StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
@@ -1088,27 +1192,51 @@ accumulate_size() ->
     AccFun.
 
 accumulate_hashes(JournalCheck, InkerClone) ->
+    AddKeyFun =
+        fun(B, K, H, Acc) ->
+            [{B, K, H}|Acc]
+        end,
+    get_hashaccumulator(JournalCheck,
+                        InkerClone,
+                        AddKeyFun).
+
+accumulate_tree(FilterFun, JournalCheck, InkerClone, HashFun) ->
+    AddKeyFun =
+        fun(B, K, H, Tree) ->
+            case FilterFun(B, K) of
+                accumulate ->
+                    leveled_tictac:add_kv(Tree, K, H, HashFun);
+                pass ->
+                    Tree
+            end
+        end,
+    get_hashaccumulator(JournalCheck,
+                        InkerClone,
+                        AddKeyFun).
+
+get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) ->
     Now = leveled_codec:integer_now(),
-    AccFun = fun(LK, V, KHList) ->
-                case leveled_codec:is_active(LK, V, Now) of
-                    true ->
-                        {B, K, H} = leveled_codec:get_keyandhash(LK, V),
-                        Check = random:uniform() < ?CHECKJOURNAL_PROB,
-                        case {JournalCheck, Check} of
-                            {check_presence, true} ->
-                                case check_presence(LK, V, InkerClone) of
-                                    true ->
-                                        [{B, K, H}|KHList];
-                                    false ->
-                                        KHList
-                                end;
-                            _ ->
-                                [{B, K, H}|KHList]
-                        end;
-                    false ->
-                        KHList
-                end
-                end,
+    AccFun =
+        fun(LK, V, Acc) ->
+            case leveled_codec:is_active(LK, V, Now) of
+                true ->
+                    {B, K, H} = leveled_codec:get_keyandhash(LK, V),
+                    Check = random:uniform() < ?CHECKJOURNAL_PROB,
+                    case {JournalCheck, Check} of
+                        {check_presence, true} ->
+                            case check_presence(LK, V, InkerClone) of
+                                true ->
+                                    AddKeyFun(B, K, H, Acc);
+                                false ->
+                                    Acc
+                            end;
+                        _ ->
+                            AddKeyFun(B, K, H, Acc)
+                    end;
+                false ->
+                    Acc
+            end
+        end,
     AccFun.
 
 
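The refactor above extracts the journal presence check into a shared get_hashaccumulator/3, parameterised by an AddKeyFun sink: the list accumulator and the tree accumulator now differ only in that sink. As a sketch of the shape, a third, hypothetical sink that just counts active keys would be:

    CountKeyFun = fun(_B, _K, _H, Count) -> Count + 1 end,
    AccFun = get_hashaccumulator(check_presence, InkerClone, CountKeyFun),
    % AccFun could then be passed to a ledger fold with an initial Acc of 0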
@@ -427,7 +427,9 @@ get_keyandhash(LK, Value) ->
             {Bucket, Key, Hash};
         ?STD_TAG ->
             {Hash, _Size} = MD,
-            {Bucket, Key, Hash}
+            {Bucket, Key, Hash};
+        ?IDX_TAG ->
+            from_ledgerkey(LK) % returns {Bucket, Key, IdxValue}
     end.
 
 
@@ -52,8 +52,6 @@
 
 -module(leveled_tictac).
 
-% -behaviour(gen_server).
-
 -include("include/leveled.hrl").
 
 -export([
@@ -67,7 +65,6 @@
 ]).
 
 
-
 -include_lib("eunit/include/eunit.hrl").
 
 -define(LEVEL1_WIDTH, 256).
@@ -78,17 +75,11 @@
 
 -record(tictactree, {treeID :: any(),
                         level1 :: binary(),
-                        level2 :: array:array()}).
+                        level2 :: any() % an array - but OTP compatibility
+                        }).
 
 -type tictactree() :: #tictactree{}.
 
-%%%============================================================================
-%%% API
-%%%============================================================================
-
-
-
-
 %%%============================================================================
 %%% External functions
 %%%============================================================================
@@ -192,8 +183,8 @@ fetch_leaves(TicTacTree, BranchList) ->
 merge_trees(TreeA, TreeB) ->
     MergedTree = new_tree(merge),
 
-    L1A = TreeA#tictactree.level1,
-    L1B = TreeB#tictactree.level1,
+    L1A = fetch_root(TreeA),
+    L1B = fetch_root(TreeB),
     NewLevel1 = merge_binaries(L1A, L1B),
 
     MergeFun =
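merge_trees/2 now reads the root via fetch_root/1 rather than reaching into the record, matching the looser level2 type above. Merging is for combining several trees (e.g. per-partition trees) into one view before comparison; a sketch, assuming TreeA and TreeB were built by the folds above and OtherTree is a merged view from elsewhere:

    Merged = leveled_tictac:merge_trees(TreeA, TreeB),
    Dirty = leveled_tictac:find_dirtyleaves(Merged, OtherTree).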
@@ -31,6 +31,7 @@
                     get_randomindexes_generator/1,
                     name_list/0,
                     load_objects/5,
+                    load_objects/6,
                     put_indexed_objects/3,
                     put_altered_indexed_objects/3,
                     put_altered_indexed_objects/4,
@@ -52,6 +53,7 @@
 -define(MD_LASTMOD, <<"X-Riak-Last-Modified">>).
 -define(MD_DELETED, <<"X-Riak-Deleted">>).
 -define(EMPTY_VTAG_BIN, <<"e">>).
+-define(ROOT_PATH, "test").
 
 %% =================================================
 %% From riak_object
@@ -169,13 +171,17 @@ riakload(Bookie, ObjectList) ->
 
 
 reset_filestructure() ->
-    reset_filestructure(0).
+    reset_filestructure(0, ?ROOT_PATH).
 
-reset_filestructure(Wait) ->
-    io:format("Waiting ~w ms to give a chance for all file closes " ++
+reset_filestructure(Wait) when is_integer(Wait) ->
+    reset_filestructure(Wait, ?ROOT_PATH);
+reset_filestructure(RootPath) when is_list(RootPath) ->
+    reset_filestructure(0, RootPath).
+
+reset_filestructure(Wait, RootPath) ->
+    io:format("Waiting ~w ms to give a chance for all file closes " ++
                 "to complete~n", [Wait]),
     timer:sleep(Wait),
-    RootPath = "test",
     filelib:ensure_dir(RootPath ++ "/journal/"),
     filelib:ensure_dir(RootPath ++ "/ledger/"),
     leveled_inker:clean_testdir(RootPath ++ "/journal"),
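With the new clauses, reset_filestructure dispatches on argument type, so a test can pick a root path, a wait time, or both. A sketch of the three call shapes (the wait values are arbitrary; the paths mirror the new ct test):

    RootPathA = testutil:reset_filestructure("testA"),       % list: path, no wait
    RootPathT = testutil:reset_filestructure(2000),          % integer: wait ms, default root
    RootPathB = testutil:reset_filestructure(100, "testB").  % wait and path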
@@ -420,6 +426,9 @@ get_vclock(ObjectBin) ->
     binary_to_term(VclockBin).
 
 load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) ->
+    load_objects(ChunkSize, GenList, Bookie, TestObject, Generator, 1000).
+
+load_objects(ChunkSize, GenList, Bookie, TestObject, Generator, SubListL) ->
     lists:map(fun(KN) ->
                     ObjListA = Generator(ChunkSize, KN),
                     StartWatchA = os:timestamp(),
@@ -433,7 +442,7 @@ load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) ->
                         true ->
                             check_forobject(Bookie, TestObject)
                     end,
-                    lists:sublist(ObjListA, 1000) end,
+                    lists:sublist(ObjListA, SubListL) end,
                 GenList).
 
 
test/end_to_end/tictac_SUITE.erl (new file, 69 lines)
@@ -0,0 +1,69 @@
+-module(tictac_SUITE).
+-include_lib("common_test/include/ct.hrl").
+-include("include/leveled.hrl").
+-export([all/0]).
+-export([
+            many_put_compare/1
+            ]).
+
+all() -> [
+            many_put_compare
+            ].
+
+
+many_put_compare(_Config) ->
+    RootPathA = testutil:reset_filestructure("testA"),
+    StartOpts1 = [{root_path, RootPathA},
+                    {max_pencillercachesize, 16000},
+                    {sync_strategy, riak_sync}],
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+    {TestObject, TestSpec} = testutil:generate_testobject(),
+    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
+    testutil:check_forobject(Bookie1, TestObject),
+    ok = leveled_bookie:book_close(Bookie1),
+    StartOpts2 = [{root_path, RootPathA},
+                    {max_journalsize, 500000000},
+                    {max_pencillercachesize, 32000},
+                    {sync_strategy, testutil:sync_strategy()}],
+    {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
+    testutil:check_forobject(Bookie2, TestObject),
+    GenList = [2, 20002, 40002, 60002, 80002,
+                100002, 120002, 140002, 160002, 180002],
+    CLs = testutil:load_objects(20000,
+                                GenList,
+                                Bookie2,
+                                TestObject,
+                                fun testutil:generate_smallobjects/2,
+                                20000),
+
+    RootPathB = testutil:reset_filestructure("testB"),
+    StartOpts3 = [{root_path, RootPathB},
+                    {max_journalsize, 200000000},
+                    {max_pencillercachesize, 16000},
+                    {sync_strategy, testutil:sync_strategy()}],
+    {ok, Bookie3} = leveled_bookie:book_start(StartOpts3),
+    lists:foreach(fun(ObjL) -> testutil:riakload(Bookie3, ObjL) end, CLs),
+
+    TicTacQ = {tictactree_obj,
+                {o_rkv, "Bucket", null, null, false},
+                fun(_B, _K) -> accumulate end},
+    {async, TreeAFolder} = leveled_bookie:book_returnfolder(Bookie2, TicTacQ),
+    {async, TreeBFolder} = leveled_bookie:book_returnfolder(Bookie3, TicTacQ),
+    SWA0 = os:timestamp(),
+    TreeA = TreeAFolder(),
+    io:format("Build tictac tree with 200K objects in ~w~n",
+                [timer:now_diff(os:timestamp(), SWA0)]),
+    SWB0 = os:timestamp(),
+    TreeB = TreeBFolder(),
+    io:format("Build tictac tree with 200K objects in ~w~n",
+                [timer:now_diff(os:timestamp(), SWB0)]),
+    SWC0 = os:timestamp(),
+    SegList = leveled_tictac:find_dirtyleaves(TreeA, TreeB),
+    io:format("Compare tictac trees with 200K objects in ~w~n",
+                [timer:now_diff(os:timestamp(), SWC0)]),
+    io:format("Tree comparison shows ~w different leaves~n",
+                [length(SegList)]),
+    true = length(SegList) == 1,
+
+    ok = leveled_bookie:book_destroy(Bookie2),
+    ok = leveled_bookie:book_destroy(Bookie3).