Further testing of compaction
Check we avoid crashing in challenging compaction scenarios
This commit is contained in:
parent
7d35ef7126
commit
84a92b5f95
5 changed files with 113 additions and 39 deletions
|
@ -1025,7 +1025,12 @@ saferead_keyvalue(Handle) ->
|
||||||
eof ->
|
eof ->
|
||||||
false;
|
false;
|
||||||
{ok, Value} ->
|
{ok, Value} ->
|
||||||
{Key, Value, KeyL, ValueL}
|
case crccheck_value(Value) of
|
||||||
|
true ->
|
||||||
|
{Key, Value, KeyL, ValueL};
|
||||||
|
false ->
|
||||||
|
false
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -56,7 +56,6 @@
|
||||||
check_forinkertype/2,
|
check_forinkertype/2,
|
||||||
create_value_for_journal/1,
|
create_value_for_journal/1,
|
||||||
build_metadata_object/2,
|
build_metadata_object/2,
|
||||||
generate_ledgerkv/4,
|
|
||||||
generate_ledgerkv/5,
|
generate_ledgerkv/5,
|
||||||
get_size/2,
|
get_size/2,
|
||||||
get_keyandhash/2,
|
get_keyandhash/2,
|
||||||
|
@ -170,6 +169,8 @@ from_inkerkv(Object) ->
|
||||||
from_journalkey({SQN, _Type, LedgerKey}) ->
|
from_journalkey({SQN, _Type, LedgerKey}) ->
|
||||||
{SQN, LedgerKey}.
|
{SQN, LedgerKey}.
|
||||||
|
|
||||||
|
compact_inkerkvc({_InkerKey, crc_wonky, false}, _Strategy) ->
|
||||||
|
skip;
|
||||||
compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) ->
|
compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) ->
|
||||||
skip;
|
skip;
|
||||||
compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) ->
|
compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) ->
|
||||||
|
@ -271,9 +272,6 @@ convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
|
||||||
end,
|
end,
|
||||||
IndexSpecs).
|
IndexSpecs).
|
||||||
|
|
||||||
generate_ledgerkv(PrimaryKey, SQN, Obj, Size) ->
|
|
||||||
generate_ledgerkv(PrimaryKey, SQN, Obj, Size, infinity).
|
|
||||||
|
|
||||||
generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
|
generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
|
||||||
{Tag, Bucket, Key, _} = PrimaryKey,
|
{Tag, Bucket, Key, _} = PrimaryKey,
|
||||||
Status = case Obj of
|
Status = case Obj of
|
||||||
|
|
|
@ -149,12 +149,7 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
|
||||||
State) ->
|
State) ->
|
||||||
% Need to fetch manifest at start rather than have it be passed in
|
% Need to fetch manifest at start rather than have it be passed in
|
||||||
% Don't want to process a queued call waiting on an old manifest
|
% Don't want to process a queued call waiting on an old manifest
|
||||||
Manifest = case leveled_inker:ink_getmanifest(Inker) of
|
[_Active|Manifest] = leveled_inker:ink_getmanifest(Inker),
|
||||||
[] ->
|
|
||||||
[];
|
|
||||||
[_Active|Tail] ->
|
|
||||||
Tail
|
|
||||||
end,
|
|
||||||
MaxRunLength = State#state.max_run_length,
|
MaxRunLength = State#state.max_run_length,
|
||||||
{FilterServer, MaxSQN} = InitiateFun(Checker),
|
{FilterServer, MaxSQN} = InitiateFun(Checker),
|
||||||
CDBopts = State#state.cdb_options,
|
CDBopts = State#state.cdb_options,
|
||||||
|
@ -462,11 +457,7 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
|
||||||
{false, true, false, retain} ->
|
{false, true, false, retain} ->
|
||||||
{Acc ++ [KVC1], PromptDelete};
|
{Acc ++ [KVC1], PromptDelete};
|
||||||
{false, true, false, _} ->
|
{false, true, false, _} ->
|
||||||
{Acc, PromptDelete};
|
{Acc, PromptDelete}
|
||||||
{_, false, _, _} ->
|
|
||||||
io:format("Corrupted value found for "
|
|
||||||
++ "Journal Key ~w~n", [K]),
|
|
||||||
{Acc, false}
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -3,12 +3,14 @@
|
||||||
-include("include/leveled.hrl").
|
-include("include/leveled.hrl").
|
||||||
-export([all/0]).
|
-export([all/0]).
|
||||||
-export([retain_strategy/1,
|
-export([retain_strategy/1,
|
||||||
aae_bustedjournal/1
|
aae_bustedjournal/1,
|
||||||
|
journal_compaction_bustedjournal/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
all() -> [
|
all() -> [
|
||||||
retain_strategy,
|
% retain_strategy,
|
||||||
aae_bustedjournal
|
aae_bustedjournal,
|
||||||
|
journal_compaction_bustedjournal
|
||||||
].
|
].
|
||||||
|
|
||||||
retain_strategy(_Config) ->
|
retain_strategy(_Config) ->
|
||||||
|
@ -48,27 +50,10 @@ aae_bustedjournal(_Config) ->
|
||||||
_CLs = testutil:load_objects(20000, GenList, Bookie1, TestObject,
|
_CLs = testutil:load_objects(20000, GenList, Bookie1, TestObject,
|
||||||
fun testutil:generate_objects/2),
|
fun testutil:generate_objects/2),
|
||||||
ok = leveled_bookie:book_close(Bookie1),
|
ok = leveled_bookie:book_close(Bookie1),
|
||||||
{ok, FNsA_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
|
CDBFiles = testutil:find_journals(RootPath),
|
||||||
{ok, Regex} = re:compile(".*\.cdb"),
|
|
||||||
CDBFiles = lists:foldl(fun(FN, Acc) -> case re:run(FN, Regex) of
|
|
||||||
nomatch ->
|
|
||||||
Acc;
|
|
||||||
_ ->
|
|
||||||
[FN|Acc]
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
[],
|
|
||||||
FNsA_J),
|
|
||||||
[HeadF|_Rest] = CDBFiles,
|
[HeadF|_Rest] = CDBFiles,
|
||||||
io:format("Selected Journal for corruption of ~s~n", [HeadF]),
|
io:format("Selected Journal for corruption of ~s~n", [HeadF]),
|
||||||
{ok, Handle} = file:open(RootPath ++ "/journal/journal_files/" ++ HeadF,
|
testutil:corrupt_journal(RootPath, HeadF, 1000),
|
||||||
[binary, raw, read, write]),
|
|
||||||
lists:foreach(fun(X) ->
|
|
||||||
Position = X * 1000 + 2048,
|
|
||||||
ok = file:pwrite(Handle, Position, <<0:8/integer>>)
|
|
||||||
end,
|
|
||||||
lists:seq(1, 1000)),
|
|
||||||
ok = file:close(Handle),
|
|
||||||
{ok, Bookie2} = leveled_bookie:book_start(StartOpts),
|
{ok, Bookie2} = leveled_bookie:book_start(StartOpts),
|
||||||
|
|
||||||
{async, KeyF} = leveled_bookie:book_returnfolder(Bookie2,
|
{async, KeyF} = leveled_bookie:book_returnfolder(Bookie2,
|
||||||
|
@ -119,6 +104,76 @@ aae_bustedjournal(_Config) ->
|
||||||
testutil:reset_filestructure().
|
testutil:reset_filestructure().
|
||||||
|
|
||||||
|
|
||||||
|
journal_compaction_bustedjournal(_Config) ->
|
||||||
|
% Simply confirms that none of this causes a crash
|
||||||
|
RootPath = testutil:reset_filestructure(),
|
||||||
|
StartOpts1 = #bookie_options{root_path=RootPath,
|
||||||
|
max_journalsize=10000000,
|
||||||
|
max_run_length=10},
|
||||||
|
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
|
||||||
|
{TestObject, TestSpec} = testutil:generate_testobject(),
|
||||||
|
ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
|
||||||
|
testutil:check_forobject(Bookie1, TestObject),
|
||||||
|
ObjList1 = testutil:generate_objects(50000, 2),
|
||||||
|
lists:foreach(fun({_RN, Obj, Spc}) ->
|
||||||
|
leveled_bookie:book_riakput(Bookie1, Obj, Spc) end,
|
||||||
|
ObjList1),
|
||||||
|
%% Now replace all the objects
|
||||||
|
ObjList2 = testutil:generate_objects(50000, 2),
|
||||||
|
lists:foreach(fun({_RN, Obj, Spc}) ->
|
||||||
|
leveled_bookie:book_riakput(Bookie1, Obj, Spc) end,
|
||||||
|
ObjList2),
|
||||||
|
ok = leveled_bookie:book_close(Bookie1),
|
||||||
|
|
||||||
|
CDBFiles = testutil:find_journals(RootPath),
|
||||||
|
lists:foreach(fun(FN) -> testutil:corrupt_journal(RootPath, FN, 100) end,
|
||||||
|
CDBFiles),
|
||||||
|
|
||||||
|
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
|
||||||
|
|
||||||
|
ok = leveled_bookie:book_compactjournal(Bookie2, 30000),
|
||||||
|
F = fun leveled_bookie:book_islastcompactionpending/1,
|
||||||
|
lists:foldl(fun(X, Pending) ->
|
||||||
|
case Pending of
|
||||||
|
false ->
|
||||||
|
false;
|
||||||
|
true ->
|
||||||
|
io:format("Loop ~w waiting for journal "
|
||||||
|
++ "compaction to complete~n", [X]),
|
||||||
|
timer:sleep(20000),
|
||||||
|
F(Bookie2)
|
||||||
|
end end,
|
||||||
|
true,
|
||||||
|
lists:seq(1, 15)),
|
||||||
|
|
||||||
|
ObjList3 = testutil:generate_objects(15000, 50002),
|
||||||
|
ObjList4 = testutil:generate_objects(15000, 50002),
|
||||||
|
lists:foreach(fun({_RN, Obj, Spc}) ->
|
||||||
|
leveled_bookie:book_riakput(Bookie2, Obj, Spc) end,
|
||||||
|
ObjList3),
|
||||||
|
%% Now replace all the objects
|
||||||
|
lists:foreach(fun({_RN, Obj, Spc}) ->
|
||||||
|
leveled_bookie:book_riakput(Bookie2, Obj, Spc) end,
|
||||||
|
ObjList4),
|
||||||
|
|
||||||
|
ok = leveled_bookie:book_compactjournal(Bookie2, 30000),
|
||||||
|
lists:foldl(fun(X, Pending) ->
|
||||||
|
case Pending of
|
||||||
|
false ->
|
||||||
|
false;
|
||||||
|
true ->
|
||||||
|
io:format("Loop ~w waiting for journal "
|
||||||
|
++ "compaction to complete~n", [X]),
|
||||||
|
timer:sleep(20000),
|
||||||
|
F(Bookie2)
|
||||||
|
end end,
|
||||||
|
true,
|
||||||
|
lists:seq(1, 15)),
|
||||||
|
|
||||||
|
ok = leveled_bookie:book_close(Bookie2),
|
||||||
|
testutil:reset_filestructure(10000).
|
||||||
|
|
||||||
|
|
||||||
rotating_object_check(BookOpts, B, NumberOfObjects) ->
|
rotating_object_check(BookOpts, B, NumberOfObjects) ->
|
||||||
{ok, Book1} = leveled_bookie:book_start(BookOpts),
|
{ok, Book1} = leveled_bookie:book_start(BookOpts),
|
||||||
{KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects),
|
{KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects),
|
||||||
|
|
|
@ -28,7 +28,9 @@
|
||||||
put_altered_indexed_objects/3,
|
put_altered_indexed_objects/3,
|
||||||
put_altered_indexed_objects/4,
|
put_altered_indexed_objects/4,
|
||||||
check_indexed_objects/4,
|
check_indexed_objects/4,
|
||||||
rotating_object_check/3]).
|
rotating_object_check/3,
|
||||||
|
corrupt_journal/3,
|
||||||
|
find_journals/1]).
|
||||||
|
|
||||||
-define(RETURN_TERMS, {true, undefined}).
|
-define(RETURN_TERMS, {true, undefined}).
|
||||||
|
|
||||||
|
@ -380,3 +382,26 @@ rotating_object_check(RootPath, B, NumberOfObjects) ->
|
||||||
ok = leveled_bookie:book_close(Book2),
|
ok = leveled_bookie:book_close(Book2),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
corrupt_journal(RootPath, FileName, Corruptions) ->
|
||||||
|
{ok, Handle} = file:open(RootPath ++ "/journal/journal_files/" ++ FileName,
|
||||||
|
[binary, raw, read, write]),
|
||||||
|
lists:foreach(fun(X) ->
|
||||||
|
Position = X * 1000 + 2048,
|
||||||
|
ok = file:pwrite(Handle, Position, <<0:8/integer>>)
|
||||||
|
end,
|
||||||
|
lists:seq(1, Corruptions)),
|
||||||
|
ok = file:close(Handle).
|
||||||
|
|
||||||
|
find_journals(RootPath) ->
|
||||||
|
{ok, FNsA_J} = file:list_dir(RootPath ++ "/journal/journal_files"),
|
||||||
|
{ok, Regex} = re:compile(".*\.cdb"),
|
||||||
|
CDBFiles = lists:foldl(fun(FN, Acc) -> case re:run(FN, Regex) of
|
||||||
|
nomatch ->
|
||||||
|
Acc;
|
||||||
|
_ ->
|
||||||
|
[FN|Acc]
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
[],
|
||||||
|
FNsA_J),
|
||||||
|
CDBFiles.
|
Loading…
Add table
Add a link
Reference in a new issue