MaxCount monitoring and responding

Stop issue of {no_more_keys, Acc} being passed on fold over list of ranges to next range (and blowing up)
This commit is contained in:
Martin Sumner 2018-11-01 23:40:28 +00:00
parent dc84eabe0c
commit 2eec8a5378
4 changed files with 69 additions and 36 deletions

View file

@ -909,15 +909,8 @@ book_headfold(Pid, Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold, SegmentLis
%% All version 1 objects will be included in the result set regardless of Last
%% Modified Date.
%% The Max Object Count will stop the fold once the count has been reached on
%% this store only. The Max Object Count if provided will mean that on
%% completion of the fold the accumulator will be wrapped in a tuple to
%% indicate the reason for completion:
%% - {no_more_keys, Acc} if the end of the range was reached wihtout hitting
%% the Max Object Count limit
%% - {max_count, Acc} if the Max Object Count limit was reached before
%% reaching the end of the range
%% If MaxObjectCount is false then the Acc will be returned not wrapped in a
%% tuple
%% this store only. The Max Object Count if provided will mean that the runner
%% will return {RemainingCount, Acc} not just Acc
-spec book_headfold(pid(), Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount) ->
{async, Runner} when
@ -939,7 +932,8 @@ book_headfold(Pid, Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold, SegmentLis
SegmentList :: false | list(integer()),
LastModRange :: false | leveled_codec:lastmod_range(),
MaxObjectCount :: false | pos_integer(),
Runner :: fun(() -> Acc).
Runner :: fun(() -> ResultingAcc),
ResultingAcc :: Acc | {non_neg_integer(), Acc}.
book_headfold(Pid, Tag, {bucket_list, BucketList}, FoldAccT, JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount) ->
RunnerType =

View file

@ -423,7 +423,7 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) ->
fun(), any(),
leveled_codec:segment_list(),
false | leveled_codec:lastmod_range(),
false | pos_integer()) -> any().
boolean()) -> any().
%% @doc
%% Run a range query between StartKey and EndKey (inclusive). This will cover
%% all keys in the range - so must only be run against snapshots of the
@ -437,21 +437,20 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) ->
%% indexing by segment. This cannot be used on ?IDX_TAG and other tags that
%% use the no_lookup hash
pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc,
SegmentList, LastModRange, MaxObjectCount) ->
MaxKeys =
case MaxObjectCount of
SegmentList, LastModRange, LimitByCount) ->
{MaxKeys, InitAcc0} =
case LimitByCount of
true ->
% The passed in accumulator should have the Max Key Count
% as the first element of a tuple with the actual accumulator
InitAcc;
false ->
-1;
MOC when is_integer(MOC) ->
MOC
{-1, InitAcc}
end,
gen_server:call(Pid,
{fetch_keys,
StartKey, EndKey,
AccFun, InitAcc,
SegmentList,
LastModRange,
MaxKeys,
StartKey, EndKey, AccFun, InitAcc0,
SegmentList, LastModRange, MaxKeys,
as_pcl},
infinity).
@ -1405,7 +1404,7 @@ keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) ->
keyfolder(_Iterators, _KeyRange, {_AccFun, Acc},
{_SegmentList, _LastModRange, MaxKeys}) when MaxKeys == 0 ->
{max_count, Acc};
{0, Acc};
keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc},
{SegmentList, LastModRange, MaxKeys}) ->
{StartKey, EndKey} = KeyRange,
@ -1414,10 +1413,12 @@ keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc},
no_more_keys ->
case MaxKeys > 0 of
true ->
% Need to single this query ended not because the
% MaxKeys was reached
{no_more_keys, Acc};
% This query had a max count, so we must respond with the
% remainder on the count
{MaxKeys, Acc};
false ->
% This query started with a MaxKeys set to -1. Query is
% not interested in having MaxKeys in Response
Acc
end;
{NxSSTiter, {SSTKey, SSTVal}} ->

View file

@ -465,10 +465,10 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) ->
ExtractFun,
null),
case R of
{no_more_keys, null} ->
{1, null} ->
leveled_log:log("B0008",[]),
BKList;
{_, {{B, K}, V}} ->
{0, {{B, K}, V}} ->
case leveled_codec:is_active({Tag, B, K, null}, V, Now) of
true ->
leveled_log:log("B0009",[B]),
@ -524,12 +524,20 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch,
% no initial accumulator passed, and so should be just a list
{FoldObjFun, []}
end,
{LimitByCount, InitAcc0} =
case MaxObjectCount of
false ->
{false, InitAcc};
MOC when is_integer(MOC) ->
{true, {MOC, InitAcc}}
end,
Folder =
fun() ->
{ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
AccFun = accumulate_objects(FoldFun,
AccFun =
accumulate_objects(FoldFun,
JournalSnapshot,
Tag,
DeferredFetch),
@ -543,9 +551,9 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch,
FoldAcc,
SegmentList,
LastModRange,
MaxObjectCount)
LimitByCount)
end,
Acc = lists:foldl(ListFoldFun, InitAcc, KeyRanges),
Acc = lists:foldl(ListFoldFun, InitAcc0, KeyRanges),
ok = leveled_penciller:pcl_close(LedgerSnapshot),
case DeferredFetch of
{true, false} ->

View file

@ -79,7 +79,7 @@ fetchclocks_modifiedbetween(_Config) ->
_ObjL4EndTS = testutil:convert_to_seconds(os:timestamp()),
timer:sleep(1000),
_ObjL5StartTS = testutil:convert_to_seconds(os:timestamp()),
ObjL5StartTS = testutil:convert_to_seconds(os:timestamp()),
ObjList5 =
testutil:generate_objects(8000,
{fixed_binary, 1}, [],
@ -294,6 +294,36 @@ fetchclocks_modifiedbetween(_Config) ->
io:format("R6A_PlusFilter ~w~n", [R6A_PlusFilter]),
true = 19000 == element(2, R6A_PlusFilter),
% Hit limit of max count before trying next bucket, with and without a
% timestamp filter
{async, R7A_MultiBucketRunner} =
leveled_bookie:book_headfold(Bookie1A,
?RIAK_TAG,
{bucket_list, [<<"B1">>, <<"B2">>]},
{SimpleCountFun, 0},
false,
true,
false,
{ObjL5StartTS, ObjL6EndTS},
5000),
R7A_MultiBucket = R7A_MultiBucketRunner(),
io:format("R7A_MultiBucket ~w ~n", [R7A_MultiBucket]),
true = R7A_MultiBucket == {0, 5000},
{async, R8A_MultiBucketRunner} =
leveled_bookie:book_headfold(Bookie1A,
?RIAK_TAG,
{bucket_list, [<<"B1">>, <<"B2">>]},
{SimpleCountFun, 0},
false,
true,
false,
false,
5000),
R8A_MultiBucket = R8A_MultiBucketRunner(),
io:format("R8A_MultiBucket ~w ~n", [R8A_MultiBucket]),
true = R8A_MultiBucket == {0, 5000},
ok = leveled_bookie:book_destroy(Bookie1A),
ok = leveled_bookie:book_destroy(Bookie1B).