Merge pull request #325 from martinsumner/mas-i321-checkactive

Mas i321 checkactive
This commit is contained in:
Martin Sumner 2020-12-08 16:33:05 +00:00 committed by GitHub
commit 186e3868a9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 225 additions and 205 deletions

View file

@ -258,6 +258,7 @@
is_snapshot = false :: boolean(), is_snapshot = false :: boolean(),
snapshot_fully_loaded = false :: boolean(), snapshot_fully_loaded = false :: boolean(),
snapshot_time :: pos_integer() | undefined,
source_penciller :: pid() | undefined, source_penciller :: pid() | undefined,
bookie_monref :: reference() | undefined, bookie_monref :: reference() | undefined,
levelzero_astree :: list() | undefined, levelzero_astree :: list() | undefined,
@ -306,6 +307,15 @@
-type iterator() :: list(iterator_entry()). -type iterator() :: list(iterator_entry()).
-type bad_ledgerkey() :: list(). -type bad_ledgerkey() :: list().
-type sqn_check() :: current|replaced|missing. -type sqn_check() :: current|replaced|missing.
-type pclacc_fun() ::
fun((leveled_codec:ledger_key(),
leveled_codec:ledger_value(),
any()) -> any()).
-type sst_fetchfun() ::
fun((pid(),
leveled_codec:ledger_key(),
leveled_codec:segment_hash(),
non_neg_integer()) -> leveled_codec:ledger_kv()|not_present).
-export_type([levelzero_cacheentry/0, sqn_check/0]). -export_type([levelzero_cacheentry/0, sqn_check/0]).
@ -351,7 +361,10 @@ pcl_pushmem(Pid, LedgerCache) ->
%% Bookie to dump memory onto penciller %% Bookie to dump memory onto penciller
gen_server:call(Pid, {push_mem, LedgerCache}, infinity). gen_server:call(Pid, {push_mem, LedgerCache}, infinity).
-spec pcl_fetchlevelzero(pid(), non_neg_integer(), fun()) -> ok. -spec pcl_fetchlevelzero(pid(),
non_neg_integer(),
fun((levelzero_cacheentry()) -> ok))
-> ok.
%% @doc %% @doc
%% Allows a single slot of the penciller's levelzero cache to be fetched. The %% Allows a single slot of the penciller's levelzero cache to be fetched. The
%% levelzero cache can be up to 40K keys - sending this to the process that is %% levelzero cache can be up to 40K keys - sending this to the process that is
@ -402,7 +415,7 @@ pcl_fetch(Pid, Key, Hash, UseL0Index) ->
-spec pcl_fetchkeys(pid(), -spec pcl_fetchkeys(pid(),
leveled_codec:ledger_key(), leveled_codec:ledger_key(),
leveled_codec:ledger_key(), leveled_codec:ledger_key(),
fun(), any(), as_pcl|by_runner) -> any(). pclacc_fun(), any(), as_pcl|by_runner) -> any().
%% @doc %% @doc
%% Run a range query between StartKey and EndKey (inclusive). This will cover %% Run a range query between StartKey and EndKey (inclusive). This will cover
%% all keys in the range - so must only be run against snapshots of the %% all keys in the range - so must only be run against snapshots of the
@ -428,7 +441,7 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) ->
-spec pcl_fetchkeysbysegment(pid(), -spec pcl_fetchkeysbysegment(pid(),
leveled_codec:ledger_key(), leveled_codec:ledger_key(),
leveled_codec:ledger_key(), leveled_codec:ledger_key(),
fun(), any(), pclacc_fun(), any(),
leveled_codec:segment_list(), leveled_codec:segment_list(),
false | leveled_codec:lastmod_range(), false | leveled_codec:lastmod_range(),
boolean()) -> any(). boolean()) -> any().
@ -465,7 +478,7 @@ pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc,
-spec pcl_fetchnextkey(pid(), -spec pcl_fetchnextkey(pid(),
leveled_codec:ledger_key(), leveled_codec:ledger_key(),
leveled_codec:ledger_key(), leveled_codec:ledger_key(),
fun(), any()) -> any(). pclacc_fun(), any()) -> any().
%% @doc %% @doc
%% Run a range query between StartKey and EndKey (inclusive). This has the %% Run a range query between StartKey and EndKey (inclusive). This has the
%% same constraints as pcl_fetchkeys/5, but will only return the first key %% same constraints as pcl_fetchkeys/5, but will only return the first key
@ -799,7 +812,7 @@ handle_call({fetch_keys,
fun() -> fun() ->
keyfolder({FilteredL0, SSTiter}, keyfolder({FilteredL0, SSTiter},
{StartKey, EndKey}, {StartKey, EndKey},
{AccFun, InitAcc}, {AccFun, InitAcc, State#state.snapshot_time},
{SegmentList, LastModRange0, MaxKeys}) {SegmentList, LastModRange0, MaxKeys})
end, end,
case By of case By of
@ -889,6 +902,7 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning},
{reply, {reply,
{ok, {ok,
CloneState#state{snapshot_fully_loaded=true, CloneState#state{snapshot_fully_loaded=true,
snapshot_time = leveled_util:integer_now(),
manifest=ManifestClone}}, manifest=ManifestClone}},
State#state{manifest = Manifest0}}; State#state{manifest = Manifest0}};
handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true -> handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true ->
@ -1447,7 +1461,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
-spec fetch(tuple(), {integer(), integer()}, -spec fetch(tuple(), {integer(), integer()},
leveled_pmanifest:manifest(), integer(), leveled_pmanifest:manifest(), integer(),
fun()) -> {tuple()|not_present, integer()|basement}. sst_fetchfun()) -> {tuple()|not_present, integer()|basement}.
%% @doc %% @doc
%% Fetch from the persisted portion of the LSM tree, checking each level in %% Fetch from the persisted portion of the LSM tree, checking each level in
%% turn until a match is found. %% turn until a match is found.
@ -1524,7 +1538,8 @@ compare_to_sqn(Obj, SQN) ->
%%%============================================================================ %%%============================================================================
-spec keyfolder(list(), list(), tuple(), tuple(), {fun(), any()}) -> any(). -spec keyfolder(list(), list(), tuple(), tuple(),
{pclacc_fun(), any(), pos_integer()}) -> any().
%% @doc %% @doc
%% The keyfolder will compare an iterator across the immutable in-memory cache %% The keyfolder will compare an iterator across the immutable in-memory cache
%% of the Penciller (the IMMiter), with an iterator across the persisted part %% of the Penciller (the IMMiter), with an iterator across the persisted part
@ -1542,16 +1557,18 @@ compare_to_sqn(Obj, SQN) ->
%% To advance the SSTiter the find_nextkey/4 function is used, as the SSTiter %% To advance the SSTiter the find_nextkey/4 function is used, as the SSTiter
%% is an iterator across multiple levels - and so needs to do its own %% is an iterator across multiple levels - and so needs to do its own
%% comparisons to pop the next result. %% comparisons to pop the next result.
keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) -> keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc, Now}) ->
keyfolder({IMMiter, SSTiter}, keyfolder({IMMiter, SSTiter},
{StartKey, EndKey}, {StartKey, EndKey},
{AccFun, Acc}, {AccFun, Acc, Now},
{false, {0, infinity}, -1}). {false, {0, infinity}, -1}).
keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, keyfolder(_Iterators,
_KeyRange,
{_AccFun, Acc, _Now},
{_SegmentList, _LastModRange, MaxKeys}) when MaxKeys == 0 -> {_SegmentList, _LastModRange, MaxKeys}) when MaxKeys == 0 ->
{0, Acc}; {0, Acc};
keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc, Now},
{SegmentList, LastModRange, MaxKeys}) -> {SegmentList, LastModRange, MaxKeys}) ->
{StartKey, EndKey} = KeyRange, {StartKey, EndKey} = KeyRange,
case find_nextkey(SSTiter, StartKey, EndKey, case find_nextkey(SSTiter, StartKey, EndKey,
@ -1569,16 +1586,17 @@ keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc},
end; end;
{NxSSTiter, {SSTKey, SSTVal}} -> {NxSSTiter, {SSTKey, SSTVal}} ->
{Acc1, MK1} = {Acc1, MK1} =
maybe_accumulate(SSTKey, SSTVal, Acc, AccFun, maybe_accumulate(SSTKey, SSTVal,
{Acc, AccFun, Now},
MaxKeys, LastModRange), MaxKeys, LastModRange),
keyfolder({[], NxSSTiter}, keyfolder({[], NxSSTiter},
KeyRange, KeyRange,
{AccFun, Acc1}, {AccFun, Acc1, Now},
{SegmentList, LastModRange, MK1}) {SegmentList, LastModRange, MK1})
end; end;
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
KeyRange, KeyRange,
{AccFun, Acc}, {AccFun, Acc, Now},
{SegmentList, LastModRange, MaxKeys}) -> {SegmentList, LastModRange, MaxKeys}) ->
{StartKey, EndKey} = KeyRange, {StartKey, EndKey} = KeyRange,
case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of
@ -1588,7 +1606,7 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
% (see above) % (see above)
keyfolder({[], SSTiterator}, keyfolder({[], SSTiterator},
KeyRange, KeyRange,
{AccFun, Acc}, {AccFun, Acc, Now},
{SegmentList, LastModRange, MaxKeys}); {SegmentList, LastModRange, MaxKeys});
{false, false} -> {false, false} ->
case find_nextkey(SSTiterator, StartKey, EndKey, case find_nextkey(SSTiterator, StartKey, EndKey,
@ -1597,12 +1615,13 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
% No more keys in range in the persisted store, so use the % No more keys in range in the persisted store, so use the
% in-memory KV as the next % in-memory KV as the next
{Acc1, MK1} = {Acc1, MK1} =
maybe_accumulate(IMMKey, IMMVal, Acc, AccFun, maybe_accumulate(IMMKey, IMMVal,
{Acc, AccFun, Now},
MaxKeys, LastModRange), MaxKeys, LastModRange),
keyfolder({NxIMMiterator, keyfolder({NxIMMiterator,
[]}, []},
KeyRange, KeyRange,
{AccFun, Acc1}, {AccFun, Acc1, Now},
{SegmentList, LastModRange, MK1}); {SegmentList, LastModRange, MK1});
{NxSSTiterator, {SSTKey, SSTVal}} -> {NxSSTiterator, {SSTKey, SSTVal}} ->
% There is a next key, so need to know which is the % There is a next key, so need to know which is the
@ -1614,7 +1633,8 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
SSTVal}) of SSTVal}) of
left_hand_first -> left_hand_first ->
{Acc1, MK1} = {Acc1, MK1} =
maybe_accumulate(IMMKey, IMMVal, Acc, AccFun, maybe_accumulate(IMMKey, IMMVal,
{Acc, AccFun, Now},
MaxKeys, LastModRange), MaxKeys, LastModRange),
% Stow the previous best result away at Level -1 % Stow the previous best result away at Level -1
% so that there is no need to iterate to it again % so that there is no need to iterate to it again
@ -1625,20 +1645,22 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
NxSSTiterator, NxSSTiterator,
NewEntry)}, NewEntry)},
KeyRange, KeyRange,
{AccFun, Acc1}, {AccFun, Acc1, Now},
{SegmentList, LastModRange, MK1}); {SegmentList, LastModRange, MK1});
right_hand_first -> right_hand_first ->
{Acc1, MK1} = {Acc1, MK1} =
maybe_accumulate(SSTKey, SSTVal, Acc, AccFun, maybe_accumulate(SSTKey, SSTVal,
{Acc, AccFun, Now},
MaxKeys, LastModRange), MaxKeys, LastModRange),
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], keyfolder({[{IMMKey, IMMVal}|NxIMMiterator],
NxSSTiterator}, NxSSTiterator},
KeyRange, KeyRange,
{AccFun, Acc1}, {AccFun, Acc1, Now},
{SegmentList, LastModRange, MK1}); {SegmentList, LastModRange, MK1});
left_hand_dominant -> left_hand_dominant ->
{Acc1, MK1} = {Acc1, MK1} =
maybe_accumulate(IMMKey, IMMVal, Acc, AccFun, maybe_accumulate(IMMKey, IMMVal,
{Acc, AccFun, Now},
MaxKeys, LastModRange), MaxKeys, LastModRange),
% We can add to the accumulator here. As the SST % We can add to the accumulator here. As the SST
% key was the most dominant across all SST levels, % key was the most dominant across all SST levels,
@ -1647,7 +1669,7 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
keyfolder({NxIMMiterator, keyfolder({NxIMMiterator,
NxSSTiterator}, NxSSTiterator},
KeyRange, KeyRange,
{AccFun, Acc1}, {AccFun, Acc1, Now},
{SegmentList, LastModRange, MK1}) {SegmentList, LastModRange, MK1})
end end
end end
@ -1655,16 +1677,21 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
-spec maybe_accumulate(leveled_codec:ledger_key(), -spec maybe_accumulate(leveled_codec:ledger_key(),
leveled_codec:ledger_value(), leveled_codec:ledger_value(),
any(), fun(), integer(), {any(), pclacc_fun(), pos_integer()},
{non_neg_integer(), non_neg_integer()|infinity}) -> integer(),
any(). {non_neg_integer(), non_neg_integer()|infinity})
-> any().
%% @doc %% @doc
%% Make an accumulation decision based one the date range %% Make an accumulation decision based one the date range
maybe_accumulate(LK, LV, Acc, AccFun, MaxKeys, {LowLastMod, HighLastMod}) -> maybe_accumulate(LK, LV,
{Acc, AccFun, QueryStartTime},
MaxKeys,
{LowLastMod, HighLastMod}) ->
{_SQN, _SH, LMD} = leveled_codec:strip_to_indexdetails({LK, LV}), {_SQN, _SH, LMD} = leveled_codec:strip_to_indexdetails({LK, LV}),
RunAcc = RunAcc =
(LMD == undefined) or ((LMD >= LowLastMod) and (LMD =< HighLastMod)), (LMD == undefined) or
case RunAcc of ((LMD >= LowLastMod) and (LMD =< HighLastMod)),
case RunAcc and leveled_codec:is_active(LK, LV, QueryStartTime) of
true -> true ->
{AccFun(LK, LV, Acc), MaxKeys - 1}; {AccFun(LK, LV, Acc), MaxKeys - 1};
false -> false ->
@ -2273,6 +2300,7 @@ sqnoverlap_otherway_findnextkey_test() ->
?assertMatch(no_more_keys, ER). ?assertMatch(no_more_keys, ER).
foldwithimm_simple_test() -> foldwithimm_simple_test() ->
Now = leveled_util:integer_now(),
QueryArray = [ QueryArray = [
{2, [{{o, "Bucket1", "Key1", null}, {2, [{{o, "Bucket1", "Key1", null},
{5, {active, infinity}, 0, null}}, {5, {active, infinity}, 0, null}},
@ -2295,7 +2323,7 @@ foldwithimm_simple_test() ->
Acc = keyfolder(IMMiter, Acc = keyfolder(IMMiter,
QueryArray, QueryArray,
{o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null}, {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null},
{AccFun, []}), {AccFun, [], Now}),
?assertMatch([{{o, "Bucket1", "Key1", null}, 8}, ?assertMatch([{{o, "Bucket1", "Key1", null}, 8},
{{o, "Bucket1", "Key3", null}, 3}, {{o, "Bucket1", "Key3", null}, 3},
{{o, "Bucket1", "Key5", null}, 2}, {{o, "Bucket1", "Key5", null}, 2},
@ -2307,7 +2335,7 @@ foldwithimm_simple_test() ->
QueryArray, QueryArray,
{o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key1", null},
{o, "Bucket1", "Key6", null}, {o, "Bucket1", "Key6", null},
{AccFun, []}), {AccFun, [], Now}),
?assertMatch([{{o, "Bucket1", "Key1", null}, 8}, ?assertMatch([{{o, "Bucket1", "Key1", null}, 8},
{{o, "Bucket1", "Key3", null}, 3}, {{o, "Bucket1", "Key3", null}, 3},
{{o, "Bucket1", "Key5", null}, 2}], AccA), {{o, "Bucket1", "Key5", null}, 2}], AccA),
@ -2322,7 +2350,7 @@ foldwithimm_simple_test() ->
AccB = keyfolder(IMMiterB, AccB = keyfolder(IMMiterB,
QueryArray, QueryArray,
{o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null}, {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null},
{AccFun, []}), {AccFun, [], Now}),
?assertMatch([{{o, "Bucket1", "Key1", null}, 8}, ?assertMatch([{{o, "Bucket1", "Key1", null}, 8},
{{o, "Bucket1", "Key3", null}, 3}, {{o, "Bucket1", "Key3", null}, 3},
{{o, "Bucket1", "Key4", null}, 10}, {{o, "Bucket1", "Key4", null}, 10},

View file

@ -45,8 +45,26 @@
-type key_range() -type key_range()
:: {leveled_codec:ledger_key()|null, :: {leveled_codec:ledger_key()|null,
leveled_codec:ledger_key()|null}. leveled_codec:ledger_key()|null}.
-type fun_and_acc() -type foldacc() :: any().
:: {fun(), any()}. % Can't currently be specific about what an acc might be
-type fold_objects_fun()
:: fun((leveled_codec:key(), leveled_codec:key(), any(), foldacc())
-> foldacc()).
-type fold_keys_fun()
:: fun((leveled_codec:key(), leveled_codec:key(), foldacc())
-> foldacc()).
-type fold_buckets_fun()
:: fun((leveled_codec:key(), foldacc()) -> foldacc()).
-type fold_filter_fun()
:: fun((leveled_codec:key(), leveled_codec:key()) -> accumulate|pass).
-type snap_fun()
:: fun(() -> {ok, pid(), pid()|null}).
-type runner_fun()
:: fun(() -> foldacc()).
-type acc_fun()
:: fun((leveled_codec:key(), any(), foldacc()) -> foldacc()).
%%%============================================================================ %%%============================================================================
@ -54,7 +72,8 @@
%%%============================================================================ %%%============================================================================
-spec bucket_sizestats(fun(), any(), leveled_codec:tag()) -> {async, fun()}. -spec bucket_sizestats(snap_fun(),leveled_codec:key(), leveled_codec:tag())
-> {async, runner_fun()}.
%% @doc %% @doc
%% Fold over a bucket accumulating the count of objects and their total sizes %% Fold over a bucket accumulating the count of objects and their total sizes
bucket_sizestats(SnapFun, Bucket, Tag) -> bucket_sizestats(SnapFun, Bucket, Tag) ->
@ -75,16 +94,19 @@ bucket_sizestats(SnapFun, Bucket, Tag) ->
end, end,
{async, Runner}. {async, Runner}.
-spec bucket_list(fun(), leveled_codec:tag(), fun(), any()) -spec bucket_list(snap_fun(),
-> {async, fun()}. leveled_codec:tag(),
fold_buckets_fun(), foldacc()) -> {async, runner_fun()}.
%% @doc %% @doc
%% List buckets for tag, assuming bucket names are all either binary, ascii %% List buckets for tag, assuming bucket names are all either binary, ascii
%% strings or integers %% strings or integers
bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc) -> bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc) ->
bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc, -1). bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc, -1).
-spec bucket_list(fun(), leveled_codec:tag(), fun(), any(), integer()) -spec bucket_list(snap_fun(),
-> {async, fun()}. leveled_codec:tag(),
fold_buckets_fun(), foldacc(),
integer()) -> {async, runner_fun()}.
%% @doc %% @doc
%% set Max Buckets to -1 to list all buckets, otherwise will only return %% set Max Buckets to -1 to list all buckets, otherwise will only return
%% MaxBuckets (use 1 to confirm that there exists any bucket for a given Tag) %% MaxBuckets (use 1 to confirm that there exists any bucket for a given Tag)
@ -114,11 +136,12 @@ bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc, MaxBuckets) ->
end, end,
{async, Runner}. {async, Runner}.
-spec index_query(fun(), -spec index_query(snap_fun(),
{leveled_codec:ledger_key(), {leveled_codec:ledger_key(),
leveled_codec:ledger_key(), leveled_codec:ledger_key(),
{boolean(), undefined|re:mp()|iodata()}}, {boolean(), undefined|re:mp()|iodata()}},
fun_and_acc()) -> {async, fun()}. {fold_keys_fun(), foldacc()})
-> {async, runner_fun()}.
%% @doc %% @doc
%% Secondary index query %% Secondary index query
%% This has the special capability that it will expect a message to be thrown %% This has the special capability that it will expect a message to be thrown
@ -157,9 +180,13 @@ index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) ->
end, end,
{async, Runner}. {async, Runner}.
-spec bucketkey_query(fun(), leveled_codec:tag(), any(), -spec bucketkey_query(snap_fun(),
key_range(), fun_and_acc(), leveled_codec:tag(),
leveled_codec:regular_expression()) -> {async, fun()}. leveled_codec:key()|null,
key_range(),
{fold_keys_fun(), foldacc()},
leveled_codec:regular_expression())
-> {async, runner_fun()}.
%% @doc %% @doc
%% Fold over all keys in `KeyRange' under tag (restricted to a given bucket) %% Fold over all keys in `KeyRange' under tag (restricted to a given bucket)
bucketkey_query(SnapFun, Tag, Bucket, bucketkey_query(SnapFun, Tag, Bucket,
@ -186,14 +213,18 @@ bucketkey_query(SnapFun, Tag, Bucket,
end, end,
{async, Runner}. {async, Runner}.
-spec bucketkey_query(fun(), leveled_codec:tag(), any(), fun_and_acc()) -spec bucketkey_query(snap_fun(),
-> {async, fun()}. leveled_codec:tag(),
leveled_codec:key()|null,
{fold_keys_fun(), foldacc()}) -> {async, runner_fun()}.
%% @doc %% @doc
%% Fold over all keys under tag (potentially restricted to a given bucket) %% Fold over all keys under tag (potentially restricted to a given bucket)
bucketkey_query(SnapFun, Tag, Bucket, FunAcc) -> bucketkey_query(SnapFun, Tag, Bucket, FunAcc) ->
bucketkey_query(SnapFun, Tag, Bucket, {null, null}, FunAcc, undefined). bucketkey_query(SnapFun, Tag, Bucket, {null, null}, FunAcc, undefined).
-spec hashlist_query(fun(), leveled_codec:tag(), boolean()) -> {async, fun()}. -spec hashlist_query(snap_fun(),
leveled_codec:tag(),
boolean()) -> {async, runner_fun()}.
%% @doc %% @doc
%% Fold over the keys under a given Tag accumulating the hashes %% Fold over the keys under a given Tag accumulating the hashes
hashlist_query(SnapFun, Tag, JournalCheck) -> hashlist_query(SnapFun, Tag, JournalCheck) ->
@ -219,10 +250,10 @@ hashlist_query(SnapFun, Tag, JournalCheck) ->
end, end,
{async, Runner}. {async, Runner}.
-spec tictactree(fun(), -spec tictactree(snap_fun(),
{leveled_codec:tag(), any(), tuple()}, {leveled_codec:tag(), leveled_codec:key(), tuple()},
boolean(), atom(), fun()) boolean(), atom(), fold_filter_fun())
-> {async, fun()}. -> {async, runner_fun()}.
%% @doc %% @doc
%% Return a merkle tree from the fold, directly accessing hashes cached in the %% Return a merkle tree from the fold, directly accessing hashes cached in the
%% metadata %% metadata
@ -281,10 +312,11 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) ->
end, end,
{async, Runner}. {async, Runner}.
-spec foldheads_allkeys(fun(), leveled_codec:tag(), -spec foldheads_allkeys(snap_fun(), leveled_codec:tag(),
fun(), boolean(), false|list(integer()), fold_objects_fun()|{fold_objects_fun(), foldacc()},
boolean(), false|list(integer()),
false|leveled_codec:lastmod_range(), false|leveled_codec:lastmod_range(),
false|pos_integer()) -> {async, fun()}. false|pos_integer()) -> {async, runner_fun()}.
%% @doc %% @doc
%% Fold over all heads in the store for a given tag - applying the passed %% Fold over all heads in the store for a given tag - applying the passed
%% function to each proxy object %% function to each proxy object
@ -301,8 +333,11 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck,
LastModRange, LastModRange,
MaxObjectCount). MaxObjectCount).
-spec foldobjects_allkeys(fun(), leveled_codec:tag(), fun(), -spec foldobjects_allkeys(snap_fun(),
key_order|sqn_order) -> {async, fun()}. leveled_codec:tag(),
fold_objects_fun()|{fold_objects_fun(), foldacc()},
key_order|sqn_order)
-> {async, runner_fun()}.
%% @doc %% @doc
%% Fold over all objects for a given tag %% Fold over all objects for a given tag
foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order) -> foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order) ->
@ -409,10 +444,11 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
{async, Folder}. {async, Folder}.
-spec foldobjects_bybucket(fun(), -spec foldobjects_bybucket(snap_fun(),
leveled_codec:tag(), leveled_codec:tag(),
list(key_range()), list(key_range()),
fun()) -> {async, fun()}. fold_objects_fun()|{fold_objects_fun(), foldacc()})
-> {async, runner_fun()}.
%% @doc %% @doc
%% Fold over all objects within a given key range in a bucket %% Fold over all objects within a given key range in a bucket
foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) -> foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) ->
@ -423,15 +459,15 @@ foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) ->
false, false,
false). false).
-spec foldheads_bybucket(fun(), -spec foldheads_bybucket(snap_fun(),
atom(), leveled_codec:tag(),
list({any(), any()}), list(key_range()),
fun(), fold_objects_fun()|{fold_objects_fun(), foldacc()},
boolean(), boolean(),
false|list(integer()), false|list(integer()),
false|leveled_codec:lastmod_range(), false|leveled_codec:lastmod_range(),
false|pos_integer()) false|pos_integer())
-> {async, fun()}. -> {async, runner_fun()}.
%% @doc %% @doc
%% Fold over all object metadata within a given key range in a bucket %% Fold over all object metadata within a given key range in a bucket
foldheads_bybucket(SnapFun, foldheads_bybucket(SnapFun,
@ -449,7 +485,10 @@ foldheads_bybucket(SnapFun,
LastModRange, LastModRange,
MaxObjectCount). MaxObjectCount).
-spec foldobjects_byindex(fun(), tuple(), fun()) -> {async, fun()}. -spec foldobjects_byindex(snap_fun(),
tuple(),
fold_objects_fun()|{fold_objects_fun(), foldacc()})
-> {async, runner_fun()}.
%% @doc %% @doc
%% Folds over an index, fetching the objects associated with the keys returned %% Folds over an index, fetching the objects associated with the keys returned
%% and passing those objects into the fold function %% and passing those objects into the fold function
@ -475,7 +514,6 @@ foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) ->
get_nextbucket(_NextB, _NextK, _Tag, _LS, BKList, {Limit, Limit}) -> get_nextbucket(_NextB, _NextK, _Tag, _LS, BKList, {Limit, Limit}) ->
lists:reverse(BKList); lists:reverse(BKList);
get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) -> get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) ->
Now = leveled_util:integer_now(),
StartKey = leveled_codec:to_ledgerkey(NextBucket, NextKey, Tag), StartKey = leveled_codec:to_ledgerkey(NextBucket, NextKey, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
ExtractFun = ExtractFun =
@ -491,43 +529,33 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) ->
{1, null} -> {1, null} ->
leveled_log:log("B0008",[]), leveled_log:log("B0008",[]),
BKList; BKList;
{0, {{B, K}, V}} -> {0, {{B, K}, _V}} ->
case leveled_codec:is_active({Tag, B, K, null}, V, Now) of
true ->
leveled_log:log("B0009",[B]), leveled_log:log("B0009",[B]),
get_nextbucket(leveled_codec:next_key(B), get_nextbucket(leveled_codec:next_key(B),
null, null,
Tag, Tag,
LedgerSnapshot, LedgerSnapshot,
[{B, K}|BKList], [{B, K}|BKList],
{C + 1, L}); {C + 1, L})
false ->
NK =
case Tag of
?HEAD_TAG ->
{PK, SK} = K,
{PK, leveled_codec:next_key(SK)};
_ ->
leveled_codec:next_key(K)
end,
get_nextbucket(B, NK, Tag, LedgerSnapshot, BKList, {C, L})
end
end. end.
-spec foldobjects(fun(), atom(), list(), fun(), -spec foldobjects(snap_fun(),
atom(),
list(),
fold_objects_fun()|{fold_objects_fun(), foldacc()},
false|{true, boolean()}, false|list(integer())) -> false|{true, boolean()}, false|list(integer())) ->
{async, fun()}. {async, runner_fun()}.
foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) -> foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) ->
foldobjects(SnapFun, Tag, KeyRanges, foldobjects(SnapFun, Tag, KeyRanges,
FoldObjFun, DeferredFetch, SegmentList, false, false). FoldObjFun, DeferredFetch, SegmentList, false, false).
-spec foldobjects(fun(), atom(), list(), fun(), -spec foldobjects(snap_fun(), atom(), list(),
fold_objects_fun()|{fold_objects_fun(), foldacc()},
false|{true, boolean()}, false|{true, boolean()},
false|list(integer()), false|list(integer()),
false|leveled_codec:lastmod_range(), false|leveled_codec:lastmod_range(),
false|pos_integer()) -> false|pos_integer()) -> {async, runner_fun()}.
{async, fun()}.
%% @doc %% @doc
%% The object folder should be passed DeferredFetch. %% The object folder should be passed DeferredFetch.
%% DeferredFetch can either be false (which will return to the fold function %% DeferredFetch can either be false (which will return to the fold function
@ -597,15 +625,9 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch,
accumulate_size() -> accumulate_size() ->
Now = leveled_util:integer_now(), AccFun =
AccFun = fun(Key, Value, {Size, Count}) -> fun(Key, Value, {Size, Count}) ->
case leveled_codec:is_active(Key, Value, Now) of {Size + leveled_codec:get_size(Key, Value), Count + 1}
true ->
{Size + leveled_codec:get_size(Key, Value),
Count + 1};
false ->
{Size, Count}
end
end, end,
AccFun. AccFun.
@ -633,15 +655,12 @@ accumulate_tree(FilterFun, JournalCheck, InkerClone, HashFun) ->
AddKeyFun). AddKeyFun).
get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) -> get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) ->
Now = leveled_util:integer_now(),
AccFun = AccFun =
fun(LK, V, Acc) -> fun(LK, V, Acc) ->
case leveled_codec:is_active(LK, V, Now) of
true ->
{B, K, H} = leveled_codec:get_keyandobjhash(LK, V), {B, K, H} = leveled_codec:get_keyandobjhash(LK, V),
Check = leveled_rand:uniform() < ?CHECKJOURNAL_PROB, Check = leveled_rand:uniform() < ?CHECKJOURNAL_PROB,
case {JournalCheck, Check} of case JournalCheck and Check of
{true, true} -> true ->
case check_presence(LK, V, InkerClone) of case check_presence(LK, V, InkerClone) of
true -> true ->
AddKeyFun(B, K, H, Acc); AddKeyFun(B, K, H, Acc);
@ -650,16 +669,16 @@ get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) ->
end; end;
_ -> _ ->
AddKeyFun(B, K, H, Acc) AddKeyFun(B, K, H, Acc)
end;
false ->
Acc
end end
end, end,
AccFun. AccFun.
-spec accumulate_objects(fold_objects_fun(),
pid()|null,
leveled_codec:tag(),
false|{true, boolean()})
-> acc_fun().
accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
Now = leveled_util:integer_now(),
AccFun = AccFun =
fun(LK, V, Acc) -> fun(LK, V, Acc) ->
% The function takes the Ledger Key and the value from the % The function takes the Ledger Key and the value from the
@ -671,8 +690,6 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
% a fold_objects), then a metadata object needs to be built to be % a fold_objects), then a metadata object needs to be built to be
% returned - but a quick check that Key is present in the Journal % returned - but a quick check that Key is present in the Journal
% is made first % is made first
case leveled_codec:is_active(LK, V, Now) of
true ->
{SQN, _St, _MH, MD} = {SQN, _St, _MH, MD} =
leveled_codec:striphead_to_v1details(V), leveled_codec:striphead_to_v1details(V),
{B, K} = {B, K} =
@ -686,8 +703,7 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
case DeferredFetch of case DeferredFetch of
{true, JournalCheck} -> {true, JournalCheck} ->
ProxyObj = ProxyObj =
leveled_codec:return_proxy(Tag, MD, leveled_codec:return_proxy(Tag, MD, InkerClone, JK),
InkerClone, JK),
case JournalCheck of case JournalCheck of
true -> true ->
InJournal = InJournal =
@ -711,9 +727,6 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
Value -> Value ->
FoldObjectsFun(B, K, Value, Acc) FoldObjectsFun(B, K, Value, Acc)
end end
end;
false ->
Acc
end end
end, end,
AccFun. AccFun.
@ -729,11 +742,8 @@ check_presence(Key, Value, InkerClone) ->
end. end.
accumulate_keys(FoldKeysFun, TermRegex) -> accumulate_keys(FoldKeysFun, TermRegex) ->
Now = leveled_util:integer_now(),
AccFun = AccFun =
fun(Key, Value, Acc) -> fun(Key, _Value, Acc) ->
case leveled_codec:is_active(Key, Value, Now) of
true ->
{B, K} = leveled_codec:from_ledgerkey(Key), {B, K} = leveled_codec:from_ledgerkey(Key),
case TermRegex of case TermRegex of
undefined -> undefined ->
@ -745,9 +755,6 @@ accumulate_keys(FoldKeysFun, TermRegex) ->
_ -> _ ->
FoldKeysFun(B, K, Acc) FoldKeysFun(B, K, Acc)
end end
end;
false ->
Acc
end end
end, end,
AccFun. AccFun.
@ -759,37 +766,22 @@ add_terms(ObjKey, IdxValue) ->
{IdxValue, ObjKey}. {IdxValue, ObjKey}.
accumulate_index(TermRe, AddFun, FoldKeysFun) -> accumulate_index(TermRe, AddFun, FoldKeysFun) ->
Now = leveled_util:integer_now(),
case TermRe of case TermRe of
undefined -> undefined ->
fun(Key, Value, Acc) -> fun(Key, _Value, Acc) ->
case leveled_codec:is_active(Key, Value, Now) of {Bucket, ObjKey, IdxValue} = leveled_codec:from_ledgerkey(Key),
true -> FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc)
{Bucket, end;
ObjKey,
IdxValue} = leveled_codec:from_ledgerkey(Key),
FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc);
false ->
Acc
end end;
TermRe -> TermRe ->
fun(Key, Value, Acc) -> fun(Key, _Value, Acc) ->
case leveled_codec:is_active(Key, Value, Now) of {Bucket, ObjKey, IdxValue} = leveled_codec:from_ledgerkey(Key),
true ->
{Bucket,
ObjKey,
IdxValue} = leveled_codec:from_ledgerkey(Key),
case re:run(IdxValue, TermRe) of case re:run(IdxValue, TermRe) of
nomatch -> nomatch ->
Acc; Acc;
_ -> _ ->
FoldKeysFun(Bucket, FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc)
AddFun(ObjKey, IdxValue), end
Acc) end
end;
false ->
Acc
end end
end. end.
-spec wrap_runner(fun(), fun()) -> any(). -spec wrap_runner(fun(), fun()) -> any().