is_active check within fold
Not within the fold fun of the leveled_runner. This should avoid constantly having to re-merge and filter the penciller memory when running list_buckets and hitting inactive keys
This commit is contained in:
parent
f3f574de02
commit
108527a8d9
2 changed files with 142 additions and 168 deletions
|
@ -258,6 +258,7 @@
|
|||
|
||||
is_snapshot = false :: boolean(),
|
||||
snapshot_fully_loaded = false :: boolean(),
|
||||
snapshot_time :: pos_integer() | undefined,
|
||||
source_penciller :: pid() | undefined,
|
||||
bookie_monref :: reference() | undefined,
|
||||
levelzero_astree :: list() | undefined,
|
||||
|
@ -306,6 +307,15 @@
|
|||
-type iterator() :: list(iterator_entry()).
|
||||
-type bad_ledgerkey() :: list().
|
||||
-type sqn_check() :: current|replaced|missing.
|
||||
-type pclacc_fun() ::
|
||||
fun((leveled_codec:ledger_key(),
|
||||
leveled_codec:ledger_value(),
|
||||
any()) -> any()).
|
||||
-type sst_fetchfun() ::
|
||||
fun((pid(),
|
||||
leveled_codec:ledger_key(),
|
||||
leveled_codec:segment_hash(),
|
||||
non_neg_integer()) -> leveled_codec:ledger_kv()|not_present).
|
||||
|
||||
-export_type([levelzero_cacheentry/0, sqn_check/0]).
|
||||
|
||||
|
@ -351,7 +361,10 @@ pcl_pushmem(Pid, LedgerCache) ->
|
|||
%% Bookie to dump memory onto penciller
|
||||
gen_server:call(Pid, {push_mem, LedgerCache}, infinity).
|
||||
|
||||
-spec pcl_fetchlevelzero(pid(), non_neg_integer(), fun()) -> ok.
|
||||
-spec pcl_fetchlevelzero(pid(),
|
||||
non_neg_integer(),
|
||||
fun((levelzero_cacheentry()) -> ok))
|
||||
-> ok.
|
||||
%% @doc
|
||||
%% Allows a single slot of the penciller's levelzero cache to be fetched. The
|
||||
%% levelzero cache can be up to 40K keys - sending this to the process that is
|
||||
|
@ -402,7 +415,7 @@ pcl_fetch(Pid, Key, Hash, UseL0Index) ->
|
|||
-spec pcl_fetchkeys(pid(),
|
||||
leveled_codec:ledger_key(),
|
||||
leveled_codec:ledger_key(),
|
||||
fun(), any(), as_pcl|by_runner) -> any().
|
||||
pclacc_fun(), any(), as_pcl|by_runner) -> any().
|
||||
%% @doc
|
||||
%% Run a range query between StartKey and EndKey (inclusive). This will cover
|
||||
%% all keys in the range - so must only be run against snapshots of the
|
||||
|
@ -428,7 +441,7 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) ->
|
|||
-spec pcl_fetchkeysbysegment(pid(),
|
||||
leveled_codec:ledger_key(),
|
||||
leveled_codec:ledger_key(),
|
||||
fun(), any(),
|
||||
pclacc_fun(), any(),
|
||||
leveled_codec:segment_list(),
|
||||
false | leveled_codec:lastmod_range(),
|
||||
boolean()) -> any().
|
||||
|
@ -465,7 +478,7 @@ pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc,
|
|||
-spec pcl_fetchnextkey(pid(),
|
||||
leveled_codec:ledger_key(),
|
||||
leveled_codec:ledger_key(),
|
||||
fun(), any()) -> any().
|
||||
pclacc_fun(), any()) -> any().
|
||||
%% @doc
|
||||
%% Run a range query between StartKey and EndKey (inclusive). This has the
|
||||
%% same constraints as pcl_fetchkeys/5, but will only return the first key
|
||||
|
@ -799,7 +812,7 @@ handle_call({fetch_keys,
|
|||
fun() ->
|
||||
keyfolder({FilteredL0, SSTiter},
|
||||
{StartKey, EndKey},
|
||||
{AccFun, InitAcc},
|
||||
{AccFun, InitAcc, State#state.snapshot_time},
|
||||
{SegmentList, LastModRange0, MaxKeys})
|
||||
end,
|
||||
case By of
|
||||
|
@ -889,6 +902,7 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning},
|
|||
{reply,
|
||||
{ok,
|
||||
CloneState#state{snapshot_fully_loaded=true,
|
||||
snapshot_time = leveled_util:integer_now(),
|
||||
manifest=ManifestClone}},
|
||||
State#state{manifest = Manifest0}};
|
||||
handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true ->
|
||||
|
@ -1447,7 +1461,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
|
|||
|
||||
-spec fetch(tuple(), {integer(), integer()},
|
||||
leveled_pmanifest:manifest(), integer(),
|
||||
fun()) -> {tuple()|not_present, integer()|basement}.
|
||||
sst_fetchfun()) -> {tuple()|not_present, integer()|basement}.
|
||||
%% @doc
|
||||
%% Fetch from the persisted portion of the LSM tree, checking each level in
|
||||
%% turn until a match is found.
|
||||
|
@ -1524,7 +1538,8 @@ compare_to_sqn(Obj, SQN) ->
|
|||
%%%============================================================================
|
||||
|
||||
|
||||
-spec keyfolder(list(), list(), tuple(), tuple(), {fun(), any()}) -> any().
|
||||
-spec keyfolder(list(), list(), tuple(), tuple(),
|
||||
{pclacc_fun(), any(), pos_integer()}) -> any().
|
||||
%% @doc
|
||||
%% The keyfolder will compare an iterator across the immutable in-memory cache
|
||||
%% of the Penciller (the IMMiter), with an iterator across the persisted part
|
||||
|
@ -1542,16 +1557,18 @@ compare_to_sqn(Obj, SQN) ->
|
|||
%% To advance the SSTiter the find_nextkey/4 function is used, as the SSTiter
|
||||
%% is an iterator across multiple levels - and so needs to do its own
|
||||
%% comparisons to pop the next result.
|
||||
keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) ->
|
||||
keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc, Now}) ->
|
||||
keyfolder({IMMiter, SSTiter},
|
||||
{StartKey, EndKey},
|
||||
{AccFun, Acc},
|
||||
{AccFun, Acc, Now},
|
||||
{false, {0, infinity}, -1}).
|
||||
|
||||
keyfolder(_Iterators, _KeyRange, {_AccFun, Acc},
|
||||
{_SegmentList, _LastModRange, MaxKeys}) when MaxKeys == 0 ->
|
||||
keyfolder(_Iterators,
|
||||
_KeyRange,
|
||||
{_AccFun, Acc, _Now},
|
||||
{_SegmentList, _LastModRange, MaxKeys}) when MaxKeys == 0 ->
|
||||
{0, Acc};
|
||||
keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc},
|
||||
keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc, Now},
|
||||
{SegmentList, LastModRange, MaxKeys}) ->
|
||||
{StartKey, EndKey} = KeyRange,
|
||||
case find_nextkey(SSTiter, StartKey, EndKey,
|
||||
|
@ -1569,16 +1586,17 @@ keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc},
|
|||
end;
|
||||
{NxSSTiter, {SSTKey, SSTVal}} ->
|
||||
{Acc1, MK1} =
|
||||
maybe_accumulate(SSTKey, SSTVal, Acc, AccFun,
|
||||
maybe_accumulate(SSTKey, SSTVal,
|
||||
{Acc, AccFun, Now},
|
||||
MaxKeys, LastModRange),
|
||||
keyfolder({[], NxSSTiter},
|
||||
KeyRange,
|
||||
{AccFun, Acc1},
|
||||
{AccFun, Acc1, Now},
|
||||
{SegmentList, LastModRange, MK1})
|
||||
end;
|
||||
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
|
||||
KeyRange,
|
||||
{AccFun, Acc},
|
||||
{AccFun, Acc, Now},
|
||||
{SegmentList, LastModRange, MaxKeys}) ->
|
||||
{StartKey, EndKey} = KeyRange,
|
||||
case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of
|
||||
|
@ -1588,7 +1606,7 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
|
|||
% (see above)
|
||||
keyfolder({[], SSTiterator},
|
||||
KeyRange,
|
||||
{AccFun, Acc},
|
||||
{AccFun, Acc, Now},
|
||||
{SegmentList, LastModRange, MaxKeys});
|
||||
{false, false} ->
|
||||
case find_nextkey(SSTiterator, StartKey, EndKey,
|
||||
|
@ -1597,12 +1615,13 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
|
|||
% No more keys in range in the persisted store, so use the
|
||||
% in-memory KV as the next
|
||||
{Acc1, MK1} =
|
||||
maybe_accumulate(IMMKey, IMMVal, Acc, AccFun,
|
||||
maybe_accumulate(IMMKey, IMMVal,
|
||||
{Acc, AccFun, Now},
|
||||
MaxKeys, LastModRange),
|
||||
keyfolder({NxIMMiterator,
|
||||
[]},
|
||||
KeyRange,
|
||||
{AccFun, Acc1},
|
||||
{AccFun, Acc1, Now},
|
||||
{SegmentList, LastModRange, MK1});
|
||||
{NxSSTiterator, {SSTKey, SSTVal}} ->
|
||||
% There is a next key, so need to know which is the
|
||||
|
@ -1614,7 +1633,8 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
|
|||
SSTVal}) of
|
||||
left_hand_first ->
|
||||
{Acc1, MK1} =
|
||||
maybe_accumulate(IMMKey, IMMVal, Acc, AccFun,
|
||||
maybe_accumulate(IMMKey, IMMVal,
|
||||
{Acc, AccFun, Now},
|
||||
MaxKeys, LastModRange),
|
||||
% Stow the previous best result away at Level -1
|
||||
% so that there is no need to iterate to it again
|
||||
|
@ -1625,20 +1645,22 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
|
|||
NxSSTiterator,
|
||||
NewEntry)},
|
||||
KeyRange,
|
||||
{AccFun, Acc1},
|
||||
{AccFun, Acc1, Now},
|
||||
{SegmentList, LastModRange, MK1});
|
||||
right_hand_first ->
|
||||
{Acc1, MK1} =
|
||||
maybe_accumulate(SSTKey, SSTVal, Acc, AccFun,
|
||||
maybe_accumulate(SSTKey, SSTVal,
|
||||
{Acc, AccFun, Now},
|
||||
MaxKeys, LastModRange),
|
||||
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator],
|
||||
NxSSTiterator},
|
||||
KeyRange,
|
||||
{AccFun, Acc1},
|
||||
{AccFun, Acc1, Now},
|
||||
{SegmentList, LastModRange, MK1});
|
||||
left_hand_dominant ->
|
||||
{Acc1, MK1} =
|
||||
maybe_accumulate(IMMKey, IMMVal, Acc, AccFun,
|
||||
maybe_accumulate(IMMKey, IMMVal,
|
||||
{Acc, AccFun, Now},
|
||||
MaxKeys, LastModRange),
|
||||
% We can add to the accumulator here. As the SST
|
||||
% key was the most dominant across all SST levels,
|
||||
|
@ -1647,7 +1669,7 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
|
|||
keyfolder({NxIMMiterator,
|
||||
NxSSTiterator},
|
||||
KeyRange,
|
||||
{AccFun, Acc1},
|
||||
{AccFun, Acc1, Now},
|
||||
{SegmentList, LastModRange, MK1})
|
||||
end
|
||||
end
|
||||
|
@ -1655,16 +1677,21 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
|
|||
|
||||
-spec maybe_accumulate(leveled_codec:ledger_key(),
|
||||
leveled_codec:ledger_value(),
|
||||
any(), fun(), integer(),
|
||||
{non_neg_integer(), non_neg_integer()|infinity}) ->
|
||||
any().
|
||||
{any(), pclacc_fun(), pos_integer()},
|
||||
integer(),
|
||||
{non_neg_integer(), non_neg_integer()|infinity})
|
||||
-> any().
|
||||
%% @doc
|
||||
%% Make an accumulation decision based one the date range
|
||||
maybe_accumulate(LK, LV, Acc, AccFun, MaxKeys, {LowLastMod, HighLastMod}) ->
|
||||
maybe_accumulate(LK, LV,
|
||||
{Acc, AccFun, QueryStartTime},
|
||||
MaxKeys,
|
||||
{LowLastMod, HighLastMod}) ->
|
||||
{_SQN, _SH, LMD} = leveled_codec:strip_to_indexdetails({LK, LV}),
|
||||
RunAcc =
|
||||
(LMD == undefined) or ((LMD >= LowLastMod) and (LMD =< HighLastMod)),
|
||||
case RunAcc of
|
||||
(LMD == undefined) or
|
||||
((LMD >= LowLastMod) and (LMD =< HighLastMod)),
|
||||
case RunAcc and leveled_codec:is_active(LK, LV, QueryStartTime) of
|
||||
true ->
|
||||
{AccFun(LK, LV, Acc), MaxKeys - 1};
|
||||
false ->
|
||||
|
@ -2273,6 +2300,7 @@ sqnoverlap_otherway_findnextkey_test() ->
|
|||
?assertMatch(no_more_keys, ER).
|
||||
|
||||
foldwithimm_simple_test() ->
|
||||
Now = leveled_util:integer_now(),
|
||||
QueryArray = [
|
||||
{2, [{{o, "Bucket1", "Key1", null},
|
||||
{5, {active, infinity}, 0, null}},
|
||||
|
@ -2295,7 +2323,7 @@ foldwithimm_simple_test() ->
|
|||
Acc = keyfolder(IMMiter,
|
||||
QueryArray,
|
||||
{o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null},
|
||||
{AccFun, []}),
|
||||
{AccFun, [], Now}),
|
||||
?assertMatch([{{o, "Bucket1", "Key1", null}, 8},
|
||||
{{o, "Bucket1", "Key3", null}, 3},
|
||||
{{o, "Bucket1", "Key5", null}, 2},
|
||||
|
@ -2307,7 +2335,7 @@ foldwithimm_simple_test() ->
|
|||
QueryArray,
|
||||
{o, "Bucket1", "Key1", null},
|
||||
{o, "Bucket1", "Key6", null},
|
||||
{AccFun, []}),
|
||||
{AccFun, [], Now}),
|
||||
?assertMatch([{{o, "Bucket1", "Key1", null}, 8},
|
||||
{{o, "Bucket1", "Key3", null}, 3},
|
||||
{{o, "Bucket1", "Key5", null}, 2}], AccA),
|
||||
|
@ -2322,7 +2350,7 @@ foldwithimm_simple_test() ->
|
|||
AccB = keyfolder(IMMiterB,
|
||||
QueryArray,
|
||||
{o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null},
|
||||
{AccFun, []}),
|
||||
{AccFun, [], Now}),
|
||||
?assertMatch([{{o, "Bucket1", "Key1", null}, 8},
|
||||
{{o, "Bucket1", "Key3", null}, 3},
|
||||
{{o, "Bucket1", "Key4", null}, 10},
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue