From 39ad5c9680c493a8817e7b1e58926df1d38ed611 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 15 Nov 2017 16:08:24 +0000 Subject: [PATCH 1/6] Make inker fold generic ink_loadpcl is in effect an inker fold - so abstract out the inker fold part to make this a generic capability --- src/leveled_inker.erl | 104 +++++++++++++++++++++++++++++++----------- 1 file changed, 77 insertions(+), 27 deletions(-) diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 98af268..764da24 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -98,6 +98,7 @@ ink_get/3, ink_fetch/3, ink_keycheck/3, + ink_fold/4, ink_loadpcl/4, ink_registersnapshot/2, ink_confirmdelete/2, @@ -252,8 +253,42 @@ ink_close(Pid) -> ink_doom(Pid) -> gen_server:call(Pid, doom, 60000). --spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok. +-spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, pid()) -> ok. %% @doc +%% Fold over the journal from a starting sequence number (MinSQN), passing +%% in three functions and a snapshot of the penciller. The Fold functions +%% should be +%% - a FilterFun to accumulate the objects and decided when to stop or loop +%% - a InitAccFun to re-initialise for the fold over the accumulator +%% - a FoldFun to actually perform the fold +%% +%% The inker fold works in batches, so the FilterFun determines what should +%% go into a batch and when the batch is complete. The FoldFun completes the +%% actual desired outcome by being applied on the batch. 
+%% +%% The FilterFun should be a five arity function which takes as inputs: +%% KeyInJournal +%% ValueInJournal +%% Position - the actual position within the CDB file of the object +%% Acc - the accumulator +%% ExtractFun - a single arity function which can be applied to ValueInJournal +%% to extract the actual object, and the size of the object, +%% +%% The FilterFun should return either: +%% {loop, {MinSQN, MaxSQN, UpdAcc}} or +%% {stop, {MinSQN, MaxSQN, UpdAcc}} +%% The FilterFun is required to call stop when MaxSQN is reached +%% +%% The InitAccFun should return an initial accumulator for each subfold. +%% +%% The FoldFun is a 2 arity function that should take as inputs: +%% The Recipient +%% The Accumulator built over the sub-fold +ink_fold(Pid, MinSQN, FoldFuns, Recipient) -> + gen_server:call(Pid, {fold, MinSQN, FoldFuns, Recipient}, infinity). + +-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok. +%% %% Function to prompt load of the Ledger at startup. the Penciller should %% have determined the lowest SQN not present in the Ledger, and the inker %% should fold over the Journal from that point, using the function to load @@ -262,7 +297,14 @@ ink_doom(Pid) -> %% The load fun should be a five arity function like: %% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> - gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity). + gen_server:call(Pid, + {fold, + MinSQN, + {FilterFun, + fun leveled_bookie:empty_ledgercache/0, + fun push_to_penciller/2}, + Penciller}, + infinity). -spec ink_compactjournal(pid(), pid(), integer()) -> ok. 
%% @doc @@ -381,9 +423,16 @@ handle_call({get, Key, SQN}, _From, State) -> {reply, get_object(Key, SQN, State#state.manifest), State}; handle_call({key_check, Key, SQN}, _From, State) -> {reply, key_check(Key, SQN, State#state.manifest), State}; -handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) -> +handle_call({fold, + StartSQN, + {FilterFun, InitAccFun, FoldFun}, + Recipient}, _From, State) -> Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)), - Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest), + Reply = + fold_from_sequence(StartSQN, + {FilterFun, InitAccFun, FoldFun}, + Recipient, + Manifest), {reply, Reply, State}; handle_call({register_snapshot, Requestor}, _From , State) -> Rs = [{Requestor, @@ -748,59 +797,60 @@ start_new_activejournal(SQN, RootPath, CDBOpts) -> %% FilterFun{K, V, Acc} -> Penciller Key List %% Load the output for the CDB file into the Penciller. -load_from_sequence(_MinSQN, _FilterFun, _PCL, []) -> +fold_from_sequence(_MinSQN, _FoldFuns, _Rec, []) -> ok; -load_from_sequence(MinSQN, FilterFun, PCL, [{LowSQN, FN, Pid, _LK}|Rest]) - when LowSQN >= MinSQN -> - load_between_sequence(MinSQN, +fold_from_sequence(MinSQN, FoldFuns, Rec, [{LowSQN, FN, Pid, _LK}|Rest]) + when LowSQN >= MinSQN -> + fold_between_sequence(MinSQN, MinSQN + ?LOADING_BATCH, - FilterFun, - PCL, + FoldFuns, + Rec, Pid, undefined, FN, Rest); -load_from_sequence(MinSQN, FilterFun, PCL, [{_LowSQN, FN, Pid, _LK}|Rest]) -> +fold_from_sequence(MinSQN, FoldFuns, Rec, [{_LowSQN, FN, Pid, _LK}|Rest]) -> case Rest of [] -> - load_between_sequence(MinSQN, + fold_between_sequence(MinSQN, MinSQN + ?LOADING_BATCH, - FilterFun, - PCL, + FoldFuns, + Rec, Pid, undefined, FN, Rest); [{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN -> - load_between_sequence(MinSQN, + fold_between_sequence(MinSQN, MinSQN + ?LOADING_BATCH, - FilterFun, - PCL, + FoldFuns, + Rec, Pid, undefined, FN, Rest); _ -> - 
load_from_sequence(MinSQN, FilterFun, PCL, Rest) + fold_from_sequence(MinSQN, FoldFuns, Rec, Rest) end. -load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, - CDBpid, StartPos, FN, Rest) -> +fold_between_sequence(MinSQN, MaxSQN, FoldFuns, + Recipient, CDBpid, StartPos, FN, Rest) -> leveled_log:log("I0014", [FN, MinSQN]), - InitAcc = {MinSQN, MaxSQN, leveled_bookie:empty_ledgercache()}, + {FilterFun, InitAccFun, FoldFun} = FoldFuns, + InitAcc = {MinSQN, MaxSQN, InitAccFun()}, Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of {eof, {AccMinSQN, _AccMaxSQN, AccLC}} -> - ok = push_to_penciller(Penciller, AccLC), + ok = FoldFun(Recipient, AccLC), {ok, AccMinSQN}; {LastPosition, {_AccMinSQN, _AccMaxSQN, AccLC}} -> - ok = push_to_penciller(Penciller, AccLC), + ok = FoldFun(Recipient, AccLC), NextSQN = MaxSQN + 1, - load_between_sequence(NextSQN, + fold_between_sequence(NextSQN, NextSQN + ?LOADING_BATCH, - FilterFun, - Penciller, + FoldFuns, + Recipient, CDBpid, LastPosition, FN, @@ -808,7 +858,7 @@ load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, end, case Res of {ok, LMSQN} -> - load_from_sequence(LMSQN, FilterFun, Penciller, Rest); + fold_from_sequence(LMSQN, FoldFuns, Recipient, Rest); ok -> ok end. 
From 50c81d0626e747eeec9a62d8e7edf98c5699e607 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 17 Nov 2017 14:54:53 +0000 Subject: [PATCH 2/6] Make ink fold more generic Also makes the fold_from_sequence loop much easier to follow --- src/leveled_bookie.erl | 16 ++++- src/leveled_codec.erl | 14 ++++ src/leveled_inker.erl | 160 ++++++++++++++++++++++------------------- src/leveled_runner.erl | 7 +- 4 files changed, 116 insertions(+), 81 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 61493a7..edd339d 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -653,7 +653,9 @@ loadqueue_ledgercache(Cache) -> %% Query can be no_lookup, indicating the snapshot will be used for non-specific %% range queries and not direct fetch requests. {StartKey, EndKey} if the the %% snapshot is to be used for one specific query only (this is much quicker to -%% setup, assuming the range is a small subset of the overall key space). +%% setup, assuming the range is a small subset of the overall key space). 
If +%% lookup is required but the range isn't defined then 'undefined' should be +%% passed as the query snapshot_store(LedgerCache, Penciller, Inker, SnapType, Query, LongRunning) -> LedgerCacheReady = readycache_forsnapshot(LedgerCache, Query), BookiesMem = {LedgerCacheReady#ledger_cache.loader, @@ -760,10 +762,18 @@ get_runner(State, leveled_runner:foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList); -get_runner(State, +get_runner(State, {foldobjects_allkeys, Tag, FoldFun, SnapPreFold}) -> + get_runner(State, + {foldobjects_allkeys, Tag, FoldFun, SnapPreFold, key_order}); +get_runner(State, + {foldobjects_allkeys, Tag, FoldFun, SnapPreFold, key_order}) -> SnapFun = return_snapfun(State, store, no_lookup, true, SnapPreFold), - leveled_runner:foldobjects_allkeys(SnapFun, Tag, FoldFun); + leveled_runner:foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order); +get_runner(State, + {foldobjects_allkeys, Tag, FoldFun, SnapPreFold, sqn_order}) -> + SnapFun = return_snapfun(State, store, undefined, true, SnapPreFold), + leveled_runner:foldobjects_allkeys(SnapFun, Tag, FoldFun, sqn_order); get_runner(State, {foldheads_bybucket, Tag, Bucket, KeyRange, diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index c33af4c..df4d491 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -46,6 +46,7 @@ to_ledgerkey/3, to_ledgerkey/5, from_ledgerkey/1, + from_ledgerkey/2, to_inkerkv/3, to_inkerkv/6, from_inkerkv/1, @@ -204,6 +205,19 @@ is_active(Key, Value, Now) -> false end. +-spec from_ledgerkey(atom(), tuple()) -> false|tuple(). +%% @doc +%% Return the "significant information" from the Ledger Key (normally the +%% {Bucket, Key} pair) if and only if the ExpectedTag matched the tag - +%% otherwise return false +from_ledgerkey(ExpectedTag, {ExpectedTag, Bucket, Key, SubKey}) -> + from_ledgerkey({ExpectedTag, Bucket, Key, SubKey}); +from_ledgerkey(_ExpectedTag, _OtherKey) -> + false. + +-spec from_ledgerkey(tuple()) -> tuple(). 
+%% @doc +%% Return identifying information from the LedgerKey from_ledgerkey({?IDX_TAG, ?ALL_BUCKETS, {_IdxFld, IdxVal}, {Bucket, Key}}) -> {Bucket, Key, IdxVal}; from_ledgerkey({?IDX_TAG, Bucket, {_IdxFld, IdxVal}, Key}) -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 764da24..6333e5b 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -253,7 +253,7 @@ ink_close(Pid) -> ink_doom(Pid) -> gen_server:call(Pid, doom, 60000). --spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, pid()) -> ok. +-spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> ok. %% @doc %% Fold over the journal from a starting sequence number (MinSQN), passing %% in three functions and a snapshot of the penciller. The Fold functions %% should be @@ -270,7 +270,7 @@ ink_doom(Pid) -> %% KeyInJournal %% ValueInJournal %% Position - the actual position within the CDB file of the object -%% Acc - the accumulator +%% Acc - the batch accumulator %% ExtractFun - a single arity function which can be applied to ValueInJournal %% to extract the actual object, and the size of the object, %% @@ -279,13 +279,13 @@ ink_doom(Pid) -> %% {stop, {MinSQN, MaxSQN, UpdAcc}} %% The FilterFun is required to call stop when MaxSQN is reached %% -%% The InitAccFun should return an initial accumulator for each subfold. +%% The InitAccFun should return an initial batch accumulator for each subfold. %% -%% The FoldFun is a 2 arity function that should take as inputs: -%% The Recipient -%% The Accumulator built over the sub-fold -ink_fold(Pid, MinSQN, FoldFuns, Recipient) -> - gen_server:call(Pid, {fold, MinSQN, FoldFuns, Recipient}, infinity). +%% The BatchFun is a two arity function that should take as inputs: +%% An overall accumulator +%% The batch accumulator built over the sub-fold +ink_fold(Pid, MinSQN, FoldFuns, Acc) -> + gen_server:call(Pid, {fold, MinSQN, FoldFuns, Acc}, infinity). -spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok. 
%% @@ -297,13 +297,17 @@ ink_fold(Pid, MinSQN, FoldFuns, Recipient) -> %% The load fun should be a five arity function like: %% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> + BatchFun = + fun(BatchAcc, _Acc) -> + push_to_penciller(Penciller, BatchAcc) + end, gen_server:call(Pid, {fold, MinSQN, {FilterFun, fun leveled_bookie:empty_ledgercache/0, - fun push_to_penciller/2}, - Penciller}, + BatchFun}, + ok}, infinity). -spec ink_compactjournal(pid(), pid(), integer()) -> ok. @@ -426,12 +430,12 @@ handle_call({key_check, Key, SQN}, _From, State) -> handle_call({fold, StartSQN, {FilterFun, InitAccFun, FoldFun}, - Recipient}, _From, State) -> + Acc}, _From, State) -> Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)), Reply = fold_from_sequence(StartSQN, {FilterFun, InitAccFun, FoldFun}, - Recipient, + Acc, Manifest), {reply, Reply, State}; handle_call({register_snapshot, Requestor}, _From , State) -> @@ -793,76 +797,82 @@ start_new_activejournal(SQN, RootPath, CDBOpts) -> {SQN, Filename, PidW, empty}. -%% Scan between sequence numbers applying FilterFun to each entry where -%% FilterFun{K, V, Acc} -> Penciller Key List -%% Load the output for the CDB file into the Penciller. -fold_from_sequence(_MinSQN, _FoldFuns, _Rec, []) -> - ok; -fold_from_sequence(MinSQN, FoldFuns, Rec, [{LowSQN, FN, Pid, _LK}|Rest]) +-spec fold_from_sequence(integer(), {fun(), fun(), fun()}, any(), list()) + -> any(). +%% @doc +%% +%% Scan from the starting sequence number to the end of the Journal. Apply +%% the FilterFun as it scans over the CDB file to build up a Batch of relevant +%% objects - and then apply the FoldFun to the batch once the batch is +%% complete +%% +%% Inputs - MinSQN, FoldFuns, OverallAccumulator, Inker's Manifest +%% +%% The fold loops over all the CDB files in the Manifest. Each file is looped +%% over in batches using foldfile_between_sequence/7. 
The batch is a range of +%% sequence numbers (so the batch size may be << ?LOADING_BATCH) in compacted +%% files +fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) -> + Acc; +fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest]) when LowSQN >= MinSQN -> - fold_between_sequence(MinSQN, - MinSQN + ?LOADING_BATCH, - FoldFuns, - Rec, - Pid, - undefined, - FN, - Rest); -fold_from_sequence(MinSQN, FoldFuns, Rec, [{_LowSQN, FN, Pid, _LK}|Rest]) -> - case Rest of - [] -> - fold_between_sequence(MinSQN, - MinSQN + ?LOADING_BATCH, - FoldFuns, - Rec, - Pid, - undefined, - FN, - Rest); - [{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN -> - fold_between_sequence(MinSQN, - MinSQN + ?LOADING_BATCH, - FoldFuns, - Rec, - Pid, - undefined, - FN, - Rest); - _ -> - fold_from_sequence(MinSQN, FoldFuns, Rec, Rest) - end. + Acc0 = foldfile_between_sequence(MinSQN, + MinSQN + ?LOADING_BATCH, + FoldFuns, + Acc, + Pid, + undefined, + FN), + fold_from_sequence(MinSQN, FoldFuns, Acc0, Rest); +fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) -> + % If this file has a LowSQN less than the minimum, we can skip it if the + % next file also has a LowSQN below the minimum + Acc0 = + case Rest of + [] -> + foldfile_between_sequence(MinSQN, + MinSQN + ?LOADING_BATCH, + FoldFuns, + Acc, + Pid, + undefined, + FN); + [{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN -> + foldfile_between_sequence(MinSQN, + MinSQN + ?LOADING_BATCH, + FoldFuns, + Acc, + Pid, + undefined, + FN); + _ -> + Acc + end, + fold_from_sequence(MinSQN, FoldFuns, Acc0, Rest). 
- - -fold_between_sequence(MinSQN, MaxSQN, FoldFuns, - Recipient, CDBpid, StartPos, FN, Rest) -> +foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns, + Acc, CDBpid, StartPos, FN) -> leveled_log:log("I0014", [FN, MinSQN]), {FilterFun, InitAccFun, FoldFun} = FoldFuns, - InitAcc = {MinSQN, MaxSQN, InitAccFun()}, - Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of - {eof, {AccMinSQN, _AccMaxSQN, AccLC}} -> - ok = FoldFun(Recipient, AccLC), - {ok, AccMinSQN}; - {LastPosition, {_AccMinSQN, _AccMaxSQN, AccLC}} -> - ok = FoldFun(Recipient, AccLC), - NextSQN = MaxSQN + 1, - fold_between_sequence(NextSQN, - NextSQN + ?LOADING_BATCH, - FoldFuns, - Recipient, - CDBpid, - LastPosition, - FN, - Rest) - end, - case Res of - {ok, LMSQN} -> - fold_from_sequence(LMSQN, FoldFuns, Recipient, Rest); - ok -> - ok + InitBatchAcc = {MinSQN, MaxSQN, InitAccFun()}, + + case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of + {eof, {_AccMinSQN, _AccMaxSQN, BatchAcc}} -> + FoldFun(BatchAcc, Acc); + {LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} -> + UpdAcc = FoldFun(BatchAcc, Acc), + NextSQN = MaxSQN + 1, + foldfile_between_sequence(NextSQN, + NextSQN + ?LOADING_BATCH, + FoldFuns, + UpdAcc, + CDBpid, + LastPosition, + FN) end. + push_to_penciller(Penciller, LedgerCache) -> % The push to penciller must start as a tree to correctly de-duplicate % the list by order before becoming a de-duplicated list for loading diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 710aba7..62d1930 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -29,7 +29,7 @@ hashlist_query/3, tictactree/5, foldheads_allkeys/5, - foldobjects_allkeys/3, + foldobjects_allkeys/4, foldheads_bybucket/5, foldobjects_bybucket/3, foldobjects_byindex/3 @@ -226,10 +226,11 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) -> FoldFun, {true, JournalCheck}, SegmentList). --spec foldobjects_allkeys(fun(), atom(), fun()) -> {async, fun()}. 
+-spec foldobjects_allkeys(fun(), atom(), fun(), key_order|sqn_order) + -> {async, fun()}. %% @doc %% Fold over all objects for a given tag -foldobjects_allkeys(SnapFun, Tag, FoldFun) -> +foldobjects_allkeys(SnapFun, Tag, FoldFun, _Order) -> StartKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), foldobjects(SnapFun, From 0e071d078ea987bd7e506bd6713c12abe2af1661 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 17 Nov 2017 18:30:51 +0000 Subject: [PATCH 3/6] fold_objects in SQN order This adds a test that fold_objects works in SQN order --- include/leveled.hrl | 4 +- src/leveled_inker.erl | 14 +-- src/leveled_log.erl | 5 +- src/leveled_runner.erl | 89 ++++++++++++++++++- test/end_to_end/riak_SUITE.erl | 152 ++++++++++++++++++++++++++++++--- test/end_to_end/testutil.erl | 36 ++++++++ 6 files changed, 278 insertions(+), 22 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index 833ba8b..5763040 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -8,10 +8,12 @@ %% Inker key type used for 'normal' objects -define(INKT_STND, stnd). + %% Inker key type used for objects which contain no value, only key changes %% This is used currently for objects formed under a 'retain' strategy on Inker -%% compaction, but could be used for special set-type objects +%% compaction -define(INKT_KEYD, keyd). + %% Inker key type used for tombstones -define(INKT_TOMB, tomb). diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 6333e5b..fde2dda 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -280,6 +280,8 @@ ink_doom(Pid) -> %% The FilterFun is required to call stop when MaxSQN is reached %% %% The InitAccFun should return an initial batch accumulator for each subfold. 
+%% It is a 2-arity function that takes a filename and a MinSQN as an input +%% potentially to be used in logging %% %% The BatchFun is a two arity function that should take as inputs: %% An overall accumulator %% The batch accumulator built over the sub-fold @@ -301,12 +303,15 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> fun(BatchAcc, _Acc) -> push_to_penciller(Penciller, BatchAcc) end, + InitAccFun = + fun(FN, CurrentMinSQN) -> + leveled_log:log("I0014", [FN, CurrentMinSQN]), + leveled_bookie:empty_ledgercache() + end, gen_server:call(Pid, {fold, MinSQN, - {FilterFun, - fun leveled_bookie:empty_ledgercache/0, - BatchFun}, + {FilterFun, InitAccFun, BatchFun}, ok}, infinity). @@ -853,9 +858,8 @@ fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) -> foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns, Acc, CDBpid, StartPos, FN) -> - leveled_log:log("I0014", [FN, MinSQN]), {FilterFun, InitAccFun, FoldFun} = FoldFuns, - InitBatchAcc = {MinSQN, MaxSQN, InitAccFun()}, + InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)}, case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of {eof, {_AccMinSQN, _AccMaxSQN, BatchAcc}} -> diff --git a/src/leveled_log.erl b/src/leveled_log.erl index a3c8977..85d8ab6 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -327,7 +327,10 @@ {info, "After ~w PUTs total write time is ~w total sync time is ~w " ++ "and max write time is ~w and max sync time is ~w"}}, {"CDB18", - {info, "Handled return and write of hashtable"}} + {info, "Handled return and write of hashtable"}}, + + {"R0001", + {debug, "Object fold to process batch of ~w objects"}} ]). diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 62d1930..9fe8dcf 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -230,13 +230,96 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) -> -> {async, fun()}. 
%% @doc %% Fold over all objects for a given tag -foldobjects_allkeys(SnapFun, Tag, FoldFun, _Order) -> +foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order) -> StartKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, - false, false). + false, false); +foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> + % Fold over the journal in order of receipt + {FoldFun, InitAcc} = + case is_tuple(FoldObjectsFun) of + true -> + % FoldObjectsFun is already a tuple with a Fold function and an + % initial accumulator + FoldObjectsFun; + false -> + % no initial accumulator passed, and so should be just a list + {FoldObjectsFun, []} + end, + + FilterFun = + fun(JKey, JVal, _Pos, Acc, ExtractFun) -> + + {SQN, InkTag, LedgerKey} = JKey, + case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of + {?INKT_STND, {B, K}} -> + % Ignore tombstones and non-matching Tags and Key changes + % objects. 
+ {MinSQN, MaxSQN, BatchAcc} = Acc, + case SQN of + SQN when SQN < MinSQN -> + {loop, Acc}; + SQN when SQN > MaxSQN -> + {stop, Acc}; + _ -> + {VBin, _VSize} = ExtractFun(JVal), + {Obj, _IdxSpecs} = leveled_codec:split_inkvalue(VBin), + ToLoop = + case SQN of + MaxSQN -> stop; + _ -> loop + end, + {ToLoop, + {MinSQN, MaxSQN, [{B, K, SQN, Obj}|BatchAcc]}} + end; + _ -> + {loop, Acc} + end + end, + + InitAccFun = fun(_FN, _SQN) -> [] end, + + Folder = + fun() -> + + {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(), + IsValidFun = + fun(Bucket, Key, SQN) -> + LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), + leveled_penciller:pcl_checksequencenumber(LedgerSnapshot, + LedgerKey, + SQN) + end, + + BatchFoldFun = + fun(BatchAcc, ObjAcc) -> + ObjFun = + fun({B, K, SQN, Obj}, Acc) -> + case IsValidFun(B, K, SQN) of + true -> + FoldFun(B, K, Obj, Acc); + false -> + Acc + end + end, + leveled_log:log("R0001", [length(BatchAcc)]), + lists:foldr(ObjFun, ObjAcc, BatchAcc) + end, + + Acc = + leveled_inker:ink_fold(JournalSnapshot, + 0, + {FilterFun, InitAccFun, BatchFoldFun}, + InitAcc), + ok = leveled_penciller:pcl_close(LedgerSnapshot), + ok = leveled_inker:ink_close(JournalSnapshot), + Acc + end, + {async, Folder}. + -spec foldobjects_bybucket(fun(), {atom(), any(), any()}, fun()) -> {async, fun()}. @@ -481,7 +564,7 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> ProxyObj = make_proxy_object(LK, JK, MD, V, InkerClone), - FoldObjectsFun(B, K,ProxyObj, Acc); + FoldObjectsFun(B, K, ProxyObj, Acc); false -> R = leveled_bookie:fetch_value(InkerClone, JK), case R of diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl index 7c979da..7eec3df 100644 --- a/test/end_to_end/riak_SUITE.erl +++ b/test/end_to_end/riak_SUITE.erl @@ -3,11 +3,13 @@ -include("include/leveled.hrl"). -export([all/0]). -export([ - crossbucket_aae/1 + crossbucket_aae/1, + handoff/1 ]). all() -> [ - crossbucket_aae + crossbucket_aae, + handoff ]. 
-define(MAGIC, 53). % riak_kv -> riak_object @@ -39,7 +41,7 @@ crossbucket_aae(_Config) -> {ok, Bookie2} = leveled_bookie:book_start(StartOpts2), testutil:check_forobject(Bookie2, TestObject), - % Generate 200K objects to be sued within the test, and load them into + % Generate 200K objects to be used within the test, and load them into % the first store (outputting the generated objects as a list of lists) % to be used elsewhere @@ -219,7 +221,7 @@ head_tictac_foldfun(B, K, PO, {Count, TreeAcc}) -> leveled_tictac:add_kv(TreeAcc, {B, K}, PO, ExtractFun)}. -check_tictacfold(BookA, BookB, HeadTicTacFolder, {B1, K1}, TreeSize) -> +check_tictacfold(BookA, BookB, HeadTicTacFolder, DeltaKey, TreeSize) -> SW_TT0 = os:timestamp(), {async, BookATreeFolder} = leveled_bookie:book_returnfolder(BookA, HeadTicTacFolder), @@ -233,17 +235,22 @@ check_tictacfold(BookA, BookB, HeadTicTacFolder, {B1, K1}, TreeSize) -> io:format("Fold over keys revealed counts of ~w and ~w~n", [CountA, CountB]), - % There should be a single delta between the stores - 1 = CountA - CountB, - DLs = leveled_tictac:find_dirtyleaves(BookATree, BookBTree), io:format("Found dirty leaves with Riak fold_heads of ~w~n", [length(DLs)]), - true = length(DLs) == 1, - ExpSeg = leveled_tictac:keyto_segment32(<>), - TreeSeg = leveled_tictac:get_segment(ExpSeg, TreeSize), - [ActualSeg] = DLs, - true = TreeSeg == ActualSeg, + case DeltaKey of + {B1, K1} -> + % There should be a single delta between the stores + 1 = CountA - CountB, + true = length(DLs) == 1, + ExpSeg = leveled_tictac:keyto_segment32(<>), + TreeSeg = leveled_tictac:get_segment(ExpSeg, TreeSize), + [ActualSeg] = DLs, + true = TreeSeg == ActualSeg; + none -> + 0 = CountA - CountB, + true = length(DLs) == 0 + end, DLs. @@ -261,3 +268,124 @@ summary_from_binary(ObjBin, ObjSize) -> _Rest/binary>> = ObjBin, {lists:usort(binary_to_term(VclockBin)), ObjSize, SibCount}. 
+ + +handoff(_Config) -> + % Test requires multiple different databases, so want to mount them all + % on individual file paths + RootPathA = testutil:reset_filestructure("testA"), + RootPathB = testutil:reset_filestructure("testB"), + RootPathC = testutil:reset_filestructure("testC"), + RootPathD = testutil:reset_filestructure("testD"), + + % Start the first database, load a test object, close it, start it again + StartOpts1 = [{root_path, RootPathA}, + {max_pencillercachesize, 16000}, + {sync_strategy, riak_sync}], + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + + % Generate 200K objects to be used within the test, and load them into + % the first store (outputting the generated objects as a list of lists) + % to be used elsewhere + + GenList = + [binary_uuid, binary_uuid, binary_uuid, binary_uuid], + [CL0, CL1, CL2, CL3] = + testutil:load_objects(40000, + GenList, + Bookie1, + no_check, + fun testutil:generate_smallobjects/2, + 40000), + + % Update an delete some objects + testutil:update_some_objects(Bookie1, CL0, 1000), + testutil:update_some_objects(Bookie1, CL1, 20000), + testutil:delete_some_objects(Bookie1, CL2, 10000), + testutil:delete_some_objects(Bookie1, CL3, 4000), + + % Compact the journal + ok = leveled_bookie:book_compactjournal(Bookie1, 30000), + testutil:wait_for_compaction(Bookie1), + + % Start two new empty stores + StartOpts2 = [{root_path, RootPathB}, + {max_pencillercachesize, 24000}, + {sync_strategy, none}], + {ok, Bookie2} = leveled_bookie:book_start(StartOpts2), + StartOpts3 = [{root_path, RootPathC}, + {max_pencillercachesize, 30000}, + {sync_strategy, none}], + {ok, Bookie3} = leveled_bookie:book_start(StartOpts3), + StartOpts4 = [{root_path, RootPathD}, + {max_pencillercachesize, 30000}, + {sync_strategy, none}], + {ok, Bookie4} = leveled_bookie:book_start(StartOpts4), + + FoldObjectsFun = + fun(Book) -> + fun(B, K, Obj, ok) -> + leveled_bookie:book_put(Book, B, K, Obj, [], ?RIAK_TAG), + ok + end + end, + + % Handoff the 
data from the first store to the other three stores + HandoffFolder2 = + {foldobjects_allkeys, + ?RIAK_TAG, + {FoldObjectsFun(Bookie2), ok}, + false, + key_order}, + HandoffFolder3 = + {foldobjects_allkeys, + ?RIAK_TAG, + {FoldObjectsFun(Bookie3), ok}, + true, + sqn_order}, + HandoffFolder4 = + {foldobjects_allkeys, + ?RIAK_TAG, + {FoldObjectsFun(Bookie4), ok}, + true, + sqn_order}, + {async, Handoff2} = + leveled_bookie:book_returnfolder(Bookie1, HandoffFolder2), + SW2 = os:timestamp(), + ok = Handoff2(), + Time_HO2 = timer:now_diff(os:timestamp(), SW2)/1000, + io:format("Handoff to Book2 in key_order took ~w milliseconds ~n", + [Time_HO2]), + SW3 = os:timestamp(), + {async, Handoff3} = + leveled_bookie:book_returnfolder(Bookie1, HandoffFolder3), + ok = Handoff3(), + Time_HO3 = timer:now_diff(os:timestamp(), SW3)/1000, + io:format("Handoff to Book3 in sqn_order took ~w milliseconds ~n", + [Time_HO3]), + SW4 = os:timestamp(), + {async, Handoff4} = + leveled_bookie:book_returnfolder(Bookie1, HandoffFolder4), + ok = Handoff4(), + Time_HO4 = timer:now_diff(os:timestamp(), SW4)/1000, + io:format("Handoff to Book4 in sqn_order took ~w milliseconds ~n", + [Time_HO4]), + + % Run tictac folds to confirm all stores consistent after handoff + TreeSize = xxsmall, + + TicTacFolder = + {foldheads_allkeys, + ?RIAK_TAG, + {fun head_tictac_foldfun/4, + {0, leveled_tictac:new_tree(test, TreeSize)}}, + false, true, false}, + check_tictacfold(Bookie1, Bookie2, TicTacFolder, none, TreeSize), + check_tictacfold(Bookie2, Bookie3, TicTacFolder, none, TreeSize), + check_tictacfold(Bookie3, Bookie4, TicTacFolder, none, TreeSize), + + % Shutdown + ok = leveled_bookie:book_close(Bookie1), + ok = leveled_bookie:book_close(Bookie2), + ok = leveled_bookie:book_close(Bookie3), + ok = leveled_bookie:book_close(Bookie4). 
diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index 4277289..95ace81 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -32,6 +32,8 @@ name_list/0, load_objects/5, load_objects/6, + update_some_objects/3, + delete_some_objects/3, put_indexed_objects/3, put_altered_indexed_objects/3, put_altered_indexed_objects/4, @@ -402,6 +404,37 @@ set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) -> vclock=generate_vclock()}, Spec1}. +update_some_objects(Bookie, ObjList, SampleSize) -> + StartWatchA = os:timestamp(), + ToUpdateList = lists:sublist(lists:sort(ObjList), SampleSize), + UpdateFun = + fun({R, Obj, Spec}) -> + VC = Obj#r_object.vclock, + VC0 = update_vclock(VC), + [C] = Obj#r_object.contents, + C0 = C#r_content{value = leveled_rand:rand_bytes(512)}, + UpdObj = Obj#r_object{vclock = VC0, contents = [C0]}, + {R, UpdObj, Spec} + end, + UpdatedObjList = lists:map(UpdateFun, ToUpdateList), + riakload(Bookie, UpdatedObjList), + Time = timer:now_diff(os:timestamp(), StartWatchA), + io:format("~w objects updates in ~w seconds~n", + [SampleSize, Time/1000000]). + +delete_some_objects(Bookie, ObjList, SampleSize) -> + StartWatchA = os:timestamp(), + ToDeleteList = lists:sublist(lists:sort(ObjList), SampleSize), + DeleteFun = + fun({_R, Obj, Spec}) -> + B = Obj#r_object.bucket, + K = Obj#r_object.key, + book_riakdelete(Bookie, B, K, Spec) + end, + lists:foreach(DeleteFun, ToDeleteList), + Time = timer:now_diff(os:timestamp(), StartWatchA), + io:format("~w objects deleted in ~w seconds~n", + [SampleSize, Time/1000000]). generate_vclock() -> lists:map(fun(X) -> @@ -411,6 +444,9 @@ generate_vclock() -> {Actor, X} end, lists:seq(1, leveled_rand:uniform(8))). +update_vclock(VC) -> + [{Actor, X}|Rest] = VC, + [{Actor, X + 1}|Rest]. 
actor_list() -> [{1, albert}, {2, bertie}, {3, clara}, {4, dave}, {5, elton}, From d5babe0c29f5ab2682cbbc2e7509f374984b9f90 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 20 Nov 2017 10:21:30 +0000 Subject: [PATCH 4/6] Expand tests for coverage Prove tests handle SQN over batch, and also can handle mismatched tags in the store. --- test/end_to_end/riak_SUITE.erl | 27 +++++++++++++++++++++++++++ test/end_to_end/testutil.erl | 17 +++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl index 7eec3df..60c2609 100644 --- a/test/end_to_end/riak_SUITE.erl +++ b/test/end_to_end/riak_SUITE.erl @@ -284,6 +284,8 @@ handoff(_Config) -> {sync_strategy, riak_sync}], {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + % Add some noe Riak objects in - which should be ignored in folds. + Hashes = testutil:stdload(Bookie1, 1000), % Generate 200K objects to be used within the test, and load them into % the first store (outputting the generated objects as a list of lists) % to be used elsewhere @@ -322,6 +324,11 @@ handoff(_Config) -> {sync_strategy, none}], {ok, Bookie4} = leveled_bookie:book_start(StartOpts4), + FoldStObjectsFun = + fun(B, K, V, Acc) -> + [{B, K, erlang:phash2(V)}|Acc] + end, + FoldObjectsFun = fun(Book) -> fun(B, K, Obj, ok) -> @@ -384,6 +391,26 @@ handoff(_Config) -> check_tictacfold(Bookie2, Bookie3, TicTacFolder, none, TreeSize), check_tictacfold(Bookie3, Bookie4, TicTacFolder, none, TreeSize), + StdFolder = + {foldobjects_allkeys, + ?STD_TAG, + FoldStObjectsFun, + true, + sqn_order}, + + {async, StdFold1} = leveled_bookie:book_returnfolder(Bookie1, StdFolder), + {async, StdFold2} = leveled_bookie:book_returnfolder(Bookie2, StdFolder), + {async, StdFold3} = leveled_bookie:book_returnfolder(Bookie3, StdFolder), + {async, StdFold4} = leveled_bookie:book_returnfolder(Bookie4, StdFolder), + StdFoldOut1 = lists:sort(StdFold1()), + StdFoldOut2 = lists:sort(StdFold2()), + StdFoldOut3 
= lists:sort(StdFold3()), + StdFoldOut4 = lists:sort(StdFold4()), + true = StdFoldOut1 == lists:sort(Hashes), + true = StdFoldOut2 == [], + true = StdFoldOut3 == [], + true = StdFoldOut4 == [], + % Shutdown ok = leveled_bookie:book_close(Bookie1), ok = leveled_bookie:book_close(Bookie2), diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index 95ace81..0d1af19 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -7,6 +7,7 @@ book_riakget/3, book_riakhead/3, riakload/2, + stdload/2, reset_filestructure/0, reset_filestructure/1, check_bucket_stats/2, @@ -173,6 +174,22 @@ riakload(Bookie, ObjectList) -> end, ObjectList). +stdload(Bookie, Count) -> + stdload(Bookie, Count, []). + +stdload(_Bookie, 0, Acc) -> + Acc; +stdload(Bookie, Count, Acc) -> + B = "Bucket", + K = leveled_codec:generate_uuid(), + V = get_compressiblevalue(), + R = leveled_bookie:book_put(Bookie, B, K, V, [], ?STD_TAG), + case R of + ok -> ok; + pause -> timer:sleep(?SLOWOFFER_DELAY) + end, + stdload(Bookie, Count - 1, [{B, K, erlang:phash2(V)}|Acc]). + reset_filestructure() -> reset_filestructure(0, ?ROOT_PATH). 
From 62a84b95bbe2d5e5ce26c67efd18c94eab9f388e Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 20 Nov 2017 10:40:09 +0000 Subject: [PATCH 5/6] Add fadvise help to scan --- src/leveled_cdb.erl | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 90f2491..3c2a03e 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -649,19 +649,29 @@ handle_sync_event({cdb_scan, FilterFun, Acc, StartPos}, {ok, StartPos} end, file:position(State#state.handle, StartPos0), + file:advise(State#state.handle, + StartPos0, + EndPos0 - StartPos0, + sequential), MaybeEnd = (check_last_key(State#state.last_key) == empty) or (StartPos0 >= (EndPos0 - ?DWORD_SIZE)), - case MaybeEnd of - true -> - {reply, {eof, Acc}, StateName, State}; - false -> - {LastPosition, Acc2} = scan_over_file(State#state.handle, - StartPos0, - FilterFun, - Acc, - State#state.last_key), - {reply, {LastPosition, Acc2}, StateName, State} - end; + {LastPosition, Acc2} = + case MaybeEnd of + true -> + {eof, Acc}; + false -> + scan_over_file(State#state.handle, + StartPos0, + FilterFun, + Acc, + State#state.last_key) + end, + {ok, LastReadPos} = file:position(State#state.handle, cur), + file:advise(State#state.handle, + StartPos0, + LastReadPos - StartPos0, + dont_need), + {reply, {LastPosition, Acc2}, StateName, State}; handle_sync_event(cdb_lastkey, _From, StateName, State) -> {reply, State#state.last_key, StateName, State}; handle_sync_event(cdb_firstkey, _From, StateName, State) -> From 51f504fec5b45551d35e79b965f66be43ce830dd Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 20 Nov 2017 17:29:57 +0000 Subject: [PATCH 6/6] Add extra slow_fetch test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit sometimes ct tests don’t hit this - surprisingly --- src/leveled_penciller.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/leveled_penciller.erl 
b/src/leveled_penciller.erl index 495ad1e..a2523bc 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -1820,7 +1820,8 @@ create_file_test() -> ?assertMatch("hello", binary_to_term(Bin)). slow_fetch_test() -> - ?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 0, 1)). + ?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 0, 1)), + ?assertMatch("value", log_slowfetch(2, "value", "fake", 0, 1)). checkready(Pid) -> try