From 84a92b5f9557ab04c39457421e7e76e983d905b3 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Tue, 1 Nov 2016 00:46:14 +0000 Subject: [PATCH] Further testing of compaction Check we avoid crashing in challenging compaction scenarios --- src/leveled_cdb.erl | 7 ++- src/leveled_codec.erl | 6 +- src/leveled_iclerk.erl | 13 +--- test/end_to_end/recovery_SUITE.erl | 99 +++++++++++++++++++++++------- test/end_to_end/testutil.erl | 27 +++++++- 5 files changed, 113 insertions(+), 39 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 454c4ec..fc8af19 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -1025,7 +1025,12 @@ saferead_keyvalue(Handle) -> eof -> false; {ok, Value} -> - {Key, Value, KeyL, ValueL} + case crccheck_value(Value) of + true -> + {Key, Value, KeyL, ValueL}; + false -> + false + end end end end. diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 384d12b..fed4cff 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -56,7 +56,6 @@ check_forinkertype/2, create_value_for_journal/1, build_metadata_object/2, - generate_ledgerkv/4, generate_ledgerkv/5, get_size/2, get_keyandhash/2, @@ -170,6 +169,8 @@ from_inkerkv(Object) -> from_journalkey({SQN, _Type, LedgerKey}) -> {SQN, LedgerKey}. +compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) -> + skip; compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) -> skip; compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) -> @@ -271,9 +272,6 @@ convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) -> end, IndexSpecs). -generate_ledgerkv(PrimaryKey, SQN, Obj, Size) -> - generate_ledgerkv(PrimaryKey, SQN, Obj, Size, infinity). - generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) -> {Tag, Bucket, Key, _} = PrimaryKey, Status = case Obj of diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index a381ba4..8fc284c 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -149,12 +149,7 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, State) -> % Need to fetch manifest at start rather than have it be passed in % Don't want to process a queued call waiting on an old manifest - Manifest = case leveled_inker:ink_getmanifest(Inker) of - [] -> - []; - [_Active|Tail] -> - Tail - end, + [_Active|Manifest] = leveled_inker:ink_getmanifest(Inker), MaxRunLength = State#state.max_run_length, {FilterServer, MaxSQN} = InitiateFun(Checker), CDBopts = State#state.cdb_options, @@ -462,11 +457,7 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> {false, true, false, retain} -> {Acc ++ [KVC1], PromptDelete}; {false, true, false, _} -> - {Acc, PromptDelete}; - {_, false, _, _} -> - io:format("Corrupted value found for " - ++ "Journal Key ~w~n", [K]), - {Acc, false} + {Acc, PromptDelete} end end end, diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index 71b1d3a..71b6dd9 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -3,12 +3,14 @@ -include("include/leveled.hrl"). -export([all/0]). -export([retain_strategy/1, - aae_bustedjournal/1 + aae_bustedjournal/1, + journal_compaction_bustedjournal/1 ]). all() -> [ - retain_strategy, - aae_bustedjournal + % retain_strategy, + aae_bustedjournal, + journal_compaction_bustedjournal ]. retain_strategy(_Config) -> @@ -48,27 +50,10 @@ aae_bustedjournal(_Config) -> _CLs = testutil:load_objects(20000, GenList, Bookie1, TestObject, fun testutil:generate_objects/2), ok = leveled_bookie:book_close(Bookie1), - {ok, FNsA_J} = file:list_dir(RootPath ++ "/journal/journal_files"), - {ok, Regex} = re:compile(".*\.cdb"), - CDBFiles = lists:foldl(fun(FN, Acc) -> case re:run(FN, Regex) of - nomatch -> - Acc; - _ -> - [FN|Acc] - end - end, - [], - FNsA_J), + CDBFiles = testutil:find_journals(RootPath), [HeadF|_Rest] = CDBFiles, io:format("Selected Journal for corruption of ~s~n", [HeadF]), - {ok, Handle} = file:open(RootPath ++ "/journal/journal_files/" ++ HeadF, - [binary, raw, read, write]), - lists:foreach(fun(X) -> - Position = X * 1000 + 2048, - ok = file:pwrite(Handle, Position, <<0:8/integer>>) - end, - lists:seq(1, 1000)), - ok = file:close(Handle), + testutil:corrupt_journal(RootPath, HeadF, 1000), {ok, Bookie2} = leveled_bookie:book_start(StartOpts), {async, KeyF} = leveled_bookie:book_returnfolder(Bookie2, @@ -119,6 +104,76 @@ aae_bustedjournal(_Config) -> testutil:reset_filestructure(). +journal_compaction_bustedjournal(_Config) -> + % Simply confirms that none of this causes a crash + RootPath = testutil:reset_filestructure(), + StartOpts1 = #bookie_options{root_path=RootPath, + max_journalsize=10000000, + max_run_length=10}, + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + {TestObject, TestSpec} = testutil:generate_testobject(), + ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec), + testutil:check_forobject(Bookie1, TestObject), + ObjList1 = testutil:generate_objects(50000, 2), + lists:foreach(fun({_RN, Obj, Spc}) -> + leveled_bookie:book_riakput(Bookie1, Obj, Spc) end, + ObjList1), + %% Now replace all the objects + ObjList2 = testutil:generate_objects(50000, 2), + lists:foreach(fun({_RN, Obj, Spc}) -> + leveled_bookie:book_riakput(Bookie1, Obj, Spc) end, + ObjList2), + ok = leveled_bookie:book_close(Bookie1), + + CDBFiles = testutil:find_journals(RootPath), + lists:foreach(fun(FN) -> testutil:corrupt_journal(RootPath, FN, 100) end, + CDBFiles), + + {ok, Bookie2} = leveled_bookie:book_start(StartOpts1), + + ok = leveled_bookie:book_compactjournal(Bookie2, 30000), + F = fun leveled_bookie:book_islastcompactionpending/1, + lists:foldl(fun(X, Pending) -> + case Pending of + false -> + false; + true -> + io:format("Loop ~w waiting for journal " + ++ "compaction to complete~n", [X]), + timer:sleep(20000), + F(Bookie2) + end end, + true, + lists:seq(1, 15)), + + ObjList3 = testutil:generate_objects(15000, 50002), + ObjList4 = testutil:generate_objects(15000, 50002), + lists:foreach(fun({_RN, Obj, Spc}) -> + leveled_bookie:book_riakput(Bookie2, Obj, Spc) end, + ObjList3), + %% Now replace all the objects + lists:foreach(fun({_RN, Obj, Spc}) -> + leveled_bookie:book_riakput(Bookie2, Obj, Spc) end, + ObjList4), + + ok = leveled_bookie:book_compactjournal(Bookie2, 30000), + lists:foldl(fun(X, Pending) -> + case Pending of + false -> + false; + true -> + io:format("Loop ~w waiting for journal " + ++ "compaction to complete~n", [X]), + timer:sleep(20000), + F(Bookie2) + end end, + true, + lists:seq(1, 15)), + + ok = leveled_bookie:book_close(Bookie2), + testutil:reset_filestructure(10000). + + rotating_object_check(BookOpts, B, NumberOfObjects) -> {ok, Book1} = leveled_bookie:book_start(BookOpts), {KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects), diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index f88e900..3a7585c 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -28,7 +28,9 @@ put_altered_indexed_objects/3, put_altered_indexed_objects/4, check_indexed_objects/4, - rotating_object_check/3]). + rotating_object_check/3, + corrupt_journal/3, + find_journals/1]). -define(RETURN_TERMS, {true, undefined}). @@ -380,3 +382,26 @@ rotating_object_check(RootPath, B, NumberOfObjects) -> ok = leveled_bookie:book_close(Book2), ok. +corrupt_journal(RootPath, FileName, Corruptions) -> + {ok, Handle} = file:open(RootPath ++ "/journal/journal_files/" ++ FileName, + [binary, raw, read, write]), + lists:foreach(fun(X) -> + Position = X * 1000 + 2048, + ok = file:pwrite(Handle, Position, <<0:8/integer>>) + end, + lists:seq(1, Corruptions)), + ok = file:close(Handle). + +find_journals(RootPath) -> + {ok, FNsA_J} = file:list_dir(RootPath ++ "/journal/journal_files"), + {ok, Regex} = re:compile(".*\.cdb"), + CDBFiles = lists:foldl(fun(FN, Acc) -> case re:run(FN, Regex) of + nomatch -> + Acc; + _ -> + [FN|Acc] + end + end, + [], + FNsA_J), + CDBFiles. \ No newline at end of file