From 50c81d0626e747eeec9a62d8e7edf98c5699e607 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 17 Nov 2017 14:54:53 +0000 Subject: [PATCH] Make ink fold more generic Also makes the fold_from_sequence loop much easier to follow --- src/leveled_bookie.erl | 16 ++++- src/leveled_codec.erl | 14 ++++ src/leveled_inker.erl | 160 ++++++++++++++++++++++------------------- src/leveled_runner.erl | 7 +- 4 files changed, 116 insertions(+), 81 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 61493a7..edd339d 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -653,7 +653,9 @@ loadqueue_ledgercache(Cache) -> %% Query can be no_lookup, indicating the snapshot will be used for non-specific %% range queries and not direct fetch requests. {StartKey, EndKey} if the the %% snapshot is to be used for one specific query only (this is much quicker to -%% setup, assuming the range is a small subset of the overall key space). +%% setup, assuming the range is a small subset of the overall key space). If +%% lookup is required but the range isn't defined then 'undefined' should be +%% passed as the query snapshot_store(LedgerCache, Penciller, Inker, SnapType, Query, LongRunning) -> LedgerCacheReady = readycache_forsnapshot(LedgerCache, Query), BookiesMem = {LedgerCacheReady#ledger_cache.loader, @@ -760,10 +762,18 @@ get_runner(State, leveled_runner:foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList); -get_runner(State, +get_runner(State, {foldobjects_allkeys, Tag, FoldFun, SnapPreFold}) -> + get_runner(State, + {foldobjects_allkeys, Tag, FoldFun, SnapPreFold, key_order}); +get_runner(State, + {foldobjects_allkeys, Tag, FoldFun, SnapPreFold, key_order}) -> SnapFun = return_snapfun(State, store, no_lookup, true, SnapPreFold), - leveled_runner:foldobjects_allkeys(SnapFun, Tag, FoldFun); + leveled_runner:foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order); +get_runner(State, + {foldobjects_allkeys, Tag, FoldFun, SnapPreFold, sqn_order}) -> + SnapFun = return_snapfun(State, store, undefined, true, SnapPreFold), + leveled_runner:foldobjects_allkeys(SnapFun, Tag, FoldFun, sqn_order); get_runner(State, {foldheads_bybucket, Tag, Bucket, KeyRange, diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index c33af4c..df4d491 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -46,6 +46,7 @@ to_ledgerkey/3, to_ledgerkey/5, from_ledgerkey/1, + from_ledgerkey/2, to_inkerkv/3, to_inkerkv/6, from_inkerkv/1, @@ -204,6 +205,19 @@ is_active(Key, Value, Now) -> false end. +-spec from_ledgerkey(atom(), tuple()) -> false|tuple(). +%% @doc +%% Return the "significant information" from the Ledger Key (normally the +%% {Bucket, Key} pair) if and only if the ExpectedTag matched the tag - +%% otherwise return false +from_ledgerkey(ExpectedTag, {ExpectedTag, Bucket, Key, SubKey}) -> + from_ledgerkey({ExpectedTag, Bucket, Key, SubKey}); +from_ledgerkey(_ExpectedTag, _OtherKey) -> + false. + +-spec from_ledgerkey(tuple()) -> tuple(). +%% @doc +%% Return identifying information from the LedgerKey from_ledgerkey({?IDX_TAG, ?ALL_BUCKETS, {_IdxFld, IdxVal}, {Bucket, Key}}) -> {Bucket, Key, IdxVal}; from_ledgerkey({?IDX_TAG, Bucket, {_IdxFld, IdxVal}, Key}) -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 764da24..6333e5b 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -253,7 +253,7 @@ ink_close(Pid) -> ink_doom(Pid) -> gen_server:call(Pid, doom, 60000). --spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, pid()) -> ok. +-spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> ok. %% @doc %% Fold over the journal from a starting sequence number (MinSQN), passing %% in three functions and a snapshot of the penciller. The Fold functions @@ -270,7 +270,7 @@ ink_doom(Pid) -> %% KeyInJournal %% ValueInJournal %% Position - the actual position within the CDB file of the object -%% Acc - the accumulator +%% Acc - the bathc accumulator %% ExtractFun - a single arity function which can be applied to ValueInJournal %% to extract the actual object, and the size of the object, %% @@ -279,13 +279,13 @@ ink_doom(Pid) -> %% {stop, {MinSQN, MaxSQN, UpdAcc}} %% The FilterFun is required to call stop when MaxSQN is reached %% -%% The InitAccFun should return an initial accumulator for each subfold. +%% The InitAccFun should return an initial batch accumulator for each subfold. %% -%% The FoldFun is a 2 arity function that should take as inputs: -%% The Recipient -%% The Accumulator built over the sub-fold -ink_fold(Pid, MinSQN, FoldFuns, Recipient) -> - gen_server:call(Pid, {fold, MinSQN, FoldFuns, Recipient}, infinity). +%% The BatchFun is a two arity function that should take as inputs: +%% An overall accumulator +%% The batch accumulator built over the sub-fold +ink_fold(Pid, MinSQN, FoldFuns, Acc) -> + gen_server:call(Pid, {fold, MinSQN, FoldFuns, Acc}, infinity). -spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok. %% @@ -297,13 +297,17 @@ ink_fold(Pid, MinSQN, FoldFuns, Recipient) -> %% The load fun should be a five arity function like: %% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> + BatchFun = + fun(BatchAcc, _Acc) -> + push_to_penciller(Penciller, BatchAcc) + end, gen_server:call(Pid, {fold, MinSQN, {FilterFun, fun leveled_bookie:empty_ledgercache/0, - fun push_to_penciller/2}, - Penciller}, + BatchFun}, + ok}, infinity). -spec ink_compactjournal(pid(), pid(), integer()) -> ok. @@ -426,12 +430,12 @@ handle_call({key_check, Key, SQN}, _From, State) -> handle_call({fold, StartSQN, {FilterFun, InitAccFun, FoldFun}, - Recipient}, _From, State) -> + Acc}, _From, State) -> Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)), Reply = fold_from_sequence(StartSQN, {FilterFun, InitAccFun, FoldFun}, - Recipient, + Acc, Manifest), {reply, Reply, State}; handle_call({register_snapshot, Requestor}, _From , State) -> @@ -793,76 +797,82 @@ start_new_activejournal(SQN, RootPath, CDBOpts) -> {SQN, Filename, PidW, empty}. -%% Scan between sequence numbers applying FilterFun to each entry where -%% FilterFun{K, V, Acc} -> Penciller Key List -%% Load the output for the CDB file into the Penciller. -fold_from_sequence(_MinSQN, _FoldFuns, _Rec, []) -> - ok; -fold_from_sequence(MinSQN, FoldFuns, Rec, [{LowSQN, FN, Pid, _LK}|Rest]) +-spec fold_from_sequence(integer(), {fun(), fun(), fun()}, any(), list()) + -> any(). +%% @doc +%% +%% Scan from the starting sequence number to the end of the Journal. Apply +%% the FilterFun as it scans over the CDB file to build up a Batch of relevant +%% objects - and then apply the FoldFun to the batch once the batch is +%% complete +%% +%% Inputs - MinSQN, FoldFuns, OverallAccumulator, Inker's Manifest +%% +%% The fold loops over all the CDB files in the Manifest. Each file is looped +%% over in batches using foldfile_between_sequence/7. The batch is a range of +%% sequence numbers (so the batch size may be << ?LOADING_BATCH) in compacted +%% files +fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) -> + Acc; +fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest]) when LowSQN >= MinSQN -> - fold_between_sequence(MinSQN, - MinSQN + ?LOADING_BATCH, - FoldFuns, - Rec, - Pid, - undefined, - FN, - Rest); -fold_from_sequence(MinSQN, FoldFuns, Rec, [{_LowSQN, FN, Pid, _LK}|Rest]) -> - case Rest of - [] -> - fold_between_sequence(MinSQN, - MinSQN + ?LOADING_BATCH, - FoldFuns, - Rec, - Pid, - undefined, - FN, - Rest); - [{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN -> - fold_between_sequence(MinSQN, - MinSQN + ?LOADING_BATCH, - FoldFuns, - Rec, - Pid, - undefined, - FN, - Rest); - _ -> - fold_from_sequence(MinSQN, FoldFuns, Rec, Rest) - end. + Acc0 = foldfile_between_sequence(MinSQN, + MinSQN + ?LOADING_BATCH, + FoldFuns, + Acc, + Pid, + undefined, + FN), + fold_from_sequence(MinSQN, FoldFuns, Acc0, Rest); +fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) -> + % If this file has a LowSQN less than the minimum, we can skip it if the + % next file also has a LowSQN below the minimum + Acc0 = + case Rest of + [] -> + foldfile_between_sequence(MinSQN, + MinSQN + ?LOADING_BATCH, + FoldFuns, + Acc, + Pid, + undefined, + FN); + [{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN -> + foldfile_between_sequence(MinSQN, + MinSQN + ?LOADING_BATCH, + FoldFuns, + Acc, + Pid, + undefined, + FN); + _ -> + Acc + end, + fold_from_sequence(MinSQN, FoldFuns, Acc0, Rest). - - -fold_between_sequence(MinSQN, MaxSQN, FoldFuns, - Recipient, CDBpid, StartPos, FN, Rest) -> +foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns, + Acc, CDBpid, StartPos, FN) -> leveled_log:log("I0014", [FN, MinSQN]), {FilterFun, InitAccFun, FoldFun} = FoldFuns, - InitAcc = {MinSQN, MaxSQN, InitAccFun()}, - Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of - {eof, {AccMinSQN, _AccMaxSQN, AccLC}} -> - ok = FoldFun(Recipient, AccLC), - {ok, AccMinSQN}; - {LastPosition, {_AccMinSQN, _AccMaxSQN, AccLC}} -> - ok = FoldFun(Recipient, AccLC), - NextSQN = MaxSQN + 1, - fold_between_sequence(NextSQN, - NextSQN + ?LOADING_BATCH, - FoldFuns, - Recipient, - CDBpid, - LastPosition, - FN, - Rest) - end, - case Res of - {ok, LMSQN} -> - fold_from_sequence(LMSQN, FoldFuns, Recipient, Rest); - ok -> - ok + InitBatchAcc = {MinSQN, MaxSQN, InitAccFun()}, + + case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of + {eof, {_AccMinSQN, _AccMaxSQN, BatchAcc}} -> + FoldFun(BatchAcc, Acc); + {LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} -> + UpdAcc = FoldFun(BatchAcc, Acc), + NextSQN = MaxSQN + 1, + foldfile_between_sequence(NextSQN, + NextSQN + ?LOADING_BATCH, + FoldFuns, + UpdAcc, + CDBpid, + LastPosition, + FN) end. + push_to_penciller(Penciller, LedgerCache) -> % The push to penciller must start as a tree to correctly de-duplicate % the list by order before becoming a de-duplicated list for loading diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 710aba7..62d1930 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -29,7 +29,7 @@ hashlist_query/3, tictactree/5, foldheads_allkeys/5, - foldobjects_allkeys/3, + foldobjects_allkeys/4, foldheads_bybucket/5, foldobjects_bybucket/3, foldobjects_byindex/3 @@ -226,10 +226,11 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) -> FoldFun, {true, JournalCheck}, SegmentList). --spec foldobjects_allkeys(fun(), atom(), fun()) -> {async, fun()}. +-spec foldobjects_allkeys(fun(), atom(), fun(), key_order|sqn_order) + -> {async, fun()}. %% @doc %% Fold over all objects for a given tag -foldobjects_allkeys(SnapFun, Tag, FoldFun) -> +foldobjects_allkeys(SnapFun, Tag, FoldFun, _Order) -> StartKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), foldobjects(SnapFun,