Fold API to leveled_sst

Externally to leveled_sst, all folds are actually managed through expand_list_by_pointer.

Make the API a bit clearer in this regard, and add specs to help dialyzer.

This also adds LowLastMod to the API for expanding pointers (although the leveled_penciller just defaults this to 0 for everything).
This commit is contained in:
Martin Sumner 2018-10-30 16:44:00 +00:00
parent bdd1762130
commit b7e697f7f0
2 changed files with 225 additions and 150 deletions

View file

@ -1519,10 +1519,11 @@ find_nextkey(QueryArray, LCnt,
% The first key at this level is pointer to a file - need to query
% the file to expand this level out before proceeding
Pointer = {next, Owner, StartKey, EndKey},
UpdList = leveled_sst:expand_list_by_pointer(Pointer,
RestOfKeys,
Width,
SegList),
UpdList = leveled_sst:sst_expandpointer(Pointer,
RestOfKeys,
Width,
SegList,
0),
NewEntry = {LCnt, UpdList},
% Need to loop around at this level (LCnt) as we have not yet
% examined a real key at this level
@ -1535,10 +1536,11 @@ find_nextkey(QueryArray, LCnt,
% The first key at this level is pointer within a file - need to
% query the file to expand this level out before proceeding
Pointer = {pointer, SSTPid, Slot, PSK, PEK},
UpdList = leveled_sst:expand_list_by_pointer(Pointer,
RestOfKeys,
Width,
SegList),
UpdList = leveled_sst:sst_expandpointer(Pointer,
RestOfKeys,
Width,
SegList,
0),
NewEntry = {LCnt, UpdList},
% Need to loop around at this level (LCnt) as we have not yet
% examined a real key at this level

View file

@ -114,10 +114,7 @@
sst_open/2,
sst_get/2,
sst_get/3,
sst_getkvrange/4,
sst_getfilteredrange/5,
sst_getslots/2,
sst_getfilteredslots/3,
sst_expandpointer/5,
sst_getmaxsequencenumber/1,
sst_setfordelete/2,
sst_clear/1,
@ -125,7 +122,6 @@
sst_deleteconfirmed/1,
sst_close/1]).
-export([expand_list_by_pointer/4]).
-record(slot_index_value, {slot_id :: integer(),
@ -145,9 +141,21 @@
-type slot_pointer()
:: {pointer, pid(), integer(), range_endpoint(), range_endpoint()}.
-type sst_pointer()
% Used in sst_new
:: {next,
leveled_pmanifest:manifest_entry(),
leveled_codec:ledger_key()|all}.
range_endpoint()}.
-type sst_closed_pointer()
% used in expand_list_by_pointer
% (close point is added by maybe_expand_pointer)
:: {next,
leveled_pmanifest:manifest_entry(),
range_endpoint(),
range_endpoint()}.
-type expandable_pointer()
:: slot_pointer()|sst_closed_pointer().
-type expanded_pointer()
:: leveled_codec:ledger_kv()|expandable_pointer().
-type binaryslot_element()
:: {tuple(), tuple()}|{binary(), integer(), tuple(), tuple()}.
@ -341,91 +349,29 @@ sst_get(Pid, LedgerKey) ->
sst_get(Pid, LedgerKey, Hash) ->
gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity).
-spec sst_getkvrange(pid(),
range_endpoint(),
range_endpoint(),
integer())
-> list(leveled_codec:ledger_kv()|slot_pointer()).
%% @doc
%% Get a range of {Key, Value} pairs as a list between StartKey and EndKey
%% (inclusive). The ScanWidth is the maximum size of the range, a pointer
%% will be placed on the tail of the resulting list if results expand beyond
%% the Scan Width.
%%
%% This is the unfiltered form: it delegates to sst_getfilteredrange/5 with
%% false as the segment list.
sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, false).
-spec sst_getfilteredrange(pid(),
range_endpoint(),
range_endpoint(),
integer(),
leveled_codec:segment_list())
-> list(leveled_codec:ledger_kv()|slot_pointer()).
%% @doc
%% Get a range of {Key, Value} pairs as a list between StartKey and EndKey
%% (inclusive). The ScanWidth is the maximum size of the range, a pointer
%% will be placed on the tail of the resulting list if results expand beyond
%% the Scan Width
%%
%% To make the range open-ended (either to start, end or both) the all atom
%% can be used in place of the Key tuple.
%%
%% A segment list can also be passed, which indicates a subset of segment
%% hashes of interest in the query.
%%
%% If the SST process replies with a yield tuple, the returned slot binaries
%% are converted by binaryslot_reader/4 in the calling process and the
%% remaining slot pointers are appended to the result. Any other reply is
%% returned unchanged.
%%
%% TODO: Optimise this so that passing a list of segments that tune to the
%% same hash is faster - perhaps provide an exportable function in
%% leveled_tictac
sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, SegList) ->
% The tuned segment list is used both in the request and when reading the
% returned slot binaries
SegList0 = tune_seglist(SegList),
case gen_fsm:sync_send_event(Pid,
{get_kvrange,
StartKey, EndKey,
ScanWidth, SegList0},
infinity) of
{yield, SlotsToFetchBinList, SlotsToPoint, PressMethod, IdxModDate} ->
{L, _BIC} =
binaryslot_reader(SlotsToFetchBinList,
PressMethod, IdxModDate, SegList0),
L ++ SlotsToPoint;
Reply ->
Reply
end.
-spec sst_getslots(pid(), list(slot_pointer()))
-> list(leveled_codec:ledger_kv()).
%% @doc
%% Get a list of slots by their ID. The slot will be converted from the binary
%% to term form outside of the FSM loop, this is to stop the copying of the
%% converted term to the calling process.
%%
%% This is the unfiltered form: it delegates to sst_getfilteredslots/3 with
%% false as the segment list.
sst_getslots(Pid, SlotList) ->
sst_getfilteredslots(Pid, SlotList, false).
-spec sst_getfilteredslots(pid(),
list(slot_pointer()),
leveled_codec:segment_list())
-> list(leveled_codec:ledger_kv()).
%% @doc
%% Get a list of slots by their ID. The slot will be converted from the binary
%% to term form outside of the FSM loop
%%
%% A list of 16-bit integer Segment IDs can be passed to filter the keys
%% returned (not precisely - with false results returned in addition). Use
%% false as a SegList to not filter
sst_getfilteredslots(Pid, SlotList, SegList) ->
% The tuned segment list is used both in the request and when reading the
% returned slot binaries
SegL0 = tune_seglist(SegList),
{SlotBins, PressMethod, IdxModDate} =
gen_fsm:sync_send_event(Pid, {get_slots, SlotList, SegL0}, infinity),
% Conversion from binary happens here, in the calling process
{L, _BIC} = binaryslot_reader(SlotBins, PressMethod, IdxModDate, SegL0),
L.
-spec sst_getmaxsequencenumber(pid()) -> integer().
%% @doc
%% Get the maximum sequence number for this SST file. This is a synchronous
%% request to the SST process, made with an infinity timeout.
sst_getmaxsequencenumber(Pid) ->
gen_fsm:sync_send_event(Pid, get_maxsequencenumber, infinity).
-spec sst_expandpointer(expandable_pointer(),
list(expandable_pointer()),
pos_integer(),
leveled_codec:segment_list(),
non_neg_integer())
-> list(expanded_pointer()).
%% @doc
%% Expand out a list of pointers to return a list of Keys and Values with a
%% tail of pointers (once the ScanWidth has been satisfied).
%% Folding over keys in a store uses this function, although this function
%% does not directly call the SST process (a gen_fsm) - it does so by
%% sst_getfilteredslots or sst_getfilteredrange depending on the nature of
%% the pointer.
%%
%% All five arguments are passed through unchanged to
%% expand_list_by_pointer/5.
sst_expandpointer(Pointer, MorePointers, ScanWidth, SegmentList, LowLastMod) ->
expand_list_by_pointer(Pointer, MorePointers, ScanWidth,
SegmentList, LowLastMod).
-spec sst_setfordelete(pid(), pid()|false) -> ok.
%% @doc
%% If the SST is no longer in use in the active ledger it can be set for
@ -579,11 +525,13 @@ reader({get_kv, LedgerKey, Hash}, _From, State) ->
{reply, Result, reader, UpdState#state{timings = UpdTimings0,
timings_countdown = CountDown}};
reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList}, _From, State) ->
reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod},
_From, State) ->
{SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
EndKey,
ScanWidth,
SegList,
LowLastMod,
State),
PressMethod = State#state.compression_method,
@ -618,13 +566,13 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList}, _From, State) ->
reader,
State#state{blockindex_cache = BlockIdxC0}}
end;
reader({get_slots, SlotList, SegList}, _From, State) ->
reader({get_slots, SlotList, SegList, LowLastMod}, _From, State) ->
PressMethod = State#state.compression_method,
IdxModDate = State#state.index_moddate,
SlotBins =
read_slots(State#state.handle,
SlotList,
{SegList, State#state.blockindex_cache},
{SegList, LowLastMod, State#state.blockindex_cache},
State#state.compression_method,
State#state.index_moddate),
{reply, {SlotBins, PressMethod, IdxModDate}, reader, State};
@ -658,12 +606,13 @@ reader(close, _From, State) ->
delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
{Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing),
{reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT};
delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SegList},
_From, State) ->
delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod},
_From, State) ->
{SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
EndKey,
ScanWidth,
SegList,
LowLastMod,
State),
% Always yield as about to clear and de-reference
PressMethod = State#state.compression_method,
@ -673,13 +622,13 @@ delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SegList},
delete_pending,
State,
?DELETE_TIMEOUT};
delete_pending({get_slots, SlotList, SegList}, _From, State) ->
delete_pending({get_slots, SlotList, SegList, LowLastMod}, _From, State) ->
PressMethod = State#state.compression_method,
IdxModDate = State#state.index_moddate,
SlotBins =
read_slots(State#state.handle,
SlotList,
{SegList, State#state.blockindex_cache},
{SegList, LowLastMod, State#state.blockindex_cache},
PressMethod,
IdxModDate),
{reply,
@ -726,6 +675,157 @@ code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
%%%============================================================================
%%% External Functions
%%%============================================================================
-spec expand_list_by_pointer(expandable_pointer(),
list(expandable_pointer()),
pos_integer())
-> list(expanded_pointer()).
%% @doc
%% Expand a list of pointers, maybe ending up with a list of keys and values
%% with a tail of pointers.
%% By default there will be no segment filter and no low last_modified_date,
%% but they can be used (see expand_list_by_pointer/5). Range checking a last
%% modified date must still be made on the output - at this stage the low
%% last_modified_date has been used to bulk skip those slots not containing
%% any information over the low last modified date.
%%
%% This arity passes false as the segment list.
expand_list_by_pointer(Pointer, Tail, Width) ->
expand_list_by_pointer(Pointer, Tail, Width, false).
-spec expand_list_by_pointer(expandable_pointer(),
                                list(expandable_pointer()),
                                pos_integer(),
                                leveled_codec:segment_list())
                                            -> list(expanded_pointer()).
%% @doc
%% Expand a list of pointers using a segment filter, but without a floor on
%% the last modified date - a LowLastMod of 0 is passed on to
%% expand_list_by_pointer/5.
%% TODO until leveled_penciller updated
expand_list_by_pointer(Pointer, Tail, Width, SegList) ->
    expand_list_by_pointer(Pointer, Tail, Width, SegList, 0).
-spec expand_list_by_pointer(expandable_pointer(),
                                list(expandable_pointer()),
                                pos_integer(),
                                leveled_codec:segment_list(),
                                non_neg_integer())
                                            -> list(expanded_pointer()).
%% @doc
%% Expand the pointer at the head of a list, with filters (as described in
%% expand_list_by_pointer/3).
%%
%% For a slot pointer ({pointer, ...}): further slot pointers to the same SST
%% process are gathered from the Tail, until Width pointers are held in total
%% (including the one being expanded). Those slots are fetched in a single
%% call to sst_getfilteredslots/4, and every other element of the Tail is
%% appended to the result in its original order.
%%
%% For a file pointer ({next, ...}): the owner of the manifest entry is
%% queried using sst_getfilteredrange/6, and the Tail is appended unchanged.
expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey},
                        Tail, Width, SegList, LowLastMod) ->
    % Keep a running count and build both accumulators in reverse, so the
    % fold is linear in the length of the Tail (no length/1 or ++ on each
    % step)
    FoldFun =
        fun({pointer, Pid, S, SK, EK}, {Count, Pointers, Remainder})
                    when Pid =:= SSTPid, Count < Width ->
                {Count + 1, [{pointer, S, SK, EK}|Pointers], Remainder};
            (X, {Count, Pointers, Remainder}) ->
                % Not a pointer to this SST process, or the Width has
                % already been reached - leave it on the tail
                {Count, Pointers, [X|Remainder]}
        end,
    % The pointer being expanded counts as the first of the Width
    InitAcc = {1, [{pointer, Slot, StartKey, EndKey}], []},
    {_Count, RevPointers, RevTail} = lists:foldl(FoldFun, InitAcc, Tail),
    ExpPointers = sst_getfilteredslots(SSTPid,
                                        lists:reverse(RevPointers),
                                        SegList,
                                        LowLastMod),
    lists:append(ExpPointers, lists:reverse(RevTail));
expand_list_by_pointer({next, ManEntry, StartKey, EndKey},
                        Tail, Width, SegList, LowLastMod) ->
    SSTPid = ManEntry#manifest_entry.owner,
    leveled_log:log("SST10", [SSTPid, is_process_alive(SSTPid)]),
    ExpPointer = sst_getfilteredrange(SSTPid,
                                        StartKey,
                                        EndKey,
                                        Width,
                                        SegList,
                                        LowLastMod),
    ExpPointer ++ Tail.
-spec sst_getkvrange(pid(),
range_endpoint(),
range_endpoint(),
integer())
-> list(leveled_codec:ledger_kv()|slot_pointer()).
%% @doc
%% Get a range of {Key, Value} pairs as a list between StartKey and EndKey
%% (inclusive). The ScanWidth is the maximum size of the range, a pointer
%% will be placed on the tail of the resulting list if results expand beyond
%% the Scan Width.
%%
%% This is the unfiltered form: it delegates to sst_getfilteredrange/6 with
%% false as the segment list and 0 as the low last modified date.
sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, false, 0).
-spec sst_getfilteredrange(pid(),
                            range_endpoint(),
                            range_endpoint(),
                            integer(),
                            leveled_codec:segment_list(),
                            non_neg_integer())
                    -> list(leveled_codec:ledger_kv()|slot_pointer()).
