From 11627bbdd9e97652c0d01d678a59374a219e8ae9 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 31 Oct 2018 00:09:24 +0000 Subject: [PATCH] Extend API To support max_keys and the last modified date range. This applies the last modified date check on all ledger folds. This is hard to avoid, but ultimately a very low cost. The limit on the number of heads to fold is the limit based on passing to the accumulator - not on the limit being added to the accumulator. So if the FoldFun performs a filter (e.g. for the preflist), then those filtered results will still count towards the maximum. There needs to be some way at the end of signalling from the fold if the outcome was or was not 'constrained' by max_keys - as the fold cannot simply tell by length checking the outcome. Note this is used rather than length checking the buffer and throwing a 'stop_fold' message when the limit is reached. The choice is made for simplicity, and ease of testing. The throw mechanism is necessary if there is a need to stop parallel folds across the cluster - but in this case the node_worker_pool will be used. --- src/leveled_bookie.erl | 88 +++++++++++++++++++++++++++++++++------ src/leveled_codec.erl | 4 +- src/leveled_penciller.erl | 37 ++++++++++++---- src/leveled_runner.erl | 32 +++++++++++--- 4 files changed, 132 insertions(+), 29 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 19e31c4..e3c77ab 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -82,7 +82,8 @@ book_objectfold/5, book_objectfold/6, book_headfold/6, - book_headfold/7 + book_headfold/7, + book_headfold/9 ]). -export([empty_ledgercache/0, @@ -111,6 +112,7 @@ -define(DUMMY, dummy). % Dummy key used for mput operations -define(MAX_KEYCHECK_FREQUENCY, 100). -define(MIN_KEYCHECK_FREQUENCY, 1). +-define(OPEN_LASTMOD_RANGE, {0, infinity}). 
-define(OPTION_DEFAULTS, [{root_path, undefined}, {snapshot_bookie, undefined}, @@ -893,10 +895,58 @@ book_headfold(Pid, Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList) -> SegmentList :: false | list(integer()), Runner :: fun(() -> Acc). book_headfold(Pid, Tag, {bucket_list, BucketList}, FoldAccT, JournalCheck, SnapPreFold, SegmentList) -> - RunnerType = {foldheads_bybucket, Tag, BucketList, bucket_list, FoldAccT, JournalCheck, SnapPreFold, SegmentList}, + RunnerType = + {foldheads_bybucket, Tag, BucketList, bucket_list, FoldAccT, + JournalCheck, SnapPreFold, SegmentList, false, false}, book_returnfolder(Pid, RunnerType); book_headfold(Pid, Tag, {range, Bucket, KeyRange}, FoldAccT, JournalCheck, SnapPreFold, SegmentList) -> - RunnerType = {foldheads_bybucket, Tag, Bucket, KeyRange, FoldAccT, JournalCheck, SnapPreFold, SegmentList}, + RunnerType = + {foldheads_bybucket, Tag, Bucket, KeyRange, FoldAccT, + JournalCheck, SnapPreFold, SegmentList, false, false}, + book_returnfolder(Pid, RunnerType). + +%% @doc as book_headfold/7, but with the addition of a Last Modified Date +%% Range and Max Head Count. For version 2 objects this will filter out +%% all objects with a highest Last Modified Date that is outside of the range. +%% All version 1 objects will be included in the result set regardless of Last +%% Modified Date. 
+%% The Max Head Count will stop the fold once the count has been reached on +%% this store only +-spec book_headfold(pid(), Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold, + SegmentList, LastModRange, MaxObjectCount) -> + {async, Runner} when + Tag :: leveled_codec:tag(), + Limiter :: BucketList | BucketKeyRange, + BucketList :: {bucket_list, list(Bucket)}, + BucketKeyRange :: {range, Bucket, KeyRange}, + KeyRange :: {StartKey, EndKey} | all, + StartKey :: Key, + EndKey :: Key, + FoldAccT :: {FoldFun, Acc}, + FoldFun :: fun((Bucket, Key, Value, Acc) -> Acc), + Acc :: term(), + Bucket :: term(), + Key :: term(), + Value :: term(), + JournalCheck :: boolean(), + SnapPreFold :: boolean(), + SegmentList :: false | list(integer()), + LastModRange :: false | leveled_codec:lastmod_range(), + MaxObjectCount :: false | pos_integer(), + Runner :: fun(() -> Acc). +book_headfold(Pid, Tag, {bucket_list, BucketList}, FoldAccT, JournalCheck, SnapPreFold, + SegmentList, LastModRange, MaxObjectCount) -> + RunnerType = + {foldheads_bybucket, Tag, BucketList, bucket_list, FoldAccT, + JournalCheck, SnapPreFold, + SegmentList, LastModRange, MaxObjectCount}, + book_returnfolder(Pid, RunnerType); +book_headfold(Pid, Tag, {range, Bucket, KeyRange}, FoldAccT, JournalCheck, SnapPreFold, + SegmentList, LastModRange, MaxObjectCount) -> + RunnerType = + {foldheads_bybucket, Tag, Bucket, KeyRange, FoldAccT, + JournalCheck, SnapPreFold, + SegmentList, LastModRange, MaxObjectCount}, book_returnfolder(Pid, RunnerType). 
-spec book_snapshot(pid(), @@ -1576,7 +1626,8 @@ get_runner(State, Tag, BucketList, bucket_list, FoldFun, - JournalCheck, SnapPreFold, SegmentList}) -> + JournalCheck, SnapPreFold, + SegmentList, LastModRange, MaxObjectCount}) -> KeyRangeFun = fun(Bucket) -> {StartKey, EndKey, _} = return_ledger_keyrange(Tag, Bucket, all), @@ -1588,13 +1639,16 @@ get_runner(State, Tag, lists:map(KeyRangeFun, BucketList), FoldFun, - JournalCheck, SegmentList); + JournalCheck, + SegmentList, + LastModRange, MaxObjectCount); get_runner(State, {foldheads_bybucket, Tag, Bucket, KeyRange, FoldFun, - JournalCheck, SnapPreFold, SegmentList}) -> + JournalCheck, SnapPreFold, + SegmentList, LastModRange, MaxObjectCount}) -> {StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange), SnapType = snaptype_by_presence(JournalCheck), SnapFun = return_snapfun(State, SnapType, SnapQ, true, SnapPreFold), @@ -1602,7 +1656,9 @@ get_runner(State, Tag, [{StartKey, EndKey}], FoldFun, - JournalCheck, SegmentList); + JournalCheck, + SegmentList, + LastModRange, MaxObjectCount); get_runner(State, {foldobjects_bybucket, Tag, Bucket, KeyRange, @@ -2521,7 +2577,8 @@ folder_cache_test(CacheSize) -> "BucketA", all, FoldHeadsFun, - true, true, false}), + true, true, + false, false, false}), KeyHashList2A = HTFolder2A(), {async, HTFolder2B} = book_returnfolder(Bookie1, @@ -2530,7 +2587,8 @@ folder_cache_test(CacheSize) -> "BucketB", all, FoldHeadsFun, - true, false, false}), + true, false, + false, false, false}), KeyHashList2B = HTFolder2B(), ?assertMatch(true, @@ -2545,7 +2603,8 @@ folder_cache_test(CacheSize) -> "BucketB", {"Key", <<"$all">>}, FoldHeadsFun, - true, false, false}), + true, false, + false, false, false}), KeyHashList2C = HTFolder2C(), {async, HTFolder2D} = book_returnfolder(Bookie1, @@ -2554,7 +2613,8 @@ folder_cache_test(CacheSize) -> "BucketB", {"Key", "Keyzzzzz"}, FoldHeadsFun, - true, true, false}), + true, true, + false, false, false}), KeyHashList2D = HTFolder2D(), 
?assertMatch(true, lists:usort(KeyHashList2B) == lists:usort(KeyHashList2C)), @@ -2574,7 +2634,8 @@ folder_cache_test(CacheSize) -> "BucketB", {"Key", SplitIntEnd}, FoldHeadsFun, - true, false, false}), + true, false, + false, false, false}), KeyHashList2E = HTFolder2E(), {async, HTFolder2F} = book_returnfolder(Bookie1, @@ -2583,7 +2644,8 @@ folder_cache_test(CacheSize) -> "BucketB", {SplitIntStart, "Key|"}, FoldHeadsFun, - true, false, false}), + true, false, + false, false, false}), KeyHashList2F = HTFolder2F(), ?assertMatch(true, length(KeyHashList2E) > 0), diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 5aca861..dbbb16e 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -94,6 +94,7 @@ % if the object has siblings in the store will be the maximum of those % dates integer()|undefined. +-type lastmod_range() :: {integer(), pos_integer()|infinity}. -type ledger_status() :: tomb|{active, non_neg_integer()|infinity}. @@ -140,7 +141,8 @@ index_specs/0, segment_list/0, maybe_lookup/0, - last_moddate/0]). + last_moddate/0, + lastmod_range/0]). %%%============================================================================ diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 91a0b59..64b89d8 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -177,7 +177,7 @@ pcl_fetch/4, pcl_fetchkeys/5, pcl_fetchkeys/6, - pcl_fetchkeysbysegment/6, + pcl_fetchkeysbysegment/8, pcl_fetchnextkey/5, pcl_checksequencenumber/3, pcl_workforclerk/1, @@ -237,6 +237,7 @@ -define(SNAPSHOT_TIMEOUT_SHORT, 600). -define(TIMING_SAMPLECOUNTDOWN, 10000). -define(TIMING_SAMPLESIZE, 100). +-define(OPEN_LASTMOD_RANGE, {0, infinity}). 
-record(state, {manifest, % a manifest record from the leveled_manifest module persisted_sqn = 0 :: integer(), % The highest SQN persisted @@ -411,7 +412,7 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) -> {fetch_keys, StartKey, EndKey, AccFun, InitAcc, - false, -1, + false, false, -1, By}, infinity). @@ -420,7 +421,9 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) -> leveled_codec:ledger_key(), leveled_codec:ledger_key(), fun(), any(), - leveled_codec:segment_list()) -> any(). + leveled_codec:segment_list(), + false | leveled_codec:lastmod_range(), + false | pos_integer()) -> any(). %% @doc %% Run a range query between StartKey and EndKey (inclusive). This will cover %% all keys in the range - so must only be run against snapshots of the @@ -433,12 +436,22 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) -> %% Note that segment must be false unless the object Tag supports additional %% indexing by segment. This cannot be used on ?IDX_TAG and other tags that %% use the no_lookup hash -pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, SegmentList) -> +pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, + SegmentList, LastModRange, MaxObjectCount) -> + MaxKeys = + case MaxObjectCount of + false -> + -1; + MOC when is_integer(MOC) -> + MOC + end, gen_server:call(Pid, {fetch_keys, StartKey, EndKey, AccFun, InitAcc, - SegmentList, -1, + SegmentList, + LastModRange, + MaxKeys, as_pcl}, infinity). @@ -455,7 +468,7 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) -> {fetch_keys, StartKey, EndKey, AccFun, InitAcc, - false, 1, + false, false, 1, as_pcl}, infinity). 
@@ -690,10 +703,17 @@ handle_call({check_sqn, Key, Hash, SQN}, _From, State) -> handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, - SegmentList, MaxKeys, By}, + SegmentList, LastModRange, MaxKeys, By}, _From, State=#state{snapshot_fully_loaded=Ready}) when Ready == true -> + LastModRange0 = + case LastModRange of + false -> + ?OPEN_LASTMOD_RANGE; + R -> + R + end, SW = os:timestamp(), L0AsList = case State#state.levelzero_astree of @@ -726,8 +746,7 @@ handle_call({fetch_keys, keyfolder({L0AsList, SSTiter}, {StartKey, EndKey}, {AccFun, InitAcc}, - {SegmentList, {0, infinity}, MaxKeys}) - % TODO: Allow query to set last mod range + {SegmentList, LastModRange0, MaxKeys}) end, case By of as_pcl -> diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index c8ad66a..e3b5445 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -32,7 +32,7 @@ tictactree/5, foldheads_allkeys/5, foldobjects_allkeys/4, - foldheads_bybucket/6, + foldheads_bybucket/8, foldobjects_bybucket/4, foldobjects_byindex/3 ]). @@ -49,6 +49,7 @@ :: {fun(), any()}. -type term_regex() :: re:mp()|undefined. + %%%============================================================================ %%% External functions %%%============================================================================ @@ -399,7 +400,10 @@ foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) -> atom(), list({any(), any()}), fun(), - boolean(), false|list(integer())) + boolean(), + false|list(integer()), + false|leveled_codec:lastmod_range(), + false|pos_integer()) -> {async, fun()}. %% @doc %% Fold over all object metadata within a given key range in a bucket @@ -407,13 +411,16 @@ foldheads_bybucket(SnapFun, Tag, KeyRanges, FoldFun, - JournalCheck, SegmentList) -> + JournalCheck, + SegmentList, LastModRange, MaxObjectCount) -> foldobjects(SnapFun, Tag, KeyRanges, FoldFun, {true, JournalCheck}, - SegmentList). + SegmentList, + LastModRange, + MaxObjectCount). 
-spec foldobjects_byindex(fun(), tuple(), fun()) -> {async, fun()}. %% @doc @@ -484,6 +491,16 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) -> -spec foldobjects(fun(), atom(), list(), fun(), false|{true, boolean()}, false|list(integer())) -> {async, fun()}. +foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) -> + foldobjects(SnapFun, Tag, KeyRanges, + FoldObjFun, DeferredFetch, SegmentList, false, false). + +-spec foldobjects(fun(), atom(), list(), fun(), + false|{true, boolean()}, + false|list(integer()), + false|leveled_codec:lastmod_range(), + false|pos_integer()) -> + {async, fun()}. %% @doc %% The object folder should be passed DeferredFetch. %% DeferredFetch can either be false (which will return to the fold function @@ -491,7 +508,8 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) -> %% will be created that if understood by the fold function will allow the fold %% function to work on the head of the object, and defer fetching the body in %% case such a fetch is unecessary. -foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) -> +foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, + SegmentList, LastModRange, MaxObjectCount) -> {FoldFun, InitAcc} = case is_tuple(FoldObjFun) of true -> @@ -519,7 +537,9 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) -> EndKey, AccFun, FoldAcc, - SegmentList) + SegmentList, + LastModRange, + MaxObjectCount) end, Acc = lists:foldl(ListFoldFun, InitAcc, KeyRanges), ok = leveled_penciller:pcl_close(LedgerSnapshot),