Extend API

To support max_keys and the last modified date range.

This applies the last modified date check on all ledger folds.  This is hard to avoid, but ultimately a very low cost.

The limit on the number of heads to fold, is the limit based on passing to the accumulator - not on the limit being added to the accumulator.  So if the FoldFun perfoms a filter (e.g. for the preflist), then those filtered results will still count towards the maximum.

There needs to be someway at the end of signalling from the fold if the outcome was  or was not 'constrained' by max_keys - as the fold cannot simply tel by lenght checking the outcome.

Note this is used rather than length checking the buffer and throwing a 'stop_fold' message when the limit is reached.  The choice is made for simplicity, and ease of testing.  The throw mechanism is necessary if there is a need to stop parallel folds across the the cluster - but in this case the node_worker_pool will be used.
This commit is contained in:
Martin Sumner 2018-10-31 00:09:24 +00:00
parent 1f976948a1
commit 11627bbdd9
4 changed files with 132 additions and 29 deletions

View file

@ -82,7 +82,8 @@
book_objectfold/5,
book_objectfold/6,
book_headfold/6,
book_headfold/7
book_headfold/7,
book_headfold/9
]).
-export([empty_ledgercache/0,
@ -111,6 +112,7 @@
-define(DUMMY, dummy). % Dummy key used for mput operations
-define(MAX_KEYCHECK_FREQUENCY, 100).
-define(MIN_KEYCHECK_FREQUENCY, 1).
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
-define(OPTION_DEFAULTS,
[{root_path, undefined},
{snapshot_bookie, undefined},
@ -893,10 +895,58 @@ book_headfold(Pid, Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList) ->
SegmentList :: false | list(integer()),
Runner :: fun(() -> Acc).
book_headfold(Pid, Tag, {bucket_list, BucketList}, FoldAccT, JournalCheck, SnapPreFold, SegmentList) ->
RunnerType = {foldheads_bybucket, Tag, BucketList, bucket_list, FoldAccT, JournalCheck, SnapPreFold, SegmentList},
RunnerType =
{foldheads_bybucket, Tag, BucketList, bucket_list, FoldAccT,
JournalCheck, SnapPreFold, SegmentList, false, false},
book_returnfolder(Pid, RunnerType);
book_headfold(Pid, Tag, {range, Bucket, KeyRange}, FoldAccT, JournalCheck, SnapPreFold, SegmentList) ->
RunnerType = {foldheads_bybucket, Tag, Bucket, KeyRange, FoldAccT, JournalCheck, SnapPreFold, SegmentList},
RunnerType =
{foldheads_bybucket, Tag, Bucket, KeyRange, FoldAccT,
JournalCheck, SnapPreFold, SegmentList, false, false},
book_returnfolder(Pid, RunnerType).
%% @doc as book_headfold/7, but with the addition of a Last Modified Date
%% Range and Max Head Count. For version 2 objects this will filter out
%% all objects with a highest Last Modified Date that is outside of the range.
%% All version 1 objects will be included in the result set regardless of Last
%% Modified Date.
%% The Max Head Count will stop the fold once the count has been reached on
%% this store only
-spec book_headfold(pid(), Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount) ->
{async, Runner} when
Tag :: leveled_codec:tag(),
Limiter :: BucketList | BucketKeyRange,
BucketList :: {bucket_list, list(Bucket)},
BucketKeyRange :: {range, Bucket, KeyRange},
KeyRange :: {StartKey, EndKey} | all,
StartKey :: Key,
EndKey :: Key,
FoldAccT :: {FoldFun, Acc},
FoldFun :: fun((Bucket, Key, Value, Acc) -> Acc),
Acc :: term(),
Bucket :: term(),
Key :: term(),
Value :: term(),
JournalCheck :: boolean(),
SnapPreFold :: boolean(),
SegmentList :: false | list(integer()),
LastModRange :: false | leveled_codec:lastmod_range(),
MaxObjectCount :: false | pos_integer(),
Runner :: fun(() -> Acc).
book_headfold(Pid, Tag, {bucket_list, BucketList}, FoldAccT, JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount) ->
RunnerType =
{foldheads_bybucket, Tag, BucketList, bucket_list, FoldAccT,
JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount},
book_returnfolder(Pid, RunnerType);
book_headfold(Pid, Tag, {range, Bucket, KeyRange}, FoldAccT, JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount) ->
RunnerType =
{foldheads_bybucket, Tag, Bucket, KeyRange, FoldAccT,
JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount},
book_returnfolder(Pid, RunnerType).
-spec book_snapshot(pid(),
@ -1576,7 +1626,8 @@ get_runner(State,
Tag,
BucketList, bucket_list,
FoldFun,
JournalCheck, SnapPreFold, SegmentList}) ->
JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount}) ->
KeyRangeFun =
fun(Bucket) ->
{StartKey, EndKey, _} = return_ledger_keyrange(Tag, Bucket, all),
@ -1588,13 +1639,16 @@ get_runner(State,
Tag,
lists:map(KeyRangeFun, BucketList),
FoldFun,
JournalCheck, SegmentList);
JournalCheck,
SegmentList,
LastModRange, MaxObjectCount);
get_runner(State,
{foldheads_bybucket,
Tag,
Bucket, KeyRange,
FoldFun,
JournalCheck, SnapPreFold, SegmentList}) ->
JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount}) ->
{StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange),
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, SnapQ, true, SnapPreFold),
@ -1602,7 +1656,9 @@ get_runner(State,
Tag,
[{StartKey, EndKey}],
FoldFun,
JournalCheck, SegmentList);
JournalCheck,
SegmentList,
LastModRange, MaxObjectCount);
get_runner(State,
{foldobjects_bybucket,
Tag, Bucket, KeyRange,
@ -2521,7 +2577,8 @@ folder_cache_test(CacheSize) ->
"BucketA",
all,
FoldHeadsFun,
true, true, false}),
true, true,
false, false, false}),
KeyHashList2A = HTFolder2A(),
{async, HTFolder2B} =
book_returnfolder(Bookie1,
@ -2530,7 +2587,8 @@ folder_cache_test(CacheSize) ->
"BucketB",
all,
FoldHeadsFun,
true, false, false}),
true, false,
false, false, false}),
KeyHashList2B = HTFolder2B(),
?assertMatch(true,
@ -2545,7 +2603,8 @@ folder_cache_test(CacheSize) ->
"BucketB",
{"Key", <<"$all">>},
FoldHeadsFun,
true, false, false}),
true, false,
false, false, false}),
KeyHashList2C = HTFolder2C(),
{async, HTFolder2D} =
book_returnfolder(Bookie1,
@ -2554,7 +2613,8 @@ folder_cache_test(CacheSize) ->
"BucketB",
{"Key", "Keyzzzzz"},
FoldHeadsFun,
true, true, false}),
true, true,
false, false, false}),
KeyHashList2D = HTFolder2D(),
?assertMatch(true,
lists:usort(KeyHashList2B) == lists:usort(KeyHashList2C)),
@ -2574,7 +2634,8 @@ folder_cache_test(CacheSize) ->
"BucketB",
{"Key", SplitIntEnd},
FoldHeadsFun,
true, false, false}),
true, false,
false, false, false}),
KeyHashList2E = HTFolder2E(),
{async, HTFolder2F} =
book_returnfolder(Bookie1,
@ -2583,7 +2644,8 @@ folder_cache_test(CacheSize) ->
"BucketB",
{SplitIntStart, "Key|"},
FoldHeadsFun,
true, false, false}),
true, false,
false, false, false}),
KeyHashList2F = HTFolder2F(),
?assertMatch(true, length(KeyHashList2E) > 0),