diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 4692bfc..4b5581a 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -1519,10 +1519,11 @@ find_nextkey(QueryArray, LCnt, % The first key at this level is pointer to a file - need to query % the file to expand this level out before proceeding Pointer = {next, Owner, StartKey, EndKey}, - UpdList = leveled_sst:expand_list_by_pointer(Pointer, - RestOfKeys, - Width, - SegList), + UpdList = leveled_sst:sst_expandpointer(Pointer, + RestOfKeys, + Width, + SegList, + 0), NewEntry = {LCnt, UpdList}, % Need to loop around at this level (LCnt) as we have not yet % examined a real key at this level @@ -1535,10 +1536,11 @@ find_nextkey(QueryArray, LCnt, % The first key at this level is pointer within a file - need to % query the file to expand this level out before proceeding Pointer = {pointer, SSTPid, Slot, PSK, PEK}, - UpdList = leveled_sst:expand_list_by_pointer(Pointer, - RestOfKeys, - Width, - SegList), + UpdList = leveled_sst:sst_expandpointer(Pointer, + RestOfKeys, + Width, + SegList, + 0), NewEntry = {LCnt, UpdList}, % Need to loop around at this level (LCnt) as we have not yet % examined a real key at this level diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 2349ef7..e9d6bd1 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -114,10 +114,7 @@ sst_open/2, sst_get/2, sst_get/3, - sst_getkvrange/4, - sst_getfilteredrange/5, - sst_getslots/2, - sst_getfilteredslots/3, + sst_expandpointer/5, sst_getmaxsequencenumber/1, sst_setfordelete/2, sst_clear/1, @@ -125,7 +122,6 @@ sst_deleteconfirmed/1, sst_close/1]). --export([expand_list_by_pointer/4]). -record(slot_index_value, {slot_id :: integer(), @@ -144,10 +140,22 @@ :: all|leveled_codec:ledger_key(). -type slot_pointer() :: {pointer, pid(), integer(), range_endpoint(), range_endpoint()}. --type sst_pointer() +-type sst_pointer() + % Used in sst_new :: {next, leveled_pmanifest:manifest_entry(), - leveled_codec:ledger_key()|all}. + range_endpoint()}. +-type sst_closed_pointer() + % used in expand_list_by_pointer + % (close point is added by maybe_expand_pointer + :: {next, + leveled_pmanifest:manifest_entry(), + range_endpoint(), + range_endpoint()}. +-type expandable_pointer() + :: slot_pointer()|sst_closed_pointer(). +-type expanded_pointer() + :: leveled_codec:ledger_kv()|expandable_pointer(). -type binaryslot_element() :: {tuple(), tuple()}|{binary(), integer(), tuple(), tuple()}. @@ -341,91 +349,29 @@ sst_get(Pid, LedgerKey) -> sst_get(Pid, LedgerKey, Hash) -> gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity). - --spec sst_getkvrange(pid(), - range_endpoint(), - range_endpoint(), - integer()) - -> list(leveled_codec:ledger_kv()|slot_pointer()). -%% @doc -%% Get a range of {Key, Value} pairs as a list between StartKey and EndKey -%% (inclusive). The ScanWidth is the maximum size of the range, a pointer -%% will be placed on the tail of the resulting list if results expand beyond -%% the Scan Width -sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) -> - sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, false). - - --spec sst_getfilteredrange(pid(), - range_endpoint(), - range_endpoint(), - integer(), - leveled_codec:segment_list()) - -> list(leveled_codec:ledger_kv()|slot_pointer()). -%% @doc -%% Get a range of {Key, Value} pairs as a list between StartKey and EndKey -%% (inclusive). The ScanWidth is the maximum size of the range, a pointer -%% will be placed on the tail of the resulting list if results expand beyond -%% the Scan Width -%% -%% To make the range open-ended (either to start, end or both) the all atom -%% can be used in place of the Key tuple. -%% -%% A segment list can also be passed, which inidcates a subset of segment -%% hashes of interest in the query. -%% -%% TODO: Optimise this so that passing a list of segments that tune to the -%% same hash is faster - perhaps provide an exportable function in -%% leveled_tictac -sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, SegList) -> - SegList0 = tune_seglist(SegList), - case gen_fsm:sync_send_event(Pid, - {get_kvrange, - StartKey, EndKey, - ScanWidth, SegList0}, - infinity) of - {yield, SlotsToFetchBinList, SlotsToPoint, PressMethod, IdxModDate} -> - {L, _BIC} = - binaryslot_reader(SlotsToFetchBinList, - PressMethod, IdxModDate, SegList0), - L ++ SlotsToPoint; - Reply -> - Reply - end. - --spec sst_getslots(pid(), list(slot_pointer())) - -> list(leveled_codec:ledger_kv()). -%% @doc -%% Get a list of slots by their ID. The slot will be converted from the binary -%% to term form outside of the FSM loop, this is to stop the copying of the -%% converted term to the calling process. -sst_getslots(Pid, SlotList) -> - sst_getfilteredslots(Pid, SlotList, false). - --spec sst_getfilteredslots(pid(), - list(slot_pointer()), - leveled_codec:segment_list()) - -> list(leveled_codec:ledger_kv()). -%% @doc -%% Get a list of slots by their ID. The slot will be converted from the binary -%% to term form outside of the FSM loop -%% -%% A list of 16-bit integer Segment IDs can be passed to filter the keys -%% returned (not precisely - with false results returned in addition). Use -%% false as a SegList to not filter -sst_getfilteredslots(Pid, SlotList, SegList) -> - SegL0 = tune_seglist(SegList), - {SlotBins, PressMethod, IdxModDate} = - gen_fsm:sync_send_event(Pid, {get_slots, SlotList, SegL0}, infinity), - {L, _BIC} = binaryslot_reader(SlotBins, PressMethod, IdxModDate, SegL0), - L. - -spec sst_getmaxsequencenumber(pid()) -> integer(). %% @doc %% Get the maximume sequence number for this SST file sst_getmaxsequencenumber(Pid) -> gen_fsm:sync_send_event(Pid, get_maxsequencenumber, infinity). +-spec sst_expandpointer(expandable_pointer(), + list(expandable_pointer()), + pos_integer(), + leveled_codec:segment_list(), + non_neg_integer()) + -> list(expanded_pointer()). +%% @doc +%% Expand out a list of pointer to return a list of Keys and Values with a +%% tail of pointers (once the ScanWidth has been satisfied). +%% Folding over keys in a store uses this function, although this function +%% does not directly call the gen_server - it does so by sst_getfilteredslots +%% or sst_getfilteredrange depending on the nature of the pointer. +sst_expandpointer(Pointer, MorePointers, ScanWidth, SegmentList, LowLastMod) -> + expand_list_by_pointer(Pointer, MorePointers, ScanWidth, + SegmentList, LowLastMod). + + -spec sst_setfordelete(pid(), pid()|false) -> ok. %% @doc %% If the SST is no longer in use in the active ledger it can be set for @@ -579,11 +525,13 @@ reader({get_kv, LedgerKey, Hash}, _From, State) -> {reply, Result, reader, UpdState#state{timings = UpdTimings0, timings_countdown = CountDown}}; -reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList}, _From, State) -> +reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod}, + _From, State) -> {SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey, EndKey, ScanWidth, SegList, + LowLastMod, State), PressMethod = State#state.compression_method, @@ -618,13 +566,13 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList}, _From, State) -> reader, State#state{blockindex_cache = BlockIdxC0}} end; -reader({get_slots, SlotList, SegList}, _From, State) -> +reader({get_slots, SlotList, SegList, LowLastMod}, _From, State) -> PressMethod = State#state.compression_method, IdxModDate = State#state.index_moddate, SlotBins = read_slots(State#state.handle, SlotList, - {SegList, State#state.blockindex_cache}, + {SegList, LowLastMod, State#state.blockindex_cache}, State#state.compression_method, State#state.index_moddate), {reply, {SlotBins, PressMethod, IdxModDate}, reader, State}; @@ -658,12 +606,13 @@ reader(close, _From, State) -> delete_pending({get_kv, LedgerKey, Hash}, _From, State) -> {Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing), {reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT}; -delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SegList}, - _From, State) -> +delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod}, + _From, State) -> {SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey, EndKey, ScanWidth, SegList, + LowLastMod, State), % Always yield as about to clear and de-reference PressMethod = State#state.compression_method, @@ -673,13 +622,13 @@ delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SegList}, delete_pending, State, ?DELETE_TIMEOUT}; -delete_pending({get_slots, SlotList, SegList}, _From, State) -> +delete_pending({get_slots, SlotList, SegList, LowLastMod}, _From, State) -> PressMethod = State#state.compression_method, IdxModDate = State#state.index_moddate, SlotBins = read_slots(State#state.handle, SlotList, - {SegList, State#state.blockindex_cache}, + {SegList, LowLastMod, State#state.blockindex_cache}, PressMethod, IdxModDate), {reply, @@ -726,6 +675,157 @@ code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}. +%%%============================================================================ +%%% External Functions +%%%============================================================================ + +-spec expand_list_by_pointer(expandable_pointer(), + list(expandable_pointer()), + pos_integer()) + -> list(expanded_pointer()). +%% @doc +%% Expand a list of pointers, maybe ending up with a list of keys and values +%% with a tail of pointers +%% By defauls will not have a segment filter, or a low last_modified_date, but +%% they can be used. Range checking a last modified date must still be made on +%% the output - at this stage the low last_modified_date has been used to bulk +%% skip those slots not containing any information over the low last modified +%% date +expand_list_by_pointer(Pointer, Tail, Width) -> + expand_list_by_pointer(Pointer, Tail, Width, false). + +%% TODO until leveled_penciller updated +expand_list_by_pointer(Pointer, Tail, Width, SegList) -> + expand_list_by_pointer(Pointer, Tail, Width, SegList, 0). + +-spec expand_list_by_pointer(expandable_pointer(), + list(expandable_pointer()), + pos_integer(), + leveled_codec:segment_list(), + non_neg_integer()) + -> list(expanded_pointer()). +%% @doc +%% With filters (as described in expand_list_by_pointer/3 +expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, + Tail, Width, SegList, LowLastMod) -> + FoldFun = + fun(X, {Pointers, Remainder}) -> + case length(Pointers) of + L when L < Width -> + case X of + {pointer, SSTPid, S, SK, EK} -> + {Pointers ++ [{pointer, S, SK, EK}], Remainder}; + _ -> + {Pointers, Remainder ++ [X]} + end; + _ -> + {Pointers, Remainder ++ [X]} + end + end, + InitAcc = {[{pointer, Slot, StartKey, EndKey}], []}, + {AccPointers, AccTail} = lists:foldl(FoldFun, InitAcc, Tail), + ExpPointers = sst_getfilteredslots(SSTPid, + AccPointers, + SegList, + LowLastMod), + lists:append(ExpPointers, AccTail); +expand_list_by_pointer({next, ManEntry, StartKey, EndKey}, + Tail, Width, SegList, LowLastMod) -> + SSTPid = ManEntry#manifest_entry.owner, + leveled_log:log("SST10", [SSTPid, is_process_alive(SSTPid)]), + ExpPointer = sst_getfilteredrange(SSTPid, + StartKey, + EndKey, + Width, + SegList, + LowLastMod), + ExpPointer ++ Tail. + + +-spec sst_getkvrange(pid(), + range_endpoint(), + range_endpoint(), + integer()) + -> list(leveled_codec:ledger_kv()|slot_pointer()). +%% @doc +%% Get a range of {Key, Value} pairs as a list between StartKey and EndKey +%% (inclusive). The ScanWidth is the maximum size of the range, a pointer +%% will be placed on the tail of the resulting list if results expand beyond +%% the Scan Width +sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) -> + sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, false, 0). + + +-spec sst_getfilteredrange(pid(), + range_endpoint(), + range_endpoint(), + integer(), + leveled_codec:segment_list(), + non_neg_integer()) + -> list(leveled_codec:ledger_kv()|slot_pointer()). +%% @doc +%% Get a range of {Key, Value} pairs as a list between StartKey and EndKey +%% (inclusive). The ScanWidth is the maximum size of the range, a pointer +%% will be placed on the tail of the resulting list if results expand beyond +%% the Scan Width +%% +%% To make the range open-ended (either to start, end or both) the all atom +%% can be used in place of the Key tuple. +%% +%% A segment list can also be passed, which inidcates a subset of segment +%% hashes of interest in the query. +%% +%% TODO: Optimise this so that passing a list of segments that tune to the +%% same hash is faster - perhaps provide an exportable function in +%% leveled_tictac +sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, SegList, LowLastMod) -> + SegList0 = tune_seglist(SegList), + case gen_fsm:sync_send_event(Pid, + {get_kvrange, + StartKey, EndKey, + ScanWidth, SegList0, LowLastMod}, + infinity) of + {yield, SlotsToFetchBinList, SlotsToPoint, PressMethod, IdxModDate} -> + {L, _BIC} = + binaryslot_reader(SlotsToFetchBinList, + PressMethod, IdxModDate, SegList0), + L ++ SlotsToPoint; + Reply -> + Reply + end. + +-spec sst_getslots(pid(), list(slot_pointer())) + -> list(leveled_codec:ledger_kv()). +%% @doc +%% Get a list of slots by their ID. The slot will be converted from the binary +%% to term form outside of the FSM loop, this is to stop the copying of the +%% converted term to the calling process. +sst_getslots(Pid, SlotList) -> + sst_getfilteredslots(Pid, SlotList, false, 0). + +-spec sst_getfilteredslots(pid(), + list(slot_pointer()), + leveled_codec:segment_list(), + non_neg_integer()) + -> list(leveled_codec:ledger_kv()). +%% @doc +%% Get a list of slots by their ID. The slot will be converted from the binary +%% to term form outside of the FSM loop +%% +%% A list of 16-bit integer Segment IDs can be passed to filter the keys +%% returned (not precisely - with false results returned in addition). Use +%% false as a SegList to not filter. +%% An integer can be provided which gives a floor for the LastModified Date +%% of the object, if the object is to be covered by the query +sst_getfilteredslots(Pid, SlotList, SegList, LowLastMod) -> + SegL0 = tune_seglist(SegList), + {SlotBins, PressMethod, IdxModDate} = + gen_fsm:sync_send_event(Pid, + {get_slots, SlotList, SegL0, LowLastMod}, + infinity), + {L, _BIC} = binaryslot_reader(SlotBins, PressMethod, IdxModDate, SegL0), + L. + %%%============================================================================ %%% Internal Functions %%%============================================================================ @@ -811,8 +911,9 @@ fetch(LedgerKey, Hash, State, Timings0) -> end. --spec fetch_range(tuple(), tuple(), integer(), leveled_codec:segment_list(), - sst_state()) -> {list(), list()}. +-spec fetch_range(tuple(), tuple(), integer(), + leveled_codec:segment_list(), non_neg_integer(), + sst_state()) -> {list(), list()}. %% @doc %% Fetch the contents of the SST file for a given key range. This will %% pre-fetch some results, and append pointers for additional results. @@ -820,7 +921,7 @@ fetch(LedgerKey, Hash, State, Timings0) -> %% A filter can be provided based on the Segment ID (usable for hashable %% objects not no_lookup entries) to accelerate the query if the 5-arity %% version is used -fetch_range(StartKey, EndKey, ScanWidth, SegList, State) -> +fetch_range(StartKey, EndKey, ScanWidth, SegList, LowLastMod, State) -> Summary = State#state.summary, Handle = State#state.handle, {Slots, RTrim} = lookup_slots(StartKey, EndKey, Summary#summary.index), @@ -874,7 +975,7 @@ fetch_range(StartKey, EndKey, ScanWidth, SegList, State) -> SlotsToFetchBinList = read_slots(Handle, SlotsToFetch, - {SegList, State#state.blockindex_cache}, + {SegList, LowLastMod, State#state.blockindex_cache}, State#state.compression_method, State#state.index_moddate), {SlotsToFetchBinList, SlotsToPoint}. @@ -1441,8 +1542,8 @@ binarysplit_mapfun(MultiSlotBin, StartPos) -> -spec read_slots(file:io_device(), list(), - {false|list(), any()}, press_method(), boolean()) - -> list(binaryslot_element()). + {false|list(), non_neg_integer(), binary()}, + press_method(), boolean()) -> list(binaryslot_element()). %% @doc %% The reading of sots will return a list of either 2-tuples containing %% {K, V} pairs - or 3-tuples containing {Binary, SK, EK}. The 3 tuples @@ -1457,13 +1558,13 @@ binarysplit_mapfun(MultiSlotBin, StartPos) -> %% any key comparison between levels should allow for a non-matching key to %% be considered as superior to a matching key - as otherwise a matching key %% may be intermittently removed from the result set -read_slots(Handle, SlotList, {false, _BlockIndexCache}, +read_slots(Handle, SlotList, {false, 0, _BlockIndexCache}, _PressMethod, _IdxModDate) -> - % No list of segments passed + % No list of segments passed or usefult Low LastModified Date LengthList = lists:map(fun pointer_mapfun/1, SlotList), {MultiSlotBin, StartPos} = read_length_list(Handle, LengthList), lists:map(binarysplit_mapfun(MultiSlotBin, StartPos), LengthList); -read_slots(Handle, SlotList, {SegList, BlockIndexCache}, +read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache}, PressMethod, IdxModDate) -> % List of segments passed so only {K, V} pairs matching those segments % should be returned. This required the {K, V} pair to have been added @@ -1483,23 +1584,30 @@ read_slots(Handle, SlotList, {SegList, BlockIndexCache}, read_length_list(Handle, [LengthDetails]), MapFun = binarysplit_mapfun(MultiSlotBin, StartPos), Acc ++ [MapFun(LengthDetails)]; - {BlockLengths, _LMD, BlockIdx} -> + {BlockLengths, LMD, BlockIdx} -> % If there is a BlockIndex cached then we can use it to % check to see if any of the expected segments are % present without lifting the slot off disk. Also the % fact that we know position can be used to filter out % other keys - case find_pos(BlockIdx, SegList, [], 0) of - [] -> + % + % Note that LMD will be 0 if the indexing of last mod + % date was not enable at creation time. So in this + % case the filter should always map + case LMD >= LowLastMod of + false -> Acc; - PositionList -> + true -> + PositionList = find_pos(BlockIdx, SegList, [], 0), Acc ++ - check_blocks(PositionList, + check_blocks(PositionList, {Handle, SP}, BlockLengths, byte_size(BlockIdx), false, PressMethod, IdxModDate, []) + % Note check_blocks shouldreturn [] if + % PositionList is empty end end end, @@ -2094,41 +2202,6 @@ maybe_expand_pointer(List) -> List. -expand_list_by_pointer(Pointer, Tail, Width) -> - expand_list_by_pointer(Pointer, Tail, Width, false). - -expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, - Tail, Width, SegList) -> - FoldFun = - fun(X, {Pointers, Remainder}) -> - case length(Pointers) of - L when L < Width -> - case X of - {pointer, SSTPid, S, SK, EK} -> - {Pointers ++ [{pointer, S, SK, EK}], Remainder}; - _ -> - {Pointers, Remainder ++ [X]} - end; - _ -> - {Pointers, Remainder ++ [X]} - end - end, - InitAcc = {[{pointer, Slot, StartKey, EndKey}], []}, - {AccPointers, AccTail} = lists:foldl(FoldFun, InitAcc, Tail), - ExpPointers = - leveled_sst:sst_getfilteredslots(SSTPid, AccPointers, SegList), - lists:append(ExpPointers, AccTail); -expand_list_by_pointer({next, ManEntry, StartKey, EndKey}, - Tail, Width, SegList) -> - SSTPid = ManEntry#manifest_entry.owner, - leveled_log:log("SST10", [SSTPid, is_process_alive(SSTPid)]), - ExpPointer = - leveled_sst:sst_getfilteredrange(SSTPid, - StartKey, EndKey, - Width, SegList), - ExpPointer ++ Tail. - - %%%============================================================================ %%% Timing Functions