Merge pull request #299 from martinsumner/mas-i298-corruptedblocks
Mas i298 corruptedblocks
This commit is contained in:
commit
4b967bee4a
1 changed files with 201 additions and 63 deletions
|
@ -1388,7 +1388,7 @@ serialise_block(Term, none) ->
|
||||||
%% compression methods.
|
%% compression methods.
|
||||||
%%
|
%%
|
||||||
%% If CRC check fails we treat all the data as missing
|
%% If CRC check fails we treat all the data as missing
|
||||||
deserialise_block(Bin, PressMethod) ->
|
deserialise_block(Bin, PressMethod) when byte_size(Bin) > 4 ->
|
||||||
BinS = byte_size(Bin) - 4,
|
BinS = byte_size(Bin) - 4,
|
||||||
<<TermBin:BinS/binary, CRC32:32/integer>> = Bin,
|
<<TermBin:BinS/binary, CRC32:32/integer>> = Bin,
|
||||||
case hmac(TermBin) of
|
case hmac(TermBin) of
|
||||||
|
@ -1396,7 +1396,9 @@ deserialise_block(Bin, PressMethod) ->
|
||||||
deserialise_checkedblock(TermBin, PressMethod);
|
deserialise_checkedblock(TermBin, PressMethod);
|
||||||
_ ->
|
_ ->
|
||||||
[]
|
[]
|
||||||
end.
|
end;
|
||||||
|
deserialise_block(_Bin, _PM) ->
|
||||||
|
[].
|
||||||
|
|
||||||
deserialise_checkedblock(Bin, lz4) ->
|
deserialise_checkedblock(Bin, lz4) ->
|
||||||
{ok, Bin0} = lz4:unpack(Bin),
|
{ok, Bin0} = lz4:unpack(Bin),
|
||||||
|
@ -1671,13 +1673,17 @@ generate_binary_slot(Lookup, KVL, PressMethod, IndexModDate, BuildTimings0) ->
|
||||||
binary(),
|
binary(),
|
||||||
integer(),
|
integer(),
|
||||||
leveled_codec:ledger_key()|false,
|
leveled_codec:ledger_key()|false,
|
||||||
|
%% if false the acc is a list, and if true
|
||||||
|
%% Acc will be initially not_present, and may
|
||||||
|
%% result in a {K, V} tuple
|
||||||
press_method(),
|
press_method(),
|
||||||
boolean(),
|
boolean(),
|
||||||
list()|not_present) -> list()|not_present.
|
list()|not_present) ->
|
||||||
|
list(leveled_codec:ledger_kv())|
|
||||||
|
not_present|leveled_codec:ledger_kv().
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Acc should start as not_present if LedgerKey is a key, and a list if
|
%% Acc should start as not_present if LedgerKey is a key, and a list if
|
||||||
%% LedgerKey is false
|
%% LedgerKey is false
|
||||||
|
|
||||||
check_blocks([], _BlockPointer, _BlockLengths, _PosBinLength,
|
check_blocks([], _BlockPointer, _BlockLengths, _PosBinLength,
|
||||||
_LedgerKeyToCheck, _PressMethod, _IdxModDate, not_present) ->
|
_LedgerKeyToCheck, _PressMethod, _IdxModDate, not_present) ->
|
||||||
not_present;
|
not_present;
|
||||||
|
@ -1693,14 +1699,11 @@ check_blocks([Pos|Rest], BlockPointer, BlockLengths, PosBinLength,
|
||||||
PosBinLength,
|
PosBinLength,
|
||||||
BlockNumber,
|
BlockNumber,
|
||||||
additional_offset(IdxModDate)),
|
additional_offset(IdxModDate)),
|
||||||
BlockL = deserialise_block(BlockBin, PressMethod),
|
R = fetchfrom_rawblock(BlockPos, deserialise_block(BlockBin, PressMethod)),
|
||||||
{K, V} = lists:nth(BlockPos, BlockL),
|
case {R, LedgerKeyToCheck} of
|
||||||
case K of
|
{{K, V}, K} ->
|
||||||
LedgerKeyToCheck ->
|
|
||||||
{K, V};
|
{K, V};
|
||||||
_ ->
|
{{K, V}, false} ->
|
||||||
case LedgerKeyToCheck of
|
|
||||||
false ->
|
|
||||||
check_blocks(Rest, BlockPointer,
|
check_blocks(Rest, BlockPointer,
|
||||||
BlockLengths, PosBinLength,
|
BlockLengths, PosBinLength,
|
||||||
LedgerKeyToCheck, PressMethod, IdxModDate,
|
LedgerKeyToCheck, PressMethod, IdxModDate,
|
||||||
|
@ -1710,7 +1713,6 @@ check_blocks([Pos|Rest], BlockPointer, BlockLengths, PosBinLength,
|
||||||
BlockLengths, PosBinLength,
|
BlockLengths, PosBinLength,
|
||||||
LedgerKeyToCheck, PressMethod, IdxModDate,
|
LedgerKeyToCheck, PressMethod, IdxModDate,
|
||||||
Acc)
|
Acc)
|
||||||
end
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec additional_offset(boolean()) -> pos_integer().
|
-spec additional_offset(boolean()) -> pos_integer().
|
||||||
|
@ -2029,10 +2031,13 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
|
||||||
false ->
|
false ->
|
||||||
Block
|
Block
|
||||||
end,
|
end,
|
||||||
case fetchend_rawblock(BlockList) of
|
case fetchends_rawblock(BlockList) of
|
||||||
{LastKey, _LV} when StartKey > LastKey ->
|
{_, LastKey} when StartKey > LastKey ->
|
||||||
|
%% This includes the case when LastKey is
|
||||||
|
%% not_present due to corruption in the BlockList
|
||||||
|
%% as tuple is > not_present.
|
||||||
{Acc, true};
|
{Acc, true};
|
||||||
{LastKey, _LV} ->
|
{_, LastKey} ->
|
||||||
{_LDrop, RKeep} = lists:splitwith(LTrimFun,
|
{_LDrop, RKeep} = lists:splitwith(LTrimFun,
|
||||||
BlockList),
|
BlockList),
|
||||||
case leveled_codec:endkey_passed(EndKey,
|
case leveled_codec:endkey_passed(EndKey,
|
||||||
|
@ -2043,9 +2048,7 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
|
||||||
{Acc ++ LKeep, false};
|
{Acc ++ LKeep, false};
|
||||||
false ->
|
false ->
|
||||||
{Acc ++ RKeep, true}
|
{Acc ++ RKeep, true}
|
||||||
end;
|
end
|
||||||
_ ->
|
|
||||||
{Acc, true}
|
|
||||||
end;
|
end;
|
||||||
{_ , false} ->
|
{_ , false} ->
|
||||||
{Acc, false}
|
{Acc, false}
|
||||||
|
@ -2073,39 +2076,15 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
|
||||||
MidBlock:B3L/binary,
|
MidBlock:B3L/binary,
|
||||||
Block4:B4L/binary, Block5:B5L/binary>> = Blocks,
|
Block4:B4L/binary, Block5:B5L/binary>> = Blocks,
|
||||||
BlocksToCheck =
|
BlocksToCheck =
|
||||||
case B3L of
|
blocks_required({StartKey, EndKey},
|
||||||
0 ->
|
[Block1, Block2, MidBlock, Block4, Block5],
|
||||||
[Block1, Block2];
|
PressMethod),
|
||||||
_ ->
|
{Acc, _Continue} =
|
||||||
MidBlockList =
|
lists:foldl(BlockCheckFun, {[], true}, BlocksToCheck),
|
||||||
deserialise_block(MidBlock, PressMethod),
|
|
||||||
{MidFirst, _} = lists:nth(1, MidBlockList),
|
|
||||||
{MidLast, _} = lists:last(MidBlockList),
|
|
||||||
Split = {StartKey > MidLast,
|
|
||||||
StartKey >= MidFirst,
|
|
||||||
leveled_codec:endkey_passed(EndKey,
|
|
||||||
MidFirst),
|
|
||||||
leveled_codec:endkey_passed(EndKey,
|
|
||||||
MidLast)},
|
|
||||||
case Split of
|
|
||||||
{true, _, _, _} ->
|
|
||||||
[Block4, Block5];
|
|
||||||
{false, true, false, true} ->
|
|
||||||
[MidBlockList];
|
|
||||||
{false, true, false, false} ->
|
|
||||||
[MidBlockList, Block4, Block5];
|
|
||||||
{false, false, true, true} ->
|
|
||||||
[Block1, Block2];
|
|
||||||
{false, false, false, true} ->
|
|
||||||
[Block1, Block2, MidBlockList];
|
|
||||||
_ ->
|
|
||||||
[Block1, Block2, MidBlockList, Block4, Block5]
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
{Acc, _Continue} = lists:foldl(BlockCheckFun, {[], true}, BlocksToCheck),
|
|
||||||
{Acc, none};
|
{Acc, none};
|
||||||
{{Header, _Blocks}, SegList} ->
|
{{Header, _Blocks}, SegList} ->
|
||||||
{BlockLengths, _LMD, BlockIdx} = extract_header(Header, IdxModDate),
|
{BlockLengths, _LMD, BlockIdx} =
|
||||||
|
extract_header(Header, IdxModDate),
|
||||||
PosList = find_pos(BlockIdx, SegList, [], 0),
|
PosList = find_pos(BlockIdx, SegList, [], 0),
|
||||||
KVL = check_blocks(PosList,
|
KVL = check_blocks(PosList,
|
||||||
FullBin,
|
FullBin,
|
||||||
|
@ -2121,6 +2100,41 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
blocks_required({StartKey, EndKey}, [B1, B2, MidBlock, B4, B5], PressMethod) ->
|
||||||
|
MidBlockList = deserialise_block(MidBlock, PressMethod),
|
||||||
|
filter_blocks_required(fetchends_rawblock(MidBlockList),
|
||||||
|
{StartKey, EndKey},
|
||||||
|
[B1, B2, MidBlockList, B4, B5]).
|
||||||
|
|
||||||
|
filter_blocks_required({not_present, not_present}, _RangeKeys, AllBlocks) ->
|
||||||
|
AllBlocks;
|
||||||
|
filter_blocks_required({_MidFirst, MidLast}, {StartKey, _EndKey},
|
||||||
|
[_Block1, _Block2, _MidBlockList, Block4, Block5])
|
||||||
|
when StartKey > MidLast ->
|
||||||
|
[Block4, Block5];
|
||||||
|
filter_blocks_required({MidFirst, MidLast}, {StartKey, EndKey},
|
||||||
|
[_Block1, _Block2, MidBlockList, Block4, Block5])
|
||||||
|
when StartKey >= MidFirst ->
|
||||||
|
NoneAfter = leveled_codec:endkey_passed(EndKey, MidLast),
|
||||||
|
case NoneAfter of
|
||||||
|
true ->
|
||||||
|
[MidBlockList];
|
||||||
|
false ->
|
||||||
|
[MidBlockList, Block4, Block5]
|
||||||
|
end;
|
||||||
|
filter_blocks_required({MidFirst, MidLast}, {_StartKey, EndKey},
|
||||||
|
[Block1, Block2, MidBlockList, Block4, Block5]) ->
|
||||||
|
AllBefore = leveled_codec:endkey_passed(EndKey, MidFirst),
|
||||||
|
NoneAfter = leveled_codec:endkey_passed(EndKey, MidLast),
|
||||||
|
case {AllBefore, NoneAfter} of
|
||||||
|
{true, true} ->
|
||||||
|
[Block1, Block2];
|
||||||
|
{false, true} ->
|
||||||
|
[Block1, Block2, MidBlockList];
|
||||||
|
{false, false} ->
|
||||||
|
[Block1, Block2, MidBlockList, Block4, Block5]
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
crc_check_slot(FullBin) ->
|
crc_check_slot(FullBin) ->
|
||||||
<<CRC32PBL:32/integer,
|
<<CRC32PBL:32/integer,
|
||||||
|
@ -2183,15 +2197,31 @@ fetch_value([Pos|Rest], BlockLengths, Blocks, Key, PressMethod) ->
|
||||||
fetch_value(Rest, BlockLengths, Blocks, Key, PressMethod)
|
fetch_value(Rest, BlockLengths, Blocks, Key, PressMethod)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
fetchfrom_rawblock(_BlockPos, []) ->
|
-spec fetchfrom_rawblock(pos_integer(), list(leveled_codec:ledger_kv())) ->
|
||||||
|
not_present|leveled_codec:ledger_kv().
|
||||||
|
%% @doc
|
||||||
|
%% Fetch from a deserialised block, but accounting for potential corruption
|
||||||
|
%% in that block which may lead to it returning as an empty list if that
|
||||||
|
%% corruption is detected by the deserialising function
|
||||||
|
fetchfrom_rawblock(BlockPos, RawBlock) when BlockPos > length(RawBlock) ->
|
||||||
|
%% Capture the slightly more general case than this being an empty list
|
||||||
|
%% in case of some other unexpected misalignement that would otherwise
|
||||||
|
%% crash the leveled_sst file process
|
||||||
not_present;
|
not_present;
|
||||||
fetchfrom_rawblock(BlockPos, RawBlock) ->
|
fetchfrom_rawblock(BlockPos, RawBlock) ->
|
||||||
lists:nth(BlockPos, RawBlock).
|
lists:nth(BlockPos, RawBlock).
|
||||||
|
|
||||||
fetchend_rawblock([]) ->
|
-spec fetchends_rawblock(list(leveled_codec:ledger_kv())) ->
|
||||||
not_present;
|
{not_present, not_present}|
|
||||||
fetchend_rawblock(RawBlock) ->
|
{leveled_codec:ledger_key(), leveled_codec:ledger_key()}.
|
||||||
lists:last(RawBlock).
|
%% @doc
|
||||||
|
%% Fetch the first and last key from a block, and not_present if the block
|
||||||
|
%% is empty (rather than crashing)
|
||||||
|
fetchends_rawblock([]) ->
|
||||||
|
{not_present, not_present};
|
||||||
|
fetchends_rawblock(RawBlock) ->
|
||||||
|
{element(1, lists:nth(1, RawBlock)),
|
||||||
|
element(1, lists:last(RawBlock))}.
|
||||||
|
|
||||||
|
|
||||||
revert_position(Pos) ->
|
revert_position(Pos) ->
|
||||||
|
@ -3541,6 +3571,114 @@ stop_whenstarter_stopped_testto() ->
|
||||||
end,
|
end,
|
||||||
?assertMatch(false, lists:foldl(TestFun, true, [10000, 2000, 2000, 2000])).
|
?assertMatch(false, lists:foldl(TestFun, true, [10000, 2000, 2000, 2000])).
|
||||||
|
|
||||||
|
corrupted_block_range_test() ->
|
||||||
|
corrupted_block_rangetester(native, 100),
|
||||||
|
corrupted_block_rangetester(lz4, 100),
|
||||||
|
corrupted_block_rangetester(none, 100).
|
||||||
|
|
||||||
|
corrupted_block_rangetester(PressMethod, TestCount) ->
|
||||||
|
N = 100,
|
||||||
|
KVL1 = lists:ukeysort(1, generate_randomkeys(1, N, 1, 2)),
|
||||||
|
RandomRangesFun =
|
||||||
|
fun(_X) ->
|
||||||
|
SKint = leveled_rand:uniform(90) + 1,
|
||||||
|
EKint = min(N, leveled_rand:uniform(N - SKint)),
|
||||||
|
SK = element(1, lists:nth(SKint, KVL1)),
|
||||||
|
EK = element(1, lists:nth(EKint, KVL1)),
|
||||||
|
{SK, EK}
|
||||||
|
end,
|
||||||
|
RandomRanges = lists:map(RandomRangesFun, lists:seq(1, TestCount)),
|
||||||
|
B1 = serialise_block(lists:sublist(KVL1, 1, 20), PressMethod),
|
||||||
|
B2 = serialise_block(lists:sublist(KVL1, 21, 20), PressMethod),
|
||||||
|
MidBlock = serialise_block(lists:sublist(KVL1, 41, 20), PressMethod),
|
||||||
|
B4 = serialise_block(lists:sublist(KVL1, 61, 20), PressMethod),
|
||||||
|
B5 = serialise_block(lists:sublist(KVL1, 81, 20), PressMethod),
|
||||||
|
CorruptBlockFun =
|
||||||
|
fun(Block) ->
|
||||||
|
case leveled_rand:uniform(10) < 2 of
|
||||||
|
true ->
|
||||||
|
flip_byte(Block, 0 , byte_size(Block));
|
||||||
|
false ->
|
||||||
|
Block
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
|
||||||
|
CheckFun =
|
||||||
|
fun({SK, EK}) ->
|
||||||
|
InputBlocks =
|
||||||
|
lists:map(CorruptBlockFun, [B1, B2, MidBlock, B4, B5]),
|
||||||
|
BR = blocks_required({SK, EK}, InputBlocks, PressMethod),
|
||||||
|
?assertMatch(true, length(BR) =< 5),
|
||||||
|
BlockListFun =
|
||||||
|
fun(B) ->
|
||||||
|
case is_binary(B) of
|
||||||
|
true ->
|
||||||
|
deserialise_block(B, PressMethod);
|
||||||
|
false ->
|
||||||
|
B
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
BRL = lists:flatten(lists:map(BlockListFun, BR)),
|
||||||
|
lists:foreach(fun({_K, _V}) -> ok end, BRL)
|
||||||
|
end,
|
||||||
|
lists:foreach(CheckFun, RandomRanges).
|
||||||
|
|
||||||
|
corrupted_block_fetch_test() ->
|
||||||
|
corrupted_block_fetch_tester(native),
|
||||||
|
corrupted_block_fetch_tester(lz4),
|
||||||
|
corrupted_block_fetch_tester(none).
|
||||||
|
|
||||||
|
corrupted_block_fetch_tester(PressMethod) ->
|
||||||
|
KC = 120,
|
||||||
|
KVL1 = lists:ukeysort(1, generate_randomkeys(1, KC, 1, 2)),
|
||||||
|
|
||||||
|
{{Header, SlotBin, _HashL, _LastKey}, _BT} =
|
||||||
|
generate_binary_slot(lookup, KVL1, PressMethod, false, no_timing),
|
||||||
|
<<B1L:32/integer,
|
||||||
|
B2L:32/integer,
|
||||||
|
B3L:32/integer,
|
||||||
|
B4L:32/integer,
|
||||||
|
B5L:32/integer,
|
||||||
|
PosBinIndex/binary>> = Header,
|
||||||
|
HS = byte_size(Header),
|
||||||
|
|
||||||
|
<<CheckB1P:32/integer, B1P:32/integer,
|
||||||
|
CheckH:32/integer, Header:HS/binary,
|
||||||
|
B1:B1L/binary, B2:B2L/binary, B3:B3L/binary,
|
||||||
|
B4:B4L/binary, B5:B5L/binary>> = SlotBin,
|
||||||
|
|
||||||
|
CorruptB3 = flip_byte(B3, 0 , B3L),
|
||||||
|
CorruptSlotBin =
|
||||||
|
<<CheckB1P:32/integer, B1P:32/integer,
|
||||||
|
CheckH:32/integer, Header/binary,
|
||||||
|
B1/binary, B2/binary, CorruptB3/binary, B4/binary, B5/binary>>,
|
||||||
|
|
||||||
|
CheckFun =
|
||||||
|
fun(N, {AccHit, AccMiss}) ->
|
||||||
|
PosL = [min(0, leveled_rand:uniform(N - 2)), N - 1],
|
||||||
|
{LK, LV} = lists:nth(N, KVL1),
|
||||||
|
{BlockLengths, 0, PosBinIndex} =
|
||||||
|
extract_header(Header, false),
|
||||||
|
R = check_blocks(PosL,
|
||||||
|
CorruptSlotBin,
|
||||||
|
BlockLengths,
|
||||||
|
byte_size(PosBinIndex),
|
||||||
|
LK,
|
||||||
|
PressMethod,
|
||||||
|
false,
|
||||||
|
not_present),
|
||||||
|
case R of
|
||||||
|
not_present ->
|
||||||
|
{AccHit, AccMiss + 1};
|
||||||
|
{LK, LV} ->
|
||||||
|
{AccHit + 1, AccMiss}
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
{_HitCount, MissCount} =
|
||||||
|
lists:foldl(CheckFun, {0, 0}, lists:seq(16, length(KVL1))),
|
||||||
|
ExpectedMisses = element(2, ?LOOK_BLOCKSIZE),
|
||||||
|
?assertMatch(ExpectedMisses, MissCount).
|
||||||
|
|
||||||
|
|
||||||
receive_fun() ->
|
receive_fun() ->
|
||||||
receive
|
receive
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue