Merge pull request #212 from martinsumner/mas-i211-breakingfolds

Mas i211 breakingfolds
This commit is contained in:
Martin Sumner 2018-11-24 10:33:50 +00:00 committed by GitHub
commit aacc2adc54
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 315 additions and 83 deletions

View file

@ -520,57 +520,35 @@ book_headonly(Pid, Bucket, Key, SubKey) ->
-spec book_returnfolder(pid(), tuple()) -> {async, fun()}. -spec book_returnfolder(pid(), tuple()) -> {async, fun()}.
%% @doc Snapshots/Clones %% @doc Folds over store - deprecated
%% The tuple() is a query, and book_returnfolder will return an {async, Folder}
%% whereby calling Folder() will run a particular fold over a snapshot of the
%% store, and close the snapshot when complete
%% %%
%% If there is a snapshot request (e.g. to iterate over the keys) the Bookie %% For any new application requiring a fold - use the API below instead, and
%% may request a clone of the Penciller, or clones of both the Penciller and %% one of:
%% the Inker should values also need to be accessed. The snapshot clone is %% - book_indexfold
%% made available through a "runner" - a new transportable PID through which %% the previous state of the store can be queried. So, for example, a %% - book_keylist
%% the previous state of the store can be queried. So, for example, a %% - book_keylist
%% riak_kv_vnode_worker in the pool could host the runner. %% - book_headfold
%% %% - book_objectfold
%% The clone is seeded with the manifest SQN. The clone should be registered
%% with the real Inker/Penciller, so that the real Inker/Penciller may prevent
%% the deletion of files still in use by a snapshot clone.
%%
%% Iterators should de-register themselves from the Penciller on completion.
%% Iterators should be automatically released after a timeout period. A file
%% can only be deleted from the Ledger if it is no longer in the manifest, and
%% there are no registered iterators from before the point the file was
%% removed from the manifest.
%%
%% Clones are simply new gen_servers with copies of the relevant
%% StateData.
%%
%% There are a series of specific folders implemented that provide pre-canned
%% snapshot functionality, more folders can be seen in the get_runner/2
%% function:
%%
%% {bucket_stats, Bucket} -> return a key count and total object size within
%% a bucket
%% {riakbucket_stats, Bucket} -> as above, but for buckets with the Riak Tag
%% {bucket_list, Tag, {FoldKeysFun, Acc}} -> if we assume buckets and
%% keys are binaries, provides a fast bucket list function
%% {index_query,
%% Constraint,
%% {FoldKeysFun, Acc},
%% {IdxField, StartValue, EndValue},
%% {ReturnTerms, TermRegex}} -> secondary index query
%% {keylist, Tag, {FoldKeysFun, Acc}} -> list all keys with tag
%% {keylist, Tag, Bucket, {FoldKeysFun, Acc}} -> list all keys within given
%% bucket
%% {foldobjects_bybucket, Tag, Bucket, FoldObjectsFun} -> fold over all objects
%% in a given bucket
%% {foldobjects_byindex,
%% Tag,
%% Bucket,
%% {Field, FromTerm, ToTerm},
%% FoldObjectsFun} -> fold over all objects with an entry in a given
%% range on a given index
book_returnfolder(Pid, RunnerType) -> book_returnfolder(Pid, RunnerType) ->
gen_server:call(Pid, {return_runner, RunnerType}, infinity). gen_server:call(Pid, {return_runner, RunnerType}, infinity).
%% Different runner types for async queries:
%% - book_indexfold
%% - book_bucketlist
%% - book_keylist
%% - book_headfold
%% - book_objectfold
%%
%% See individual instructions for each one. All folds can be completed early
%% by using a fold_function that throws an exception when some threshold is
%% reached - and a worker that catches that exception.
%%
%% See test/end_to_end/iterator_SUITE:breaking_folds/1
%% @doc Builds and returns an `{async, Runner}' pair for secondary %% @doc Builds and returns an `{async, Runner}' pair for secondary
%% index queries. Calling `Runner' will fold over keys (ledger) tagged %% index queries. Calling `Runner' will fold over keys (ledger) tagged
%% with the index `?IDX_TAG' and Constrain the fold to a specific %% with the index `?IDX_TAG' and Constrain the fold to a specific

View file

