Support multi-query fold

Allow a single snapshot to run query over multiple ranges.   Used initially to fold over multiple buckets.
This commit is contained in:
Martin Sumner 2018-03-01 23:19:52 +00:00
parent 10f4c4c5e5
commit 861aa5a7db
4 changed files with 144 additions and 36 deletions

View file

@ -935,9 +935,28 @@ get_runner(State,
SnapType = snaptype_by_presence(JournalCheck), SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, SnapQ, true, SnapPreFold), SnapFun = return_snapfun(State, SnapType, SnapQ, true, SnapPreFold),
leveled_runner:foldheads_bybucket(SnapFun, leveled_runner:foldheads_bybucket(SnapFun,
{Tag, StartKey, EndKey}, Tag,
[{StartKey, EndKey}],
FoldFun, FoldFun,
JournalCheck, SegmentList); JournalCheck, SegmentList);
get_runner(State,
{foldheads_bybucketlist,
Tag,
BucketList,
FoldFun,
JournalCheck, SnapPreFold, SegmentList}) ->
KeyRangeFun =
fun(Bucket) ->
{StartKey, EndKey, _} = return_ledger_keyrange(Tag, Bucket, all),
{StartKey, EndKey}
end,
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold),
leveled_runner:foldheads_bybucket(SnapFun,
Tag,
lists:map(KeyRangeFun, BucketList),
FoldFun,
JournalCheck, SegmentList);
get_runner(State, get_runner(State,
{foldobjects_bybucket, {foldobjects_bybucket,
Tag, Bucket, KeyRange, Tag, Bucket, KeyRange,
@ -946,7 +965,8 @@ get_runner(State,
{StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange), {StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange),
SnapFun = return_snapfun(State, store, SnapQ, true, SnapPreFold), SnapFun = return_snapfun(State, store, SnapQ, true, SnapPreFold),
leveled_runner:foldobjects_bybucket(SnapFun, leveled_runner:foldobjects_bybucket(SnapFun,
{Tag, StartKey, EndKey}, Tag,
[{StartKey, EndKey}],
FoldFun); FoldFun);
get_runner(State, get_runner(State,
{foldobjects_byindex, {foldobjects_byindex,
@ -1007,7 +1027,7 @@ get_deprecatedrunner(State,
PartitionFilter). PartitionFilter).
-spec return_ledger_keyrange(atom(), any(), tuple()) -> -spec return_ledger_keyrange(atom(), any(), tuple()|all) ->
{tuple(), tuple(), tuple()|no_lookup}. {tuple(), tuple(), tuple()|no_lookup}.
%% @doc %% @doc
%% Convert a range of binary keys into a ledger key range, returning %% Convert a range of binary keys into a ledger key range, returning
@ -1038,7 +1058,6 @@ return_ledger_keyrange(Tag, Bucket, KeyRange) ->
end, end,
{StartKey, EndKey, SnapQuery}. {StartKey, EndKey, SnapQuery}.
maybe_longrunning(SW, Aspect) -> maybe_longrunning(SW, Aspect) ->
case timer:now_diff(os:timestamp(), SW) of case timer:now_diff(os:timestamp(), SW) of

View file

@ -677,7 +677,7 @@ handle_call({fetch_keys,
{AccFun, InitAcc}, {AccFun, InitAcc},
{SegmentList, MaxKeys}), {SegmentList, MaxKeys}),
{reply, Acc, State#state{levelzero_astree = L0AsList}}; {reply, Acc, State};
handle_call(get_startup_sqn, _From, State) -> handle_call(get_startup_sqn, _From, State) ->
{reply, State#state.persisted_sqn, State}; {reply, State#state.persisted_sqn, State};
handle_call({register_snapshot, Snapshot, Query, BookiesMem, LR}, _From, State) -> handle_call({register_snapshot, Snapshot, Query, BookiesMem, LR}, _From, State) ->

View file

@ -30,8 +30,8 @@
tictactree/5, tictactree/5,
foldheads_allkeys/5, foldheads_allkeys/5,
foldobjects_allkeys/4, foldobjects_allkeys/4,
foldheads_bybucket/5, foldheads_bybucket/6,
foldobjects_bybucket/3, foldobjects_bybucket/4,
foldobjects_byindex/3 foldobjects_byindex/3
]). ]).
@ -222,9 +222,11 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) ->
StartKey = leveled_codec:to_ledgerkey(null, null, Tag), StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
foldobjects(SnapFun, foldobjects(SnapFun,
Tag, StartKey, EndKey, Tag,
[{StartKey, EndKey}],
FoldFun, FoldFun,
{true, JournalCheck}, SegmentList). {true, JournalCheck},
SegmentList).
-spec foldobjects_allkeys(fun(), atom(), fun(), key_order|sqn_order) -spec foldobjects_allkeys(fun(), atom(), fun(), key_order|sqn_order)
-> {async, fun()}. -> {async, fun()}.
@ -234,9 +236,11 @@ foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order) ->
StartKey = leveled_codec:to_ledgerkey(null, null, Tag), StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
foldobjects(SnapFun, foldobjects(SnapFun,
Tag, StartKey, EndKey, Tag,
[{StartKey, EndKey}],
FoldFun, FoldFun,
false, false); false,
false);
foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
% Fold over the journal in order of receipt % Fold over the journal in order of receipt
{FoldFun, InitAcc} = {FoldFun, InitAcc} =
@ -321,31 +325,37 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
{async, Folder}. {async, Folder}.
-spec foldobjects_bybucket(fun(), {atom(), any(), any()}, fun()) -> -spec foldobjects_bybucket(fun(), atom(), list({any(), any()}), fun()) ->
{async, fun()}. {async, fun()}.
%% @doc %% @doc
%% Fold over all objects within a given key range in a bucket %% Fold over all objects within a given key range in a bucket
foldobjects_bybucket(SnapFun, {Tag, StartKey, EndKey}, FoldFun) -> foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) ->
foldobjects(SnapFun, foldobjects(SnapFun,
Tag, StartKey, EndKey, Tag,
KeyRanges,
FoldFun, FoldFun,
false, false). false,
false).
-spec foldheads_bybucket(fun(), -spec foldheads_bybucket(fun(),
{atom(), any(), any()}, atom(),
list({any(), any()}),
fun(), fun(),
boolean(), false|list(integer())) boolean(), false|list(integer()))
-> {async, fun()}. -> {async, fun()}.
%% @doc %% @doc
%% Fold over all object metadata within a given key range in a bucket %% Fold over all object metadata within a given key range in a bucket
foldheads_bybucket(SnapFun, foldheads_bybucket(SnapFun,
{Tag, StartKey, EndKey}, Tag,
KeyRanges,
FoldFun, FoldFun,
JournalCheck, SegmentList) -> JournalCheck, SegmentList) ->
foldobjects(SnapFun, foldobjects(SnapFun,
Tag, StartKey, EndKey, Tag,
KeyRanges,
FoldFun, FoldFun,
{true, JournalCheck}, SegmentList). {true, JournalCheck},
SegmentList).
-spec foldobjects_byindex(fun(), tuple(), fun()) -> {async, fun()}. -spec foldobjects_byindex(fun(), tuple(), fun()) -> {async, fun()}.
%% @doc %% @doc
@ -357,9 +367,11 @@ foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) ->
EndKey = EndKey =
leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm), leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm),
foldobjects(SnapFun, foldobjects(SnapFun,
Tag, StartKey, EndKey, Tag,
[{StartKey, EndKey}],
FoldFun, FoldFun,
false, false). false,
false).
@ -407,7 +419,7 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
end. end.
-spec foldobjects(fun(), atom(), tuple(), tuple(), fun(), -spec foldobjects(fun(), atom(), list(), fun(),
false|{true, boolean()}, false|list(integer())) -> false|{true, boolean()}, false|list(integer())) ->
{async, fun()}. {async, fun()}.
%% @doc %% @doc
@ -417,19 +429,16 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
%% will be created that if understood by the fold function will allow the fold %% will be created that if understood by the fold function will allow the fold
%% function to work on the head of the object, and defer fetching the body in %% function to work on the head of the object, and defer fetching the body in
%% case such a fetch is unecessary. %% case such a fetch is unecessary.
foldobjects(SnapFun, foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) ->
Tag, StartKey, EndKey,
FoldObjectsFun,
DeferredFetch, SegmentList) ->
{FoldFun, InitAcc} = {FoldFun, InitAcc} =
case is_tuple(FoldObjectsFun) of case is_tuple(FoldObjFun) of
true -> true ->
% FoldObjectsFun is already a tuple with a Fold function and an % FoldObjectsFun is already a tuple with a Fold function and an
% initial accumulator % initial accumulator
FoldObjectsFun; FoldObjFun;
false -> false ->
% no initial accumulatr passed, and so should be just a list % no initial accumulatr passed, and so should be just a list
{FoldObjectsFun, []} {FoldObjFun, []}
end, end,
Folder = Folder =
@ -440,12 +449,18 @@ foldobjects(SnapFun,
JournalSnapshot, JournalSnapshot,
Tag, Tag,
DeferredFetch), DeferredFetch),
Acc = leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot,
StartKey, ListFoldFun =
EndKey, fun({StartKey, EndKey}, FoldAcc) ->
AccFun, io:format("SK ~w EK ~w ~n", [StartKey, EndKey]),
InitAcc, leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot,
SegmentList), StartKey,
EndKey,
AccFun,
FoldAcc,
SegmentList)
end,
Acc = lists:foldl(ListFoldFun, InitAcc, KeyRanges),
ok = leveled_penciller:pcl_close(LedgerSnapshot), ok = leveled_penciller:pcl_close(LedgerSnapshot),
case DeferredFetch of case DeferredFetch of
{true, false} -> {true, false} ->

