From ef2a8c62afa5fb1cde8c240174610566365d03ab Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 23 Nov 2018 16:00:11 +0000 Subject: [PATCH] Add capability to exit a head or object fold with a throw This allows for all fold functions to throw an exception to exit out of a fold with all dependencies still closed down as expected. This was previously available for key folds, which was necessary for the folds to work in Riak (as max_results in index queries depends one xiting the fold with an exception). This change now adds a ct test, and adds support for head folds, object folds (key order) and object folds (sqn order) --- src/leveled_inker.erl | 35 ++++-- src/leveled_penciller.erl | 2 +- src/leveled_runner.erl | 51 +++++---- test/end_to_end/iterator_SUITE.erl | 168 ++++++++++++++++++++++++++++- 4 files changed, 224 insertions(+), 32 deletions(-) diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index b1dec77..4b693c9 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -274,7 +274,7 @@ ink_close(Pid) -> ink_doom(Pid) -> gen_server:call(Pid, doom, 60000). --spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> ok. +-spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> fun(). %% @doc %% Fold over the journal from a starting sequence number (MinSQN), passing %% in three functions and a snapshot of the penciller. The Fold functions @@ -307,8 +307,14 @@ ink_doom(Pid) -> %% The BatchFun is a two arity function that should take as inputs: %% An overall accumulator %% The batch accumulator built over the sub-fold +%% +%% The output of ink_fold is a folder, that may actually run the fold. The +%% type of the output of the function when called will depend on the type of +%% the accumulator ink_fold(Pid, MinSQN, FoldFuns, Acc) -> - gen_server:call(Pid, {fold, MinSQN, FoldFuns, Acc}, infinity). + gen_server:call(Pid, + {fold, MinSQN, FoldFuns, Acc, by_runner}, + infinity). -spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok. %% @@ -333,7 +339,8 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> {fold, MinSQN, {FilterFun, InitAccFun, BatchFun}, - ok}, + ok, + as_ink}, infinity). -spec ink_compactjournal(pid(), pid(), integer()) -> ok. @@ -492,14 +499,22 @@ handle_call({key_check, Key, SQN}, _From, State) -> handle_call({fold, StartSQN, {FilterFun, InitAccFun, FoldFun}, - Acc}, _From, State) -> + Acc, + By}, _From, State) -> Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)), - Reply = - fold_from_sequence(StartSQN, - {FilterFun, InitAccFun, FoldFun}, - Acc, - Manifest), - {reply, Reply, State}; + Folder = + fun() -> + fold_from_sequence(StartSQN, + {FilterFun, InitAccFun, FoldFun}, + Acc, + Manifest) + end, + case By of + as_ink -> + {reply, Folder(), State}; + by_runner -> + {reply, Folder, State} + end; handle_call({register_snapshot, Requestor}, _From , State) -> Rs = [{Requestor, State#state.manifest_sqn}|State#state.registered_snapshots], diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 10300aa..ca94734 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -451,7 +451,7 @@ pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, {fetch_keys, StartKey, EndKey, AccFun, InitAcc0, SegmentList, LastModRange, MaxKeys, - as_pcl}, + by_runner}, infinity). -spec pcl_fetchnextkey(pid(), diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 896921e..9351086 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -374,14 +374,19 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> lists:foldr(ObjFun, ObjAcc, BatchAcc) end, - Acc = + InkFolder = leveled_inker:ink_fold(JournalSnapshot, - 0, - {FilterFun, InitAccFun, BatchFoldFun}, - InitAcc), - ok = leveled_penciller:pcl_close(LedgerSnapshot), - ok = leveled_inker:ink_close(JournalSnapshot), - Acc + 0, + {FilterFun, + InitAccFun, + BatchFoldFun}, + InitAcc), + AfterFun = + fun() -> + ok = leveled_penciller:pcl_close(LedgerSnapshot), + ok = leveled_inker:ink_close(JournalSnapshot) + end, + wrap_runner(InkFolder, AfterFun) end, {async, Folder}. @@ -535,14 +540,12 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, Folder = fun() -> {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(), - AccFun = accumulate_objects(FoldFun, JournalSnapshot, Tag, DeferredFetch), - - ListFoldFun = + FoldFunGen = fun({StartKey, EndKey}, FoldAcc) -> leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot, StartKey, @@ -553,15 +556,24 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, LastModRange, LimitByCount) end, - Acc = lists:foldl(ListFoldFun, InitAcc0, KeyRanges), - ok = leveled_penciller:pcl_close(LedgerSnapshot), - case DeferredFetch of - {true, false} -> - ok; - _ -> - ok = leveled_inker:ink_close(JournalSnapshot) - end, - Acc + AfterFun = + fun() -> + ok = leveled_penciller:pcl_close(LedgerSnapshot), + case DeferredFetch of + {true, false} -> + ok; + _ -> + ok = leveled_inker:ink_close(JournalSnapshot) + end + end, + ListFoldFun = + fun(KeyRange, Acc) -> + Folder = FoldFunGen(KeyRange, Acc), + Folder() + end, + FolderToWrap = + fun() -> lists:foldl(ListFoldFun, InitAcc0, KeyRanges) end, + wrap_runner(FolderToWrap, AfterFun) end, {async, Folder}. @@ -790,7 +802,6 @@ wrap_runner(FoldAction, AfterAction) -> end. - %%%============================================================================ %%% Test %%%============================================================================ diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index 5d9aa28..04c5c46 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -6,7 +6,8 @@ -define(KEY_ONLY, {false, undefined}). -export([all/0]). --export([single_object_with2i/1, +-export([breaking_folds/1, + single_object_with2i/1, small_load_with2i/1, query_count/1, multibucket_fold/1, @@ -14,6 +15,7 @@ rotating_objects/1]). all() -> [ + breaking_folds, single_object_with2i, small_load_with2i, query_count, @@ -23,6 +25,170 @@ all() -> [ ]. +breaking_folds(_Config) -> + % Run various iterators and show that they can be broken by throwing an + % exception from within the fold + KeyCount = 10000, + + RootPath = testutil:reset_filestructure(), + StartOpts1 = [{root_path, RootPath}, + {max_journalsize, 10000000}, + {sync_strategy, testutil:sync_strategy()}], + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + ObjectGen = testutil:get_compressiblevalue_andinteger(), + IndexGen = testutil:get_randomindexes_generator(8), + ObjL1 = testutil:generate_objects(KeyCount, + binary_uuid, + [], + ObjectGen, + IndexGen), + testutil:riakload(Bookie1, ObjL1), + + % Find all keys index, and then same again but stop at a midpoint using a + % throw + {async, IdxFolder} = + leveled_bookie:book_indexfold(Bookie1, + list_to_binary("Bucket"), + {fun testutil:foldkeysfun/3, []}, + {"idx1_bin", "#", "|"}, + {true, undefined}), + KeyList1 = lists:reverse(IdxFolder()), + io:format("Index fold with result size ~w~n", [length(KeyList1)]), + true = KeyCount == length(KeyList1), + + + {MidTerm, MidKey} = lists:nth(KeyCount div 2, KeyList1), + + FoldKeyThrowFun = + fun(_B, {Term, Key}, Acc) -> + case {Term, Key} > {MidTerm, MidKey} of + true -> + throw({stop_fold, Acc}); + false -> + [{Term, Key}|Acc] + end + end, + {async, IdxFolderToMidK} = + leveled_bookie:book_indexfold(Bookie1, + list_to_binary("Bucket"), + {FoldKeyThrowFun, []}, + {"idx1_bin", "#", "|"}, + {true, undefined}), + CatchingFold = + fun(AsyncFolder) -> + try + AsyncFolder() + catch + throw:{stop_fold, Acc} -> + Acc + end + end, + + KeyList2 = lists:reverse(CatchingFold(IdxFolderToMidK)), + io:format("Index fold with result size ~w~n", [length(KeyList2)]), + true = KeyCount div 2 == length(KeyList2), + + + HeadFoldFun = + fun(_B, K, PO, Acc) -> + {proxy_object, _MDBin, Size, _FF} = binary_to_term(PO), + [{K, Size}|Acc] + end, + {async, HeadFolder} = + leveled_bookie:book_headfold(Bookie1, + ?RIAK_TAG, + {HeadFoldFun, []}, + true, true, false), + KeySizeList1 = lists:reverse(HeadFolder()), + io:format("Head fold with result size ~w~n", [length(KeySizeList1)]), + true = KeyCount == length(KeySizeList1), + + {MidHeadKey, _MidSize} = lists:nth(KeyCount div 2, KeySizeList1), + FoldThrowFun = + fun(FoldFun) -> + fun(B, K, PO, Acc) -> + case K > MidHeadKey of + true -> + throw({stop_fold, Acc}); + false -> + FoldFun(B, K, PO, Acc) + end + end + end, + {async, HeadFolderToMidK} = + leveled_bookie:book_headfold(Bookie1, + ?RIAK_TAG, + {FoldThrowFun(HeadFoldFun), []}, + true, true, false), + KeySizeList2 = lists:reverse(CatchingFold(HeadFolderToMidK)), + io:format("Head fold with result size ~w~n", [length(KeySizeList2)]), + true = KeyCount div 2 == length(KeySizeList2), + + ObjFoldFun = + fun(_B, K, V, Acc) -> + [{K,byte_size(V)}|Acc] + end, + {async, ObjectFolderKO} = + leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + {ObjFoldFun, []}, + false, + key_order), + ObjSizeList1 = lists:reverse(ObjectFolderKO()), + io:format("Obj fold with result size ~w~n", [length(ObjSizeList1)]), + true = KeyCount == length(ObjSizeList1), + + {async, ObjFolderToMidK} = + leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + {FoldThrowFun(ObjFoldFun), []}, + false, + key_order), + ObjSizeList2 = lists:reverse(CatchingFold(ObjFolderToMidK)), + io:format("Object fold with result size ~w~n", [length(ObjSizeList2)]), + true = KeyCount div 2 == length(ObjSizeList2), + + % Object folds which are SQN order use a different path through the code, + % so testing that here. Note that it would not make sense to have a fold + % that was terminated by reaching a point in the key range .. as results + % will not be passed to the fold function in key order + {async, ObjectFolderSO} = + leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + {ObjFoldFun, []}, + false, + sqn_order), + ObjSizeList1_SO = lists:reverse(ObjectFolderSO()), + io:format("Obj fold with result size ~w~n", [length(ObjSizeList1_SO)]), + true = KeyCount == length(ObjSizeList1_SO), + + % Exit fold when we've reached a thousand accumulated obects + FoldThrowThousandFun = + fun(FoldFun) -> + fun(B, K, PO, Acc) -> + case length(Acc) == 1000 of + true -> + throw({stop_fold, Acc}); + false -> + FoldFun(B, K, PO, Acc) + end + end + end, + {async, ObjFolderTo1K} = + leveled_bookie:book_objectfold(Bookie1, + ?RIAK_TAG, + {FoldThrowThousandFun(ObjFoldFun), []}, + false, + sqn_order), + ObjSizeList2_SO = lists:reverse(CatchingFold(ObjFolderTo1K)), + io:format("Object fold with result size ~w~n", [length(ObjSizeList2_SO)]), + true = 1000 == length(ObjSizeList2_SO), + + ok = leveled_bookie:book_close(Bookie1), + testutil:reset_filestructure(). + + + single_object_with2i(_Config) -> % Load a single object with an integer and a binary % index and query for it