Read (and ignore) last modified date

Add presence of LMD into index - and check everything happily lets it pass by
This commit is contained in:
Martin Sumner 2018-10-30 11:47:03 +00:00
parent 467c2fb89c
commit 7295a41321

View file

@ -88,7 +88,9 @@
-define(TIMING_SAMPLESIZE, 100). -define(TIMING_SAMPLESIZE, 100).
-define(CACHE_SIZE, 32). -define(CACHE_SIZE, 32).
-define(BLOCK_LENGTHS_LENGTH, 20). -define(BLOCK_LENGTHS_LENGTH, 20).
-define(LMD_LENGTH, 6).
-define(FLIPPER32, 4294967295). -define(FLIPPER32, 4294967295).
-define(FLIPPER48, 281474976710655).
-define(COMPRESS_AT_LEVEL, 1). -define(COMPRESS_AT_LEVEL, 1).
-define(INDEX_MODDATE, true). -define(INDEX_MODDATE, true).
@ -382,9 +384,10 @@ sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, SegList) ->
StartKey, EndKey, StartKey, EndKey,
ScanWidth, SegList0}, ScanWidth, SegList0},
infinity) of infinity) of
{yield, SlotsToFetchBinList, SlotsToPoint, PressMethod} -> {yield, SlotsToFetchBinList, SlotsToPoint, PressMethod, IdxModDate} ->
{L, _BIC} = {L, _BIC} =
binaryslot_reader(SlotsToFetchBinList, PressMethod, SegList0), binaryslot_reader(SlotsToFetchBinList,
PressMethod, IdxModDate, SegList0),
L ++ SlotsToPoint; L ++ SlotsToPoint;
Reply -> Reply ->
Reply Reply
@ -412,9 +415,9 @@ sst_getslots(Pid, SlotList) ->
%% false as a SegList to not filter %% false as a SegList to not filter
sst_getfilteredslots(Pid, SlotList, SegList) -> sst_getfilteredslots(Pid, SlotList, SegList) ->
SegL0 = tune_seglist(SegList), SegL0 = tune_seglist(SegList),
{SlotBins, PressMethod} = {SlotBins, PressMethod, IdxModDate} =
gen_fsm:sync_send_event(Pid, {get_slots, SlotList, SegL0}, infinity), gen_fsm:sync_send_event(Pid, {get_slots, SlotList, SegL0}, infinity),
{L, _BIC} = binaryslot_reader(SlotBins, PressMethod, SegL0), {L, _BIC} = binaryslot_reader(SlotBins, PressMethod, IdxModDate, SegL0),
L. L.
-spec sst_getmaxsequencenumber(pid()) -> integer(). -spec sst_getmaxsequencenumber(pid()) -> integer().
@ -584,6 +587,7 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList}, _From, State) ->
State), State),
PressMethod = State#state.compression_method, PressMethod = State#state.compression_method,
IdxModDate = State#state.index_moddate,
case State#state.yield_blockquery of case State#state.yield_blockquery of
true -> true ->
@ -591,12 +595,14 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList}, _From, State) ->
{yield, {yield,
SlotsToFetchBinList, SlotsToFetchBinList,
SlotsToPoint, SlotsToPoint,
PressMethod}, PressMethod,
IdxModDate},
reader, reader,
State}; State};
false -> false ->
{L, BIC} = {L, BIC} =
binaryslot_reader(SlotsToFetchBinList, PressMethod, SegList), binaryslot_reader(SlotsToFetchBinList,
PressMethod, IdxModDate, SegList),
FoldFun = FoldFun =
fun(CacheEntry, Cache) -> fun(CacheEntry, Cache) ->
case CacheEntry of case CacheEntry of
@ -613,12 +619,15 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList}, _From, State) ->
State#state{blockindex_cache = BlockIdxC0}} State#state{blockindex_cache = BlockIdxC0}}
end; end;
reader({get_slots, SlotList, SegList}, _From, State) -> reader({get_slots, SlotList, SegList}, _From, State) ->
PressMethod = State#state.compression_method,
IdxModDate = State#state.index_moddate,
SlotBins = SlotBins =
read_slots(State#state.handle, read_slots(State#state.handle,
SlotList, SlotList,
{SegList, State#state.blockindex_cache}, {SegList, State#state.blockindex_cache},
State#state.compression_method), State#state.compression_method,
{reply, {SlotBins, State#state.compression_method}, reader, State}; State#state.index_moddate),
{reply, {SlotBins, PressMethod, IdxModDate}, reader, State};
reader(get_maxsequencenumber, _From, State) -> reader(get_maxsequencenumber, _From, State) ->
Summary = State#state.summary, Summary = State#state.summary,
{reply, Summary#summary.max_sqn, reader, State}; {reply, Summary#summary.max_sqn, reader, State};
@ -664,13 +673,16 @@ delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SegList},
State, State,
?DELETE_TIMEOUT}; ?DELETE_TIMEOUT};
delete_pending({get_slots, SlotList, SegList}, _From, State) -> delete_pending({get_slots, SlotList, SegList}, _From, State) ->
PressMethod = State#state.compression_method,
IdxModDate = State#state.index_moddate,
SlotBins = SlotBins =
read_slots(State#state.handle, read_slots(State#state.handle,
SlotList, SlotList,
{SegList, State#state.blockindex_cache}, {SegList, State#state.blockindex_cache},
State#state.compression_method), PressMethod,
IdxModDate),
{reply, {reply,
{SlotBins, State#state.compression_method}, {SlotBins, PressMethod, IdxModDate},
delete_pending, delete_pending,
State, State,
?DELETE_TIMEOUT}; ?DELETE_TIMEOUT};
@ -730,6 +742,7 @@ fetch(LedgerKey, Hash, State, Timings0) ->
Summary = State#state.summary, Summary = State#state.summary,
PressMethod = State#state.compression_method, PressMethod = State#state.compression_method,
IdxModDate = State#state.index_moddate,
Slot = lookup_slot(LedgerKey, Summary#summary.index), Slot = lookup_slot(LedgerKey, Summary#summary.index),
{SW1, Timings1} = update_timings(SW0, Timings0, index_query, true), {SW1, Timings1} = update_timings(SW0, Timings0, index_query, true),
@ -738,13 +751,12 @@ fetch(LedgerKey, Hash, State, Timings0) ->
CachedBlockIdx = CachedBlockIdx =
array:get(SlotID - 1, State#state.blockindex_cache), array:get(SlotID - 1, State#state.blockindex_cache),
{SW2, Timings2} = update_timings(SW1, Timings1, lookup_cache, true), {SW2, Timings2} = update_timings(SW1, Timings1, lookup_cache, true),
BL = ?BLOCK_LENGTHS_LENGTH,
case CachedBlockIdx of case extract_header(CachedBlockIdx, IdxModDate) of
none -> none ->
SlotBin = read_slot(State#state.handle, Slot), SlotBin = read_slot(State#state.handle, Slot),
{Result, Header} = {Result, Header} =
binaryslot_get(SlotBin, LedgerKey, Hash, PressMethod), binaryslot_get(SlotBin, LedgerKey, Hash, PressMethod, IdxModDate),
BlockIndexCache = BlockIndexCache =
array:set(SlotID - 1, Header, State#state.blockindex_cache), array:set(SlotID - 1, Header, State#state.blockindex_cache),
{_SW3, Timings3} = {_SW3, Timings3} =
@ -752,7 +764,7 @@ fetch(LedgerKey, Hash, State, Timings0) ->
{Result, {Result,
State#state{blockindex_cache = BlockIndexCache}, State#state{blockindex_cache = BlockIndexCache},
Timings3}; Timings3};
<<BlockLengths:BL/binary, PosBin/binary>> -> {BlockLengths, _LMD, PosBin} ->
PosList = find_pos(PosBin, extra_hash(Hash), [], 0), PosList = find_pos(PosBin, extra_hash(Hash), [], 0),
case PosList of case PosList of
[] -> [] ->
@ -781,6 +793,7 @@ fetch(LedgerKey, Hash, State, Timings0) ->
byte_size(PosBin), byte_size(PosBin),
LedgerKey, LedgerKey,
PressMethod, PressMethod,
IdxModDate,
not_present), not_present),
FetchCache0 = FetchCache0 =
array:set(CacheHash, Result, FetchCache), array:set(CacheHash, Result, FetchCache),
@ -861,7 +874,8 @@ fetch_range(StartKey, EndKey, ScanWidth, SegList, State) ->
read_slots(Handle, read_slots(Handle,
SlotsToFetch, SlotsToFetch,
{SegList, State#state.blockindex_cache}, {SegList, State#state.blockindex_cache},
State#state.compression_method), State#state.compression_method,
State#state.index_moddate),
{SlotsToFetchBinList, SlotsToPoint}. {SlotsToFetchBinList, SlotsToPoint}.
-spec compress_level(integer(), press_method()) -> press_method(). -spec compress_level(integer(), press_method()) -> press_method().
@ -1194,10 +1208,8 @@ accumulate_positions({K, V}, {PosBinAcc, NoHashCount, HashAcc, LMDAcc}) ->
%% @doc %% @doc
%% Get the last modified date. If no Last Modified Date on any object, can't %% Get the last modified date. If no Last Modified Date on any object, can't
%% add the accelerator and should check each object in turn %% add the accelerator and should check each object in turn
take_max_lastmoddate(_LMD, undefined) ->
undefined;
take_max_lastmoddate(undefined, _LMDAcc) -> take_max_lastmoddate(undefined, _LMDAcc) ->
undefined; ?FLIPPER48;
take_max_lastmoddate(LMD, LMDAcc) -> take_max_lastmoddate(LMD, LMDAcc) ->
max(LMD, LMDAcc). max(LMD, LMDAcc).
@ -1218,11 +1230,11 @@ generate_binary_slot(Lookup, KVL, PressMethod, _IndexModDate, BuildTimings0) ->
SW0 = os:timestamp(), SW0 = os:timestamp(),
{HashL, PosBinIndex} = {HashL, PosBinIndex, LMD} =
case Lookup of case Lookup of
lookup -> lookup ->
InitAcc = {<<>>, 0, [], 0}, InitAcc = {<<>>, 0, [], 0},
{PosBinIndex0, NHC, HashL0, _LMD} = {PosBinIndex0, NHC, HashL0, LMD0} =
lists:foldr(fun accumulate_positions/2, InitAcc, KVL), lists:foldr(fun accumulate_positions/2, InitAcc, KVL),
PosBinIndex1 = PosBinIndex1 =
case NHC of case NHC of
@ -1232,9 +1244,9 @@ generate_binary_slot(Lookup, KVL, PressMethod, _IndexModDate, BuildTimings0) ->
N = NHC - 1, N = NHC - 1,
<<0:1/integer, N:7/integer, PosBinIndex0/binary>> <<0:1/integer, N:7/integer, PosBinIndex0/binary>>
end, end,
{HashL0, PosBinIndex1}; {HashL0, PosBinIndex1, LMD0};
no_lookup -> no_lookup ->
{[], <<0:1/integer, 127:7/integer>>} {[], <<0:1/integer, 127:7/integer>>, 0}
end, end,
BuildTimings1 = update_buildtimings(SW0, BuildTimings0, slot_hashlist), BuildTimings1 = update_buildtimings(SW0, BuildTimings0, slot_hashlist),
@ -1295,7 +1307,7 @@ generate_binary_slot(Lookup, KVL, PressMethod, _IndexModDate, BuildTimings0) ->
BuildTimings2 = update_buildtimings(SW1, BuildTimings1, slot_serialise), BuildTimings2 = update_buildtimings(SW1, BuildTimings1, slot_serialise),
SW2 = os:timestamp(), SW2 = os:timestamp(),
B1P = byte_size(PosBinIndex) + ?BLOCK_LENGTHS_LENGTH, B1P = byte_size(PosBinIndex) + ?BLOCK_LENGTHS_LENGTH + ?LMD_LENGTH,
CheckB1P = hmac(B1P), CheckB1P = hmac(B1P),
B1L = byte_size(B1), B1L = byte_size(B1),
B2L = byte_size(B2), B2L = byte_size(B2),
@ -1307,6 +1319,7 @@ generate_binary_slot(Lookup, KVL, PressMethod, _IndexModDate, BuildTimings0) ->
B3L:32/integer, B3L:32/integer,
B4L:32/integer, B4L:32/integer,
B5L:32/integer, B5L:32/integer,
LMD:48/integer,
PosBinIndex/binary>>, PosBinIndex/binary>>,
CheckH = hmac(Header), CheckH = hmac(Header),
SlotBin = <<CheckB1P:32/integer, B1P:32/integer, SlotBin = <<CheckB1P:32/integer, B1P:32/integer,
@ -1326,25 +1339,27 @@ generate_binary_slot(Lookup, KVL, PressMethod, _IndexModDate, BuildTimings0) ->
integer(), integer(),
leveled_codec:ledger_key()|false, leveled_codec:ledger_key()|false,
press_method(), press_method(),
boolean(),
list()|not_present) -> list()|not_present. list()|not_present) -> list()|not_present.
%% @doc %% @doc
%% Acc should start as not_present if LedgerKey is a key, and a list if %% Acc should start as not_present if LedgerKey is a key, and a list if
%% LedgerKey is false %% LedgerKey is false
check_blocks([], _BlockPointer, _BlockLengths, _PosBinLength, check_blocks([], _BlockPointer, _BlockLengths, _PosBinLength,
_LedgerKeyToCheck, _PressMethod, not_present) -> _LedgerKeyToCheck, _PressMethod, _IdxModDate, not_present) ->
not_present; not_present;
check_blocks([], _BlockPointer, _BlockLengths, _PosBinLength, check_blocks([], _BlockPointer, _BlockLengths, _PosBinLength,
_LedgerKeyToCheck, _PressMethod, Acc) -> _LedgerKeyToCheck, _PressMethod, _IdxModDate, Acc) ->
lists:reverse(Acc); lists:reverse(Acc);
check_blocks([Pos|Rest], BlockPointer, BlockLengths, PosBinLength, check_blocks([Pos|Rest], BlockPointer, BlockLengths, PosBinLength,
LedgerKeyToCheck, PressMethod, Acc) -> LedgerKeyToCheck, PressMethod, IdxModDate, Acc) ->
{BlockNumber, BlockPos} = revert_position(Pos), {BlockNumber, BlockPos} = revert_position(Pos),
BlockBin = BlockBin =
read_block(BlockPointer, read_block(BlockPointer,
BlockLengths, BlockLengths,
PosBinLength, PosBinLength,
BlockNumber), BlockNumber,
additional_offset(IdxModDate)),
BlockL = deserialise_block(BlockBin, PressMethod), BlockL = deserialise_block(BlockBin, PressMethod),
{K, V} = lists:nth(BlockPos, BlockL), {K, V} = lists:nth(BlockPos, BlockL),
case K of case K of
@ -1355,30 +1370,38 @@ check_blocks([Pos|Rest], BlockPointer, BlockLengths, PosBinLength,
false -> false ->
check_blocks(Rest, BlockPointer, check_blocks(Rest, BlockPointer,
BlockLengths, PosBinLength, BlockLengths, PosBinLength,
LedgerKeyToCheck, PressMethod, LedgerKeyToCheck, PressMethod, IdxModDate,
[{K, V}|Acc]); [{K, V}|Acc]);
_ -> _ ->
check_blocks(Rest, BlockPointer, check_blocks(Rest, BlockPointer,
BlockLengths, PosBinLength, BlockLengths, PosBinLength,
LedgerKeyToCheck, PressMethod, Acc) LedgerKeyToCheck, PressMethod, IdxModDate,
Acc)
end end
end. end.
-spec additional_offset(boolean()) -> pos_integer().
%% @doc
%% 4-byte CRC, 4-byte pos, 4-byte CRC, 5x4 byte lengths, 6 byte LMD
%% LMD may not be present
additional_offset(true) ->
?BLOCK_LENGTHS_LENGTH + 4 + 4 + 4 + 6;
additional_offset(false) ->
?BLOCK_LENGTHS_LENGTH + 4 + 4 + 4.
read_block({Handle, StartPos}, BlockLengths, PosBinLength, BlockID) ->
read_block({Handle, StartPos}, BlockLengths, PosBinLength, BlockID, AO) ->
{Offset, Length} = block_offsetandlength(BlockLengths, BlockID), {Offset, Length} = block_offsetandlength(BlockLengths, BlockID),
{ok, BlockBin} = file:pread(Handle, {ok, BlockBin} = file:pread(Handle,
StartPos StartPos
+ Offset + Offset
+ PosBinLength + PosBinLength
+ 32, + AO,
% 4-byte CRC, 4-byte pos,
% 4-byte CRC, 5x4 byte lengths
Length), Length),
BlockBin; BlockBin;
read_block(SlotBin, BlockLengths, PosBinLength, BlockID) -> read_block(SlotBin, BlockLengths, PosBinLength, BlockID, AO) ->
{Offset, Length} = block_offsetandlength(BlockLengths, BlockID), {Offset, Length} = block_offsetandlength(BlockLengths, BlockID),
StartPos = Offset + PosBinLength + 32, StartPos = Offset + PosBinLength + AO,
<<_Pre:StartPos/binary, BlockBin:Length/binary, _Rest/binary>> = SlotBin, <<_Pre:StartPos/binary, BlockBin:Length/binary, _Rest/binary>> = SlotBin,
BlockBin. BlockBin.
@ -1417,12 +1440,12 @@ binarysplit_mapfun(MultiSlotBin, StartPos) ->
-spec read_slots(file:io_device(), list(), -spec read_slots(file:io_device(), list(),
{false|list(), any()}, press_method()) {false|list(), any()}, press_method(), boolean())
-> list(binaryslot_element()). -> list(binaryslot_element()).
%% @doc %% @doc
%% The reading of sots will return a list of either 2-tuples containing %% The reading of sots will return a list of either 2-tuples containing
%% {K, V} pairs - or 3-tuples containing {Binary, SK, EK}. The 3 tuples %% {K, V} pairs - or 3-tuples containing {Binary, SK, EK}. The 3 tuples
%% can be exploded into lists of {K, V} pairs using the binaryslot_reader/3 %% can be exploded into lists of {K, V} pairs using the binaryslot_reader/4
%% function %% function
%% %%
%% Reading slots is generally unfiltered, but in the sepcial case when %% Reading slots is generally unfiltered, but in the sepcial case when
@ -1433,12 +1456,14 @@ binarysplit_mapfun(MultiSlotBin, StartPos) ->
%% any key comparison between levels should allow for a non-matching key to %% any key comparison between levels should allow for a non-matching key to
%% be considered as superior to a matching key - as otherwise a matching key %% be considered as superior to a matching key - as otherwise a matching key
%% may be intermittently removed from the result set %% may be intermittently removed from the result set
read_slots(Handle, SlotList, {false, _BlockIndexCache}, _PressMethod) -> read_slots(Handle, SlotList, {false, _BlockIndexCache},
_PressMethod, _IdxModDate) ->
% No list of segments passed % No list of segments passed
LengthList = lists:map(fun pointer_mapfun/1, SlotList), LengthList = lists:map(fun pointer_mapfun/1, SlotList),
{MultiSlotBin, StartPos} = read_length_list(Handle, LengthList), {MultiSlotBin, StartPos} = read_length_list(Handle, LengthList),
lists:map(binarysplit_mapfun(MultiSlotBin, StartPos), LengthList); lists:map(binarysplit_mapfun(MultiSlotBin, StartPos), LengthList);
read_slots(Handle, SlotList, {SegList, BlockIndexCache}, PressMethod) -> read_slots(Handle, SlotList, {SegList, BlockIndexCache},
PressMethod, IdxModDate) ->
% List of segments passed so only {K, V} pairs matching those segments % List of segments passed so only {K, V} pairs matching those segments
% should be returned. This required the {K, V} pair to have been added % should be returned. This required the {K, V} pair to have been added
% with the appropriate hash - if the pair were added with no_lookup as % with the appropriate hash - if the pair were added with no_lookup as
@ -1446,8 +1471,8 @@ read_slots(Handle, SlotList, {SegList, BlockIndexCache}, PressMethod) ->
BinMapFun = BinMapFun =
fun(Pointer, Acc) -> fun(Pointer, Acc) ->
{SP, _L, ID, _SK, _EK} = pointer_mapfun(Pointer), {SP, _L, ID, _SK, _EK} = pointer_mapfun(Pointer),
BL = ?BLOCK_LENGTHS_LENGTH, CachedHeader = array:get(ID - 1, BlockIndexCache),
case array:get(ID - 1, BlockIndexCache) of case extract_header(CachedHeader, IdxModDate) of
none -> none ->
% If there is an attempt to use the seg list query and the % If there is an attempt to use the seg list query and the
% index block cache isn't cached for any part this may be % index block cache isn't cached for any part this may be
@ -1457,7 +1482,7 @@ read_slots(Handle, SlotList, {SegList, BlockIndexCache}, PressMethod) ->
read_length_list(Handle, [LengthDetails]), read_length_list(Handle, [LengthDetails]),
MapFun = binarysplit_mapfun(MultiSlotBin, StartPos), MapFun = binarysplit_mapfun(MultiSlotBin, StartPos),
Acc ++ [MapFun(LengthDetails)]; Acc ++ [MapFun(LengthDetails)];
<<BlockLengths:BL/binary, BlockIdx/binary>> -> {BlockLengths, _LMD, BlockIdx} ->
% If there is a BlockIndex cached then we can use it to % If there is a BlockIndex cached then we can use it to
% check to see if any of the expected segments are % check to see if any of the expected segments are
% present without lifting the slot off disk. Also the % present without lifting the slot off disk. Also the
@ -1472,7 +1497,7 @@ read_slots(Handle, SlotList, {SegList, BlockIndexCache}, PressMethod) ->
{Handle, SP}, {Handle, SP},
BlockLengths, BlockLengths,
byte_size(BlockIdx), byte_size(BlockIdx),
false, PressMethod, false, PressMethod, IdxModDate,
[]) [])
end end
end end
@ -1481,32 +1506,39 @@ read_slots(Handle, SlotList, {SegList, BlockIndexCache}, PressMethod) ->
-spec binaryslot_reader(list(binaryslot_element()), -spec binaryslot_reader(list(binaryslot_element()),
native|lz4, press_method(),
boolean(),
leveled_codec:segment_list()) leveled_codec:segment_list())
-> {list({tuple(), tuple()}), -> {list({tuple(), tuple()}),
list({integer(), binary()})}. list({integer(), binary()})}.
%% @doc %% @doc
%% Read the binary slots converting them to {K, V} pairs if they were not %% Read the binary slots converting them to {K, V} pairs if they were not
%% already {K, V} pairs %% already {K, V} pairs
binaryslot_reader(SlotBinsToFetch, PressMethod, SegList) -> binaryslot_reader(SlotBinsToFetch, PressMethod, IdxModDate, SegList) ->
binaryslot_reader(SlotBinsToFetch, PressMethod, SegList, [], []). binaryslot_reader(SlotBinsToFetch,
PressMethod, IdxModDate, SegList, [], []).
binaryslot_reader([], _PressMethod, _SegList, Acc, BIAcc) -> binaryslot_reader([], _PressMethod, _IdxModDate, _SegList, Acc, BIAcc) ->
{Acc, BIAcc}; {Acc, BIAcc};
binaryslot_reader([{SlotBin, ID, SK, EK}|Tail], binaryslot_reader([{SlotBin, ID, SK, EK}|Tail],
PressMethod, SegList, Acc, BIAcc) -> PressMethod, IdxModDate, SegList, Acc, BIAcc) ->
{TrimmedL, BICache} = {TrimmedL, BICache} =
binaryslot_trimmedlist(SlotBin, binaryslot_trimmedlist(SlotBin,
SK, EK, SK, EK,
PressMethod, PressMethod,
IdxModDate,
SegList), SegList),
binaryslot_reader(Tail, binaryslot_reader(Tail,
PressMethod, PressMethod,
IdxModDate,
SegList, SegList,
Acc ++ TrimmedL, Acc ++ TrimmedL,
[{ID, BICache}|BIAcc]); [{ID, BICache}|BIAcc]);
binaryslot_reader([{K, V}|Tail], PressMethod, SegList, Acc, BIAcc) -> binaryslot_reader([{K, V}|Tail],
binaryslot_reader(Tail, PressMethod, SegList, Acc ++ [{K, V}], BIAcc). PressMethod, IdxModDate, SegList, Acc, BIAcc) ->
binaryslot_reader(Tail,
PressMethod, IdxModDate, SegList,
Acc ++ [{K, V}], BIAcc).
read_length_list(Handle, LengthList) -> read_length_list(Handle, LengthList) ->
@ -1517,12 +1549,27 @@ read_length_list(Handle, LengthList) ->
{MultiSlotBin, StartPos}. {MultiSlotBin, StartPos}.
-spec extract_header(binary()|none, boolean()) ->
{binary(), integer(), binary()}|none.
%% @doc
%% Helper for extracting the binaries from the header ignoring the missing LMD
%% if LMD is not indexed
extract_header(none, _IdxModDate) ->
none; % used when the block cache has returned none
extract_header(Header, true) ->
BL = ?BLOCK_LENGTHS_LENGTH,
<<BlockLengths:BL/binary, LMD:48/integer, PosBinIndex/binary>> = Header,
{BlockLengths, LMD, PosBinIndex};
extract_header(Header, false) ->
BL = ?BLOCK_LENGTHS_LENGTH,
<<BlockLengths:BL/binary, PosBinIndex/binary>> = Header,
{BlockLengths, 0, PosBinIndex}.
binaryslot_get(FullBin, Key, Hash, PressMethod) -> binaryslot_get(FullBin, Key, Hash, PressMethod, IdxModDate) ->
case crc_check_slot(FullBin) of case crc_check_slot(FullBin) of
{Header, Blocks} -> {Header, Blocks} ->
BL = ?BLOCK_LENGTHS_LENGTH, {BlockLengths, _LMD, PosBinIndex} =
<<BlockLengths:BL/binary, PosBinIndex/binary>> = Header, extract_header(Header, IdxModDate),
PosList = find_pos(PosBinIndex, PosList = find_pos(PosBinIndex,
extra_hash(Hash), extra_hash(Hash),
[], [],
@ -1534,7 +1581,7 @@ binaryslot_get(FullBin, Key, Hash, PressMethod) ->
none} none}
end. end.
binaryslot_tolist(FullBin, PressMethod) -> binaryslot_tolist(FullBin, PressMethod, IdxModDate) ->
BlockFetchFun = BlockFetchFun =
fun(Length, {Acc, Bin}) -> fun(Length, {Acc, Bin}) ->
case Length of case Length of
@ -1549,12 +1596,13 @@ binaryslot_tolist(FullBin, PressMethod) ->
{Out, _Rem} = {Out, _Rem} =
case crc_check_slot(FullBin) of case crc_check_slot(FullBin) of
{Header, Blocks} -> {Header, Blocks} ->
{BlockLengths, _LMD, _PosBinIndex} =
extract_header(Header, IdxModDate),
<<B1L:32/integer, <<B1L:32/integer,
B2L:32/integer, B2L:32/integer,
B3L:32/integer, B3L:32/integer,
B4L:32/integer, B4L:32/integer,
B5L:32/integer, B5L:32/integer>> = BlockLengths,
_PosBinIndex/binary>> = Header,
lists:foldl(BlockFetchFun, lists:foldl(BlockFetchFun,
{[], Blocks}, {[], Blocks},
[B1L, B2L, B3L, B4L, B5L]); [B1L, B2L, B3L, B4L, B5L]);
@ -1564,9 +1612,11 @@ binaryslot_tolist(FullBin, PressMethod) ->
Out. Out.
binaryslot_trimmedlist(FullBin, all, all, PressMethod, false) -> binaryslot_trimmedlist(FullBin, all, all,
{binaryslot_tolist(FullBin, PressMethod), none}; PressMethod, IdxModDate, false) ->
binaryslot_trimmedlist(FullBin, StartKey, EndKey, PressMethod, SegList) -> {binaryslot_tolist(FullBin, PressMethod, IdxModDate), none};
binaryslot_trimmedlist(FullBin, StartKey, EndKey,
PressMethod, IdxModDate, SegList) ->
LTrimFun = fun({K, _V}) -> K < StartKey end, LTrimFun = fun({K, _V}) -> K < StartKey end,
RTrimFun = fun({K, _V}) -> not leveled_codec:endkey_passed(EndKey, K) end, RTrimFun = fun({K, _V}) -> not leveled_codec:endkey_passed(EndKey, K) end,
BlockCheckFun = BlockCheckFun =
@ -1615,12 +1665,13 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey, PressMethod, SegList) ->
% and Block3 (as Block 1 will also be checked), but finessing this last % and Block3 (as Block 1 will also be checked), but finessing this last
% scenario is hard to do in concise code % scenario is hard to do in concise code
{{Header, Blocks}, false} -> {{Header, Blocks}, false} ->
{BlockLengths, _LMD, _PosBinIndex} =
extract_header(Header, IdxModDate),
<<B1L:32/integer, <<B1L:32/integer,
B2L:32/integer, B2L:32/integer,
B3L:32/integer, B3L:32/integer,
B4L:32/integer, B4L:32/integer,
B5L:32/integer, B5L:32/integer>> = BlockLengths,
_PosBinIndex/binary>> = Header,
<<Block1:B1L/binary, Block2:B2L/binary, <<Block1:B1L/binary, Block2:B2L/binary,
MidBlock:B3L/binary, MidBlock:B3L/binary,
Block4:B4L/binary, Block5:B5L/binary>> = Blocks, Block4:B4L/binary, Block5:B5L/binary>> = Blocks,
@ -1665,7 +1716,8 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey, PressMethod, SegList) ->
BlockLengths, BlockLengths,
byte_size(BlockIdx), byte_size(BlockIdx),
false, false,
PressMethod, PressMethod,
IdxModDate,
[]), []),
{KVL, Header}; {KVL, Header};
{crc_wonky, _} -> {crc_wonky, _} ->
@ -2360,35 +2412,52 @@ indexed_list_mixedkeys2_test() ->
indexed_list_allindexkeys_test() -> indexed_list_allindexkeys_test() ->
Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)),
?LOOK_SLOTSIZE), ?LOOK_SLOTSIZE),
{{Header, FullBin, _HL, _LK}, no_timing} = {{HeaderT, FullBinT, _HL, _LK}, no_timing} =
generate_binary_slot(lookup, Keys, native, ?INDEX_MODDATE,no_timing), generate_binary_slot(lookup, Keys, native, true, no_timing),
{{HeaderF, FullBinF, _HL, _LK}, no_timing} =
generate_binary_slot(lookup, Keys, native, false, no_timing),
EmptySlotSize = ?LOOK_SLOTSIZE - 1, EmptySlotSize = ?LOOK_SLOTSIZE - 1,
?assertMatch(<<_BL:20/binary, EmptySlotSize:8/integer>>, Header), LMD = ?FLIPPER48,
?assertMatch(<<_BL:20/binary, LMD:48/integer, EmptySlotSize:8/integer>>,
HeaderT),
?assertMatch(<<_BL:20/binary, LMD:48/integer, EmptySlotSize:8/integer>>,
HeaderF),
% SW = os:timestamp(), % SW = os:timestamp(),
BinToList = binaryslot_tolist(FullBin, native), BinToListT = binaryslot_tolist(FullBinT, native, true),
BinToListF = binaryslot_tolist(FullBinF, native, false),
% io:format(user, % io:format(user,
% "Indexed list flattened in ~w microseconds ~n", % "Indexed list flattened in ~w microseconds ~n",
% [timer:now_diff(os:timestamp(), SW)]), % [timer:now_diff(os:timestamp(), SW)]),
?assertMatch(Keys, BinToList), ?assertMatch(Keys, BinToListT),
?assertMatch({Keys, none}, binaryslot_trimmedlist(FullBin, ?assertMatch({Keys, none}, binaryslot_trimmedlist(FullBinT,
all, all, all, all,
native, false)). native,
true,
false)),
?assertMatch(Keys, BinToListF),
?assertMatch({Keys, none}, binaryslot_trimmedlist(FullBinF,
all, all,
native,
false,
false)).
indexed_list_allindexkeys_nolookup_test() -> indexed_list_allindexkeys_nolookup_test() ->
Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(1000)), Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(1000)),
?NOLOOK_SLOTSIZE), ?NOLOOK_SLOTSIZE),
{{Header, FullBin, _HL, _LK}, no_timing} = {{Header, FullBin, _HL, _LK}, no_timing} =
generate_binary_slot(no_lookup, Keys, native, ?INDEX_MODDATE,no_timing), generate_binary_slot(no_lookup, Keys, native, ?INDEX_MODDATE,no_timing),
?assertMatch(<<_BL:20/binary, 127:8/integer>>, Header), ?assertMatch(<<_BL:20/binary, _LMD:48/integer, 127:8/integer>>, Header),
% SW = os:timestamp(), % SW = os:timestamp(),
BinToList = binaryslot_tolist(FullBin, native), BinToList = binaryslot_tolist(FullBin, native, ?INDEX_MODDATE),
% io:format(user, % io:format(user,
% "Indexed list flattened in ~w microseconds ~n", % "Indexed list flattened in ~w microseconds ~n",
% [timer:now_diff(os:timestamp(), SW)]), % [timer:now_diff(os:timestamp(), SW)]),
?assertMatch(Keys, BinToList), ?assertMatch(Keys, BinToList),
?assertMatch({Keys, none}, binaryslot_trimmedlist(FullBin, ?assertMatch({Keys, none}, binaryslot_trimmedlist(FullBin,
all, all, all, all,
native, false)). native,
?INDEX_MODDATE,
false)).
indexed_list_allindexkeys_trimmed_test() -> indexed_list_allindexkeys_trimmed_test() ->
Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)),
@ -2396,7 +2465,8 @@ indexed_list_allindexkeys_trimmed_test() ->
{{Header, FullBin, _HL, _LK}, no_timing} = {{Header, FullBin, _HL, _LK}, no_timing} =
generate_binary_slot(lookup, Keys, native, ?INDEX_MODDATE,no_timing), generate_binary_slot(lookup, Keys, native, ?INDEX_MODDATE,no_timing),
EmptySlotSize = ?LOOK_SLOTSIZE - 1, EmptySlotSize = ?LOOK_SLOTSIZE - 1,
?assertMatch(<<_BL:20/binary, EmptySlotSize:8/integer>>, Header), ?assertMatch(<<_BL:20/binary, _LMD:48/integer, EmptySlotSize:8/integer>>,
Header),
?assertMatch({Keys, none}, binaryslot_trimmedlist(FullBin, ?assertMatch({Keys, none}, binaryslot_trimmedlist(FullBin,
{i, {i,
"Bucket", "Bucket",
@ -2407,26 +2477,30 @@ indexed_list_allindexkeys_trimmed_test() ->
{"t1_int", 99999}, {"t1_int", 99999},
null}, null},
native, native,
?INDEX_MODDATE,
false)), false)),
{SK1, _} = lists:nth(10, Keys), {SK1, _} = lists:nth(10, Keys),
{EK1, _} = lists:nth(100, Keys), {EK1, _} = lists:nth(100, Keys),
R1 = lists:sublist(Keys, 10, 91), R1 = lists:sublist(Keys, 10, 91),
{O1, none} = binaryslot_trimmedlist(FullBin, SK1, EK1, native, false), {O1, none} = binaryslot_trimmedlist(FullBin, SK1, EK1,
native, ?INDEX_MODDATE, false),
?assertMatch(91, length(O1)), ?assertMatch(91, length(O1)),
?assertMatch(R1, O1), ?assertMatch(R1, O1),
{SK2, _} = lists:nth(10, Keys), {SK2, _} = lists:nth(10, Keys),
{EK2, _} = lists:nth(20, Keys), {EK2, _} = lists:nth(20, Keys),
R2 = lists:sublist(Keys, 10, 11), R2 = lists:sublist(Keys, 10, 11),
{O2, none} = binaryslot_trimmedlist(FullBin, SK2, EK2, native, false), {O2, none} = binaryslot_trimmedlist(FullBin, SK2, EK2,
native, ?INDEX_MODDATE, false),
?assertMatch(11, length(O2)), ?assertMatch(11, length(O2)),
?assertMatch(R2, O2), ?assertMatch(R2, O2),
{SK3, _} = lists:nth(?LOOK_SLOTSIZE - 1, Keys), {SK3, _} = lists:nth(?LOOK_SLOTSIZE - 1, Keys),
{EK3, _} = lists:nth(?LOOK_SLOTSIZE, Keys), {EK3, _} = lists:nth(?LOOK_SLOTSIZE, Keys),
R3 = lists:sublist(Keys, ?LOOK_SLOTSIZE - 1, 2), R3 = lists:sublist(Keys, ?LOOK_SLOTSIZE - 1, 2),
{O3, none} = binaryslot_trimmedlist(FullBin, SK3, EK3, native, false), {O3, none} = binaryslot_trimmedlist(FullBin, SK3, EK3,
native, ?INDEX_MODDATE, false),
?assertMatch(2, length(O3)), ?assertMatch(2, length(O3)),
?assertMatch(R3, O3). ?assertMatch(R3, O3).
@ -2436,7 +2510,7 @@ indexed_list_mixedkeys_bitflip_test() ->
KVL1 = lists:sublist(KVL0, 33), KVL1 = lists:sublist(KVL0, 33),
Keys = lists:ukeysort(1, generate_indexkeys(60) ++ KVL1), Keys = lists:ukeysort(1, generate_indexkeys(60) ++ KVL1),
{{Header, SlotBin, _HL, LK}, no_timing} = {{Header, SlotBin, _HL, LK}, no_timing} =
generate_binary_slot(lookup, Keys, native, ?INDEX_MODDATE,no_timing), generate_binary_slot(lookup, Keys, native, ?INDEX_MODDATE, no_timing),
?assertMatch(LK, element(1, lists:last(Keys))), ?assertMatch(LK, element(1, lists:last(Keys))),
@ -2444,7 +2518,8 @@ indexed_list_mixedkeys_bitflip_test() ->
_B2L:32/integer, _B2L:32/integer,
_B3L:32/integer, _B3L:32/integer,
_B4L:32/integer, _B4L:32/integer,
_B5L:32/integer, _B5L:32/integer,
_LMD:48/integer,
PosBin/binary>> = Header, PosBin/binary>> = Header,
TestKey1 = element(1, lists:nth(1, KVL1)), TestKey1 = element(1, lists:nth(1, KVL1)),
@ -2454,7 +2529,7 @@ indexed_list_mixedkeys_bitflip_test() ->
test_binary_slot(SlotBin, TestKey1, MH1, lists:nth(1, KVL1)), test_binary_slot(SlotBin, TestKey1, MH1, lists:nth(1, KVL1)),
test_binary_slot(SlotBin, TestKey2, MH2, lists:nth(33, KVL1)), test_binary_slot(SlotBin, TestKey2, MH2, lists:nth(33, KVL1)),
ToList = binaryslot_tolist(SlotBin, native), ToList = binaryslot_tolist(SlotBin, native, ?INDEX_MODDATE),
?assertMatch(Keys, ToList), ?assertMatch(Keys, ToList),
[Pos1] = find_pos(PosBin, extra_hash(MH1), [], 0), [Pos1] = find_pos(PosBin, extra_hash(MH1), [], 0),
@ -2473,8 +2548,8 @@ indexed_list_mixedkeys_bitflip_test() ->
test_binary_slot(SlotBin1, TestKey1, MH1, not_present), test_binary_slot(SlotBin1, TestKey1, MH1, not_present),
test_binary_slot(SlotBin2, TestKey2, MH2, not_present), test_binary_slot(SlotBin2, TestKey2, MH2, not_present),
ToList1 = binaryslot_tolist(SlotBin1, native), ToList1 = binaryslot_tolist(SlotBin1, native, ?INDEX_MODDATE),
ToList2 = binaryslot_tolist(SlotBin2, native), ToList2 = binaryslot_tolist(SlotBin2, native, ?INDEX_MODDATE),
?assertMatch(true, is_list(ToList1)), ?assertMatch(true, is_list(ToList1)),
?assertMatch(true, is_list(ToList2)), ?assertMatch(true, is_list(ToList2)),
@ -2487,7 +2562,8 @@ indexed_list_mixedkeys_bitflip_test() ->
{SK1, _} = lists:nth(10, Keys), {SK1, _} = lists:nth(10, Keys),
{EK1, _} = lists:nth(20, Keys), {EK1, _} = lists:nth(20, Keys),
{O1, none} = binaryslot_trimmedlist(SlotBin3, SK1, EK1, native, false), {O1, none} = binaryslot_trimmedlist(SlotBin3, SK1, EK1,
native, ?INDEX_MODDATE, false),
?assertMatch([], O1), ?assertMatch([], O1),
SlotBin4 = flip_byte(SlotBin, 0, 20), SlotBin4 = flip_byte(SlotBin, 0, 20),
@ -2495,12 +2571,14 @@ indexed_list_mixedkeys_bitflip_test() ->
test_binary_slot(SlotBin4, TestKey1, MH1, not_present), test_binary_slot(SlotBin4, TestKey1, MH1, not_present),
test_binary_slot(SlotBin5, TestKey1, MH1, not_present), test_binary_slot(SlotBin5, TestKey1, MH1, not_present),
ToList4 = binaryslot_tolist(SlotBin4, native), ToList4 = binaryslot_tolist(SlotBin4, native, ?INDEX_MODDATE),
ToList5 = binaryslot_tolist(SlotBin5, native), ToList5 = binaryslot_tolist(SlotBin5, native, ?INDEX_MODDATE),
?assertMatch([], ToList4), ?assertMatch([], ToList4),
?assertMatch([], ToList5), ?assertMatch([], ToList5),
{O4, none} = binaryslot_trimmedlist(SlotBin4, SK1, EK1, native, false), {O4, none} = binaryslot_trimmedlist(SlotBin4, SK1, EK1,
{O5, none} = binaryslot_trimmedlist(SlotBin4, SK1, EK1, native, false), native, ?INDEX_MODDATE, false),
{O5, none} = binaryslot_trimmedlist(SlotBin4, SK1, EK1,
native, ?INDEX_MODDATE, false),
?assertMatch([], O4), ?assertMatch([], O4),
?assertMatch([], O5). ?assertMatch([], O5).
@ -2518,7 +2596,8 @@ flip_byte(Binary, Offset, Length) ->
test_binary_slot(FullBin, Key, Hash, ExpectedValue) -> test_binary_slot(FullBin, Key, Hash, ExpectedValue) ->
% SW = os:timestamp(), % SW = os:timestamp(),
{ReturnedValue, _Header} = binaryslot_get(FullBin, Key, Hash, native), {ReturnedValue, _Header} =
binaryslot_get(FullBin, Key, Hash, native, ?INDEX_MODDATE),
?assertMatch(ExpectedValue, ReturnedValue). ?assertMatch(ExpectedValue, ReturnedValue).
% io:format(user, "Fetch success in ~w microseconds ~n", % io:format(user, "Fetch success in ~w microseconds ~n",
% [timer:now_diff(os:timestamp(), SW)]). % [timer:now_diff(os:timestamp(), SW)]).