diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 98af268..764da24 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -98,6 +98,7 @@ ink_get/3, ink_fetch/3, ink_keycheck/3, + ink_fold/4, ink_loadpcl/4, ink_registersnapshot/2, ink_confirmdelete/2, @@ -252,8 +253,42 @@ ink_close(Pid) -> ink_doom(Pid) -> gen_server:call(Pid, doom, 60000). --spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok. +-spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, pid()) -> ok. %% @doc +%% Fold over the journal from a starting sequence number (MinSQN), passing +%% in three functions and a snapshot of the penciller. The Fold functions +%% should be +%% - a FilterFun to accumulate the objects and decided when to stop or loop +%% - a InitAccFun to re-initialise for the fold over the accumulator +%% - a FoldFun to actually perform the fold +%% +%% The inker fold works in batches, so the FilterFun determines what should +%% go into a batch and when the batch is complete. The FoldFun completes the +%% actual desired outcome by being applied on the batch. +%% +%% The FilterFun should be a five arity function which takes as inputs: +%% KeyInJournal +%% ValueInJournal +%% Position - the actual position within the CDB file of the object +%% Acc - the accumulator +%% ExtractFun - a single arity function which can be applied to ValueInJournal +%% to extract the actual object, and the size of the object, +%% +%% The FilterFun should return either: +%% {loop, {MinSQN, MaxSQN, UpdAcc}} or +%% {stop, {MinSQN, MaxSQN, UpdAcc}} +%% The FilterFun is required to call stop when MaxSQN is reached +%% +%% The InitAccFun should return an initial accumulator for each subfold. +%% +%% The FoldFun is a 2 arity function that should take as inputs: +%% The Recipient +%% The Accumulator built over the sub-fold +ink_fold(Pid, MinSQN, FoldFuns, Recipient) -> + gen_server:call(Pid, {fold, MinSQN, FoldFuns, Recipient}, infinity). + +-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok. +%% %% Function to prompt load of the Ledger at startup. the Penciller should %% have determined the lowest SQN not present in the Ledger, and the inker %% should fold over the Journal from that point, using the function to load @@ -262,7 +297,14 @@ ink_doom(Pid) -> %% The load fun should be a five arity function like: %% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> - gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity). + gen_server:call(Pid, + {fold, + MinSQN, + {FilterFun, + fun leveled_bookie:empty_ledgercache/0, + fun push_to_penciller/2}, + Penciller}, + infinity). -spec ink_compactjournal(pid(), pid(), integer()) -> ok. %% @doc @@ -381,9 +423,16 @@ handle_call({get, Key, SQN}, _From, State) -> {reply, get_object(Key, SQN, State#state.manifest), State}; handle_call({key_check, Key, SQN}, _From, State) -> {reply, key_check(Key, SQN, State#state.manifest), State}; -handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) -> +handle_call({fold, + StartSQN, + {FilterFun, InitAccFun, FoldFun}, + Recipient}, _From, State) -> Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)), - Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest), + Reply = + fold_from_sequence(StartSQN, + {FilterFun, InitAccFun, FoldFun}, + Recipient, + Manifest), {reply, Reply, State}; handle_call({register_snapshot, Requestor}, _From , State) -> Rs = [{Requestor, @@ -748,59 +797,60 @@ start_new_activejournal(SQN, RootPath, CDBOpts) -> %% FilterFun{K, V, Acc} -> Penciller Key List %% Load the output for the CDB file into the Penciller. -load_from_sequence(_MinSQN, _FilterFun, _PCL, []) -> +fold_from_sequence(_MinSQN, _FoldFuns, _Rec, []) -> ok; -load_from_sequence(MinSQN, FilterFun, PCL, [{LowSQN, FN, Pid, _LK}|Rest]) - when LowSQN >= MinSQN -> - load_between_sequence(MinSQN, +fold_from_sequence(MinSQN, FoldFuns, Rec, [{LowSQN, FN, Pid, _LK}|Rest]) + when LowSQN >= MinSQN -> + fold_between_sequence(MinSQN, MinSQN + ?LOADING_BATCH, - FilterFun, - PCL, + FoldFuns, + Rec, Pid, undefined, FN, Rest); -load_from_sequence(MinSQN, FilterFun, PCL, [{_LowSQN, FN, Pid, _LK}|Rest]) -> +fold_from_sequence(MinSQN, FoldFuns, Rec, [{_LowSQN, FN, Pid, _LK}|Rest]) -> case Rest of [] -> - load_between_sequence(MinSQN, + fold_between_sequence(MinSQN, MinSQN + ?LOADING_BATCH, - FilterFun, - PCL, + FoldFuns, + Rec, Pid, undefined, FN, Rest); [{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN -> - load_between_sequence(MinSQN, + fold_between_sequence(MinSQN, MinSQN + ?LOADING_BATCH, - FilterFun, - PCL, + FoldFuns, + Rec, Pid, undefined, FN, Rest); _ -> - load_from_sequence(MinSQN, FilterFun, PCL, Rest) + fold_from_sequence(MinSQN, FoldFuns, Rec, Rest) end. -load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, - CDBpid, StartPos, FN, Rest) -> +fold_between_sequence(MinSQN, MaxSQN, FoldFuns, + Recipient, CDBpid, StartPos, FN, Rest) -> leveled_log:log("I0014", [FN, MinSQN]), - InitAcc = {MinSQN, MaxSQN, leveled_bookie:empty_ledgercache()}, + {FilterFun, InitAccFun, FoldFun} = FoldFuns, + InitAcc = {MinSQN, MaxSQN, InitAccFun()}, Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of {eof, {AccMinSQN, _AccMaxSQN, AccLC}} -> - ok = push_to_penciller(Penciller, AccLC), + ok = FoldFun(Recipient, AccLC), {ok, AccMinSQN}; {LastPosition, {_AccMinSQN, _AccMaxSQN, AccLC}} -> - ok = push_to_penciller(Penciller, AccLC), + ok = FoldFun(Recipient, AccLC), NextSQN = MaxSQN + 1, - load_between_sequence(NextSQN, + fold_between_sequence(NextSQN, NextSQN + ?LOADING_BATCH, - FilterFun, - Penciller, + FoldFuns, + Recipient, CDBpid, LastPosition, FN, @@ -808,7 +858,7 @@ load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, end, case Res of {ok, LMSQN} -> - load_from_sequence(LMSQN, FilterFun, Penciller, Rest); + fold_from_sequence(LMSQN, FoldFuns, Recipient, Rest); ok -> ok end.