From 108527a8d9230589c69d18ec9d3ac377f5b2837c Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 4 Dec 2020 12:49:17 +0000 Subject: [PATCH] is_active check within fold Not within the fold fun of the leveled_runner. This should avoid constantly having to re-merge and filter the penciller memory when running list_buckets and hitting inactive keys --- src/leveled_penciller.erl | 94 +++++++++++------ src/leveled_runner.erl | 216 ++++++++++++++------------------------ 2 files changed, 142 insertions(+), 168 deletions(-) diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 5cfb655..a719cf3 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -258,6 +258,7 @@ is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), + snapshot_time :: pos_integer() | undefined, source_penciller :: pid() | undefined, bookie_monref :: reference() | undefined, levelzero_astree :: list() | undefined, @@ -306,6 +307,15 @@ -type iterator() :: list(iterator_entry()). -type bad_ledgerkey() :: list(). -type sqn_check() :: current|replaced|missing. +-type pclacc_fun() :: + fun((leveled_codec:ledger_key(), + leveled_codec:ledger_value(), + any()) -> any()). +-type sst_fetchfun() :: + fun((pid(), + leveled_codec:ledger_key(), + leveled_codec:segment_hash(), + non_neg_integer()) -> leveled_codec:ledger_kv()|not_present). -export_type([levelzero_cacheentry/0, sqn_check/0]). @@ -351,7 +361,10 @@ pcl_pushmem(Pid, LedgerCache) -> %% Bookie to dump memory onto penciller gen_server:call(Pid, {push_mem, LedgerCache}, infinity). --spec pcl_fetchlevelzero(pid(), non_neg_integer(), fun()) -> ok. +-spec pcl_fetchlevelzero(pid(), + non_neg_integer(), + fun((levelzero_cacheentry()) -> ok)) + -> ok. %% @doc %% Allows a single slot of the penciller's levelzero cache to be fetched. The %% levelzero cache can be up to 40K keys - sending this to the process that is @@ -402,7 +415,7 @@ pcl_fetch(Pid, Key, Hash, UseL0Index) -> -spec pcl_fetchkeys(pid(), leveled_codec:ledger_key(), leveled_codec:ledger_key(), - fun(), any(), as_pcl|by_runner) -> any(). + pclacc_fun(), any(), as_pcl|by_runner) -> any(). %% @doc %% Run a range query between StartKey and EndKey (inclusive). This will cover %% all keys in the range - so must only be run against snapshots of the @@ -428,7 +441,7 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) -> -spec pcl_fetchkeysbysegment(pid(), leveled_codec:ledger_key(), leveled_codec:ledger_key(), - fun(), any(), + pclacc_fun(), any(), leveled_codec:segment_list(), false | leveled_codec:lastmod_range(), boolean()) -> any(). @@ -465,7 +478,7 @@ pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, -spec pcl_fetchnextkey(pid(), leveled_codec:ledger_key(), leveled_codec:ledger_key(), - fun(), any()) -> any(). + pclacc_fun(), any()) -> any(). %% @doc %% Run a range query between StartKey and EndKey (inclusive). This has the %% same constraints as pcl_fetchkeys/5, but will only return the first key @@ -799,7 +812,7 @@ handle_call({fetch_keys, fun() -> keyfolder({FilteredL0, SSTiter}, {StartKey, EndKey}, - {AccFun, InitAcc}, + {AccFun, InitAcc, State#state.snapshot_time}, {SegmentList, LastModRange0, MaxKeys}) end, case By of @@ -889,6 +902,7 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning}, {reply, {ok, CloneState#state{snapshot_fully_loaded=true, + snapshot_time = leveled_util:integer_now(), manifest=ManifestClone}}, State#state{manifest = Manifest0}}; handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true -> @@ -1447,7 +1461,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> -spec fetch(tuple(), {integer(), integer()}, leveled_pmanifest:manifest(), integer(), - fun()) -> {tuple()|not_present, integer()|basement}. + sst_fetchfun()) -> {tuple()|not_present, integer()|basement}. %% @doc %% Fetch from the persisted portion of the LSM tree, checking each level in %% turn until a match is found. @@ -1524,7 +1538,8 @@ compare_to_sqn(Obj, SQN) -> %%%============================================================================ --spec keyfolder(list(), list(), tuple(), tuple(), {fun(), any()}) -> any(). +-spec keyfolder(list(), list(), tuple(), tuple(), + {pclacc_fun(), any(), pos_integer()}) -> any(). %% @doc %% The keyfolder will compare an iterator across the immutable in-memory cache %% of the Penciller (the IMMiter), with an iterator across the persisted part @@ -1542,16 +1557,18 @@ compare_to_sqn(Obj, SQN) -> %% To advance the SSTiter the find_nextkey/4 function is used, as the SSTiter %% is an iterator across multiple levels - and so needs to do its own %% comparisons to pop the next result. -keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) -> +keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc, Now}) -> keyfolder({IMMiter, SSTiter}, {StartKey, EndKey}, - {AccFun, Acc}, + {AccFun, Acc, Now}, {false, {0, infinity}, -1}). -keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, - {_SegmentList, _LastModRange, MaxKeys}) when MaxKeys == 0 -> +keyfolder(_Iterators, + _KeyRange, + {_AccFun, Acc, _Now}, + {_SegmentList, _LastModRange, MaxKeys}) when MaxKeys == 0 -> {0, Acc}; -keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, +keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc, Now}, {SegmentList, LastModRange, MaxKeys}) -> {StartKey, EndKey} = KeyRange, case find_nextkey(SSTiter, StartKey, EndKey, @@ -1569,16 +1586,17 @@ keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, end; {NxSSTiter, {SSTKey, SSTVal}} -> {Acc1, MK1} = - maybe_accumulate(SSTKey, SSTVal, Acc, AccFun, + maybe_accumulate(SSTKey, SSTVal, + {Acc, AccFun, Now}, MaxKeys, LastModRange), keyfolder({[], NxSSTiter}, KeyRange, - {AccFun, Acc1}, + {AccFun, Acc1, Now}, {SegmentList, LastModRange, MK1}) end; keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange, - {AccFun, Acc}, + {AccFun, Acc, Now}, {SegmentList, LastModRange, MaxKeys}) -> {StartKey, EndKey} = KeyRange, case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of @@ -1588,7 +1606,7 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, % (see above) keyfolder({[], SSTiterator}, KeyRange, - {AccFun, Acc}, + {AccFun, Acc, Now}, {SegmentList, LastModRange, MaxKeys}); {false, false} -> case find_nextkey(SSTiterator, StartKey, EndKey, @@ -1597,12 +1615,13 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, % No more keys in range in the persisted store, so use the % in-memory KV as the next {Acc1, MK1} = - maybe_accumulate(IMMKey, IMMVal, Acc, AccFun, + maybe_accumulate(IMMKey, IMMVal, + {Acc, AccFun, Now}, MaxKeys, LastModRange), keyfolder({NxIMMiterator, []}, KeyRange, - {AccFun, Acc1}, + {AccFun, Acc1, Now}, {SegmentList, LastModRange, MK1}); {NxSSTiterator, {SSTKey, SSTVal}} -> % There is a next key, so need to know which is the @@ -1614,7 +1633,8 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, SSTVal}) of left_hand_first -> {Acc1, MK1} = - maybe_accumulate(IMMKey, IMMVal, Acc, AccFun, + maybe_accumulate(IMMKey, IMMVal, + {Acc, AccFun, Now}, MaxKeys, LastModRange), % Stow the previous best result away at Level -1 % so that there is no need to iterate to it again @@ -1625,20 +1645,22 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, NxSSTiterator, NewEntry)}, KeyRange, - {AccFun, Acc1}, + {AccFun, Acc1, Now}, {SegmentList, LastModRange, MK1}); right_hand_first -> {Acc1, MK1} = - maybe_accumulate(SSTKey, SSTVal, Acc, AccFun, + maybe_accumulate(SSTKey, SSTVal, + {Acc, AccFun, Now}, MaxKeys, LastModRange), keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], NxSSTiterator}, KeyRange, - {AccFun, Acc1}, + {AccFun, Acc1, Now}, {SegmentList, LastModRange, MK1}); left_hand_dominant -> {Acc1, MK1} = - maybe_accumulate(IMMKey, IMMVal, Acc, AccFun, + maybe_accumulate(IMMKey, IMMVal, + {Acc, AccFun, Now}, MaxKeys, LastModRange), % We can add to the accumulator here. As the SST % key was the most dominant across all SST levels, @@ -1647,7 +1669,7 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, keyfolder({NxIMMiterator, NxSSTiterator}, KeyRange, - {AccFun, Acc1}, + {AccFun, Acc1, Now}, {SegmentList, LastModRange, MK1}) end end @@ -1655,16 +1677,21 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, -spec maybe_accumulate(leveled_codec:ledger_key(), leveled_codec:ledger_value(), - any(), fun(), integer(), - {non_neg_integer(), non_neg_integer()|infinity}) -> - any(). + {any(), pclacc_fun(), pos_integer()}, + integer(), + {non_neg_integer(), non_neg_integer()|infinity}) + -> any(). %% @doc %% Make an accumulation decision based one the date range -maybe_accumulate(LK, LV, Acc, AccFun, MaxKeys, {LowLastMod, HighLastMod}) -> +maybe_accumulate(LK, LV, + {Acc, AccFun, QueryStartTime}, + MaxKeys, + {LowLastMod, HighLastMod}) -> {_SQN, _SH, LMD} = leveled_codec:strip_to_indexdetails({LK, LV}), RunAcc = - (LMD == undefined) or ((LMD >= LowLastMod) and (LMD =< HighLastMod)), - case RunAcc of + (LMD == undefined) or + ((LMD >= LowLastMod) and (LMD =< HighLastMod)), + case RunAcc and leveled_codec:is_active(LK, LV, QueryStartTime) of true -> {AccFun(LK, LV, Acc), MaxKeys - 1}; false -> @@ -2273,6 +2300,7 @@ sqnoverlap_otherway_findnextkey_test() -> ?assertMatch(no_more_keys, ER). foldwithimm_simple_test() -> + Now = leveled_util:integer_now(), QueryArray = [ {2, [{{o, "Bucket1", "Key1", null}, {5, {active, infinity}, 0, null}}, @@ -2295,7 +2323,7 @@ foldwithimm_simple_test() -> Acc = keyfolder(IMMiter, QueryArray, {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null}, - {AccFun, []}), + {AccFun, [], Now}), ?assertMatch([{{o, "Bucket1", "Key1", null}, 8}, {{o, "Bucket1", "Key3", null}, 3}, {{o, "Bucket1", "Key5", null}, 2}, @@ -2307,7 +2335,7 @@ foldwithimm_simple_test() -> QueryArray, {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null}, - {AccFun, []}), + {AccFun, [], Now}), ?assertMatch([{{o, "Bucket1", "Key1", null}, 8}, {{o, "Bucket1", "Key3", null}, 3}, {{o, "Bucket1", "Key5", null}, 2}], AccA), @@ -2322,7 +2350,7 @@ foldwithimm_simple_test() -> AccB = keyfolder(IMMiterB, QueryArray, {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null}, - {AccFun, []}), + {AccFun, [], Now}), ?assertMatch([{{o, "Bucket1", "Key1", null}, 8}, {{o, "Bucket1", "Key3", null}, 3}, {{o, "Bucket1", "Key4", null}, 10}, diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index a51dacc..382a7b2 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -475,7 +475,6 @@ foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) -> get_nextbucket(_NextB, _NextK, _Tag, _LS, BKList, {Limit, Limit}) -> lists:reverse(BKList); get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) -> - Now = leveled_util:integer_now(), StartKey = leveled_codec:to_ledgerkey(NextBucket, NextKey, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), ExtractFun = @@ -491,27 +490,14 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) -> {1, null} -> leveled_log:log("B0008",[]), BKList; - {0, {{B, K}, V}} -> - case leveled_codec:is_active({Tag, B, K, null}, V, Now) of - true -> - leveled_log:log("B0009",[B]), - get_nextbucket(leveled_codec:next_key(B), - null, - Tag, - LedgerSnapshot, - [{B, K}|BKList], - {C + 1, L}); - false -> - NK = - case Tag of - ?HEAD_TAG -> - {PK, SK} = K, - {PK, leveled_codec:next_key(SK)}; - _ -> - leveled_codec:next_key(K) - end, - get_nextbucket(B, NK, Tag, LedgerSnapshot, BKList, {C, L}) - end + {0, {{B, K}, _V}} -> + leveled_log:log("B0009",[B]), + get_nextbucket(leveled_codec:next_key(B), + null, + Tag, + LedgerSnapshot, + [{B, K}|BKList], + {C + 1, L}) end. @@ -597,16 +583,10 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, accumulate_size() -> - Now = leveled_util:integer_now(), - AccFun = fun(Key, Value, {Size, Count}) -> - case leveled_codec:is_active(Key, Value, Now) of - true -> - {Size + leveled_codec:get_size(Key, Value), - Count + 1}; - false -> - {Size, Count} - end - end, + AccFun = + fun(Key, Value, {Size, Count}) -> + {Size + leveled_codec:get_size(Key, Value), Count + 1} + end, AccFun. accumulate_hashes(JournalCheck, InkerClone) -> @@ -633,33 +613,26 @@ accumulate_tree(FilterFun, JournalCheck, InkerClone, HashFun) -> AddKeyFun). get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) -> - Now = leveled_util:integer_now(), AccFun = fun(LK, V, Acc) -> - case leveled_codec:is_active(LK, V, Now) of - true -> - {B, K, H} = leveled_codec:get_keyandobjhash(LK, V), - Check = leveled_rand:uniform() < ?CHECKJOURNAL_PROB, - case {JournalCheck, Check} of - {true, true} -> - case check_presence(LK, V, InkerClone) of - true -> - AddKeyFun(B, K, H, Acc); - false -> - Acc - end; - _ -> - AddKeyFun(B, K, H, Acc) + {B, K, H} = leveled_codec:get_keyandobjhash(LK, V), + Check = leveled_rand:uniform() < ?CHECKJOURNAL_PROB, + case {JournalCheck, Check} of + {true, true} -> + case check_presence(LK, V, InkerClone) of + true -> + AddKeyFun(B, K, H, Acc); + false -> + Acc end; - false -> - Acc + _ -> + AddKeyFun(B, K, H, Acc) end end, AccFun. accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> - Now = leveled_util:integer_now(), AccFun = fun(LK, V, Acc) -> % The function takes the Ledger Key and the value from the @@ -671,49 +644,43 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> % a fold_objects), then a metadata object needs to be built to be % returned - but a quick check that Key is present in the Journal % is made first - case leveled_codec:is_active(LK, V, Now) of - true -> - {SQN, _St, _MH, MD} = - leveled_codec:striphead_to_v1details(V), - {B, K} = - case leveled_codec:from_ledgerkey(LK) of - {B0, K0} -> - {B0, K0}; - {B0, K0, _T0} -> - {B0, K0} - end, - JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN}, - case DeferredFetch of - {true, JournalCheck} -> - ProxyObj = - leveled_codec:return_proxy(Tag, MD, - InkerClone, JK), - case JournalCheck of - true -> - InJournal = - leveled_inker:ink_keycheck(InkerClone, - LK, - SQN), - case InJournal of - probably -> - FoldObjectsFun(B, K, ProxyObj, Acc); - missing -> - Acc - end; - false -> - FoldObjectsFun(B, K, ProxyObj, Acc) + {SQN, _St, _MH, MD} = + leveled_codec:striphead_to_v1details(V), + {B, K} = + case leveled_codec:from_ledgerkey(LK) of + {B0, K0} -> + {B0, K0}; + {B0, K0, _T0} -> + {B0, K0} + end, + JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN}, + case DeferredFetch of + {true, JournalCheck} -> + ProxyObj = + leveled_codec:return_proxy(Tag, MD, InkerClone, JK), + case JournalCheck of + true -> + InJournal = + leveled_inker:ink_keycheck(InkerClone, + LK, + SQN), + case InJournal of + probably -> + FoldObjectsFun(B, K, ProxyObj, Acc); + missing -> + Acc end; false -> - R = leveled_bookie:fetch_value(InkerClone, JK), - case R of - not_present -> - Acc; - Value -> - FoldObjectsFun(B, K, Value, Acc) - end + FoldObjectsFun(B, K, ProxyObj, Acc) end; false -> - Acc + R = leveled_bookie:fetch_value(InkerClone, JK), + case R of + not_present -> + Acc; + Value -> + FoldObjectsFun(B, K, Value, Acc) + end end end, AccFun. @@ -729,25 +696,19 @@ check_presence(Key, Value, InkerClone) -> end. accumulate_keys(FoldKeysFun, TermRegex) -> - Now = leveled_util:integer_now(), AccFun = - fun(Key, Value, Acc) -> - case leveled_codec:is_active(Key, Value, Now) of - true -> - {B, K} = leveled_codec:from_ledgerkey(Key), - case TermRegex of - undefined -> - FoldKeysFun(B, K, Acc); - Re -> - case re:run(K, Re) of - nomatch -> - Acc; - _ -> - FoldKeysFun(B, K, Acc) - end - end; - false -> - Acc + fun(Key, _Value, Acc) -> + {B, K} = leveled_codec:from_ledgerkey(Key), + case TermRegex of + undefined -> + FoldKeysFun(B, K, Acc); + Re -> + case re:run(K, Re) of + nomatch -> + Acc; + _ -> + FoldKeysFun(B, K, Acc) + end end end, AccFun. @@ -759,37 +720,22 @@ add_terms(ObjKey, IdxValue) -> {IdxValue, ObjKey}. accumulate_index(TermRe, AddFun, FoldKeysFun) -> - Now = leveled_util:integer_now(), case TermRe of undefined -> - fun(Key, Value, Acc) -> - case leveled_codec:is_active(Key, Value, Now) of - true -> - {Bucket, - ObjKey, - IdxValue} = leveled_codec:from_ledgerkey(Key), - FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc); - false -> - Acc - end end; + fun(Key, _Value, Acc) -> + {Bucket, ObjKey, IdxValue} = leveled_codec:from_ledgerkey(Key), + FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc) + end; TermRe -> - fun(Key, Value, Acc) -> - case leveled_codec:is_active(Key, Value, Now) of - true -> - {Bucket, - ObjKey, - IdxValue} = leveled_codec:from_ledgerkey(Key), - case re:run(IdxValue, TermRe) of - nomatch -> - Acc; - _ -> - FoldKeysFun(Bucket, - AddFun(ObjKey, IdxValue), - Acc) - end; - false -> - Acc - end end + fun(Key, _Value, Acc) -> + {Bucket, ObjKey, IdxValue} = leveled_codec:from_ledgerkey(Key), + case re:run(IdxValue, TermRe) of + nomatch -> + Acc; + _ -> + FoldKeysFun(Bucket, AddFun(ObjKey, IdxValue), Acc) + end + end end. -spec wrap_runner(fun(), fun()) -> any().