diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 116c5b0..3e9dbcc 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -935,9 +935,28 @@ get_runner(State, SnapType = snaptype_by_presence(JournalCheck), SnapFun = return_snapfun(State, SnapType, SnapQ, true, SnapPreFold), leveled_runner:foldheads_bybucket(SnapFun, - {Tag, StartKey, EndKey}, + Tag, + [{StartKey, EndKey}], FoldFun, JournalCheck, SegmentList); +get_runner(State, + {foldheads_bybucketlist, + Tag, + BucketList, + FoldFun, + JournalCheck, SnapPreFold, SegmentList}) -> + KeyRangeFun = + fun(Bucket) -> + {StartKey, EndKey, _} = return_ledger_keyrange(Tag, Bucket, all), + {StartKey, EndKey} + end, + SnapType = snaptype_by_presence(JournalCheck), + SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold), + leveled_runner:foldheads_bybucket(SnapFun, + Tag, + lists:map(KeyRangeFun, BucketList), + FoldFun, + JournalCheck, SegmentList); get_runner(State, {foldobjects_bybucket, Tag, Bucket, KeyRange, @@ -946,7 +965,8 @@ get_runner(State, {StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange), SnapFun = return_snapfun(State, store, SnapQ, true, SnapPreFold), leveled_runner:foldobjects_bybucket(SnapFun, - {Tag, StartKey, EndKey}, + Tag, + [{StartKey, EndKey}], FoldFun); get_runner(State, {foldobjects_byindex, @@ -1007,7 +1027,7 @@ get_deprecatedrunner(State, PartitionFilter). --spec return_ledger_keyrange(atom(), any(), tuple()) -> +-spec return_ledger_keyrange(atom(), any(), tuple()|all) -> {tuple(), tuple(), tuple()|no_lookup}. %% @doc %% Convert a range of binary keys into a ledger key range, returning @@ -1038,7 +1058,6 @@ return_ledger_keyrange(Tag, Bucket, KeyRange) -> end, {StartKey, EndKey, SnapQuery}. - maybe_longrunning(SW, Aspect) -> case timer:now_diff(os:timestamp(), SW) of diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index ea06283..f2d1454 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -677,7 +677,7 @@ handle_call({fetch_keys, {AccFun, InitAcc}, {SegmentList, MaxKeys}), - {reply, Acc, State#state{levelzero_astree = L0AsList}}; + {reply, Acc, State}; handle_call(get_startup_sqn, _From, State) -> {reply, State#state.persisted_sqn, State}; handle_call({register_snapshot, Snapshot, Query, BookiesMem, LR}, _From, State) -> diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 1d8af15..c6c768f 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -30,8 +30,8 @@ tictactree/5, foldheads_allkeys/5, foldobjects_allkeys/4, - foldheads_bybucket/5, - foldobjects_bybucket/3, + foldheads_bybucket/6, + foldobjects_bybucket/4, foldobjects_byindex/3 ]). @@ -222,9 +222,11 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) -> StartKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), foldobjects(SnapFun, - Tag, StartKey, EndKey, + Tag, + [{StartKey, EndKey}], FoldFun, - {true, JournalCheck}, SegmentList). + {true, JournalCheck}, + SegmentList). -spec foldobjects_allkeys(fun(), atom(), fun(), key_order|sqn_order) -> {async, fun()}. @@ -234,9 +236,11 @@ foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order) -> StartKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), foldobjects(SnapFun, - Tag, StartKey, EndKey, + Tag, + [{StartKey, EndKey}], FoldFun, - false, false); + false, + false); foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> % Fold over the journal in order of receipt {FoldFun, InitAcc} = @@ -321,31 +325,37 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> {async, Folder}. --spec foldobjects_bybucket(fun(), {atom(), any(), any()}, fun()) -> +-spec foldobjects_bybucket(fun(), atom(), list({any(), any()}), fun()) -> {async, fun()}. %% @doc %% Fold over all objects within a given key range in a bucket -foldobjects_bybucket(SnapFun, {Tag, StartKey, EndKey}, FoldFun) -> +foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) -> foldobjects(SnapFun, - Tag, StartKey, EndKey, + Tag, + KeyRanges, FoldFun, - false, false). + false, + false). -spec foldheads_bybucket(fun(), - {atom(), any(), any()}, + atom(), + list({any(), any()}), fun(), boolean(), false|list(integer())) -> {async, fun()}. %% @doc %% Fold over all object metadata within a given key range in a bucket foldheads_bybucket(SnapFun, - {Tag, StartKey, EndKey}, + Tag, + KeyRanges, FoldFun, JournalCheck, SegmentList) -> foldobjects(SnapFun, - Tag, StartKey, EndKey, + Tag, + KeyRanges, FoldFun, - {true, JournalCheck}, SegmentList). + {true, JournalCheck}, + SegmentList). -spec foldobjects_byindex(fun(), tuple(), fun()) -> {async, fun()}. %% @doc @@ -357,9 +367,11 @@ foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) -> EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm), foldobjects(SnapFun, - Tag, StartKey, EndKey, + Tag, + [{StartKey, EndKey}], FoldFun, - false, false). + false, + false). @@ -407,7 +419,7 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) -> end. --spec foldobjects(fun(), atom(), tuple(), tuple(), fun(), +-spec foldobjects(fun(), atom(), list(), fun(), false|{true, boolean()}, false|list(integer())) -> {async, fun()}. %% @doc @@ -417,19 +429,16 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) -> %% will be created that if understood by the fold function will allow the fold %% function to work on the head of the object, and defer fetching the body in %% case such a fetch is unecessary. -foldobjects(SnapFun, - Tag, StartKey, EndKey, - FoldObjectsFun, - DeferredFetch, SegmentList) -> +foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) -> {FoldFun, InitAcc} = - case is_tuple(FoldObjectsFun) of + case is_tuple(FoldObjFun) of true -> % FoldObjectsFun is already a tuple with a Fold function and an % initial accumulator - FoldObjectsFun; + FoldObjFun; false -> % no initial accumulatr passed, and so should be just a list - {FoldObjectsFun, []} + {FoldObjFun, []} end, Folder = @@ -440,12 +449,18 @@ foldobjects(SnapFun, JournalSnapshot, Tag, DeferredFetch), - Acc = leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot, - StartKey, - EndKey, - AccFun, - InitAcc, - SegmentList), + + ListFoldFun = + fun({StartKey, EndKey}, FoldAcc) -> + io:format("SK ~w EK ~w ~n", [StartKey, EndKey]), + leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot, + StartKey, + EndKey, + AccFun, + FoldAcc, + SegmentList) + end, + Acc = lists:foldl(ListFoldFun, InitAcc, KeyRanges), ok = leveled_penciller:pcl_close(LedgerSnapshot), case DeferredFetch of {true, false} -> diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index 562a402..2c47610 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -9,12 +9,14 @@ -export([single_object_with2i/1, small_load_with2i/1, query_count/1, + multibucket_fold/1, rotating_objects/1]). all() -> [ single_object_with2i, small_load_with2i, query_count, + multibucket_fold, rotating_objects ]. @@ -454,7 +456,7 @@ count_termsonindex(Bucket, IdxField, Book, QType) -> lists:foldl(fun(X, Acc) -> SW = os:timestamp(), ST = integer_to_list(X), - ET = ST ++ "~", + ET = ST ++ "|", Q = {index_query, Bucket, {fun testutil:foldkeysfun/3, []}, @@ -473,6 +475,78 @@ count_termsonindex(Bucket, IdxField, Book, QType) -> 0, lists:seq(190, 221)). +multibucket_fold(_Config) -> + RootPath = testutil:reset_filestructure(), + {ok, Bookie1} = leveled_bookie:book_start(RootPath, + 2000, + 50000000, + testutil:sync_strategy()), + ObjectGen = testutil:get_compressiblevalue_andinteger(), + IndexGen = fun() -> [] end, + ObjL1 = testutil:generate_objects(13000, + uuid, + [], + ObjectGen, + IndexGen, + <<"Bucket1">>), + testutil:riakload(Bookie1, ObjL1), + ObjL2 = testutil:generate_objects(17000, + uuid, + [], + ObjectGen, + IndexGen, + <<"Bucket2">>), + testutil:riakload(Bookie1, ObjL2), + ObjL3 = testutil:generate_objects(7000, + uuid, + [], + ObjectGen, + IndexGen, + <<"Bucket3">>), + testutil:riakload(Bookie1, ObjL3), + ObjL4 = testutil:generate_objects(23000, + uuid, + [], + ObjectGen, + IndexGen, + <<"Bucket4">>), + testutil:riakload(Bookie1, ObjL4), + Q1 = {foldheads_bybucketlist, + ?RIAK_TAG, + [<<"Bucket1">>, <<"Bucket4">>], + fun(B, K, _PO, Acc) -> + [{B, K}|Acc] + end, + false, + true, + false}, + {async, R1} = leveled_bookie:book_returnfolder(Bookie1, Q1), + O1 = length(R1()), + io:format("Result R1 of length ~w~n", [O1]), + + Q2 = {foldheads_bybucketlist, + ?RIAK_TAG, + [<<"Bucket2">>, <<"Bucket3">>], + {fun(_B, _K, _PO, Acc) -> + Acc +1 + end, + 0}, + false, + true, + false}, + {async, R2} = leveled_bookie:book_returnfolder(Bookie1, Q2), + O2 = R2(), + io:format("Result R2 of ~w~n", [O2]), + + true = 36000 == O1, + true = 24000 == O2, + + ok = leveled_bookie:book_close(Bookie1), + testutil:reset_filestructure(). + + + + rotating_objects(_Config) -> RootPath = testutil:reset_filestructure(),