fold_objects in SQN order
This adds a test that fold_objects works in SQN order
parent 50c81d0626
commit 0e071d078e
6 changed files with 278 additions and 22 deletions
@@ -280,6 +280,8 @@ ink_doom(Pid) ->
 %% The FilterFun is required to call stop when MaxSQN is reached
 %%
 %% The InitAccFun should return an initial batch accumulator for each subfold.
+%% It is a 2-arity function that takes a filename and a MinSQN as an input,
+%% potentially to be used in logging
 %%
 %% The BatchFun is a two arity function that should take as inputs:
 %% An overall accumulator

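Read together, these comments describe the shape of the FoldFuns triple the inker fold expects: a FilterFun that loops until MaxSQN and then stops, a 2-arity InitAccFun that starts a fresh batch accumulator per journal file, and a BatchFun that merges a completed batch into the overall accumulator. The following is a rough, illustrative sketch only; the Example* names are invented for this note, and the {MinSQN, MaxSQN, Batch} accumulator shape is taken from foldfile_between_sequence later in this diff.

    %% Minimal sketch of a FoldFuns triple (not code from this commit).
    ExampleFilterFun =
        fun({SQN, _InkTag, _LK}, _JVal, _Pos, {MinSQN, MaxSQN, Batch}, _ExtractFun) ->
            if
                SQN < MinSQN -> {loop, {MinSQN, MaxSQN, Batch}};
                SQN > MaxSQN -> {stop, {MinSQN, MaxSQN, Batch}};
                SQN == MaxSQN -> {stop, {MinSQN, MaxSQN, [SQN|Batch]}};
                true -> {loop, {MinSQN, MaxSQN, [SQN|Batch]}}
            end
        end,
    ExampleInitAccFun =
        fun(FileName, FileMinSQN) ->
            %% 2-arity: the filename and starting SQN are available for logging
            io:format("subfold of ~p from SQN ~w~n", [FileName, FileMinSQN]),
            []
        end,
    ExampleBatchFun =
        fun(BatchAcc, OverallAcc) ->
            lists:append(OverallAcc, lists:reverse(BatchAcc))
        end,
    FoldFuns = {ExampleFilterFun, ExampleInitAccFun, ExampleBatchFun}.

The real InitAccFun used by ink_loadpcl (next hunk) logs I0014 and returns an empty ledger cache rather than a list.
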
@@ -301,12 +303,15 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
         fun(BatchAcc, _Acc) ->
             push_to_penciller(Penciller, BatchAcc)
         end,
+    InitAccFun =
+        fun(FN, CurrentMinSQN) ->
+            leveled_log:log("I0014", [FN, CurrentMinSQN]),
+            leveled_bookie:empty_ledgercache()
+        end,
     gen_server:call(Pid,
                     {fold,
                         MinSQN,
-                        {FilterFun,
-                            fun leveled_bookie:empty_ledgercache/0,
-                            BatchFun},
+                        {FilterFun, InitAccFun, BatchFun},
                         ok},
                     infinity).
 

@@ -853,9 +858,8 @@ fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
 
 foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
                             Acc, CDBpid, StartPos, FN) ->
-    leveled_log:log("I0014", [FN, MinSQN]),
     {FilterFun, InitAccFun, FoldFun} = FoldFuns,
-    InitBatchAcc = {MinSQN, MaxSQN, InitAccFun()},
+    InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)},
 
     case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of
         {eof, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->

@@ -327,7 +327,10 @@
         {info, "After ~w PUTs total write time is ~w total sync time is ~w "
             ++ "and max write time is ~w and max sync time is ~w"}},
     {"CDB18",
-        {info, "Handled return and write of hashtable"}}
+        {info, "Handled return and write of hashtable"}},
+
+    {"R0001",
+        {debug, "Object fold to process batch of ~w objects"}}
     ]).
 

@@ -230,13 +230,96 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) ->
                                                     -> {async, fun()}.
 %% @doc
 %% Fold over all objects for a given tag
-foldobjects_allkeys(SnapFun, Tag, FoldFun, _Order) ->
+foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order) ->
     StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
     EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
     foldobjects(SnapFun,
                 Tag, StartKey, EndKey,
                 FoldFun,
-                false, false).
+                false, false);
+foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
+    % Fold over the journal in order of receipt
+    {FoldFun, InitAcc} =
+        case is_tuple(FoldObjectsFun) of
+            true ->
+                % FoldObjectsFun is already a tuple with a Fold function and an
+                % initial accumulator
+                FoldObjectsFun;
+            false ->
+                % no initial accumulator passed, and so should be just a list
+                {FoldObjectsFun, []}
+        end,
+
+    FilterFun =
+        fun(JKey, JVal, _Pos, Acc, ExtractFun) ->
+
+            {SQN, InkTag, LedgerKey} = JKey,
+            case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of
+                {?INKT_STND, {B, K}} ->
+                    % Ignore tombstones, non-matching Tags and key-change only
+                    % objects.
+                    {MinSQN, MaxSQN, BatchAcc} = Acc,
+                    case SQN of
+                        SQN when SQN < MinSQN ->
+                            {loop, Acc};
+                        SQN when SQN > MaxSQN ->
+                            {stop, Acc};
+                        _ ->
+                            {VBin, _VSize} = ExtractFun(JVal),
+                            {Obj, _IdxSpecs} = leveled_codec:split_inkvalue(VBin),
+                            ToLoop =
+                                case SQN of
+                                    MaxSQN -> stop;
+                                    _ -> loop
+                                end,
+                            {ToLoop,
+                                {MinSQN, MaxSQN, [{B, K, SQN, Obj}|BatchAcc]}}
+                    end;
+                _ ->
+                    {loop, Acc}
+            end
+        end,
+
+    InitAccFun = fun(_FN, _SQN) -> [] end,
+
+    Folder =
+        fun() ->
+
+            {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
+            IsValidFun =
+                fun(Bucket, Key, SQN) ->
+                    LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
+                    leveled_penciller:pcl_checksequencenumber(LedgerSnapshot,
+                                                                LedgerKey,
+                                                                SQN)
+                end,
+
+            BatchFoldFun =
+                fun(BatchAcc, ObjAcc) ->
+                    ObjFun =
+                        fun({B, K, SQN, Obj}, Acc) ->
+                            case IsValidFun(B, K, SQN) of
+                                true ->
+                                    FoldFun(B, K, Obj, Acc);
+                                false ->
+                                    Acc
+                            end
+                        end,
+                    leveled_log:log("R0001", [length(BatchAcc)]),
+                    lists:foldr(ObjFun, ObjAcc, BatchAcc)
+                end,
+
+            Acc =
+                leveled_inker:ink_fold(JournalSnapshot,
+                                        0,
+                                        {FilterFun, InitAccFun, BatchFoldFun},
+                                        InitAcc),
+            ok = leveled_penciller:pcl_close(LedgerSnapshot),
+            ok = leveled_inker:ink_close(JournalSnapshot),
+            Acc
+        end,
+    {async, Folder}.
 
 
 -spec foldobjects_bybucket(fun(), {atom(), any(), any()}, fun()) ->
                                                             {async, fun()}.

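For completeness, a hedged usage sketch (not part of this commit) of the new sqn_order clause: the runner returns {async, Folder}, so the caller gets a 0-arity fun that performs the journal fold when executed. SnapFun, the KeyLister fun and the o_rkv tag below are illustrative assumptions; in practice the snapshot fun is typically supplied by leveled_bookie, and o_rkv stands in for whichever object tag is in use.

    %% Illustrative only: SnapFun is assumed to be a 0-arity snapshot fun
    %% returning {ok, LedgerSnapshot, JournalSnapshot}, as the clause above expects.
    KeyLister =
        fun(Bucket, Key, _Obj, Acc) -> [{Bucket, Key}|Acc] end,
    {async, Runner} =
        leveled_runner:foldobjects_allkeys(SnapFun, o_rkv, KeyLister, sqn_order),
    %% Runner/0 runs the fold; entries are prepended as batches are merged, so
    %% reverse the result if oldest-first (receipt) order is wanted.
    KeysBySQNOrder = lists:reverse(Runner()).
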
@@ -481,7 +564,7 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
                     ProxyObj = make_proxy_object(LK, JK,
                                                     MD, V,
                                                     InkerClone),
-                    FoldObjectsFun(B, K,ProxyObj, Acc);
+                    FoldObjectsFun(B, K, ProxyObj, Acc);
                 false ->
                     R = leveled_bookie:fetch_value(InkerClone, JK),
                     case R of