diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index dd1556c..dec8ceb 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -372,7 +372,23 @@ handle_call({return_folder, FolderType}, _From, State) -> {foldobjects_allkeys, Tag, FoldObjectsFun} -> {reply, foldobjects_allkeys(State, Tag, FoldObjectsFun), + State}; + {foldobjects_bybucket, Tag, Bucket, FoldObjectsFun} -> + {reply, + foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun), + State}; + {foldobjects_byindex, + Tag, + Bucket, + {Field, FromTerm, ToTerm}, + FoldObjectsFun} -> + {reply, + foldobjects_byindex(State, + Tag, Bucket, + Field, FromTerm, ToTerm, + FoldObjectsFun), State} + end; handle_call({compact_journal, Timeout}, _From, State) -> ok = leveled_inker:ink_compactjournal(State#state.inker, @@ -492,6 +508,23 @@ hashtree_query(State, Tag, JournalCheck) -> foldobjects_allkeys(State, Tag, FoldObjectsFun) -> + StartKey = leveled_codec:to_ledgerkey(null, null, Tag), + EndKey = leveled_codec:to_ledgerkey(null, null, Tag), + foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun). + +foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun) -> + StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), + EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), + foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun). + +foldobjects_byindex(State, Tag, Bucket, Field, FromTerm, ToTerm, FoldObjectsFun) -> + StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, + FromTerm), + EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, + ToTerm), + foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun). + +foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) -> {ok, {LedgerSnapshot, LedgerCache}, JournalSnapshot} = snapshot_store(State, store), @@ -499,9 +532,7 @@ foldobjects_allkeys(State, Tag, FoldObjectsFun) -> leveled_log:log("B0004", [gb_trees:size(LedgerCache)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, LedgerCache), - StartKey = leveled_codec:to_ledgerkey(null, null, Tag), - EndKey = leveled_codec:to_ledgerkey(null, null, Tag), - AccFun = accumulate_objects(FoldObjectsFun, JournalSnapshot), + AccFun = accumulate_objects(FoldObjectsFun, JournalSnapshot, Tag), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, StartKey, EndKey, @@ -513,6 +544,7 @@ foldobjects_allkeys(State, Tag, FoldObjectsFun) -> end, {async, Folder}. + allkey_query(State, Tag) -> {ok, {LedgerSnapshot, LedgerCache}, @@ -643,14 +675,18 @@ accumulate_hashes(JournalCheck, InkerClone) -> end, AccFun. -accumulate_objects(FoldObjectsFun, InkerClone) -> +accumulate_objects(FoldObjectsFun, InkerClone, Tag) -> Now = leveled_codec:integer_now(), AccFun = fun(LK, V, Acc) -> case leveled_codec:is_active(LK, V, Now) of true -> SQN = leveled_codec:strip_to_seqonly({LK, V}), - {B, K} = leveled_codec:from_ledgerkey(LK), - R = leveled_inker:ink_fetch(InkerClone, LK, SQN), + {B, K} = case leveled_codec:from_ledgerkey(LK) of + {B0, K0} -> {B0, K0}; + {B0, K0, _T0} -> {B0, K0} + end, + QK = leveled_codec:to_ledgerkey(B, K, Tag), + R = leveled_inker:ink_fetch(InkerClone, QK, SQN), case R of {ok, Value} -> FoldObjectsFun(B, K, Value, Acc); @@ -663,6 +699,9 @@ accumulate_objects(FoldObjectsFun, InkerClone) -> end, AccFun. + + + check_presence(Key, Value, InkerClone) -> {LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}), case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index 7e1fcff..73e9ab4 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -6,17 +6,18 @@ -define(KEY_ONLY, {false, undefined}). -export([all/0]). --export([simple_load_with2i/1, +-export([small_load_with2i/1, query_count/1, rotating_objects/1]). all() -> [ - simple_load_with2i, + small_load_with2i, query_count, - rotating_objects]. + rotating_objects + ]. -simple_load_with2i(_Config) -> +small_load_with2i(_Config) -> RootPath = testutil:reset_filestructure(), StartOpts1 = [{root_path, RootPath}, {max_journalsize, 50000000}], @@ -37,6 +38,42 @@ simple_load_with2i(_Config) -> ChkList1 = lists:sublist(lists:sort(ObjL1), 100), testutil:check_forlist(Bookie1, ChkList1), testutil:check_forobject(Bookie1, TestObject), + + %% Delete the objects from the ChkList removing the indexes + lists:foreach(fun({_RN, Obj, Spc}) -> + DSpc = lists:map(fun({add, F, T}) -> {remove, F, T} + end, + Spc), + {B, K} = leveled_codec:riakto_keydetails(Obj), + leveled_bookie:book_riakdelete(Bookie1, B, K, DSpc) + end, + ChkList1), + %% Get the Buckets Keys and Hashes for the whole bucket + FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc] + end, + {async, HTreeF1} = leveled_bookie:book_returnfolder(Bookie1, + {foldobjects_allkeys, + ?RIAK_TAG, + FoldObjectsFun}), + KeyHashList1 = HTreeF1(), + {async, HTreeF2} = leveled_bookie:book_returnfolder(Bookie1, + {foldobjects_bybucket, + ?RIAK_TAG, + "Bucket", + FoldObjectsFun}), + KeyHashList2 = HTreeF2(), + {async, HTreeF3} = leveled_bookie:book_returnfolder(Bookie1, + {foldobjects_byindex, + ?RIAK_TAG, + "Bucket", + {"idx1_bin", + "#", "~"}, + FoldObjectsFun}), + KeyHashList3 = HTreeF3(), + true = 9901 == length(KeyHashList1), % also includes the test object + true = 9900 == length(KeyHashList2), + true = 9900 == length(KeyHashList3), + ok = leveled_bookie:book_close(Bookie1), testutil:reset_filestructure(). diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index a6269d1..de58e4c 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -103,7 +103,8 @@ aae_bustedjournal(_Config) -> % Will need to remove the file or corrupt the hashtree to get presence to % fail - FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, riak_hash(V)}|Acc] end, + FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc] + end, SW = os:timestamp(), {async, HashTreeF3} = leveled_bookie:book_returnfolder(Bookie2, {foldobjects_allkeys, @@ -190,15 +191,6 @@ aae_bustedjournal(_Config) -> testutil:reset_filestructure(). -riak_hash(Obj=#r_object{}) -> - Vclock = vclock(Obj), - UpdObj = set_vclock(Obj, lists:sort(Vclock)), - erlang:phash2(term_to_binary(UpdObj)). - -set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}. -vclock(#r_object{vclock=VClock}) -> VClock. - - journal_compaction_bustedjournal(_Config) -> % Simply confirms that none of this causes a crash RootPath = testutil:reset_filestructure(), diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index e11d1a8..9d9f278 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -32,7 +32,8 @@ corrupt_journal/5, restore_file/2, restore_topending/2, - find_journals/1]). + find_journals/1, + riak_hash/1]). -define(RETURN_TERMS, {true, undefined}). @@ -423,4 +424,13 @@ find_journals(RootPath) -> end, [], FNsA_J), - CDBFiles. \ No newline at end of file + CDBFiles. + + +riak_hash(Obj=#r_object{}) -> + Vclock = vclock(Obj), + UpdObj = set_vclock(Obj, lists:sort(Vclock)), + erlang:phash2(term_to_binary(UpdObj)). + +set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}. +vclock(#r_object{vclock=VClock}) -> VClock.