diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 19e31c4..e3c77ab 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -82,7 +82,8 @@ book_objectfold/5, book_objectfold/6, book_headfold/6, - book_headfold/7 + book_headfold/7, + book_headfold/9 ]). -export([empty_ledgercache/0, @@ -111,6 +112,7 @@ -define(DUMMY, dummy). % Dummy key used for mput operations -define(MAX_KEYCHECK_FREQUENCY, 100). -define(MIN_KEYCHECK_FREQUENCY, 1). +-define(OPEN_LASTMOD_RANGE, {0, infinity}). -define(OPTION_DEFAULTS, [{root_path, undefined}, {snapshot_bookie, undefined}, @@ -893,10 +895,58 @@ book_headfold(Pid, Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList) -> SegmentList :: false | list(integer()), Runner :: fun(() -> Acc). book_headfold(Pid, Tag, {bucket_list, BucketList}, FoldAccT, JournalCheck, SnapPreFold, SegmentList) -> - RunnerType = {foldheads_bybucket, Tag, BucketList, bucket_list, FoldAccT, JournalCheck, SnapPreFold, SegmentList}, + RunnerType = + {foldheads_bybucket, Tag, BucketList, bucket_list, FoldAccT, + JournalCheck, SnapPreFold, SegmentList, false, false}, book_returnfolder(Pid, RunnerType); book_headfold(Pid, Tag, {range, Bucket, KeyRange}, FoldAccT, JournalCheck, SnapPreFold, SegmentList) -> - RunnerType = {foldheads_bybucket, Tag, Bucket, KeyRange, FoldAccT, JournalCheck, SnapPreFold, SegmentList}, + RunnerType = + {foldheads_bybucket, Tag, Bucket, KeyRange, FoldAccT, + JournalCheck, SnapPreFold, SegmentList, false, false}, + book_returnfolder(Pid, RunnerType). + +%% @doc as book_headfold/7, but with the addition of a Last Modified Date +%% Range and Max Head Count. For version 2 objects this will filter out +%% all objects with a highest Last Modified Date that is outside of the range. +%% All version 1 objects will be included in the result set regardless of Last +%% Modified Date. +%% The Max Head Count will stop the fold once the count has been reached on +%% this store only +-spec book_headfold(pid(), Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold, + SegmentList, LastModRange, MaxObjectCount) -> + {async, Runner} when + Tag :: leveled_codec:tag(), + Limiter :: BucketList | BucketKeyRange, + BucketList :: {bucket_list, list(Bucket)}, + BucketKeyRange :: {range, Bucket, KeyRange}, + KeyRange :: {StartKey, EndKey} | all, + StartKey :: Key, + EndKey :: Key, + FoldAccT :: {FoldFun, Acc}, + FoldFun :: fun((Bucket, Key, Value, Acc) -> Acc), + Acc :: term(), + Bucket :: term(), + Key :: term(), + Value :: term(), + JournalCheck :: boolean(), + SnapPreFold :: boolean(), + SegmentList :: false | list(integer()), + LastModRange :: false | leveled_codec:lastmod_range(), + MaxObjectCount :: false | pos_integer(), + Runner :: fun(() -> Acc). +book_headfold(Pid, Tag, {bucket_list, BucketList}, FoldAccT, JournalCheck, SnapPreFold, + SegmentList, LastModRange, MaxObjectCount) -> + RunnerType = + {foldheads_bybucket, Tag, BucketList, bucket_list, FoldAccT, + JournalCheck, SnapPreFold, + SegmentList, LastModRange, MaxObjectCount}, + book_returnfolder(Pid, RunnerType); +book_headfold(Pid, Tag, {range, Bucket, KeyRange}, FoldAccT, JournalCheck, SnapPreFold, + SegmentList, LastModRange, MaxObjectCount) -> + RunnerType = + {foldheads_bybucket, Tag, Bucket, KeyRange, FoldAccT, + JournalCheck, SnapPreFold, + SegmentList, LastModRange, MaxObjectCount}, book_returnfolder(Pid, RunnerType). -spec book_snapshot(pid(), @@ -1576,7 +1626,8 @@ get_runner(State, Tag, BucketList, bucket_list, FoldFun, - JournalCheck, SnapPreFold, SegmentList}) -> + JournalCheck, SnapPreFold, + SegmentList, LastModRange, MaxObjectCount}) -> KeyRangeFun = fun(Bucket) -> {StartKey, EndKey, _} = return_ledger_keyrange(Tag, Bucket, all), @@ -1588,13 +1639,16 @@ get_runner(State, Tag, lists:map(KeyRangeFun, BucketList), FoldFun, - JournalCheck, SegmentList); + JournalCheck, + SegmentList, + LastModRange, MaxObjectCount); get_runner(State, {foldheads_bybucket, Tag, Bucket, KeyRange, FoldFun, - JournalCheck, SnapPreFold, SegmentList}) -> + JournalCheck, SnapPreFold, + SegmentList, LastModRange, MaxObjectCount}) -> {StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange), SnapType = snaptype_by_presence(JournalCheck), SnapFun = return_snapfun(State, SnapType, SnapQ, true, SnapPreFold), @@ -1602,7 +1656,9 @@ get_runner(State, Tag, [{StartKey, EndKey}], FoldFun, - JournalCheck, SegmentList); + JournalCheck, + SegmentList, + LastModRange, MaxObjectCount); get_runner(State, {foldobjects_bybucket, Tag, Bucket, KeyRange, @@ -2521,7 +2577,8 @@ folder_cache_test(CacheSize) -> "BucketA", all, FoldHeadsFun, - true, true, false}), + true, true, + false, false, false}), KeyHashList2A = HTFolder2A(), {async, HTFolder2B} = book_returnfolder(Bookie1, @@ -2530,7 +2587,8 @@ folder_cache_test(CacheSize) -> "BucketB", all, FoldHeadsFun, - true, false, false}), + true, false, + false, false, false}), KeyHashList2B = HTFolder2B(), ?assertMatch(true, @@ -2545,7 +2603,8 @@ folder_cache_test(CacheSize) -> "BucketB", {"Key", <<"$all">>}, FoldHeadsFun, - true, false, false}), + true, false, + false, false, false}), KeyHashList2C = HTFolder2C(), {async, HTFolder2D} = book_returnfolder(Bookie1, @@ -2554,7 +2613,8 @@ folder_cache_test(CacheSize) -> "BucketB", {"Key", "Keyzzzzz"}, FoldHeadsFun, - true, true, false}), + true, true, + false, false, false}), KeyHashList2D = HTFolder2D(), ?assertMatch(true, lists:usort(KeyHashList2B) == lists:usort(KeyHashList2C)), @@ -2574,7 +2634,8 @@ folder_cache_test(CacheSize) -> "BucketB", {"Key", SplitIntEnd}, FoldHeadsFun, - true, false, false}), + true, false, + false, false, false}), KeyHashList2E = HTFolder2E(), {async, HTFolder2F} = book_returnfolder(Bookie1, @@ -2583,7 +2644,8 @@ folder_cache_test(CacheSize) -> "BucketB", {SplitIntStart, "Key|"}, FoldHeadsFun, - true, false, false}), + true, false, + false, false, false}), KeyHashList2F = HTFolder2F(), ?assertMatch(true, length(KeyHashList2E) > 0), diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 5aca861..dbbb16e 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -94,6 +94,7 @@ % if the object has siblings in the store will be the maximum of those % dates integer()|undefined. +-type lastmod_range() :: {integer(), pos_integer()|infinity}. -type ledger_status() :: tomb|{active, non_neg_integer()|infinity}. @@ -140,7 +141,8 @@ index_specs/0, segment_list/0, maybe_lookup/0, - last_moddate/0]). + last_moddate/0, + lastmod_range/0]). %%%============================================================================ diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 91a0b59..64b89d8 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -177,7 +177,7 @@ pcl_fetch/4, pcl_fetchkeys/5, pcl_fetchkeys/6, - pcl_fetchkeysbysegment/6, + pcl_fetchkeysbysegment/8, pcl_fetchnextkey/5, pcl_checksequencenumber/3, pcl_workforclerk/1, @@ -237,6 +237,7 @@ -define(SNAPSHOT_TIMEOUT_SHORT, 600). -define(TIMING_SAMPLECOUNTDOWN, 10000). -define(TIMING_SAMPLESIZE, 100). +-define(OPEN_LASTMOD_RANGE, {0, infinity}). -record(state, {manifest, % a manifest record from the leveled_manifest module persisted_sqn = 0 :: integer(), % The highest SQN persisted @@ -411,7 +412,7 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) -> {fetch_keys, StartKey, EndKey, AccFun, InitAcc, - false, -1, + false, false, -1, By}, infinity). @@ -420,7 +421,9 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) -> leveled_codec:ledger_key(), leveled_codec:ledger_key(), fun(), any(), - leveled_codec:segment_list()) -> any(). + leveled_codec:segment_list(), + false | leveled_codec:lastmod_range(), + false | pos_integer()) -> any(). %% @doc %% Run a range query between StartKey and EndKey (inclusive). This will cover %% all keys in the range - so must only be run against snapshots of the @@ -433,12 +436,22 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc, By) -> %% Note that segment must be false unless the object Tag supports additional %% indexing by segment. This cannot be used on ?IDX_TAG and other tags that %% use the no_lookup hash -pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, SegmentList) -> +pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, + SegmentList, LastModRange, MaxObjectCount) -> + MaxKeys = + case MaxObjectCount of + false -> + -1; + MOC when is_integer(MOC) -> + MOC + end, gen_server:call(Pid, {fetch_keys, StartKey, EndKey, AccFun, InitAcc, - SegmentList, -1, + SegmentList, + LastModRange, + MaxKeys, as_pcl}, infinity). @@ -455,7 +468,7 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) -> {fetch_keys, StartKey, EndKey, AccFun, InitAcc, - false, 1, + false, false, 1, as_pcl}, infinity). @@ -690,10 +703,17 @@ handle_call({check_sqn, Key, Hash, SQN}, _From, State) -> handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, - SegmentList, MaxKeys, By}, + SegmentList, LastModRange, MaxKeys, By}, _From, State=#state{snapshot_fully_loaded=Ready}) when Ready == true -> + LastModRange0 = + case LastModRange of + false -> + ?OPEN_LASTMOD_RANGE; + R -> + R + end, SW = os:timestamp(), L0AsList = case State#state.levelzero_astree of @@ -726,8 +746,7 @@ handle_call({fetch_keys, keyfolder({L0AsList, SSTiter}, {StartKey, EndKey}, {AccFun, InitAcc}, - {SegmentList, {0, infinity}, MaxKeys}) - % TODO: Allow query to set last mod range + {SegmentList, LastModRange0, MaxKeys}) end, case By of as_pcl -> diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index c8ad66a..e3b5445 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -32,7 +32,7 @@ tictactree/5, foldheads_allkeys/5, foldobjects_allkeys/4, - foldheads_bybucket/6, + foldheads_bybucket/8, foldobjects_bybucket/4, foldobjects_byindex/3 ]). @@ -49,6 +49,7 @@ :: {fun(), any()}. -type term_regex() :: re:mp()|undefined. + %%%============================================================================ %%% External functions %%%============================================================================ @@ -399,7 +400,10 @@ foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) -> atom(), list({any(), any()}), fun(), - boolean(), false|list(integer())) + boolean(), + false|list(integer()), + false|leveled_codec:lastmod_range(), + false|pos_integer()) -> {async, fun()}. %% @doc %% Fold over all object metadata within a given key range in a bucket @@ -407,13 +411,16 @@ foldheads_bybucket(SnapFun, Tag, KeyRanges, FoldFun, - JournalCheck, SegmentList) -> + JournalCheck, + SegmentList, LastModRange, MaxObjectCount) -> foldobjects(SnapFun, Tag, KeyRanges, FoldFun, {true, JournalCheck}, - SegmentList). + SegmentList, + LastModRange, + MaxObjectCount). -spec foldobjects_byindex(fun(), tuple(), fun()) -> {async, fun()}. %% @doc @@ -484,6 +491,16 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) -> -spec foldobjects(fun(), atom(), list(), fun(), false|{true, boolean()}, false|list(integer())) -> {async, fun()}. +foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) -> + foldobjects(SnapFun, Tag, KeyRanges, + FoldObjFun, DeferredFetch, SegmentList, false, false). + +-spec foldobjects(fun(), atom(), list(), fun(), + false|{true, boolean()}, + false|list(integer()), + false|leveled_codec:lastmod_range(), + false|pos_integer()) -> + {async, fun()}. %% @doc %% The object folder should be passed DeferredFetch. %% DeferredFetch can either be false (which will return to the fold function @@ -491,7 +508,8 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList, {C, L}) -> %% will be created that if understood by the fold function will allow the fold %% function to work on the head of the object, and defer fetching the body in %% case such a fetch is unecessary. -foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) -> +foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, + SegmentList, LastModRange, MaxObjectCount) -> {FoldFun, InitAcc} = case is_tuple(FoldObjFun) of true -> @@ -519,7 +537,9 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) -> EndKey, AccFun, FoldAcc, - SegmentList) + SegmentList, + LastModRange, + MaxObjectCount) end, Acc = lists:foldl(ListFoldFun, InitAcc, KeyRanges), ok = leveled_penciller:pcl_close(LedgerSnapshot),