%% @doc
%% Return the {Key, Value} pairs held by the SST file between StartKey and
%% EndKey (inclusive) as a list. ScanWidth is the maximum size of the range;
%% a pointer is placed on the tail of the list when the results extend beyond
%% the ScanWidth.
%%
%% The atom all can be used in place of either Key tuple (or both) to make
%% the range open-ended.
%%
%% A segment list may be passed to indicate the subset of segment hashes that
%% are of interest to the query. LowLastMod is included in the request sent
%% to the SST process.
%%
%% If the SST process yields, the slot binaries are converted to terms here
%% in the calling process; any other reply is returned unchanged.
%%
%% TODO: Optimise this so that passing a list of segments that tune to the
%% same hash is faster - perhaps provide an exportable function in
%% leveled_tictac
sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, SegList, LowLastMod) ->
    TunedSegs = tune_seglist(SegList),
    RangeRequest =
        {get_kvrange, StartKey, EndKey, ScanWidth, TunedSegs, LowLastMod},
    Response = gen_fsm:sync_send_event(Pid, RangeRequest, infinity),
    case Response of
        {yield, SlotBins, TailPointers, PressMethod, IdxModDate} ->
            {KVList, _BIC} =
                binaryslot_reader(SlotBins,
                                    PressMethod, IdxModDate, TunedSegs),
            lists:append(KVList, TailPointers);
        _NotYielded ->
            Response
    end.
