From 386d40928b2e1feb572b842276d010a87f426555 Mon Sep 17 00:00:00 2001
From: martinsumner
Date: Sun, 20 Nov 2016 21:21:31 +0000
Subject: [PATCH] Fast List Buckets

Copied the technique from HanoiDB to speed up list buckets.

---
 src/leveled_bookie.erl             |  51 ++++++++++++
 src/leveled_log.erl                |   6 ++
 src/leveled_penciller.erl          |  96 ++++++++++++++++-------
 test/end_to_end/iterator_SUITE.erl | 120 +++++++++++++++++++++--------
 test/end_to_end/testutil.erl       |  11 +++
 5 files changed, 224 insertions(+), 60 deletions(-)

diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 9c215e0..40050b3 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -335,6 +335,10 @@ handle_call({return_folder, FolderType}, _From, State) ->
             {reply,
                 bucket_stats(State, Bucket, ?RIAK_TAG),
                 State};
+        {binary_bucketlist, Tag, {FoldKeysFun, Acc}} ->
+            {reply,
+                binary_bucketlist(State, Tag, {FoldKeysFun, Acc}),
+                State};
         {index_query,
                 Constraint,
                 {FoldKeysFun, Acc},
@@ -430,6 +434,53 @@ bucket_stats(State, Bucket, Tag) ->
         end,
     {async, Folder}.
 
+
+binary_bucketlist(State, Tag, {FoldKeysFun, InitAcc}) ->
+    % List buckets for tag, assuming bucket names are all binary type
+    {ok,
+        {LedgerSnapshot, LedgerCache},
+        _JournalSnapshot} = snapshot_store(State, ledger),
+    Folder = fun() ->
+                leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
+                ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
+                                                        LedgerCache),
+                BucketAcc = get_nextbucket(null,
+                                            Tag,
+                                            LedgerSnapshot,
+                                            []),
+                ok = leveled_penciller:pcl_close(LedgerSnapshot),
+                lists:foldl(fun({B, K}, Acc) -> FoldKeysFun(B, K, Acc) end,
+                            InitAcc,
+                            BucketAcc)
+                end,
+    {async, Folder}.
+
+get_nextbucket(NextBucket, Tag, LedgerSnapshot, BKList) ->
+    StartKey = leveled_codec:to_ledgerkey(NextBucket, null, Tag),
+    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
+    ExtractFun = fun(LK, _V, _Acc) -> leveled_codec:from_ledgerkey(LK) end,
+    BK = leveled_penciller:pcl_fetchnextkey(LedgerSnapshot,
+                                            StartKey,
+                                            EndKey,
+                                            ExtractFun,
+                                            null),
+    case BK of
+        null ->
+            leveled_log:log("B0008",[]),
+            BKList;
+        {B, K} when is_binary(B) ->
+            leveled_log:log("B0009",[B]),
+            get_nextbucket(<<B/binary, 0>>,
+                            Tag,
+                            LedgerSnapshot,
+                            [{B, K}|BKList]);
+        NB ->
+            leveled_log:log("B0010",[NB]),
+            []
+
+    end.
+
+
 index_query(State,
             Constraint,
             {FoldKeysFun, InitAcc},
diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index 4fc4b2c..d33928d 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -32,6 +32,12 @@
         {info, "Reached end of load batch with SQN ~w"}},
     {"B0007",
         {info, "Skipping as exceeded MaxSQN ~w with SQN ~w"}},
+    {"B0008",
+        {info, "Bucket list finds no more results"}},
+    {"B0009",
+        {info, "Bucket list finds Bucket ~w"}},
+    {"B0010",
+        {info, "Bucket list finds non-binary Bucket ~w"}},
 
     {"P0001",
         {info, "Ledger snapshot ~w registered"}},
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index fa43727..87dcbdc 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -169,6 +169,7 @@
         pcl_fetchlevelzero/2,
         pcl_fetch/2,
         pcl_fetchkeys/5,
+        pcl_fetchnextkey/5,
         pcl_checksequencenumber/3,
         pcl_workforclerk/1,
         pcl_promptmanifestchange/2,
@@ -218,6 +219,7 @@
                 is_snapshot = false :: boolean(),
                 snapshot_fully_loaded = false :: boolean(),
                 source_penciller :: pid(),
+                levelzero_astree :: gb_trees:tree(),
                 ongoing_work = [] :: list(),
                 work_backlog = false :: boolean()}).
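
A sketch of how a caller might drive the new binary_bucketlist folder added to
leveled_bookie above; Bookie here is assumed to be a running bookie pid, and
?RIAK_TAG the object-tag macro used throughout the test suites:

    FoldBucketsFun = fun(B, _K, Acc) -> sets:add_element(B, Acc) end,
    {async, Folder} =
        leveled_bookie:book_returnfolder(Bookie,
                                         {binary_bucketlist,
                                             ?RIAK_TAG,
                                             {FoldBucketsFun, sets:new()}}),
    BucketList = sets:to_list(Folder()).

Any three-arity fold fun will do: get_nextbucket hands it one {Bucket, Key}
pair per bucket found, so accumulating into a set (as the end-to-end tests
below also do) yields the distinct bucket names.
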
@@ -248,7 +250,12 @@ pcl_fetch(Pid, Key) ->
 
 pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) ->
     gen_server:call(Pid,
-                    {fetch_keys, StartKey, EndKey, AccFun, InitAcc},
+                    {fetch_keys, StartKey, EndKey, AccFun, InitAcc, -1},
+                    infinity).
+
+pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
+    gen_server:call(Pid,
+                    {fetch_keys, StartKey, EndKey, AccFun, InitAcc, 1},
                     infinity).
 
 pcl_checksequencenumber(Pid, Key, SQN) ->
@@ -352,20 +359,29 @@ handle_call({check_sqn, Key, SQN}, _From, State) ->
                                                 State#state.levelzero_cache),
                             SQN),
         State};
-handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc},
+handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
                 _From,
                 State=#state{snapshot_fully_loaded=Ready})
                                                         when Ready == true ->
-    L0AsTree = leveled_pmem:merge_trees(StartKey,
-                                        EndKey,
-                                        State#state.levelzero_cache,
-                                        gb_trees:empty()),
+    L0AsTree =
+        case State#state.levelzero_astree of
+            undefined ->
+                leveled_pmem:merge_trees(StartKey,
+                                            EndKey,
+                                            State#state.levelzero_cache,
+                                            gb_trees:empty());
+            Tree ->
+                Tree
+        end,
     L0iter = gb_trees:iterator(L0AsTree),
     SFTiter = initiate_rangequery_frommanifest(StartKey,
                                                 EndKey,
                                                 State#state.manifest),
-    Acc = keyfolder(L0iter, SFTiter, StartKey, EndKey, {AccFun, InitAcc}),
-    {reply, Acc, State};
+    Acc = keyfolder({L0iter, SFTiter},
+                    {StartKey, EndKey},
+                    {AccFun, InitAcc},
+                    MaxKeys),
+    {reply, Acc, State#state{levelzero_astree = L0AsTree}};
 handle_call(work_for_clerk, From, State) ->
     {UpdState, Work} = return_work(State, From),
     {reply, Work, UpdState};
@@ -956,37 +972,56 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) ->
     end.
 
 
-keyfolder(null, SFTiterator, StartKey, EndKey, {AccFun, Acc}) ->
-    case find_nextkey(SFTiterator, StartKey, EndKey) of
+keyfolder(IMMiter, SFTiter, StartKey, EndKey, {AccFun, Acc}) ->
+    keyfolder({IMMiter, SFTiter}, {StartKey, EndKey}, {AccFun, Acc}, -1).
+
+keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, MaxKeys) when MaxKeys == 0 ->
+    Acc;
+keyfolder({null, SFTiter}, KeyRange, {AccFun, Acc}, MaxKeys) ->
+    {StartKey, EndKey} = KeyRange,
+    case find_nextkey(SFTiter, StartKey, EndKey) of
         no_more_keys ->
             Acc;
-        {NxtSFTiterator, {SFTKey, SFTVal}} ->
+        {NxSFTiter, {SFTKey, SFTVal}} ->
             Acc1 = AccFun(SFTKey, SFTVal, Acc),
-            keyfolder(null, NxtSFTiterator, StartKey, EndKey, {AccFun, Acc1})
+            keyfolder({null, NxSFTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1)
     end;
-keyfolder(IMMiterator, SFTiterator, StartKey, EndKey, {AccFun, Acc}) ->
+keyfolder({IMMiterator, SFTiterator}, KeyRange, {AccFun, Acc}, MaxKeys) ->
+    {StartKey, EndKey} = KeyRange,
     case gb_trees:next(IMMiterator) of
         none ->
             % There are no more keys in the in-memory iterator, so now
            % iterate only over the remaining keys in the SFT iterator
-            keyfolder(null, SFTiterator, StartKey, EndKey, {AccFun, Acc});
-        {IMMKey, IMMVal, NxtIMMiterator} ->
+            keyfolder({null, SFTiterator}, KeyRange, {AccFun, Acc}, MaxKeys);
+        {IMMKey, _IMMVal, NxIMMiterator} when IMMKey < StartKey ->
+            % Normally everything is pre-filtered, but the IMM iterator can
+            % be re-used, and so may be behind the StartKey if the StartKey
+            % has advanced since the previous use
+            keyfolder({NxIMMiterator, SFTiterator},
+                        KeyRange,
+                        {AccFun, Acc},
+                        MaxKeys);
+        {IMMKey, IMMVal, NxIMMiterator} ->
             case leveled_codec:endkey_passed(EndKey, IMMKey) of
                 true ->
                     % There are no more keys in-range in the in-memory
                     % iterator, so take action as if this iterator is empty
                     % (see above)
-                    keyfolder(null, SFTiterator,
-                                StartKey, EndKey, {AccFun, Acc});
+                    keyfolder({null, SFTiterator},
+                                KeyRange,
+                                {AccFun, Acc},
+                                MaxKeys);
                 false ->
                     case find_nextkey(SFTiterator, StartKey, EndKey) of
                         no_more_keys ->
                             % No more keys in range in the persisted store, so use the
                             % in-memory KV as the next
                             Acc1 = AccFun(IMMKey, IMMVal, Acc),
-                            keyfolder(NxtIMMiterator, SFTiterator,
-                                        StartKey, EndKey, {AccFun, Acc1});
-                        {NxtSFTiterator, {SFTKey, SFTVal}} ->
+                            keyfolder({NxIMMiterator, SFTiterator},
+                                        KeyRange,
+                                        {AccFun, Acc1},
+                                        MaxKeys - 1);
+                        {NxSFTiterator, {SFTKey, SFTVal}} ->
                             % There is a next key, so need to know which is the
                             % next key between the two (and handle two keys
                             % with different sequence numbers).
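
The MaxKeys argument threaded through keyfolder above turns the fold into a
bounded fold: 0 halts immediately, 1 gives pcl_fetchnextkey its
return-first-key semantics, and -1 only ever moves further from zero, so it
folds everything. A minimal sketch of the same counting pattern over a plain
list (bounded_fold is a hypothetical helper, not part of the patch):

    %% Stop when the counter reaches 0; a negative counter never does, so
    %% bounded_fold(Fun, Acc, List, -1) folds the whole list, mirroring
    %% keyfolder's unlimited case.
    bounded_fold(_Fun, Acc, _List, 0) ->
        Acc;
    bounded_fold(_Fun, Acc, [], _MaxKeys) ->
        Acc;
    bounded_fold(Fun, Acc, [H|T], MaxKeys) ->
        bounded_fold(Fun, Fun(H, Acc), T, MaxKeys - 1).
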
@@ -996,19 +1031,22 @@ keyfolder(IMMiterator, SFTiterator, StartKey, EndKey, {AccFun, Acc}) ->
                                                     SFTVal}) of
                                 left_hand_first ->
                                     Acc1 = AccFun(IMMKey, IMMVal, Acc),
-                                    keyfolder(NxtIMMiterator, SFTiterator,
-                                                StartKey, EndKey,
-                                                {AccFun, Acc1});
+                                    keyfolder({NxIMMiterator, SFTiterator},
+                                                KeyRange,
+                                                {AccFun, Acc1},
+                                                MaxKeys - 1);
                                 right_hand_first ->
                                     Acc1 = AccFun(SFTKey, SFTVal, Acc),
-                                    keyfolder(IMMiterator, NxtSFTiterator,
-                                                StartKey, EndKey,
-                                                {AccFun, Acc1});
+                                    keyfolder({IMMiterator, NxSFTiterator},
+                                                KeyRange,
+                                                {AccFun, Acc1},
+                                                MaxKeys - 1);
                                 left_hand_dominant ->
                                     Acc1 = AccFun(IMMKey, IMMVal, Acc),
-                                    keyfolder(NxtIMMiterator, NxtSFTiterator,
-                                                StartKey, EndKey,
-                                                {AccFun, Acc1})
+                                    keyfolder({NxIMMiterator, NxSFTiterator},
+                                                KeyRange,
+                                                {AccFun, Acc1},
+                                                MaxKeys - 1)
                             end
                     end
             end
diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl
index 77c2a60..1cab55a 100644
--- a/test/end_to_end/iterator_SUITE.erl
+++ b/test/end_to_end/iterator_SUITE.erl
@@ -95,17 +95,13 @@ small_load_with2i(_Config) ->
     true = 9900 == length(KeyHashList2),
     true = 9900 == length(KeyHashList3),
 
-    SumIntegerFun = fun(_B, _K, V, Acc) ->
-                            [C] = V#r_object.contents,
-                            {I, _Bin} = C#r_content.value,
-                            Acc + I
-                            end,
-    {async, Sum1} = leveled_bookie:book_returnfolder(Bookie1,
-                                                        {foldobjects_bybucket,
-                                                        ?RIAK_TAG,
-                                                        "Bucket",
-                                                        {SumIntegerFun,
-                                                        0}}),
+    SumIntFun = fun(_B, _K, V, Acc) ->
+                        [C] = V#r_object.contents,
+                        {I, _Bin} = C#r_content.value,
+                        Acc + I
+                        end,
+    BucketObjQ = {foldobjects_bybucket, ?RIAK_TAG, "Bucket", {SumIntFun, 0}},
+    {async, Sum1} = leveled_bookie:book_returnfolder(Bookie1, BucketObjQ),
     Total1 = Sum1(),
     true = Total1 > 100000,
 
@@ -113,15 +109,19 @@ small_load_with2i(_Config) ->
 
     {ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
-    {async, Sum2} = leveled_bookie:book_returnfolder(Bookie2,
-                                                        {foldobjects_bybucket,
-                                                        ?RIAK_TAG,
-                                                        "Bucket",
-                                                        {SumIntegerFun,
-                                                        0}}),
+    {async, Sum2} = leveled_bookie:book_returnfolder(Bookie2, BucketObjQ),
     Total2 = Sum2(),
     true = Total2 == Total1,
 
+    FoldBucketsFun = fun(B, _K, Acc) -> sets:add_element(B, Acc) end,
+    % Should find no buckets, as the store holds only a non-binary bucket
+    % and no binary ones
+    BucketListQuery = {binary_bucketlist,
+                        ?RIAK_TAG,
+                        {FoldBucketsFun, sets:new()}},
+    {async, BL} = leveled_bookie:book_returnfolder(Bookie2, BucketListQuery),
+    true = sets:size(BL()) == 0,
+
     ok = leveled_bookie:book_close(Bookie2),
     testutil:reset_filestructure().
@@ -129,7 +129,8 @@ small_load_with2i(_Config) ->
 
 query_count(_Config) ->
     RootPath = testutil:reset_filestructure(),
     {ok, Book1} = leveled_bookie:book_start(RootPath, 2000, 50000000),
-    {TestObject, TestSpec} = testutil:generate_testobject("Bucket",
+    BucketBin = list_to_binary("Bucket"),
+    {TestObject, TestSpec} = testutil:generate_testobject(BucketBin,
                                                             "Key1",
                                                             "Value1",
                                                             [],
@@ -143,7 +144,7 @@ query_count(_Config) ->
     Indexes = testutil:get_randomindexes_generator(8),
     SW = os:timestamp(),
     ObjL1 = testutil:generate_objects(10000,
-                                        uuid,
+                                        binary_uuid,
                                         [],
                                         V,
                                         Indexes),
@@ -157,7 +158,7 @@ query_count(_Config) ->
     testutil:check_forobject(Book1, TestObject),
     Total = lists:foldl(fun(X, Acc) ->
                             IdxF = "idx" ++ integer_to_list(X) ++ "_bin",
-                            T = count_termsonindex("Bucket",
+                            T = count_termsonindex(BucketBin,
                                                     IdxF,
                                                     Book1,
                                                     ?KEY_ONLY),
@@ -171,13 +172,13 @@ query_count(_Config) ->
         640000 ->
             ok
     end,
-    Index1Count = count_termsonindex("Bucket",
+    Index1Count = count_termsonindex(BucketBin,
                                         "idx1_bin",
                                         Book1,
                                         ?KEY_ONLY),
     ok = leveled_bookie:book_close(Book1),
     {ok, Book2} = leveled_bookie:book_start(RootPath, 1000, 50000000),
-    Index1Count = count_termsonindex("Bucket",
+    Index1Count = count_termsonindex(BucketBin,
                                         "idx1_bin",
                                         Book2,
                                         ?KEY_ONLY),
@@ -186,7 +187,7 @@ query_count(_Config) ->
                             {ok, Regex} = re:compile("[0-9]+" ++ Name),
                             SW = os:timestamp(),
-                            T = count_termsonindex("Bucket",
+                            T = count_termsonindex(BucketBin,
                                                     "idx1_bin",
                                                     Book2,
                                                     {false,
@@ -208,7 +209,7 @@ query_count(_Config) ->
     end,
     {ok, RegMia} = re:compile("[0-9]+Mia"),
     Query1 = {index_query,
-                "Bucket",
+                BucketBin,
                 {fun testutil:foldkeysfun/3, []},
                 {"idx2_bin", "2000", "2000~"},
                 {false, RegMia}},
     {async,
         Mia2KFolder1} = leveled_bookie:book_returnfolder(Book2, Query1),
     Mia2000Count1 = length(Mia2KFolder1()),
     Query2 = {index_query,
-                "Bucket",
+                BucketBin,
                 {fun testutil:foldkeysfun/3, []},
                 {"idx2_bin", "2000", "2001"},
                 {true, undefined}},
@@ -239,7 +240,7 @@ query_count(_Config) ->
     end,
     {ok, RxMia2K} = re:compile("^2000[0-9]+Mia"),
     Query3 = {index_query,
-                "Bucket",
+                BucketBin,
                 {fun testutil:foldkeysfun/3, []},
                 {"idx2_bin", "1980", "2100"},
                 {false, RxMia2K}},
 
     V9 = testutil:get_compressiblevalue(),
     Indexes9 = testutil:get_randomindexes_generator(8),
-    [{_RN, Obj9, Spc9}] = testutil:generate_objects(1, uuid, [], V9, Indexes9),
+    [{_RN, Obj9, Spc9}] = testutil:generate_objects(1,
+                                                    binary_uuid,
+                                                    [],
+                                                    V9,
+                                                    Indexes9),
     ok = testutil:book_riakput(Book2, Obj9, Spc9),
     R9 = lists:map(fun({add, IdxF, IdxT}) ->
                         Q = {index_query,
-                                "Bucket",
+                                BucketBin,
                                 {fun testutil:foldkeysfun/3, []},
                                 {IdxF, IdxT, IdxT},
                                 ?KEY_ONLY},
@@ -270,7 +275,7 @@ query_count(_Config) ->
     ok = testutil:book_riakput(Book2, Obj9, Spc9Del),
     lists:foreach(fun({IdxF, IdxT, X}) ->
                         Q = {index_query,
-                                "Bucket",
+                                BucketBin,
                                 {fun testutil:foldkeysfun/3, []},
                                 {IdxF, IdxT, IdxT},
                                 ?KEY_ONLY},
@@ -286,7 +291,7 @@ query_count(_Config) ->
     {ok, Book3} = leveled_bookie:book_start(RootPath, 2000, 50000000),
     lists:foreach(fun({IdxF, IdxT, X}) ->
                         Q = {index_query,
-                                "Bucket",
+                                BucketBin,
                                 {fun testutil:foldkeysfun/3, []},
                                 {IdxF, IdxT, IdxT},
                                 ?KEY_ONLY},
@@ -303,7 +308,7 @@ query_count(_Config) ->
     {ok, Book4} = leveled_bookie:book_start(RootPath, 2000, 50000000),
     lists:foreach(fun({IdxF, IdxT, X}) ->
                         Q = {index_query,
-                                "Bucket",
+                                BucketBin,
                                 {fun testutil:foldkeysfun/3, []},
                                 {IdxF, IdxT, IdxT},
                                 ?KEY_ONLY},
@@ -316,7 +321,60 @@ query_count(_Config) ->
                   end,
                   R9),
     testutil:check_forobject(Book4, TestObject),
+
+    FoldBucketsFun = fun(B, _K, Acc) -> sets:add_element(B, Acc) end,
+    BucketListQuery = {binary_bucketlist,
+                        ?RIAK_TAG,
+                        {FoldBucketsFun, sets:new()}},
+    {async, BLF1} = leveled_bookie:book_returnfolder(Book4, BucketListQuery),
+    SW_QA = os:timestamp(),
+    BucketSet1 = BLF1(),
+    io:format("Bucket set returned in ~w microseconds",
+                [timer:now_diff(os:timestamp(), SW_QA)]),
+
+    true = sets:size(BucketSet1) == 1,
+    true = sets:is_element(list_to_binary("Bucket"), BucketSet1),
+
+    ObjList10A = testutil:generate_objects(5000,
+                                            binary_uuid,
+                                            [],
+                                            V9,
+                                            Indexes9,
+                                            "BucketA"),
+    ObjList10B = testutil:generate_objects(5000,
+                                            binary_uuid,
+                                            [],
+                                            V9,
+                                            Indexes9,
+                                            "BucketB"),
+    ObjList10C = testutil:generate_objects(5000,
+                                            binary_uuid,
+                                            [],
+                                            V9,
+                                            Indexes9,
+                                            "BucketC"),
+    testutil:riakload(Book4, ObjList10A),
+    testutil:riakload(Book4, ObjList10B),
+    testutil:riakload(Book4, ObjList10C),
+    {async, BLF2} = leveled_bookie:book_returnfolder(Book4, BucketListQuery),
+    SW_QB = os:timestamp(),
+    BucketSet2 = BLF2(),
+    io:format("Bucket set returned in ~w microseconds",
+                [timer:now_diff(os:timestamp(), SW_QB)]),
+    true = sets:size(BucketSet2) == 4,
+    ok = leveled_bookie:book_close(Book4),
+
+    {ok, Book5} = leveled_bookie:book_start(RootPath, 2000, 50000000),
+    {async, BLF3} = leveled_bookie:book_returnfolder(Book5, BucketListQuery),
+    SW_QC = os:timestamp(),
+    BucketSet3 = BLF3(),
+    io:format("Bucket set returned in ~w microseconds",
+                [timer:now_diff(os:timestamp(), SW_QC)]),
+    true = sets:size(BucketSet3) == 4,
+
+    ok = leveled_bookie:book_close(Book5),
+
     testutil:reset_filestructure().
 
diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl
index b7fcc8b..768758f 100644
--- a/test/end_to_end/testutil.erl
+++ b/test/end_to_end/testutil.erl
@@ -231,6 +231,17 @@ generate_objects(Count, KeyNumber, ObjL, Value, IndexGen) ->
 
 generate_objects(0, _KeyNumber, ObjL, _Value, _IndexGen, _Bucket) ->
     ObjL;
+generate_objects(Count, binary_uuid, ObjL, Value, IndexGen, Bucket) ->
+    {Obj1, Spec1} = set_object(list_to_binary(Bucket),
+                                list_to_binary(leveled_codec:generate_uuid()),
+                                Value,
+                                IndexGen),
+    generate_objects(Count - 1,
+                        binary_uuid,
+                        ObjL ++ [{random:uniform(), Obj1, Spec1}],
+                        Value,
+                        IndexGen,
+                        Bucket);
 generate_objects(Count, uuid, ObjL, Value, IndexGen, Bucket) ->
     {Obj1, Spec1} = set_object(Bucket,
                                 leveled_codec:generate_uuid(),
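
The crux of the HanoiDB technique is that, in Erlang's term order,
<<B/binary, 0>> is the smallest binary sorting strictly after B, so
get_nextbucket can leap over every remaining key in bucket B with a single
seek; the cost of listing buckets becomes proportional to the number of
buckets rather than the number of keys. A minimal stand-alone sketch of the
same skip-scan, modelling the ledger as a sorted list of {Bucket, Key} pairs
(the module and the next_key helper are illustrative, not part of leveled):

    -module(bucket_skip_demo).
    -export([list_buckets/1]).

    %% Stand-in for pcl_fetchnextkey/5: the first key at or after StartBK.
    next_key(Keys, StartBK) ->
        case lists:dropwhile(fun(BK) -> BK < StartBK end, Keys) of
            [] -> null;
            [BK|_] -> BK
        end.

    list_buckets(Keys) ->
        list_buckets(Keys, {<<>>, <<>>}, []).

    list_buckets(Keys, StartBK, Acc) ->
        case next_key(Keys, StartBK) of
            null ->
                lists:reverse(Acc);
            {B, _K} ->
                %% One seek skips every remaining key in bucket B
                list_buckets(Keys, {<<B/binary, 0>>, <<>>}, [B|Acc])
        end.

For example, list_buckets([{<<"a">>,<<"1">>}, {<<"a">>,<<"2">>},
{<<"b">>,<<"1">>}]) returns [<<"a">>, <<"b">>] in three seeks: one per
bucket, plus one to detect the end, however many keys each bucket holds.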