From 0d8ab0899e808aacfcd89e93fecdd853c66cba60 Mon Sep 17 00:00:00 2001
From: martinsumner
Date: Tue, 23 May 2017 11:59:44 +0100
Subject: [PATCH 1/3] Add test for is_empty

Bucket listing didn't care whether keys were active - now it does.
---
 src/leveled_bookie.erl          | 55 ++++++++++++++++++-----------
 test/end_to_end/basic_SUITE.erl | 62 +++++++++++++++++++++++++++++++--
 2 files changed, 94 insertions(+), 23 deletions(-)

diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 970b54c..d12687c 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -179,7 +179,7 @@ book_start(Opts) ->
 %% @doc Put an object with an expiry time
 %%
 %% Put an item in the store but with a Time To Live - the time when the object
-%% should expire, in gregorian_sconds (add the required number of seconds to
+%% should expire, in gregorian_seconds (add the required number of seconds to
 %% leveled_codec:integer_time/1).
 %%
 %% There exists the possibility of per object expiry times, not just whole
@@ -729,6 +729,7 @@ binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) ->
                                             no_lookup),
     Folder = fun() ->
                 BucketAcc = get_nextbucket(null,
+                                            null,
                                             Tag,
                                             LedgerSnapshot,
                                             []),
@@ -739,26 +740,40 @@ binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) ->
             end,
     {async, Folder}.
 
-get_nextbucket(NextBucket, Tag, LedgerSnapshot, BKList) ->
-    StartKey = leveled_codec:to_ledgerkey(NextBucket, null, Tag),
+get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
+    Now = leveled_codec:integer_now(),
+    StartKey = leveled_codec:to_ledgerkey(NextBucket, NextKey, Tag),
     EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
-    ExtractFun = fun(LK, _V, _Acc) -> leveled_codec:from_ledgerkey(LK) end,
-    BK = leveled_penciller:pcl_fetchnextkey(LedgerSnapshot,
-                                            StartKey,
-                                            EndKey,
-                                            ExtractFun,
-                                            null),
-    case BK of
+    ExtractFun =
+        fun(LK, V, _Acc) ->
+            {leveled_codec:from_ledgerkey(LK), V}
+        end,
+    R = leveled_penciller:pcl_fetchnextkey(LedgerSnapshot,
+                                            StartKey,
+                                            EndKey,
+                                            ExtractFun,
+                                            null),
+    case R of
         null ->
            leveled_log:log("B0008",[]),
            BKList;
-        {B, K} when is_binary(B) ->
-            leveled_log:log("B0009",[B]),
-            get_nextbucket(<<B/binary, 0>>,
-                            Tag,
-                            LedgerSnapshot,
-                            [{B, K}|BKList]);
-        NB ->
+        {{B, K}, V} when is_binary(B), is_binary(K) ->
+            case leveled_codec:is_active({B, K}, V, Now) of
+                true ->
+                    leveled_log:log("B0009",[B]),
+                    get_nextbucket(<<B/binary, 0>>,
+                                    null,
+                                    Tag,
+                                    LedgerSnapshot,
+                                    [{B, K}|BKList]);
+                false ->
+                    get_nextbucket(B,
+                                    <<K/binary, 0>>,
+                                    Tag,
+                                    LedgerSnapshot,
+                                    BKList)
+            end;
+        {NB, _V} ->
             leveled_log:log("B0010",[NB]),
             []
     end.
@@ -1308,11 +1323,11 @@ maybe_withjitter(CacheSize, MaxCacheSize) ->
 
 
 
-load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
+load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun) ->
     {MinSQN, MaxSQN, OutputTree} = Acc0,
-    {SQN, Type, PK} = KeyInLedger,
+    {SQN, Type, PK} = KeyInJournal,
     % VBin may already be a term
-    {VBin, VSize} = ExtractFun(ValueInLedger),
+    {VBin, VSize} = ExtractFun(ValueInJournal),
     {Obj, IndexSpecs} = leveled_codec:split_inkvalue(VBin),
     case SQN of
         SQN when SQN < MinSQN ->
diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl
index 37424c2..932e062 100644
--- a/test/end_to_end/basic_SUITE.erl
+++ b/test/end_to_end/basic_SUITE.erl
@@ -8,7 +8,8 @@
             fetchput_snapshot/1,
             load_and_count/1,
             load_and_count_withdelete/1,
-            space_clear_ondelete/1
+            space_clear_ondelete/1,
+            is_empty_test/1
             ]).
 
 all() -> [
@@ -18,7 +19,8 @@ all() -> [
             fetchput_snapshot,
             load_and_count,
             load_and_count_withdelete,
-            space_clear_ondelete
+            space_clear_ondelete,
+            is_empty_test
             ].
 
 
@@ -591,4 +593,58 @@ space_clear_ondelete(_Config) ->
     true = length(FNsD_L) < length(FNsA_L),
     true = length(FNsD_L) < length(FNsB_L),
     true = length(FNsD_L) < length(FNsC_L),
-    true = length(FNsD_L) == 0.
\ No newline at end of file
+    true = length(FNsD_L) == 0.
+
+
+
+is_empty_test(_Config) ->
+    RootPath = testutil:reset_filestructure(),
+    StartOpts1 = [{root_path, RootPath},
+                    {sync_strategy, testutil:sync_strategy()}],
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+
+    {B1, K1, V1, Spec, MD} = {term_to_binary("Bucket1"),
+                                term_to_binary("Key1"),
+                                "Value1",
+                                [],
+                                [{"MDK1", "MDV1"}]},
+    {TestObject1, TestSpec1} =
+        testutil:generate_testobject(B1, K1, V1, Spec, MD),
+    {B1, K2, V2, Spec, MD} = {term_to_binary("Bucket1"),
+                                term_to_binary("Key2"),
+                                "Value2",
+                                [],
+                                [{"MDK1", "MDV1"}]},
+    {TestObject2, TestSpec2} =
+        testutil:generate_testobject(B1, K2, V2, Spec, MD),
+    {B2, K3, V3, Spec, MD} = {term_to_binary("Bucket2"),
+                                term_to_binary("Key3"),
+                                "Value3",
+                                [],
+                                [{"MDK1", "MDV1"}]},
+    {TestObject3, TestSpec3} =
+        testutil:generate_testobject(B2, K3, V3, Spec, MD),
+    ok = testutil:book_riakput(Bookie1, TestObject1, TestSpec1),
+    ok = testutil:book_riakput(Bookie1, TestObject2, TestSpec2),
+    ok = testutil:book_riakput(Bookie1, TestObject3, TestSpec3),
+
+    FoldBucketsFun = fun(B, Acc) -> sets:add_element(B, Acc) end,
+    BucketListQuery = {binary_bucketlist,
+                        ?RIAK_TAG,
+                        {FoldBucketsFun, sets:new()}},
+    {async, BL} = leveled_bookie:book_returnfolder(Bookie1, BucketListQuery),
+    true = sets:size(BL()) == 2,
+
+    ok = leveled_bookie:book_put(Bookie1, B2, K3, delete, [], ?RIAK_TAG),
+    {async, BLpd1} = leveled_bookie:book_returnfolder(Bookie1, BucketListQuery),
+    true = sets:size(BLpd1()) == 1,
+
+    ok = leveled_bookie:book_put(Bookie1, B1, K2, delete, [], ?RIAK_TAG),
+    {async, BLpd2} = leveled_bookie:book_returnfolder(Bookie1, BucketListQuery),
+    true = sets:size(BLpd2()) == 1,
+
+    ok = leveled_bookie:book_put(Bookie1, B1, K1, delete, [], ?RIAK_TAG),
+    {async, BLpd3} = leveled_bookie:book_returnfolder(Bookie1, BucketListQuery),
+    true = sets:size(BLpd3()) == 0,
+
+    ok = leveled_bookie:book_close(Bookie1).

From 96a548e17a760bdb78e5bac909b94a6c720540b7 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Tue, 23 May 2017 15:54:11 +0100
Subject: [PATCH 2/3] Change tests - binary keys

The new code requires bucket listing to be run against binary keys, not
just binary buckets. As this is only intended for use within Riak (where
all keys and buckets are binaries), this constraint seems OK.

A test needed changing to ensure it had a binary key in the bucket.
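As an aside on how the new fold is intended to be consumed: the binary_bucketlist query exercised by is_empty_test only returns buckets that still hold at least one active key, so an emptiness check can be built directly on top of it. The sketch below is illustrative only and is not taken from the patches; it assumes an already-started bookie process (Bookie) and the ?RIAK_TAG macro from the test suite's include file, and - per the constraint described above - it is only valid when buckets and keys are binaries.

    %% Illustrative sketch only - assumes Bookie is a started bookie process
    %% and that ?RIAK_TAG is available from the usual include file.
    is_empty(Bookie) ->
        FoldBucketsFun = fun(B, Acc) -> sets:add_element(B, Acc) end,
        Query = {binary_bucketlist, ?RIAK_TAG, {FoldBucketsFun, sets:new()}},
        {async, Folder} = leveled_bookie:book_returnfolder(Bookie, Query),
        %% buckets whose keys have all been deleted are no longer listed,
        %% so an empty set means no live objects remain under this tag
        sets:size(Folder()) == 0.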
---
 test/end_to_end/iterator_SUITE.erl | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl
index 8c0246e..dd03edb 100644
--- a/test/end_to_end/iterator_SUITE.erl
+++ b/test/end_to_end/iterator_SUITE.erl
@@ -97,7 +97,7 @@ small_load_with2i(_Config) ->
     IdxQ1 = {index_query,
                 "Bucket",
                 {fun testutil:foldkeysfun/3, []},
-                {"idx1_bin", "#", "~"},
+                {"idx1_bin", "#", "|"},
                 {true, undefined}},
     {async, IdxFolder} = leveled_bookie:book_returnfolder(Bookie1, IdxQ1),
     KeyList1 = lists:usort(IdxFolder()),
@@ -106,7 +106,7 @@ small_load_with2i(_Config) ->
     IdxQ2 = {index_query,
                 {"Bucket", LastKey},
                 {fun testutil:foldkeysfun/3, []},
-                {"idx1_bin", LastTerm, "~"},
+                {"idx1_bin", LastTerm, "|"},
                 {false, undefined}},
     {async, IdxFolderLK} = leveled_bookie:book_returnfolder(Bookie1, IdxQ2),
     KeyList2 = lists:usort(IdxFolderLK()),
@@ -141,7 +141,7 @@ small_load_with2i(_Config) ->
                                     ?RIAK_TAG,
                                     "Bucket",
                                     {"idx1_bin",
-                                        "#", "~"},
+                                        "#", "|"},
                                     FoldObjectsFun}),
     KeyHashList3 = HTreeF3(),
     true = 9901 == length(KeyHashList1), % also includes the test object
@@ -186,7 +186,7 @@ query_count(_Config) ->
                                                 testutil:sync_strategy()),
     BucketBin = list_to_binary("Bucket"),
     {TestObject, TestSpec} = testutil:generate_testobject(BucketBin,
-                                                            "Key1",
+                                                            term_to_binary("Key1"),
                                                             "Value1",
                                                             [],
                                                             [{"MDK1", "MDV1"}]),
@@ -269,7 +269,7 @@ query_count(_Config) ->
     Query1 = {index_query,
                 BucketBin,
                 {fun testutil:foldkeysfun/3, []},
-                {"idx2_bin", "2000", "2000~"},
+                {"idx2_bin", "2000", "2000|"},
                 {false, RegMia}},
     {async, Mia2KFolder1} =
         leveled_bookie:book_returnfolder(Book2, Query1),
@@ -396,8 +396,7 @@ query_count(_Config) ->
     io:format("Bucket set returned in ~w microseconds",
                 [timer:now_diff(os:timestamp(), SW_QA)]),
 
-    true = sets:size(BucketSet1) == 1,
-    true = sets:is_element(list_to_binary("Bucket"), BucketSet1),
+    true = sets:size(BucketSet1) == 1,
 
     ObjList10A = testutil:generate_objects(5000,
                                             binary_uuid,

From c664176247c6eb87d964fe700677ed70ecbeace9 Mon Sep 17 00:00:00 2001
From: martinsumner
Date: Fri, 26 May 2017 10:51:30 +0100
Subject: [PATCH 3/3] Release penciller snapshot after journal compaction

Otherwise memory consumption becomes an issue, as the snapshots will take
an hour to time out naturally.
---
 src/leveled_iclerk.erl | 11 +++++++----
 src/leveled_inker.erl  | 11 +++++++++--
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl
index 0183240..07c6aeb 100644
--- a/src/leveled_iclerk.erl
+++ b/src/leveled_iclerk.erl
@@ -80,7 +80,7 @@
         handle_info/2,
         terminate/2,
         clerk_new/1,
-        clerk_compact/6,
+        clerk_compact/7,
         clerk_hashtablecalc/3,
         clerk_stop/1,
         code_change/3]).
@@ -124,14 +124,15 @@ clerk_new(InkerClerkOpts) ->
     gen_server:start(?MODULE, [InkerClerkOpts], []).
 
 
-clerk_compact(Pid, Checker, InitiateFun, FilterFun, Inker, Timeout) ->
+clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Inker, TimeO) ->
     gen_server:cast(Pid,
                     {compact,
                     Checker,
                     InitiateFun,
+                    CloseFun,
                     FilterFun,
                     Inker,
-                    Timeout}).
+                    TimeO}).
 
 clerk_hashtablecalc(HashTree, StartPos, CDBpid) ->
     {ok, Clerk} = gen_server:start(?MODULE, [#iclerk_options{}], []),
@@ -171,7 +172,7 @@ init([IClerkOpts]) ->
 handle_call(_Msg, _From, State) ->
     {reply, not_supported, State}.
 
-handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
+handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
             State) ->
     % Empty the waste folder
     clear_waste(State),
@@ -207,11 +208,13 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
                     update_inker(Inker,
                                     ManifestSlice,
                                     FilesToDelete),
+                    ok = CloseFun(FilterServer),
                     {noreply, State}
             end;
         Score ->
             leveled_log:log("IC003", [Score]),
             ok = leveled_inker:ink_compactioncomplete(Inker),
+            ok = CloseFun(FilterServer),
             {noreply, State}
     end;
 handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) ->
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index 2513a4a..15aa494 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -178,22 +178,25 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
 
 ink_compactjournal(Pid, Bookie, Timeout) ->
     CheckerInitiateFun = fun initiate_penciller_snapshot/1,
+    CheckerCloseFun = fun leveled_penciller:pcl_close/1,
     CheckerFilterFun = fun leveled_penciller:pcl_checksequencenumber/3,
     gen_server:call(Pid,
                     {compact,
                     Bookie,
                     CheckerInitiateFun,
+                    CheckerCloseFun,
                     CheckerFilterFun,
                     Timeout},
                     infinity).
 
 %% Allows the Checker to be overriden in test, use something other than a
 %% penciller
-ink_compactjournal(Pid, Checker, InitiateFun, FilterFun, Timeout) ->
+ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, Timeout) ->
     gen_server:call(Pid,
                     {compact,
                     Checker,
                     InitiateFun,
+                    CloseFun,
                     FilterFun,
                     Timeout},
                     infinity).
@@ -308,12 +311,14 @@ handle_call(print_manifest, _From, State) ->
 handle_call({compact,
             Checker,
             InitiateFun,
+            CloseFun,
             FilterFun,
             Timeout},
                 _From, State) ->
     leveled_iclerk:clerk_compact(State#state.clerk,
                                 Checker,
                                 InitiateFun,
+                                CloseFun,
                                 FilterFun,
                                 self(),
                                 Timeout),
@@ -720,7 +725,6 @@ initiate_penciller_snapshot(Bookie) ->
     MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
     {LedgerSnap, MaxSQN}.
 
-
 %%%============================================================================
 %%% Test
 %%%============================================================================
@@ -878,6 +882,7 @@ compact_journal_test() ->
     ok = ink_compactjournal(Ink1,
                             Checker,
                             fun(X) -> {X, 55} end,
+                            fun(_F) -> ok end,
                             fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
                             5000),
     timer:sleep(1000),
@@ -887,6 +892,7 @@ compact_journal_test() ->
     ok = ink_compactjournal(Ink1,
                             Checker2,
                             fun(X) -> {X, 55} end,
+                            fun(_F) -> ok end,
                             fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
                             5000),
     timer:sleep(1000),
@@ -918,6 +924,7 @@ empty_manifest_test() ->
     ok = ink_compactjournal(Ink1,
                             [],
                             fun(X) -> {X, 55} end,
+                            fun(_F) -> ok end,
                             CheckFun,
                             5000),
     timer:sleep(1000),
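Taken together, the final patch changes the compaction flow so that the snapshot handed out by InitiateFun is now explicitly released by the clerk through the new CloseFun, rather than being left to time out. With ink_compactjournal/3 the three funs are initiate_penciller_snapshot/1, leveled_penciller:pcl_checksequencenumber/3 and, now, leveled_penciller:pcl_close/1. The lines below are a condensed sketch of that lifecycle, not the actual leveled_iclerk code: do_compaction_work/2 is a hypothetical stand-in for the scoring and compaction steps, and in the real handle_cast the CloseFun is called on both the compaction and the no-compaction branches.

    %% Condensed sketch of the snapshot lifecycle after this patch; not the
    %% actual leveled_iclerk code. do_compaction_work/2 is hypothetical.
    run_compaction(Checker, InitiateFun, CloseFun, FilterFun) ->
        {FilterServer, _MaxSQN} = InitiateFun(Checker),
        %% compaction consults FilterFun(FilterServer, Key, SQN) to decide
        %% whether each journal entry is still the current version
        Result = do_compaction_work(FilterServer, FilterFun),
        %% new in this patch: release the penciller snapshot immediately,
        %% rather than holding memory until it times out (about an hour)
        ok = CloseFun(FilterServer),
        Result.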