-spec sst_getslots(pid(), list(slot_pointer()))
-> list(leveled_codec:ledger_kv()).
%% @doc
%% Get a list of slots by their ID. The slot will be converted from the binary
%% to term form outside of the FSM loop, this is to stop the copying of the
%% converted term to the calling process.
%%
%% This is the unfiltered form: it delegates to sst_getfilteredslots/4 with
%% false as the segment list and 0 as the low last modified date.
sst_getslots(Pid, SlotList) ->
sst_getfilteredslots(Pid, SlotList, false, 0).
-spec sst_getfilteredslots(pid(),
                            list(slot_pointer()),
                            leveled_codec:segment_list(),
                            non_neg_integer())
                                    -> list(leveled_codec:ledger_kv()).
%% @doc
%% Fetch a list of slots by their ID. The conversion of each slot from binary
%% to term form happens outside of the FSM loop.
%%
%% A list of 16-bit integer Segment IDs may be passed to filter the keys that
%% are returned (the filter is not precise - false results may be returned in
%% addition). Pass false as the SegList to not filter.
%% An integer can also be provided which gives a floor for the LastModified
%% Date of the object, if the object is to be covered by the query.
sst_getfilteredslots(Pid, SlotList, SegList, LowLastMod) ->
    TunedSegs = tune_seglist(SegList),
    SlotRequest = {get_slots, SlotList, TunedSegs, LowLastMod},
    {SlotBins, PressMethod, IdxModDate} =
        gen_fsm:sync_send_event(Pid, SlotRequest, infinity),
    {KVList, _BIC} =
        binaryslot_reader(SlotBins, PressMethod, IdxModDate, TunedSegs),
    KVList.
