diff --git a/include/leveled.hrl b/include/leveled.hrl
index 833ba8b..5763040 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -8,10 +8,12 @@
 
 %% Inker key type used for 'normal' objects
 -define(INKT_STND, stnd).
+
 %% Inker key type used for objects which contain no value, only key changes
 %% This is used currently for objects formed under a 'retain' strategy on Inker
-%% compaction, but could be used for special set-type objects
+%% compaction
 -define(INKT_KEYD, keyd).
+
 %% Inker key type used for tombstones
 -define(INKT_TOMB, tomb).
 
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index 6333e5b..fde2dda 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -280,6 +280,8 @@ ink_doom(Pid) ->
 %% The FilterFun is required to call stop when MaxSQN is reached
 %%
 %% The InitAccFun should return an initial batch accumulator for each subfold.
+%% It is a 2-arity function that takes a filename and a MinSQN as inputs,
+%% potentially to be used in logging
 %%
 %% The BatchFun is a two arity function that should take as inputs:
 %% An overall accumulator
@@ -301,12 +303,15 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
         fun(BatchAcc, _Acc) ->
             push_to_penciller(Penciller, BatchAcc)
         end,
+    InitAccFun =
+        fun(FN, CurrentMinSQN) ->
+            leveled_log:log("I0014", [FN, CurrentMinSQN]),
+            leveled_bookie:empty_ledgercache()
+        end,
     gen_server:call(Pid,
                     {fold,
                         MinSQN,
-                        {FilterFun,
-                            fun leveled_bookie:empty_ledgercache/0,
-                            BatchFun},
+                        {FilterFun, InitAccFun, BatchFun},
                         ok},
                     infinity).
 
@@ -853,9 +858,8 @@ fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
 
 foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
                             Acc, CDBpid, StartPos, FN) ->
-    leveled_log:log("I0014", [FN, MinSQN]),
     {FilterFun, InitAccFun, FoldFun} = FoldFuns,
-    InitBatchAcc = {MinSQN, MaxSQN, InitAccFun()},
+    InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)},
     case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of
         {eof, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
 
diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index a3c8977..85d8ab6 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -327,7 +327,10 @@
         {info, "After ~w PUTs total write time is ~w total sync time is ~w "
                 ++ "and max write time is ~w and max sync time is ~w"}},
     {"CDB18",
-        {info, "Handled return and write of hashtable"}}
+        {info, "Handled return and write of hashtable"}},
+
+    {"R0001",
+        {debug, "Object fold to process batch of ~w objects"}}
     ]).
 
diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl
index 62d1930..9fe8dcf 100644
--- a/src/leveled_runner.erl
+++ b/src/leveled_runner.erl
@@ -230,13 +230,96 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) ->
                                                     -> {async, fun()}.
 %% @doc
 %% Fold over all objects for a given tag
-foldobjects_allkeys(SnapFun, Tag, FoldFun, _Order) ->
+foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order) ->
     StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
     EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
     foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun,
-                false, false).
+                false, false);
+foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
+    % Fold over the journal in order of receipt
+    {FoldFun, InitAcc} =
+        case is_tuple(FoldObjectsFun) of
+            true ->
+                % FoldObjectsFun is already a tuple with a Fold function and
+                % an initial accumulator
+                FoldObjectsFun;
+            false ->
+                % no initial accumulator passed, so default to an empty list
+                {FoldObjectsFun, []}
+        end,
+
+    FilterFun =
+        fun(JKey, JVal, _Pos, Acc, ExtractFun) ->
+            {SQN, InkTag, LedgerKey} = JKey,
+            case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of
+                {?INKT_STND, {B, K}} ->
+                    % Process only standard objects with a matching Tag -
+                    % tombstones, key-change-only objects and objects with
+                    % non-matching Tags fall to the catch-all clause below
+                    {MinSQN, MaxSQN, BatchAcc} = Acc,
+                    case SQN of
+                        SQN when SQN < MinSQN ->
+                            {loop, Acc};
+                        SQN when SQN > MaxSQN ->
+                            {stop, Acc};
+                        _ ->
+                            {VBin, _VSize} = ExtractFun(JVal),
+                            {Obj, _IdxSpecs} =
+                                leveled_codec:split_inkvalue(VBin),
+                            ToLoop =
+                                case SQN of
+                                    MaxSQN -> stop;
+                                    _ -> loop
+                                end,
+                            {ToLoop,
+                                {MinSQN, MaxSQN, [{B, K, SQN, Obj}|BatchAcc]}}
+                    end;
+                _ ->
+                    {loop, Acc}
+            end
+        end,
+
+    InitAccFun = fun(_FN, _SQN) -> [] end,
+
+    Folder =
+        fun() ->
+            {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
+            IsValidFun =
+                fun(Bucket, Key, SQN) ->
+                    LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
+                    leveled_penciller:pcl_checksequencenumber(LedgerSnapshot,
+                                                                LedgerKey,
+                                                                SQN)
+                end,
+
+            BatchFoldFun =
+                fun(BatchAcc, ObjAcc) ->
+                    ObjFun =
+                        fun({B, K, SQN, Obj}, Acc) ->
+                            case IsValidFun(B, K, SQN) of
+                                true ->
+                                    FoldFun(B, K, Obj, Acc);
+                                false ->
+                                    Acc
+                            end
+                        end,
+                    leveled_log:log("R0001", [length(BatchAcc)]),
+                    lists:foldr(ObjFun, ObjAcc, BatchAcc)
+                end,
+
+            Acc =
+                leveled_inker:ink_fold(JournalSnapshot,
+                                        0,
+                                        {FilterFun, InitAccFun, BatchFoldFun},
+                                        InitAcc),
+            ok = leveled_penciller:pcl_close(LedgerSnapshot),
+            ok = leveled_inker:ink_close(JournalSnapshot),
+            Acc
+        end,
+    {async, Folder}.
+
 
 -spec foldobjects_bybucket(fun(), {atom(), any(), any()}, fun()) ->
                                                     {async, fun()}.
@@ -481,7 +564,7 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
                             ProxyObj = make_proxy_object(LK, JK,
                                                             MD, V,
                                                             InkerClone),
-                            FoldObjectsFun(B, K,ProxyObj, Acc);
+                            FoldObjectsFun(B, K, ProxyObj, Acc);
                         false ->
                             R = leveled_bookie:fetch_value(InkerClone, JK),
                             case R of
diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl
index 7c979da..7eec3df 100644
--- a/test/end_to_end/riak_SUITE.erl
+++ b/test/end_to_end/riak_SUITE.erl
@@ -3,11 +3,13 @@
 -include("include/leveled.hrl").
 -export([all/0]).
 -export([
-            crossbucket_aae/1
+            crossbucket_aae/1,
+            handoff/1
         ]).
 
 all() -> [
-            crossbucket_aae
+            crossbucket_aae,
+            handoff
         ].
 
 -define(MAGIC, 53). % riak_kv -> riak_object
@@ -39,7 +41,7 @@ crossbucket_aae(_Config) ->
     {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
     testutil:check_forobject(Bookie2, TestObject),
 
-    % Generate 200K objects to be sued within the test, and load them into
+    % Generate 200K objects to be used within the test, and load them into
     % the first store (outputting the generated objects as a list of lists)
     % to be used elsewhere
 
@@ -219,7 +221,7 @@ head_tictac_foldfun(B, K, PO, {Count, TreeAcc}) ->
     leveled_tictac:add_kv(TreeAcc, {B, K}, PO, ExtractFun)}.
 
-check_tictacfold(BookA, BookB, HeadTicTacFolder, {B1, K1}, TreeSize) ->
+check_tictacfold(BookA, BookB, HeadTicTacFolder, DeltaKey, TreeSize) ->
     SW_TT0 = os:timestamp(),
     {async, BookATreeFolder} =
         leveled_bookie:book_returnfolder(BookA, HeadTicTacFolder),
@@ -233,17 +235,22 @@ check_tictacfold(BookA, BookB, HeadTicTacFolder, {B1, K1}, TreeSize) ->
     io:format("Fold over keys revealed counts of ~w and ~w~n",
                 [CountA, CountB]),
 
-    % There should be a single delta between the stores
-    1 = CountA - CountB,
-
     DLs = leveled_tictac:find_dirtyleaves(BookATree, BookBTree),
     io:format("Found dirty leaves with Riak fold_heads of ~w~n",
                 [length(DLs)]),
-    true = length(DLs) == 1,
-    ExpSeg = leveled_tictac:keyto_segment32(<<B1/binary, K1/binary>>),
-    TreeSeg = leveled_tictac:get_segment(ExpSeg, TreeSize),
-    [ActualSeg] = DLs,
-    true = TreeSeg == ActualSeg,
+    case DeltaKey of
+        {B1, K1} ->
+            % There should be a single delta between the stores
+            1 = CountA - CountB,
+            true = length(DLs) == 1,
+            ExpSeg = leveled_tictac:keyto_segment32(<<B1/binary, K1/binary>>),
+            TreeSeg = leveled_tictac:get_segment(ExpSeg, TreeSize),
+            [ActualSeg] = DLs,
+            true = TreeSeg == ActualSeg;
+        none ->
+            0 = CountA - CountB,
+            true = length(DLs) == 0
+    end,
     DLs.
 
@@ -261,3 +268,124 @@ summary_from_binary(ObjBin, ObjSize) ->
                 _Rest/binary>> = ObjBin,
     {lists:usort(binary_to_term(VclockBin)), ObjSize, SibCount}.
+
+
+handoff(_Config) ->
+    % Test requires multiple different databases, so mount them all on
+    % individual file paths
+    RootPathA = testutil:reset_filestructure("testA"),
+    RootPathB = testutil:reset_filestructure("testB"),
+    RootPathC = testutil:reset_filestructure("testC"),
+    RootPathD = testutil:reset_filestructure("testD"),
+
+    % Start the first database
+    StartOpts1 = [{root_path, RootPathA},
+                    {max_pencillercachesize, 16000},
+                    {sync_strategy, riak_sync}],
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+
+    % Generate 160K objects to be used within the test, and load them into
+    % the first store (outputting the generated objects as a list of lists)
+    % to be used elsewhere
+
+    GenList =
+        [binary_uuid, binary_uuid, binary_uuid, binary_uuid],
+    [CL0, CL1, CL2, CL3] =
+        testutil:load_objects(40000,
+                                GenList,
+                                Bookie1,
+                                no_check,
+                                fun testutil:generate_smallobjects/2,
+                                40000),
+
+    % Update and delete some objects
+    testutil:update_some_objects(Bookie1, CL0, 1000),
+    testutil:update_some_objects(Bookie1, CL1, 20000),
+    testutil:delete_some_objects(Bookie1, CL2, 10000),
+    testutil:delete_some_objects(Bookie1, CL3, 4000),
+
+    % Compact the journal
+    ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
+    testutil:wait_for_compaction(Bookie1),
+
+    % Start three new empty stores
+    StartOpts2 = [{root_path, RootPathB},
+                    {max_pencillercachesize, 24000},
+                    {sync_strategy, none}],
+    {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
+    StartOpts3 = [{root_path, RootPathC},
+                    {max_pencillercachesize, 30000},
+                    {sync_strategy, none}],
+    {ok, Bookie3} = leveled_bookie:book_start(StartOpts3),
+    StartOpts4 = [{root_path, RootPathD},
+                    {max_pencillercachesize, 30000},
+                    {sync_strategy, none}],
+    {ok, Bookie4} = leveled_bookie:book_start(StartOpts4),
+
+    FoldObjectsFun =
+        fun(Book) ->
+            fun(B, K, Obj, ok) ->
+                leveled_bookie:book_put(Book, B, K, Obj, [], ?RIAK_TAG),
+                ok
+            end
+        end,
+
+    % Handoff the data from the first store to the other three stores
+    HandoffFolder2 =
+        {foldobjects_allkeys,
+            ?RIAK_TAG,
+            {FoldObjectsFun(Bookie2), ok},
+            false,
+            key_order},
+    HandoffFolder3 =
+        {foldobjects_allkeys,
+            ?RIAK_TAG,
+            {FoldObjectsFun(Bookie3), ok},
+            true,
+            sqn_order},
+    HandoffFolder4 =
+        {foldobjects_allkeys,
+            ?RIAK_TAG,
+            {FoldObjectsFun(Bookie4), ok},
+            true,
+            sqn_order},
+    {async, Handoff2} =
+        leveled_bookie:book_returnfolder(Bookie1, HandoffFolder2),
+    SW2 = os:timestamp(),
+    ok = Handoff2(),
+    Time_HO2 = timer:now_diff(os:timestamp(), SW2)/1000,
+    io:format("Handoff to Book2 in key_order took ~w milliseconds ~n",
+                [Time_HO2]),
+    SW3 = os:timestamp(),
+    {async, Handoff3} =
+        leveled_bookie:book_returnfolder(Bookie1, HandoffFolder3),
+    ok = Handoff3(),
+    Time_HO3 = timer:now_diff(os:timestamp(), SW3)/1000,
+    io:format("Handoff to Book3 in sqn_order took ~w milliseconds ~n",
+                [Time_HO3]),
+    SW4 = os:timestamp(),
+    {async, Handoff4} =
+        leveled_bookie:book_returnfolder(Bookie1, HandoffFolder4),
+    ok = Handoff4(),
+    Time_HO4 = timer:now_diff(os:timestamp(), SW4)/1000,
+    io:format("Handoff to Book4 in sqn_order took ~w milliseconds ~n",
+                [Time_HO4]),
+
+    % Run tictac folds to confirm all stores consistent after handoff
+    TreeSize = xxsmall,
+
+    TicTacFolder =
+        {foldheads_allkeys,
+            ?RIAK_TAG,
+            {fun head_tictac_foldfun/4,
+                {0, leveled_tictac:new_tree(test, TreeSize)}},
+            false, true, false},
+    check_tictacfold(Bookie1, Bookie2, TicTacFolder, none, TreeSize),
+    check_tictacfold(Bookie2, Bookie3, TicTacFolder, none, TreeSize),
+    check_tictacfold(Bookie3, Bookie4, TicTacFolder, none, TreeSize),
+
+    % Shutdown
+    ok = leveled_bookie:book_close(Bookie1),
+    ok = leveled_bookie:book_close(Bookie2),
+    ok = leveled_bookie:book_close(Bookie3),
+    ok = leveled_bookie:book_close(Bookie4).
diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl
index 4277289..95ace81 100644
--- a/test/end_to_end/testutil.erl
+++ b/test/end_to_end/testutil.erl
@@ -32,6 +32,8 @@
             name_list/0,
             load_objects/5,
             load_objects/6,
+            update_some_objects/3,
+            delete_some_objects/3,
             put_indexed_objects/3,
             put_altered_indexed_objects/3,
             put_altered_indexed_objects/4,
@@ -402,6 +404,37 @@ set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) ->
                         vclock=generate_vclock()},
         Spec1}.
 
+update_some_objects(Bookie, ObjList, SampleSize) ->
+    StartWatchA = os:timestamp(),
+    ToUpdateList = lists:sublist(lists:sort(ObjList), SampleSize),
+    UpdateFun =
+        fun({R, Obj, Spec}) ->
+            VC = Obj#r_object.vclock,
+            VC0 = update_vclock(VC),
+            [C] = Obj#r_object.contents,
+            C0 = C#r_content{value = leveled_rand:rand_bytes(512)},
+            UpdObj = Obj#r_object{vclock = VC0, contents = [C0]},
+            {R, UpdObj, Spec}
+        end,
+    UpdatedObjList = lists:map(UpdateFun, ToUpdateList),
+    riakload(Bookie, UpdatedObjList),
+    Time = timer:now_diff(os:timestamp(), StartWatchA),
+    io:format("~w objects updated in ~w seconds~n",
+                [SampleSize, Time/1000000]).
+
+delete_some_objects(Bookie, ObjList, SampleSize) ->
+    StartWatchA = os:timestamp(),
+    ToDeleteList = lists:sublist(lists:sort(ObjList), SampleSize),
+    DeleteFun =
+        fun({_R, Obj, Spec}) ->
+            B = Obj#r_object.bucket,
+            K = Obj#r_object.key,
+            book_riakdelete(Bookie, B, K, Spec)
+        end,
+    lists:foreach(DeleteFun, ToDeleteList),
+    Time = timer:now_diff(os:timestamp(), StartWatchA),
+    io:format("~w objects deleted in ~w seconds~n",
+                [SampleSize, Time/1000000]).
 
 generate_vclock() ->
     lists:map(fun(X) ->
@@ -411,6 +444,9 @@ generate_vclock() ->
                     {Actor, X}
                 end,
                 lists:seq(1, leveled_rand:uniform(8))).
 
+update_vclock(VC) ->
+    [{Actor, X}|Rest] = VC,
+    [{Actor, X + 1}|Rest].
 
 actor_list() ->
     [{1, albert}, {2, bertie}, {3, clara}, {4, dave}, {5, elton},
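
For reference, a minimal sketch of how a caller might drive the new sqn_order
fold, mirroring the handoff test above. The module name, the function names
and the use of o_rkv as the Riak tag are assumptions for illustration, and
the fourth element of the folder tuple is assumed to be the snap_prefold
flag seen in the test.

    -module(handoff_example).
    -export([handoff/2]).

    -define(RIAK_TAG, o_rkv). % assumed: the Riak object tag used by leveled

    %% Copy every current object from SourceBookie to TargetBookie,
    %% replaying the journal in order of receipt rather than folding
    %% over the ledger in key order
    handoff(SourceBookie, TargetBookie) ->
        CopyFun =
            fun(B, K, Obj, ok) ->
                leveled_bookie:book_put(TargetBookie, B, K, Obj,
                                        [], ?RIAK_TAG),
                ok
            end,
        HandoffFolder =
            {foldobjects_allkeys,
                ?RIAK_TAG,
                {CopyFun, ok},  % tuple form: fold fun and initial accumulator
                true,           % assumed snap_prefold flag
                sqn_order},
        {async, Runner} =
            leveled_bookie:book_returnfolder(SourceBookie, HandoffFolder),
        ok = Runner().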
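
As the is_tuple check in the new foldobjects_allkeys/4 clause shows, a bare
fold fun may also be passed, in which case the initial accumulator defaults
to an empty list. A sketch under the assumption that the bookie passes the
fold fun through to leveled_runner unchanged, collecting {Bucket, Key} pairs
in order of receipt:

    -module(sqn_fold_example).
    -export([all_bucket_keys/1]).

    -define(RIAK_TAG, o_rkv). % assumed: the Riak object tag used by leveled

    %% With a bare fold fun (no {Fun, InitAcc} tuple) the runner defaults
    %% the accumulator to [], so the fold naturally builds a list
    all_bucket_keys(Bookie) ->
        KeyFolder =
            {foldobjects_allkeys,
                ?RIAK_TAG,
                fun(B, K, _Obj, Acc) -> [{B, K}|Acc] end,
                true,           % assumed snap_prefold flag
                sqn_order},
        {async, KeyRunner} =
            leveled_bookie:book_returnfolder(Bookie, KeyFolder),
        KeyRunner().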