diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 5cfb655..a719cf3 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -258,6 +258,7 @@ is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), + snapshot_time :: pos_integer() | undefined, source_penciller :: pid() | undefined, bookie_monref :: reference() | undefined, levelzero_astree :: list() | undefined, @@ -306,6 +307,15 @@ -type iterator() :: list(iterator_entry()). -type bad_ledgerkey() :: list(). -type sqn_check() :: current|replaced|missing. +-type pclacc_fun() :: + fun((leveled_codec:ledger_key(), + leveled_codec:ledger_value(), + any()) -> any()). +-type sst_fetchfun() :: + fun((pid(), + leveled_codec:ledger_key(), + leveled_codec:segment_hash(), + non_neg_integer()) -> leveled_codec:ledger_kv()|not_present). -export_type([levelzero_cacheentry/0, sqn_check/0]). @@ -351,7 +361,10 @@ pcl_pushmem(Pid, LedgerCache) -> %% Bookie to dump memory onto penciller gen_server:call(Pid, {push_mem, LedgerCache}, infinity). --spec pcl_fetchlevelzero(pid(), non_neg_integer(), fun()) -> ok. +-spec pcl_fetchlevelzero(pid(), + non_neg_integer(), + fun((levelzero_cacheentry()) -> ok)) + -> ok. %% @doc %% Allows a single slot of the penciller's levelzero cache to be fetched. The %% levelzero cache can be up to 40K keys - sending this to the process that is @@ -402,7 +415,7 @@ pcl_fetch(Pid, Key, Hash, UseL0Index) -> -spec pcl_fetchkeys(pid(), leveled_codec:ledger_key(), leveled_codec:ledger_key(), - fun(), any(), as_pcl|by_runner) -> any(). + pclacc_fun(), any(), as_pcl|by_runner) -> any(). %% @doc %% Run a range query between StartKey and EndKey (inclusive). This will cover %% all keys in the range - so must only be run against snapshots of the @@ -428,7 +441,7 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) -> -spec pcl_fetchkeysbysegment(pid(), leveled_codec:ledger_key(), leveled_codec:ledger_key(), - fun(), any(), + pclacc_fun(), any(), leveled_codec:segment_list(), false | leveled_codec:lastmod_range(), boolean()) -> any(). @@ -465,7 +478,7 @@ pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, -spec pcl_fetchnextkey(pid(), leveled_codec:ledger_key(), leveled_codec:ledger_key(), - fun(), any()) -> any(). + pclacc_fun(), any()) -> any(). %% @doc %% Run a range query between StartKey and EndKey (inclusive). This has the %% same constraints as pcl_fetchkeys/5, but will only return the first key @@ -799,7 +812,7 @@ handle_call({fetch_keys, fun() -> keyfolder({FilteredL0, SSTiter}, {StartKey, EndKey}, - {AccFun, InitAcc}, + {AccFun, InitAcc, State#state.snapshot_time}, {SegmentList, LastModRange0, MaxKeys}) end, case By of @@ -889,6 +902,7 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning}, {reply, {ok, CloneState#state{snapshot_fully_loaded=true, + snapshot_time = leveled_util:integer_now(), manifest=ManifestClone}}, State#state{manifest = Manifest0}}; handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true -> @@ -1447,7 +1461,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> -spec fetch(tuple(), {integer(), integer()}, leveled_pmanifest:manifest(), integer(), - fun()) -> {tuple()|not_present, integer()|basement}. + sst_fetchfun()) -> {tuple()|not_present, integer()|basement}. %% @doc %% Fetch from the persisted portion of the LSM tree, checking each level in %% turn until a match is found. @@ -1524,7 +1538,8 @@ compare_to_sqn(Obj, SQN) -> %%%============================================================================ --spec keyfolder(list(), list(), tuple(), tuple(), {fun(), any()}) -> any(). +-spec keyfolder(list(), list(), tuple(), tuple(), + {pclacc_fun(), any(), pos_integer()}) -> any(). %% @doc %% The keyfolder will compare an iterator across the immutable in-memory cache %% of the Penciller (the IMMiter), with an iterator across the persisted part @@ -1542,16 +1557,18 @@ compare_to_sqn(Obj, SQN) -> %% To advance the SSTiter the find_nextkey/4 function is used, as the SSTiter %% is an iterator across multiple levels - and so needs to do its own %% comparisons to pop the next result. -keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) -> +keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc, Now}) -> keyfolder({IMMiter, SSTiter}, {StartKey, EndKey}, - {AccFun, Acc}, + {AccFun, Acc, Now}, {false, {0, infinity}, -1}). -keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, - {_SegmentList, _LastModRange, MaxKeys}) when MaxKeys == 0 -> +keyfolder(_Iterators, + _KeyRange, + {_AccFun, Acc, _Now}, + {_SegmentList, _LastModRange, MaxKeys}) when MaxKeys == 0 -> {0, Acc}; -keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, +keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc, Now}, {SegmentList, LastModRange, MaxKeys}) -> {StartKey, EndKey} = KeyRange, case find_nextkey(SSTiter, StartKey, EndKey, @@ -1569,16 +1586,17 @@ keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, end; {NxSSTiter, {SSTKey, SSTVal}} -> {Acc1, MK1} = - maybe_accumulate(SSTKey, SSTVal, Acc, AccFun, + maybe_accumulate(SSTKey, SSTVal, + {Acc, AccFun, Now}, MaxKeys, LastModRange), keyfolder({[], NxSSTiter}, KeyRange, - {AccFun, Acc1}, + {AccFun, Acc1, Now}, {SegmentList, LastModRange, MK1}) end; keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange, - {AccFun, Acc}, + {AccFun, Acc, Now}, {SegmentList, LastModRange, MaxKeys}) -> {StartKey, EndKey} = KeyRange, case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of @@ -1588,7 +1606,7 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, % (see above) keyfolder({[], SSTiterator}, KeyRange, - {AccFun, Acc}, + {AccFun, Acc, Now}, {SegmentList, LastModRange, MaxKeys}); {false, false} -> case find_nextkey(SSTiterator, StartKey, EndKey, @@ -1597,12 +1615,13 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, % No more keys in range in the persisted store, so use the % in-memory KV as the next {Acc1, MK1} = - maybe_accumulate(IMMKey, IMMVal, Acc, AccFun, + maybe_accumulate(IMMKey, IMMVal, + {Acc, AccFun, Now}, MaxKeys, LastModRange), keyfolder({NxIMMiterator, []}, KeyRange, - {AccFun, Acc1}, + {AccFun, Acc1, Now}, {SegmentList, LastModRange, MK1}); {NxSSTiterator, {SSTKey, SSTVal}} -> % There is a next key, so need to know which is the @@ -1614,7 +1633,8 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, SSTVal}) of left_hand_first -> {Acc1, MK1} = - maybe_accumulate(IMMKey, IMMVal, Acc, AccFun, + maybe_accumulate(IMMKey, IMMVal, + {Acc, AccFun, Now}, MaxKeys, LastModRange), % Stow the previous best result away at Level -1 % so that there is no need to iterate to it again @@ -1625,20 +1645,22 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, NxSSTiterator, NewEntry)}, KeyRange, - {AccFun, Acc1}, + {AccFun, Acc1, Now}, {SegmentList, LastModRange, MK1}); right_hand_first -> {Acc1, MK1} = - maybe_accumulate(SSTKey, SSTVal, Acc, AccFun, + maybe_accumulate(SSTKey, SSTVal, + {Acc, AccFun, Now}, MaxKeys, LastModRange), keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], NxSSTiterator}, KeyRange, - {AccFun, Acc1}, + {AccFun, Acc1, Now}, {SegmentList, LastModRange, MK1}); left_hand_dominant -> {Acc1, MK1} = - maybe_accumulate(IMMKey, IMMVal, Acc, AccFun, + maybe_accumulate(IMMKey, IMMVal, + {Acc, AccFun, Now}, MaxKeys, LastModRange), % We can add to the accumulator here. As the SST % key was the most dominant across all SST levels, @@ -1647,7 +1669,7 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, keyfolder({NxIMMiterator, NxSSTiterator}, KeyRange, - {AccFun, Acc1}, + {AccFun, Acc1, Now}, {SegmentList, LastModRange, MK1}) end end @@ -1655,16 +1677,21 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, -spec maybe_accumulate(leveled_codec:ledger_key(), leveled_codec:ledger_value(), - any(), fun(), integer(), - {non_neg_integer(), non_neg_integer()|infinity}) -> - any(). + {any(), pclacc_fun(), pos_integer()}, + integer(), + {non_neg_integer(), non_neg_integer()|infinity}) + -> any(). %% @doc %% Make an accumulation decision based one the date range -maybe_accumulate(LK, LV, Acc, AccFun, MaxKeys, {LowLastMod, HighLastMod}) -> +maybe_accumulate(LK, LV, + {Acc, AccFun, QueryStartTime}, + MaxKeys, + {LowLastMod, HighLastMod}) -> {_SQN, _SH, LMD} = leveled_codec:strip_to_indexdetails({LK, LV}), RunAcc = - (LMD == undefined) or ((LMD >= LowLastMod) and (LMD =< HighLastMod)), - case RunAcc of + (LMD == undefined) or + ((LMD >= LowLastMod) and (LMD =< HighLastMod)), + case RunAcc and leveled_codec:is_active(LK, LV, QueryStartTime) of true -> {AccFun(LK, LV, Acc), MaxKeys - 1}; false -> @@ -2273,6 +2300,7 @@ sqnoverlap_otherway_findnextkey_test() -> ?assertMatch(no_more_keys, ER). foldwithimm_simple_test() -> + Now = leveled_util:integer_now(), QueryArray = [ {2, [{{o, "Bucket1", "Key1", null}, {5, {active, infinity}, 0, null}}, @@ -2295,7 +2323,7 @@ foldwithimm_simple_test() -> Acc = keyfolder(IMMiter, QueryArray, {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null}, - {AccFun, []}), + {AccFun, [], Now}), ?assertMatch([{{o, "Bucket1", "Key1", null}, 8}, {{o, "Bucket1", "Key3", null}, 3}, {{o, "Bucket1", "Key5", null}, 2}, @@ -2307,7 +2335,7 @@ foldwithimm_simple_test() -> QueryArray, {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null}, - {AccFun, []}), + {AccFun, [], Now}), ?assertMatch([{{o, "Bucket1", "Key1", null}, 8}, {{o, "Bucket1", "Key3", null}, 3}, {{o, "Bucket1", "Key5", null}, 2}], AccA), @@ -2322,7 +2350,7 @@ foldwithimm_simple_test() -> AccB = keyfolder(IMMiterB, QueryArray, {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null}, - {AccFun, []}), + {AccFun, [], Now}), ?assertMatch([{{o, "Bucket1", "Key1", null}, 8}, {{o, "Bucket1", "Key3", null}, 3}, {{o, "Bucket1", "Key4", null}, 10}, diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index a51dacc..047a119 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -45,8 +45,26 @@ -type key_range() :: {leveled_codec:ledger_key()|null, leveled_codec:ledger_key()|null}. --type fun_and_acc() - :: {fun(), any()}. +-type foldacc() :: any(). + % Can't currently be specific about what an acc might be + +-type fold_objects_fun() + :: fun((leveled_codec:key(), leveled_codec:key(), any(), foldacc()) + -> foldacc()). +-type fold_keys_fun() + :: fun((leveled_codec:key(), leveled_codec:key(), foldacc()) + -> foldacc()). +-type fold_buckets_fun() + :: fun((leveled_codec:key(), foldacc()) -> foldacc()). +-type fold_filter_fun() + :: fun((leveled_codec:key(), leveled_codec:key()) -> accumulate|pass). + +-type snap_fun() + :: fun(() -> {ok, pid(), pid()|null}). +-type runner_fun() + :: fun(() -> foldacc()). +-type acc_fun() + :: fun((leveled_codec:key(), any(), foldacc()) -> foldacc()). %%%============================================================================ @@ -54,7 +72,8 @@ %%%============================================================================ --spec bucket_sizestats(fun(), any(), leveled_codec:tag()) -> {async, fun()}. +-spec bucket_sizestats(snap_fun(),leveled_codec:key(), leveled_codec:tag()) + -> {async, runner_fun()}. %% @doc %% Fold over a bucket accumulating the count of objects and their total sizes bucket_sizestats(SnapFun, Bucket, Tag) -> @@ -75,16 +94,19 @@ bucket_sizestats(SnapFun, Bucket, Tag) -> end, {async, Runner}. --spec bucket_list(fun(), leveled_codec:tag(), fun(), any()) - -> {async, fun()}. +-spec bucket_list(snap_fun(), + leveled_codec:tag(), + fold_buckets_fun(), foldacc()) -> {async, runner_fun()}. %% @doc %% List buckets for tag, assuming bucket names are all either binary, ascii %% strings or integers bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc) -> bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc, -1). --spec bucket_list(fun(), leveled_codec:tag(), fun(), any(), integer()) - -> {async, fun()}. +-spec bucket_list(snap_fun(), + leveled_codec:tag(), + fold_buckets_fun(), foldacc(), + integer()) -> {async, runner_fun()}. %% @doc %% set Max Buckets to -1 to list all buckets, otherwise will only return %% MaxBuckets (use 1 to confirm that there exists any bucket for a given Tag) @@ -114,11 +136,12 @@ bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc, MaxBuckets) -> end, {async, Runner}. --spec index_query(fun(), +-spec index_query(snap_fun(), {leveled_codec:ledger_key(), leveled_codec:ledger_key(), {boolean(), undefined|re:mp()|iodata()}}, - fun_and_acc()) -> {async, fun()}. + {fold_keys_fun(), foldacc()}) + -> {async, runner_fun()}. %% @doc %% Secondary index query %% This has the special capability that it will expect a message to be thrown @@ -157,9 +180,13 @@ index_query(SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT) -> end, {async, Runner}. --spec bucketkey_query(fun(), leveled_codec:tag(), any(), - key_range(), fun_and_acc(), - leveled_codec:regular_expression()) -> {async, fun()}. +-spec bucketkey_query(snap_fun(), + leveled_codec:tag(), + leveled_codec:key()|null, + key_range(), + {fold_keys_fun(), foldacc()}, + leveled_codec:regular_expression()) + -> {async, runner_fun()}. %% @doc %% Fold over all keys in `KeyRange' under tag (restricted to a given bucket) bucketkey_query(SnapFun, Tag, Bucket, @@ -186,14 +213,18 @@ bucketkey_query(SnapFun, Tag, Bucket, end, {async, Runner}. --spec bucketkey_query(fun(), leveled_codec:tag(), any(), fun_and_acc()) - -> {async, fun()}. +-spec bucketkey_query(snap_fun(), + leveled_codec:tag(), + leveled_codec:key()|null, + {fold_keys_fun(), foldacc()}) -> {async, runner_fun()}. %% @doc %% Fold over all keys under tag (potentially restricted to a given bucket) bucketkey_query(SnapFun, Tag, Bucket, FunAcc) -> bucketkey_query(SnapFun, Tag, Bucket, {null, null}, FunAcc, undefined). --spec hashlist_query(fun(), leveled_codec:tag(), boolean()) -> {async, fun()}. +-spec hashlist_query(snap_fun(), + leveled_codec:tag(), + boolean()) -> {async, runner_fun()}. %% @doc %% Fold over the keys under a given Tag accumulating the hashes hashlist_query(SnapFun, Tag, JournalCheck) -> @@ -219,10 +250,10 @@ hashlist_query(SnapFun, Tag, JournalCheck) -> end, {async, Runner}. --spec tictactree(fun(), - {leveled_codec:tag(), any(), tuple()}, - boolean(), atom(), fun()) - -> {async, fun()}. +-spec tictactree(snap_fun(), + {leveled_codec:tag(), leveled_codec:key(), tuple()}, + boolean(), atom(), fold_filter_fun()) + -> {async, runner_fun()}. %% @doc %% Return a merkle tree from the fold, directly accessing hashes cached in the %% metadata @@ -281,10 +312,11 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) -> end, {async, Runner}. --spec foldheads_allkeys(fun(), leveled_codec:tag(), - fun(), boolean(), false|list(integer()), +-spec foldheads_allkeys(snap_fun(), leveled_codec:tag(), + fold_objects_fun()|{fold_objects_fun(), foldacc()}, + boolean(), false|list(integer()), false|leveled_codec:lastmod_range(), - false|pos_integer()) -> {async, fun()}. + false|pos_integer()) -> {async, runner_fun()}. %% @doc %% Fold over all heads in the store for a given tag - applying the passed %% function to each proxy object @@ -301,8 +333,11 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, LastModRange, MaxObjectCount). --spec foldobjects_allkeys(fun(), leveled_codec:tag(), fun(), - key_order|sqn_order) -> {async, fun()}. +-spec foldobjects_allkeys(snap_fun(), + leveled_codec:tag(), + fold_objects_fun()|{fold_objects_fun(), foldacc()}, + key_order|sqn_order) + -> {async, runner_fun()}. %% @doc %% Fold over all objects for a given tag foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order) -> @@ -409,10 +444,11 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> {async, Folder}. --spec foldobjects_bybucket(fun(), +-spec foldobjects_bybucket(snap_fun(), leveled_codec:tag(), list(key_range()), - fun()) -> {async, fun()}. + fold_objects_fun()|{fold_objects_fun(), foldacc()}) + -> {async, runner_fun()}. %% @doc %% Fold over all objects within a given key range in a bucket foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) -> @@ -423,15 +459,15 @@ foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) -> false, false). --spec foldheads_bybucket(fun(), - atom(), - list({any(), any()}), - fun(), +-spec foldheads_bybucket(snap_fun(), + leveled_codec:tag(), + list(key_range()), + fold_objects_fun()|{fold_objects_fun(), foldacc()}, boolean(), false|list(integer()), false|leveled_codec:lastmod_range(), false|pos_integer()) - -> {async, fun()}. + -> {async, runner_fun()}. %% @doc %% Fold over all object metadata within a given key range in a bucket foldheads_bybucket(SnapFun, @@ -449,7 +485,10 @@ foldheads_bybucket(SnapFun, LastModRange, MaxObjectCount). --spec foldobjects_byindex(fun(), tuple(), fun()) -> {async, fun()}. +-spec foldobjects_byindex(snap_fun(), + tuple(), + fold_objects_fun()|{fold_objects_fun(), foldacc()}) + -> {async, runner_fun()}. %% @doc %% Folds over an index, fetching the objects associated with the keys returned %% and passing those objects into the fold function @@ -475,7 +514,6 @@ foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) -> get_nextbucket(_NextB, _NextK, _Tag, _LS, BKList, {Limit, Limit}) -> lists:reverse(BKList); get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) -> - Now = leveled_util:integer_now(), StartKey = leveled_codec:to_ledgerkey(NextBucket, NextKey, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), ExtractFun = @@ -491,43 +529,33 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) -> {1, null} -> leveled_log:log("B0008",[]), BKList; - {0, {{B, K}, V}} -> - case leveled_codec:is_active({Tag, B, K, null}, V, Now) of - true -> - leveled_log:log("B0009",[B]), - get_nextbucket(leveled_codec:next_key(B), - null, - Tag, - LedgerSnapshot, - [{B, K}|BKList], - {C + 1, L}); - false -> - NK = - case Tag of - ?HEAD_TAG -> - {PK, SK} = K, - {PK, leveled_codec:next_key(SK)}; - _ -> - leveled_codec:next_key(K) - end, - get_nextbucket(B, NK, Tag, LedgerSnapshot, BKList, {C, L}) - end + {0, {{B, K}, _V}} -> + leveled_log:log("B0009",[B]), + get_nextbucket(leveled_codec:next_key(B), + null, + Tag, + LedgerSnapshot, + [{B, K}|BKList], + {C + 1, L}) end. --spec foldobjects(fun(), atom(), list(), fun(), +-spec foldobjects(snap_fun(), + atom(), + list(), + fold_objects_fun()|{fold_objects_fun(), foldacc()}, false|{true, boolean()}, false|list(integer())) -> - {async, fun()}. + {async, runner_fun()}. foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) -> foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList, false, false). --spec foldobjects(fun(), atom(), list(), fun(), +-spec foldobjects(snap_fun(), atom(), list(), + fold_objects_fun()|{fold_objects_fun(), foldacc()}, false|{true, boolean()}, false|list(integer()), false|leveled_codec:lastmod_range(), - false|pos_integer()) -> - {async, fun()}. + false|pos_integer()) -> {async, runner_fun()}. %% @doc %% The object folder should be passed DeferredFetch. %% DeferredFetch can either be false (which will return to the fold function @@ -597,16 +625,10 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, accumulate_size() -> - Now = leveled_util:integer_now(), - AccFun = fun(Key, Value, {Size, Count}) -> - case leveled_codec:is_active(Key, Value, Now) of - true -> - {Size + leveled_codec:get_size(Key, Value), - Count + 1}; - false -> - {Size, Count} - end - end, + AccFun = + fun(Key, Value, {Size, Count}) -> + {Size + leveled_codec:get_size(Key, Value), Count + 1} + end, AccFun. accumulate_hashes(JournalCheck, InkerClone) -> @@ -633,33 +655,30 @@ accumulate_tree(FilterFun, JournalCheck, InkerClone, HashFun) -> AddKeyFun). get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) -> - Now = leveled_util:integer_now(), AccFun = fun(LK, V, Acc) -> - case leveled_codec:is_active(LK, V, Now) of + {B, K, H} = leveled_codec:get_keyandobjhash(LK, V), + Check = leveled_rand:uniform() < ?CHECKJOURNAL_PROB, + case JournalCheck and Check of true -> - {B, K, H} = leveled_codec:get_keyandobjhash(LK, V), - Check = leveled_rand:uniform() < ?CHECKJOURNAL_PROB, - case {JournalCheck, Check} of - {true, true} -> - case check_presence(LK, V, InkerClone) of - true -> - AddKeyFun(B, K, H, Acc); - false -> - Acc - end; - _ -> - AddKeyFun(B, K, H, Acc) + case check_presence(LK, V, InkerClone) of + true -> + AddKeyFun(B, K, H, Acc); + false -> + Acc end; - false -> - Acc + _ -> + AddKeyFun(B, K, H, Acc) end end, AccFun. - +-spec accumulate_objects(fold_objects_fun(), + pid()|null, + leveled_codec:tag(), + false|{true, boolean()}) + -> acc_fun(). accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> - Now = leveled_util:integer_now(), AccFun = fun(LK, V, Acc) -> % The function takes the Ledger Key and the value from the @@ -671,49 +690,43 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> % a fold_objects), then a metadata object needs to be built to be % returned - but a quick check that Key is present in the Journal % is made first - case leveled_codec:is_active(LK, V, Now) of - true -> - {SQN, _St, _MH, MD} = - leveled_codec:striphead_to_v1details(V), - {B, K} = - case leveled_codec:from_ledgerkey(LK) of - {B0, K0} -> - {B0, K0}; - {B0, K0, _T0} -> - {B0, K0} - end, - JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN}, - case DeferredFetch of - {true, JournalCheck} -> - ProxyObj = - leveled_codec:return_proxy(Tag, MD, - InkerClone, JK), - case JournalCheck of - true -> - InJournal = - leveled_inker:ink_keycheck(InkerClone, - LK, - SQN), - case InJournal of - probably -> - FoldObjectsFun(B, K, ProxyObj, Acc); - missing -> - Acc - end; - false -> - FoldObjectsFun(B, K, ProxyObj, Acc) + {SQN, _St, _MH, MD} = + leveled_codec:striphead_to_v1details(V), + {B, K} = + case leveled_codec:from_ledgerkey(LK) of + {B0, K0} -> + {B0, K0}; + {B0, K0, _T0} -> + {B0, K0} + end, + JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN}, + case DeferredFetch of + {true, JournalCheck} -> + ProxyObj = + leveled_codec:return_proxy(Tag, MD, InkerClone, JK), + case JournalCheck of + true -> + InJournal = + leveled_inker:ink_keycheck(InkerClone, + LK, + SQN), + case InJournal of + probably -> + FoldObjectsFun(B, K, ProxyObj, Acc); + missing -> + Acc end; false -> - R = leveled_bookie:fetch_value(InkerClone, JK), - case R of - not_present -> - Acc; - Value -> - FoldObjectsFun(B, K, Value, Acc) - end + FoldObjectsFun(B, K, ProxyObj, Acc) end; false -> - Acc + R = leveled_bookie:fetch_value(InkerClone, JK), + case R of + not_present -> + Acc; + Value -> + FoldObjectsFun(B, K, Value, Acc) + end end end, AccFun. @@ -729,25 +742,19 @@ check_presence(Key, Value, InkerClone) -> end. accumulate_keys(FoldKeysFun, TermRegex) -> - Now = leveled_util:integer_now(), AccFun = - fun(Key, Value, Acc) -> - case leveled_codec:is_active(Key, Value, Now) of - true -> - {B, K} = leveled_codec:from_ledgerkey(Key), - case TermRegex of - undefined -> - FoldKeysFun(B, K, Acc); - Re -> - case re:run(K, Re) of - nomatch -> - Acc; - _ -> - FoldKeysFun(B, K, Acc) - end - end; - false -> - Acc + fun(Key, _Value, Acc) -> + {B, K} = leveled_codec:from_ledgerkey(Key), + case TermRegex of + undefined -> + FoldKeysFun(B, K, Acc); + Re -> + case re:run(K, Re) of + nomatch -> + Acc; + _ -> + FoldKeysFun(B, K, Acc) + end end end, AccFun. @@ -759,37 +766,22 @@ add_terms(ObjKey, IdxValue) -> {IdxValue, ObjKey}. accumulate_index(TermRe, AddFun, FoldKeysFun) -> - Now = leveled_util:integer_now(), case TermRe of undefined -> - fun(Key, Value, Acc) -> - case leveled_codec:is_active(Key, Value, Now) of - true -> - {Bucket, - ObjKey, - IdxValue} = leveled_codec:from_ledgerkey(Key), - FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc); - false -> - Acc - end end; + fun(Key, _Value, Acc) -> + {Bucket, ObjKey, IdxValue} = leveled_codec:from_ledgerkey(Key), + FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc) + end; TermRe -> - fun(Key, Value, Acc) -> - case leveled_codec:is_active(Key, Value, Now) of - true -> - {Bucket, - ObjKey, - IdxValue} = leveled_codec:from_ledgerkey(Key), - case re:run(IdxValue, TermRe) of - nomatch -> - Acc; - _ -> - FoldKeysFun(Bucket, - AddFun(ObjKey, IdxValue), - Acc) - end; - false -> - Acc - end end + fun(Key, _Value, Acc) -> + {Bucket, ObjKey, IdxValue} = leveled_codec:from_ledgerkey(Key), + case re:run(IdxValue, TermRe) of + nomatch -> + Acc; + _ -> + FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc) + end + end end. -spec wrap_runner(fun(), fun()) -> any().