View file

@ -9,12 +9,14 @@
-export([single_object_with2i/1, -export([single_object_with2i/1,
small_load_with2i/1, small_load_with2i/1,
query_count/1, query_count/1,
multibucket_fold/1,
rotating_objects/1]). rotating_objects/1]).
all() -> [ all() -> [
single_object_with2i, single_object_with2i,
small_load_with2i, small_load_with2i,
query_count, query_count,
multibucket_fold,
rotating_objects rotating_objects
]. ].
@ -454,7 +456,7 @@ count_termsonindex(Bucket, IdxField, Book, QType) ->
lists:foldl(fun(X, Acc) -> lists:foldl(fun(X, Acc) ->
SW = os:timestamp(), SW = os:timestamp(),
ST = integer_to_list(X), ST = integer_to_list(X),
ET = ST ++ "~", ET = ST ++ "|",
Q = {index_query, Q = {index_query,
Bucket, Bucket,
{fun testutil:foldkeysfun/3, []}, {fun testutil:foldkeysfun/3, []},
@ -473,6 +475,78 @@ count_termsonindex(Bucket, IdxField, Book, QType) ->
0, 0,
lists:seq(190, 221)). lists:seq(190, 221)).
multibucket_fold(_Config) ->
RootPath = testutil:reset_filestructure(),
{ok, Bookie1} = leveled_bookie:book_start(RootPath,
2000,
50000000,
testutil:sync_strategy()),
ObjectGen = testutil:get_compressiblevalue_andinteger(),
IndexGen = fun() -> [] end,
ObjL1 = testutil:generate_objects(13000,
uuid,
[],
ObjectGen,
IndexGen,
<<"Bucket1">>),
testutil:riakload(Bookie1, ObjL1),
ObjL2 = testutil:generate_objects(17000,
uuid,
[],
ObjectGen,
IndexGen,
<<"Bucket2">>),
testutil:riakload(Bookie1, ObjL2),
ObjL3 = testutil:generate_objects(7000,
uuid,
[],
ObjectGen,
IndexGen,
<<"Bucket3">>),
testutil:riakload(Bookie1, ObjL3),
ObjL4 = testutil:generate_objects(23000,
uuid,
[],
ObjectGen,
IndexGen,
<<"Bucket4">>),
testutil:riakload(Bookie1, ObjL4),
Q1 = {foldheads_bybucketlist,
?RIAK_TAG,
[<<"Bucket1">>, <<"Bucket4">>],
fun(B, K, _PO, Acc) ->
[{B, K}|Acc]
end,
false,
true,
false},
{async, R1} = leveled_bookie:book_returnfolder(Bookie1, Q1),
O1 = length(R1()),
io:format("Result R1 of length ~w~n", [O1]),
Q2 = {foldheads_bybucketlist,
?RIAK_TAG,
[<<"Bucket2">>, <<"Bucket3">>],
{fun(_B, _K, _PO, Acc) ->
Acc +1
end,
0},
false,
true,
false},
{async, R2} = leveled_bookie:book_returnfolder(Bookie1, Q2),
O2 = R2(),
io:format("Result R2 of ~w~n", [O2]),
true = 36000 == O1,
true = 24000 == O2,
ok = leveled_bookie:book_close(Bookie1),
testutil:reset_filestructure().
rotating_objects(_Config) -> rotating_objects(_Config) ->
RootPath = testutil:reset_filestructure(), RootPath = testutil:reset_filestructure(),