diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 4ece04b..61c8fb8 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -373,6 +373,10 @@ handle_call({return_folder, FolderType}, _From, State) ->
         {hashtree_query, Tag, JournalCheck} ->
             {reply,
                 hashtree_query(State, Tag, JournalCheck),
+                State};
+        {foldobjects_allkeys, Tag, FoldObjectsFun} ->
+            {reply,
+                foldobjects_allkeys(State, Tag, FoldObjectsFun),
                 State}
     end;
 handle_call({compact_journal, Timeout}, _From, State) ->
@@ -495,6 +499,30 @@ hashtree_query(State, Tag, JournalCheck) ->
         end,
     {async, Folder}.
 
+
+%% Returns {async, Folder}; Folder() folds FoldObjectsFun over the objects found
+%% for Tag, closing both snapshots in `after` even when the fold crashes.
+foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
+    {ok, {LedgerSnapshot, LedgerCache}, JournalSnapshot} =
+        snapshot_store(State, store),
+    Folder = fun() ->
+        try
+            io:format("Length of increment in snapshot is ~w~n",
+                        [gb_trees:size(LedgerCache)]),
+            ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
+                                                    LedgerCache),
+            StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
+            EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
+            AccFun = accumulate_objects(FoldObjectsFun, JournalSnapshot),
+            leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
+                                            StartKey, EndKey, AccFun, [])
+        after
+            ok = leveled_penciller:pcl_close(LedgerSnapshot),
+            ok = leveled_inker:ink_close(JournalSnapshot)
+        end
+    end,
+    {async, Folder}.
+
 allkey_query(State, Tag) ->
     {ok,
         {LedgerSnapshot, LedgerCache},
@@ -640,6 +668,26 @@ accumulate_hashes(JournalCheck, InkerClone) ->
                 end,
     AccFun.
 
+%% Builds the ledger fold fun: for each active key the object is fetched via
+%% InkerClone and passed to FoldObjectsFun; any other key leaves Acc unchanged.
+accumulate_objects(FoldObjectsFun, InkerClone) ->
+    Now = leveled_codec:integer_now(),
+    fun(LK, V, Acc) ->
+        case leveled_codec:is_active(LK, V, Now) of
+            false ->
+                Acc;
+            true ->
+                SQN = leveled_codec:strip_to_seqonly({LK, V}),
+                {B, K} = leveled_codec:from_ledgerkey(LK),
+                case leveled_inker:ink_fetch(InkerClone, LK, SQN) of
+                    not_present ->
+                        Acc;
+                    {ok, Value} ->
+                        FoldObjectsFun(B, K, Value, Acc)
+                end
+        end
+    end.
+
 check_presence(Key, Value, InkerClone) ->
     {LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}),
     case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of
@@ -1071,4 +1119,36 @@ hashtree_query_withjournalcheck_test() ->
     ok = book_close(Bookie1),
     reset_filestructure().
 
+foldobjects_vs_hashtree_test() ->
+    RootPath = reset_filestructure(),
+    {ok, Bookie1} = book_start([{root_path, RootPath},
+                                    {max_journalsize, 1000000},
+                                    {cache_size, 500}]),
+    ObjL1 = generate_multiple_objects(800, 1),
+    % Put in all the objects with a TTL in the future
+    Future = leveled_codec:integer_now() + 300,
+    lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
+                                                        "Bucket", K, V, S,
+                                                        ?STD_TAG,
+                                                        Future) end,
+                    ObjL1),
+    {async, HTFolder1} = book_returnfolder(Bookie1,
+                                            {hashtree_query,
+                                                ?STD_TAG,
+                                                false}),
+    KeyHashList1 = lists:usort(HTFolder1()),
+    io:format("First item ~w~n", [lists:nth(1, KeyHashList1)]),
+    FoldObjectsFun = fun(B, K, V, Acc) ->
+                        [{B, K, erlang:phash2(term_to_binary(V))}|Acc] end,
+    {async, HTFolder2} = book_returnfolder(Bookie1,
+                                            {foldobjects_allkeys,
+                                                ?STD_TAG,
+                                                FoldObjectsFun}),
+    % Both folds must yield the same sorted list; assertEqual reports the diff
+    ?assertEqual(KeyHashList1, lists:usort(HTFolder2())),
+
+    ok = book_close(Bookie1),
+    reset_filestructure().
+
+
 -endif.
\ No newline at end of file
diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl
index b4fbfa2..2992e76 100644
--- a/test/end_to_end/recovery_SUITE.erl
+++ b/test/end_to_end/recovery_SUITE.erl
@@ -8,9 +8,9 @@
             ]).
 
 all() -> [
-            retain_strategy,
-            aae_bustedjournal,
-            journal_compaction_bustedjournal
+            retain_strategy,
+            aae_bustedjournal,
+            journal_compaction_bustedjournal
             ].
 
retain_strategy(_Config) -> @@ -103,9 +103,35 @@ aae_bustedjournal(_Config) -> % Will need to remove the file or corrupt the hashtree to get presence to % fail + FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, riak_hash(V)}|Acc] end, + SW = os:timestamp(), + {async, HashTreeF3} = leveled_bookie:book_returnfolder(Bookie2, + {foldobjects_allkeys, + ?RIAK_TAG, + FoldObjectsFun}), + KeyHashList3 = HashTreeF3(), + + true = length(KeyHashList3) > 19000, + true = length(KeyHashList3) < HeadCount, + Delta = length(lists:subtract(KeyHashList1, KeyHashList3)), + true = Delta < 1001, + io:format("Fetch of hashtree using fold objects took ~w microseconds" ++ + " and found a Delta of ~w and an objects count of ~w~n", + [timer:now_diff(os:timestamp(), SW), + Delta, + length(KeyHashList3)]), + ok = leveled_bookie:book_close(Bookie2), testutil:reset_filestructure(). +riak_hash(Obj=#r_object{}) -> + Vclock = vclock(Obj), + UpdObj = set_vclock(Obj, lists:sort(Vclock)), + erlang:phash2(term_to_binary(UpdObj)). + +set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}. +vclock(#r_object{vclock=VClock}) -> VClock. + journal_compaction_bustedjournal(_Config) -> % Simply confirms that none of this causes a crash