Add capability to exit a head or object fold with a throw
This allows for all fold functions to throw an exception to exit out of a fold with all dependencies still closed down as expected. This was previously available for key folds, which was necessary for the folds to work in Riak (as max_results in index queries depends on exiting the fold with an exception). This change now adds a ct test, and adds support for head folds, object folds (key order) and object folds (sqn order).
This commit is contained in:
parent
da5abb4e4c
commit
ef2a8c62af
4 changed files with 224 additions and 32 deletions
|
@ -274,7 +274,7 @@ ink_close(Pid) ->
|
|||
ink_doom(Pid) ->
|
||||
gen_server:call(Pid, doom, 60000).
|
||||
|
||||
-spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> ok.
|
||||
-spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> fun().
|
||||
%% @doc
|
||||
%% Fold over the journal from a starting sequence number (MinSQN), passing
|
||||
%% in three functions and a snapshot of the penciller. The Fold functions
|
||||
|
@ -307,8 +307,14 @@ ink_doom(Pid) ->
|
|||
%% The BatchFun is a two arity function that should take as inputs:
|
||||
%% An overall accumulator
|
||||
%% The batch accumulator built over the sub-fold
|
||||
%%
|
||||
%% The output of ink_fold is a folder, that may actually run the fold. The
|
||||
%% type of the output of the function when called will depend on the type of
|
||||
%% the accumulator
|
||||
ink_fold(Pid, MinSQN, FoldFuns, Acc) ->
|
||||
gen_server:call(Pid, {fold, MinSQN, FoldFuns, Acc}, infinity).
|
||||
gen_server:call(Pid,
|
||||
{fold, MinSQN, FoldFuns, Acc, by_runner},
|
||||
infinity).
|
||||
|
||||
-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok.
|
||||
%%
|
||||
|
@ -333,7 +339,8 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
|
|||
{fold,
|
||||
MinSQN,
|
||||
{FilterFun, InitAccFun, BatchFun},
|
||||
ok},
|
||||
ok,
|
||||
as_ink},
|
||||
infinity).
|
||||
|
||||
-spec ink_compactjournal(pid(), pid(), integer()) -> ok.
|
||||
|
@ -492,14 +499,22 @@ handle_call({key_check, Key, SQN}, _From, State) ->
|
|||
handle_call({fold,
|
||||
StartSQN,
|
||||
{FilterFun, InitAccFun, FoldFun},
|
||||
Acc}, _From, State) ->
|
||||
Acc,
|
||||
By}, _From, State) ->
|
||||
Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)),
|
||||
Reply =
|
||||
Folder =
|
||||
fun() ->
|
||||
fold_from_sequence(StartSQN,
|
||||
{FilterFun, InitAccFun, FoldFun},
|
||||
Acc,
|
||||
Manifest),
|
||||
{reply, Reply, State};
|
||||
Manifest)
|
||||
end,
|
||||
case By of
|
||||
as_ink ->
|
||||
{reply, Folder(), State};
|
||||
by_runner ->
|
||||
{reply, Folder, State}
|
||||
end;
|
||||
handle_call({register_snapshot, Requestor}, _From , State) ->
|
||||
Rs = [{Requestor,
|
||||
State#state.manifest_sqn}|State#state.registered_snapshots],
|
||||
|
|
|
@ -451,7 +451,7 @@ pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc,
|
|||
{fetch_keys,
|
||||
StartKey, EndKey, AccFun, InitAcc0,
|
||||
SegmentList, LastModRange, MaxKeys,
|
||||
as_pcl},
|
||||
by_runner},
|
||||
infinity).
|
||||
|
||||
-spec pcl_fetchnextkey(pid(),
|
||||
|
|
|
@ -374,14 +374,19 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
|
|||
lists:foldr(ObjFun, ObjAcc, BatchAcc)
|
||||
end,
|
||||
|
||||
Acc =
|
||||
InkFolder =
|
||||
leveled_inker:ink_fold(JournalSnapshot,
|
||||
0,
|
||||
{FilterFun, InitAccFun, BatchFoldFun},
|
||||
{FilterFun,
|
||||
InitAccFun,
|
||||
BatchFoldFun},
|
||||
InitAcc),
|
||||
AfterFun =
|
||||
fun() ->
|
||||
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
||||
ok = leveled_inker:ink_close(JournalSnapshot),
|
||||
Acc
|
||||
ok = leveled_inker:ink_close(JournalSnapshot)
|
||||
end,
|
||||
wrap_runner(InkFolder, AfterFun)
|
||||
end,
|
||||
{async, Folder}.
|
||||
|
||||
|
@ -535,14 +540,12 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch,
|
|||
Folder =
|
||||
fun() ->
|
||||
{ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
|
||||
|
||||
AccFun =
|
||||
accumulate_objects(FoldFun,
|
||||
JournalSnapshot,
|
||||
Tag,
|
||||
DeferredFetch),
|
||||
|
||||
ListFoldFun =
|
||||
FoldFunGen =
|
||||
fun({StartKey, EndKey}, FoldAcc) ->
|
||||
leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot,
|
||||
StartKey,
|
||||
|
@ -553,15 +556,24 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch,
|
|||
LastModRange,
|
||||
LimitByCount)
|
||||
end,
|
||||
Acc = lists:foldl(ListFoldFun, InitAcc0, KeyRanges),
|
||||
AfterFun =
|
||||
fun() ->
|
||||
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
||||
case DeferredFetch of
|
||||
{true, false} ->
|
||||
ok;
|
||||
_ ->
|
||||
ok = leveled_inker:ink_close(JournalSnapshot)
|
||||
end
|
||||
end,
|
||||
Acc
|
||||
ListFoldFun =
|
||||
fun(KeyRange, Acc) ->
|
||||
Folder = FoldFunGen(KeyRange, Acc),
|
||||
Folder()
|
||||
end,
|
||||
FolderToWrap =
|
||||
fun() -> lists:foldl(ListFoldFun, InitAcc0, KeyRanges) end,
|
||||
wrap_runner(FolderToWrap, AfterFun)
|
||||
end,
|
||||
{async, Folder}.
|
||||
|
||||
|
@ -790,7 +802,6 @@ wrap_runner(FoldAction, AfterAction) ->
|
|||
end.
|
||||
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% Test
|
||||
%%%============================================================================
|
||||
|
|
|
@ -6,7 +6,8 @@
|
|||
-define(KEY_ONLY, {false, undefined}).
|
||||
|
||||
-export([all/0]).
|
||||
-export([single_object_with2i/1,
|
||||
-export([breaking_folds/1,
|
||||
single_object_with2i/1,
|
||||
small_load_with2i/1,
|
||||
query_count/1,
|
||||
multibucket_fold/1,
|
||||
|
@ -14,6 +15,7 @@
|
|||
rotating_objects/1]).
|
||||
|
||||
all() -> [
|
||||
breaking_folds,
|
||||
single_object_with2i,
|
||||
small_load_with2i,
|
||||
query_count,
|
||||
|
@ -23,6 +25,170 @@ all() -> [
|
|||
].
|
||||
|
||||
|
||||
breaking_folds(_Config) ->
|
||||
% Run various iterators and show that they can be broken by throwing an
|
||||
% exception from within the fold
|
||||
KeyCount = 10000,
|
||||
|
||||
RootPath = testutil:reset_filestructure(),
|
||||
StartOpts1 = [{root_path, RootPath},
|
||||
{max_journalsize, 10000000},
|
||||
{sync_strategy, testutil:sync_strategy()}],
|
||||
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
|
||||
ObjectGen = testutil:get_compressiblevalue_andinteger(),
|
||||
IndexGen = testutil:get_randomindexes_generator(8),
|
||||
ObjL1 = testutil:generate_objects(KeyCount,
|
||||
binary_uuid,
|
||||
[],
|
||||
ObjectGen,
|
||||
IndexGen),
|
||||
testutil:riakload(Bookie1, ObjL1),
|
||||
|
||||
% Find all keys index, and then same again but stop at a midpoint using a
|
||||
% throw
|
||||
{async, IdxFolder} =
|
||||
leveled_bookie:book_indexfold(Bookie1,
|
||||
list_to_binary("Bucket"),
|
||||
{fun testutil:foldkeysfun/3, []},
|
||||
{"idx1_bin", "#", "|"},
|
||||
{true, undefined}),
|
||||
KeyList1 = lists:reverse(IdxFolder()),
|
||||
io:format("Index fold with result size ~w~n", [length(KeyList1)]),
|
||||
true = KeyCount == length(KeyList1),
|
||||
|
||||
|
||||
{MidTerm, MidKey} = lists:nth(KeyCount div 2, KeyList1),
|
||||
|
||||
FoldKeyThrowFun =
|
||||
fun(_B, {Term, Key}, Acc) ->
|
||||
case {Term, Key} > {MidTerm, MidKey} of
|
||||
true ->
|
||||
throw({stop_fold, Acc});
|
||||
false ->
|
||||
[{Term, Key}|Acc]
|
||||
end
|
||||
end,
|
||||
{async, IdxFolderToMidK} =
|
||||
leveled_bookie:book_indexfold(Bookie1,
|
||||
list_to_binary("Bucket"),
|
||||
{FoldKeyThrowFun, []},
|
||||
{"idx1_bin", "#", "|"},
|
||||
{true, undefined}),
|
||||
CatchingFold =
|
||||
fun(AsyncFolder) ->
|
||||
try
|
||||
AsyncFolder()
|
||||
catch
|
||||
throw:{stop_fold, Acc} ->
|
||||
Acc
|
||||
end
|
||||
end,
|
||||
|
||||
KeyList2 = lists:reverse(CatchingFold(IdxFolderToMidK)),
|
||||
io:format("Index fold with result size ~w~n", [length(KeyList2)]),
|
||||
true = KeyCount div 2 == length(KeyList2),
|
||||
|
||||
|
||||
HeadFoldFun =
|
||||
fun(_B, K, PO, Acc) ->
|
||||
{proxy_object, _MDBin, Size, _FF} = binary_to_term(PO),
|
||||
[{K, Size}|Acc]
|
||||
end,
|
||||
{async, HeadFolder} =
|
||||
leveled_bookie:book_headfold(Bookie1,
|
||||
?RIAK_TAG,
|
||||
{HeadFoldFun, []},
|
||||
true, true, false),
|
||||
KeySizeList1 = lists:reverse(HeadFolder()),
|
||||
io:format("Head fold with result size ~w~n", [length(KeySizeList1)]),
|
||||
true = KeyCount == length(KeySizeList1),
|
||||
|
||||
{MidHeadKey, _MidSize} = lists:nth(KeyCount div 2, KeySizeList1),
|
||||
FoldThrowFun =
|
||||
fun(FoldFun) ->
|
||||
fun(B, K, PO, Acc) ->
|
||||
case K > MidHeadKey of
|
||||
true ->
|
||||
throw({stop_fold, Acc});
|
||||
false ->
|
||||
FoldFun(B, K, PO, Acc)
|
||||
end
|
||||
end
|
||||
end,
|
||||
{async, HeadFolderToMidK} =
|
||||
leveled_bookie:book_headfold(Bookie1,
|
||||
?RIAK_TAG,
|
||||
{FoldThrowFun(HeadFoldFun), []},
|
||||
true, true, false),
|
||||
KeySizeList2 = lists:reverse(CatchingFold(HeadFolderToMidK)),
|
||||
io:format("Head fold with result size ~w~n", [length(KeySizeList2)]),
|
||||
true = KeyCount div 2 == length(KeySizeList2),
|
||||
|
||||
ObjFoldFun =
|
||||
fun(_B, K, V, Acc) ->
|
||||
[{K,byte_size(V)}|Acc]
|
||||
end,
|
||||
{async, ObjectFolderKO} =
|
||||
leveled_bookie:book_objectfold(Bookie1,
|
||||
?RIAK_TAG,
|
||||
{ObjFoldFun, []},
|
||||
false,
|
||||
key_order),
|
||||
ObjSizeList1 = lists:reverse(ObjectFolderKO()),
|
||||
io:format("Obj fold with result size ~w~n", [length(ObjSizeList1)]),
|
||||
true = KeyCount == length(ObjSizeList1),
|
||||
|
||||
{async, ObjFolderToMidK} =
|
||||
leveled_bookie:book_objectfold(Bookie1,
|
||||
?RIAK_TAG,
|
||||
{FoldThrowFun(ObjFoldFun), []},
|
||||
false,
|
||||
key_order),
|
||||
ObjSizeList2 = lists:reverse(CatchingFold(ObjFolderToMidK)),
|
||||
io:format("Object fold with result size ~w~n", [length(ObjSizeList2)]),
|
||||
true = KeyCount div 2 == length(ObjSizeList2),
|
||||
|
||||
% Object folds which are SQN order use a different path through the code,
|
||||
% so testing that here. Note that it would not make sense to have a fold
|
||||
% that was terminated by reaching a point in the key range .. as results
|
||||
% will not be passed to the fold function in key order
|
||||
{async, ObjectFolderSO} =
|
||||
leveled_bookie:book_objectfold(Bookie1,
|
||||
?RIAK_TAG,
|
||||
{ObjFoldFun, []},
|
||||
false,
|
||||
sqn_order),
|
||||
ObjSizeList1_SO = lists:reverse(ObjectFolderSO()),
|
||||
io:format("Obj fold with result size ~w~n", [length(ObjSizeList1_SO)]),
|
||||
true = KeyCount == length(ObjSizeList1_SO),
|
||||
|
||||
% Exit fold when we've reached a thousand accumulated objects
|
||||
FoldThrowThousandFun =
|
||||
fun(FoldFun) ->
|
||||
fun(B, K, PO, Acc) ->
|
||||
case length(Acc) == 1000 of
|
||||
true ->
|
||||
throw({stop_fold, Acc});
|
||||
false ->
|
||||
FoldFun(B, K, PO, Acc)
|
||||
end
|
||||
end
|
||||
end,
|
||||
{async, ObjFolderTo1K} =
|
||||
leveled_bookie:book_objectfold(Bookie1,
|
||||
?RIAK_TAG,
|
||||
{FoldThrowThousandFun(ObjFoldFun), []},
|
||||
false,
|
||||
sqn_order),
|
||||
ObjSizeList2_SO = lists:reverse(CatchingFold(ObjFolderTo1K)),
|
||||
io:format("Object fold with result size ~w~n", [length(ObjSizeList2_SO)]),
|
||||
true = 1000 == length(ObjSizeList2_SO),
|
||||
|
||||
ok = leveled_bookie:book_close(Bookie1),
|
||||
testutil:reset_filestructure().
|
||||
|
||||
|
||||
|
||||
single_object_with2i(_Config) ->
|
||||
% Load a single object with an integer and a binary
|
||||
% index and query for it
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue