Merge branch 'master' into develop-3.0

This commit is contained in:
Martin Sumner 2019-11-04 22:27:10 +00:00
commit e4a5ac8776

View file

@ -1388,7 +1388,7 @@ serialise_block(Term, none) ->
%% compression methods.
%%
%% If CRC check fails we treat all the data as missing
deserialise_block(Bin, PressMethod) ->
deserialise_block(Bin, PressMethod) when byte_size(Bin) > 4 ->
BinS = byte_size(Bin) - 4,
<<TermBin:BinS/binary, CRC32:32/integer>> = Bin,
case hmac(TermBin) of
@ -1396,7 +1396,9 @@ deserialise_block(Bin, PressMethod) ->
deserialise_checkedblock(TermBin, PressMethod);
_ ->
[]
end.
end;
deserialise_block(_Bin, _PM) ->
[].
deserialise_checkedblock(Bin, lz4) ->
{ok, Bin0} = lz4:unpack(Bin),
@ -1670,14 +1672,18 @@ generate_binary_slot(Lookup, KVL, PressMethod, IndexModDate, BuildTimings0) ->
binary()|{file:io_device(), integer()},
binary(),
integer(),
leveled_codec:ledger_key()|false,
leveled_codec:ledger_key()|false,
%% if false the acc is a list, and if true
%% Acc will be initially not_present, and may
%% result in a {K, V} tuple
press_method(),
boolean(),
list()|not_present) -> list()|not_present.
list()|not_present) ->
list(leveled_codec:ledger_kv())|
not_present|leveled_codec:ledger_kv().
%% @doc
%% Acc should start as not_present if LedgerKey is a key, and a list if
%% LedgerKey is false
check_blocks([], _BlockPointer, _BlockLengths, _PosBinLength,
_LedgerKeyToCheck, _PressMethod, _IdxModDate, not_present) ->
not_present;
@ -1693,24 +1699,20 @@ check_blocks([Pos|Rest], BlockPointer, BlockLengths, PosBinLength,
PosBinLength,
BlockNumber,
additional_offset(IdxModDate)),
BlockL = deserialise_block(BlockBin, PressMethod),
{K, V} = lists:nth(BlockPos, BlockL),
case K of
LedgerKeyToCheck ->
R = fetchfrom_rawblock(BlockPos, deserialise_block(BlockBin, PressMethod)),
case {R, LedgerKeyToCheck} of
{{K, V}, K} ->
{K, V};
{{K, V}, false} ->
check_blocks(Rest, BlockPointer,
BlockLengths, PosBinLength,
LedgerKeyToCheck, PressMethod, IdxModDate,
[{K, V}|Acc]);
_ ->
case LedgerKeyToCheck of
false ->
check_blocks(Rest, BlockPointer,
BlockLengths, PosBinLength,
LedgerKeyToCheck, PressMethod, IdxModDate,
[{K, V}|Acc]);
_ ->
check_blocks(Rest, BlockPointer,
BlockLengths, PosBinLength,
LedgerKeyToCheck, PressMethod, IdxModDate,
Acc)
end
check_blocks(Rest, BlockPointer,
BlockLengths, PosBinLength,
LedgerKeyToCheck, PressMethod, IdxModDate,
Acc)
end.
-spec additional_offset(boolean()) -> pos_integer().
@ -2029,10 +2031,13 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
false ->
Block
end,
case fetchend_rawblock(BlockList) of
{LastKey, _LV} when StartKey > LastKey ->
case fetchends_rawblock(BlockList) of
{_, LastKey} when StartKey > LastKey ->
%% This includes the case when LastKey is
%% not_present due to corruption in the BlockList
%% as tuple is > not_present.
{Acc, true};
{LastKey, _LV} ->
{_, LastKey} ->
{_LDrop, RKeep} = lists:splitwith(LTrimFun,
BlockList),
case leveled_codec:endkey_passed(EndKey,
@ -2043,9 +2048,7 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
{Acc ++ LKeep, false};
false ->
{Acc ++ RKeep, true}
end;
_ ->
{Acc, true}
end
end;
{_ , false} ->
{Acc, false}
@ -2073,39 +2076,15 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
MidBlock:B3L/binary,
Block4:B4L/binary, Block5:B5L/binary>> = Blocks,
BlocksToCheck =
case B3L of
0 ->
[Block1, Block2];
_ ->
MidBlockList =
deserialise_block(MidBlock, PressMethod),
{MidFirst, _} = lists:nth(1, MidBlockList),
{MidLast, _} = lists:last(MidBlockList),
Split = {StartKey > MidLast,
StartKey >= MidFirst,
leveled_codec:endkey_passed(EndKey,
MidFirst),
leveled_codec:endkey_passed(EndKey,
MidLast)},
case Split of
{true, _, _, _} ->
[Block4, Block5];
{false, true, false, true} ->
[MidBlockList];
{false, true, false, false} ->
[MidBlockList, Block4, Block5];
{false, false, true, true} ->
[Block1, Block2];
{false, false, false, true} ->
[Block1, Block2, MidBlockList];
_ ->
[Block1, Block2, MidBlockList, Block4, Block5]
end
end,
{Acc, _Continue} = lists:foldl(BlockCheckFun, {[], true}, BlocksToCheck),
blocks_required({StartKey, EndKey},
[Block1, Block2, MidBlock, Block4, Block5],
PressMethod),
{Acc, _Continue} =
lists:foldl(BlockCheckFun, {[], true}, BlocksToCheck),
{Acc, none};
{{Header, _Blocks}, SegList} ->
{BlockLengths, _LMD, BlockIdx} = extract_header(Header, IdxModDate),
{BlockLengths, _LMD, BlockIdx} =
extract_header(Header, IdxModDate),
PosList = find_pos(BlockIdx, SegList, [], 0),
KVL = check_blocks(PosList,
FullBin,
@ -2121,6 +2100,41 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
end.
blocks_required({StartKey, EndKey}, [B1, B2, MidBlock, B4, B5], PressMethod) ->
MidBlockList = deserialise_block(MidBlock, PressMethod),
filter_blocks_required(fetchends_rawblock(MidBlockList),
{StartKey, EndKey},
[B1, B2, MidBlockList, B4, B5]).
filter_blocks_required({not_present, not_present}, _RangeKeys, AllBlocks) ->
AllBlocks;
filter_blocks_required({_MidFirst, MidLast}, {StartKey, _EndKey},
[_Block1, _Block2, _MidBlockList, Block4, Block5])
when StartKey > MidLast ->
[Block4, Block5];
filter_blocks_required({MidFirst, MidLast}, {StartKey, EndKey},
[_Block1, _Block2, MidBlockList, Block4, Block5])
when StartKey >= MidFirst ->
NoneAfter = leveled_codec:endkey_passed(EndKey, MidLast),
case NoneAfter of
true ->
[MidBlockList];
false ->
[MidBlockList, Block4, Block5]
end;
filter_blocks_required({MidFirst, MidLast}, {_StartKey, EndKey},
[Block1, Block2, MidBlockList, Block4, Block5]) ->
AllBefore = leveled_codec:endkey_passed(EndKey, MidFirst),
NoneAfter = leveled_codec:endkey_passed(EndKey, MidLast),
case {AllBefore, NoneAfter} of
{true, true} ->
[Block1, Block2];
{false, true} ->
[Block1, Block2, MidBlockList];
{false, false} ->
[Block1, Block2, MidBlockList, Block4, Block5]
end.
crc_check_slot(FullBin) ->
<<CRC32PBL:32/integer,
@ -2183,15 +2197,31 @@ fetch_value([Pos|Rest], BlockLengths, Blocks, Key, PressMethod) ->
fetch_value(Rest, BlockLengths, Blocks, Key, PressMethod)
end.
fetchfrom_rawblock(_BlockPos, []) ->
-spec fetchfrom_rawblock(pos_integer(), list(leveled_codec:ledger_kv())) ->
not_present|leveled_codec:ledger_kv().
%% @doc
%% Fetch from a deserialised block, but accounting for potential corruption
%% in that block which may lead to it returning as an empty list if that
%% corruption is detected by the deserialising function
fetchfrom_rawblock(BlockPos, RawBlock) when BlockPos > length(RawBlock) ->
%% Capture the slightly more general case than this being an empty list
%% in case of some other unexpected misalignement that would otherwise
%% crash the leveled_sst file process
not_present;
fetchfrom_rawblock(BlockPos, RawBlock) ->
lists:nth(BlockPos, RawBlock).
fetchend_rawblock([]) ->
not_present;
fetchend_rawblock(RawBlock) ->
lists:last(RawBlock).
-spec fetchends_rawblock(list(leveled_codec:ledger_kv())) ->
{not_present, not_present}|
{leveled_codec:ledger_key(), leveled_codec:ledger_key()}.
%% @doc
%% Fetch the first and last key from a block, and not_present if the block
%% is empty (rather than crashing)
fetchends_rawblock([]) ->
{not_present, not_present};
fetchends_rawblock(RawBlock) ->
{element(1, lists:nth(1, RawBlock)),
element(1, lists:last(RawBlock))}.
revert_position(Pos) ->
@ -3540,7 +3570,115 @@ stop_whenstarter_stopped_testto() ->
end
end,
?assertMatch(false, lists:foldl(TestFun, true, [10000, 2000, 2000, 2000])).
corrupted_block_range_test() ->
corrupted_block_rangetester(native, 100),
corrupted_block_rangetester(lz4, 100),
corrupted_block_rangetester(none, 100).
corrupted_block_rangetester(PressMethod, TestCount) ->
N = 100,
KVL1 = lists:ukeysort(1, generate_randomkeys(1, N, 1, 2)),
RandomRangesFun =
fun(_X) ->
SKint = leveled_rand:uniform(90) + 1,
EKint = min(N, leveled_rand:uniform(N - SKint)),
SK = element(1, lists:nth(SKint, KVL1)),
EK = element(1, lists:nth(EKint, KVL1)),
{SK, EK}
end,
RandomRanges = lists:map(RandomRangesFun, lists:seq(1, TestCount)),
B1 = serialise_block(lists:sublist(KVL1, 1, 20), PressMethod),
B2 = serialise_block(lists:sublist(KVL1, 21, 20), PressMethod),
MidBlock = serialise_block(lists:sublist(KVL1, 41, 20), PressMethod),
B4 = serialise_block(lists:sublist(KVL1, 61, 20), PressMethod),
B5 = serialise_block(lists:sublist(KVL1, 81, 20), PressMethod),
CorruptBlockFun =
fun(Block) ->
case leveled_rand:uniform(10) < 2 of
true ->
flip_byte(Block, 0 , byte_size(Block));
false ->
Block
end
end,
CheckFun =
fun({SK, EK}) ->
InputBlocks =
lists:map(CorruptBlockFun, [B1, B2, MidBlock, B4, B5]),
BR = blocks_required({SK, EK}, InputBlocks, PressMethod),
?assertMatch(true, length(BR) =< 5),
BlockListFun =
fun(B) ->
case is_binary(B) of
true ->
deserialise_block(B, PressMethod);
false ->
B
end
end,
BRL = lists:flatten(lists:map(BlockListFun, BR)),
lists:foreach(fun({_K, _V}) -> ok end, BRL)
end,
lists:foreach(CheckFun, RandomRanges).
corrupted_block_fetch_test() ->
corrupted_block_fetch_tester(native),
corrupted_block_fetch_tester(lz4),
corrupted_block_fetch_tester(none).
corrupted_block_fetch_tester(PressMethod) ->
KC = 120,
KVL1 = lists:ukeysort(1, generate_randomkeys(1, KC, 1, 2)),
{{Header, SlotBin, _HashL, _LastKey}, _BT} =
generate_binary_slot(lookup, KVL1, PressMethod, false, no_timing),
<<B1L:32/integer,
B2L:32/integer,
B3L:32/integer,
B4L:32/integer,
B5L:32/integer,
PosBinIndex/binary>> = Header,
HS = byte_size(Header),
<<CheckB1P:32/integer, B1P:32/integer,
CheckH:32/integer, Header:HS/binary,
B1:B1L/binary, B2:B2L/binary, B3:B3L/binary,
B4:B4L/binary, B5:B5L/binary>> = SlotBin,
CorruptB3 = flip_byte(B3, 0 , B3L),
CorruptSlotBin =
<<CheckB1P:32/integer, B1P:32/integer,
CheckH:32/integer, Header/binary,
B1/binary, B2/binary, CorruptB3/binary, B4/binary, B5/binary>>,
CheckFun =
fun(N, {AccHit, AccMiss}) ->
PosL = [min(0, leveled_rand:uniform(N - 2)), N - 1],
{LK, LV} = lists:nth(N, KVL1),
{BlockLengths, 0, PosBinIndex} =
extract_header(Header, false),
R = check_blocks(PosL,
CorruptSlotBin,
BlockLengths,
byte_size(PosBinIndex),
LK,
PressMethod,
false,
not_present),
case R of
not_present ->
{AccHit, AccMiss + 1};
{LK, LV} ->
{AccHit + 1, AccMiss}
end
end,
{_HitCount, MissCount} =
lists:foldl(CheckFun, {0, 0}, lists:seq(16, length(KVL1))),
ExpectedMisses = element(2, ?LOOK_BLOCKSIZE),
?assertMatch(ExpectedMisses, MissCount).
receive_fun() ->
receive