diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index dcfb6cd..09debba 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -909,15 +909,8 @@ book_headfold(Pid, Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold, SegmentLis %% All version 1 objects will be included in the result set regardless of Last %% Modified Date. %% The Max Object Count will stop the fold once the count has been reached on -%% this store only. The Max Object Count if provided will mean that on -%% completion of the fold the accumulator will be wrapped in a tuple to -%% indicate the reason for completion: -%% - {no_more_keys, Acc} if the end of the range was reached wihtout hitting -%% the Max Object Count limit -%% - {max_count, Acc} if the Max Object Count limit was reached before -%% reaching the end of the range -%% If MaxObjectCount is false then the Acc will be returned not wrapped in a -%% tuple +%% this store only. The Max Object Count if provided will mean that the runner +%% will return {RemainingCount, Acc} not just Acc -spec book_headfold(pid(), Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold, SegmentList, LastModRange, MaxObjectCount) -> {async, Runner} when @@ -939,7 +932,8 @@ book_headfold(Pid, Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold, SegmentLis SegmentList :: false | list(integer()), LastModRange :: false | leveled_codec:lastmod_range(), MaxObjectCount :: false | pos_integer(), - Runner :: fun(() -> Acc). + Runner :: fun(() -> ResultingAcc), + ResultingAcc :: Acc | {non_neg_integer(), Acc}. book_headfold(Pid, Tag, {bucket_list, BucketList}, FoldAccT, JournalCheck, SnapPreFold, SegmentList, LastModRange, MaxObjectCount) -> RunnerType = diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 21c822c..63c9fdc 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -423,7 +423,7 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) -> fun(), any(), leveled_codec:segment_list(), false | leveled_codec:lastmod_range(), - false | pos_integer()) -> any(). + boolean()) -> any(). %% @doc %% Run a range query between StartKey and EndKey (inclusive). This will cover %% all keys in the range - so must only be run against snapshots of the @@ -437,21 +437,20 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) -> %% indexing by segment. This cannot be used on ?IDX_TAG and other tags that %% use the no_lookup hash pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, - SegmentList, LastModRange, MaxObjectCount) -> - MaxKeys = - case MaxObjectCount of + SegmentList, LastModRange, LimitByCount) -> + {MaxKeys, InitAcc0} = + case LimitByCount of + true -> + % The passed in accumulator should have the Max Key Count + % as the first element of a tuple with the actual accumulator + InitAcc; false -> - -1; - MOC when is_integer(MOC) -> - MOC + {-1, InitAcc} end, gen_server:call(Pid, {fetch_keys, - StartKey, EndKey, - AccFun, InitAcc, - SegmentList, - LastModRange, - MaxKeys, + StartKey, EndKey, AccFun, InitAcc0, + SegmentList, LastModRange, MaxKeys, as_pcl}, infinity). @@ -1405,7 +1404,7 @@ keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) -> keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, {_SegmentList, _LastModRange, MaxKeys}) when MaxKeys == 0 -> - {max_count, Acc}; + {0, Acc}; keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, {SegmentList, LastModRange, MaxKeys}) -> {StartKey, EndKey} = KeyRange, @@ -1414,10 +1413,12 @@ keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, no_more_keys -> case MaxKeys > 0 of true -> - % Need to single this query ended not because the - % MaxKeys was reached - {no_more_keys, Acc}; + % This query had a max count, so we must respond with the + % remainder on the count + {MaxKeys, Acc}; false -> + % This query started with a MaxKeys set to -1. Query is + % not interested in having MaxKeys in Response Acc end; {NxSSTiter, {SSTKey, SSTVal}} -> diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 9f4c25e..f034890 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -465,10 +465,10 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) -> ExtractFun, null), case R of - {no_more_keys, null} -> + {1, null} -> leveled_log:log("B0008",[]), BKList; - {_, {{B, K}, V}} -> + {0, {{B, K}, V}} -> case leveled_codec:is_active({Tag, B, K, null}, V, Now) of true -> leveled_log:log("B0009",[B]), @@ -524,16 +524,24 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, % no initial accumulator passed, and so should be just a list {FoldObjFun, []} end, + {LimitByCount, InitAcc0} = + case MaxObjectCount of + false -> + {false, InitAcc}; + MOC when is_integer(MOC) -> + {true, {MOC, InitAcc}} + end, Folder = fun() -> {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(), - AccFun = accumulate_objects(FoldFun, - JournalSnapshot, - Tag, - DeferredFetch), - + AccFun = + accumulate_objects(FoldFun, + JournalSnapshot, + Tag, + DeferredFetch), + ListFoldFun = fun({StartKey, EndKey}, FoldAcc) -> leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot, @@ -543,9 +551,9 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, FoldAcc, SegmentList, LastModRange, - MaxObjectCount) + LimitByCount) end, - Acc = lists:foldl(ListFoldFun, InitAcc, KeyRanges), + Acc = lists:foldl(ListFoldFun, InitAcc0, KeyRanges), ok = leveled_penciller:pcl_close(LedgerSnapshot), case DeferredFetch of {true, false} -> diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl index 6eb1962..cf3aa01 100644 --- a/test/end_to_end/riak_SUITE.erl +++ b/test/end_to_end/riak_SUITE.erl @@ -79,7 +79,7 @@ fetchclocks_modifiedbetween(_Config) -> _ObjL4EndTS = testutil:convert_to_seconds(os:timestamp()), timer:sleep(1000), - _ObjL5StartTS = testutil:convert_to_seconds(os:timestamp()), + ObjL5StartTS = testutil:convert_to_seconds(os:timestamp()), ObjList5 = testutil:generate_objects(8000, {fixed_binary, 1}, [], @@ -294,6 +294,36 @@ fetchclocks_modifiedbetween(_Config) -> io:format("R6A_PlusFilter ~w~n", [R6A_PlusFilter]), true = 19000 == element(2, R6A_PlusFilter), + % Hit limit of max count before trying next bucket, with and without a + % timestamp filter + {async, R7A_MultiBucketRunner} = + leveled_bookie:book_headfold(Bookie1A, + ?RIAK_TAG, + {bucket_list, [<<"B1">>, <<"B2">>]}, + {SimpleCountFun, 0}, + false, + true, + false, + {ObjL5StartTS, ObjL6EndTS}, + 5000), + R7A_MultiBucket = R7A_MultiBucketRunner(), + io:format("R7A_MultiBucket ~w ~n", [R7A_MultiBucket]), + true = R7A_MultiBucket == {0, 5000}, + + {async, R8A_MultiBucketRunner} = + leveled_bookie:book_headfold(Bookie1A, + ?RIAK_TAG, + {bucket_list, [<<"B1">>, <<"B2">>]}, + {SimpleCountFun, 0}, + false, + true, + false, + false, + 5000), + R8A_MultiBucket = R8A_MultiBucketRunner(), + io:format("R8A_MultiBucket ~w ~n", [R8A_MultiBucket]), + true = R8A_MultiBucket == {0, 5000}, + ok = leveled_bookie:book_destroy(Bookie1A), ok = leveled_bookie:book_destroy(Bookie1B).