@ -274,7 +274,7 @@ ink_close(Pid) ->
ink_doom(Pid) -> ink_doom(Pid) ->
gen_server:call(Pid, doom, 60000). gen_server:call(Pid, doom, 60000).
-spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> ok. -spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> fun().
%% @doc %% @doc
%% Fold over the journal from a starting sequence number (MinSQN), passing %% Fold over the journal from a starting sequence number (MinSQN), passing
%% in three functions and a snapshot of the penciller. The Fold functions %% in three functions and a snapshot of the penciller. The Fold functions
@ -307,8 +307,14 @@ ink_doom(Pid) ->
%% The BatchFun is a two arity function that should take as inputs: %% The BatchFun is a two arity function that should take as inputs:
%% An overall accumulator %% An overall accumulator
%% The batch accumulator built over the sub-fold %% The batch accumulator built over the sub-fold
%%
%% The output of ink_fold is a folder, that may actually run the fold. The
%% type of the output of the function when called will depend on the type of
%% the accumulator
ink_fold(Pid, MinSQN, FoldFuns, Acc) -> ink_fold(Pid, MinSQN, FoldFuns, Acc) ->
gen_server:call(Pid, {fold, MinSQN, FoldFuns, Acc}, infinity). gen_server:call(Pid,
{fold, MinSQN, FoldFuns, Acc, by_runner},
infinity).
-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok. -spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok.
%% %%
@ -333,7 +339,8 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
{fold, {fold,
MinSQN, MinSQN,
{FilterFun, InitAccFun, BatchFun}, {FilterFun, InitAccFun, BatchFun},
ok}, ok,
as_ink},
infinity). infinity).
-spec ink_compactjournal(pid(), pid(), integer()) -> ok. -spec ink_compactjournal(pid(), pid(), integer()) -> ok.
@ -492,14 +499,22 @@ handle_call({key_check, Key, SQN}, _From, State) ->
handle_call({fold, handle_call({fold,
StartSQN, StartSQN,
{FilterFun, InitAccFun, FoldFun}, {FilterFun, InitAccFun, FoldFun},
Acc}, _From, State) -> Acc,
By}, _From, State) ->
Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)), Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)),
Reply = Folder =
fold_from_sequence(StartSQN, fun() ->
{FilterFun, InitAccFun, FoldFun}, fold_from_sequence(StartSQN,
Acc, {FilterFun, InitAccFun, FoldFun},
Manifest), Acc,
{reply, Reply, State}; Manifest)
end,
case By of
as_ink ->
{reply, Folder(), State};
by_runner ->
{reply, Folder, State}
end;
handle_call({register_snapshot, Requestor}, _From , State) -> handle_call({register_snapshot, Requestor}, _From , State) ->
Rs = [{Requestor, Rs = [{Requestor,
State#state.manifest_sqn}|State#state.registered_snapshots], State#state.manifest_sqn}|State#state.registered_snapshots],

View file

@ -451,7 +451,7 @@ pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc,
{fetch_keys, {fetch_keys,
StartKey, EndKey, AccFun, InitAcc0, StartKey, EndKey, AccFun, InitAcc0,
SegmentList, LastModRange, MaxKeys, SegmentList, LastModRange, MaxKeys,
as_pcl}, by_runner},
infinity). infinity).
-spec pcl_fetchnextkey(pid(), -spec pcl_fetchnextkey(pid(),

View file

@ -95,10 +95,22 @@ bucket_list(SnapFun, Tag, FoldBucketsFun, InitAcc, MaxBuckets) ->
BucketAcc = BucketAcc =
get_nextbucket(null, null, get_nextbucket(null, null,
Tag, LedgerSnapshot, [], {0, MaxBuckets}), Tag, LedgerSnapshot, [], {0, MaxBuckets}),
ok = leveled_penciller:pcl_close(LedgerSnapshot), AfterFun =
lists:foldl(fun({B, _K}, Acc) -> FoldBucketsFun(B, Acc) end, fun() ->
InitAcc, ok = leveled_penciller:pcl_close(LedgerSnapshot)
BucketAcc) end,
FoldRunner =
fun() ->
lists:foldr(fun({B, _K}, Acc) -> FoldBucketsFun(B, Acc) end,
InitAcc,
BucketAcc)
% Buckets in reverse alphabetical order so foldr
end,
% For this fold, the fold over the store is actually completed
% before results are passed to the FoldBucketsFun to be
% accumulated. Using a throw to exit the fold early will not
% in this case save significant time.
wrap_runner(FoldRunner, AfterFun)
end, end,
{async, Runner}. {async, Runner}.
@ -374,14 +386,19 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
lists:foldr(ObjFun, ObjAcc, BatchAcc) lists:foldr(ObjFun, ObjAcc, BatchAcc)
end, end,
Acc = InkFolder =
leveled_inker:ink_fold(JournalSnapshot, leveled_inker:ink_fold(JournalSnapshot,
0, 0,
{FilterFun, InitAccFun, BatchFoldFun}, {FilterFun,
InitAcc), InitAccFun,
ok = leveled_penciller:pcl_close(LedgerSnapshot), BatchFoldFun},
ok = leveled_inker:ink_close(JournalSnapshot), InitAcc),
Acc AfterFun =
fun() ->
ok = leveled_penciller:pcl_close(LedgerSnapshot),
ok = leveled_inker:ink_close(JournalSnapshot)
end,
wrap_runner(InkFolder, AfterFun)
end, end,
{async, Folder}. {async, Folder}.
@ -535,14 +552,12 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch,
Folder = Folder =
fun() -> fun() ->
{ok, LedgerSnapshot, JournalSnapshot} = SnapFun(), {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
AccFun = AccFun =
accumulate_objects(FoldFun, accumulate_objects(FoldFun,
JournalSnapshot, JournalSnapshot,
Tag, Tag,
DeferredFetch), DeferredFetch),
FoldFunGen =
ListFoldFun =
fun({StartKey, EndKey}, FoldAcc) -> fun({StartKey, EndKey}, FoldAcc) ->
leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot, leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot,
StartKey, StartKey,
@ -553,15 +568,24 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch,
LastModRange, LastModRange,
LimitByCount) LimitByCount)
end, end,
Acc = lists:foldl(ListFoldFun, InitAcc0, KeyRanges), AfterFun =
ok = leveled_penciller:pcl_close(LedgerSnapshot), fun() ->
case DeferredFetch of ok = leveled_penciller:pcl_close(LedgerSnapshot),
{true, false} -> case DeferredFetch of
ok; {true, false} ->
_ -> ok;
ok = leveled_inker:ink_close(JournalSnapshot) _ ->
end, ok = leveled_inker:ink_close(JournalSnapshot)
Acc end
end,
ListFoldFun =
fun(KeyRange, Acc) ->
Folder = FoldFunGen(KeyRange, Acc),
Folder()
end,
FolderToWrap =
fun() -> lists:foldl(ListFoldFun, InitAcc0, KeyRanges) end,
wrap_runner(FolderToWrap, AfterFun)
end, end,
{async, Folder}. {async, Folder}.
@ -790,7 +814,6 @@ wrap_runner(FoldAction, AfterAction) ->
end. end.
%%%============================================================================ %%%============================================================================
%%% Test %%% Test
%%%============================================================================ %%%============================================================================

View file

@ -6,7 +6,8 @@
-define(KEY_ONLY, {false, undefined}). -define(KEY_ONLY, {false, undefined}).
-export([all/0]). -export([all/0]).
-export([single_object_with2i/1, -export([breaking_folds/1,
single_object_with2i/1,
small_load_with2i/1, small_load_with2i/1,
query_count/1, query_count/1,
multibucket_fold/1, multibucket_fold/1,
@ -14,6 +15,7 @@
rotating_objects/1]). rotating_objects/1]).
all() -> [ all() -> [
breaking_folds,
single_object_with2i, single_object_with2i,
small_load_with2i, small_load_with2i,
query_count, query_count,
@ -23,6 +25,220 @@ all() -> [
]. ].
%% Test that any of the asynchronous fold types (index fold, head fold,
%% object fold in key order, object fold in sqn order, bucket list) can be
%% terminated early ("broken") by the fold fun throwing {stop_fold, Acc},
%% with a wrapping worker catching the throw to recover the accumulator
%% built up to that point.
breaking_folds(_Config) ->
    % Run various iterators and show that they can be broken by throwing an
    % exception from within the fold
    KeyCount = 10000,
    RootPath = testutil:reset_filestructure(),
    StartOpts1 = [{root_path, RootPath},
                    {max_journalsize, 10000000},
                    {sync_strategy, testutil:sync_strategy()}],
    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
    ObjectGen = testutil:get_compressiblevalue_andinteger(),
    IndexGen = testutil:get_randomindexes_generator(8),
    % Load KeyCount objects, each carrying generated index entries
    ObjL1 = testutil:generate_objects(KeyCount,
                                        binary_uuid,
                                        [],
                                        ObjectGen,
                                        IndexGen),
    testutil:riakload(Bookie1, ObjL1),
    % Find all keys index, and then same again but stop at a midpoint using a
    % throw
    {async, IdxFolder} =
        leveled_bookie:book_indexfold(Bookie1,
                                        list_to_binary("Bucket"),
                                        {fun testutil:foldkeysfun/3, []},
                                        {"idx1_bin", "#", "|"},
                                        {true, undefined}),
    KeyList1 = lists:reverse(IdxFolder()),
    io:format("Index fold with result size ~w~n", [length(KeyList1)]),
    true = KeyCount == length(KeyList1),
    % Pick the mid-point {Term, Key} of the full result to break the next
    % fold at
    {MidTerm, MidKey} = lists:nth(KeyCount div 2, KeyList1),
    % Index fold fun that throws {stop_fold, Acc} once past the mid-point
    FoldKeyThrowFun =
        fun(_B, {Term, Key}, Acc) ->
            case {Term, Key} > {MidTerm, MidKey} of
                true ->
                    throw({stop_fold, Acc});
                false ->
                    [{Term, Key}|Acc]
            end
        end,
    {async, IdxFolderToMidK} =
        leveled_bookie:book_indexfold(Bookie1,
                                        list_to_binary("Bucket"),
                                        {FoldKeyThrowFun, []},
                                        {"idx1_bin", "#", "|"},
                                        {true, undefined}),
    % Worker that runs an async folder and converts a stop_fold throw back
    % into the accumulator that had been built when the throw occurred
    CatchingFold =
        fun(AsyncFolder) ->
            try
                AsyncFolder()
            catch
                throw:{stop_fold, Acc} ->
                    Acc
            end
        end,
    KeyList2 = lists:reverse(CatchingFold(IdxFolderToMidK)),
    io:format("Index fold with result size ~w~n", [length(KeyList2)]),
    true = KeyCount div 2 == length(KeyList2),
    % Head fold - accumulate {Key, Size} pairs from the proxy objects
    HeadFoldFun =
        fun(_B, K, PO, Acc) ->
            {proxy_object, _MDBin, Size, _FF} = binary_to_term(PO),
            [{K, Size}|Acc]
        end,
    {async, HeadFolder} =
        leveled_bookie:book_headfold(Bookie1,
                                        ?RIAK_TAG,
                                        {HeadFoldFun, []},
                                        true, true, false),
    KeySizeList1 = lists:reverse(HeadFolder()),
    io:format("Head fold with result size ~w~n", [length(KeySizeList1)]),
    true = KeyCount == length(KeySizeList1),
    {MidHeadKey, _MidSize} = lists:nth(KeyCount div 2, KeySizeList1),
    % Wrap any 4-arity fold fun so that it throws once past MidHeadKey
    FoldThrowFun =
        fun(FoldFun) ->
            fun(B, K, PO, Acc) ->
                case K > MidHeadKey of
                    true ->
                        throw({stop_fold, Acc});
                    false ->
                        FoldFun(B, K, PO, Acc)
                end
            end
        end,
    {async, HeadFolderToMidK} =
        leveled_bookie:book_headfold(Bookie1,
                                        ?RIAK_TAG,
                                        {FoldThrowFun(HeadFoldFun), []},
                                        true, true, false),
    KeySizeList2 = lists:reverse(CatchingFold(HeadFolderToMidK)),
    io:format("Head fold with result size ~w~n", [length(KeySizeList2)]),
    true = KeyCount div 2 == length(KeySizeList2),
    % Object fold (key order) - accumulate {Key, value byte size} pairs
    ObjFoldFun =
        fun(_B, K, V, Acc) ->
            [{K,byte_size(V)}|Acc]
        end,
    {async, ObjectFolderKO} =
        leveled_bookie:book_objectfold(Bookie1,
                                        ?RIAK_TAG,
                                        {ObjFoldFun, []},
                                        false,
                                        key_order),
    ObjSizeList1 = lists:reverse(ObjectFolderKO()),
    io:format("Obj fold with result size ~w~n", [length(ObjSizeList1)]),
    true = KeyCount == length(ObjSizeList1),
    {async, ObjFolderToMidK} =
        leveled_bookie:book_objectfold(Bookie1,
                                        ?RIAK_TAG,
                                        {FoldThrowFun(ObjFoldFun), []},
                                        false,
                                        key_order),
    ObjSizeList2 = lists:reverse(CatchingFold(ObjFolderToMidK)),
    io:format("Object fold with result size ~w~n", [length(ObjSizeList2)]),
    true = KeyCount div 2 == length(ObjSizeList2),
    % Object folds which are SQN order use a different path through the code,
    % so testing that here.  Note that it would not make sense to have a fold
    % that was terminated by reaching a point in the key range .. as results
    % will not be passed to the fold function in key order
    {async, ObjectFolderSO} =
        leveled_bookie:book_objectfold(Bookie1,
                                        ?RIAK_TAG,
                                        {ObjFoldFun, []},
                                        false,
                                        sqn_order),
    ObjSizeList1_SO = lists:reverse(ObjectFolderSO()),
    io:format("Obj fold with result size ~w~n", [length(ObjSizeList1_SO)]),
    true = KeyCount == length(ObjSizeList1_SO),
    % Exit fold when we've reached a thousand accumulated objects - the break
    % condition here is accumulator size, not key position, as sqn_order
    % results do not arrive in key order
    FoldThrowThousandFun =
        fun(FoldFun) ->
            fun(B, K, PO, Acc) ->
                case length(Acc) == 1000 of
                    true ->
                        throw({stop_fold, Acc});
                    false ->
                        FoldFun(B, K, PO, Acc)
                end
            end
        end,
    {async, ObjFolderTo1K} =
        leveled_bookie:book_objectfold(Bookie1,
                                        ?RIAK_TAG,
                                        {FoldThrowThousandFun(ObjFoldFun), []},
                                        false,
                                        sqn_order),
    ObjSizeList2_SO = lists:reverse(CatchingFold(ObjFolderTo1K)),
    io:format("Object fold with result size ~w~n", [length(ObjSizeList2_SO)]),
    true = 1000 == length(ObjSizeList2_SO),
    % Load three further buckets (B2, B3, B4) so the bucket list fold has
    % four buckets in total to iterate over
    ObjL2 = testutil:generate_objects(10,
                                        binary_uuid,
                                        [],
                                        ObjectGen,
                                        IndexGen,
                                        "B2"),
    ObjL3 = testutil:generate_objects(10,
                                        binary_uuid,
                                        [],
                                        ObjectGen,
                                        IndexGen,
                                        "B3"),
    ObjL4 = testutil:generate_objects(10,
                                        binary_uuid,
                                        [],
                                        ObjectGen,
                                        IndexGen,
                                        "B4"),
    testutil:riakload(Bookie1, ObjL2),
    testutil:riakload(Bookie1, ObjL3),
    testutil:riakload(Bookie1, ObjL4),
    FBAccT = {fun(B, Acc) -> [B|Acc] end, []},
    {async, BucketFolder} =
        leveled_bookie:book_bucketlist(Bookie1, ?RIAK_TAG, FBAccT, all),
    BucketList1 = lists:reverse(BucketFolder()),
    io:format("bucket list with result size ~w~n", [length(BucketList1)]),
    true = 4 == length(BucketList1),
    % Bucket fold fun that accumulates the bucket then throws on <<"B3">>,
    % so the broken fold should yield exactly [<<"B2">>, <<"B3">>]
    StopAt3Fun =
        fun(B, Acc) ->
            Acc0 = [B|Acc],
            case B of
                <<"B3">> ->
                    throw({stop_fold, Acc0});
                _ ->
                    Acc0
            end
        end,
    {async, StopAt3BucketFolder} =
        leveled_bookie:book_bucketlist(Bookie1,
                                        ?RIAK_TAG,
                                        {StopAt3Fun, []},
                                        all),
    BucketListSA3 = lists:reverse(CatchingFold(StopAt3BucketFolder)),
    io:format("bucket list with result ~w~n", [BucketListSA3]),
    true = [<<"B2">>, <<"B3">>] == BucketListSA3,
    ok = leveled_bookie:book_close(Bookie1),
    testutil:reset_filestructure().
single_object_with2i(_Config) -> single_object_with2i(_Config) ->
% Load a single object with an integer and a binary % Load a single object with an integer and a binary
% index and query for it % index and query for it
@ -566,7 +782,7 @@ multibucket_fold(_Config) ->
?RIAK_TAG, ?RIAK_TAG,
{FoldBucketsFun, []}, {FoldBucketsFun, []},
all), all),
BucketList = Folder(), BucketList = lists:reverse(Folder()),
ExpectedBucketList = ExpectedBucketList =
[{<<"Type1">>, <<"Bucket1">>}, {<<"Type2">>, <<"Bucket4">>}, [{<<"Type1">>, <<"Bucket1">>}, {<<"Type2">>, <<"Bucket4">>},
<<"Bucket2">>, <<"Bucket3">>], <<"Bucket2">>, <<"Bucket3">>],