diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 262b963..970b54c 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -1099,8 +1099,17 @@ accumulate_hashes(JournalCheck, InkerClone) -> accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> Now = leveled_codec:integer_now(), - AccFun = + AccFun = fun(LK, V, Acc) -> + % The function takes the Ledger Key and the value from the + % ledger (with the value being the object metadata) + % + % Need to check if this is an active object (so TTL has not + % expired). + % If this is a deferred_fetch (i.e. the fold is a fold_heads not + % a fold_objects), then a metadata object needs to be built to be + % returned - but a quick check that Key is present in the Journal + % is made first case leveled_codec:is_active(LK, V, Now) of true -> {SQN, _St, _MH, MD} = @@ -1115,14 +1124,29 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN}, case DeferredFetch of true -> - Size = leveled_codec:get_size(LK, V), - MDBin = - leveled_codec:build_metadata_object(LK, MD), - Value = {proxy_object, - MDBin, - Size, - {fun fetch_value/2, InkerClone, JK}}, - FoldObjectsFun(B, K, term_to_binary(Value), Acc); + InJournal = + leveled_inker:ink_keycheck(InkerClone, + LK, + SQN), + case InJournal of + probably -> + Size = leveled_codec:get_size(LK, V), + MDBin = + leveled_codec:build_metadata_object(LK, + MD), + Value = {proxy_object, + MDBin, + Size, + {fun fetch_value/2, + InkerClone, + JK}}, + FoldObjectsFun(B, + K, + term_to_binary(Value), + Acc); + missing -> + Acc + end; false -> R = fetch_value(InkerClone, JK), case R of diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index a1fbaca..ccca1ea 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -4,6 +4,7 @@ -export([all/0]). -export([retain_strategy/1, recovr_strategy/1, + aae_missingjournal/1, aae_bustedjournal/1, journal_compaction_bustedjournal/1 ]). @@ -11,6 +12,7 @@ all() -> [ retain_strategy, recovr_strategy, + aae_missingjournal, aae_bustedjournal, journal_compaction_bustedjournal ]. @@ -94,6 +96,51 @@ recovr_strategy(_Config) -> testutil:reset_filestructure(). +aae_missingjournal(_Config) -> + RootPath = testutil:reset_filestructure(), + StartOpts = [{root_path, RootPath}, + {max_journalsize, 20000000}, + {sync_strategy, testutil:sync_strategy()}], + {ok, Bookie1} = leveled_bookie:book_start(StartOpts), + {TestObject, TestSpec} = testutil:generate_testobject(), + ok = testutil:book_riakput(Bookie1, TestObject, TestSpec), + testutil:check_forobject(Bookie1, TestObject), + GenList = [2], + _CLs = testutil:load_objects(20000, GenList, Bookie1, TestObject, + fun testutil:generate_objects/2), + + FoldHeadsFun = + fun(B, K, _V, Acc) -> [{B, K}|Acc] end, + + {async, AllHeadF1} = + leveled_bookie:book_returnfolder(Bookie1, + {foldheads_allkeys, + ?RIAK_TAG, + FoldHeadsFun}), + HeadL1 = length(AllHeadF1()), + io:format("Fold head returned ~w objects~n", [HeadL1]), + + ok = leveled_bookie:book_close(Bookie1), + CDBFiles = testutil:find_journals(RootPath), + [HeadF|_Rest] = CDBFiles, + io:format("Selected Journal for removal of ~s~n", [HeadF]), + ok = file:delete(RootPath ++ "/journal/journal_files/" ++ HeadF), + + {ok, Bookie2} = leveled_bookie:book_start(StartOpts), + % Check that fold heads picks up on the missing file + {async, AllHeadF2} = + leveled_bookie:book_returnfolder(Bookie2, + {foldheads_allkeys, + ?RIAK_TAG, + FoldHeadsFun}), + HeadL2 = length(AllHeadF2()), + io:format("Fold head returned ~w objects~n", [HeadL2]), + true = HeadL2 < HeadL1, + true = HeadL2 > 0, + + ok = leveled_bookie:book_close(Bookie2), + testutil:reset_filestructure(). + aae_bustedjournal(_Config) -> RootPath = testutil:reset_filestructure(), StartOpts = [{root_path, RootPath}, @@ -223,7 +270,7 @@ aae_bustedjournal(_Config) -> KeyHashList6 = HashTreeF6(), true = length(KeyHashList6) > 19000, true = length(KeyHashList6) < HeadCount, - + ok = leveled_bookie:book_close(Bookie4), testutil:restore_topending(RootPath, HeadF),