Resolve failing recovery test

Now passing consistently with a number of different corruptions catered
for (including corruption of the Tag in the Inker Key)
This commit is contained in:
martinsumner 2016-12-16 23:18:55 +00:00
parent 1684f0a913
commit 9e28287231
5 changed files with 66 additions and 29 deletions

View file

@ -476,6 +476,7 @@ handle_sync_event({cdb_scan, FilterFun, Acc, StartPos},
_From,
StateName,
State) ->
{ok, EndPos0} = file:position(State#state.handle, eof),
{ok, StartPos0} = case StartPos of
undefined ->
file:position(State#state.handle,
@ -483,28 +484,38 @@ handle_sync_event({cdb_scan, FilterFun, Acc, StartPos},
StartPos ->
{ok, StartPos}
end,
case check_last_key(State#state.last_key) of
ok ->
file:position(State#state.handle, StartPos0),
MaybeEnd = (check_last_key(State#state.last_key) == empty) or
(StartPos0 >= (EndPos0 - ?DWORD_SIZE)),
case MaybeEnd of
true ->
{reply, {eof, Acc}, StateName, State};
false ->
{LastPosition, Acc2} = scan_over_file(State#state.handle,
StartPos0,
FilterFun,
Acc,
State#state.last_key),
{reply, {LastPosition, Acc2}, StateName, State};
empty ->
{reply, {eof, Acc}, StateName, State}
{reply, {LastPosition, Acc2}, StateName, State}
end;
handle_sync_event(cdb_lastkey, _From, StateName, State) ->
{reply, State#state.last_key, StateName, State};
handle_sync_event(cdb_firstkey, _From, StateName, State) ->
{ok, EOFPos} = file:position(State#state.handle, eof),
FirstKey = case EOFPos of
?BASE_POSITION ->
empty;
_ ->
element(1, extract_key(State#state.handle,
?BASE_POSITION))
end,
FilterFun = fun(Key, _V, _P, _O, _Fun) -> {stop, Key} end,
FirstKey =
case EOFPos of
?BASE_POSITION ->
empty;
_ ->
file:position(State#state.handle, ?BASE_POSITION),
{_Pos, FirstScanKey} = scan_over_file(State#state.handle,
?BASE_POSITION,
FilterFun,
empty,
State#state.last_key),
FirstScanKey
end,
{reply, FirstKey, StateName, State};
handle_sync_event(cdb_filename, _From, StateName, State) ->
{reply, State#state.filename, StateName, State};
@ -905,11 +916,13 @@ extract_key_value_check(Handle, Position) ->
%% at that point return the position and the key dictionary scanned so far
startup_scan_over_file(Handle, Position) ->
HashTree = new_hashtree(),
scan_over_file(Handle,
Position,
fun startup_filter/5,
{HashTree, empty},
empty).
{eof, Output} = scan_over_file(Handle,
Position,
fun startup_filter/5,
{HashTree, empty},
empty),
{ok, FinalPos} = file:position(Handle, cur),
{FinalPos, Output}.
%% Specific filter to be used at startup to build a hashtree for an incomplete
%% cdb file, and returns at the end the hashtree and the final Key seen in the
@ -935,7 +948,7 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) ->
case saferead_keyvalue(Handle) of
false ->
leveled_log:log("CDB09", [Position]),
{Position, Output};
{eof, Output};
{Key, ValueAsBin, KeyLength, ValueLength} ->
NewPosition = case Key of
LastKey ->
@ -1710,6 +1723,7 @@ emptyvalue_fromdict_test() ->
ok = file:delete("../test/from_dict_test_ev.cdb").
find_lastkey_test() ->
file:delete("../test/lastkey.pnd"),
{ok, P1} = cdb_open_writer("../test/lastkey.pnd",
#cdb_options{binary_mode=false}),
ok = cdb_put(P1, "Key1", "Value1"),

View file

@ -218,13 +218,18 @@ compact_inkerkvc({{SQN, ?INKT_KEYD, LK}, V, CrcCheck}, Strategy) ->
end;
compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
{Tag, _, _, _} = LK,
{Tag, TagStrat} = lists:keyfind(Tag, 1, Strategy),
case TagStrat of
retain ->
{_V, KeyDeltas} = split_inkvalue(V),
{TagStrat, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
TagStrat ->
{TagStrat, null}
case lists:keyfind(Tag, 1, Strategy) of
{Tag, TagStrat} ->
case TagStrat of
retain ->
{_V, KeyDeltas} = split_inkvalue(V),
{TagStrat, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
TagStrat ->
{TagStrat, null}
end;
false ->
leveled_log:log("IC012", [Tag, Strategy]),
skip
end;
compact_inkerkvc(_KVC, _Strategy) ->
skip.

View file

@ -519,8 +519,13 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0) ->
generate_manifest_entry(ActiveJournal) ->
{ok, NewFN} = leveled_cdb:cdb_complete(ActiveJournal),
{ok, PidR} = leveled_cdb:cdb_open_reader(NewFN),
{StartSQN, _Type, _PK} = leveled_cdb:cdb_firstkey(PidR),
[{StartSQN, NewFN, PidR}].
case leveled_cdb:cdb_firstkey(PidR) of
{StartSQN, _Type, _PK} ->
[{StartSQN, NewFN, PidR}];
empty ->
leveled_log:log("IC013", [NewFN]),
[]
end.
clear_waste(State) ->

View file

@ -199,6 +199,11 @@
{info, "Clearing journal with filename ~s"}},
{"IC011",
{info, "Not clearing filename ~s as modified delta is only ~w seconds"}},
{"IC012",
{warn, "Tag ~w not found in Strategy ~w - maybe corrupted"}},
{"IC013",
{warn, "File with name ~s to be ignored in manifest as scanning for "
++ "first key returned empty - maybe corrupted"}},
{"PM002",
{info, "Completed dump of L0 cache to list of size ~w"}},
@ -264,7 +269,9 @@
++ "to_list ~w sort ~w build ~w"}},
{"CDB15",
{info, "Cycle count of ~w in hashtable search higher than expected"
++ " in search for hash ~w with result ~w"}}
++ " in search for hash ~w with result ~w"}},
{"CDB16",
{info, "CDB scan from start ~w in file with end ~w and last_key ~w"}}
])).

View file

@ -247,10 +247,16 @@ aae_bustedjournal(_Config) ->
journal_compaction_bustedjournal(_Config) ->
% Different circumstances will be created in different runs
busted_journal_test(10000000),
busted_journal_test(7777777).
busted_journal_test(MaxJournalSize) ->
% Simply confirms that none of this causes a crash
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath},
{max_journalsize, 10000000},
{max_journalsize, MaxJournalSize},
{max_run_length, 10},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),