Fast List Buckets

Copied the technique from HanoiDB to speed up list buckets.
This commit is contained in:
martinsumner 2016-11-20 21:21:31 +00:00
parent f40ecdd529
commit 386d40928b
5 changed files with 224 additions and 60 deletions

View file

@ -335,6 +335,10 @@ handle_call({return_folder, FolderType}, _From, State) ->
{reply,
bucket_stats(State, Bucket, ?RIAK_TAG),
State};
{binary_bucketlist, Tag, {FoldKeysFun, Acc}} ->
{reply,
binary_bucketlist(State, Tag, {FoldKeysFun, Acc}),
State};
{index_query,
Constraint,
{FoldKeysFun, Acc},
@ -430,6 +434,53 @@ bucket_stats(State, Bucket, Tag) ->
end,
{async, Folder}.
binary_bucketlist(State, Tag, {FoldKeysFun, InitAcc}) ->
% List buckets for tag, assuming bucket names are all binary type
{ok,
{LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger),
Folder = fun() ->
leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
LedgerCache),
BucketAcc = get_nextbucket(null,
Tag,
LedgerSnapshot,
[]),
ok = leveled_penciller:pcl_close(LedgerSnapshot),
lists:foldl(fun({B, K}, Acc) -> FoldKeysFun(B, K, Acc) end,
InitAcc,
BucketAcc)
end,
{async, Folder}.
get_nextbucket(NextBucket, Tag, LedgerSnapshot, BKList) ->
StartKey = leveled_codec:to_ledgerkey(NextBucket, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
ExtractFun = fun(LK, _V, _Acc) -> leveled_codec:from_ledgerkey(LK) end,
BK = leveled_penciller:pcl_fetchnextkey(LedgerSnapshot,
StartKey,
EndKey,
ExtractFun,
null),
case BK of
null ->
leveled_log:log("B0008",[]),
BKList;
{B, K} when is_binary(B) ->
leveled_log:log("B0009",[B]),
get_nextbucket(<<B/binary, 0>>,
Tag,
LedgerSnapshot,
[{B, K}|BKList]);
NB ->
leveled_log:log("B0010",[NB]),
[]
end.
index_query(State,
Constraint,
{FoldKeysFun, InitAcc},

View file

@ -32,6 +32,12 @@
{info, "Reached end of load batch with SQN ~w"}},
{"B0007",
{info, "Skipping as exceeded MaxSQN ~w with SQN ~w"}},
{"B0008",
{info, "Bucket list finds no more results"}},
{"B0009",
{info, "Bucket list finds Bucket ~w"}},
{"B0010",
{info, "Bucket list finds non-binary Bucket ~w"}},
{"P0001",
{info, "Ledger snapshot ~w registered"}},

View file

@ -169,6 +169,7 @@
pcl_fetchlevelzero/2,
pcl_fetch/2,
pcl_fetchkeys/5,
pcl_fetchnextkey/5,
pcl_checksequencenumber/3,
pcl_workforclerk/1,
pcl_promptmanifestchange/2,
@ -218,6 +219,7 @@
is_snapshot = false :: boolean(),
snapshot_fully_loaded = false :: boolean(),
source_penciller :: pid(),
levelzero_astree :: gb_trees:tree(),
ongoing_work = [] :: list(),
work_backlog = false :: boolean()}).
@ -248,7 +250,12 @@ pcl_fetch(Pid, Key) ->
pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) ->
gen_server:call(Pid,
{fetch_keys, StartKey, EndKey, AccFun, InitAcc},
{fetch_keys, StartKey, EndKey, AccFun, InitAcc, -1},
infinity).
pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
gen_server:call(Pid,
{fetch_keys, StartKey, EndKey, AccFun, InitAcc, 1},
infinity).
pcl_checksequencenumber(Pid, Key, SQN) ->
@ -352,20 +359,29 @@ handle_call({check_sqn, Key, SQN}, _From, State) ->
State#state.levelzero_cache),
SQN),
State};
handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc},
handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
_From,
State=#state{snapshot_fully_loaded=Ready})
when Ready == true ->
L0AsTree = leveled_pmem:merge_trees(StartKey,
L0AsTree =
case State#state.levelzero_astree of
undefined ->
leveled_pmem:merge_trees(StartKey,
EndKey,
State#state.levelzero_cache,
gb_trees:empty()),
gb_trees:empty());
Tree ->
Tree
end,
L0iter = gb_trees:iterator(L0AsTree),
SFTiter = initiate_rangequery_frommanifest(StartKey,
EndKey,
State#state.manifest),
Acc = keyfolder(L0iter, SFTiter, StartKey, EndKey, {AccFun, InitAcc}),
{reply, Acc, State};
Acc = keyfolder({L0iter, SFTiter},
{StartKey, EndKey},
{AccFun, InitAcc},
MaxKeys),
{reply, Acc, State#state{levelzero_astree = L0AsTree}};
handle_call(work_for_clerk, From, State) ->
{UpdState, Work} = return_work(State, From),
{reply, Work, UpdState};
@ -956,37 +972,56 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) ->
end.
keyfolder(null, SFTiterator, StartKey, EndKey, {AccFun, Acc}) ->
case find_nextkey(SFTiterator, StartKey, EndKey) of
keyfolder(IMMiter, SFTiter, StartKey, EndKey, {AccFun, Acc}) ->
keyfolder({IMMiter, SFTiter}, {StartKey, EndKey}, {AccFun, Acc}, -1).
keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, MaxKeys) when MaxKeys == 0 ->
Acc;
keyfolder({null, SFTiter}, KeyRange, {AccFun, Acc}, MaxKeys) ->
{StartKey, EndKey} = KeyRange,
case find_nextkey(SFTiter, StartKey, EndKey) of
no_more_keys ->
Acc;
{NxtSFTiterator, {SFTKey, SFTVal}} ->
{NxSFTiter, {SFTKey, SFTVal}} ->
Acc1 = AccFun(SFTKey, SFTVal, Acc),
keyfolder(null, NxtSFTiterator, StartKey, EndKey, {AccFun, Acc1})
keyfolder({null, NxSFTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1)
end;
keyfolder(IMMiterator, SFTiterator, StartKey, EndKey, {AccFun, Acc}) ->
keyfolder({IMMiterator, SFTiterator}, KeyRange, {AccFun, Acc}, MaxKeys) ->
{StartKey, EndKey} = KeyRange,
case gb_trees:next(IMMiterator) of
none ->
% There are no more keys in the in-memory iterator, so now
% iterate only over the remaining keys in the SFT iterator
keyfolder(null, SFTiterator, StartKey, EndKey, {AccFun, Acc});
{IMMKey, IMMVal, NxtIMMiterator} ->
keyfolder({null, SFTiterator}, KeyRange, {AccFun, Acc}, MaxKeys);
{IMMKey, _IMMVal, NxIMMiterator} when IMMKey < StartKey ->
% Normally everything is pre-filterd, but the IMM iterator can
% be re-used and do may be behind the StartKey if the StartKey has
% advanced from the previous use
keyfolder({NxIMMiterator, SFTiterator},
KeyRange,
{AccFun, Acc},
MaxKeys);
{IMMKey, IMMVal, NxIMMiterator} ->
case leveled_codec:endkey_passed(EndKey, IMMKey) of
true ->
% There are no more keys in-range in the in-memory
% iterator, so take action as if this iterator is empty
% (see above)
keyfolder(null, SFTiterator,
StartKey, EndKey, {AccFun, Acc});
keyfolder({null, SFTiterator},
KeyRange,
{AccFun, Acc},
MaxKeys);
false ->
case find_nextkey(SFTiterator, StartKey, EndKey) of
no_more_keys ->
% No more keys in range in the persisted store, so use the
% in-memory KV as the next
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder(NxtIMMiterator, SFTiterator,
StartKey, EndKey, {AccFun, Acc1});
{NxtSFTiterator, {SFTKey, SFTVal}} ->
keyfolder({NxIMMiterator, SFTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1);
{NxSFTiterator, {SFTKey, SFTVal}} ->
% There is a next key, so need to know which is the
% next key between the two (and handle two keys
% with different sequence numbers).
@ -996,19 +1031,22 @@ keyfolder(IMMiterator, SFTiterator, StartKey, EndKey, {AccFun, Acc}) ->
SFTVal}) of
left_hand_first ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder(NxtIMMiterator, SFTiterator,
StartKey, EndKey,
{AccFun, Acc1});
keyfolder({NxIMMiterator, SFTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1);
right_hand_first ->
Acc1 = AccFun(SFTKey, SFTVal, Acc),
keyfolder(IMMiterator, NxtSFTiterator,
StartKey, EndKey,
{AccFun, Acc1});
keyfolder({IMMiterator, NxSFTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1);
left_hand_dominant ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder(NxtIMMiterator, NxtSFTiterator,
StartKey, EndKey,
{AccFun, Acc1})
keyfolder({NxIMMiterator, NxSFTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1)
end
end
end

View file

@ -95,17 +95,13 @@ small_load_with2i(_Config) ->
true = 9900 == length(KeyHashList2),
true = 9900 == length(KeyHashList3),
SumIntegerFun = fun(_B, _K, V, Acc) ->
SumIntFun = fun(_B, _K, V, Acc) ->
[C] = V#r_object.contents,
{I, _Bin} = C#r_content.value,
Acc + I
end,
{async, Sum1} = leveled_bookie:book_returnfolder(Bookie1,
{foldobjects_bybucket,
?RIAK_TAG,
"Bucket",
{SumIntegerFun,
0}}),
BucketObjQ = {foldobjects_bybucket, ?RIAK_TAG, "Bucket", {SumIntFun, 0}},
{async, Sum1} = leveled_bookie:book_returnfolder(Bookie1, BucketObjQ),
Total1 = Sum1(),
true = Total1 > 100000,
@ -113,15 +109,19 @@ small_load_with2i(_Config) ->
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
{async, Sum2} = leveled_bookie:book_returnfolder(Bookie2,
{foldobjects_bybucket,
?RIAK_TAG,
"Bucket",
{SumIntegerFun,
0}}),
{async, Sum2} = leveled_bookie:book_returnfolder(Bookie2, BucketObjQ),
Total2 = Sum2(),
true = Total2 == Total1,
FoldBucketsFun = fun(B, _K, Acc) -> sets:add_element(B, Acc) end,
% Should not find any buckets - as there is a non-binary bucket, and no
% binary ones
BucketListQuery = {binary_bucketlist,
?RIAK_TAG,
{FoldBucketsFun, sets:new()}},
{async, BL} = leveled_bookie:book_returnfolder(Bookie2, BucketListQuery),
true = sets:size(BL()) == 0,
ok = leveled_bookie:book_close(Bookie2),
testutil:reset_filestructure().
@ -129,7 +129,8 @@ small_load_with2i(_Config) ->
query_count(_Config) ->
RootPath = testutil:reset_filestructure(),
{ok, Book1} = leveled_bookie:book_start(RootPath, 2000, 50000000),
{TestObject, TestSpec} = testutil:generate_testobject("Bucket",
BucketBin = list_to_binary("Bucket"),
{TestObject, TestSpec} = testutil:generate_testobject(BucketBin,
"Key1",
"Value1",
[],
@ -143,7 +144,7 @@ query_count(_Config) ->
Indexes = testutil:get_randomindexes_generator(8),
SW = os:timestamp(),
ObjL1 = testutil:generate_objects(10000,
uuid,
binary_uuid,
[],
V,
Indexes),
@ -157,7 +158,7 @@ query_count(_Config) ->
testutil:check_forobject(Book1, TestObject),
Total = lists:foldl(fun(X, Acc) ->
IdxF = "idx" ++ integer_to_list(X) ++ "_bin",
T = count_termsonindex("Bucket",
T = count_termsonindex(BucketBin,
IdxF,
Book1,
?KEY_ONLY),
@ -171,13 +172,13 @@ query_count(_Config) ->
640000 ->
ok
end,
Index1Count = count_termsonindex("Bucket",
Index1Count = count_termsonindex(BucketBin,
"idx1_bin",
Book1,
?KEY_ONLY),
ok = leveled_bookie:book_close(Book1),
{ok, Book2} = leveled_bookie:book_start(RootPath, 1000, 50000000),
Index1Count = count_termsonindex("Bucket",
Index1Count = count_termsonindex(BucketBin,
"idx1_bin",
Book2,
?KEY_ONLY),
@ -186,7 +187,7 @@ query_count(_Config) ->
{ok, Regex} = re:compile("[0-9]+" ++
Name),
SW = os:timestamp(),
T = count_termsonindex("Bucket",
T = count_termsonindex(BucketBin,
"idx1_bin",
Book2,
{false,
@ -208,7 +209,7 @@ query_count(_Config) ->
end,
{ok, RegMia} = re:compile("[0-9]+Mia"),
Query1 = {index_query,
"Bucket",
BucketBin,
{fun testutil:foldkeysfun/3, []},
{"idx2_bin", "2000", "2000~"},
{false, RegMia}},
@ -216,7 +217,7 @@ query_count(_Config) ->
Mia2KFolder1} = leveled_bookie:book_returnfolder(Book2, Query1),
Mia2000Count1 = length(Mia2KFolder1()),
Query2 = {index_query,
"Bucket",
BucketBin,
{fun testutil:foldkeysfun/3, []},
{"idx2_bin", "2000", "2001"},
{true, undefined}},
@ -239,7 +240,7 @@ query_count(_Config) ->
end,
{ok, RxMia2K} = re:compile("^2000[0-9]+Mia"),
Query3 = {index_query,
"Bucket",
BucketBin,
{fun testutil:foldkeysfun/3, []},
{"idx2_bin", "1980", "2100"},
{false, RxMia2K}},
@ -249,11 +250,15 @@ query_count(_Config) ->
V9 = testutil:get_compressiblevalue(),
Indexes9 = testutil:get_randomindexes_generator(8),
[{_RN, Obj9, Spc9}] = testutil:generate_objects(1, uuid, [], V9, Indexes9),
[{_RN, Obj9, Spc9}] = testutil:generate_objects(1,
binary_uuid,
[],
V9,
Indexes9),
ok = testutil:book_riakput(Book2, Obj9, Spc9),
R9 = lists:map(fun({add, IdxF, IdxT}) ->
Q = {index_query,
"Bucket",
BucketBin,
{fun testutil:foldkeysfun/3, []},
{IdxF, IdxT, IdxT},
?KEY_ONLY},
@ -270,7 +275,7 @@ query_count(_Config) ->
ok = testutil:book_riakput(Book2, Obj9, Spc9Del),
lists:foreach(fun({IdxF, IdxT, X}) ->
Q = {index_query,
"Bucket",
BucketBin,
{fun testutil:foldkeysfun/3, []},
{IdxF, IdxT, IdxT},
?KEY_ONLY},
@ -286,7 +291,7 @@ query_count(_Config) ->
{ok, Book3} = leveled_bookie:book_start(RootPath, 2000, 50000000),
lists:foreach(fun({IdxF, IdxT, X}) ->
Q = {index_query,
"Bucket",
BucketBin,
{fun testutil:foldkeysfun/3, []},
{IdxF, IdxT, IdxT},
?KEY_ONLY},
@ -303,7 +308,7 @@ query_count(_Config) ->
{ok, Book4} = leveled_bookie:book_start(RootPath, 2000, 50000000),
lists:foreach(fun({IdxF, IdxT, X}) ->
Q = {index_query,
"Bucket",
BucketBin,
{fun testutil:foldkeysfun/3, []},
{IdxF, IdxT, IdxT},
?KEY_ONLY},
@ -316,7 +321,60 @@ query_count(_Config) ->
end,
R9),
testutil:check_forobject(Book4, TestObject),
FoldBucketsFun = fun(B, _K, Acc) -> sets:add_element(B, Acc) end,
BucketListQuery = {binary_bucketlist,
?RIAK_TAG,
{FoldBucketsFun, sets:new()}},
{async, BLF1} = leveled_bookie:book_returnfolder(Book4, BucketListQuery),
SW_QA = os:timestamp(),
BucketSet1 = BLF1(),
io:format("Bucket set returned in ~w microseconds",
[timer:now_diff(os:timestamp(), SW_QA)]),
true = sets:size(BucketSet1) == 1,
true = sets:is_element(list_to_binary("Bucket"), BucketSet1),
ObjList10A = testutil:generate_objects(5000,
binary_uuid,
[],
V9,
Indexes9,
"BucketA"),
ObjList10B = testutil:generate_objects(5000,
binary_uuid,
[],
V9,
Indexes9,
"BucketB"),
ObjList10C = testutil:generate_objects(5000,
binary_uuid,
[],
V9,
Indexes9,
"BucketC"),
testutil:riakload(Book4, ObjList10A),
testutil:riakload(Book4, ObjList10B),
testutil:riakload(Book4, ObjList10C),
{async, BLF2} = leveled_bookie:book_returnfolder(Book4, BucketListQuery),
SW_QB = os:timestamp(),
BucketSet2 = BLF2(),
io:format("Bucket set returned in ~w microseconds",
[timer:now_diff(os:timestamp(), SW_QB)]),
true = sets:size(BucketSet2) == 4,
ok = leveled_bookie:book_close(Book4),
{ok, Book5} = leveled_bookie:book_start(RootPath, 2000, 50000000),
{async, BLF3} = leveled_bookie:book_returnfolder(Book5, BucketListQuery),
SW_QC = os:timestamp(),
BucketSet3 = BLF3(),
io:format("Bucket set returned in ~w microseconds",
[timer:now_diff(os:timestamp(), SW_QC)]),
true = sets:size(BucketSet3) == 4,
ok = leveled_bookie:book_close(Book5),
testutil:reset_filestructure().

View file

@ -231,6 +231,17 @@ generate_objects(Count, KeyNumber, ObjL, Value, IndexGen) ->
generate_objects(0, _KeyNumber, ObjL, _Value, _IndexGen, _Bucket) ->
ObjL;
generate_objects(Count, binary_uuid, ObjL, Value, IndexGen, Bucket) ->
{Obj1, Spec1} = set_object(list_to_binary(Bucket),
list_to_binary(leveled_codec:generate_uuid()),
Value,
IndexGen),
generate_objects(Count - 1,
binary_uuid,
ObjL ++ [{random:uniform(), Obj1, Spec1}],
Value,
IndexGen,
Bucket);
generate_objects(Count, uuid, ObjL, Value, IndexGen, Bucket) ->
{Obj1, Spec1} = set_object(Bucket,
leveled_codec:generate_uuid(),