Handle when corrupted blocks creates empty list

So can't get nth or last.
This commit is contained in:
Martin Sumner 2019-11-04 12:49:56 +00:00
parent 432fe71bf0
commit 843b28313d

View file

@ -1388,7 +1388,7 @@ serialise_block(Term, none) ->
%% compression methods.
%%
%% If CRC check fails we treat all the data as missing
deserialise_block(Bin, PressMethod) ->
deserialise_block(Bin, PressMethod) when byte_size(Bin) > 4 ->
BinS = byte_size(Bin) - 4,
<<TermBin:BinS/binary, CRC32:32/integer>> = Bin,
case hmac(TermBin) of
@ -1396,7 +1396,9 @@ deserialise_block(Bin, PressMethod) ->
deserialise_checkedblock(TermBin, PressMethod);
_ ->
[]
end.
end;
deserialise_block(_Bin, _PM) ->
[].
deserialise_checkedblock(Bin, lz4) ->
{ok, Bin0} = lz4:unpack(Bin),
@ -1693,24 +1695,20 @@ check_blocks([Pos|Rest], BlockPointer, BlockLengths, PosBinLength,
PosBinLength,
BlockNumber,
additional_offset(IdxModDate)),
BlockL = deserialise_block(BlockBin, PressMethod),
{K, V} = lists:nth(BlockPos, BlockL),
case K of
LedgerKeyToCheck ->
R = fetchfrom_rawblock(BlockPos, deserialise_block(BlockBin, PressMethod)),
case {R, LedgerKeyToCheck} of
{{K, V}, K} ->
{K, V};
{{K, V}, false} ->
check_blocks(Rest, BlockPointer,
BlockLengths, PosBinLength,
LedgerKeyToCheck, PressMethod, IdxModDate,
[{K, V}|Acc]);
_ ->
case LedgerKeyToCheck of
false ->
check_blocks(Rest, BlockPointer,
BlockLengths, PosBinLength,
LedgerKeyToCheck, PressMethod, IdxModDate,
[{K, V}|Acc]);
_ ->
check_blocks(Rest, BlockPointer,
BlockLengths, PosBinLength,
LedgerKeyToCheck, PressMethod, IdxModDate,
Acc)
end
check_blocks(Rest, BlockPointer,
BlockLengths, PosBinLength,
LedgerKeyToCheck, PressMethod, IdxModDate,
Acc)
end.
-spec additional_offset(boolean()) -> pos_integer().
@ -2029,10 +2027,13 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
false ->
Block
end,
case fetchend_rawblock(BlockList) of
{LastKey, _LV} when StartKey > LastKey ->
case fetchends_rawblock(BlockList) of
{_, LastKey} when StartKey > LastKey ->
%% This includes the case when LastKey is
%% not_present due to corruption in the BlockList
%% as tuple is > not_present.
{Acc, true};
{LastKey, _LV} ->
{_, LastKey} ->
{_LDrop, RKeep} = lists:splitwith(LTrimFun,
BlockList),
case leveled_codec:endkey_passed(EndKey,
@ -2043,9 +2044,7 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
{Acc ++ LKeep, false};
false ->
{Acc ++ RKeep, true}
end;
_ ->
{Acc, true}
end
end;
{_ , false} ->
{Acc, false}
@ -2073,39 +2072,15 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
MidBlock:B3L/binary,
Block4:B4L/binary, Block5:B5L/binary>> = Blocks,
BlocksToCheck =
case B3L of
0 ->
[Block1, Block2];
_ ->
MidBlockList =
deserialise_block(MidBlock, PressMethod),
{MidFirst, _} = lists:nth(1, MidBlockList),
{MidLast, _} = lists:last(MidBlockList),
Split = {StartKey > MidLast,
StartKey >= MidFirst,
leveled_codec:endkey_passed(EndKey,
MidFirst),
leveled_codec:endkey_passed(EndKey,
MidLast)},
case Split of
{true, _, _, _} ->
[Block4, Block5];
{false, true, false, true} ->
[MidBlockList];
{false, true, false, false} ->
[MidBlockList, Block4, Block5];
{false, false, true, true} ->
[Block1, Block2];
{false, false, false, true} ->
[Block1, Block2, MidBlockList];
_ ->
[Block1, Block2, MidBlockList, Block4, Block5]
end
end,
{Acc, _Continue} = lists:foldl(BlockCheckFun, {[], true}, BlocksToCheck),
blocks_required({StartKey, EndKey},
[Block1, Block2, MidBlock, Block4, Block5],
PressMethod),
{Acc, _Continue} =
lists:foldl(BlockCheckFun, {[], true}, BlocksToCheck),
{Acc, none};
{{Header, _Blocks}, SegList} ->
{BlockLengths, _LMD, BlockIdx} = extract_header(Header, IdxModDate),
{BlockLengths, _LMD, BlockIdx} =
extract_header(Header, IdxModDate),
PosList = find_pos(BlockIdx, SegList, [], 0),
KVL = check_blocks(PosList,
FullBin,
@ -2121,6 +2096,41 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
end.
blocks_required({StartKey, EndKey}, [B1, B2, MidBlock, B4, B5], PressMethod) ->
MidBlockList = deserialise_block(MidBlock, PressMethod),
filter_blocks_required(fetchends_rawblock(MidBlockList),
{StartKey, EndKey},
[B1, B2, MidBlockList, B4, B5]).
filter_blocks_required({not_present, not_present}, _RangeKeys, AllBlocks) ->
AllBlocks;
filter_blocks_required({_MidFirst, MidLast}, {StartKey, _EndKey},
[_Block1, _Block2, _MidBlockList, Block4, Block5])
when StartKey > MidLast ->
[Block4, Block5];
filter_blocks_required({MidFirst, MidLast}, {StartKey, EndKey},
[_Block1, _Block2, MidBlockList, Block4, Block5])
when StartKey >= MidFirst ->
NoneAfter = leveled_codec:endkey_passed(EndKey, MidLast),
case NoneAfter of
true ->
[MidBlockList];
false ->
[MidBlockList, Block4, Block5]
end;
filter_blocks_required({MidFirst, MidLast}, {_StartKey, EndKey},
[Block1, Block2, MidBlockList, Block4, Block5]) ->
AllBefore = leveled_codec:endkey_passed(EndKey, MidFirst),
NoneAfter = leveled_codec:endkey_passed(EndKey, MidLast),
case {AllBefore, NoneAfter} of
{true, true} ->
[Block1, Block2];
{false, true} ->
[Block1, Block2, MidBlockList];
{false, false} ->
[Block1, Block2, MidBlockList, Block4, Block5]
end.
crc_check_slot(FullBin) ->
<<CRC32PBL:32/integer,
@ -2188,10 +2198,11 @@ fetchfrom_rawblock(_BlockPos, []) ->
fetchfrom_rawblock(BlockPos, RawBlock) ->
lists:nth(BlockPos, RawBlock).
fetchend_rawblock([]) ->
not_present;
fetchend_rawblock(RawBlock) ->
lists:last(RawBlock).
fetchends_rawblock([]) ->
{not_present, not_present};
fetchends_rawblock(RawBlock) ->
{element(1, lists:nth(1, RawBlock)),
element(1, lists:last(RawBlock))}.
revert_position(Pos) ->
@ -3540,7 +3551,58 @@ stop_whenstarter_stopped_testto() ->
end
end,
?assertMatch(false, lists:foldl(TestFun, true, [10000, 2000, 2000, 2000])).
corrupted_block_range_test() ->
corrupted_block_rangetester(native, 100),
corrupted_block_rangetester(lz4, 100),
corrupted_block_rangetester(none, 100).
corrupted_block_rangetester(PressMethod, TestCount) ->
N = 100,
KVL1 = lists:ukeysort(1, generate_randomkeys(1, N, 1, 2)),
RandomRangesFun =
fun(_X) ->
SKint = leveled_rand:uniform(90) + 1,
EKint = min(N, leveled_rand:uniform(N - SKint)),
SK = element(1, lists:nth(SKint, KVL1)),
EK = element(1, lists:nth(EKint, KVL1)),
{SK, EK}
end,
RandomRanges = lists:map(RandomRangesFun, lists:seq(1, TestCount)),
B1 = serialise_block(lists:sublist(KVL1, 1, 20), PressMethod),
B2 = serialise_block(lists:sublist(KVL1, 21, 20), PressMethod),
MidBlock = serialise_block(lists:sublist(KVL1, 41, 20), PressMethod),
B4 = serialise_block(lists:sublist(KVL1, 61, 20), PressMethod),
B5 = serialise_block(lists:sublist(KVL1, 81, 20), PressMethod),
CorruptBlockFun =
fun(Block) ->
case leveled_rand:uniform(10) < 2 of
true ->
flip_byte(Block, 0 , byte_size(Block));
false ->
Block
end
end,
CheckFun =
fun({SK, EK}) ->
InputBlocks =
lists:map(CorruptBlockFun, [B1, B2, MidBlock, B4, B5]),
BR = blocks_required({SK, EK}, InputBlocks, PressMethod),
?assertMatch(true, length(BR) =< 5),
BlockListFun =
fun(B) ->
case is_binary(B) of
true ->
deserialise_block(B, PressMethod);
false ->
B
end
end,
BRL = lists:flatten(lists:map(BlockListFun, BR)),
lists:foreach(fun({_K, _V}) -> ok end, BRL)
end,
lists:foreach(CheckFun, RandomRanges).
receive_fun() ->
receive