Allow for segment-acceleration of folds

Initially with basic tests. If the SlotIndex has been cached, we can now use the slot index, as it is based on the Segment hash algorithm.

This looks like it should lead to an order of magnitude improvement in querying for keys/clocks by segment ID.

This also required a slight tweak to the penciller keyfolder. It now caches the next answer from the SSTiter, rather than restarting the iterator. When the IMMiter has many more entries than the SSTiter (as the SSTiter is being filtered but not the IMMiter), restarting could lead to lots of repeated folding.
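A sketch of the intended usage (the query tuple shape matches the bookie and test changes below; Bookie, FoldHeadsFun, InitAcc and SegmentList are assumed to be in scope): a foldheads_allkeys query can now carry a list of segment IDs, so only slots whose cached block index matches one of those segments need to be read from disk.

%% SegmentList is a list of 16-bit segment IDs of interest, or false to
%% disable filtering
Query = {foldheads_allkeys,
         ?RIAK_TAG,
         {FoldHeadsFun, InitAcc},
         false,        % JournalCheck
         true,         % SnapPreFold
         SegmentList},
{async, Folder} = leveled_bookie:book_returnfolder(Bookie, Query),
Heads = Folder().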
Martin Sumner 2017-10-31 23:28:35 +00:00
parent f5878548f9
commit b141dd199c
6 changed files with 519 additions and 202 deletions

View file

@ -699,7 +699,17 @@ get_runner(State,
{foldheads_allkeys, Tag, FoldFun, JournalCheck, SnapPreFold}) ->
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold),
leveled_runner:foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck);
leveled_runner:foldheads_allkeys(SnapFun, Tag, FoldFun,
JournalCheck, false);
get_runner(State,
{foldheads_allkeys,
Tag, FoldFun,
JournalCheck, SnapPreFold, SegmentList}) ->
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold),
leveled_runner:foldheads_allkeys(SnapFun,
Tag, FoldFun,
JournalCheck, SegmentList);
get_runner(State,
{foldobjects_allkeys, Tag, FoldFun, SnapPreFold}) ->
SnapFun = return_snapfun(State, store, no_lookup, true, SnapPreFold),

View file

@ -97,6 +97,7 @@ segment_hash({?RIAK_TAG, Bucket, Key, null})
segment_hash(Key) ->
segment_hash(term_to_binary(Key)).
-spec magic_hash(any()) -> integer().
%% @doc
%% Use DJ Bernstein magic hash function. Note, this is more expensive than

View file

@ -176,6 +176,7 @@
pcl_fetch/2,
pcl_fetch/3,
pcl_fetchkeys/5,
pcl_fetchkeysbysegment/6,
pcl_fetchnextkey/5,
pcl_checksequencenumber/3,
pcl_workforclerk/1,
@ -346,7 +347,32 @@ pcl_fetch(Pid, Key, Hash) ->
%% Erlang term order.
pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) ->
gen_server:call(Pid,
{fetch_keys, StartKey, EndKey, AccFun, InitAcc, -1},
{fetch_keys,
StartKey, EndKey,
AccFun, InitAcc,
false, -1},
infinity).
-spec pcl_fetchkeysbysegment(pid(), tuple(), tuple(), fun(), any(),
false|list(integer())) -> any().
%% @doc
%% Run a range query between StartKey and EndKey (inclusive). This will cover
%% all keys in the range - so must only be run against snapshots of the
%% penciller to avoid blocking behaviour.
%%
%% This version allows an additional input of a SegmentList. This is a list
%% of 16-bit integers representing the segment IDs (the segment hash band
%% ((2 ^ 16) - 1)) that are of interest to the fetch
%%
%% Note that SegmentList must be false unless the object Tag supports additional
%% indexing by segment. This cannot be used on ?IDX_TAG and other tags that
%% use the no_lookup hash
pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, SegmentList) ->
gen_server:call(Pid,
{fetch_keys,
StartKey, EndKey,
AccFun, InitAcc,
SegmentList, -1},
infinity).
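A minimal caller sketch (mirroring the call made from leveled_runner further down; the snapshot pid, key range, fold fun and segment list are assumed to exist):

%% SegList :: false | list(integer()) - 16-bit segment IDs of interest
Acc = leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot,
                                               StartKey, EndKey,
                                               AccFun, InitAcc,
                                               SegList),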
-spec pcl_fetchnextkey(pid(), tuple(), tuple(), fun(), any()) -> any().
@ -356,7 +382,10 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) ->
%% found in erlang term order.
pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
gen_server:call(Pid,
{fetch_keys, StartKey, EndKey, AccFun, InitAcc, 1},
{fetch_keys,
StartKey, EndKey,
AccFun, InitAcc,
false, 1},
infinity).
-spec pcl_checksequencenumber(pid(), tuple(), integer()) -> boolean().
@ -540,7 +569,10 @@ handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
State#state.levelzero_index),
SQN),
State};
handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
handle_call({fetch_keys,
StartKey, EndKey,
AccFun, InitAcc,
SegmentList, MaxKeys},
_From,
State=#state{snapshot_fully_loaded=Ready})
when Ready == true ->
@ -575,7 +607,7 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
Acc = keyfolder({L0AsList, SSTiter},
{StartKey, EndKey},
{AccFun, InitAcc},
MaxKeys),
{SegmentList, MaxKeys}),
{reply, Acc, State#state{levelzero_astree = L0AsList}};
handle_call(get_startup_sqn, _From, State) ->
@ -1098,43 +1130,175 @@ compare_to_sqn(Obj, SQN) ->
end.
%%%============================================================================
%%% Iterator functions
%%%
%%% TODO - move to dedicated module with extended unit testing
%%%============================================================================
-spec keyfolder(list(), list(), tuple(), tuple(), {fun(), any()}) -> any().
%% @doc
%% The keyfolder will compare an iterator across the immutable in-memory cache
%% of the Penciller (the IMMiter), with an iterator across the persisted part
%% (the SSTiter).
%%
%% A Segment List and a MaxKeys may be passed. Every time something is added
%% to the accumulator MaxKeys is reduced - so set MaxKeys to -1 if it is
%% intended to be infinite.
%%
%% The basic principle is to take the next key in the IMMiter and compare it
%% to the next key in the SSTiter, and decide which one should be added to the
%% accumulator. The iterators are advanced if they either win (i.e. are the
%% next key), or are dominated. This goes on until the iterators are empty.
%%
%% To advance the SSTiter the find_nextkey/4 function is used, as the SSTiter
%% is an iterator across multiple levels - and so needs to do its own
%% comparisons to pop the next result.
keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) ->
keyfolder({IMMiter, SSTiter},
{StartKey, EndKey},
{AccFun, Acc},
{false, -1}).
keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, {_SegmentList, MaxKeys})
when MaxKeys == 0 ->
Acc;
keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, {SegmentList, MaxKeys}) ->
{StartKey, EndKey} = KeyRange,
case find_nextkey(SSTiter, StartKey, EndKey, SegmentList) of
no_more_keys ->
Acc;
{NxSSTiter, {SSTKey, SSTVal}} ->
Acc1 = AccFun(SSTKey, SSTVal, Acc),
keyfolder({[], NxSSTiter},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1})
end;
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
KeyRange,
{AccFun, Acc},
{SegmentList, MaxKeys}) ->
{StartKey, EndKey} = KeyRange,
case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of
{true, _} ->
% Normally everything is pre-filtered, but the IMM iterator can
% be re-used and so may be behind the StartKey if the StartKey has
% advanced from the previous use
keyfolder({NxIMMiterator, SSTiterator},
KeyRange,
{AccFun, Acc},
{SegmentList, MaxKeys});
{false, true} ->
% There are no more keys in-range in the in-memory
% iterator, so take action as if this iterator is empty
% (see above)
keyfolder({[], SSTiterator},
KeyRange,
{AccFun, Acc},
{SegmentList, MaxKeys});
{false, false} ->
case find_nextkey(SSTiterator, StartKey, EndKey, SegmentList) of
no_more_keys ->
% No more keys in range in the persisted store, so use the
% in-memory KV as the next
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator,
[]},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1});
{NxSSTiterator, {SSTKey, SSTVal}} ->
% There is a next key, so need to know which is the
% next key between the two (and handle two keys
% with different sequence numbers).
case leveled_codec:key_dominates({IMMKey,
IMMVal},
{SSTKey,
SSTVal}) of
left_hand_first ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
% Stow the previous best result away at Level -1
% so that there is no need to iterate to it again
NewEntry = {-1, [{SSTKey, SSTVal}]},
keyfolder({NxIMMiterator,
lists:keystore(-1,
1,
NxSSTiterator,
NewEntry)},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1});
right_hand_first ->
Acc1 = AccFun(SSTKey, SSTVal, Acc),
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator],
NxSSTiterator},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1});
left_hand_dominant ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
% We can add to the accumulator here. As the SST
% key was the most dominant across all SST levels,
% there is no need to hold off until the IMMKey
% is left hand first.
keyfolder({NxIMMiterator,
NxSSTiterator},
KeyRange,
{AccFun, Acc1},
{SegmentList, MaxKeys - 1})
end
end
end.
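The caching tweak called out in the commit message can be pictured as follows (illustrative only - SSTKey and SSTVal stand for whichever SST result lost the comparison): rather than discarding the best SST result when the in-memory key wins, it is stowed in the query array under level -1, and because find_nextkey now starts its scan at level -1 the stowed result is re-offered on the next pass without re-querying the SST files.

%% Illustrative sketch of the stowed entry
QueryArray1 = lists:keystore(-1, 1, QueryArray0, {-1, [{SSTKey, SSTVal}]}),
%% find_nextkey(QueryArray1, StartKey, EndKey, SegList) will pop the cached
%% {SSTKey, SSTVal} first (the scan starts at level -1) before considering
%% levels 0 upwards.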
%% Looks to find the best choice for the next key across the levels (other
%% than in-memory table)
%% In finding the best choice, the next key in a given level may be a next
%% block or next file pointer which will need to be expanded
find_nextkey(QueryArray, StartKey, EndKey) ->
find_nextkey(QueryArray,
0,
{null, null},
StartKey,
EndKey,
?ITERATOR_SCANWIDTH).
find_nextkey(QueryArray, StartKey, EndKey, false).
find_nextkey(_QueryArray, LCnt, {null, null}, _StartKey, _EndKey, _Width)
when LCnt > ?MAX_LEVELS ->
find_nextkey(QueryArray, StartKey, EndKey, SegmentList) ->
find_nextkey(QueryArray,
-1,
{null, null},
StartKey, EndKey,
SegmentList, ?ITERATOR_SCANWIDTH).
find_nextkey(_QueryArray, LCnt,
{null, null},
_StartKey, _EndKey,
_SegList, _Width) when LCnt > ?MAX_LEVELS ->
% The array has been scanned without finding a best key - must be
% exhausted - respond to indicate no more keys to be found by the
% iterator
no_more_keys;
find_nextkey(QueryArray, LCnt, {BKL, BestKV}, _StartKey, _EndKey, _Width)
when LCnt > ?MAX_LEVELS ->
find_nextkey(QueryArray, LCnt,
{BKL, BestKV},
_StartKey, _EndKey,
_SegList, _Width) when LCnt > ?MAX_LEVELS ->
% All levels have been scanned, so need to remove the best result from
% the array, and return that array along with the best key/sqn/status
% combination
{BKL, [BestKV|Tail]} = lists:keyfind(BKL, 1, QueryArray),
{lists:keyreplace(BKL, 1, QueryArray, {BKL, Tail}), BestKV};
find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV},
StartKey, EndKey, Width) ->
find_nextkey(QueryArray, LCnt,
{BestKeyLevel, BestKV},
StartKey, EndKey,
SegList, Width) ->
% Get the next key at this level
{NextKey, RestOfKeys} = case lists:keyfind(LCnt, 1, QueryArray) of
false ->
{null, null};
{LCnt, []} ->
{null, null};
{LCnt, [NK|ROfKs]} ->
{NK, ROfKs}
end,
{NextKey, RestOfKeys} =
case lists:keyfind(LCnt, 1, QueryArray) of
false ->
{null, null};
{LCnt, []} ->
{null, null};
{LCnt, [NK|ROfKs]} ->
{NK, ROfKs}
end,
% Compare the next key at this level with the best key
case {NextKey, BestKeyLevel, BestKV} of
{null, BKL, BKV} ->
@ -1142,42 +1306,48 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV},
find_nextkey(QueryArray,
LCnt + 1,
{BKL, BKV},
StartKey, EndKey, Width);
StartKey, EndKey,
SegList, Width);
{{next, Owner, _SK}, BKL, BKV} ->
% The first key at this level is pointer to a file - need to query
% the file to expand this level out before proceeding
Pointer = {next, Owner, StartKey, EndKey},
UpdList = leveled_sst:expand_list_by_pointer(Pointer,
RestOfKeys,
Width),
Width,
SegList),
NewEntry = {LCnt, UpdList},
% Need to loop around at this level (LCnt) as we have not yet
% examined a real key at this level
find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry),
LCnt,
{BKL, BKV},
StartKey, EndKey, Width);
StartKey, EndKey,
SegList, Width);
{{pointer, SSTPid, Slot, PSK, PEK}, BKL, BKV} ->
% The first key at this level is pointer within a file - need to
% query the file to expand this level out before proceeding
Pointer = {pointer, SSTPid, Slot, PSK, PEK},
UpdList = leveled_sst:expand_list_by_pointer(Pointer,
RestOfKeys,
Width),
Width,
SegList),
NewEntry = {LCnt, UpdList},
% Need to loop around at this level (LCnt) as we have not yet
% examined a real key at this level
find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry),
LCnt,
{BKL, BKV},
StartKey, EndKey, Width);
StartKey, EndKey,
SegList, Width);
{{Key, Val}, null, null} ->
% No best key set - so can assume that this key is the best key,
% and check the lower levels
find_nextkey(QueryArray,
LCnt + 1,
{LCnt, {Key, Val}},
StartKey, EndKey, Width);
StartKey, EndKey,
SegList, Width);
{{Key, Val}, _BKL, {BestKey, _BestVal}} when Key < BestKey ->
% There is a real key and a best key to compare, and the real key
% at this level is before the best key, and so is now the new best
@ -1186,7 +1356,8 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV},
find_nextkey(QueryArray,
LCnt + 1,
{LCnt, {Key, Val}},
StartKey, EndKey, Width);
StartKey, EndKey,
SegList, Width);
{{Key, Val}, BKL, {BestKey, BestVal}} when Key == BestKey ->
SQN = leveled_codec:strip_to_seqonly({Key, Val}),
BestSQN = leveled_codec:strip_to_seqonly({BestKey, BestVal}),
@ -1197,7 +1368,8 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV},
find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry),
LCnt + 1,
{BKL, {BestKey, BestVal}},
StartKey, EndKey, Width);
StartKey, EndKey,
SegList, Width);
SQN > BestSQN ->
% There is a real key at the front of this level and it has
% a higher SQN than the best key, so we should use this as
@ -1212,92 +1384,19 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV},
{BKL, BestTail}),
LCnt + 1,
{LCnt, {Key, Val}},
StartKey, EndKey, Width)
StartKey, EndKey,
SegList, Width)
end;
{_, BKL, BKV} ->
% This is not the best key
find_nextkey(QueryArray,
LCnt + 1,
{BKL, BKV},
StartKey, EndKey, Width)
StartKey, EndKey,
SegList, Width)
end.
keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) ->
keyfolder({IMMiter, SSTiter}, {StartKey, EndKey}, {AccFun, Acc}, -1).
keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, MaxKeys) when MaxKeys == 0 ->
Acc;
keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, MaxKeys) ->
{StartKey, EndKey} = KeyRange,
case find_nextkey(SSTiter, StartKey, EndKey) of
no_more_keys ->
Acc;
{NxSSTiter, {SSTKey, SSTVal}} ->
Acc1 = AccFun(SSTKey, SSTVal, Acc),
keyfolder({[], NxSSTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1)
end;
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange,
{AccFun, Acc}, MaxKeys) ->
{StartKey, EndKey} = KeyRange,
case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of
{true, _} ->
% Normally everything is pre-filterd, but the IMM iterator can
% be re-used and so may be behind the StartKey if the StartKey has
% advanced from the previous use
keyfolder({NxIMMiterator, SSTiterator},
KeyRange,
{AccFun, Acc},
MaxKeys);
{false, true} ->
% There are no more keys in-range in the in-memory
% iterator, so take action as if this iterator is empty
% (see above)
keyfolder({[], SSTiterator},
KeyRange,
{AccFun, Acc},
MaxKeys);
{false, false} ->
case find_nextkey(SSTiterator, StartKey, EndKey) of
no_more_keys ->
% No more keys in range in the persisted store, so use the
% in-memory KV as the next
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator, SSTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1);
{NxSSTiterator, {SSTKey, SSTVal}} ->
% There is a next key, so need to know which is the
% next key between the two (and handle two keys
% with different sequence numbers).
case leveled_codec:key_dominates({IMMKey,
IMMVal},
{SSTKey,
SSTVal}) of
left_hand_first ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator, SSTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1);
right_hand_first ->
Acc1 = AccFun(SSTKey, SSTVal, Acc),
keyfolder({[{IMMKey, IMMVal}|NxIMMiterator],
NxSSTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1);
left_hand_dominant ->
Acc1 = AccFun(IMMKey, IMMVal, Acc),
keyfolder({NxIMMiterator, NxSSTiterator},
KeyRange,
{AccFun, Acc1},
MaxKeys - 1)
end
end
end.
%%%============================================================================
@ -1666,6 +1765,7 @@ foldwithimm_simple_test() ->
IMMiterB = leveled_tree:match_range({o, "Bucket1", "Key1", null},
{o, null, null, null},
IMM3),
io:format("Compare IMM3 with QueryArrary~n"),
AccB = keyfolder(IMMiterB,
QueryArray,
{o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null},

View file

@ -28,7 +28,7 @@
bucketkey_query/4,
hashlist_query/3,
tictactree/5,
foldheads_allkeys/4,
foldheads_allkeys/5,
foldobjects_allkeys/3,
foldheads_bybucket/4,
foldobjects_bybucket/3,
@ -213,14 +213,18 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) ->
end,
{async, Runner}.
-spec foldheads_allkeys(fun(), atom(), fun(), boolean()) -> {async, fun()}.
-spec foldheads_allkeys(fun(), atom(), fun(), boolean(), false|list(integer()))
-> {async, fun()}.
%% @doc
%% Fold over all heads in the store for a given tag - applying the passed
%% function to each proxy object
foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck) ->
foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) ->
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, {true, JournalCheck}).
foldobjects(SnapFun,
Tag, StartKey, EndKey,
FoldFun,
{true, JournalCheck}, SegmentList).
-spec foldobjects_allkeys(fun(), atom(), fun()) -> {async, fun()}.
%% @doc
@ -228,21 +232,30 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck) ->
foldobjects_allkeys(SnapFun, Tag, FoldFun) ->
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false).
foldobjects(SnapFun,
Tag, StartKey, EndKey,
FoldFun,
false, false).
-spec foldobjects_bybucket(fun(), {atom(), any(), any()}, fun()) ->
{async, fun()}.
%% @doc
%% Fold over all objects within a given key range in a bucket
foldobjects_bybucket(SnapFun, {Tag, StartKey, EndKey}, FoldFun) ->
foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false).
foldobjects(SnapFun,
Tag, StartKey, EndKey,
FoldFun,
false, false).
-spec foldheads_bybucket(fun(), {atom(), any(), any()}, fun(), boolean()) ->
{async, fun()}.
%% @doc
%% Fold over all object metadata within a given key range in a bucket
foldheads_bybucket(SnapFun, {Tag, StartKey, EndKey}, FoldFun, JournalCheck) ->
foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, {true, JournalCheck}).
foldobjects(SnapFun,
Tag, StartKey, EndKey,
FoldFun,
{true, JournalCheck}, false).
-spec foldobjects_byindex(fun(), tuple(), fun()) -> {async, fun()}.
%% @doc
@ -253,7 +266,10 @@ foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) ->
leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, FromTerm),
EndKey =
leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm),
foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false).
foldobjects(SnapFun,
Tag, StartKey, EndKey,
FoldFun,
false, false).
@ -302,8 +318,8 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
-spec foldobjects(fun(), atom(), tuple(), tuple(), fun(),
false|{true, boolean()}) ->
{async, fun()}.
false|{true, boolean()}, false|list(integer())) ->
{async, fun()}.
%% @doc
%% The object folder should be passed DeferredFetch.
%% DeferredFetch can either be false (which will return to the fold function
@ -311,7 +327,10 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
%% will be created that if understood by the fold function will allow the fold
%% function to work on the head of the object, and defer fetching the body in
%% case such a fetch is unnecessary.
foldobjects(SnapFun, Tag, StartKey, EndKey, FoldObjectsFun, DeferredFetch) ->
foldobjects(SnapFun,
Tag, StartKey, EndKey,
FoldObjectsFun,
DeferredFetch, SegmentList) ->
{FoldFun, InitAcc} =
case is_tuple(FoldObjectsFun) of
true ->
@ -331,11 +350,12 @@ foldobjects(SnapFun, Tag, StartKey, EndKey, FoldObjectsFun, DeferredFetch) ->
JournalSnapshot,
Tag,
DeferredFetch),
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
StartKey,
EndKey,
AccFun,
InitAcc),
Acc = leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot,
StartKey,
EndKey,
AccFun,
InitAcc,
SegmentList),
ok = leveled_penciller:pcl_close(LedgerSnapshot),
case DeferredFetch of
{true, false} ->

View file

@ -98,7 +98,9 @@
sst_get/2,
sst_get/3,
sst_getkvrange/4,
sst_getfilteredrange/5,
sst_getslots/2,
sst_getfilteredslots/3,
sst_getmaxsequencenumber/1,
sst_setfordelete/2,
sst_clear/1,
@ -106,7 +108,8 @@
sst_deleteconfirmed/1,
sst_close/1]).
-export([expand_list_by_pointer/3]).
-export([expand_list_by_pointer/4,
get_slotid/1]).
-record(slot_index_value, {slot_id :: integer(),
@ -136,6 +139,7 @@
yield_blockquery = false :: boolean(),
blockindex_cache}).
-type sst_state() :: #state{}.
%%%============================================================================
%%% API
@ -248,25 +252,42 @@ sst_get(Pid, LedgerKey) ->
sst_get(Pid, LedgerKey, Hash) ->
gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity).
-spec sst_getkvrange(pid(), tuple()|all, tuple()|all, integer()) -> list().
%% @doc
%% Get a range of {Key, Value} pairs as a list between StartKey and EndKey
%% (inclusive). The ScanWidth is the maximum size of the range, a pointer
%% will be placed on the tail of the resulting list if results expand beyond
%% the Scan Width
sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, false).
-spec sst_getfilteredrange(pid(), tuple()|all, tuple()|all, integer(),
list()|false) -> list().
%% @doc
%% Get a range of {Key, Value} pairs as a list between StartKey and EndKey
%% (inclusive). The ScanWidth is the maximum size of the range, a pointer
%% will be placed on the tail of the resulting list if results expand beyond
%% the Scan Width
%%
%% To make the range open-ended (either at start, end or both) the all atom
%% can be used in place of the Key tuple.
sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, SegList) ->
SegList0 =
case is_list(SegList) of
true ->
lists:map(fun tune_hash/1, SegList);
false ->
SegList
end,
case gen_fsm:sync_send_event(Pid,
{get_kvrange, StartKey, EndKey, ScanWidth},
{get_kvrange,
StartKey, EndKey,
ScanWidth, SegList0},
infinity) of
{yield, SlotsToFetchBinList, SlotsToPoint} ->
FetchFun =
fun({SlotBin, SK, EK}, Acc) ->
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
end,
lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint;
binaryslot_reader(SlotsToFetchBinList) ++ SlotsToPoint;
Reply ->
Reply
end.
@ -274,14 +295,31 @@ sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
-spec sst_getslots(pid(), list()) -> list().
%% @doc
%% Get a list of slots by their ID. The slot will be converted from the binary
%% to term form outside of the FSM loop
%% to term form outside of the FSM loop - this is to stop the copying of the
%% converted term to the calling process.
sst_getslots(Pid, SlotList) ->
SlotBins = gen_fsm:sync_send_event(Pid, {get_slots, SlotList}, infinity),
FetchFun =
fun({SlotBin, SK, EK}, Acc) ->
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
sst_getfilteredslots(Pid, SlotList, false).
-spec sst_getfilteredslots(pid(), list(), false|list()) -> list().
%% @doc
%% Get a list of slots by their ID. The slot will be converted from the binary
%% to term form outside of the FSM loop
%%
%% A list of 16-bit integer Segment IDs can be passed to filter the keys
%% returned (not precisely - false positives may also be returned). Use
%% false as a SegList to not filter
sst_getfilteredslots(Pid, SlotList, SegList) ->
SegList0 =
case is_list(SegList) of
true ->
lists:map(fun tune_hash/1, SegList);
false ->
SegList
end,
lists:foldl(FetchFun, [], SlotBins).
SlotBins = gen_fsm:sync_send_event(Pid,
{get_slots, SlotList, SegList0},
infinity),
binaryslot_reader(SlotBins).
-spec sst_getmaxsequencenumber(pid()) -> integer().
%% @doc
@ -415,10 +453,11 @@ reader({get_kv, LedgerKey, Hash}, _From, State) ->
{Result, Stage, _SlotID, UpdState} = fetch(LedgerKey, Hash, State),
UpdTimings = leveled_log:sst_timing(State#state.sst_timings, SW, Stage),
{reply, Result, reader, UpdState#state{sst_timings = UpdTimings}};
reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
reader({get_kvrange, StartKey, EndKey, ScanWidth, SlotList}, _From, State) ->
{SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
EndKey,
ScanWidth,
SlotList,
State),
case State#state.yield_blockquery of
true ->
@ -427,17 +466,16 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
reader,
State};
false ->
FetchFun =
fun({SlotBin, SK, EK}, Acc) ->
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
end,
{reply,
lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint,
binaryslot_reader(SlotsToFetchBinList) ++ SlotsToPoint,
reader,
State}
end;
reader({get_slots, SlotList}, _From, State) ->
SlotBins = read_slots(State#state.handle, SlotList),
reader({get_slots, SlotList, SegList}, _From, State) ->
SlotBins =
read_slots(State#state.handle,
SlotList,
{SegList, State#state.blockindex_cache}),
{reply, SlotBins, reader, State};
reader(get_maxsequencenumber, _From, State) ->
Summary = State#state.summary,
@ -469,10 +507,12 @@ reader(close, _From, State) ->
delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
{Result, _Stage, _SlotID, UpdState} = fetch(LedgerKey, Hash, State),
{reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT};
delete_pending({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SlotList},
_From, State) ->
{SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
EndKey,
ScanWidth,
SlotList,
State),
% Always yield as about to clear and de-reference
{reply,
@ -480,8 +520,11 @@ delete_pending({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
delete_pending,
State,
?DELETE_TIMEOUT};
delete_pending({get_slots, SlotList}, _From, State) ->
SlotBins = read_slots(State#state.handle, SlotList),
delete_pending({get_slots, SlotList, SegList}, _From, State) ->
SlotBins =
read_slots(State#state.handle,
SlotList,
{SegList, State#state.blockindex_cache}),
{reply, SlotBins, delete_pending, State, ?DELETE_TIMEOUT};
delete_pending(close, _From, State) ->
leveled_log:log("SST07", [State#state.filename]),
@ -560,18 +603,30 @@ fetch(LedgerKey, Hash, State) ->
[] ->
{not_present, slot_bloom, SlotID, State};
_ ->
Result = check_blocks(PosList,
State#state.handle,
Slot,
BlockLengths,
LedgerKey),
StartPos = Slot#slot_index_value.start_position,
Result =
check_blocks(PosList,
State#state.handle,
StartPos,
BlockLengths,
LedgerKey,
not_present),
{Result, slot_fetch, SlotID, State}
end
end
end.
fetch_range(StartKey, EndKey, ScanWidth, State) ->
-spec fetch_range(tuple(), tuple(), integer(), false|list(integer()), sst_state())
-> {list(), list()}.
%% @doc
%% Fetch the contents of the SST file for a given key range. This will
%% pre-fetch some results, and append pointers for additional results.
%%
%% A filter can be provided based on the Segment ID (usable for hashable
%% objects, not no_lookup entries) to accelerate the query if the 5-arity
%% version is used
fetch_range(StartKey, EndKey, ScanWidth, SegList, State) ->
Summary = State#state.summary,
Handle = State#state.handle,
{Slots, RTrim} = lookup_slots(StartKey, EndKey, Summary#summary.index),
@ -622,7 +677,10 @@ fetch_range(StartKey, EndKey, ScanWidth, State) ->
lists:split(ScanWidth, ExpandedSlots)
end,
SlotsToFetchBinList = read_slots(Handle, SlotsToFetch),
SlotsToFetchBinList =
read_slots(Handle,
SlotsToFetch,
{SegList, State#state.blockindex_cache}),
{SlotsToFetchBinList, SlotsToPoint}.
@ -929,25 +987,39 @@ generate_binary_slot(Lookup, KVL) ->
{<<Lengths/binary, PosBinIndex/binary>>, FullBin, HashL, LastKey}.
check_blocks([], _Handle, _Slot, _BlockLengths, _LedgerKey) ->
not_present;
check_blocks([Pos|Rest], Handle, Slot, BlockLengths, LedgerKey) ->
% Acc should start as not_present if LedgerKey is a key, and a list if
% LedgerKey is false
check_blocks([], _Handle, _StartPos, _BlockLengths, _LedgerKeyToCheck, Acc) ->
Acc;
check_blocks([Pos|Rest], Handle, StartPos, BlockLengths,
LedgerKeyToCheck, Acc) ->
{BlockNumber, BlockPos} = revert_position(Pos),
BlockBin = read_block(Handle, Slot, BlockLengths, BlockNumber),
BlockBin =
read_block(Handle,
StartPos,
BlockLengths,
BlockNumber),
BlockL = binary_to_term(BlockBin),
{K, V} = lists:nth(BlockPos, BlockL),
case K of
LedgerKey ->
LedgerKeyToCheck ->
{K, V};
_ ->
check_blocks(Rest, Handle, Slot, BlockLengths, LedgerKey)
case LedgerKeyToCheck of
false ->
% Scan mode - continue across the remaining positions,
% accumulating each positioned {K, V}
check_blocks(Rest, Handle, StartPos, BlockLengths,
false, Acc ++ [{K, V}]);
_ ->
check_blocks(Rest, Handle, StartPos, BlockLengths,
LedgerKeyToCheck, Acc)
end
end.
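The reworked check_blocks is called in two modes in this diff - a point lookup from fetch/3 and a segment-filtered scan from read_slots/3. The two call shapes, as they appear above and below (arguments as in the diff):

%% Point lookup: returns the matching {K, V} for LedgerKey, or not_present
check_blocks(PosList, Handle, StartPos, BlockLengths, LedgerKey, not_present),
%% Segment-filtered scan: no key to check, so positioned {K, V} pairs are
%% gathered into the list accumulator
check_blocks(PL, Handle, SP, BlockLengths, false, []),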
read_block(Handle, Slot, BlockLengths, BlockID) ->
read_block(Handle, StartPos, BlockLengths, BlockID) ->
{BlockPos, Offset, Length} = block_offsetandlength(BlockLengths, BlockID),
{ok, BlockBin} = file:pread(Handle,
Slot#slot_index_value.start_position
StartPos
+ BlockPos
+ Offset
+ 28,
@ -961,39 +1033,113 @@ read_slot(Handle, Slot) ->
Slot#slot_index_value.length),
SlotBin.
read_slots(Handle, SlotList) ->
PointerMapFun =
fun(Pointer) ->
{Slot, SK, EK} =
case Pointer of
{pointer, _Pid, Slot0, SK0, EK0} ->
{Slot0, SK0, EK0};
{pointer, Slot0, SK0, EK0} ->
{Slot0, SK0, EK0}
end,
{Slot#slot_index_value.start_position,
Slot#slot_index_value.length,
SK,
EK}
pointer_mapfun(Pointer) ->
{Slot, SK, EK} =
case Pointer of
{pointer, _Pid, Slot0, SK0, EK0} ->
{Slot0, SK0, EK0};
{pointer, Slot0, SK0, EK0} ->
{Slot0, SK0, EK0}
end,
LengthList = lists:map(PointerMapFun, SlotList),
{Slot#slot_index_value.start_position,
Slot#slot_index_value.length,
Slot#slot_index_value.slot_id,
SK,
EK}.
-spec binarysplit_mapfun(binary(), integer()) -> fun().
%% @doc
%% Return a function that can pull individual slot binaries from a binary
%% covering multiple slots
binarysplit_mapfun(MultiSlotBin, StartPos) ->
fun({SP, L, _ID, SK, EK}) ->
Start = SP - StartPos,
<<_Pre:Start/binary, SlotBin:L/binary, _Post/binary>> = MultiSlotBin,
{SlotBin, SK, EK}
end.
-spec read_slots(file:io_device(), list(), {false|list(), any()}) -> list().
%% @doc
%% The reading of slots will return a list of either 2-tuples containing
%% {K, V} pairs - or 3-tuples containing {Binary, SK, EK}. The 3 tuples
%% can be exploded into lists of {K, V} pairs using the binaryslot_reader/2
%% function
%%
%% Reading slots is generally unfiltered, but in the special case of
%% querying across slots when only matching segment IDs are required the
%% BlockIndexCache can be used
%%
%% Note that false positives will be passed through. It is important that
%% any key comparison between levels should allow for a non-matching key to
%% be considered as superior to a matching key - as otherwise a matching key
%% may be intermittently removed from the result set
read_slots(Handle, SlotList, {false, _BlockIndexCache}) ->
% No list of segments passed
LengthList = lists:map(fun pointer_mapfun/1, SlotList),
{MultiSlotBin, StartPos} = read_length_list(Handle, LengthList),
lists:map(binarysplit_mapfun(MultiSlotBin, StartPos), LengthList);
read_slots(Handle, SlotList, {SegList, BlockIndexCache}) ->
% List of segments passed so only {K, V} pairs matching those segments
% should be returned. This requires the {K, V} pair to have been added
% with the appropriate hash - if the pair was added with no_lookup as
% the hash value this will fail unexpectedly.
BinMapFun =
fun(Pointer, Acc) ->
{SP, _L, ID, _SK, _EK} = pointer_mapfun(Pointer),
case array:get(ID - 1, BlockIndexCache) of
none ->
% If there is an attempt to use the seg list query and the
% block index has not been cached for this slot, this may be
% slower as each such slot will be read in turn
LengthDetails = pointer_mapfun(Pointer),
{MultiSlotBin, StartPos} =
read_length_list(Handle, [LengthDetails]),
MapFun = binarysplit_mapfun(MultiSlotBin, StartPos),
Acc ++ [MapFun(LengthDetails)];
<<BlockLengths:24/binary, BlockIdx/binary>> ->
% If there is a BlockIndex cached then we can use it to
% check to see if any of the expected segments are
% present without lifting the slot off disk. Also the
% fact that we know position can be used to filter out
% other keys
case find_pos(BlockIdx, SegList, [], 0) of
[] ->
Acc;
PL ->
Acc ++ check_blocks(PL, Handle, SP, BlockLengths,
false, [])
end
end
end,
lists:foldl(BinMapFun, [], SlotList).
-spec binaryslot_reader(list({tuple(), tuple()}|{binary(), tuple(), tuple()}))
-> list({tuple(), tuple()}).
%% @doc
%% Read the binary slots converting them to {K, V} pairs if they were not
%% already {K, V} pairs
binaryslot_reader(SlotBinsToFetch) ->
binaryslot_reader(SlotBinsToFetch, []).
binaryslot_reader([], Acc) ->
Acc;
binaryslot_reader([{SlotBin, SK, EK}|Tail], Acc) ->
binaryslot_reader(Tail, Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK));
binaryslot_reader([{K, V}|Tail], Acc) ->
binaryslot_reader(Tail, Acc ++ [{K, V}]).
read_length_list(Handle, LengthList) ->
StartPos = element(1, lists:nth(1, LengthList)),
EndPos = element(1, lists:last(LengthList))
+ element(2, lists:last(LengthList)),
{ok, MultiSlotBin} = file:pread(Handle, StartPos, EndPos - StartPos),
{MultiSlotBin, StartPos}.
BinSplitMapFun =
fun({SP, L, SK, EK}) ->
Start = SP - StartPos,
<<_Pre:Start/binary,
SlotBin:L/binary,
_Post/binary>> = MultiSlotBin,
{SlotBin, SK, EK}
end,
lists:map(BinSplitMapFun, LengthList).
binaryslot_get(FullBin, Key, Hash) ->
@ -1186,10 +1332,13 @@ block_offsetandlength(BlockLengths, BlockID) ->
end.
extra_hash({SegHash, _ExtraHash}) when is_integer(SegHash) ->
SegHash band 32767;
tune_hash(SegHash);
extra_hash(NotHash) ->
NotHash.
tune_hash(SegHash) ->
SegHash band 32767.
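Since tune_hash/1 masks down to 15 bits, two distinct 16-bit segment IDs that share their low 15 bits are tuned to the same value - one source of the false positives noted in the read_slots/3 documentation above. A hypothetical unit test (not part of this commit) illustrating the collision:

tune_hash_collision_test() ->
    % 40000 and 7232 differ only in the top bit of a 16-bit segment ID,
    % so both tune to 7232 after band 32767
    ?assertMatch(7232, tune_hash(40000)),
    ?assertMatch(7232, tune_hash(7232)).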
fetch_value([], _BlockLengths, _Blocks, _Key) ->
not_present;
fetch_value([Pos|Rest], BlockLengths, Blocks, Key) ->
@ -1226,6 +1375,14 @@ revert_position(Pos) ->
find_pos(<<>>, _Hash, PosList, _Count) ->
PosList;
find_pos(<<1:1/integer, PotentialHit:15/integer, T/binary>>,
HashList, PosList, Count) when is_list(HashList) ->
case lists:member(PotentialHit, HashList) of
true ->
find_pos(T, HashList, PosList ++ [Count], Count + 1);
false ->
find_pos(T, HashList, PosList, Count + 1)
end;
find_pos(<<1:1/integer, Hash:15/integer, T/binary>>, Hash, PosList, Count) ->
find_pos(T, Hash, PosList ++ [Count], Count + 1);
find_pos(<<1:1/integer, _Miss:15/integer, T/binary>>, Hash, PosList, Count) ->
@ -1435,7 +1592,11 @@ maybe_expand_pointer(List) ->
List.
expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, Tail, Width) ->
expand_list_by_pointer(Pointer, Tail, Width) ->
expand_list_by_pointer(Pointer, Tail, Width, false).
expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey},
Tail, Width, SegList) ->
FoldFun =
fun(X, {Pointers, Remainder}) ->
case length(Pointers) of
@ -1452,15 +1613,21 @@ expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, Tail, Width) -
end,
InitAcc = {[{pointer, Slot, StartKey, EndKey}], []},
{AccPointers, AccTail} = lists:foldl(FoldFun, InitAcc, Tail),
ExpPointers = leveled_sst:sst_getslots(SSTPid, AccPointers),
ExpPointers =
leveled_sst:sst_getfilteredslots(SSTPid, AccPointers, SegList),
lists:append(ExpPointers, AccTail);
expand_list_by_pointer({next, ManEntry, StartKey, EndKey}, Tail, Width) ->
expand_list_by_pointer({next, ManEntry, StartKey, EndKey},
Tail, Width, SegList) ->
SSTPid = ManEntry#manifest_entry.owner,
leveled_log:log("SST10", [SSTPid, is_process_alive(SSTPid)]),
ExpPointer = leveled_sst:sst_getkvrange(SSTPid, StartKey, EndKey, Width),
ExpPointer =
leveled_sst:sst_getfilteredrange(SSTPid,
StartKey, EndKey,
Width, SegList),
ExpPointer ++ Tail.
get_slotid(Slot) ->
Slot#slot_index_value.slot_id.
%%%============================================================================
%%% Test
@ -1523,7 +1690,7 @@ form_slot_test() ->
?assertMatch({[], [], {no_lookup, Slot}, {o, "B1", "K5", null}}, R1).
merge_tombstonelist_test() ->
% Merge lists wiht nothing but tombstones
% Merge lists with nothing but tombstones
SkippingKV1 = {{o, "B1", "K9995", null}, {9995, tomb, 1234567, {}}},
SkippingKV2 = {{o, "B1", "K9996", null}, {9996, tomb, 1234567, {}}},
SkippingKV3 = {{o, "B1", "K9997", null}, {9997, tomb, 1234567, {}}},

View file

@ -100,8 +100,7 @@ perbucket_aae(_Config) ->
{foldheads_allkeys,
?RIAK_TAG,
{get_segment_folder(DLs, TreeSize), []},
false,
true},
false, true},
SW_SL0 = os:timestamp(),
{async, Book2SegFolder} =
@ -115,7 +114,27 @@ perbucket_aae(_Config) ->
io:format("Segment lists found ~w ~w~n", [Book2SegList, Book3SegList]),
Delta = lists:subtract(Book2SegList, Book3SegList),
true = length(Delta) == 1.
true = length(Delta) == 1,
SuperHeadSegmentFolder =
{foldheads_allkeys,
?RIAK_TAG,
{get_segment_folder(DLs, TreeSize), []},
false, true, DLs},
SW_SL1 = os:timestamp(),
{async, Book2SegFolder1} =
leveled_bookie:book_returnfolder(Bookie2, SuperHeadSegmentFolder),
{async, Book3SegFolder1} =
leveled_bookie:book_returnfolder(Bookie3, SuperHeadSegmentFolder),
Book2SegList1 = Book2SegFolder1(),
Book3SegList1 = Book3SegFolder1(),
Time_SL1 = timer:now_diff(os:timestamp(), SW_SL1)/1000,
io:format("Two segment list folds took ~w milliseconds ~n", [Time_SL1]),
io:format("Segment lists found ~w ~w~n", [Book2SegList1, Book3SegList1]),
Delta1 = lists:subtract(Book2SegList1, Book3SegList1),
true = length(Delta1) == 1.
get_segment_folder(SegmentList, TreeSize) ->