diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 74fad3e..d12687c 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -729,6 +729,7 @@ binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) -> no_lookup), Folder = fun() -> BucketAcc = get_nextbucket(null, + null, Tag, LedgerSnapshot, []), @@ -739,26 +740,40 @@ binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) -> end, {async, Folder}. -get_nextbucket(NextBucket, Tag, LedgerSnapshot, BKList) -> - StartKey = leveled_codec:to_ledgerkey(NextBucket, null, Tag), +get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) -> + Now = leveled_codec:integer_now(), + StartKey = leveled_codec:to_ledgerkey(NextBucket, NextKey, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), - ExtractFun = fun(LK, _V, _Acc) -> leveled_codec:from_ledgerkey(LK) end, - BK = leveled_penciller:pcl_fetchnextkey(LedgerSnapshot, - StartKey, - EndKey, - ExtractFun, - null), - case BK of + ExtractFun = + fun(LK, V, _Acc) -> + {leveled_codec:from_ledgerkey(LK), V} + end, + R = leveled_penciller:pcl_fetchnextkey(LedgerSnapshot, + StartKey, + EndKey, + ExtractFun, + null), + case R of null -> leveled_log:log("B0008",[]), BKList; - {B, K} when is_binary(B) -> - leveled_log:log("B0009",[B]), - get_nextbucket(<>, - Tag, - LedgerSnapshot, - [{B, K}|BKList]); - NB -> + {{B, K}, V} when is_binary(B), is_binary(K) -> + case leveled_codec:is_active({B, K}, V, Now) of + true -> + leveled_log:log("B0009",[B]), + get_nextbucket(<>, + null, + Tag, + LedgerSnapshot, + [{B, K}|BKList]); + false -> + get_nextbucket(B, + <>, + Tag, + LedgerSnapshot, + BKList) + end; + {NB, _V} -> leveled_log:log("B0010",[NB]), [] end. diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 0183240..07c6aeb 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -80,7 +80,7 @@ handle_info/2, terminate/2, clerk_new/1, - clerk_compact/6, + clerk_compact/7, clerk_hashtablecalc/3, clerk_stop/1, code_change/3]). @@ -124,14 +124,15 @@ clerk_new(InkerClerkOpts) -> gen_server:start(?MODULE, [InkerClerkOpts], []). -clerk_compact(Pid, Checker, InitiateFun, FilterFun, Inker, Timeout) -> +clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Inker, TimeO) -> gen_server:cast(Pid, {compact, Checker, InitiateFun, + CloseFun, FilterFun, Inker, - Timeout}). + TimeO}). clerk_hashtablecalc(HashTree, StartPos, CDBpid) -> {ok, Clerk} = gen_server:start(?MODULE, [#iclerk_options{}], []), @@ -171,7 +172,7 @@ init([IClerkOpts]) -> handle_call(_Msg, _From, State) -> {reply, not_supported, State}. -handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, +handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO}, State) -> % Empty the waste folder clear_waste(State), @@ -207,11 +208,13 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, update_inker(Inker, ManifestSlice, FilesToDelete), + ok = CloseFun(FilterServer), {noreply, State} end; Score -> leveled_log:log("IC003", [Score]), ok = leveled_inker:ink_compactioncomplete(Inker), + ok = CloseFun(FilterServer), {noreply, State} end; handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index f261b19..69d9a3f 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -275,22 +275,25 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> %% required to reload the Ledger on startup). ink_compactjournal(Pid, Bookie, Timeout) -> CheckerInitiateFun = fun initiate_penciller_snapshot/1, + CheckerCloseFun = fun leveled_penciller:pcl_close/1, CheckerFilterFun = fun leveled_penciller:pcl_checksequencenumber/3, gen_server:call(Pid, {compact, Bookie, CheckerInitiateFun, + CheckerCloseFun, CheckerFilterFun, Timeout}, infinity). %% Allows the Checker to be overriden in test, use something other than a %% penciller -ink_compactjournal(Pid, Checker, InitiateFun, FilterFun, Timeout) -> +ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, Timeout) -> gen_server:call(Pid, {compact, Checker, InitiateFun, + CloseFun, FilterFun, Timeout}, infinity). @@ -427,12 +430,14 @@ handle_call(print_manifest, _From, State) -> handle_call({compact, Checker, InitiateFun, + CloseFun, FilterFun, Timeout}, _From, State) -> leveled_iclerk:clerk_compact(State#state.clerk, Checker, InitiateFun, + CloseFun, FilterFun, self(), Timeout), @@ -839,7 +844,6 @@ initiate_penciller_snapshot(Bookie) -> MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap), {LedgerSnap, MaxSQN}. - %%%============================================================================ %%% Test %%%============================================================================ @@ -1001,6 +1005,7 @@ compact_journal_test() -> ok = ink_compactjournal(Ink1, Checker, fun(X) -> {X, 55} end, + fun(_F) -> ok end, fun(L, K, SQN) -> lists:member({SQN, K}, L) end, 5000), timer:sleep(1000), @@ -1010,6 +1015,7 @@ compact_journal_test() -> ok = ink_compactjournal(Ink1, Checker2, fun(X) -> {X, 55} end, + fun(_F) -> ok end, fun(L, K, SQN) -> lists:member({SQN, K}, L) end, 5000), timer:sleep(1000), @@ -1041,6 +1047,7 @@ empty_manifest_test() -> ok = ink_compactjournal(Ink1, [], fun(X) -> {X, 55} end, + fun(_F) -> ok end, CheckFun, 5000), timer:sleep(1000), diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 37424c2..932e062 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -8,7 +8,8 @@ fetchput_snapshot/1, load_and_count/1, load_and_count_withdelete/1, - space_clear_ondelete/1 + space_clear_ondelete/1, + is_empty_test/1 ]). all() -> [ @@ -18,7 +19,8 @@ all() -> [ fetchput_snapshot, load_and_count, load_and_count_withdelete, - space_clear_ondelete + space_clear_ondelete, + is_empty_test ]. @@ -591,4 +593,58 @@ space_clear_ondelete(_Config) -> true = length(FNsD_L) < length(FNsA_L), true = length(FNsD_L) < length(FNsB_L), true = length(FNsD_L) < length(FNsC_L), - true = length(FNsD_L) == 0. \ No newline at end of file + true = length(FNsD_L) == 0. + + + +is_empty_test(_Config) -> + RootPath = testutil:reset_filestructure(), + StartOpts1 = [{root_path, RootPath}, + {sync_strategy, testutil:sync_strategy()}], + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + + {B1, K1, V1, Spec, MD} = {term_to_binary("Bucket1"), + term_to_binary("Key1"), + "Value1", + [], + [{"MDK1", "MDV1"}]}, + {TestObject1, TestSpec1} = + testutil:generate_testobject(B1, K1, V1, Spec, MD), + {B1, K2, V2, Spec, MD} = {term_to_binary("Bucket1"), + term_to_binary("Key2"), + "Value2", + [], + [{"MDK1", "MDV1"}]}, + {TestObject2, TestSpec2} = + testutil:generate_testobject(B1, K2, V2, Spec, MD), + {B2, K3, V3, Spec, MD} = {term_to_binary("Bucket2"), + term_to_binary("Key3"), + "Value3", + [], + [{"MDK1", "MDV1"}]}, + {TestObject3, TestSpec3} = + testutil:generate_testobject(B2, K3, V3, Spec, MD), + ok = testutil:book_riakput(Bookie1, TestObject1, TestSpec1), + ok = testutil:book_riakput(Bookie1, TestObject2, TestSpec2), + ok = testutil:book_riakput(Bookie1, TestObject3, TestSpec3), + + FoldBucketsFun = fun(B, Acc) -> sets:add_element(B, Acc) end, + BucketListQuery = {binary_bucketlist, + ?RIAK_TAG, + {FoldBucketsFun, sets:new()}}, + {async, BL} = leveled_bookie:book_returnfolder(Bookie1, BucketListQuery), + true = sets:size(BL()) == 2, + + ok = leveled_bookie:book_put(Bookie1, B2, K3, delete, [], ?RIAK_TAG), + {async, BLpd1} = leveled_bookie:book_returnfolder(Bookie1, BucketListQuery), + true = sets:size(BLpd1()) == 1, + + ok = leveled_bookie:book_put(Bookie1, B1, K2, delete, [], ?RIAK_TAG), + {async, BLpd2} = leveled_bookie:book_returnfolder(Bookie1, BucketListQuery), + true = sets:size(BLpd2()) == 1, + + ok = leveled_bookie:book_put(Bookie1, B1, K1, delete, [], ?RIAK_TAG), + {async, BLpd3} = leveled_bookie:book_returnfolder(Bookie1, BucketListQuery), + true = sets:size(BLpd3()) == 0, + + ok = leveled_bookie:book_close(Bookie1). diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index 8c0246e..dd03edb 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -97,7 +97,7 @@ small_load_with2i(_Config) -> IdxQ1 = {index_query, "Bucket", {fun testutil:foldkeysfun/3, []}, - {"idx1_bin", "#", "~"}, + {"idx1_bin", "#", "|"}, {true, undefined}}, {async, IdxFolder} = leveled_bookie:book_returnfolder(Bookie1, IdxQ1), KeyList1 = lists:usort(IdxFolder()), @@ -106,7 +106,7 @@ small_load_with2i(_Config) -> IdxQ2 = {index_query, {"Bucket", LastKey}, {fun testutil:foldkeysfun/3, []}, - {"idx1_bin", LastTerm, "~"}, + {"idx1_bin", LastTerm, "|"}, {false, undefined}}, {async, IdxFolderLK} = leveled_bookie:book_returnfolder(Bookie1, IdxQ2), KeyList2 = lists:usort(IdxFolderLK()), @@ -141,7 +141,7 @@ small_load_with2i(_Config) -> ?RIAK_TAG, "Bucket", {"idx1_bin", - "#", "~"}, + "#", "|"}, FoldObjectsFun}), KeyHashList3 = HTreeF3(), true = 9901 == length(KeyHashList1), % also includes the test object @@ -186,7 +186,7 @@ query_count(_Config) -> testutil:sync_strategy()), BucketBin = list_to_binary("Bucket"), {TestObject, TestSpec} = testutil:generate_testobject(BucketBin, - "Key1", + term_to_binary("Key1"), "Value1", [], [{"MDK1", "MDV1"}]), @@ -269,7 +269,7 @@ query_count(_Config) -> Query1 = {index_query, BucketBin, {fun testutil:foldkeysfun/3, []}, - {"idx2_bin", "2000", "2000~"}, + {"idx2_bin", "2000", "2000|"}, {false, RegMia}}, {async, Mia2KFolder1} = leveled_bookie:book_returnfolder(Book2, Query1), @@ -396,8 +396,7 @@ query_count(_Config) -> io:format("Bucket set returned in ~w microseconds", [timer:now_diff(os:timestamp(), SW_QA)]), - true = sets:size(BucketSet1) == 1, - true = sets:is_element(list_to_binary("Bucket"), BucketSet1), + true = sets:size(BucketSet1) == 1, ObjList10A = testutil:generate_objects(5000, binary_uuid,