diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index c251489..afb6792 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -73,7 +73,7 @@ generate_uuid() -> inker_reload_strategy(AltList) -> ReloadStrategy0 = [{?RIAK_TAG, retain}, {?STD_TAG, retain}], lists:foldl(fun({X, Y}, SList) -> - lists:keyreplace(X, 1, Y, SList) + lists:keyreplace(X, 1, SList, {X, Y}) end, ReloadStrategy0, AltList). @@ -163,12 +163,12 @@ from_journalkey({SQN, _Type, LedgerKey}) -> compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) -> skip; -compact_inkerkvc({{_SQN, ?INKT_KEYD, LK}, _V, _CrcCheck}, Strategy) -> +compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) -> {Tag, _, _, _} = LK, {Tag, TagStrat} = lists:keyfind(Tag, 1, Strategy), case TagStrat of retain -> - skip; + {retain, {{SQN, ?INKT_KEYD, LK}, V, CrcCheck}}; TagStrat -> {TagStrat, null} end; diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 807eaf3..3a9ecc4 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -162,12 +162,13 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, ok = filelib:ensure_dir(FP), Candidates = scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN), - BestRun = assess_candidates(Candidates, MaxRunLength), - case score_run(BestRun, MaxRunLength) of + BestRun0 = assess_candidates(Candidates, MaxRunLength), + case score_run(BestRun0, MaxRunLength) of Score when Score > 0 -> - print_compaction_run(BestRun, MaxRunLength), + BestRun1 = sort_run(BestRun0), + print_compaction_run(BestRun1, MaxRunLength), {ManifestSlice, - PromptDelete} = compact_files(BestRun, + PromptDelete} = compact_files(BestRun1, CDBopts, FilterFun, FilterServer, @@ -178,7 +179,7 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, C#candidate.filename, C#candidate.journal} end, - BestRun), + BestRun1), io:format("Clerk updating Inker as compaction complete of " ++ "~w files~n", [length(FilesToDelete)]), {ok, ManSQN} = leveled_inker:ink_updatemanifest(Inker, @@ -365,6 +366,12 @@ print_compaction_run(BestRun, MaxRunLength) -> end, BestRun). +sort_run(RunOfFiles) -> + CompareFun = fun(Cand1, Cand2) -> + Cand1#candidate.low_sqn =< Cand2#candidate.low_sqn end, + lists:sort(CompareFun, RunOfFiles). + + compact_files([], _CDBopts, _FilterFun, _FilterServer, _MaxSQN, _RStrategy) -> {[], 0}; compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy) -> @@ -418,6 +425,8 @@ get_all_positions([], PositionBatches) -> get_all_positions([HeadRef|RestOfBest], PositionBatches) -> SrcJournal = HeadRef#candidate.journal, Positions = leveled_cdb:cdb_getpositions(SrcJournal, all), + io:format("Compaction source ~s has yielded ~w positions~n", + [HeadRef#candidate.filename, length(Positions)]), Batches = split_positions_into_batches(lists:sort(Positions), SrcJournal, []), @@ -768,4 +777,12 @@ compact_empty_file_test() -> Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4), ?assertMatch(100.0, Score1). +compare_candidate_test() -> + Candidate1 = #candidate{low_sqn=1}, + Candidate2 = #candidate{low_sqn=2}, + Candidate3 = #candidate{low_sqn=3}, + Candidate4 = #candidate{low_sqn=4}, + ?assertMatch([Candidate1, Candidate2, Candidate3, Candidate4], + sort_run([Candidate3, Candidate2, Candidate4, Candidate1])). + -endif. \ No newline at end of file diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 327d421..9096bda 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -8,7 +8,7 @@ fetchput_snapshot/1, load_and_count/1, load_and_count_withdelete/1, - space_clear_ondelete_test/1 + space_clear_ondelete/1 ]). all() -> [ @@ -18,7 +18,7 @@ all() -> [ fetchput_snapshot, load_and_count , load_and_count_withdelete, - space_clear_ondelete_test + space_clear_ondelete ]. @@ -398,8 +398,7 @@ load_and_count_withdelete(_Config) -> testutil:reset_filestructure(). -space_clear_ondelete_test(_Config) -> - % Test is a work in progress +space_clear_ondelete(_Config) -> RootPath = testutil:reset_filestructure(), StartOpts1 = #bookie_options{root_path=RootPath, max_journalsize=20000000}, {ok, Book1} = leveled_bookie:book_start(StartOpts1), diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index f119496..5d34786 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -4,14 +4,14 @@ -include("include/leveled.hrl"). -define(KEY_ONLY, {false, undefined}). --define(RETURN_TERMS, {true, undefined}). -export([all/0]). -export([simple_load_with2i/1, simple_querycount/1, rotating_objects/1]). -all() -> [simple_load_with2i, +all() -> [ + simple_load_with2i, simple_querycount, rotating_objects]. @@ -278,99 +278,16 @@ count_termsonindex(Bucket, IdxField, Book, QType) -> rotating_objects(_Config) -> RootPath = testutil:reset_filestructure(), - ok = rotating_object_check(RootPath, "Bucket1", 10), - ok = rotating_object_check(RootPath, "Bucket2", 200), - ok = rotating_object_check(RootPath, "Bucket3", 800), - ok = rotating_object_check(RootPath, "Bucket4", 1600), - ok = rotating_object_check(RootPath, "Bucket5", 3200), - ok = rotating_object_check(RootPath, "Bucket6", 9600), + ok = testutil:rotating_object_check(RootPath, "Bucket1", 10), + ok = testutil:rotating_object_check(RootPath, "Bucket2", 200), + ok = testutil:rotating_object_check(RootPath, "Bucket3", 800), + ok = testutil:rotating_object_check(RootPath, "Bucket4", 1600), + ok = testutil:rotating_object_check(RootPath, "Bucket5", 3200), + ok = testutil:rotating_object_check(RootPath, "Bucket6", 9600), testutil:reset_filestructure(). -rotating_object_check(RootPath, Bucket, NumberOfObjects) -> - {ok, Book1} = leveled_bookie:book_start(RootPath, 2000, 5000000), - {KSpcL1, V1} = put_indexed_objects(Book1, Bucket, NumberOfObjects), - ok = check_indexed_objects(Book1, Bucket, KSpcL1, V1), - {KSpcL2, V2} = put_altered_indexed_objects(Book1, Bucket, KSpcL1), - ok = check_indexed_objects(Book1, Bucket, KSpcL2, V2), - {KSpcL3, V3} = put_altered_indexed_objects(Book1, Bucket, KSpcL2), - ok = leveled_bookie:book_close(Book1), - {ok, Book2} = leveled_bookie:book_start(RootPath, 1000, 5000000), - ok = check_indexed_objects(Book2, Bucket, KSpcL3, V3), - {KSpcL4, V4} = put_altered_indexed_objects(Book2, Bucket, KSpcL3), - ok = check_indexed_objects(Book2, Bucket, KSpcL4, V4), - ok = leveled_bookie:book_close(Book2), - ok. - - - -check_indexed_objects(Book, B, KSpecL, V) -> - % Check all objects match, return what should eb the results of an all - % index query - IdxR = lists:map(fun({K, Spc}) -> - {ok, O} = leveled_bookie:book_riakget(Book, B, K), - V = testutil:get_value(O), - {add, - "idx1_bin", - IdxVal} = lists:keyfind(add, 1, Spc), - {IdxVal, K} end, - KSpecL), - % Check the all index query matxhes expectations - R = leveled_bookie:book_returnfolder(Book, - {index_query, - B, - {"idx1_bin", - "0", - "~"}, - ?RETURN_TERMS}), - {async, Fldr} = R, - QR = lists:sort(Fldr()), - ER = lists:sort(IdxR), - ok = if - ER == QR -> - ok - end, - ok. - - -put_indexed_objects(Book, Bucket, Count) -> - V = testutil:get_compressiblevalue(), - IndexGen = testutil:get_randomindexes_generator(1), - SW = os:timestamp(), - ObjL1 = testutil:generate_objects(Count, - uuid, - [], - V, - IndexGen, - Bucket), - KSpecL = lists:map(fun({_RN, Obj, Spc}) -> - leveled_bookie:book_riakput(Book, - Obj, - Spc), - {testutil:get_key(Obj), Spc} - end, - ObjL1), - io:format("Put of ~w objects with ~w index entries " - ++ - "each completed in ~w microseconds~n", - [Count, 1, timer:now_diff(os:timestamp(), SW)]), - {KSpecL, V}. - -put_altered_indexed_objects(Book, Bucket, KSpecL) -> - IndexGen = testutil:get_randomindexes_generator(1), - V = testutil:get_compressiblevalue(), - RplKSpecL = lists:map(fun({K, Spc}) -> - AddSpc = lists:keyfind(add, 1, Spc), - {O, AltSpc} = testutil:set_object(Bucket, - K, - V, - IndexGen, - [AddSpc]), - ok = leveled_bookie:book_riakput(Book, - O, - AltSpc), - {K, AltSpc} end, - KSpecL), - {RplKSpecL, V}. + + diff --git a/test/end_to_end/restart_SUITE.erl b/test/end_to_end/restart_SUITE.erl new file mode 100644 index 0000000..0017035 --- /dev/null +++ b/test/end_to_end/restart_SUITE.erl @@ -0,0 +1,107 @@ +-module(restart_SUITE). +-include_lib("common_test/include/ct.hrl"). +-include("include/leveled.hrl"). +-export([all/0]). +-export([retain_strategy/1 + ]). + +all() -> [ + retain_strategy + ]. + +retain_strategy(_Config) -> + RootPath = testutil:reset_filestructure(), + BookOpts = #bookie_options{root_path=RootPath, + cache_size=1000, + max_journalsize=5000000, + reload_strategy=[{?RIAK_TAG, retain}]}, + {ok, Spcl3, LastV3} = rotating_object_check(BookOpts, "Bucket3", 800), + ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}]), + {ok, Spcl4, LastV4} = rotating_object_check(BookOpts, "Bucket4", 1600), + ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, + {"Bucket4", Spcl4, LastV4}]), + {ok, Spcl5, LastV5} = rotating_object_check(BookOpts, "Bucket5", 3200), + ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, + {"Bucket5", Spcl5, LastV5}]), + {ok, Spcl6, LastV6} = rotating_object_check(BookOpts, "Bucket6", 6400), + ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, + {"Bucket4", Spcl4, LastV4}, + {"Bucket5", Spcl5, LastV5}, + {"Bucket6", Spcl6, LastV6}]), + testutil:reset_filestructure(). + + + +rotating_object_check(BookOpts, B, NumberOfObjects) -> + {ok, Book1} = leveled_bookie:book_start(BookOpts), + {KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects), + ok = testutil:check_indexed_objects(Book1, + B, + KSpcL1, + V1), + {KSpcL2, V2} = testutil:put_altered_indexed_objects(Book1, + B, + KSpcL1, + false), + ok = testutil:check_indexed_objects(Book1, + B, + KSpcL1 ++ KSpcL2, + V2), + {KSpcL3, V3} = testutil:put_altered_indexed_objects(Book1, + B, + KSpcL2, + false), + ok = leveled_bookie:book_close(Book1), + {ok, Book2} = leveled_bookie:book_start(BookOpts), + ok = testutil:check_indexed_objects(Book2, + B, + KSpcL1 ++ KSpcL2 ++ KSpcL3, + V3), + {KSpcL4, V4} = testutil:put_altered_indexed_objects(Book2, + B, + KSpcL3, + false), + io:format("Bucket complete - checking index before compaction~n"), + ok = testutil:check_indexed_objects(Book2, + B, + KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4, + V4), + + ok = leveled_bookie:book_compactjournal(Book2, 30000), + F = fun leveled_bookie:book_islastcompactionpending/1, + lists:foldl(fun(X, Pending) -> + case Pending of + false -> + false; + true -> + io:format("Loop ~w waiting for journal " + ++ "compaction to complete~n", [X]), + timer:sleep(20000), + F(Book2) + end end, + true, + lists:seq(1, 15)), + io:format("Waiting for journal deletes~n"), + timer:sleep(20000), + + io:format("Checking index following compaction~n"), + ok = testutil:check_indexed_objects(Book2, + B, + KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4, + V4), + + ok = leveled_bookie:book_close(Book2), + {ok, KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4, V4}. + + +restart_from_blankledger(BookOpts, B_SpcL) -> + leveled_penciller:clean_testdir(BookOpts#bookie_options.root_path ++ + "/ledger"), + {ok, Book1} = leveled_bookie:book_start(BookOpts), + io:format("Checking index following restart~n"), + lists:foreach(fun({B, SpcL, V}) -> + ok = testutil:check_indexed_objects(Book1, B, SpcL, V) + end, + B_SpcL), + ok = leveled_bookie:book_close(Book1), + ok. \ No newline at end of file diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index ec002e7..f88e900 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -23,8 +23,14 @@ get_compressiblevalue/0, get_randomindexes_generator/1, name_list/0, - load_objects/5]). + load_objects/5, + put_indexed_objects/3, + put_altered_indexed_objects/3, + put_altered_indexed_objects/4, + check_indexed_objects/4, + rotating_object_check/3]). +-define(RETURN_TERMS, {true, undefined}). reset_filestructure() -> @@ -267,4 +273,110 @@ get_randomdate() -> Date = calendar:gregorian_seconds_to_datetime(RandPoint), {{Year, Month, Day}, {Hour, Minute, Second}} = Date, lists:flatten(io_lib:format("~4..0w~2..0w~2..0w~2..0w~2..0w~2..0w", - [Year, Month, Day, Hour, Minute, Second])). \ No newline at end of file + [Year, Month, Day, Hour, Minute, Second])). + + +check_indexed_objects(Book, B, KSpecL, V) -> + % Check all objects match, return what should be the results of an all + % index query + IdxR = lists:map(fun({K, Spc}) -> + {ok, O} = leveled_bookie:book_riakget(Book, B, K), + V = testutil:get_value(O), + {add, + "idx1_bin", + IdxVal} = lists:keyfind(add, 1, Spc), + {IdxVal, K} end, + KSpecL), + % Check the all index query matches expectations + R = leveled_bookie:book_returnfolder(Book, + {index_query, + B, + {"idx1_bin", + "0", + "~"}, + ?RETURN_TERMS}), + SW = os:timestamp(), + {async, Fldr} = R, + QR0 = Fldr(), + io:format("Query match found of length ~w in ~w microseconds " ++ + "expected ~w ~n", + [length(QR0), + timer:now_diff(os:timestamp(), SW), + length(IdxR)]), + QR = lists:sort(QR0), + ER = lists:sort(IdxR), + + ok = if + ER == QR -> + ok + end, + ok. + + +put_indexed_objects(Book, Bucket, Count) -> + V = testutil:get_compressiblevalue(), + IndexGen = testutil:get_randomindexes_generator(1), + SW = os:timestamp(), + ObjL1 = testutil:generate_objects(Count, + uuid, + [], + V, + IndexGen, + Bucket), + KSpecL = lists:map(fun({_RN, Obj, Spc}) -> + leveled_bookie:book_riakput(Book, + Obj, + Spc), + {testutil:get_key(Obj), Spc} + end, + ObjL1), + io:format("Put of ~w objects with ~w index entries " + ++ + "each completed in ~w microseconds~n", + [Count, 1, timer:now_diff(os:timestamp(), SW)]), + {KSpecL, V}. + + +put_altered_indexed_objects(Book, Bucket, KSpecL) -> + put_altered_indexed_objects(Book, Bucket, KSpecL, true). + +put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i) -> + IndexGen = testutil:get_randomindexes_generator(1), + V = testutil:get_compressiblevalue(), + RplKSpecL = lists:map(fun({K, Spc}) -> + AddSpc = if + RemoveOld2i == true -> + [lists:keyfind(add, 1, Spc)]; + RemoveOld2i == false -> + [] + end, + {O, AltSpc} = testutil:set_object(Bucket, + K, + V, + IndexGen, + AddSpc), + ok = leveled_bookie:book_riakput(Book, + O, + AltSpc), + {K, AltSpc} end, + KSpecL), + {RplKSpecL, V}. + +rotating_object_check(RootPath, B, NumberOfObjects) -> + BookOpts = #bookie_options{root_path=RootPath, + cache_size=1000, + max_journalsize=5000000}, + {ok, Book1} = leveled_bookie:book_start(BookOpts), + {KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects), + ok = testutil:check_indexed_objects(Book1, B, KSpcL1, V1), + {KSpcL2, V2} = testutil:put_altered_indexed_objects(Book1, B, KSpcL1), + ok = testutil:check_indexed_objects(Book1, B, KSpcL2, V2), + {KSpcL3, V3} = testutil:put_altered_indexed_objects(Book1, B, KSpcL2), + ok = leveled_bookie:book_close(Book1), + {ok, Book2} = leveled_bookie:book_start(BookOpts), + ok = testutil:check_indexed_objects(Book2, B, KSpcL3, V3), + {KSpcL4, V4} = testutil:put_altered_indexed_objects(Book2, B, KSpcL3), + ok = testutil:check_indexed_objects(Book2, B, KSpcL4, V4), + ok = leveled_bookie:book_close(Book2), + ok. +