Expand fold objects support
Fold over bucket and fold over index added
parent 9ea74836ee
commit 68b17c71b3

4 changed files with 100 additions and 22 deletions
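For context, the new folder types are requested through leveled_bookie:book_returnfolder/2, and each returns an {async, Folder} pair; calling the returned fun runs the fold against a store snapshot. A minimal usage sketch (not part of the commit), assuming book_start/1 takes an options list as in the test suites below; ?RIAK_TAG comes from the project's header, and the bucket and index names are purely illustrative:

%% Hedged usage sketch: requesting the new folds.
{ok, Bookie} = leveled_bookie:book_start([{root_path, "/tmp/fold_demo"}]),
FoldFun = fun(B, K, _V, Acc) -> [{B, K}|Acc] end,
%% Fold over every object with the given tag
{async, AllF} = leveled_bookie:book_returnfolder(Bookie,
                                                    {foldobjects_allkeys,
                                                        ?RIAK_TAG,
                                                        FoldFun}),
AllBKs = AllF(),
%% Fold over the objects in a single bucket
{async, BucketF} = leveled_bookie:book_returnfolder(Bookie,
                                                    {foldobjects_bybucket,
                                                        ?RIAK_TAG,
                                                        "Bucket",
                                                        FoldFun}),
BucketBKs = BucketF(),
%% Fold over objects matching a term range on a binary index
{async, IdxF} = leveled_bookie:book_returnfolder(Bookie,
                                                    {foldobjects_byindex,
                                                        ?RIAK_TAG,
                                                        "Bucket",
                                                        {"idx1_bin", "#", "~"},
                                                        FoldFun}),
IdxBKs = IdxF(),
ok = leveled_bookie:book_close(Bookie).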
leveled_bookie.erl
@@ -372,7 +372,23 @@ handle_call({return_folder, FolderType}, _From, State) ->
         {foldobjects_allkeys, Tag, FoldObjectsFun} ->
             {reply,
                 foldobjects_allkeys(State, Tag, FoldObjectsFun),
                 State};
+        {foldobjects_bybucket, Tag, Bucket, FoldObjectsFun} ->
+            {reply,
+                foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun),
+                State};
+        {foldobjects_byindex,
+                    Tag,
+                    Bucket,
+                    {Field, FromTerm, ToTerm},
+                    FoldObjectsFun} ->
+            {reply,
+                foldobjects_byindex(State,
+                                        Tag, Bucket,
+                                        Field, FromTerm, ToTerm,
+                                        FoldObjectsFun),
+                State}
+
     end;
 handle_call({compact_journal, Timeout}, _From, State) ->
     ok = leveled_inker:ink_compactjournal(State#state.inker,
@@ -492,6 +508,23 @@ hashtree_query(State, Tag, JournalCheck) ->
 
 
 foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
+    StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
+    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
+    foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun).
+
+foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun) ->
+    StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
+    EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
+    foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun).
+
+foldobjects_byindex(State, Tag, Bucket, Field, FromTerm, ToTerm, FoldObjectsFun) ->
+    StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field,
+                                            FromTerm),
+    EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field,
+                                            ToTerm),
+    foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun).
+
+foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
     {ok,
         {LedgerSnapshot, LedgerCache},
         JournalSnapshot} = snapshot_store(State, store),
@@ -499,9 +532,7 @@ foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
     leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
     ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
                                             LedgerCache),
-    StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
-    EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
-    AccFun = accumulate_objects(FoldObjectsFun, JournalSnapshot),
+    AccFun = accumulate_objects(FoldObjectsFun, JournalSnapshot, Tag),
     Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
                                             StartKey,
                                             EndKey,
@@ -513,6 +544,7 @@ foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
         end,
     {async, Folder}.
 
+
 allkey_query(State, Tag) ->
     {ok,
         {LedgerSnapshot, LedgerCache},
@@ -643,14 +675,18 @@ accumulate_hashes(JournalCheck, InkerClone) ->
         end,
     AccFun.
 
-accumulate_objects(FoldObjectsFun, InkerClone) ->
+accumulate_objects(FoldObjectsFun, InkerClone, Tag) ->
     Now = leveled_codec:integer_now(),
     AccFun = fun(LK, V, Acc) ->
                     case leveled_codec:is_active(LK, V, Now) of
                         true ->
                             SQN = leveled_codec:strip_to_seqonly({LK, V}),
-                            {B, K} = leveled_codec:from_ledgerkey(LK),
-                            R = leveled_inker:ink_fetch(InkerClone, LK, SQN),
+                            {B, K} = case leveled_codec:from_ledgerkey(LK) of
+                                            {B0, K0} -> {B0, K0};
+                                            {B0, K0, _T0} -> {B0, K0}
+                                        end,
+                            QK = leveled_codec:to_ledgerkey(B, K, Tag),
+                            R = leveled_inker:ink_fetch(InkerClone, QK, SQN),
                             case R of
                                 {ok, Value} ->
                                     FoldObjectsFun(B, K, Value, Acc);
@@ -663,6 +699,9 @@ accumulate_objects(FoldObjectsFun, InkerClone) ->
         end,
     AccFun.
 
+
+
+
 check_presence(Key, Value, InkerClone) ->
     {LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}),
     case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of
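The Tag now threaded into accumulate_objects matters because the byindex fold walks ?IDX_TAG ledger keys: for these, leveled_codec:from_ledgerkey/1 yields a three-tuple including the index term, and the journal fetch must use the object's own key, rebuilt via to_ledgerkey(B, K, Tag). A minimal sketch of that normalisation step, mirroring the case expression above (normalise_bk is an illustrative name, not from the commit):

%% Illustrative helper: collapse the two return shapes of
%% leveled_codec:from_ledgerkey/1 to a plain {Bucket, Key}.
normalise_bk(LedgerKey) ->
    case leveled_codec:from_ledgerkey(LedgerKey) of
        {B0, K0} -> {B0, K0};        % object key
        {B0, K0, _Term} -> {B0, K0}  % index entry: drop the term
    end.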
iterator_SUITE.erl
@@ -6,17 +6,18 @@
 -define(KEY_ONLY, {false, undefined}).
 
 -export([all/0]).
--export([simple_load_with2i/1,
+-export([small_load_with2i/1,
             query_count/1,
             rotating_objects/1]).
 
 all() -> [
-            simple_load_with2i,
+            small_load_with2i,
             query_count,
-            rotating_objects].
+            rotating_objects
+            ].
 
 
-simple_load_with2i(_Config) ->
+small_load_with2i(_Config) ->
     RootPath = testutil:reset_filestructure(),
     StartOpts1 = [{root_path, RootPath},
                     {max_journalsize, 50000000}],
@@ -37,6 +38,42 @@ simple_load_with2i(_Config) ->
     ChkList1 = lists:sublist(lists:sort(ObjL1), 100),
     testutil:check_forlist(Bookie1, ChkList1),
     testutil:check_forobject(Bookie1, TestObject),
+
+    %% Delete the objects from the ChkList removing the indexes
+    lists:foreach(fun({_RN, Obj, Spc}) ->
+                        DSpc = lists:map(fun({add, F, T}) -> {remove, F, T}
+                                            end,
+                                            Spc),
+                        {B, K} = leveled_codec:riakto_keydetails(Obj),
+                        leveled_bookie:book_riakdelete(Bookie1, B, K, DSpc)
+                        end,
+                    ChkList1),
+    %% Get the Buckets Keys and Hashes for the whole bucket
+    FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc]
+                        end,
+    {async, HTreeF1} = leveled_bookie:book_returnfolder(Bookie1,
+                                                        {foldobjects_allkeys,
+                                                            ?RIAK_TAG,
+                                                            FoldObjectsFun}),
+    KeyHashList1 = HTreeF1(),
+    {async, HTreeF2} = leveled_bookie:book_returnfolder(Bookie1,
+                                                        {foldobjects_bybucket,
+                                                            ?RIAK_TAG,
+                                                            "Bucket",
+                                                            FoldObjectsFun}),
+    KeyHashList2 = HTreeF2(),
+    {async, HTreeF3} = leveled_bookie:book_returnfolder(Bookie1,
+                                                        {foldobjects_byindex,
+                                                            ?RIAK_TAG,
+                                                            "Bucket",
+                                                            {"idx1_bin",
+                                                                "#", "~"},
+                                                            FoldObjectsFun}),
+    KeyHashList3 = HTreeF3(),
+    true = 9901 == length(KeyHashList1), % also includes the test object
+    true = 9900 == length(KeyHashList2),
+    true = 9900 == length(KeyHashList3),
+
     ok = leveled_bookie:book_close(Bookie1),
     testutil:reset_filestructure().
 
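The contract visible in these tests is FoldObjectsFun = fun(Bucket, Key, Value, Acc) -> Acc', with Value fetched from the journal for each active key and the fold starting, as far as the tests show, from an empty list. A hedged sketch of an alternative accumulator over the same bucket (SizeFoldFun and SizeF are illustrative names):

%% Hedged sketch: same fun(B, K, V, Acc) contract as the hash
%% accumulators above, but recording each object's serialised size.
%% Assumes the fold's initial accumulator is an empty list.
SizeFoldFun = fun(B, K, V, Acc) ->
                    [{B, K, erlang:external_size(V)}|Acc]
                end,
{async, SizeF} = leveled_bookie:book_returnfolder(Bookie1,
                                                    {foldobjects_bybucket,
                                                        ?RIAK_TAG,
                                                        "Bucket",
                                                        SizeFoldFun}),
SizesByKey = SizeF().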
recovery_SUITE.erl
@@ -103,7 +103,8 @@ aae_bustedjournal(_Config) ->
     % Will need to remove the file or corrupt the hashtree to get presence to
     % fail
 
-    FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, riak_hash(V)}|Acc] end,
+    FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, testutil:riak_hash(V)}|Acc]
+                        end,
     SW = os:timestamp(),
     {async, HashTreeF3} = leveled_bookie:book_returnfolder(Bookie2,
                                                            {foldobjects_allkeys,
@@ -190,15 +191,6 @@ aae_bustedjournal(_Config) ->
     testutil:reset_filestructure().
 
 
-riak_hash(Obj=#r_object{}) ->
-    Vclock = vclock(Obj),
-    UpdObj = set_vclock(Obj, lists:sort(Vclock)),
-    erlang:phash2(term_to_binary(UpdObj)).
-
-set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}.
-vclock(#r_object{vclock=VClock}) -> VClock.
-
-
 journal_compaction_bustedjournal(_Config) ->
     % Simply confirms that none of this causes a crash
     RootPath = testutil:reset_filestructure(),
testutil.erl
@@ -32,7 +32,8 @@
         corrupt_journal/5,
         restore_file/2,
         restore_topending/2,
-        find_journals/1]).
+        find_journals/1,
+        riak_hash/1]).
 
 -define(RETURN_TERMS, {true, undefined}).
 
@@ -424,3 +425,12 @@ find_journals(RootPath) ->
                     [],
                     FNsA_J),
     CDBFiles.
+
+
+riak_hash(Obj=#r_object{}) ->
+    Vclock = vclock(Obj),
+    UpdObj = set_vclock(Obj, lists:sort(Vclock)),
+    erlang:phash2(term_to_binary(UpdObj)).
+
+set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}.
+vclock(#r_object{vclock=VClock}) -> VClock.