Make inker fold generic

ink_loadpcl is in effect an inker fold - so abstract out the inker fold part to make this a generic capability
This commit is contained in:
Martin Sumner 2017-11-15 16:08:24 +00:00
parent 9892d26e60
commit 39ad5c9680

View file

@ -98,6 +98,7 @@
ink_get/3,
ink_fetch/3,
ink_keycheck/3,
ink_fold/4,
ink_loadpcl/4,
ink_registersnapshot/2,
ink_confirmdelete/2,
@ -252,8 +253,42 @@ ink_close(Pid) ->
ink_doom(Pid) ->
gen_server:call(Pid, doom, 60000).
-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok.
-spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, pid()) -> ok.
%% @doc
%% Fold over the journal from a starting sequence number (MinSQN), passing
%% in three functions and a snapshot of the penciller. The Fold functions
%% should be
%% - a FilterFun to accumulate the objects and decided when to stop or loop
%% - a InitAccFun to re-initialise for the fold over the accumulator
%% - a FoldFun to actually perform the fold
%%
%% The inker fold works in batches, so the FilterFun determines what should
%% go into a batch and when the batch is complete. The FoldFun completes the
%% actual desired outcome by being applied on the batch.
%%
%% The FilterFun should be a five arity function which takes as inputs:
%% KeyInJournal
%% ValueInJournal
%% Position - the actual position within the CDB file of the object
%% Acc - the accumulator
%% ExtractFun - a single arity function which can be applied to ValueInJournal
%% to extract the actual object, and the size of the object,
%%
%% The FilterFun should return either:
%% {loop, {MinSQN, MaxSQN, UpdAcc}} or
%% {stop, {MinSQN, MaxSQN, UpdAcc}}
%% The FilterFun is required to call stop when MaxSQN is reached
%%
%% The InitAccFun should return an initial accumulator for each subfold.
%%
%% The FoldFun is a 2 arity function that should take as inputs:
%% The Recipient
%% The Accumulator built over the sub-fold
ink_fold(Pid, MinSQN, FoldFuns, Recipient) ->
gen_server:call(Pid, {fold, MinSQN, FoldFuns, Recipient}, infinity).
-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok.
%%
%% Function to prompt load of the Ledger at startup. the Penciller should
%% have determined the lowest SQN not present in the Ledger, and the inker
%% should fold over the Journal from that point, using the function to load
@ -262,7 +297,14 @@ ink_doom(Pid) ->
%% The load fun should be a five arity function like:
%% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun)
ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity).
gen_server:call(Pid,
{fold,
MinSQN,
{FilterFun,
fun leveled_bookie:empty_ledgercache/0,
fun push_to_penciller/2},
Penciller},
infinity).
-spec ink_compactjournal(pid(), pid(), integer()) -> ok.
%% @doc
@ -381,9 +423,16 @@ handle_call({get, Key, SQN}, _From, State) ->
{reply, get_object(Key, SQN, State#state.manifest), State};
handle_call({key_check, Key, SQN}, _From, State) ->
{reply, key_check(Key, SQN, State#state.manifest), State};
handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) ->
handle_call({fold,
StartSQN,
{FilterFun, InitAccFun, FoldFun},
Recipient}, _From, State) ->
Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)),
Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest),
Reply =
fold_from_sequence(StartSQN,
{FilterFun, InitAccFun, FoldFun},
Recipient,
Manifest),
{reply, Reply, State};
handle_call({register_snapshot, Requestor}, _From , State) ->
Rs = [{Requestor,
@ -748,59 +797,60 @@ start_new_activejournal(SQN, RootPath, CDBOpts) ->
%% FilterFun{K, V, Acc} -> Penciller Key List
%% Load the output for the CDB file into the Penciller.
load_from_sequence(_MinSQN, _FilterFun, _PCL, []) ->
fold_from_sequence(_MinSQN, _FoldFuns, _Rec, []) ->
ok;
load_from_sequence(MinSQN, FilterFun, PCL, [{LowSQN, FN, Pid, _LK}|Rest])
when LowSQN >= MinSQN ->
load_between_sequence(MinSQN,
fold_from_sequence(MinSQN, FoldFuns, Rec, [{LowSQN, FN, Pid, _LK}|Rest])
when LowSQN >= MinSQN ->
fold_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FilterFun,
PCL,
FoldFuns,
Rec,
Pid,
undefined,
FN,
Rest);
load_from_sequence(MinSQN, FilterFun, PCL, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
fold_from_sequence(MinSQN, FoldFuns, Rec, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
case Rest of
[] ->
load_between_sequence(MinSQN,
fold_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FilterFun,
PCL,
FoldFuns,
Rec,
Pid,
undefined,
FN,
Rest);
[{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN ->
load_between_sequence(MinSQN,
fold_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FilterFun,
PCL,
FoldFuns,
Rec,
Pid,
undefined,
FN,
Rest);
_ ->
load_from_sequence(MinSQN, FilterFun, PCL, Rest)
fold_from_sequence(MinSQN, FoldFuns, Rec, Rest)
end.
load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller,
CDBpid, StartPos, FN, Rest) ->
fold_between_sequence(MinSQN, MaxSQN, FoldFuns,
Recipient, CDBpid, StartPos, FN, Rest) ->
leveled_log:log("I0014", [FN, MinSQN]),
InitAcc = {MinSQN, MaxSQN, leveled_bookie:empty_ledgercache()},
{FilterFun, InitAccFun, FoldFun} = FoldFuns,
InitAcc = {MinSQN, MaxSQN, InitAccFun()},
Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
{eof, {AccMinSQN, _AccMaxSQN, AccLC}} ->
ok = push_to_penciller(Penciller, AccLC),
ok = FoldFun(Recipient, AccLC),
{ok, AccMinSQN};
{LastPosition, {_AccMinSQN, _AccMaxSQN, AccLC}} ->
ok = push_to_penciller(Penciller, AccLC),
ok = FoldFun(Recipient, AccLC),
NextSQN = MaxSQN + 1,
load_between_sequence(NextSQN,
fold_between_sequence(NextSQN,
NextSQN + ?LOADING_BATCH,
FilterFun,
Penciller,
FoldFuns,
Recipient,
CDBpid,
LastPosition,
FN,
@ -808,7 +858,7 @@ load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller,
end,
case Res of
{ok, LMSQN} ->
load_from_sequence(LMSQN, FilterFun, Penciller, Rest);
fold_from_sequence(LMSQN, FoldFuns, Recipient, Rest);
ok ->
ok
end.