Change fold_heads to do basic Journal presence check
This at least checks the file is present, and the Key exists in the index of that file. If the value is corrupt it will be removed by compation, and then this will fail (unless the file is never compacted). TODO: resolve issus of files which are corrupt - but never compacted - a job for backup?
This commit is contained in:
parent
4d12dfe0ab
commit
fbb4879d81
2 changed files with 81 additions and 10 deletions
|
@ -1099,8 +1099,17 @@ accumulate_hashes(JournalCheck, InkerClone) ->
|
||||||
|
|
||||||
accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
|
accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
|
||||||
Now = leveled_codec:integer_now(),
|
Now = leveled_codec:integer_now(),
|
||||||
AccFun =
|
AccFun =
|
||||||
fun(LK, V, Acc) ->
|
fun(LK, V, Acc) ->
|
||||||
|
% The function takes the Ledger Key and the value from the
|
||||||
|
% ledger (with the value being the object metadata)
|
||||||
|
%
|
||||||
|
% Need to check if this is an active object (so TTL has not
|
||||||
|
% expired).
|
||||||
|
% If this is a deferred_fetch (i.e. the fold is a fold_heads not
|
||||||
|
% a fold_objects), then a metadata object needs to be built to be
|
||||||
|
% returned - but a quick check that Key is present in the Journal
|
||||||
|
% is made first
|
||||||
case leveled_codec:is_active(LK, V, Now) of
|
case leveled_codec:is_active(LK, V, Now) of
|
||||||
true ->
|
true ->
|
||||||
{SQN, _St, _MH, MD} =
|
{SQN, _St, _MH, MD} =
|
||||||
|
@ -1115,14 +1124,29 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
|
||||||
JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN},
|
JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN},
|
||||||
case DeferredFetch of
|
case DeferredFetch of
|
||||||
true ->
|
true ->
|
||||||
Size = leveled_codec:get_size(LK, V),
|
InJournal =
|
||||||
MDBin =
|
leveled_inker:ink_keycheck(InkerClone,
|
||||||
leveled_codec:build_metadata_object(LK, MD),
|
LK,
|
||||||
Value = {proxy_object,
|
SQN),
|
||||||
MDBin,
|
case InJournal of
|
||||||
Size,
|
probably ->
|
||||||
{fun fetch_value/2, InkerClone, JK}},
|
Size = leveled_codec:get_size(LK, V),
|
||||||
FoldObjectsFun(B, K, term_to_binary(Value), Acc);
|
MDBin =
|
||||||
|
leveled_codec:build_metadata_object(LK,
|
||||||
|
MD),
|
||||||
|
Value = {proxy_object,
|
||||||
|
MDBin,
|
||||||
|
Size,
|
||||||
|
{fun fetch_value/2,
|
||||||
|
InkerClone,
|
||||||
|
JK}},
|
||||||
|
FoldObjectsFun(B,
|
||||||
|
K,
|
||||||
|
term_to_binary(Value),
|
||||||
|
Acc);
|
||||||
|
missing ->
|
||||||
|
Acc
|
||||||
|
end;
|
||||||
false ->
|
false ->
|
||||||
R = fetch_value(InkerClone, JK),
|
R = fetch_value(InkerClone, JK),
|
||||||
case R of
|
case R of
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
-export([all/0]).
|
-export([all/0]).
|
||||||
-export([retain_strategy/1,
|
-export([retain_strategy/1,
|
||||||
recovr_strategy/1,
|
recovr_strategy/1,
|
||||||
|
aae_missingjournal/1,
|
||||||
aae_bustedjournal/1,
|
aae_bustedjournal/1,
|
||||||
journal_compaction_bustedjournal/1
|
journal_compaction_bustedjournal/1
|
||||||
]).
|
]).
|
||||||
|
@ -11,6 +12,7 @@
|
||||||
all() -> [
|
all() -> [
|
||||||
retain_strategy,
|
retain_strategy,
|
||||||
recovr_strategy,
|
recovr_strategy,
|
||||||
|
aae_missingjournal,
|
||||||
aae_bustedjournal,
|
aae_bustedjournal,
|
||||||
journal_compaction_bustedjournal
|
journal_compaction_bustedjournal
|
||||||
].
|
].
|
||||||
|
@ -94,6 +96,51 @@ recovr_strategy(_Config) ->
|
||||||
testutil:reset_filestructure().
|
testutil:reset_filestructure().
|
||||||
|
|
||||||
|
|
||||||
|
aae_missingjournal(_Config) ->
|
||||||
|
RootPath = testutil:reset_filestructure(),
|
||||||
|
StartOpts = [{root_path, RootPath},
|
||||||
|
{max_journalsize, 20000000},
|
||||||
|
{sync_strategy, testutil:sync_strategy()}],
|
||||||
|
{ok, Bookie1} = leveled_bookie:book_start(StartOpts),
|
||||||
|
{TestObject, TestSpec} = testutil:generate_testobject(),
|
||||||
|
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
|
||||||
|
testutil:check_forobject(Bookie1, TestObject),
|
||||||
|
GenList = [2],
|
||||||
|
_CLs = testutil:load_objects(20000, GenList, Bookie1, TestObject,
|
||||||
|
fun testutil:generate_objects/2),
|
||||||
|
|
||||||
|
FoldHeadsFun =
|
||||||
|
fun(B, K, _V, Acc) -> [{B, K}|Acc] end,
|
||||||
|
|
||||||
|
{async, AllHeadF1} =
|
||||||
|
leveled_bookie:book_returnfolder(Bookie1,
|
||||||
|
{foldheads_allkeys,
|
||||||
|
?RIAK_TAG,
|
||||||
|
FoldHeadsFun}),
|
||||||
|
HeadL1 = length(AllHeadF1()),
|
||||||
|
io:format("Fold head returned ~w objects~n", [HeadL1]),
|
||||||
|
|
||||||
|
ok = leveled_bookie:book_close(Bookie1),
|
||||||
|
CDBFiles = testutil:find_journals(RootPath),
|
||||||
|
[HeadF|_Rest] = CDBFiles,
|
||||||
|
io:format("Selected Journal for removal of ~s~n", [HeadF]),
|
||||||
|
ok = file:delete(RootPath ++ "/journal/journal_files/" ++ HeadF),
|
||||||
|
|
||||||
|
{ok, Bookie2} = leveled_bookie:book_start(StartOpts),
|
||||||
|
% Check that fold heads picks up on the missing file
|
||||||
|
{async, AllHeadF2} =
|
||||||
|
leveled_bookie:book_returnfolder(Bookie2,
|
||||||
|
{foldheads_allkeys,
|
||||||
|
?RIAK_TAG,
|
||||||
|
FoldHeadsFun}),
|
||||||
|
HeadL2 = length(AllHeadF2()),
|
||||||
|
io:format("Fold head returned ~w objects~n", [HeadL2]),
|
||||||
|
true = HeadL2 < HeadL1,
|
||||||
|
true = HeadL2 > 0,
|
||||||
|
|
||||||
|
ok = leveled_bookie:book_close(Bookie2),
|
||||||
|
testutil:reset_filestructure().
|
||||||
|
|
||||||
aae_bustedjournal(_Config) ->
|
aae_bustedjournal(_Config) ->
|
||||||
RootPath = testutil:reset_filestructure(),
|
RootPath = testutil:reset_filestructure(),
|
||||||
StartOpts = [{root_path, RootPath},
|
StartOpts = [{root_path, RootPath},
|
||||||
|
@ -223,7 +270,7 @@ aae_bustedjournal(_Config) ->
|
||||||
KeyHashList6 = HashTreeF6(),
|
KeyHashList6 = HashTreeF6(),
|
||||||
true = length(KeyHashList6) > 19000,
|
true = length(KeyHashList6) > 19000,
|
||||||
true = length(KeyHashList6) < HeadCount,
|
true = length(KeyHashList6) < HeadCount,
|
||||||
|
|
||||||
ok = leveled_bookie:book_close(Bookie4),
|
ok = leveled_bookie:book_close(Bookie4),
|
||||||
|
|
||||||
testutil:restore_topending(RootPath, HeadF),
|
testutil:restore_topending(RootPath, HeadF),
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue