Merge pull request #101 from martinsumner/mas-aae-segementfoldplus

Mas aae segementfoldplus
This commit is contained in:
Martin Sumner 2017-11-08 21:07:15 +00:00 committed by GitHub
commit 60e1868150
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 1021 additions and 372 deletions

View file

@ -660,26 +660,10 @@ snapshot_store(LedgerCache, Penciller, Inker, SnapType, Query, LongRunning) ->
LedgerCacheReady#ledger_cache.index,
LedgerCacheReady#ledger_cache.min_sqn,
LedgerCacheReady#ledger_cache.max_sqn},
LongRunning0 =
case LongRunning of
undefined ->
case Query of
undefined ->
true;
no_lookup ->
true;
_ ->
% If a specific query has been defined, then not expected
% to be long running
false
end;
TrueOrFalse ->
TrueOrFalse
end,
PCLopts = #penciller_options{start_snapshot = true,
source_penciller = Penciller,
snapshot_query = Query,
snapshot_longrunning = LongRunning0,
snapshot_longrunning = LongRunning,
bookies_mem = BookiesMem},
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
case SnapType of
@ -768,10 +752,14 @@ get_runner(State, {keylist, Tag, Bucket, FoldAccT}) ->
%% Set of runners for object or metadata folds
get_runner(State,
{foldheads_allkeys, Tag, FoldFun, JournalCheck, SnapPreFold}) ->
{foldheads_allkeys,
Tag, FoldFun,
JournalCheck, SnapPreFold, SegmentList}) ->
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold),
leveled_runner:foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck);
leveled_runner:foldheads_allkeys(SnapFun,
Tag, FoldFun,
JournalCheck, SegmentList);
get_runner(State,
{foldobjects_allkeys, Tag, FoldFun, SnapPreFold}) ->
SnapFun = return_snapfun(State, store, no_lookup, true, SnapPreFold),
@ -780,14 +768,14 @@ get_runner(State,
{foldheads_bybucket,
Tag, Bucket, KeyRange,
FoldFun,
JournalCheck, SnapPreFold}) ->
JournalCheck, SnapPreFold, SegmentList}) ->
{StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange),
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, SnapQ, true, SnapPreFold),
leveled_runner:foldheads_bybucket(SnapFun,
{Tag, StartKey, EndKey},
FoldFun,
JournalCheck);
JournalCheck, SegmentList);
get_runner(State,
{foldobjects_bybucket,
Tag, Bucket, KeyRange,
@ -1425,8 +1413,7 @@ foldobjects_vs_hashtree_testto() ->
{foldheads_allkeys,
?STD_TAG,
FoldHeadsFun,
true,
true}),
true, true, false}),
KeyHashList3 = HTFolder3(),
?assertMatch(KeyHashList1, lists:usort(KeyHashList3)),
@ -1445,8 +1432,7 @@ foldobjects_vs_hashtree_testto() ->
{foldheads_allkeys,
?STD_TAG,
FoldHeadsFun2,
false,
false}),
false, false, false}),
KeyHashList4 = HTFolder4(),
?assertMatch(KeyHashList1, lists:usort(KeyHashList4)),
@ -1516,8 +1502,7 @@ foldobjects_vs_foldheads_bybucket_testto() ->
"BucketA",
all,
FoldHeadsFun,
true,
true}),
true, true, false}),
KeyHashList2A = HTFolder2A(),
{async, HTFolder2B} =
book_returnfolder(Bookie1,
@ -1526,8 +1511,7 @@ foldobjects_vs_foldheads_bybucket_testto() ->
"BucketB",
all,
FoldHeadsFun,
true,
false}),
true, false, false}),
KeyHashList2B = HTFolder2B(),
?assertMatch(true,
@ -1542,8 +1526,7 @@ foldobjects_vs_foldheads_bybucket_testto() ->
"BucketB",
{"Key", <<"$all">>},
FoldHeadsFun,
true,
false}),
true, false, false}),
KeyHashList2C = HTFolder2C(),
{async, HTFolder2D} =
book_returnfolder(Bookie1,
@ -1552,8 +1535,7 @@ foldobjects_vs_foldheads_bybucket_testto() ->
"BucketB",
{"Key", "Keyzzzzz"},
FoldHeadsFun,
true,
true}),
true, true, false}),
KeyHashList2D = HTFolder2D(),
?assertMatch(true,
lists:usort(KeyHashList2B) == lists:usort(KeyHashList2C)),
@ -1567,8 +1549,7 @@ foldobjects_vs_foldheads_bybucket_testto() ->
"BucketB",
{"Key", "Key4zzzz"},
FoldHeadsFun,
true,
false}),
true, false, false}),
KeyHashList2E = HTFolder2E(),
{async, HTFolder2F} =
book_returnfolder(Bookie1,
@ -1577,8 +1558,7 @@ foldobjects_vs_foldheads_bybucket_testto() ->
"BucketB",
{"Key5", <<"all">>},
FoldHeadsFun,
true,
false}),
true, false, false}),
KeyHashList2F = HTFolder2F(),
?assertMatch(true, length(KeyHashList2E) > 0),

View file

@ -699,13 +699,11 @@ terminate(Reason, StateName, State) ->
ok;
{Handle, delete_pending, undefined} ->
ok = file:close(Handle),
ok = file:delete(State#state.filename),
leveled_log:log("CDB20", [State#state.filename]);
ok = file:delete(State#state.filename);
{Handle, delete_pending, WasteFP} ->
file:close(Handle),
Components = filename:split(State#state.filename),
NewName = WasteFP ++ lists:last(Components),
leveled_log:log("CDB19", [State#state.filename, NewName]),
file:rename(State#state.filename, NewName);
{Handle, _, _} ->
file:close(Handle)

View file

@ -67,7 +67,8 @@
riak_extract_metadata/2,
magic_hash/1,
segment_hash/1,
to_lookup/1]).
to_lookup/1,
riak_metadata_to_binary/2]).
-define(V1_VERS, 1).
-define(MAGIC, 53). % riak_kv -> riak_object
@ -92,9 +93,13 @@ segment_hash(Key) when is_binary(Key) ->
<<SegmentID:16/integer, ExtraHash:32/integer, _Rest/binary>> =
crypto:hash(md5, Key),
{SegmentID, ExtraHash};
segment_hash({?RIAK_TAG, Bucket, Key, null})
when is_binary(Bucket), is_binary(Key) ->
segment_hash(<<Bucket/binary, Key/binary>>);
segment_hash(Key) ->
segment_hash(term_to_binary(Key)).
-spec magic_hash(any()) -> integer().
%% @doc
%% Use DJ Bernstein magic hash function. Note, this is more expensive than
@ -232,8 +237,6 @@ from_inkerkv(Object, ToIgnoreKeyChanges) ->
case Object of
{{SQN, ?INKT_STND, PK}, Bin} when is_binary(Bin) ->
{{SQN, PK}, revert_value_from_journal(Bin, ToIgnoreKeyChanges)};
{{SQN, ?INKT_STND, PK}, Term} ->
{{SQN, PK}, Term};
_ ->
Object
end.
@ -382,13 +385,8 @@ get_tagstrategy(LK, Strategy) ->
skip
end.
split_inkvalue(VBin) ->
case is_binary(VBin) of
true ->
revert_value_from_journal(VBin);
false ->
VBin
end.
split_inkvalue(VBin) when is_binary(VBin) ->
revert_value_from_journal(VBin).
check_forinkertype(_LedgerKey, delete) ->
?INKT_TOMB;
@ -498,9 +496,9 @@ aae_indexspecs(AAE, Bucket, Key, SQN, H, LastMods) ->
Acc;
{LMD1, TTL} ->
TreeSize = AAE#recent_aae.tree_size,
SegID32 = leveled_tictac:keyto_segment32(Key),
SegID =
leveled_tictac:get_segment(erlang:phash2(Key),
TreeSize),
leveled_tictac:get_segment(SegID32, TreeSize),
IdxFldStr = ?NRT_IDX ++ LMD1 ++ "_bin",
IdxTrmStr =
string:right(integer_to_list(SegID), 8, $0) ++

View file

@ -393,16 +393,13 @@ handle_call({register_snapshot, Requestor}, _From , State) ->
State#state.active_journaldb},
State#state{registered_snapshots=Rs}};
handle_call({confirm_delete, ManSQN}, _From, State) ->
Reply = lists:foldl(fun({_R, SnapSQN}, Bool) ->
case SnapSQN >= ManSQN of
true ->
Bool;
false ->
false
end end,
true,
State#state.registered_snapshots),
{reply, Reply, State};
CheckSQNFun =
fun({_R, SnapSQN}, Bool) ->
(SnapSQN >= ManSQN) and Bool
end,
{reply,
lists:foldl(CheckSQNFun, true, State#state.registered_snapshots),
State};
handle_call(get_manifest, _From, State) ->
{reply, leveled_imanifest:to_list(State#state.manifest), State};
handle_call({update_manifest,
@ -1065,16 +1062,10 @@ compact_journal_testto(WRP, ExpectedFiles) ->
5000),
timer:sleep(1000),
CompactedManifest2 = ink_getmanifest(Ink1),
R = lists:foldl(fun({_SQN, FN, _P, _LK}, Acc) ->
case string:str(FN, ?COMPACT_FP) of
N when N > 0 ->
true;
0 ->
Acc
end end,
false,
CompactedManifest2),
?assertMatch(false, R),
lists:foreach(fun({_SQN, FN, _P, _LK}) ->
?assertMatch(0, string:str(FN, ?COMPACT_FP))
end,
CompactedManifest2),
?assertMatch(2, length(CompactedManifest2)),
ink_close(Ink1),
% Need to wait for delete_pending files to timeout

View file

@ -176,6 +176,7 @@
pcl_fetch/2,
pcl_fetch/3,
pcl_fetchkeys/5,
pcl_fetchkeysbysegment/6,
pcl_fetchnextkey/5,
pcl_checksequencenumber/3,
pcl_workforclerk/1,
@ -348,7 +349,32 @@ pcl_fetch(Pid, Key, Hash) ->
%% Erlang term order.
pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) ->
gen_server:call(Pid,
{fetch_keys, StartKey, EndKey, AccFun, InitAcc, -1},
{fetch_keys,
StartKey, EndKey,
AccFun, InitAcc,
false, -1},
infinity).
-spec pcl_fetchkeysbysegment(pid(), tuple(), tuple(), fun(), any(),
false|list(integer())) -> any().
%% @doc
%% Run a range query between StartKey and EndKey (inclusive). This will cover
%% all keys in the range - so must only be run against snapshots of the
%% penciller to avoid blocking behaviour.
%%
%% This version allows an additional input of a SegmentList. This is a list
%% of 16-bit integers representing the segment IDs band ((2 ^ 16) -1) that
%% are interesting to the fetch
%%
%% Note that segment must be false unless the object Tag supports additional
%% indexing by segment. This cannot be used on ?IDX_TAG and other tags that
%% use the no_lookup hash
pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, SegmentList) ->
gen_server:call(Pid,
{fetch_keys,
StartKey, EndKey,
AccFun, InitAcc,
SegmentList, -1},
infinity).
-spec pcl_fetchnextkey(pid(), tuple(), tuple(), fun(), any()) -> any().
@ -358,7 +384,10 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) ->
%% found in erlang term order.
pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
gen_server:call(Pid,
{fetch_keys, StartKey, EndKey, AccFun, InitAcc, 1},
{fetch_keys,
StartKey, EndKey,
AccFun, InitAcc,
false, 1},
infinity).
-spec pcl_checksequencenumber(pid(), tuple(), integer()) -> boolean().
@ -542,7 +571,10 @@ handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
State#state.levelzero_index),
SQN),
State};
handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
handle_call({fetch_keys,
StartKey, EndKey,
AccFun, InitAcc,
SegmentList, MaxKeys},
_From,
State=#state{snapshot_fully_loaded=Ready})
when Ready == true ->
@ -577,7 +609,7 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
Acc = keyfolder({L0AsList, SSTiter},
{StartKey, EndKey},
{AccFun, InitAcc},
MaxKeys),
{SegmentList, MaxKeys}),
{reply, Acc, State#state{levelzero_astree = L0AsList}};
handle_call(get_startup_sqn, _From, State) ->
@ -746,20 +778,14 @@ handle_cast(work_for_clerk, State) ->
case WC of
0 ->
{noreply, State#state{work_backlog=false}};
N when N > ?WORKQUEUE_BACKLOG_TOLERANCE ->
leveled_log:log("P0024", [N, true]),
[TL|_Tail] = WL,
ok = leveled_pclerk:clerk_push(State#state.clerk,
{TL, State#state.manifest}),
{noreply,
State#state{work_backlog=true, work_ongoing=true}};
N ->
leveled_log:log("P0024", [N, false]),
Backlog = N > ?WORKQUEUE_BACKLOG_TOLERANCE,
leveled_log:log("P0024", [N, Backlog]),
[TL|_Tail] = WL,
ok = leveled_pclerk:clerk_push(State#state.clerk,
{TL, State#state.manifest}),
{noreply,
State#state{work_backlog=false, work_ongoing=true}}
State#state{work_backlog=Backlog, work_ongoing=true}}
end;
_ ->
{noreply, State}
@ -960,7 +986,8 @@ update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN},
ledger_sqn=UpdMaxSQN},
CacheTooBig = NewL0Size > State#state.levelzero_maxcachesize,
CacheMuchTooBig = NewL0Size > ?SUPER_MAX_TABLE_SIZE,
L0Free = not leveled_pmanifest:levelzero_present(State#state.manifest),
L0Free =
not leveled_pmanifest:levelzero_present(State#state.manifest),
RandomFactor =
case State#state.levelzero_cointoss of
true ->
@ -1106,43 +1133,175 @@ compare_to_sqn(Obj, SQN) ->
end.
%%%============================================================================
%%% Iterator functions
%%%
%%% TODO - move to dedicated module with extended unit testing
%%%============================================================================
-spec keyfolder(list(), list(), tuple(), tuple(), {fun(), any()}) -> any().
%% @doc
%% The keyfolder will compare an iterator across the immutable in-memory cache
%% of the Penciller (the IMMiter), with an iterator across the persisted part
%% (the SSTiter).
%%
%% A Segment List and a MaxKeys may be passed. Every time something is added
%% to the accumulator MaxKeys is reduced - so set MaxKeys to -1 if it is
%% intended to be infinite.
%%
%% The basic principle is to take the next key in the IMMiter and compare it
%% to the next key in the SSTiter, and decide which one should be added to the
%% accumulator. The iterators are advanced if they either win (i.e. are the
%% next key), or are dominated. This goes on until the iterators are empty.
%%
%% To advance the SSTiter the find_nextkey/4 function is used, as the SSTiter
%% is an iterator across multiple levels - and so needs to do its own
%% comparisons to pop the next result.
keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) ->
keyfolder({IMMiter, SSTiter},
{StartKey, EndKey},
{AccFun, Acc},
{false, -1}).
keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, {_SegmentList, MaxKeys})
when MaxKeys == 0 ->
Acc;
keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, {SegmentList, MaxKeys}) ->
{StartKey, EndKey} = KeyRange,
case find_nextkey(SSTiter, StartKey, EndKey, SegmentList) of
no_more_keys ->
Acc;
{NxSSTiter, {SSTKey, SSTVal}} ->
Acc1 = AccFun(SSTKey, SSTVal, Acc),
keyfolder({[], NxSSTiter},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1})
end;
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
KeyRange,
{AccFun, Acc},
{SegmentList, MaxKeys}) ->
{StartKey, EndKey} = KeyRange,
case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of
{true, _} ->
% Normally everything is pre-filtered, but the IMM iterator can
% be re-used and so may be behind the StartKey if the StartKey has
% advanced from the previous use
keyfolder({NxIMMiterator, SSTiterator},
KeyRange,
{AccFun, Acc},
{SegmentList, MaxKeys});
{false, true} ->
% There are no more keys in-range in the in-memory
% iterator, so take action as if this iterator is empty
% (see above)
keyfolder({[], SSTiterator},
KeyRange,
{AccFun, Acc},
{SegmentList, MaxKeys});
{false, false} ->
case find_nextkey(SSTiterator, StartKey, EndKey, SegmentList) of
no_more_keys ->
% No more keys in range in the persisted store, so use the
% in-memory KV as the next
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator,
[]},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1});
{NxSSTiterator, {SSTKey, SSTVal}} ->
% There is a next key, so need to know which is the
% next key between the two (and handle two keys
% with different sequence numbers).
case leveled_codec:key_dominates({IMMKey,
IMMVal},
{SSTKey,
SSTVal}) of
left_hand_first ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
% Stow the previous best result away at Level -1
% so that there is no need to iterate to it again
NewEntry = {-1, [{SSTKey, SSTVal}]},
keyfolder({NxIMMiterator,
lists:keystore(-1,
1,
NxSSTiterator,
NewEntry)},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1});
right_hand_first ->
Acc1 = AccFun(SSTKey, SSTVal, Acc),
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator],
NxSSTiterator},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1});
left_hand_dominant ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
% We can add to the accumulator here. As the SST
% key was the most dominant across all SST levels,
% so there is no need to hold off until the IMMKey
% is left hand first.
keyfolder({NxIMMiterator,
NxSSTiterator},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1})
end
end
end.
%% Looks to find the best choice for the next key across the levels (other
%% than in-memory table)
%% In finding the best choice, the next key in a given level may be a next
%% block or next file pointer which will need to be expanded
find_nextkey(QueryArray, StartKey, EndKey) ->
find_nextkey(QueryArray,
0,
{null, null},
StartKey,
EndKey,
?ITERATOR_SCANWIDTH).
find_nextkey(QueryArray, StartKey, EndKey, false).
find_nextkey(_QueryArray, LCnt, {null, null}, _StartKey, _EndKey, _Width)
when LCnt > ?MAX_LEVELS ->
find_nextkey(QueryArray, StartKey, EndKey, SegmentList) ->
find_nextkey(QueryArray,
-1,
{null, null},
StartKey, EndKey,
SegmentList, ?ITERATOR_SCANWIDTH).
find_nextkey(_QueryArray, LCnt,
{null, null},
_StartKey, _EndKey,
_SegList, _Width) when LCnt > ?MAX_LEVELS ->
% The array has been scanned without finding a best key - must be
% exhausted - respond to indicate no more keys to be found by the
% iterator
no_more_keys;
find_nextkey(QueryArray, LCnt, {BKL, BestKV}, _StartKey, _EndKey, _Width)
when LCnt > ?MAX_LEVELS ->
find_nextkey(QueryArray, LCnt,
{BKL, BestKV},
_StartKey, _EndKey,
_SegList, _Width) when LCnt > ?MAX_LEVELS ->
% All levels have been scanned, so need to remove the best result from
% the array, and return that array along with the best key/sqn/status
% combination
{BKL, [BestKV|Tail]} = lists:keyfind(BKL, 1, QueryArray),
{lists:keyreplace(BKL, 1, QueryArray, {BKL, Tail}), BestKV};
find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV},
StartKey, EndKey, Width) ->
find_nextkey(QueryArray, LCnt,
{BestKeyLevel, BestKV},
StartKey, EndKey,
SegList, Width) ->
% Get the next key at this level
{NextKey, RestOfKeys} = case lists:keyfind(LCnt, 1, QueryArray) of
false ->
{null, null};
{LCnt, []} ->
{null, null};
{LCnt, [NK|ROfKs]} ->
{NK, ROfKs}
end,
{NextKey, RestOfKeys} =
case lists:keyfind(LCnt, 1, QueryArray) of
false ->
{null, null};
{LCnt, []} ->
{null, null};
{LCnt, [NK|ROfKs]} ->
{NK, ROfKs}
end,
% Compare the next key at this level with the best key
case {NextKey, BestKeyLevel, BestKV} of
{null, BKL, BKV} ->
@ -1150,42 +1309,48 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV},
find_nextkey(QueryArray,
LCnt + 1,
{BKL, BKV},
StartKey, EndKey, Width);
StartKey, EndKey,
SegList, Width);
{{next, Owner, _SK}, BKL, BKV} ->
% The first key at this level is pointer to a file - need to query
% the file to expand this level out before proceeding
Pointer = {next, Owner, StartKey, EndKey},
UpdList = leveled_sst:expand_list_by_pointer(Pointer,
RestOfKeys,
Width),
Width,
SegList),
NewEntry = {LCnt, UpdList},
% Need to loop around at this level (LCnt) as we have not yet
% examined a real key at this level
find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry),
LCnt,
{BKL, BKV},
StartKey, EndKey, Width);
StartKey, EndKey,
SegList, Width);
{{pointer, SSTPid, Slot, PSK, PEK}, BKL, BKV} ->
% The first key at this level is pointer within a file - need to
% query the file to expand this level out before proceeding
Pointer = {pointer, SSTPid, Slot, PSK, PEK},
UpdList = leveled_sst:expand_list_by_pointer(Pointer,
RestOfKeys,
Width),
Width,
SegList),
NewEntry = {LCnt, UpdList},
% Need to loop around at this level (LCnt) as we have not yet
% examined a real key at this level
find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry),
LCnt,
{BKL, BKV},
StartKey, EndKey, Width);
StartKey, EndKey,
SegList, Width);
{{Key, Val}, null, null} ->
% No best key set - so can assume that this key is the best key,
% and check the lower levels
find_nextkey(QueryArray,
LCnt + 1,
{LCnt, {Key, Val}},
StartKey, EndKey, Width);
StartKey, EndKey,
SegList, Width);
{{Key, Val}, _BKL, {BestKey, _BestVal}} when Key < BestKey ->
% There is a real key and a best key to compare, and the real key
% at this level is before the best key, and so is now the new best
@ -1194,18 +1359,23 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV},
find_nextkey(QueryArray,
LCnt + 1,
{LCnt, {Key, Val}},
StartKey, EndKey, Width);
StartKey, EndKey,
SegList, Width);
{{Key, Val}, BKL, {BestKey, BestVal}} when Key == BestKey ->
SQN = leveled_codec:strip_to_seqonly({Key, Val}),
BestSQN = leveled_codec:strip_to_seqonly({BestKey, BestVal}),
if
SQN =< BestSQN ->
% This is a dominated key, so we need to skip over it
NewEntry = {LCnt, RestOfKeys},
find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry),
NewQArray = lists:keyreplace(LCnt,
1,
QueryArray,
{LCnt, RestOfKeys}),
find_nextkey(NewQArray,
LCnt + 1,
{BKL, {BestKey, BestVal}},
StartKey, EndKey, Width);
StartKey, EndKey,
SegList, Width);
SQN > BestSQN ->
% There is a real key at the front of this level and it has
% a higher SQN than the best key, so we should use this as
@ -1220,92 +1390,19 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV},
{BKL, BestTail}),
LCnt + 1,
{LCnt, {Key, Val}},
StartKey, EndKey, Width)
StartKey, EndKey,
SegList, Width)
end;
{_, BKL, BKV} ->
% This is not the best key
find_nextkey(QueryArray,
LCnt + 1,
{BKL, BKV},
StartKey, EndKey, Width)
StartKey, EndKey,
SegList, Width)
end.
keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) ->
keyfolder({IMMiter, SSTiter}, {StartKey, EndKey}, {AccFun, Acc}, -1).
keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, MaxKeys) when MaxKeys == 0 ->
Acc;
keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, MaxKeys) ->
{StartKey, EndKey} = KeyRange,
case find_nextkey(SSTiter, StartKey, EndKey) of
no_more_keys ->
Acc;
{NxSSTiter, {SSTKey, SSTVal}} ->
Acc1 = AccFun(SSTKey, SSTVal, Acc),
keyfolder({[], NxSSTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1)
end;
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange,
{AccFun, Acc}, MaxKeys) ->
{StartKey, EndKey} = KeyRange,
case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of
{true, _} ->
% Normally everything is pre-filtered, but the IMM iterator can
% be re-used and so may be behind the StartKey if the StartKey has
% advanced from the previous use
keyfolder({NxIMMiterator, SSTiterator},
KeyRange,
{AccFun, Acc},
MaxKeys);
{false, true} ->
% There are no more keys in-range in the in-memory
% iterator, so take action as if this iterator is empty
% (see above)
keyfolder({[], SSTiterator},
KeyRange,
{AccFun, Acc},
MaxKeys);
{false, false} ->
case find_nextkey(SSTiterator, StartKey, EndKey) of
no_more_keys ->
% No more keys in range in the persisted store, so use the
% in-memory KV as the next
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator, SSTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1);
{NxSSTiterator, {SSTKey, SSTVal}} ->
% There is a next key, so need to know which is the
% next key between the two (and handle two keys
% with different sequence numbers).
case leveled_codec:key_dominates({IMMKey,
IMMVal},
{SSTKey,
SSTVal}) of
left_hand_first ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator, SSTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1);
right_hand_first ->
Acc1 = AccFun(SSTKey, SSTVal, Acc),
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator],
NxSSTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1);
left_hand_dominant ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator, NxSSTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1)
end
end
end.
%%%============================================================================
@ -1445,7 +1542,8 @@ simple_server_test() ->
?assertMatch(Key3, pcl_fetch(PCL, {o,"Bucket0003", "Key0003", null})),
timer:sleep(200),
% This sleep should make sure that the merge to L1 has occurred
% This will free up the L0 slot for the remainder to be written in shutdown
% This will free up the L0 slot for the remainder to be written in
% shutdown
ok = pcl_close(PCL),
{ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath,
@ -1502,8 +1600,8 @@ simple_server_test() ->
null},
3004)),
% Add some more keys and confirm that check sequence number still
% sees the old version in the previous snapshot, but will see the new version
% in a new snapshot
% sees the old version in the previous snapshot, but will see the new
% version in a new snapshot
Key1A_Pre = {{o,"Bucket0001", "Key0001", null},
{4005, {active, infinity}, null}},
@ -1665,17 +1763,20 @@ foldwithimm_simple_test() ->
{8, {active, infinity}, 0, null}}],
AccA = keyfolder(IMMiterA,
QueryArray,
{o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null},
{o, "Bucket1", "Key1", null},
{o, "Bucket1", "Key6", null},
{AccFun, []}),
?assertMatch([{{o, "Bucket1", "Key1", null}, 8},
{{o, "Bucket1", "Key3", null}, 3},
{{o, "Bucket1", "Key5", null}, 2}], AccA),
KL1B = [{{o, "Bucket1", "Key4", null}, {10, {active, infinity}, 0, null}}|KL1A],
AddKV = {{o, "Bucket1", "Key4", null}, {10, {active, infinity}, 0, null}},
KL1B = [AddKV|KL1A],
IMM3 = leveled_tree:from_orderedlist(lists:ukeysort(1, KL1B), ?CACHE_TYPE),
IMMiterB = leveled_tree:match_range({o, "Bucket1", "Key1", null},
{o, null, null, null},
IMM3),
io:format("Compare IMM3 with QueryArrary~n"),
AccB = keyfolder(IMMiterB,
QueryArray,
{o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null},

View file

@ -28,9 +28,9 @@
bucketkey_query/4,
hashlist_query/3,
tictactree/5,
foldheads_allkeys/4,
foldheads_allkeys/5,
foldobjects_allkeys/3,
foldheads_bybucket/4,
foldheads_bybucket/5,
foldobjects_bybucket/3,
foldobjects_byindex/3
]).
@ -167,6 +167,15 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) ->
{ok, LedgerSnap, JournalSnap} = SnapFun(),
% The start key and end key will vary depending on whether the
% fold is to fold over an index or a key range
EnsureKeyBinaryFun =
fun(K, T) ->
case is_binary(K) of
true ->
{K, T};
false ->
{term_to_binary(K), T}
end
end,
{StartKey, EndKey, ExtractFun} =
case Tag of
?IDX_TAG ->
@ -174,12 +183,15 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) ->
KeyDefFun = fun leveled_codec:to_ledgerkey/5,
{KeyDefFun(Bucket, null, ?IDX_TAG, IdxFld, StartIdx),
KeyDefFun(Bucket, null, ?IDX_TAG, IdxFld, EndIdx),
fun(K, T) -> {K, T} end};
EnsureKeyBinaryFun};
_ ->
{StartOKey, EndOKey} = Query,
{leveled_codec:to_ledgerkey(Bucket, StartOKey, Tag),
leveled_codec:to_ledgerkey(Bucket, EndOKey, Tag),
fun(K, H) -> {K, {is_hash, H}} end}
fun(K, H) ->
V = {is_hash, H},
EnsureKeyBinaryFun(K, V)
end}
end,
AccFun =
accumulate_tree(Filter, JournalCheck, JournalSnap, ExtractFun),
@ -201,14 +213,18 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) ->
end,
{async, Runner}.
-spec foldheads_allkeys(fun(), atom(), fun(), boolean()) -> {async, fun()}.
-spec foldheads_allkeys(fun(), atom(), fun(), boolean(), false|list(integer()))
-> {async, fun()}.
%% @doc
%% Fold over all heads in the store for a given tag - applying the passed
%% function to each proxy object
foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck) ->
foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) ->
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, {true, JournalCheck}).
foldobjects(SnapFun,
Tag, StartKey, EndKey,
FoldFun,
{true, JournalCheck}, SegmentList).
-spec foldobjects_allkeys(fun(), atom(), fun()) -> {async, fun()}.
%% @doc
@ -216,21 +232,36 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck) ->
foldobjects_allkeys(SnapFun, Tag, FoldFun) ->
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false).
foldobjects(SnapFun,
Tag, StartKey, EndKey,
FoldFun,
false, false).
-spec foldobjects_bybucket(fun(), {atom(), any(), any()}, fun()) ->
{async, fun()}.
%% @doc
%% Fold over all objects within a given key range in a bucket
foldobjects_bybucket(SnapFun, {Tag, StartKey, EndKey}, FoldFun) ->
foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false).
foldobjects(SnapFun,
Tag, StartKey, EndKey,
FoldFun,
false, false).
-spec foldheads_bybucket(fun(), {atom(), any(), any()}, fun(), boolean()) ->
{async, fun()}.
-spec foldheads_bybucket(fun(),
{atom(), any(), any()},
fun(),
boolean(), false|list(integer()))
-> {async, fun()}.
%% @doc
%% Fold over all object metadata within a given key range in a bucket
foldheads_bybucket(SnapFun, {Tag, StartKey, EndKey}, FoldFun, JournalCheck) ->
foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, {true, JournalCheck}).
foldheads_bybucket(SnapFun,
{Tag, StartKey, EndKey},
FoldFun,
JournalCheck, SegmentList) ->
foldobjects(SnapFun,
Tag, StartKey, EndKey,
FoldFun,
{true, JournalCheck}, SegmentList).
-spec foldobjects_byindex(fun(), tuple(), fun()) -> {async, fun()}.
%% @doc
@ -241,7 +272,10 @@ foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) ->
leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, FromTerm),
EndKey =
leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm),
foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false).
foldobjects(SnapFun,
Tag, StartKey, EndKey,
FoldFun,
false, false).
@ -290,8 +324,8 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
-spec foldobjects(fun(), atom(), tuple(), tuple(), fun(),
false|{true, boolean()}) ->
{async, fun()}.
false|{true, boolean()}, false|list(integer())) ->
{async, fun()}.
%% @doc
%% The object folder should be passed DeferredFetch.
%% DeferredFetch can either be false (which will return to the fold function
@ -299,7 +333,10 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
%% will be created that if understood by the fold function will allow the fold
%% function to work on the head of the object, and defer fetching the body in
%% case such a fetch is unnecessary.
foldobjects(SnapFun, Tag, StartKey, EndKey, FoldObjectsFun, DeferredFetch) ->
foldobjects(SnapFun,
Tag, StartKey, EndKey,
FoldObjectsFun,
DeferredFetch, SegmentList) ->
{FoldFun, InitAcc} =
case is_tuple(FoldObjectsFun) of
true ->
@ -319,11 +356,12 @@ foldobjects(SnapFun, Tag, StartKey, EndKey, FoldObjectsFun, DeferredFetch) ->
JournalSnapshot,
Tag,
DeferredFetch),
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
StartKey,
EndKey,
AccFun,
InitAcc),
Acc = leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot,
StartKey,
EndKey,
AccFun,
InitAcc,
SegmentList),
ok = leveled_penciller:pcl_close(LedgerSnapshot),
case DeferredFetch of
{true, false} ->
@ -363,7 +401,7 @@ accumulate_tree(FilterFun, JournalCheck, InkerClone, HashFun) ->
fun(B, K, H, Tree) ->
case FilterFun(B, K) of
accumulate ->
leveled_tictac:add_kv(Tree, K, H, HashFun, false);
leveled_tictac:add_kv(Tree, K, H, HashFun);
pass ->
Tree
end

View file

@ -98,7 +98,9 @@
sst_get/2,
sst_get/3,
sst_getkvrange/4,
sst_getfilteredrange/5,
sst_getslots/2,
sst_getfilteredslots/3,
sst_getmaxsequencenumber/1,
sst_setfordelete/2,
sst_clear/1,
@ -106,7 +108,7 @@
sst_deleteconfirmed/1,
sst_close/1]).
-export([expand_list_by_pointer/3]).
-export([expand_list_by_pointer/4]).
-record(slot_index_value, {slot_id :: integer(),
@ -116,10 +118,12 @@
-record(summary, {first_key :: tuple(),
last_key :: tuple(),
index :: tuple() | undefined,
index :: tuple() | undefined,
size :: integer(),
max_sqn :: integer()}).
-type press_methods() :: lz4|native.
%% yield_blockquery is used to determine if the work necessary to process a
%% range query beyond the fetching of the slot should be managed from within
%% this process, or should be handled by the calling process.
@ -136,8 +140,9 @@
filename,
yield_blockquery = false :: boolean(),
blockindex_cache,
compression_method :: lz4|native}).
compression_method :: press_methods()}).
-type sst_state() :: #state{}.
%%%============================================================================
%%% API
@ -161,7 +166,8 @@ sst_open(RootPath, Filename) ->
{ok, Pid, {SK, EK}}
end.
-spec sst_new(string(), string(), integer(), list(), integer(), lz4|native) ->
-spec sst_new(string(), string(), integer(),
list(), integer(), press_methods()) ->
{ok, pid(), {tuple(), tuple()}}.
%% @doc
%% Start a new SST file at the assigned level passing in a list of Key, Value
@ -184,7 +190,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod) ->
end.
-spec sst_new(string(), string(), list(), list(),
boolean(), integer(), integer(), lz4|native) ->
boolean(), integer(), integer(), press_methods()) ->
empty|{ok, pid(), {{list(), list()}, tuple(), tuple()}}.
%% @doc
%% Start a new SST file at the assigned level passing in a two lists of
@ -223,7 +229,7 @@ sst_new(RootPath, Filename,
-spec sst_newlevelzero(string(), string(),
integer(), fun(), pid()|undefined, integer(),
lz4|native) ->
press_methods()) ->
{ok, pid(), noreply}.
%% @doc
%% Start a new file at level zero. At this level the file size is not fixed -
@ -259,25 +265,37 @@ sst_get(Pid, LedgerKey) ->
sst_get(Pid, LedgerKey, Hash) ->
gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity).
-spec sst_getkvrange(pid(), tuple()|all, tuple()|all, integer()) -> list().
%% @doc
%% Get a range of {Key, Value} pairs as a list between StartKey and EndKey
%% (inclusive). The ScanWidth is the maximum size of the range, a pointer
%% will be placed on the tail of the resulting list if results expand beyond
%% the Scan Width
sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, false).
-spec sst_getfilteredrange(pid(), tuple()|all, tuple()|all, integer(),
list()|false) -> list().
%% @doc
%% Get a range of {Key, Value} pairs as a list between StartKey and EndKey
%% (inclusive). The ScanWidth is the maximum size of the range, a pointer
%% will be placed on the tail of the resulting list if results expand beyond
%% the Scan Width
%%
%% To make the range open-ended (either at start, end or both) the all atom
%% can be used in place of the Key tuple.
sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, SegList) ->
SegList0 = tune_seglist(SegList),
case gen_fsm:sync_send_event(Pid,
{get_kvrange, StartKey, EndKey, ScanWidth},
{get_kvrange,
StartKey, EndKey,
ScanWidth, SegList0},
infinity) of
{yield, SlotsToFetchBinList, SlotsToPoint, PressMethod} ->
FetchFun =
fun({SlotBin, SK, EK}, Acc) ->
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK, PressMethod)
end,
lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint;
binaryslot_reader(SlotsToFetchBinList, PressMethod)
++ SlotsToPoint;
Reply ->
Reply
end.
@ -285,15 +303,24 @@ sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
-spec sst_getslots(pid(), list()) -> list().
%% @doc
%% Get a list of slots by their ID. The slot will be converted from the binary
%% to term form outside of the FSM loop
%% to term form outside of the FSM loop, this is to stop the copying of the
%% converted term to the calling process.
sst_getslots(Pid, SlotList) ->
{SlotBins, PressMethod}
= gen_fsm:sync_send_event(Pid, {get_slots, SlotList}, infinity),
FetchFun =
fun({SlotBin, SK, EK}, Acc) ->
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK, PressMethod)
end,
lists:foldl(FetchFun, [], SlotBins).
sst_getfilteredslots(Pid, SlotList, false).
-spec sst_getfilteredslots(pid(), list(), false|list()) -> list().
%% @doc
%% Get a list of slots by their ID. The slot will be converted from the binary
%% to term form outside of the FSM loop
%%
%% A list of 16-bit integer Segment IDs can be passed to filter the keys
%% returned (not precisely - with false results returned in addition). Use
%% false as a SegList to not filter
sst_getfilteredslots(Pid, SlotList, SegList) ->
SegL0 = tune_seglist(SegList),
{SlotBins, PressMethod} =
gen_fsm:sync_send_event(Pid, {get_slots, SlotList, SegL0}, infinity),
binaryslot_reader(SlotBins, PressMethod).
-spec sst_getmaxsequencenumber(pid()) -> integer().
%% @doc
@ -436,10 +463,11 @@ reader({get_kv, LedgerKey, Hash}, _From, State) ->
{Result, Stage, _SlotID, UpdState} = fetch(LedgerKey, Hash, State),
UpdTimings = leveled_log:sst_timing(State#state.sst_timings, SW, Stage),
{reply, Result, reader, UpdState#state{sst_timings = UpdTimings}};
reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
reader({get_kvrange, StartKey, EndKey, ScanWidth, SlotList}, _From, State) ->
{SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
EndKey,
ScanWidth,
SlotList,
State),
PressMethod = State#state.compression_method,
case State#state.yield_blockquery of
@ -452,17 +480,18 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
reader,
State};
false ->
FetchFun =
fun({SlotBin, SK, EK}, Acc) ->
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK, PressMethod)
end,
{reply,
lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint,
binaryslot_reader(SlotsToFetchBinList, PressMethod)
++ SlotsToPoint,
reader,
State}
end;
reader({get_slots, SlotList}, _From, State) ->
SlotBins = read_slots(State#state.handle, SlotList),
reader({get_slots, SlotList, SegList}, _From, State) ->
SlotBins =
read_slots(State#state.handle,
SlotList,
{SegList, State#state.blockindex_cache},
State#state.compression_method),
{reply, {SlotBins, State#state.compression_method}, reader, State};
reader(get_maxsequencenumber, _From, State) ->
Summary = State#state.summary,
@ -494,10 +523,12 @@ reader(close, _From, State) ->
delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
{Result, _Stage, _SlotID, UpdState} = fetch(LedgerKey, Hash, State),
{reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT};
delete_pending({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SlotList},
_From, State) ->
{SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
EndKey,
ScanWidth,
SlotList,
State),
% Always yield as about to clear and de-reference
PressMethod = State#state.compression_method,
@ -506,8 +537,12 @@ delete_pending({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
delete_pending,
State,
?DELETE_TIMEOUT};
delete_pending({get_slots, SlotList}, _From, State) ->
SlotBins = read_slots(State#state.handle, SlotList),
delete_pending({get_slots, SlotList, SegList}, _From, State) ->
SlotBins =
read_slots(State#state.handle,
SlotList,
{SegList, State#state.blockindex_cache},
State#state.compression_method),
{reply,
{SlotBins, State#state.compression_method},
delete_pending,
@ -590,20 +625,31 @@ fetch(LedgerKey, Hash, State) ->
[] ->
{not_present, slot_bloom, SlotID, State};
_ ->
StartPos = Slot#slot_index_value.start_position,
Result =
check_blocks(PosList,
State#state.handle,
Slot,
StartPos,
BlockLengths,
LedgerKey,
PressMethod),
LedgerKey,
PressMethod,
not_present),
{Result, slot_fetch, SlotID, State}
end
end
end.
fetch_range(StartKey, EndKey, ScanWidth, State) ->
-spec fetch_range(tuple(), tuple(), integer(), false|list(integer()), sst_state())
-> {list(), list()}.
%% @doc
%% Fetch the contents of the SST file for a given key range. This will
%% pre-fetch some results, and append pointers for additional results.
%%
%% A filter can be provided based on the Segment ID (usable for hashable
%% objects not no_lookup entries) to accelerate the query if the 5-arity
%% version is used
fetch_range(StartKey, EndKey, ScanWidth, SegList, State) ->
Summary = State#state.summary,
Handle = State#state.handle,
{Slots, RTrim} = lookup_slots(StartKey, EndKey, Summary#summary.index),
@ -654,7 +700,11 @@ fetch_range(StartKey, EndKey, ScanWidth, State) ->
lists:split(ScanWidth, ExpandedSlots)
end,
SlotsToFetchBinList = read_slots(Handle, SlotsToFetch),
SlotsToFetchBinList =
read_slots(Handle,
SlotsToFetch,
{SegList, State#state.blockindex_cache},
State#state.compression_method),
{SlotsToFetchBinList, SlotsToPoint}.
@ -799,7 +849,7 @@ generate_filenames(RootFilename) ->
end.
-spec serialise_block(any(), lz4|native) -> binary().
-spec serialise_block(any(), press_methods()) -> binary().
%% @doc
%% Convert term to binary
%% Function split out to make it easier to experiment with different
@ -812,7 +862,7 @@ serialise_block(Term, native) ->
term_to_binary(Term, ?BINARY_SETTINGS).
-spec deserialise_block(binary(), lz4|native) -> any().
-spec deserialise_block(binary(), press_methods()) -> any().
%% @doc
%% Convert binary to term
%% Function split out to make it easier to experiment with different
@ -1014,28 +1064,40 @@ generate_binary_slot(Lookup, KVL, PressMethod) ->
{<<Lengths/binary, PosBinIndex/binary>>, FullBin, HashL, LastKey}.
check_blocks([], _Handle, _Slot, _BlockLengths, _LedgerKey, _PressMethod) ->
not_present;
check_blocks([Pos|Rest], Handle, Slot, BlockLengths, LedgerKey, PressMethod) ->
% Acc should start as not_present if LedgerKey is a key, and a list if
% LedgerKey is false
check_blocks([], _Handle, _StartPos, _BlockLengths,
_LedgerKeyToCheck, _PressMethod, Acc) ->
Acc;
check_blocks([Pos|Rest], Handle, StartPos,
BlockLengths, LedgerKeyToCheck, PressMethod, Acc) ->
{BlockNumber, BlockPos} = revert_position(Pos),
BlockBin = read_block(Handle, Slot, BlockLengths, BlockNumber),
BlockBin =
read_block(Handle,
StartPos,
BlockLengths,
BlockNumber),
BlockL = deserialise_block(BlockBin, PressMethod),
{K, V} = lists:nth(BlockPos, BlockL),
case K of
LedgerKey ->
LedgerKeyToCheck ->
{K, V};
_ ->
check_blocks(Rest,
Handle, Slot, BlockLengths,
LedgerKey,
PressMethod)
case LedgerKeyToCheck of
false ->
Acc ++ [{K, V}];
_ ->
check_blocks(Rest, Handle, StartPos, BlockLengths,
LedgerKeyToCheck, PressMethod, Acc)
end
end.
read_block(Handle, Slot, BlockLengths, BlockID) ->
read_block(Handle, StartPos, BlockLengths, BlockID) ->
{BlockPos, Offset, Length} = block_offsetandlength(BlockLengths, BlockID),
{ok, BlockBin} = file:pread(Handle,
Slot#slot_index_value.start_position
StartPos
+ BlockPos
+ Offset
+ 28,
@ -1049,39 +1111,118 @@ read_slot(Handle, Slot) ->
Slot#slot_index_value.length),
SlotBin.
read_slots(Handle, SlotList) ->
PointerMapFun =
fun(Pointer) ->
{Slot, SK, EK} =
case Pointer of
{pointer, _Pid, Slot0, SK0, EK0} ->
{Slot0, SK0, EK0};
{pointer, Slot0, SK0, EK0} ->
{Slot0, SK0, EK0}
end,
{Slot#slot_index_value.start_position,
Slot#slot_index_value.length,
SK,
EK}
%% Convert a slot pointer (which may or may not carry the owning Pid) into
%% the {StartPosition, Length, SlotID, StartKey, EndKey} details required to
%% read the slot from disk.
pointer_mapfun({pointer, _Pid, Slot, SK, EK}) ->
    % The Pid is not needed for reading - strip it and re-dispatch
    pointer_mapfun({pointer, Slot, SK, EK});
pointer_mapfun({pointer, Slot, SK, EK}) ->
    {Slot#slot_index_value.start_position,
        Slot#slot_index_value.length,
        Slot#slot_index_value.slot_id,
        SK,
        EK}.
-spec binarysplit_mapfun(binary(), integer()) -> fun().
%% @doc
%% Return a closure over a multi-slot binary (and the file position at which
%% that binary started), which extracts an individual slot's binary given
%% that slot's position/length details, pairing it with its start and end
%% keys.
binarysplit_mapfun(MultiSlotBin, StartPos) ->
    fun({SlotPos, SlotLen, _SlotID, SK, EK}) ->
        % Positions are absolute file offsets, so the offset of the slot
        % within MultiSlotBin is relative to where the combined read began
        Skip = SlotPos - StartPos,
        <<_Skipped:Skip/binary,
            SlotBin:SlotLen/binary,
            _Remainder/binary>> = MultiSlotBin,
        {SlotBin, SK, EK}
    end.
-spec read_slots(file:io_device(), list(),
                    {false|list(), any()}, press_methods()) -> list().
%% @doc
%% The reading of slots will return a list of either 2-tuples containing
%% {K, V} pairs - or 3-tuples containing {Binary, SK, EK}.  The 3-tuples
%% can be exploded into lists of {K, V} pairs using the binaryslot_reader/2
%% function
%%
%% Reading slots is generally unfiltered, but in the special case when
%% querying across slots when only matching segment IDs are required the
%% BlockIndexCache can be used
%%
%% Note that false positives will be passed through.  It is important that
%% any key comparison between levels should allow for a non-matching key to
%% be considered as superior to a matching key - as otherwise a matching key
%% may be intermittently removed from the result set
read_slots(Handle, SlotList, {false, _BlockIndexCache}, _PressMethod) ->
    % No list of segments passed - read every slot in one continuous pread
    % and split the result into per-slot binaries
    LengthList = lists:map(fun pointer_mapfun/1, SlotList),
    {MultiSlotBin, StartPos} = read_length_list(Handle, LengthList),
    lists:map(binarysplit_mapfun(MultiSlotBin, StartPos), LengthList);
read_slots(Handle, SlotList, {SegList, BlockIndexCache}, PressMethod) ->
    % List of segments passed so only {K, V} pairs matching those segments
    % should be returned.  This requires the {K, V} pair to have been added
    % with the appropriate hash - if the pair were added with no_lookup as
    % the hash value this will fail unexpectedly.
    BinMapFun =
        fun(Pointer, Acc) ->
            {SP, _L, ID, _SK, _EK} = pointer_mapfun(Pointer),
            case array:get(ID - 1, BlockIndexCache) of
                none ->
                    % If there is an attempt to use the seg list query and the
                    % index block cache isn't cached for any part this may be
                    % slower as each slot will be read in turn
                    LengthDetails = pointer_mapfun(Pointer),
                    {MultiSlotBin, StartPos} =
                        read_length_list(Handle, [LengthDetails]),
                    MapFun = binarysplit_mapfun(MultiSlotBin, StartPos),
                    Acc ++ [MapFun(LengthDetails)];
                <<BlockLengths:24/binary, BlockIdx/binary>> ->
                    % If there is a BlockIndex cached then we can use it to
                    % check to see if any of the expected segments are
                    % present without lifting the slot off disk.  Also the
                    % fact that we know position can be used to filter out
                    % other keys
                    case find_pos(BlockIdx, SegList, [], 0) of
                        [] ->
                            % No matching segments in this slot - skip it
                            Acc;
                        PL ->
                            Acc ++ check_blocks(PL, Handle, SP, BlockLengths,
                                                false, PressMethod, [])
                    end
            end
        end,
    lists:foldl(BinMapFun, [], SlotList).
-spec binaryslot_reader(list({tuple(), tuple()}|{binary(), tuple(), tuple()}),
                            native|lz4) -> list({tuple(), tuple()}).
%% @doc
%% Read the binary slots converting them to {K, V} pairs if they were not
%% already {K, V} pairs.  The ordering of entries in the input list is
%% preserved in the output.
binaryslot_reader(SlotBinsToFetch, PressMethod) ->
    binaryslot_reader(SlotBinsToFetch, PressMethod, []).

%% Accumulate by prepending and reverse once at the end, rather than
%% appending with ++ on every step (which is O(n^2) in the number of
%% entries).
binaryslot_reader([], _PressMethod, Acc) ->
    lists:reverse(Acc);
binaryslot_reader([{SlotBin, SK, EK}|Tail], PressMethod, Acc) ->
    % A 3-tuple is an unexpanded slot binary - explode it into {K, V}
    % pairs.  lists:reverse/2 prepends the expansion in reverse, so the
    % final reverse restores the original key order.
    Expanded = binaryslot_trimmedlist(SlotBin, SK, EK, PressMethod),
    binaryslot_reader(Tail, PressMethod, lists:reverse(Expanded, Acc));
binaryslot_reader([{K, V}|Tail], PressMethod, Acc) ->
    % Already a {K, V} pair (e.g. pre-filtered via the block index cache)
    binaryslot_reader(Tail, PressMethod, [{K, V}|Acc]).
%% Read the continuous range of bytes spanning every slot in LengthList
%% (assumed ordered by file position) using a single pread, returning the
%% combined binary along with the file position at which it starts.
read_length_list(Handle, LengthList) ->
    StartPos = element(1, hd(LengthList)),
    LastEntry = lists:last(LengthList),
    EndPos = element(1, LastEntry) + element(2, LastEntry),
    {ok, MultiSlotBin} = file:pread(Handle, StartPos, EndPos - StartPos),
    {MultiSlotBin, StartPos}.
BinSplitMapFun =
fun({SP, L, SK, EK}) ->
Start = SP - StartPos,
<<_Pre:Start/binary,
SlotBin:L/binary,
_Post/binary>> = MultiSlotBin,
{SlotBin, SK, EK}
end,
lists:map(BinSplitMapFun, LengthList).
binaryslot_get(FullBin, Key, Hash, PressMethod) ->
@ -1275,10 +1416,21 @@ block_offsetandlength(BlockLengths, BlockID) ->
end.
extra_hash({SegHash, _ExtraHash}) when is_integer(SegHash) ->
SegHash band 32767;
tune_hash(SegHash);
extra_hash(NotHash) ->
NotHash.
%% Mask a segment hash down to the 15 bits used for segment comparison in
%% the slot index
tune_hash(SegHash) ->
    SegHash band 32767.

%% Apply tune_hash/1 to each segment in a segment list; a non-list (i.e.
%% the atom false, meaning no filtering) is passed through unchanged
tune_seglist(SegList) when is_list(SegList) ->
    [tune_hash(SegHash) || SegHash <- SegList];
tune_seglist(SegList) ->
    SegList.
fetch_value([], _BlockLengths, _Blocks, _Key, _PressMethod) ->
not_present;
fetch_value([Pos|Rest], BlockLengths, Blocks, Key, PressMethod) ->
@ -1315,6 +1467,14 @@ revert_position(Pos) ->
find_pos(<<>>, _Hash, PosList, _Count) ->
PosList;
find_pos(<<1:1/integer, PotentialHit:15/integer, T/binary>>,
HashList, PosList, Count) when is_list(HashList) ->
case lists:member(PotentialHit, HashList) of
true ->
find_pos(T, HashList, PosList ++ [Count], Count + 1);
false ->
find_pos(T, HashList, PosList, Count + 1)
end;
find_pos(<<1:1/integer, Hash:15/integer, T/binary>>, Hash, PosList, Count) ->
find_pos(T, Hash, PosList ++ [Count], Count + 1);
find_pos(<<1:1/integer, _Miss:15/integer, T/binary>>, Hash, PosList, Count) ->
@ -1528,7 +1688,11 @@ maybe_expand_pointer(List) ->
List.
expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, Tail, Width) ->
expand_list_by_pointer(Pointer, Tail, Width) ->
expand_list_by_pointer(Pointer, Tail, Width, false).
expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey},
Tail, Width, SegList) ->
FoldFun =
fun(X, {Pointers, Remainder}) ->
case length(Pointers) of
@ -1545,16 +1709,19 @@ expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, Tail, Width) -
end,
InitAcc = {[{pointer, Slot, StartKey, EndKey}], []},
{AccPointers, AccTail} = lists:foldl(FoldFun, InitAcc, Tail),
ExpPointers = leveled_sst:sst_getslots(SSTPid, AccPointers),
ExpPointers =
leveled_sst:sst_getfilteredslots(SSTPid, AccPointers, SegList),
lists:append(ExpPointers, AccTail);
expand_list_by_pointer({next, ManEntry, StartKey, EndKey}, Tail, Width) ->
expand_list_by_pointer({next, ManEntry, StartKey, EndKey},
Tail, Width, SegList) ->
SSTPid = ManEntry#manifest_entry.owner,
leveled_log:log("SST10", [SSTPid, is_process_alive(SSTPid)]),
ExpPointer = leveled_sst:sst_getkvrange(SSTPid, StartKey, EndKey, Width),
ExpPointer =
leveled_sst:sst_getfilteredrange(SSTPid,
StartKey, EndKey,
Width, SegList),
ExpPointer ++ Tail.
%%%============================================================================
%%% Test
%%%============================================================================
@ -1776,16 +1943,8 @@ indexed_list_mixedkeys_bitflip_test() ->
{_PosBinIndex1, FullBin, _HL, LK} =
generate_binary_slot(lookup, Keys, native),
?assertMatch(LK, element(1, lists:last(Keys))),
L = byte_size(FullBin),
Byte1 = leveled_rand:uniform(L),
<<PreB1:Byte1/binary, A:8/integer, PostByte1/binary>> = FullBin,
FullBin0 =
case A of
0 ->
<<PreB1:Byte1/binary, 255:8/integer, PostByte1/binary>>;
_ ->
<<PreB1:Byte1/binary, 0:8/integer, PostByte1/binary>>
end,
FullBin0 = flip_byte(FullBin),
{TestK1, _TestV1} = lists:nth(20, KVL1),
MH1 = leveled_codec:segment_hash(TestK1),
@ -1801,6 +1960,17 @@ indexed_list_mixedkeys_bitflip_test() ->
?assertMatch([], O1).
%% Test helper - return a copy of Binary with one randomly-selected byte
%% altered (0 becomes 255, any other value becomes 0), guaranteeing the
%% result differs from the input.
flip_byte(Binary) ->
    L = byte_size(Binary),
    % leveled_rand:uniform(L) is expected to return 1..L, but a valid byte
    % offset for the match below is 0..L-1 (Byte1 bytes are skipped before
    % the byte that is read).  Without the subtraction a draw of L causes a
    % badmatch, and the first byte could never be flipped.
    Byte1 = leveled_rand:uniform(L) - 1,
    <<PreB1:Byte1/binary, A:8/integer, PostByte1/binary>> = Binary,
    case A of
        0 ->
            <<PreB1:Byte1/binary, 255:8/integer, PostByte1/binary>>;
        _ ->
            <<PreB1:Byte1/binary, 0:8/integer, PostByte1/binary>>
    end.
test_binary_slot(FullBin, Key, Hash, ExpectedValue) ->
% SW = os:timestamp(),
@ -2152,6 +2322,54 @@ nonsense_coverage_test() ->
#state{},
nonsense)),
?assertMatch({reply, undefined, reader, #state{}},
handle_sync_event("hello", self(), reader, #state{})).
handle_sync_event("hello", self(), reader, #state{})),
SampleBin = <<0:128/integer>>,
FlippedBin = flip_byte(SampleBin),
?assertMatch(false, FlippedBin == SampleBin).
hashmatching_bytreesize_test() ->
B = <<"Bucket">>,
V = leveled_codec:riak_metadata_to_binary(term_to_binary([{"actor1", 1}]),
<<1:32/integer,
0:32/integer,
0:32/integer>>),
GenKeyFun =
fun(X) ->
LK =
{?RIAK_TAG,
B,
list_to_binary("Key" ++ integer_to_list(X)),
null},
LKV = leveled_codec:generate_ledgerkv(LK,
X,
V,
byte_size(V),
{active, infinity}),
{_Bucket, _Key, MetaValue, _Hashes, _LastMods} = LKV,
{LK, MetaValue}
end,
KVL = lists:map(GenKeyFun, lists:seq(1, 128)),
{PosBinIndex1, _FullBin, _HL, _LK} =
generate_binary_slot(lookup, KVL, native),
check_segment_match(PosBinIndex1, KVL, small),
check_segment_match(PosBinIndex1, KVL, medium).
check_segment_match(PosBinIndex1, KVL, TreeSize) ->
CheckFun =
fun({{_T, B, K, null}, _V}) ->
Seg =
leveled_tictac:get_segment(
leveled_tictac:keyto_segment32(<<B/binary, K/binary>>),
TreeSize),
SegList0 = tune_seglist([Seg]),
PosList = find_pos(PosBinIndex1, SegList0, [], 0),
?assertMatch(true, length(PosList) >= 1)
end,
lists:foreach(CheckFun, KVL).
-endif.

View file

@ -57,29 +57,37 @@
-export([
new_tree/1,
new_tree/2,
add_kv/5,
add_kv/4,
find_dirtyleaves/2,
find_dirtysegments/2,
fetch_root/1,
fetch_leaves/2,
merge_trees/2,
get_segment/2,
tictac_hash/3,
tictac_hash/2,
export_tree/1,
import_tree/1,
valid_size/1
valid_size/1,
keyto_segment32/1,
generate_segmentfilter_list/2
]).
-include_lib("eunit/include/eunit.hrl").
-define(HASH_SIZE, 4).
-define(XXSMALL, {6, 64, 64 * 64}).
-define(XSMALL, {7, 128, 128 * 128}).
%% UNSUPPORTED tree sizes for accelerated segment filtering
-define(XXSMALL, {6, 64, 64 * 64}).
-define(XSMALL, {7, 128, 128 * 128}).
%% SUPPORTED tree sizes for accelerated segment filtering
-define(SMALL, {8, 256, 256 * 256}).
-define(MEDIUM, {9, 512, 512 * 512}).
-define(LARGE, {10, 1024, 1024 * 1024}).
-define(XLARGE, {11, 2048, 2048 * 2048}).
-define(EMPTY, <<0:8/integer>>).
-define(VALID_SIZES, [xxsmall, xsmall, small, medium, large, xlarge]).
@ -169,24 +177,16 @@ import_tree(ExportedTree) ->
level1 = L1Bin,
level2 = Lv2}.
-spec add_kv(tictactree(), tuple(), tuple(), fun()) -> tictactree().
add_kv(TicTacTree, Key, Value, BinExtractFun) ->
add_kv(TicTacTree, Key, Value, BinExtractFun, false).
-spec add_kv(tictactree(), tuple(), tuple(), fun(), boolean()) -> tictactree().
-spec add_kv(tictactree(), tuple(), tuple(), fun()) -> tictactree().
%% @doc
%% Add a Key and value to a tictactree using the BinExtractFun to extract a
%% binary from the Key and value from which to generate the hash. The
%% BinExtractFun will also need to do any canonicalisation necessary to make
%% the hash consistent (such as whitespace removal, or sorting)
%%
%% For exportable trees the hash function will be based on the CJ Bernstein
%% magic hash. For non-exportable trees erlang:phash2 will be used, and so
%% non-binary Keys and Values can be returned from the BinExtractFun in this
%% case.
add_kv(TicTacTree, Key, Value, BinExtractFun, Exportable) ->
add_kv(TicTacTree, Key, Value, BinExtractFun) ->
{BinK, BinV} = BinExtractFun(Key, Value),
{SegHash, SegChangeHash} = tictac_hash(BinK, BinV, Exportable),
{SegHash, SegChangeHash} = tictac_hash(BinK, BinV),
Segment = get_segment(SegHash, TicTacTree#tictactree.segment_count),
Level2Pos =
@ -286,14 +286,7 @@ merge_trees(TreeA, TreeB) ->
fun(SQN, MergeL2) ->
L2A = get_level2(TreeA, SQN),
L2B = get_level2(TreeB, SQN),
BothEmpty = (L2A == ?EMPTY) and (L2B == ?EMPTY),
NewLevel2 =
case BothEmpty of
true ->
?EMPTY;
false ->
merge_binaries(L2A, L2B)
end,
NewLevel2 = merge_binaries(L2A, L2B),
array:set(SQN, NewLevel2, MergeL2)
end,
NewLevel2 = lists:foldl(MergeFun,
@ -314,28 +307,65 @@ get_segment(Hash, TreeSize) ->
get_segment(Hash, element(3, get_size(TreeSize))).
-spec tictac_hash(any(), any(), boolean()) -> {integer(), integer()}.
-spec tictac_hash(binary(), any()) -> {integer(), integer()}.
%% @doc
%% Hash the key and term, to either something repetable in Erlang, or using
%% the DJ Bernstein hash if it is the tree needs to be compared with one
%% calculated with a non-Erlang store
%%
%% Boolean is Exportable. does the hash need to be repetable by a non-Erlang
%% machine
tictac_hash(BinKey, BinVal, true)
when is_binary(BinKey) and is_binary(BinVal) ->
HashKey = leveled_codec:magic_hash({binary, BinKey}),
HashVal = leveled_codec:magic_hash({binary, BinVal}),
{HashKey, HashKey bxor HashVal};
tictac_hash(BinKey, {is_hash, HashedVal}, false) ->
{erlang:phash2(BinKey), erlang:phash2(BinKey) bxor HashedVal};
tictac_hash(BinKey, BinVal, false) ->
{erlang:phash2(BinKey), erlang:phash2(BinKey) bxor erlang:phash2(BinVal)}.
%% Hash the key and term.
%% The term can be of the form {is_hash, 32-bit integer} to indicate the hash
%% has already been taken. If the value is not a pre-extracted hash just use
%% erlang:phash2. If an exportable hash of the value is required this should
%% be managed through the add_kv ExtractFun providing a pre-prepared Hash.
tictac_hash(BinKey, {is_hash, PreHashedVal}) when is_binary(BinKey) ->
    % The value hash was pre-extracted by the caller - use it directly
    KeyHash = keyto_segment32(BinKey),
    {KeyHash, KeyHash bxor PreHashedVal};
tictac_hash(BinKey, Val) when is_binary(BinKey) ->
    KeyHash = keyto_segment32(BinKey),
    {KeyHash, KeyHash bxor erlang:phash2(Val)}.
-spec keyto_segment32(any()) -> integer().
%% @doc
%% The first 16 bits of the segment hash used in the tictac tree should be
%% made up of the segment ID part (which is used to accelerate queries).
%% Non-binary keys are converted with term_to_binary before hashing.
keyto_segment32(Key) when not is_binary(Key) ->
    keyto_segment32(term_to_binary(Key));
keyto_segment32(BinKey) ->
    {SegmentID, ExtraHash} = leveled_codec:segment_hash(BinKey),
    UpperBits = (ExtraHash band 65535) bsl 16,
    UpperBits + SegmentID.
-spec generate_segmentfilter_list(list(integer()), atom())
                                        -> false|list(integer()).
%% @doc
%% Cannot accelerate segment listing for trees below certain sizes, so check
%% the creation of segment filter lists with this function.  Returns false
%% when acceleration is not possible for the given tree size.
generate_segmentfilter_list(_SegmentList, xxsmall) ->
    % Tree too small to accelerate at all
    false;
generate_segmentfilter_list(SegmentList, xsmall) ->
    % An xsmall tree leaves the top two bits of the 16-bit segment ID
    % unused, so each segment must be expanded to the four possible values
    % of those spare bits.  As this quadruples the list, only do so for
    % short lists.
    case length(SegmentList) =< 4 of
        true ->
            BitA = 1 bsl 15,
            BitB = 1 bsl 14,
            [Expanded ||
                Seg <- SegmentList,
                Expanded <- [Seg, Seg + BitA, Seg + BitB, Seg + BitA + BitB]];
        false ->
            false
    end;
generate_segmentfilter_list(SegmentList, Size) ->
    % Deliberately crash (case_clause) on an unrecognised tree size rather
    % than return a potentially wrong filter list
    case lists:member(Size, ?VALID_SIZES) of
        true ->
            SegmentList
    end.
%%%============================================================================
%%% Internal functions
%%%============================================================================
get_level2(TicTacTree, L1Pos) ->
case array:get(L1Pos, TicTacTree#tictactree.level2) of
?EMPTY ->
@ -454,7 +484,7 @@ simple_test_withsize(Size) ->
GetSegFun =
fun(TK) ->
get_segment(erlang:phash2(term_to_binary(TK)), SC)
get_segment(keyto_segment32(term_to_binary(TK)), SC)
end,
DL0 = find_dirtyleaves(Tree1, Tree0),
@ -513,9 +543,15 @@ merge_test_withsize(Size) ->
?assertMatch(false, TreeM1#tictactree.level1 == TreeZ4#tictactree.level1).
exportable_test() ->
{Int1, Int2} = tictac_hash(<<"key">>, <<"value">>, true),
{Int1, Int2} = tictac_hash(<<"key">>, <<"value">>),
?assertMatch({true, true}, {Int1 >= 0, Int2 >=0}).
merge_emptytree_test() ->
TreeA = new_tree("A"),
TreeB = new_tree("B"),
TreeC = merge_trees(TreeA, TreeB),
?assertMatch([], find_dirtyleaves(TreeA, TreeC)).
-endif.

View file

@ -117,8 +117,7 @@ aae_missingjournal(_Config) ->
{foldheads_allkeys,
?RIAK_TAG,
FoldHeadsFun,
true,
true}),
true, true, false}),
HeadL1 = length(AllHeadF1()),
io:format("Fold head returned ~w objects~n", [HeadL1]),
@ -135,8 +134,7 @@ aae_missingjournal(_Config) ->
{foldheads_allkeys,
?RIAK_TAG,
FoldHeadsFun,
true,
true}),
true, true, false}),
HeadL2 = length(AllHeadF2()),
io:format("Fold head returned ~w objects~n", [HeadL2]),
true = HeadL2 < HeadL1,

View file

@ -0,0 +1,263 @@
-module(riak_SUITE).
-include_lib("common_test/include/ct.hrl").
-include("include/leveled.hrl").
-export([all/0]).
-export([
crossbucket_aae/1
]).
%% Single test case - cross-store/cross-bucket AAE comparison
all() -> [crossbucket_aae].
-define(MAGIC, 53). % riak_kv -> riak_object
%% Compare two stores loaded with near-identical data (differing by one
%% object present only in the first store) using AAE folds.
crossbucket_aae(_Config) ->
    % Test requires multiple different databases, so want to mount them all
    % on individual file paths
    RootPathA = testutil:reset_filestructure("testA"),
    RootPathB = testutil:reset_filestructure("testB"),

    % Start the first database, load a test object, close it, start it again
    StartOpts1 = [{root_path, RootPathA},
                    {max_pencillercachesize, 16000},
                    {sync_strategy, riak_sync}],
    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
    {B1, K1, V1, S1, MD} = {<<"Bucket">>,
                                <<"Key1.1.4567.4321">>,
                                <<"Value1">>,
                                [],
                                [{<<"MDK1">>, <<"MDV1">>}]},
    {TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD),
    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
    testutil:check_forobject(Bookie1, TestObject),
    ok = leveled_bookie:book_close(Bookie1),
    StartOpts2 = [{root_path, RootPathA},
                    {max_journalsize, 500000000},
                    {max_pencillercachesize, 32000},
                    {sync_strategy, testutil:sync_strategy()}],
    {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
    testutil:check_forobject(Bookie2, TestObject),

    % Generate a large volume of objects (4 batches of 40K) to be used
    % within the test, and load them into the first store (outputting the
    % generated objects as a list of lists) to be used elsewhere
    GenList =
        [{binary, 2}, {binary, 40002}, {binary, 80002}, {binary, 120002}],
    CLs = testutil:load_objects(40000,
                                GenList,
                                Bookie2,
                                TestObject,
                                fun testutil:generate_smallobjects/2,
                                40000),

    % Start a new store, and load the same objects (except for the original
    % test object) into this store
    StartOpts3 = [{root_path, RootPathB},
                    {max_journalsize, 200000000},
                    {max_pencillercachesize, 16000},
                    {sync_strategy, testutil:sync_strategy()}],
    {ok, Bookie3} = leveled_bookie:book_start(StartOpts3),
    lists:foreach(fun(ObjL) -> testutil:riakload(Bookie3, ObjL) end, CLs),
    test_singledelta_stores(Bookie2, Bookie3, small, {B1, K1}),
    test_singledelta_stores(Bookie2, Bookie3, medium, {B1, K1}),
    test_singledelta_stores(Bookie2, Bookie3, xsmall, {B1, K1}),
    test_singledelta_stores(Bookie2, Bookie3, xxsmall, {B1, K1}),

    % Test with a newly opened book (i.e. with no block indexes cached)
    ok = leveled_bookie:book_close(Bookie2),
    {ok, Bookie2A} = leveled_bookie:book_start(StartOpts2),
    test_singledelta_stores(Bookie2A, Bookie3, small, {B1, K1}),

    ok = leveled_bookie:book_close(Bookie2A),
    ok = leveled_bookie:book_close(Bookie3).
%% Compare the two stores (which should differ by exactly one key -
%% DeltaKey) using tictac tree folds and segment-filtered folds at the
%% given tree size.  The repeated "fold both books and time it" pattern is
%% factored out into run_segment_folds/4.
test_singledelta_stores(BookA, BookB, TreeSize, DeltaKey) ->
    io:format("Test for single delta with tree size ~w~n", [TreeSize]),
    % Now run a tictac query against both stores to see the extent to which
    % state between stores is consistent
    TicTacFolder =
        {foldheads_allkeys,
            ?RIAK_TAG,
            {fun head_tictac_foldfun/4,
                {0, leveled_tictac:new_tree(test, TreeSize)}},
            false, true, false},
    % tictac query by bucket (should be same result as all stores)
    TicTacByBucketFolder =
        {foldheads_bybucket,
            ?RIAK_TAG, <<"Bucket">>,
            all,
            {fun head_tictac_foldfun/4,
                {0, leveled_tictac:new_tree(test, TreeSize)}},
            false, false, false},
    DLs = check_tictacfold(BookA, BookB,
                            TicTacFolder,
                            DeltaKey,
                            TreeSize),
    DLs = check_tictacfold(BookA, BookB,
                            TicTacByBucketFolder,
                            DeltaKey,
                            TreeSize),

    % Fold over the heads matching the dirty segments, with no segment
    % filter passed to the store - only the accumulator filters
    HeadSegmentFolder =
        {foldheads_allkeys,
            ?RIAK_TAG,
            {get_segment_folder(DLs, TreeSize), []},
            false, true, false},
    {BookASegList, BookBSegList} =
        run_segment_folds(BookA, BookB, HeadSegmentFolder,
                            "Two unfiltered segment list folds"),
    Delta = lists:subtract(BookASegList, BookBSegList),
    true = length(Delta) == 1,

    % Repeat with a segment filter passed to the store to accelerate the
    % fold
    SegFilterList = leveled_tictac:generate_segmentfilter_list(DLs, TreeSize),
    SuperHeadSegmentFolder =
        {foldheads_allkeys,
            ?RIAK_TAG,
            {get_segment_folder(DLs, TreeSize), []},
            false, true, SegFilterList},
    {_BookASegList1, _BookBSegList1} =
        run_segment_folds(BookA, BookB, SuperHeadSegmentFolder,
                            "Two filtered segment list folds"),

    % As above, but also checking presence in the journal
    SuperHeadSegmentFolderCP =
        {foldheads_allkeys,
            ?RIAK_TAG,
            {get_segment_folder(DLs, TreeSize), []},
            true, true, SegFilterList},
    {_BookASegList1CP, _BookBSegList1CP} =
        run_segment_folds(BookA, BookB, SuperHeadSegmentFolderCP,
                            "Two filtered segment list folds " ++
                                "with presence check"),

    % Adding false-positive segments to the filter should still find the
    % single delta, just with more keys passing through the filter
    FalseMatchFilter = DLs ++ [1, 100, 101, 1000, 1001],
    SegFilterListF =
        leveled_tictac:generate_segmentfilter_list(FalseMatchFilter, TreeSize),
    SuperHeadSegmentFolderF =
        {foldheads_allkeys,
            ?RIAK_TAG,
            {get_segment_folder(DLs, TreeSize), []},
            false, true, SegFilterListF},
    {BookASegList1F, BookBSegList1F} =
        run_segment_folds(BookA, BookB, SuperHeadSegmentFolderF,
                            "Two filtered segment list folds " ++
                                " with false positives"),
    Delta1F = lists:subtract(BookASegList1F, BookBSegList1F),
    io:format("Delta found of ~w~n", [Delta1F]),
    true = length(Delta1F) == 1.

%% Run the same fold against both books, log the elapsed time and the
%% result list lengths, and return both result lists
run_segment_folds(BookA, BookB, Folder, Description) ->
    SW = os:timestamp(),
    {async, BookASegFolder} =
        leveled_bookie:book_returnfolder(BookA, Folder),
    {async, BookBSegFolder} =
        leveled_bookie:book_returnfolder(BookB, Folder),
    BookASegList = BookASegFolder(),
    BookBSegList = BookBSegFolder(),
    TimeMS = timer:now_diff(os:timestamp(), SW) / 1000,
    io:format(Description ++ " took ~w milliseconds ~n", [TimeMS]),
    io:format("Segment lists found of lengths ~w ~w~n",
                [length(BookASegList), length(BookBSegList)]),
    {BookASegList, BookBSegList}.
%% Return a fold fun which accumulates {Bucket, Key, VectorClock} for every
%% key whose tictac segment is a member of the passed SegmentList
get_segment_folder(SegmentList, TreeSize) ->
    fun(Bucket, Key, ProxyObj, Acc) ->
        SegHash =
            leveled_tictac:keyto_segment32(<<Bucket/binary, Key/binary>>),
        Seg = leveled_tictac:get_segment(SegHash, TreeSize),
        case lists:member(Seg, SegmentList) of
            false ->
                Acc;
            true ->
                {Clock, _Size, _SibCount} = summary_from_binary(ProxyObj),
                [{Bucket, Key, Clock}|Acc]
        end
    end.
%% Fold fun for building a tictac tree over object heads, counting objects
%% as it goes.  The hashed value is the sorted vector clock, keyed on the
%% concatenated bucket and key binaries.
head_tictac_foldfun(Bucket, Key, ProxyObj, {Count, TreeAcc}) ->
    ExtractFun =
        fun({BBin, KBin}, Obj) ->
            {Clock, _Size, _SibCount} = summary_from_binary(Obj),
            {<<BBin/binary, KBin/binary>>, lists:sort(Clock)}
        end,
    UpdTree =
        leveled_tictac:add_kv(TreeAcc, {Bucket, Key}, ProxyObj, ExtractFun),
    {Count + 1, UpdTree}.
%% Run the passed tictac-tree fold over both books, assert that there is
%% exactly one object delta and one dirty leaf between the trees, and that
%% the dirty leaf is the segment of the expected delta key {B1, K1}.
%% Returns the list of dirty leaves (a single-element segment list).
check_tictacfold(BookA, BookB, HeadTicTacFolder, {B1, K1}, TreeSize) ->
    SW_TT0 = os:timestamp(),
    {async, BookATreeFolder} =
        leveled_bookie:book_returnfolder(BookA, HeadTicTacFolder),
    {async, BookBTreeFolder} =
        leveled_bookie:book_returnfolder(BookB, HeadTicTacFolder),
    {CountA, BookATree} = BookATreeFolder(),
    {CountB, BookBTree} = BookBTreeFolder(),
    Time_TT0 = timer:now_diff(os:timestamp(), SW_TT0)/1000,
    io:format("Two tree folds took ~w milliseconds ~n", [Time_TT0]),

    io:format("Fold over keys revealed counts of ~w and ~w~n",
                [CountA, CountB]),

    % There should be a single delta between the stores
    1 = CountA - CountB,

    DLs = leveled_tictac:find_dirtyleaves(BookATree, BookBTree),
    io:format("Found dirty leaves with Riak fold_heads of ~w~n",
                [length(DLs)]),
    true = length(DLs) == 1,
    % The dirty leaf must correspond to the segment of the known delta key
    ExpSeg = leveled_tictac:keyto_segment32(<<B1/binary, K1/binary>>),
    TreeSeg = leveled_tictac:get_segment(ExpSeg, TreeSize),
    [ActualSeg] = DLs,
    true = TreeSeg == ActualSeg,
    DLs.
%% Extract {VectorClock, ObjectSize, SiblingCount} from an object binary.
%% The binary may arrive wrapped as a term_to_binary'd proxy_object -
%% recognisable by the external term format tag (131) at its head.
summary_from_binary(<<131, _Rest/binary>>=ObjBin) ->
    {proxy_object, HeadBin, ObjSize, _Fetcher} = binary_to_term(ObjBin),
    summary_from_binary(HeadBin, ObjSize);
summary_from_binary(ObjBin) when is_binary(ObjBin) ->
    summary_from_binary(ObjBin, byte_size(ObjBin)).

summary_from_binary(ObjBin, ObjSize) ->
    <<?MAGIC:8/integer,
        1:8/integer,
        VclockLen:32/integer, VclockBin:VclockLen/binary,
        SibCount:32/integer,
        _Rest/binary>> = ObjBin,
    Vclock = lists:usort(binary_to_term(VclockBin)),
    {Vclock, ObjSize, SibCount}.

View file

@ -356,6 +356,18 @@ generate_objects(Count, uuid, ObjL, Value, IndexGen, Bucket) ->
Value,
IndexGen,
Bucket);
%% Clause generating objects with binary bucket and key names.  The bucket
%% (supplied as a list) and the generated "KeyN" key are converted with
%% list_to_binary before set_object; keys number upwards from KeyNumber.
generate_objects(Count, {binary, KeyNumber}, ObjL, Value, IndexGen, Bucket) ->
    {Obj1, Spec1} =
        set_object(list_to_binary(Bucket),
                    list_to_binary("Key" ++ integer_to_list(KeyNumber)),
                    Value,
                    IndexGen),
    % NOTE(review): ObjL ++ [_] appends per recursion (O(n^2) overall);
    % presumably acceptable for test-sized lists - confirm before using
    % with very large Counts
    generate_objects(Count - 1,
                        {binary, KeyNumber + 1},
                        ObjL ++ [{leveled_rand:uniform(), Obj1, Spec1}],
                        Value,
                        IndexGen,
                        Bucket);
generate_objects(Count, KeyNumber, ObjL, Value, IndexGen, Bucket) ->
{Obj1, Spec1} = set_object(Bucket,
"Key" ++ integer_to_list(KeyNumber),

View file

@ -131,11 +131,18 @@ many_put_compare(_Config) ->
{proxy_object, HeadBin, _Size, _FetchFun} = binary_to_term(Value),
<<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
VclockBin:VclockLen/binary, _Rest/binary>> = HeadBin,
{Key, lists:sort(binary_to_term(VclockBin))}
case is_binary(Key) of
true ->
{Key,
lists:sort(binary_to_term(VclockBin))};
false ->
{term_to_binary(Key),
lists:sort(binary_to_term(VclockBin))}
end
end,
FoldObjectsFun =
fun(_Bucket, Key, Value, Acc) ->
leveled_tictac:add_kv(Acc, Key, Value, ExtractClockFun, false)
leveled_tictac:add_kv(Acc, Key, Value, ExtractClockFun)
end,
FoldQ0 = {foldheads_bybucket,
@ -143,7 +150,7 @@ many_put_compare(_Config) ->
"Bucket",
all,
{FoldObjectsFun, leveled_tictac:new_tree(0, TreeSize)},
false, true},
false, true, false},
{async, TreeAObjFolder0} =
leveled_bookie:book_returnfolder(Bookie2, FoldQ0),
SWB0Obj = os:timestamp(),
@ -158,7 +165,7 @@ many_put_compare(_Config) ->
"Bucket",
all,
{FoldObjectsFun, leveled_tictac:new_tree(0, TreeSize)},
true, true},
true, true, false},
{async, TreeAObjFolder1} =
leveled_bookie:book_returnfolder(Bookie2, FoldQ1),
SWB1Obj = os:timestamp(),
@ -179,15 +186,14 @@ many_put_compare(_Config) ->
end,
AltFoldObjectsFun =
fun(_Bucket, Key, Value, Acc) ->
leveled_tictac:add_kv(Acc, Key, Value, AltExtractFun, true)
leveled_tictac:add_kv(Acc, Key, Value, AltExtractFun)
end,
AltFoldQ0 = {foldheads_bybucket,
o_rkv,
"Bucket",
all,
{AltFoldObjectsFun, leveled_tictac:new_tree(0, TreeSize)},
false,
true},
false, true, false},
{async, TreeAAltObjFolder0} =
leveled_bookie:book_returnfolder(Bookie2, AltFoldQ0),
SWB2Obj = os:timestamp(),
@ -213,8 +219,7 @@ many_put_compare(_Config) ->
FoldKeysFun =
fun(SegListToFind) ->
fun(_B, K, Acc) ->
Seg =
leveled_tictac:get_segment(erlang:phash2(K), SegmentCount),
Seg = get_segment(K, SegmentCount),
case lists:member(Seg, SegListToFind) of
true ->
[K|Acc];
@ -488,8 +493,7 @@ index_compare(_Config) ->
FoldKeysIndexQFun =
fun(_Bucket, {Term, Key}, Acc) ->
Seg =
leveled_tictac:get_segment(erlang:phash2(Key), SegmentCount),
Seg = get_segment(Key, SegmentCount),
case lists:member(Seg, DL3_0) of
true ->
[{Term, Key}|Acc];
@ -1144,3 +1148,15 @@ get_tictactree_fun(Bookie, Bucket, TreeSize) ->
[LMD, timer:now_diff(os:timestamp(), SW)]),
leveled_tictac:merge_trees(R, Acc)
end.
%% Map a key to its tictac tree segment for a tree of SegmentCount
%% segments.  Non-binary keys are first converted with term_to_binary,
%% mirroring how leveled hashes object keys.  The 32-bit segment hash is
%% built from the low 16 bits of the extra hash over the segment id.
get_segment(Key, SegmentCount) when is_binary(Key) ->
    {SegmentID, ExtraHash} = leveled_codec:segment_hash(Key),
    SegHash = ((ExtraHash band 65535) bsl 16) + SegmentID,
    leveled_tictac:get_segment(SegHash, SegmentCount);
get_segment(Key, SegmentCount) ->
    get_segment(term_to_binary(Key), SegmentCount).