diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index d48d8d8..0875a56 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -520,57 +520,35 @@ book_headonly(Pid, Bucket, Key, SubKey) -> -spec book_returnfolder(pid(), tuple()) -> {async, fun()}. -%% @doc Snapshots/Clones +%% @doc Folds over store - deprecated +%% The tuple() is a query, and book_returnfolder will return an {async, Folder} +%% whereby calling Folder() will run a particular fold over a snapshot of the +%% store, and close the snapshot when complete %% -%% If there is a snapshot request (e.g. to iterate over the keys) the Bookie -%% may request a clone of the Penciller, or clones of both the Penciller and -%% the Inker should values also need to be accessed. The snapshot clone is -%% made available through a "runner" - a new trasnportable PID through which -%% the previous state of the store can be queried. So, for example, a -%% riak_kv_vnode_worker in the pool could host the runner. -%% -%% The clone is seeded with the manifest SQN. The clone should be registered -%% with the real Inker/Penciller, so that the real Inker/Penciller may prevent -%% the deletion of files still in use by a snapshot clone. -%% -%% Iterators should de-register themselves from the Penciller on completion. -%% Iterators should be automatically release after a timeout period. A file -%% can only be deleted from the Ledger if it is no longer in the manifest, and -%% there are no registered iterators from before the point the file was -%% removed from the manifest. -%% -%% Clones are simply new gen_servers with copies of the relevant -%% StateData. 
-%% -%% There are a series of specific folders implemented that provide pre-canned -%% snapshot functionality, more folders can be seen in the get_runner/2 -%% function: -%% -%% {bucket_stats, Bucket} -> return a key count and total object size within -%% a bucket -%% {riakbucket_stats, Bucket} -> as above, but for buckets with the Riak Tag -%% {bucket_list, Tag, {FoldKeysFun, Acc}} -> if we assume buckets and -%% keys are binaries, provides a fast bucket list function -%% {index_query, -%% Constraint, -%% {FoldKeysFun, Acc}, -%% {IdxField, StartValue, EndValue}, -%% {ReturnTerms, TermRegex}} -> secondray index query -%% {keylist, Tag, {FoldKeysFun, Acc}} -> list all keys with tag -%% {keylist, Tag, Bucket, {FoldKeysFun, Acc}} -> list all keys within given -%% bucket -%% {foldobjects_bybucket, Tag, Bucket, FoldObjectsFun} -> fold over all objects -%% in a given bucket -%% {foldobjects_byindex, -%% Tag, -%% Bucket, -%% {Field, FromTerm, ToTerm}, -%% FoldObjectsFun} -> fold over all objects with an entry in a given -%% range on a given index +%% For any new application requiring a fold - use the API below instead, and +%% one of: +%% - book_indexfold +%% - book_bucketlist +%% - book_keylist +%% - book_headfold +%% - book_objectfold book_returnfolder(Pid, RunnerType) -> gen_server:call(Pid, {return_runner, RunnerType}, infinity). +%% Different runner types for async queries: +%% - book_indexfold +%% - book_bucketlist +%% - book_keylist +%% - book_headfold +%% - book_objectfold +%% +%% See individual instructions for each one. All folds can be completed early +%% by using a fold_function that throws an exception when some threshold is +%% reached - and a worker that catches that exception. +%% +%% See test/end_to_end/iterator_SUITE:breaking_folds/1 + %% @doc Builds and returns an `{async, Runner}' pair for secondary %% index queries. 
Calling `Runner' will fold over keys (ledger) tagged %% with the index `?IDX_TAG' and Constrain the fold to a specific diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index b1dec77..4b693c9 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -274,7 +274,7 @@ ink_close(Pid) -> ink_doom(Pid) -> gen_server:call(Pid, doom, 60000). --spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> ok. +-spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> fun(). %% @doc %% Fold over the journal from a starting sequence number (MinSQN), passing %% in three functions and a snapshot of the penciller. The Fold functions @@ -307,8 +307,14 @@ ink_doom(Pid) -> %% The BatchFun is a two arity function that should take as inputs: %% An overall accumulator %% The batch accumulator built over the sub-fold +%% +%% The output of ink_fold is a folder, that may actually run the fold. The +%% type of the output of the function when called will depend on the type of +%% the accumulator ink_fold(Pid, MinSQN, FoldFuns, Acc) -> - gen_server:call(Pid, {fold, MinSQN, FoldFuns, Acc}, infinity). + gen_server:call(Pid, + {fold, MinSQN, FoldFuns, Acc, by_runner}, + infinity). -spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok. %% @@ -333,7 +339,8 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> {fold, MinSQN, {FilterFun, InitAccFun, BatchFun}, - ok}, + ok, + as_ink}, infinity). -spec ink_compactjournal(pid(), pid(), integer()) -> ok. 
@@ -492,14 +499,22 @@ handle_call({key_check, Key, SQN}, _From, State) -> handle_call({fold, StartSQN, {FilterFun, InitAccFun, FoldFun}, - Acc}, _From, State) -> + Acc, + By}, _From, State) -> Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)), - Reply = - fold_from_sequence(StartSQN, - {FilterFun, InitAccFun, FoldFun}, - Acc, - Manifest), - {reply, Reply, State}; + Folder = + fun() -> + fold_from_sequence(StartSQN, + {FilterFun, InitAccFun, FoldFun}, + Acc, + Manifest) + end, + case By of + as_ink -> + {reply, Folder(), State}; + by_runner -> + {reply, Folder, State} + end; handle_call({register_snapshot, Requestor}, _From , State) -> Rs = [{Requestor, State#state.manifest_sqn}|State#state.registered_snapshots], diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 10300aa..ca94734 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -451,7 +451,7 @@ pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, {fetch_keys, StartKey, EndKey, AccFun, InitAcc0, SegmentList, LastModRange, MaxKeys, - as_pcl}, + by_runner}, infinity). -spec pcl_fetchnextkey(pid(), diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 896921e..45e3518 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -95,10 +95,22 @@ bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc, MaxBuckets) -> BucketAcc = get_nextbucket(null, null, Tag, LedgerSnapshot, [], {0, MaxBuckets}), - ok = leveled_penciller:pcl_close(LedgerSnapshot), - lists:foldl(fun({B, _K}, Acc) -> FoldBucketsFun(B, Acc) end, - InitAcc, - BucketAcc) + AfterFun = + fun() -> + ok = leveled_penciller:pcl_close(LedgerSnapshot) + end, + FoldRunner = + fun() -> + lists:foldr(fun({B, _K}, Acc) -> FoldBucketsFun(B, Acc) end, + InitAcc, + BucketAcc) + % Buckets in reverse alphabetical order so foldr + end, + % For this fold, the fold over the store is actually completed + % before results are passed to the FoldBucketsFun to be + % accumulated. 
Using a throw to exit the fold early will not + % in this case save significant time. + wrap_runner(FoldRunner, AfterFun) end, {async, Runner}. @@ -374,14 +386,19 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> lists:foldr(ObjFun, ObjAcc, BatchAcc) end, - Acc = + InkFolder = leveled_inker:ink_fold(JournalSnapshot, - 0, - {FilterFun, InitAccFun, BatchFoldFun}, - InitAcc), - ok = leveled_penciller:pcl_close(LedgerSnapshot), - ok = leveled_inker:ink_close(JournalSnapshot), - Acc + 0, + {FilterFun, + InitAccFun, + BatchFoldFun}, + InitAcc), + AfterFun = + fun() -> + ok = leveled_penciller:pcl_close(LedgerSnapshot), + ok = leveled_inker:ink_close(JournalSnapshot) + end, + wrap_runner(InkFolder, AfterFun) end, {async, Folder}. @@ -535,14 +552,12 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, Folder = fun() -> {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(), - AccFun = accumulate_objects(FoldFun, JournalSnapshot, Tag, DeferredFetch), - - ListFoldFun = + FoldFunGen = fun({StartKey, EndKey}, FoldAcc) -> leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot, StartKey, @@ -553,15 +568,24 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, LastModRange, LimitByCount) end, - Acc = lists:foldl(ListFoldFun, InitAcc0, KeyRanges), - ok = leveled_penciller:pcl_close(LedgerSnapshot), - case DeferredFetch of - {true, false} -> - ok; - _ -> - ok = leveled_inker:ink_close(JournalSnapshot) - end, - Acc + AfterFun = + fun() -> + ok = leveled_penciller:pcl_close(LedgerSnapshot), + case DeferredFetch of + {true, false} -> + ok; + _ -> + ok = leveled_inker:ink_close(JournalSnapshot) + end + end, + ListFoldFun = + fun(KeyRange, Acc) -> + Folder = FoldFunGen(KeyRange, Acc), + Folder() + end, + FolderToWrap = + fun() -> lists:foldl(ListFoldFun, InitAcc0, KeyRanges) end, + wrap_runner(FolderToWrap, AfterFun) end, {async, Folder}. @@ -790,7 +814,6 @@ wrap_runner(FoldAction, AfterAction) -> end. 
- %%%============================================================================ %%% Test %%%============================================================================ diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index 5d9aa28..be4b537 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -6,7 +6,8 @@ -define(KEY_ONLY, {false, undefined}). -export([all/0]). --export([single_object_with2i/1, +-export([breaking_folds/1, + single_object_with2i/1, small_load_with2i/1, query_count/1, multibucket_fold/1, @@ -14,6 +15,7 @@ rotating_objects/1]). all() -> [ + breaking_folds, single_object_with2i, small_load_with2i, query_count, @@ -23,6 +25,220 @@ all() -> [ ]. +breaking_folds(_Config) -> + % Run various iterators and show that they can be broken by throwing an + % exception from within the fold + KeyCount = 10000, + + RootPath = testutil:reset_filestructure(), + StartOpts1 = [{root_path, RootPath}, + {max_journalsize, 10000000}, + {sync_strategy, testutil:sync_strategy()}], + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + ObjectGen = testutil:get_compressiblevalue_andinteger(), + IndexGen = testutil:get_randomindexes_generator(8), + ObjL1 = testutil:generate_objects(KeyCount, + binary_uuid, + [], + ObjectGen, + IndexGen), + testutil:riakload(Bookie1, ObjL1), + + % Find all keys index, and then same again but stop at a midpoint using a + % throw + {async, IdxFolder} = + leveled_bookie:book_indexfold(Bookie1, + list_to_binary("Bucket"), + {fun testutil:foldkeysfun/3, []}, + {"idx1_bin", "#", "|"}, + {true, undefined}), + KeyList1 = lists:reverse(IdxFolder()), + io:format("Index fold with result size ~w~n", [length(KeyList1)]), + true = KeyCount == length(KeyList1), + + + {MidTerm, MidKey} = lists:nth(KeyCount div 2, KeyList1), + + FoldKeyThrowFun = + fun(_B, {Term, Key}, Acc) -> + case {Term, Key} > {MidTerm, MidKey} of + true -> + throw({stop_fold, Acc}); + false -> + [{Term, 
Key}|Acc] + end + end, + {async, IdxFolderToMidK} = + leveled_bookie:book_indexfold(Bookie1, + list_to_binary("Bucket"), + {FoldKeyThrowFun, []}, + {"idx1_bin", "#", "|"}, + {true, undefined}), + CatchingFold = + fun(AsyncFolder) -> + try + AsyncFolder() + catch + throw:{stop_fold, Acc} -> + Acc + end + end, + + KeyList2 = lists:reverse(CatchingFold(IdxFolderToMidK)), + io:format("Index fold with result size ~w~n", [length(KeyList2)]), + true = KeyCount div 2 == length(KeyList2), + + + HeadFoldFun = + fun(_B, K, PO, Acc) -> + {proxy_object, _MDBin, Size, _FF} = binary_to_term(PO), + [{K, Size}|Acc] + end, + {async, HeadFolder} = + leveled_bookie:book_headfold(Bookie1, + ?RIAK_TAG, + {HeadFoldFun, []}, + true, true, false), + KeySizeList1 = lists:reverse(HeadFolder()), + io:format("Head fold with result size ~w~n", [length(KeySizeList1)]), + true = KeyCount == length(KeySizeList1), + + {MidHeadKey, _MidSize} = lists:nth(KeyCount div 2, KeySizeList1), + FoldThrowFun = + fun(FoldFun) -> + fun(B, K, PO, Acc) -> + case K > MidHeadKey of + true -> + throw({stop_fold, Acc}); + false -> + FoldFun(B, K, PO, Acc) + end + end + end, + {async, HeadFolderToMidK} = + leveled_bookie:book_headfold(Bookie1, + ?RIAK_TAG, + {FoldThrowFun(HeadFoldFun), []}, + true, true, false), + KeySizeList2 = lists:reverse(CatchingFold(HeadFolderToMidK)), + io:format("Head fold with result size ~w~n", [length(KeySizeList2)]), + true = KeyCount div 2 == length(KeySizeList2), + + ObjFoldFun = + fun(_B, K, V, Acc) -> + [{K,byte_size(V)}|Acc] + end, + {async, ObjectFolderKO} = + leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + {ObjFoldFun, []}, + false, + key_order), + ObjSizeList1 = lists:reverse(ObjectFolderKO()), + io:format("Obj fold with result size ~w~n", [length(ObjSizeList1)]), + true = KeyCount == length(ObjSizeList1), + + {async, ObjFolderToMidK} = + leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + {FoldThrowFun(ObjFoldFun), []}, + false, + key_order), + ObjSizeList2 = 
lists:reverse(CatchingFold(ObjFolderToMidK)), + io:format("Object fold with result size ~w~n", [length(ObjSizeList2)]), + true = KeyCount div 2 == length(ObjSizeList2), + + % Object folds which are SQN order use a different path through the code, + % so testing that here. Note that it would not make sense to have a fold + % that was terminated by reaching a point in the key range .. as results + % will not be passed to the fold function in key order + {async, ObjectFolderSO} = + leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + {ObjFoldFun, []}, + false, + sqn_order), + ObjSizeList1_SO = lists:reverse(ObjectFolderSO()), + io:format("Obj fold with result size ~w~n", [length(ObjSizeList1_SO)]), + true = KeyCount == length(ObjSizeList1_SO), + + % Exit fold when we've reached a thousand accumulated objects + FoldThrowThousandFun = + fun(FoldFun) -> + fun(B, K, PO, Acc) -> + case length(Acc) == 1000 of + true -> + throw({stop_fold, Acc}); + false -> + FoldFun(B, K, PO, Acc) + end + end + end, + {async, ObjFolderTo1K} = + leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + {FoldThrowThousandFun(ObjFoldFun), []}, + false, + sqn_order), + ObjSizeList2_SO = lists:reverse(CatchingFold(ObjFolderTo1K)), + io:format("Object fold with result size ~w~n", [length(ObjSizeList2_SO)]), + true = 1000 == length(ObjSizeList2_SO), + + ObjL2 = testutil:generate_objects(10, + binary_uuid, + [], + ObjectGen, + IndexGen, + "B2"), + ObjL3 = testutil:generate_objects(10, + binary_uuid, + [], + ObjectGen, + IndexGen, + "B3"), + ObjL4 = testutil:generate_objects(10, + binary_uuid, + [], + ObjectGen, + IndexGen, + "B4"), + testutil:riakload(Bookie1, ObjL2), + testutil:riakload(Bookie1, ObjL3), + testutil:riakload(Bookie1, ObjL4), + + FBAccT = {fun(B, Acc) -> [B|Acc] end, []}, + {async, BucketFolder} = + leveled_bookie:book_bucketlist(Bookie1, ?RIAK_TAG, FBAccT, all), + BucketList1 = lists:reverse(BucketFolder()), + io:format("bucket list with result size ~w~n", [length(BucketList1)]), +
true = 4 == length(BucketList1), + + StopAt3Fun = + fun(B, Acc) -> + Acc0 = [B|Acc], + case B of + <<"B3">> -> + throw({stop_fold, Acc0}); + _ -> + Acc0 + end + end, + + {async, StopAt3BucketFolder} = + leveled_bookie:book_bucketlist(Bookie1, + ?RIAK_TAG, + {StopAt3Fun, []}, + all), + BucketListSA3 = lists:reverse(CatchingFold(StopAt3BucketFolder)), + io:format("bucket list with result ~w~n", [BucketListSA3]), + true = [<<"B2">>, <<"B3">>] == BucketListSA3, + + + ok = leveled_bookie:book_close(Bookie1), + testutil:reset_filestructure(). + + + single_object_with2i(_Config) -> % Load a single object with an integer and a binary % index and query for it @@ -566,7 +782,7 @@ multibucket_fold(_Config) -> ?RIAK_TAG, {FoldBucketsFun, []}, all), - BucketList = Folder(), + BucketList = lists:reverse(Folder()), ExpectedBucketList = [{<<"Type1">>, <<"Bucket1">>}, {<<"Type2">>, <<"Bucket4">>}, <<"Bucket2">>, <<"Bucket3">>],