Add Fold Object by KeyList support
This commit is contained in:
parent
8601e219d5
commit
898f86a08d
2 changed files with 109 additions and 3 deletions
|
@ -373,6 +373,10 @@ handle_call({return_folder, FolderType}, _From, State) ->
|
||||||
{hashtree_query, Tag, JournalCheck} ->
|
{hashtree_query, Tag, JournalCheck} ->
|
||||||
{reply,
|
{reply,
|
||||||
hashtree_query(State, Tag, JournalCheck),
|
hashtree_query(State, Tag, JournalCheck),
|
||||||
|
State};
|
||||||
|
{foldobjects_allkeys, Tag, FoldObjectsFun} ->
|
||||||
|
{reply,
|
||||||
|
foldobjects_allkeys(State, Tag, FoldObjectsFun),
|
||||||
State}
|
State}
|
||||||
end;
|
end;
|
||||||
handle_call({compact_journal, Timeout}, _From, State) ->
|
handle_call({compact_journal, Timeout}, _From, State) ->
|
||||||
|
@ -495,6 +499,30 @@ hashtree_query(State, Tag, JournalCheck) ->
|
||||||
end,
|
end,
|
||||||
{async, Folder}.
|
{async, Folder}.
|
||||||
|
|
||||||
|
|
||||||
|
foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
|
||||||
|
{ok,
|
||||||
|
{LedgerSnapshot, LedgerCache},
|
||||||
|
JournalSnapshot} = snapshot_store(State, store),
|
||||||
|
Folder = fun() ->
|
||||||
|
io:format("Length of increment in snapshot is ~w~n",
|
||||||
|
[gb_trees:size(LedgerCache)]),
|
||||||
|
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
|
||||||
|
LedgerCache),
|
||||||
|
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
||||||
|
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
|
||||||
|
AccFun = accumulate_objects(FoldObjectsFun, JournalSnapshot),
|
||||||
|
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
|
||||||
|
StartKey,
|
||||||
|
EndKey,
|
||||||
|
AccFun,
|
||||||
|
[]),
|
||||||
|
ok = leveled_penciller:pcl_close(LedgerSnapshot),
|
||||||
|
ok = leveled_inker:ink_close(JournalSnapshot),
|
||||||
|
Acc
|
||||||
|
end,
|
||||||
|
{async, Folder}.
|
||||||
|
|
||||||
allkey_query(State, Tag) ->
|
allkey_query(State, Tag) ->
|
||||||
{ok,
|
{ok,
|
||||||
{LedgerSnapshot, LedgerCache},
|
{LedgerSnapshot, LedgerCache},
|
||||||
|
@ -640,6 +668,26 @@ accumulate_hashes(JournalCheck, InkerClone) ->
|
||||||
end,
|
end,
|
||||||
AccFun.
|
AccFun.
|
||||||
|
|
||||||
|
accumulate_objects(FoldObjectsFun, InkerClone) ->
|
||||||
|
Now = leveled_codec:integer_now(),
|
||||||
|
AccFun = fun(LK, V, Acc) ->
|
||||||
|
case leveled_codec:is_active(LK, V, Now) of
|
||||||
|
true ->
|
||||||
|
SQN = leveled_codec:strip_to_seqonly({LK, V}),
|
||||||
|
{B, K} = leveled_codec:from_ledgerkey(LK),
|
||||||
|
R = leveled_inker:ink_fetch(InkerClone, LK, SQN),
|
||||||
|
case R of
|
||||||
|
{ok, Value} ->
|
||||||
|
FoldObjectsFun(B, K, Value, Acc);
|
||||||
|
not_present ->
|
||||||
|
Acc
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
Acc
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
AccFun.
|
||||||
|
|
||||||
check_presence(Key, Value, InkerClone) ->
|
check_presence(Key, Value, InkerClone) ->
|
||||||
{LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}),
|
{LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}),
|
||||||
case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of
|
case leveled_inker:ink_keycheck(InkerClone, LedgerKey, SQN) of
|
||||||
|
@ -1071,4 +1119,36 @@ hashtree_query_withjournalcheck_test() ->
|
||||||
ok = book_close(Bookie1),
|
ok = book_close(Bookie1),
|
||||||
reset_filestructure().
|
reset_filestructure().
|
||||||
|
|
||||||
|
foldobjects_vs_hashtree_test() ->
|
||||||
|
RootPath = reset_filestructure(),
|
||||||
|
{ok, Bookie1} = book_start([{root_path, RootPath},
|
||||||
|
{max_journalsize, 1000000},
|
||||||
|
{cache_size, 500}]),
|
||||||
|
ObjL1 = generate_multiple_objects(800, 1),
|
||||||
|
% Put in all the objects with a TTL in the future
|
||||||
|
Future = leveled_codec:integer_now() + 300,
|
||||||
|
lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,
|
||||||
|
"Bucket", K, V, S,
|
||||||
|
?STD_TAG,
|
||||||
|
Future) end,
|
||||||
|
ObjL1),
|
||||||
|
{async, HTFolder1} = book_returnfolder(Bookie1,
|
||||||
|
{hashtree_query,
|
||||||
|
?STD_TAG,
|
||||||
|
false}),
|
||||||
|
KeyHashList1 = lists:usort(HTFolder1()),
|
||||||
|
io:format("First item ~w~n", [lists:nth(1, KeyHashList1)]),
|
||||||
|
FoldObjectsFun = fun(B, K, V, Acc) ->
|
||||||
|
[{B, K, erlang:phash2(term_to_binary(V))}|Acc] end,
|
||||||
|
{async, HTFolder2} = book_returnfolder(Bookie1,
|
||||||
|
{foldobjects_allkeys,
|
||||||
|
?STD_TAG,
|
||||||
|
FoldObjectsFun}),
|
||||||
|
KeyHashList2 = HTFolder2(),
|
||||||
|
?assertMatch(KeyHashList1, lists:usort(KeyHashList2)),
|
||||||
|
|
||||||
|
ok = book_close(Bookie1),
|
||||||
|
reset_filestructure().
|
||||||
|
|
||||||
|
|
||||||
-endif.
|
-endif.
|
|
@ -8,9 +8,9 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
all() -> [
|
all() -> [
|
||||||
retain_strategy,
|
% retain_strategy,
|
||||||
aae_bustedjournal,
|
aae_bustedjournal %,
|
||||||
journal_compaction_bustedjournal
|
% journal_compaction_bustedjournal
|
||||||
].
|
].
|
||||||
|
|
||||||
retain_strategy(_Config) ->
|
retain_strategy(_Config) ->
|
||||||
|
@ -103,9 +103,35 @@ aae_bustedjournal(_Config) ->
|
||||||
% Will need to remove the file or corrupt the hashtree to get presence to
|
% Will need to remove the file or corrupt the hashtree to get presence to
|
||||||
% fail
|
% fail
|
||||||
|
|
||||||
|
FoldObjectsFun = fun(B, K, V, Acc) -> [{B, K, riak_hash(V)}|Acc] end,
|
||||||
|
SW = os:timestamp(),
|
||||||
|
{async, HashTreeF3} = leveled_bookie:book_returnfolder(Bookie2,
|
||||||
|
{foldobjects_allkeys,
|
||||||
|
?RIAK_TAG,
|
||||||
|
FoldObjectsFun}),
|
||||||
|
KeyHashList3 = HashTreeF3(),
|
||||||
|
|
||||||
|
true = length(KeyHashList3) > 19000,
|
||||||
|
true = length(KeyHashList3) < HeadCount,
|
||||||
|
Delta = length(lists:subtract(KeyHashList1, KeyHashList3)),
|
||||||
|
true = Delta < 1001,
|
||||||
|
io:format("Fetch of hashtree using fold objects took ~w microseconds" ++
|
||||||
|
" and found a Delta of ~w and an objects count of ~w~n",
|
||||||
|
[timer:now_diff(os:timestamp(), SW),
|
||||||
|
Delta,
|
||||||
|
length(KeyHashList3)]),
|
||||||
|
|
||||||
ok = leveled_bookie:book_close(Bookie2),
|
ok = leveled_bookie:book_close(Bookie2),
|
||||||
testutil:reset_filestructure().
|
testutil:reset_filestructure().
|
||||||
|
|
||||||
|
riak_hash(Obj=#r_object{}) ->
|
||||||
|
Vclock = vclock(Obj),
|
||||||
|
UpdObj = set_vclock(Obj, lists:sort(Vclock)),
|
||||||
|
erlang:phash2(term_to_binary(UpdObj)).
|
||||||
|
|
||||||
|
set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}.
|
||||||
|
vclock(#r_object{vclock=VClock}) -> VClock.
|
||||||
|
|
||||||
|
|
||||||
journal_compaction_bustedjournal(_Config) ->
|
journal_compaction_bustedjournal(_Config) ->
|
||||||
% Simply confirms that none of this causes a crash
|
% Simply confirms that none of this causes a crash
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue