diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 42d5e0f..d7dfa56 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -699,7 +699,17 @@ get_runner(State,
             {foldheads_allkeys, Tag, FoldFun, JournalCheck, SnapPreFold}) ->
     SnapType = snaptype_by_presence(JournalCheck),
     SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold),
-    leveled_runner:foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck);
+    leveled_runner:foldheads_allkeys(SnapFun, Tag, FoldFun,
+                                        JournalCheck, false);
+get_runner(State,
+            {foldheads_allkeys,
+                Tag, FoldFun,
+                JournalCheck, SnapPreFold, SegmentList}) ->
+    SnapType = snaptype_by_presence(JournalCheck),
+    SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold),
+    leveled_runner:foldheads_allkeys(SnapFun,
+                                        Tag, FoldFun,
+                                        JournalCheck, SegmentList);
 get_runner(State, {foldobjects_allkeys, Tag, FoldFun, SnapPreFold}) ->
     SnapFun = return_snapfun(State, store, no_lookup, true, SnapPreFold),
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index 2cc92b6..0b7e731 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -97,6 +97,7 @@ segment_hash({?RIAK_TAG, Bucket, Key, null})
 segment_hash(Key) ->
     segment_hash(term_to_binary(Key)).
 
+
 -spec magic_hash(any()) -> integer().
 %% @doc
 %% Use DJ Bernstein magic hash function. Note, this is more expensive than
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index 8abe56b..081bccb 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -176,6 +176,7 @@
         pcl_fetch/2,
         pcl_fetch/3,
         pcl_fetchkeys/5,
+        pcl_fetchkeysbysegment/6,
         pcl_fetchnextkey/5,
         pcl_checksequencenumber/3,
         pcl_workforclerk/1,
@@ -346,7 +347,32 @@ pcl_fetch(Pid, Key, Hash) ->
 %% Erlang term order.
 pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) ->
     gen_server:call(Pid,
-                    {fetch_keys, StartKey, EndKey, AccFun, InitAcc, -1},
+                    {fetch_keys,
+                        StartKey, EndKey,
+                        AccFun, InitAcc,
+                        false, -1},
+                    infinity).
+
+-spec pcl_fetchkeysbysegment(pid(), tuple(), tuple(), fun(), any(),
+                                false|list(integer())) -> any().
+%% @doc
+%% Run a range query between StartKey and EndKey (inclusive).  This will cover
+%% all keys in the range - so must only be run against snapshots of the
+%% penciller to avoid blocking behaviour.
+%%
+%% This version allows an additional input of a SegmentList.  This is a list
+%% of 16-bit integers representing the segment IDs (i.e. the segment hash
+%% band ((2 ^ 16) - 1)) that are of interest to the fetch.
+%%
+%% Note that SegmentList must be false unless the object Tag supports
+%% additional indexing by segment.  This cannot be used on ?IDX_TAG and other
+%% tags that use the no_lookup hash.
+pcl_fetchkeysbysegment(Pid, StartKey, EndKey, AccFun, InitAcc, SegmentList) ->
+    gen_server:call(Pid,
+                    {fetch_keys,
+                        StartKey, EndKey,
+                        AccFun, InitAcc,
+                        SegmentList, -1},
                     infinity).
 
 -spec pcl_fetchnextkey(pid(), tuple(), tuple(), fun(), any()) -> any().
@@ -356,7 +382,10 @@ pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) ->
 %% found in erlang term order.
 pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
     gen_server:call(Pid,
-                    {fetch_keys, StartKey, EndKey, AccFun, InitAcc, 1},
+                    {fetch_keys,
+                        StartKey, EndKey,
+                        AccFun, InitAcc,
+                        false, 1},
                     infinity).
 
 -spec pcl_checksequencenumber(pid(), tuple(), integer()) -> boolean().
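A small sketch (not part of the patch) of calling the new pcl_fetchkeysbysegment/6 directly; the snapshot pid, key tuples and segment list are placeholders, and the AccFun shape follows the keyfolder code further down in this file.

%% Hypothetical example: count the keys in a range on a penciller snapshot,
%% optionally filtered by segment.  SegList = false matches pcl_fetchkeys/5.
count_keys_in_range(PclSnapshot, StartKey, EndKey, SegList) ->
    AccFun = fun(_Key, _Value, Count) -> Count + 1 end,
    leveled_penciller:pcl_fetchkeysbysegment(PclSnapshot,
                                                StartKey, EndKey,
                                                AccFun, 0,
                                                SegList).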
@@ -540,7 +569,10 @@ handle_call({check_sqn, Key, Hash, SQN}, _From, State) -> State#state.levelzero_index), SQN), State}; -handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, +handle_call({fetch_keys, + StartKey, EndKey, + AccFun, InitAcc, + SegmentList, MaxKeys}, _From, State=#state{snapshot_fully_loaded=Ready}) when Ready == true -> @@ -575,7 +607,7 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, Acc = keyfolder({L0AsList, SSTiter}, {StartKey, EndKey}, {AccFun, InitAcc}, - MaxKeys), + {SegmentList, MaxKeys}), {reply, Acc, State#state{levelzero_astree = L0AsList}}; handle_call(get_startup_sqn, _From, State) -> @@ -1098,43 +1130,175 @@ compare_to_sqn(Obj, SQN) -> end. +%%%============================================================================ +%%% Iterator functions +%%% +%%% TODO - move to dedicated module with extended unit testing +%%%============================================================================ + + +-spec keyfolder(list(), list(), tuple(), tuple(), {fun(), any()}) -> any(). +%% @doc +%% The keyfolder will compare an iterator across the immutable in-memory cache +%% of the Penciller (the IMMiter), with an iterator across the persisted part +%% (the SSTiter). +%% +%% A Segment List and a MaxKeys may be passed. Every time something is added +%% to the accumulator MaxKeys is reduced - so set MaxKeys to -1 if it is +%% intended to be infinite. +%% +%% The basic principle is to take the next key in the IMMiter and compare it +%% to the next key in the SSTiter, and decide which one should be added to the +%% accumulator. The iterators are advanced if they either win (i.e. are the +%% next key), or are dominated. This goes on until the iterators are empty. +%% +%% To advance the SSTiter the find_nextkey/4 function is used, as the SSTiter +%% is an iterator across multiple levels - and so needs to do its own +%% comparisons to pop the next result. +keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) -> + keyfolder({IMMiter, SSTiter}, + {StartKey, EndKey}, + {AccFun, Acc}, + {false, -1}). 
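The {SegmentList, MaxKeys} control tuple used from here on is compact enough to illustrate with a couple of example values (the segment IDs below are made up):

%% {false, -1}       - unfiltered and unbounded; -1 is decremented on every
%%                     accumulation so it never reaches the MaxKeys == 0 guard
%% {[12, 4097], 100} - only expand SST blocks that may hold segments 12 or
%%                     4097, and stop once 100 keys have been accumulated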
+
+keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, {_SegmentList, MaxKeys})
+                                        when MaxKeys == 0 ->
+    Acc;
+keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, {SegmentList, MaxKeys}) ->
+    {StartKey, EndKey} = KeyRange,
+    case find_nextkey(SSTiter, StartKey, EndKey, SegmentList) of
+        no_more_keys ->
+            Acc;
+        {NxSSTiter, {SSTKey, SSTVal}} ->
+            Acc1 = AccFun(SSTKey, SSTVal, Acc),
+            keyfolder({[], NxSSTiter},
+                        KeyRange,
+                        {AccFun, Acc1},
+                        {SegmentList, MaxKeys - 1})
+    end;
+keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator},
+            KeyRange,
+            {AccFun, Acc},
+            {SegmentList, MaxKeys}) ->
+    {StartKey, EndKey} = KeyRange,
+    case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of
+        {true, _} ->
+
+            % Normally everything is pre-filtered, but the IMM iterator can
+            % be re-used and so may be behind the StartKey if the StartKey has
+            % advanced from the previous use
+            keyfolder({NxIMMiterator, SSTiterator},
+                        KeyRange,
+                        {AccFun, Acc},
+                        {SegmentList, MaxKeys});
+        {false, true} ->
+            % There are no more keys in-range in the in-memory
+            % iterator, so take action as if this iterator is empty
+            % (see above)
+            keyfolder({[], SSTiterator},
+                        KeyRange,
+                        {AccFun, Acc},
+                        {SegmentList, MaxKeys});
+        {false, false} ->
+            case find_nextkey(SSTiterator, StartKey, EndKey, SegmentList) of
+                no_more_keys ->
+                    % No more keys in range in the persisted store, so use the
+                    % in-memory KV as the next
+                    Acc1 = AccFun(IMMKey, IMMVal, Acc),
+                    keyfolder({NxIMMiterator,
+                                []},
+                                KeyRange,
+                                {AccFun, Acc1},
+                                {SegmentList, MaxKeys - 1});
+                {NxSSTiterator, {SSTKey, SSTVal}} ->
+                    % There is a next key, so need to know which is the
+                    % next key between the two (and handle two keys
+                    % with different sequence numbers).
+                    case leveled_codec:key_dominates({IMMKey,
+                                                            IMMVal},
+                                                        {SSTKey,
+                                                            SSTVal}) of
+                        left_hand_first ->
+                            Acc1 = AccFun(IMMKey, IMMVal, Acc),
+                            % Stow the previous best result away at Level -1
+                            % so that there is no need to iterate to it again
+                            NewEntry = {-1, [{SSTKey, SSTVal}]},
+                            keyfolder({NxIMMiterator,
+                                        lists:keystore(-1,
+                                                        1,
+                                                        NxSSTiterator,
+                                                        NewEntry)},
+                                        KeyRange,
+                                        {AccFun, Acc1},
+                                        {SegmentList, MaxKeys - 1});
+                        right_hand_first ->
+                            Acc1 = AccFun(SSTKey, SSTVal, Acc),
+                            keyfolder({[{IMMKey, IMMVal}|NxIMMiterator],
+                                        NxSSTiterator},
+                                        KeyRange,
+                                        {AccFun, Acc1},
+                                        {SegmentList, MaxKeys - 1});
+                        left_hand_dominant ->
+                            Acc1 = AccFun(IMMKey, IMMVal, Acc),
+                            % We can add to the accumulator here.  The SST
+                            % key was the most dominant across all SST levels,
+                            % so there is no need to hold off until the IMMKey
+                            % is left hand first.
+                            keyfolder({NxIMMiterator,
+                                        NxSSTiterator},
+                                        KeyRange,
+                                        {AccFun, Acc1},
+                                        {SegmentList, MaxKeys - 1})
+                    end
+            end
+    end.
+
 %% Looks to find the best choice for the next key across the levels (other
 %% than in-memory table)
 %% In finding the best choice, the next key in a given level may be a next
 %% block or next file pointer which will need to be expanded
 find_nextkey(QueryArray, StartKey, EndKey) ->
-    find_nextkey(QueryArray,
-                    0,
-                    {null, null},
-                    StartKey,
-                    EndKey,
-                    ?ITERATOR_SCANWIDTH).
+    find_nextkey(QueryArray, StartKey, EndKey, false).
 
-find_nextkey(_QueryArray, LCnt, {null, null}, _StartKey, _EndKey, _Width)
-                                    when LCnt > ?MAX_LEVELS ->
+find_nextkey(QueryArray, StartKey, EndKey, SegmentList) ->
+    find_nextkey(QueryArray,
+                    -1,
+                    {null, null},
+                    StartKey, EndKey,
+                    SegmentList, ?ITERATOR_SCANWIDTH).
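For orientation when reading the find_nextkey/7 clauses that follow, a sketch of the QueryArray shape they walk - a keylist with one entry per level, holding either real {K, V} pairs or pointers still to be expanded, with level -1 reserved for a stowed previous best result (the levels, pids and keys below are illustrative only):

%% [{-1, []},
%%  {0, [{LedgerKey0, Value0}, {LedgerKey1, Value1}]},
%%  {1, [{next, ManifestEntry, StartKey}]},
%%  {2, [{pointer, SSTPid, Slot, StartKey, EndKey}]}]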
+ +find_nextkey(_QueryArray, LCnt, + {null, null}, + _StartKey, _EndKey, + _SegList, _Width) when LCnt > ?MAX_LEVELS -> % The array has been scanned wihtout finding a best key - must be % exhausted - respond to indicate no more keys to be found by the % iterator no_more_keys; -find_nextkey(QueryArray, LCnt, {BKL, BestKV}, _StartKey, _EndKey, _Width) - when LCnt > ?MAX_LEVELS -> +find_nextkey(QueryArray, LCnt, + {BKL, BestKV}, + _StartKey, _EndKey, + _SegList, _Width) when LCnt > ?MAX_LEVELS -> % All levels have been scanned, so need to remove the best result from % the array, and return that array along with the best key/sqn/status % combination {BKL, [BestKV|Tail]} = lists:keyfind(BKL, 1, QueryArray), {lists:keyreplace(BKL, 1, QueryArray, {BKL, Tail}), BestKV}; -find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, - StartKey, EndKey, Width) -> +find_nextkey(QueryArray, LCnt, + {BestKeyLevel, BestKV}, + StartKey, EndKey, + SegList, Width) -> % Get the next key at this level - {NextKey, RestOfKeys} = case lists:keyfind(LCnt, 1, QueryArray) of - false -> - {null, null}; - {LCnt, []} -> - {null, null}; - {LCnt, [NK|ROfKs]} -> - {NK, ROfKs} - end, + {NextKey, RestOfKeys} = + case lists:keyfind(LCnt, 1, QueryArray) of + false -> + {null, null}; + {LCnt, []} -> + {null, null}; + {LCnt, [NK|ROfKs]} -> + {NK, ROfKs} + end, % Compare the next key at this level with the best key case {NextKey, BestKeyLevel, BestKV} of {null, BKL, BKV} -> @@ -1142,42 +1306,48 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, find_nextkey(QueryArray, LCnt + 1, {BKL, BKV}, - StartKey, EndKey, Width); + StartKey, EndKey, + SegList, Width); {{next, Owner, _SK}, BKL, BKV} -> % The first key at this level is pointer to a file - need to query % the file to expand this level out before proceeding Pointer = {next, Owner, StartKey, EndKey}, UpdList = leveled_sst:expand_list_by_pointer(Pointer, RestOfKeys, - Width), + Width, + SegList), NewEntry = {LCnt, UpdList}, % Need to loop around at this level (LCnt) as we have not yet % examined a real key at this level find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry), LCnt, {BKL, BKV}, - StartKey, EndKey, Width); + StartKey, EndKey, + SegList, Width); {{pointer, SSTPid, Slot, PSK, PEK}, BKL, BKV} -> % The first key at this level is pointer within a file - need to % query the file to expand this level out before proceeding Pointer = {pointer, SSTPid, Slot, PSK, PEK}, UpdList = leveled_sst:expand_list_by_pointer(Pointer, RestOfKeys, - Width), + Width, + SegList), NewEntry = {LCnt, UpdList}, % Need to loop around at this level (LCnt) as we have not yet % examined a real key at this level find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry), LCnt, {BKL, BKV}, - StartKey, EndKey, Width); + StartKey, EndKey, + SegList, Width); {{Key, Val}, null, null} -> % No best key set - so can assume that this key is the best key, % and check the lower levels find_nextkey(QueryArray, LCnt + 1, {LCnt, {Key, Val}}, - StartKey, EndKey, Width); + StartKey, EndKey, + SegList, Width); {{Key, Val}, _BKL, {BestKey, _BestVal}} when Key < BestKey -> % There is a real key and a best key to compare, and the real key % at this level is before the best key, and so is now the new best @@ -1186,7 +1356,8 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, find_nextkey(QueryArray, LCnt + 1, {LCnt, {Key, Val}}, - StartKey, EndKey, Width); + StartKey, EndKey, + SegList, Width); {{Key, Val}, BKL, {BestKey, BestVal}} when Key == BestKey -> SQN = leveled_codec:strip_to_seqonly({Key, Val}), 
BestSQN = leveled_codec:strip_to_seqonly({BestKey, BestVal}), @@ -1197,7 +1368,8 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, find_nextkey(lists:keyreplace(LCnt, 1, QueryArray, NewEntry), LCnt + 1, {BKL, {BestKey, BestVal}}, - StartKey, EndKey, Width); + StartKey, EndKey, + SegList, Width); SQN > BestSQN -> % There is a real key at the front of this level and it has % a higher SQN than the best key, so we should use this as @@ -1212,92 +1384,19 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, {BKL, BestTail}), LCnt + 1, {LCnt, {Key, Val}}, - StartKey, EndKey, Width) + StartKey, EndKey, + SegList, Width) end; {_, BKL, BKV} -> % This is not the best key find_nextkey(QueryArray, LCnt + 1, {BKL, BKV}, - StartKey, EndKey, Width) + StartKey, EndKey, + SegList, Width) end. -keyfolder(IMMiter, SSTiter, StartKey, EndKey, {AccFun, Acc}) -> - keyfolder({IMMiter, SSTiter}, {StartKey, EndKey}, {AccFun, Acc}, -1). - -keyfolder(_Iterators, _KeyRange, {_AccFun, Acc}, MaxKeys) when MaxKeys == 0 -> - Acc; -keyfolder({[], SSTiter}, KeyRange, {AccFun, Acc}, MaxKeys) -> - {StartKey, EndKey} = KeyRange, - case find_nextkey(SSTiter, StartKey, EndKey) of - no_more_keys -> - Acc; - {NxSSTiter, {SSTKey, SSTVal}} -> - Acc1 = AccFun(SSTKey, SSTVal, Acc), - keyfolder({[], NxSSTiter}, KeyRange, {AccFun, Acc1}, MaxKeys - 1) - end; -keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange, - {AccFun, Acc}, MaxKeys) -> - {StartKey, EndKey} = KeyRange, - case {IMMKey < StartKey, leveled_codec:endkey_passed(EndKey, IMMKey)} of - {true, _} -> - - % Normally everything is pre-filterd, but the IMM iterator can - % be re-used and so may be behind the StartKey if the StartKey has - % advanced from the previous use - keyfolder({NxIMMiterator, SSTiterator}, - KeyRange, - {AccFun, Acc}, - MaxKeys); - {false, true} -> - % There are no more keys in-range in the in-memory - % iterator, so take action as if this iterator is empty - % (see above) - keyfolder({[], SSTiterator}, - KeyRange, - {AccFun, Acc}, - MaxKeys); - {false, false} -> - case find_nextkey(SSTiterator, StartKey, EndKey) of - no_more_keys -> - % No more keys in range in the persisted store, so use the - % in-memory KV as the next - Acc1 = AccFun(IMMKey, IMMVal, Acc), - keyfolder({NxIMMiterator, SSTiterator}, - KeyRange, - {AccFun, Acc1}, - MaxKeys - 1); - {NxSSTiterator, {SSTKey, SSTVal}} -> - % There is a next key, so need to know which is the - % next key between the two (and handle two keys - % with different sequence numbers). - case leveled_codec:key_dominates({IMMKey, - IMMVal}, - {SSTKey, - SSTVal}) of - left_hand_first -> - Acc1 = AccFun(IMMKey, IMMVal, Acc), - keyfolder({NxIMMiterator, SSTiterator}, - KeyRange, - {AccFun, Acc1}, - MaxKeys - 1); - right_hand_first -> - Acc1 = AccFun(SSTKey, SSTVal, Acc), - keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], - NxSSTiterator}, - KeyRange, - {AccFun, Acc1}, - MaxKeys - 1); - left_hand_dominant -> - Acc1 = AccFun(IMMKey, IMMVal, Acc), - keyfolder({NxIMMiterator, NxSSTiterator}, - KeyRange, - {AccFun, Acc1}, - MaxKeys - 1) - end - end - end. 
%%%============================================================================ @@ -1666,6 +1765,7 @@ foldwithimm_simple_test() -> IMMiterB = leveled_tree:match_range({o, "Bucket1", "Key1", null}, {o, null, null, null}, IMM3), + io:format("Compare IMM3 with QueryArrary~n"), AccB = keyfolder(IMMiterB, QueryArray, {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null}, diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 4a0b06b..1cdfc10 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -28,7 +28,7 @@ bucketkey_query/4, hashlist_query/3, tictactree/5, - foldheads_allkeys/4, + foldheads_allkeys/5, foldobjects_allkeys/3, foldheads_bybucket/4, foldobjects_bybucket/3, @@ -213,14 +213,18 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) -> end, {async, Runner}. --spec foldheads_allkeys(fun(), atom(), fun(), boolean()) -> {async, fun()}. +-spec foldheads_allkeys(fun(), atom(), fun(), boolean(), false|list(integer())) + -> {async, fun()}. %% @doc %% Fold over all heads in the store for a given tag - applying the passed %% function to each proxy object -foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck) -> +foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck, SegmentList) -> StartKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), - foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, {true, JournalCheck}). + foldobjects(SnapFun, + Tag, StartKey, EndKey, + FoldFun, + {true, JournalCheck}, SegmentList). -spec foldobjects_allkeys(fun(), atom(), fun()) -> {async, fun()}. %% @doc @@ -228,21 +232,30 @@ foldheads_allkeys(SnapFun, Tag, FoldFun, JournalCheck) -> foldobjects_allkeys(SnapFun, Tag, FoldFun) -> StartKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), - foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false). + foldobjects(SnapFun, + Tag, StartKey, EndKey, + FoldFun, + false, false). -spec foldobjects_bybucket(fun(), {atom(), any(), any()}, fun()) -> {async, fun()}. %% @doc %% Fold over all objects within a given key range in a bucket foldobjects_bybucket(SnapFun, {Tag, StartKey, EndKey}, FoldFun) -> - foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false). + foldobjects(SnapFun, + Tag, StartKey, EndKey, + FoldFun, + false, false). -spec foldheads_bybucket(fun(), {atom(), any(), any()}, fun(), boolean()) -> {async, fun()}. %% @doc %% Fold over all object metadata within a given key range in a bucket foldheads_bybucket(SnapFun, {Tag, StartKey, EndKey}, FoldFun, JournalCheck) -> - foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, {true, JournalCheck}). + foldobjects(SnapFun, + Tag, StartKey, EndKey, + FoldFun, + {true, JournalCheck}, false). -spec foldobjects_byindex(fun(), tuple(), fun()) -> {async, fun()}. %% @doc @@ -253,7 +266,10 @@ foldobjects_byindex(SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldFun) -> leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, FromTerm), EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm), - foldobjects(SnapFun, Tag, StartKey, EndKey, FoldFun, false). + foldobjects(SnapFun, + Tag, StartKey, EndKey, + FoldFun, + false, false). @@ -302,8 +318,8 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) -> -spec foldobjects(fun(), atom(), tuple(), tuple(), fun(), - false|{true, boolean()}) -> - {async, fun()}. + false|{true, boolean()}, false|list(integer())) -> + {async, fun()}. %% @doc %% The object folder should be passed DeferredFetch. 
%% DeferredFetch can either be false (which will return to the fold function @@ -311,7 +327,10 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) -> %% will be created that if understood by the fold function will allow the fold %% function to work on the head of the object, and defer fetching the body in %% case such a fetch is unecessary. -foldobjects(SnapFun, Tag, StartKey, EndKey, FoldObjectsFun, DeferredFetch) -> +foldobjects(SnapFun, + Tag, StartKey, EndKey, + FoldObjectsFun, + DeferredFetch, SegmentList) -> {FoldFun, InitAcc} = case is_tuple(FoldObjectsFun) of true -> @@ -331,11 +350,12 @@ foldobjects(SnapFun, Tag, StartKey, EndKey, FoldObjectsFun, DeferredFetch) -> JournalSnapshot, Tag, DeferredFetch), - Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, - StartKey, - EndKey, - AccFun, - InitAcc), + Acc = leveled_penciller:pcl_fetchkeysbysegment(LedgerSnapshot, + StartKey, + EndKey, + AccFun, + InitAcc, + SegmentList), ok = leveled_penciller:pcl_close(LedgerSnapshot), case DeferredFetch of {true, false} -> diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 7b8633b..1795917 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -98,7 +98,9 @@ sst_get/2, sst_get/3, sst_getkvrange/4, + sst_getfilteredrange/5, sst_getslots/2, + sst_getfilteredslots/3, sst_getmaxsequencenumber/1, sst_setfordelete/2, sst_clear/1, @@ -106,7 +108,8 @@ sst_deleteconfirmed/1, sst_close/1]). --export([expand_list_by_pointer/3]). +-export([expand_list_by_pointer/4, + get_slotid/1]). -record(slot_index_value, {slot_id :: integer(), @@ -136,6 +139,7 @@ yield_blockquery = false :: boolean(), blockindex_cache}). +-type sst_state() :: #state{}. %%%============================================================================ %%% API @@ -248,25 +252,42 @@ sst_get(Pid, LedgerKey) -> sst_get(Pid, LedgerKey, Hash) -> gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity). + -spec sst_getkvrange(pid(), tuple()|all, tuple()|all, integer()) -> list(). %% @doc %% Get a range of {Key, Value} pairs as a list between StartKey and EndKey %% (inclusive). The ScanWidth is the maximum size of the range, a pointer %% will be placed on the tail of the resulting list if results expand beyond %% the Scan Width +sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) -> + sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, false). + + +-spec sst_getfilteredrange(pid(), tuple()|all, tuple()|all, integer(), + list()|false) -> list(). +%% @doc +%% Get a range of {Key, Value} pairs as a list between StartKey and EndKey +%% (inclusive). The ScanWidth is the maximum size of the range, a pointer +%% will be placed on the tail of the resulting list if results expand beyond +%% the Scan Width %% %% To make the range open-ended (either ta start, end or both) the all atom %% can be use din place of the Key tuple. -sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) -> +sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, SegList) -> + SegList0 = + case is_list(SegList) of + true -> + lists:map(fun tune_hash/1, SegList); + false -> + SegList + end, case gen_fsm:sync_send_event(Pid, - {get_kvrange, StartKey, EndKey, ScanWidth}, + {get_kvrange, + StartKey, EndKey, + ScanWidth, SegList0}, infinity) of {yield, SlotsToFetchBinList, SlotsToPoint} -> - FetchFun = - fun({SlotBin, SK, EK}, Acc) -> - Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK) - end, - lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint; + binaryslot_reader(SlotsToFetchBinList) ++ SlotsToPoint; Reply -> Reply end. 
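A sketch of how a caller's segment IDs line up with the tune_hash/1 masking applied in sst_getfilteredrange/5 above; it assumes leveled_codec:segment_hash/1 returns a {SegmentHash, ExtraHash} pair, as implied by extra_hash/1 later in this file, and the pid, keys and scan width are placeholders.

%% Hypothetical helper: derive a filterable segment ID for a ledger key.
%% sst_getfilteredrange/5 re-applies band 32767 itself, so passing a wider
%% (e.g. 16-bit) segment ID is also safe - it is tuned down before use.
seg_id(LedgerKey) ->
    {SegmentHash, _ExtraHash} = leveled_codec:segment_hash(LedgerKey),
    SegmentHash band 32767.

%% Usage sketch against a single SST process; false instead of a list
%% disables the filter and gives sst_getkvrange/4 behaviour.
filtered_range(SSTPid, LedgerKeys) ->
    SegList = lists:map(fun seg_id/1, LedgerKeys),
    leveled_sst:sst_getfilteredrange(SSTPid, all, all, 16, SegList).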
@@ -274,14 +295,31 @@ sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) -> -spec sst_getslots(pid(), list()) -> list(). %% @doc %% Get a list of slots by their ID. The slot will be converted from the binary -%% to term form outside of the FSM loop +%% to term form outside of the FSM loop, this is to stop the copying of the +%% converted term to the calling process. sst_getslots(Pid, SlotList) -> - SlotBins = gen_fsm:sync_send_event(Pid, {get_slots, SlotList}, infinity), - FetchFun = - fun({SlotBin, SK, EK}, Acc) -> - Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK) + sst_getfilteredslots(Pid, SlotList, false). + +-spec sst_getfilteredslots(pid(), list(), false|list()) -> list(). +%% @doc +%% Get a list of slots by their ID. The slot will be converted from the binary +%% to term form outside of the FSM loop +%% +%% A list of 16-bit integer Segment IDs can be passed to filter the keys +%% returned (not precisely - with false results returned in addition). Use +%% false as a SegList to not filter +sst_getfilteredslots(Pid, SlotList, SegList) -> + SegList0 = + case is_list(SegList) of + true -> + lists:map(fun tune_hash/1, SegList); + false -> + SegList end, - lists:foldl(FetchFun, [], SlotBins). + SlotBins = gen_fsm:sync_send_event(Pid, + {get_slots, SlotList, SegList0}, + infinity), + binaryslot_reader(SlotBins). -spec sst_getmaxsequencenumber(pid()) -> integer(). %% @doc @@ -415,10 +453,11 @@ reader({get_kv, LedgerKey, Hash}, _From, State) -> {Result, Stage, _SlotID, UpdState} = fetch(LedgerKey, Hash, State), UpdTimings = leveled_log:sst_timing(State#state.sst_timings, SW, Stage), {reply, Result, reader, UpdState#state{sst_timings = UpdTimings}}; -reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> +reader({get_kvrange, StartKey, EndKey, ScanWidth, SlotList}, _From, State) -> {SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey, EndKey, ScanWidth, + SlotList, State), case State#state.yield_blockquery of true -> @@ -427,17 +466,16 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> reader, State}; false -> - FetchFun = - fun({SlotBin, SK, EK}, Acc) -> - Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK) - end, {reply, - lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint, + binaryslot_reader(SlotsToFetchBinList) ++ SlotsToPoint, reader, State} end; -reader({get_slots, SlotList}, _From, State) -> - SlotBins = read_slots(State#state.handle, SlotList), +reader({get_slots, SlotList, SegList}, _From, State) -> + SlotBins = + read_slots(State#state.handle, + SlotList, + {SegList, State#state.blockindex_cache}), {reply, SlotBins, reader, State}; reader(get_maxsequencenumber, _From, State) -> Summary = State#state.summary, @@ -469,10 +507,12 @@ reader(close, _From, State) -> delete_pending({get_kv, LedgerKey, Hash}, _From, State) -> {Result, _Stage, _SlotID, UpdState} = fetch(LedgerKey, Hash, State), {reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT}; -delete_pending({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> +delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SlotList}, + _From, State) -> {SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey, EndKey, ScanWidth, + SlotList, State), % Always yield as about to clear and de-reference {reply, @@ -480,8 +520,11 @@ delete_pending({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> delete_pending, State, ?DELETE_TIMEOUT}; -delete_pending({get_slots, SlotList}, _From, State) -> - SlotBins = read_slots(State#state.handle, SlotList), +delete_pending({get_slots, 
+                    SlotList, SegList}, _From, State) ->
+    SlotBins =
+        read_slots(State#state.handle,
+                    SlotList,
+                    {SegList, State#state.blockindex_cache}),
     {reply, SlotBins, delete_pending, State, ?DELETE_TIMEOUT};
 delete_pending(close, _From, State) ->
     leveled_log:log("SST07", [State#state.filename]),
@@ -560,18 +603,30 @@ fetch(LedgerKey, Hash, State) ->
                         [] ->
                             {not_present, slot_bloom, SlotID, State};
                         _ ->
-                            Result = check_blocks(PosList,
-                                                    State#state.handle,
-                                                    Slot,
-                                                    BlockLengths,
-                                                    LedgerKey),
+                            StartPos = Slot#slot_index_value.start_position,
+                            Result =
+                                check_blocks(PosList,
+                                                State#state.handle,
+                                                StartPos,
+                                                BlockLengths,
+                                                LedgerKey,
+                                                not_present),
                             {Result, slot_fetch, SlotID, State}
                     end
             end
     end.
 
-fetch_range(StartKey, EndKey, ScanWidth, State) ->
+-spec fetch_range(tuple(), tuple(), integer(), false|list(integer()), sst_state())
+                                                    -> {list(), list()}.
+%% @doc
+%% Fetch the contents of the SST file for a given key range.  This will
+%% pre-fetch some results, and append pointers for additional results.
+%%
+%% A filter can be provided based on the Segment ID (usable for hashable
+%% objects, not no_lookup entries) to accelerate the query if the 5-arity
+%% version is used
+fetch_range(StartKey, EndKey, ScanWidth, SegList, State) ->
     Summary = State#state.summary,
     Handle = State#state.handle,
     {Slots, RTrim} = lookup_slots(StartKey, EndKey, Summary#summary.index),
@@ -622,7 +677,10 @@ fetch_range(StartKey, EndKey, ScanWidth, State) ->
             lists:split(ScanWidth, ExpandedSlots)
         end,
 
-    SlotsToFetchBinList = read_slots(Handle, SlotsToFetch),
+    SlotsToFetchBinList =
+        read_slots(Handle,
+                    SlotsToFetch,
+                    {SegList, State#state.blockindex_cache}),
     {SlotsToFetchBinList, SlotsToPoint}.
 
 
@@ -929,25 +987,39 @@ generate_binary_slot(Lookup, KVL) ->
     {<>, FullBin, HashL, LastKey}.
 
 
-check_blocks([], _Handle, _Slot, _BlockLengths, _LedgerKey) ->
-    not_present;
-check_blocks([Pos|Rest], Handle, Slot, BlockLengths, LedgerKey) ->
+% Acc should start as not_present if LedgerKey is a key, and a list if
+% LedgerKey is false
+
+check_blocks([], _Handle, _StartPos, _BlockLengths, _LedgerKeyToCheck, Acc) ->
+    Acc;
+check_blocks([Pos|Rest], Handle, StartPos, BlockLengths,
+                LedgerKeyToCheck, Acc) ->
     {BlockNumber, BlockPos} = revert_position(Pos),
-    BlockBin = read_block(Handle, Slot, BlockLengths, BlockNumber),
+    BlockBin =
+        read_block(Handle,
+                    StartPos,
+                    BlockLengths,
+                    BlockNumber),
     BlockL = binary_to_term(BlockBin),
     {K, V} = lists:nth(BlockPos, BlockL),
     case K of
-        LedgerKey ->
+        LedgerKeyToCheck ->
             {K, V};
         _ ->
-            check_blocks(Rest, Handle, Slot, BlockLengths, LedgerKey)
+            case LedgerKeyToCheck of
+                false ->
+                    check_blocks(Rest, Handle, StartPos, BlockLengths,
+                                    LedgerKeyToCheck, Acc ++ [{K, V}]);
+                _ ->
+                    check_blocks(Rest, Handle, StartPos, BlockLengths,
+                                    LedgerKeyToCheck, Acc)
+            end
     end.
 
-read_block(Handle, Slot, BlockLengths, BlockID) ->
+read_block(Handle, StartPos, BlockLengths, BlockID) ->
     {BlockPos, Offset, Length} = block_offsetandlength(BlockLengths, BlockID),
     {ok, BlockBin} = file:pread(Handle,
-                                Slot#slot_index_value.start_position
+                                StartPos
                                     + BlockPos
                                     + Offset
                                     + 28,
@@ -961,39 +1033,113 @@ read_slot(Handle, Slot) ->
                             Slot#slot_index_value.length),
     SlotBin.
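Worth noting for reviewers: check_blocks/6 now serves two purposes, and the final argument selects between them. A sketch of the two call shapes, both as they appear in this patch (PosList, Handle, StartPos and BlockLengths being whatever the caller has to hand):

%% Point lookup, as used from fetch/3 above - returns the first {K, V} whose
%% key matches LedgerKey, or not_present:
%%   check_blocks(PosList, Handle, StartPos, BlockLengths,
%%                   LedgerKey, not_present)
%%
%% Segment-filtered slot read, as used from read_slots/3 below - accumulates
%% every {K, V} at the candidate positions (which may include false positives
%% from the block index):
%%   check_blocks(PosList, Handle, StartPos, BlockLengths, false, [])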
-read_slots(Handle, SlotList) ->
-    PointerMapFun =
-        fun(Pointer) ->
-            {Slot, SK, EK} =
-                case Pointer of
-                    {pointer, _Pid, Slot0, SK0, EK0} ->
-                        {Slot0, SK0, EK0};
-                    {pointer, Slot0, SK0, EK0} ->
-                        {Slot0, SK0, EK0}
-                end,
-            {Slot#slot_index_value.start_position,
-                Slot#slot_index_value.length,
-                SK,
-                EK}
+pointer_mapfun(Pointer) ->
+    {Slot, SK, EK} =
+        case Pointer of
+            {pointer, _Pid, Slot0, SK0, EK0} ->
+                {Slot0, SK0, EK0};
+            {pointer, Slot0, SK0, EK0} ->
+                {Slot0, SK0, EK0}
         end,
-
-    LengthList = lists:map(PointerMapFun, SlotList),
+
+    {Slot#slot_index_value.start_position,
+        Slot#slot_index_value.length,
+        Slot#slot_index_value.slot_id,
+        SK,
+        EK}.
+
+-spec binarysplit_mapfun(binary(), integer()) -> fun().
+%% @doc
+%% Return a function that can pull individual slot binaries from a binary
+%% covering multiple slots
+binarysplit_mapfun(MultiSlotBin, StartPos) ->
+    fun({SP, L, _ID, SK, EK}) ->
+        Start = SP - StartPos,
+        <<_Pre:Start/binary, SlotBin:L/binary, _Post/binary>> = MultiSlotBin,
+        {SlotBin, SK, EK}
+    end.
+
+
+-spec read_slots(file:io_device(), list(), {false|list(), any()}) -> list().
+%% @doc
+%% The reading of slots will return a list of either 2-tuples containing
+%% {K, V} pairs - or 3-tuples containing {Binary, SK, EK}.  The 3-tuples
+%% can be exploded into lists of {K, V} pairs using the binaryslot_reader/1
+%% function
+%%
+%% Reading slots is generally unfiltered, but in the special case when
+%% querying across slots where only matching segment IDs are required the
+%% BlockIndexCache can be used
+%%
+%% Note that false positives will be passed through.  It is important that
+%% any key comparison between levels should allow for a non-matching key to
+%% be considered as superior to a matching key - as otherwise a matching key
+%% may be intermittently removed from the result set
+read_slots(Handle, SlotList, {false, _BlockIndexCache}) ->
+    % No list of segments passed
+    LengthList = lists:map(fun pointer_mapfun/1, SlotList),
+    {MultiSlotBin, StartPos} = read_length_list(Handle, LengthList),
+    lists:map(binarysplit_mapfun(MultiSlotBin, StartPos), LengthList);
+read_slots(Handle, SlotList, {SegList, BlockIndexCache}) ->
+    % List of segments passed so only {K, V} pairs matching those segments
+    % should be returned.  This requires the {K, V} pair to have been added
+    % with the appropriate hash - if the pair was added with no_lookup as
+    % the hash value this will fail unexpectedly.
+    BinMapFun =
+        fun(Pointer, Acc) ->
+            {SP, _L, ID, _SK, _EK} = pointer_mapfun(Pointer),
+            case array:get(ID - 1, BlockIndexCache) of
+                none ->
+                    % If there is an attempt to use the seg list query and the
+                    % block index isn't cached for this slot, the query may be
+                    % slower as each slot will be read in turn
+                    LengthDetails = pointer_mapfun(Pointer),
+                    {MultiSlotBin, StartPos} =
+                        read_length_list(Handle, [LengthDetails]),
+                    MapFun = binarysplit_mapfun(MultiSlotBin, StartPos),
+                    Acc ++ [MapFun(LengthDetails)];
+                <<BlockLengths:24/binary, BlockIdx/binary>> ->
+                    % If there is a BlockIndex cached then we can use it to
+                    % check to see if any of the expected segments are
+                    % present without lifting the slot off disk.  Also the
+                    % fact that we know the position can be used to filter
+                    % out other keys
+                    case find_pos(BlockIdx, SegList, [], 0) of
+                        [] ->
+                            Acc;
+                        PL ->
+                            Acc ++ check_blocks(PL, Handle, SP, BlockLengths,
+                                                    false, [])
+                    end
+            end
+        end,
+    lists:foldl(BinMapFun, [], SlotList).
+
+
+-spec binaryslot_reader(list({tuple(), tuple()}|{binary(), tuple(), tuple()}))
+                                                -> list({tuple(), tuple()}).
+%% @doc +%% Read the binary slots converting them to {K, V} pairs if they were not +%% already {K, V} pairs +binaryslot_reader(SlotBinsToFetch) -> + binaryslot_reader(SlotBinsToFetch, []). + +binaryslot_reader([], Acc) -> + Acc; +binaryslot_reader([{SlotBin, SK, EK}|Tail], Acc) -> + binaryslot_reader(Tail, Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)); +binaryslot_reader([{K, V}|Tail], Acc) -> + binaryslot_reader(Tail, Acc ++ [{K, V}]). + + +read_length_list(Handle, LengthList) -> StartPos = element(1, lists:nth(1, LengthList)), EndPos = element(1, lists:last(LengthList)) + element(2, lists:last(LengthList)), {ok, MultiSlotBin} = file:pread(Handle, StartPos, EndPos - StartPos), + {MultiSlotBin, StartPos}. - BinSplitMapFun = - fun({SP, L, SK, EK}) -> - Start = SP - StartPos, - <<_Pre:Start/binary, - SlotBin:L/binary, - _Post/binary>> = MultiSlotBin, - {SlotBin, SK, EK} - end, - - lists:map(BinSplitMapFun, LengthList). binaryslot_get(FullBin, Key, Hash) -> @@ -1186,10 +1332,13 @@ block_offsetandlength(BlockLengths, BlockID) -> end. extra_hash({SegHash, _ExtraHash}) when is_integer(SegHash) -> - SegHash band 32767; + tune_hash(SegHash); extra_hash(NotHash) -> NotHash. +tune_hash(SegHash) -> + SegHash band 32767. + fetch_value([], _BlockLengths, _Blocks, _Key) -> not_present; fetch_value([Pos|Rest], BlockLengths, Blocks, Key) -> @@ -1226,6 +1375,14 @@ revert_position(Pos) -> find_pos(<<>>, _Hash, PosList, _Count) -> PosList; +find_pos(<<1:1/integer, PotentialHit:15/integer, T/binary>>, + HashList, PosList, Count) when is_list(HashList) -> + case lists:member(PotentialHit, HashList) of + true -> + find_pos(T, HashList, PosList ++ [Count], Count + 1); + false -> + find_pos(T, HashList, PosList, Count + 1) + end; find_pos(<<1:1/integer, Hash:15/integer, T/binary>>, Hash, PosList, Count) -> find_pos(T, Hash, PosList ++ [Count], Count + 1); find_pos(<<1:1/integer, _Miss:15/integer, T/binary>>, Hash, PosList, Count) -> @@ -1435,7 +1592,11 @@ maybe_expand_pointer(List) -> List. -expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, Tail, Width) -> +expand_list_by_pointer(Pointer, Tail, Width) -> + expand_list_by_pointer(Pointer, Tail, Width, false). + +expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, + Tail, Width, SegList) -> FoldFun = fun(X, {Pointers, Remainder}) -> case length(Pointers) of @@ -1452,15 +1613,21 @@ expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, Tail, Width) - end, InitAcc = {[{pointer, Slot, StartKey, EndKey}], []}, {AccPointers, AccTail} = lists:foldl(FoldFun, InitAcc, Tail), - ExpPointers = leveled_sst:sst_getslots(SSTPid, AccPointers), + ExpPointers = + leveled_sst:sst_getfilteredslots(SSTPid, AccPointers, SegList), lists:append(ExpPointers, AccTail); -expand_list_by_pointer({next, ManEntry, StartKey, EndKey}, Tail, Width) -> +expand_list_by_pointer({next, ManEntry, StartKey, EndKey}, + Tail, Width, SegList) -> SSTPid = ManEntry#manifest_entry.owner, leveled_log:log("SST10", [SSTPid, is_process_alive(SSTPid)]), - ExpPointer = leveled_sst:sst_getkvrange(SSTPid, StartKey, EndKey, Width), + ExpPointer = + leveled_sst:sst_getfilteredrange(SSTPid, + StartKey, EndKey, + Width, SegList), ExpPointer ++ Tail. - +get_slotid(Slot) -> + Slot#slot_index_value.slot_id. %%%============================================================================ %%% Test @@ -1523,7 +1690,7 @@ form_slot_test() -> ?assertMatch({[], [], {no_lookup, Slot}, {o, "B1", "K5", null}}, R1). 
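An illustrative note on the new find_pos/4 clause above (not a test included in the patch): entries in the block index with the top bit set carry a 15-bit segment hash, so scanning with a list of hashes returns every position that might match. With the made-up index below:

%% find_pos(<<1:1, 100:15, 1:1, 200:15, 1:1, 100:15>>, [100], [], 0)
%% would be expected to return [0, 2] - both positions whose stored hash is
%% 100; any false positives are then filtered (or tolerated) by the caller.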
merge_tombstonelist_test() -> - % Merge lists with nothing but tombstones SkippingKV1 = {{o, "B1", "K9995", null}, {9995, tomb, 1234567, {}}}, SkippingKV2 = {{o, "B1", "K9996", null}, {9996, tomb, 1234567, {}}}, SkippingKV3 = {{o, "B1", "K9997", null}, {9997, tomb, 1234567, {}}}, diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl index ed3d1d8..71317bb 100644 --- a/test/end_to_end/riak_SUITE.erl +++ b/test/end_to_end/riak_SUITE.erl @@ -100,8 +100,7 @@ perbucket_aae(_Config) -> {foldheads_allkeys, ?RIAK_TAG, {get_segment_folder(DLs, TreeSize), []}, - false, - true}, + false, true}, SW_SL0 = os:timestamp(), {async, Book2SegFolder} = @@ -115,7 +114,27 @@ perbucket_aae(_Config) -> io:format("Segment lists found ~w ~w~n", [Book2SegList, Book3SegList]), Delta = lists:subtract(Book2SegList, Book3SegList), - true = length(Delta) == 1. + true = length(Delta) == 1, + + SuperHeadSegmentFolder = + {foldheads_allkeys, + ?RIAK_TAG, + {get_segment_folder(DLs, TreeSize), []}, + false, true, DLs}, + + SW_SL1 = os:timestamp(), + {async, Book2SegFolder1} = + leveled_bookie:book_returnfolder(Bookie2, SuperHeadSegmentFolder), + {async, Book3SegFolder1} = + leveled_bookie:book_returnfolder(Bookie3, SuperHeadSegmentFolder), + Book2SegList1 = Book2SegFolder1(), + Book3SegList1 = Book3SegFolder1(), + Time_SL1 = timer:now_diff(os:timestamp(), SW_SL1)/1000, + io:format("Two segment list folds took ~w milliseconds ~n", [Time_SL1]), + io:format("Segment lists found ~w ~w~n", [Book2SegList1, Book3SegList1]), + + Delta1 = lists:subtract(Book2SegList1, Book3SegList1), + true = length(Delta1) == 1. get_segment_folder(SegmentList, TreeSize) ->
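To round off, a condensed sketch of the query shape the extended test above exercises; the helper name and the o_rkv value assumed for ?RIAK_TAG are illustrative only, and the fold fun is assumed to take (Bucket, Key, ProxyObject, Acc) as the existing head folds do - in the test itself the fold fun comes from get_segment_folder/2 and the segment list is DLs.

%% Hypothetical client-side helper using the 6-element foldheads_allkeys tuple.
fold_heads_for_segments(Bookie, SegList) ->
    FoldFun = fun(Bucket, Key, _ProxyObj, Acc) -> [{Bucket, Key}|Acc] end,
    Query = {foldheads_allkeys,
                o_rkv,          % assumed expansion of ?RIAK_TAG
                {FoldFun, []},  % fold fun with initial accumulator
                false,          % JournalCheck
                true,           % SnapPreFold
                SegList},       % false | list of segment IDs
    {async, Folder} = leveled_bookie:book_returnfolder(Bookie, Query),
    Folder().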