%%%============================================================================
%%% Internal Functions
%%%============================================================================
@ -811,8 +911,9 @@ fetch(LedgerKey, Hash, State, Timings0) ->
end.
-spec fetch_range(tuple(), tuple(), integer(), leveled_codec:segment_list(),
sst_state()) -> {list(), list()}.
-spec fetch_range(tuple(), tuple(), integer(),
leveled_codec:segment_list(), non_neg_integer(),
sst_state()) -> {list(), list()}.
%% @doc
%% Fetch the contents of the SST file for a given key range. This will
%% pre-fetch some results, and append pointers for additional results.
@ -820,7 +921,7 @@ fetch(LedgerKey, Hash, State, Timings0) ->
%% A filter can be provided based on the Segment ID (usable for hashable
%% objects not no_lookup entries) to accelerate the query if the 5-arity
%% version is used
fetch_range(StartKey, EndKey, ScanWidth, SegList, State) ->
fetch_range(StartKey, EndKey, ScanWidth, SegList, LowLastMod, State) ->
Summary = State#state.summary,
Handle = State#state.handle,
{Slots, RTrim} = lookup_slots(StartKey, EndKey, Summary#summary.index),
@ -874,7 +975,7 @@ fetch_range(StartKey, EndKey, ScanWidth, SegList, State) ->
SlotsToFetchBinList =
read_slots(Handle,
SlotsToFetch,
{SegList, State#state.blockindex_cache},
{SegList, LowLastMod, State#state.blockindex_cache},
State#state.compression_method,
State#state.index_moddate),
{SlotsToFetchBinList, SlotsToPoint}.
@ -1441,8 +1542,8 @@ binarysplit_mapfun(MultiSlotBin, StartPos) ->
-spec read_slots(file:io_device(), list(),
{false|list(), any()}, press_method(), boolean())
-> list(binaryslot_element()).
{false|list(), non_neg_integer(), binary()},
press_method(), boolean()) -> list(binaryslot_element()).
%% @doc
%% The reading of slots will return a list of either 2-tuples containing
%% {K, V} pairs - or 3-tuples containing {Binary, SK, EK}. The 3 tuples
@ -1457,13 +1558,13 @@ binarysplit_mapfun(MultiSlotBin, StartPos) ->
%% any key comparison between levels should allow for a non-matching key to
%% be considered as superior to a matching key - as otherwise a matching key
%% may be intermittently removed from the result set
read_slots(Handle, SlotList, {false, _BlockIndexCache},
read_slots(Handle, SlotList, {false, 0, _BlockIndexCache},
_PressMethod, _IdxModDate) ->
% No list of segments passed
% No list of segments passed or useful Low LastModified Date
LengthList = lists:map(fun pointer_mapfun/1, SlotList),
{MultiSlotBin, StartPos} = read_length_list(Handle, LengthList),
lists:map(binarysplit_mapfun(MultiSlotBin, StartPos), LengthList);
read_slots(Handle, SlotList, {SegList, BlockIndexCache},
read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache},
PressMethod, IdxModDate) ->
% List of segments passed so only {K, V} pairs matching those segments
% should be returned. This required the {K, V} pair to have been added
@ -1483,16 +1584,21 @@ read_slots(Handle, SlotList, {SegList, BlockIndexCache},
read_length_list(Handle, [LengthDetails]),
MapFun = binarysplit_mapfun(MultiSlotBin, StartPos),
Acc ++ [MapFun(LengthDetails)];
{BlockLengths, _LMD, BlockIdx} ->
{BlockLengths, LMD, BlockIdx} ->
% If there is a BlockIndex cached then we can use it to
% check to see if any of the expected segments are
% present without lifting the slot off disk. Also the
% fact that we know position can be used to filter out
% other keys
case find_pos(BlockIdx, SegList, [], 0) of
[] ->
%
% Note that LMD will be 0 if the indexing of last mod
% date was not enable at creation time. So in this
% case the filter should always map
case LMD >= LowLastMod of
false ->
Acc;
PositionList ->
true ->
PositionList = find_pos(BlockIdx, SegList, [], 0),
Acc ++
check_blocks(PositionList,
{Handle, SP},
@ -1500,6 +1606,8 @@ read_slots(Handle, SlotList, {SegList, BlockIndexCache},
byte_size(BlockIdx),
false, PressMethod, IdxModDate,
[])
% Note check_blocks should return [] if
% PositionList is empty
end
end
end,
@ -2094,41 +2202,6 @@ maybe_expand_pointer(List) ->
List.
%% Expand the pointer at the head of a list without a segment filter (false
%% is passed as the SegList).
expand_list_by_pointer(Pointer, Tail, Width) ->
expand_list_by_pointer(Pointer, Tail, Width, false).
%% Expand the pointer at the head of a list, returning the expanded results
%% followed by what remains of the Tail.
%% A {pointer, ...} is expanded by fetching slots from the SST process; a
%% {next, ...} is expanded by a range query on the owner of the manifest
%% entry.
expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey},
Tail, Width, SegList) ->
% Gather further pointers to the same SSTPid from the Tail until Width
% pointers are held; every other element is kept, in order, as the remainder
FoldFun =
fun(X, {Pointers, Remainder}) ->
case length(Pointers) of
L when L < Width ->
case X of
{pointer, SSTPid, S, SK, EK} ->
{Pointers ++ [{pointer, S, SK, EK}], Remainder};
_ ->
{Pointers, Remainder ++ [X]}
end;
_ ->
{Pointers, Remainder ++ [X]}
end
end,
% The pointer being expanded is the first of the pointers to fetch
InitAcc = {[{pointer, Slot, StartKey, EndKey}], []},
{AccPointers, AccTail} = lists:foldl(FoldFun, InitAcc, Tail),
ExpPointers =
leveled_sst:sst_getfilteredslots(SSTPid, AccPointers, SegList),
lists:append(ExpPointers, AccTail);
expand_list_by_pointer({next, ManEntry, StartKey, EndKey},
Tail, Width, SegList) ->
SSTPid = ManEntry#manifest_entry.owner,
% Logs the owner pid and whether that process is currently alive
leveled_log:log("SST10", [SSTPid, is_process_alive(SSTPid)]),
ExpPointer =
leveled_sst:sst_getfilteredrange(SSTPid,
StartKey, EndKey,
Width, SegList),
ExpPointer ++ Tail.
%%%============================================================================
%%% Timing Functions