diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..1078c2e
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,6 @@
+language: erlang
+otp_release:
+ - 20.3.8
+script:
+ - chmod u+x rebar3
+ - ./rebar3 do upgrade, compile, dialyzer, xref, eunit
diff --git a/README.md b/README.md
index 44624cd..1cfc224 100644
--- a/README.md
+++ b/README.md
@@ -2,6 +2,8 @@
## Introduction
+[](https://travis-ci.com/martinsumner/leveled)
+
Leveled is a simple Key-Value store based on the concept of Log-Structured Merge Trees, with the following characteristics:
- Optimised for workloads with larger values (e.g. > 4KB).
diff --git a/rebar3 b/rebar3
new file mode 100755
index 0000000..00494f9
Binary files /dev/null and b/rebar3 differ
diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl
index 35b8690..92eac7c 100644
--- a/src/leveled_sst.erl
+++ b/src/leveled_sst.erl
@@ -1388,7 +1388,7 @@ serialise_block(Term, none) ->
%% compression methods.
%%
%% If CRC check fails we treat all the data as missing
-deserialise_block(Bin, PressMethod) ->
+deserialise_block(Bin, PressMethod) when byte_size(Bin) > 4 ->
BinS = byte_size(Bin) - 4,
<> = Bin,
case hmac(TermBin) of
@@ -1396,7 +1396,9 @@ deserialise_block(Bin, PressMethod) ->
deserialise_checkedblock(TermBin, PressMethod);
_ ->
[]
- end.
+ end;
+deserialise_block(_Bin, _PM) ->
+ [].
deserialise_checkedblock(Bin, lz4) ->
{ok, Bin0} = lz4:unpack(Bin),
@@ -1670,14 +1672,18 @@ generate_binary_slot(Lookup, KVL, PressMethod, IndexModDate, BuildTimings0) ->
binary()|{file:io_device(), integer()},
binary(),
integer(),
- leveled_codec:ledger_key()|false,
+ leveled_codec:ledger_key()|false,
+ %% if false the acc is a list, and if true
+ %% Acc will be initially not_present, and may
+ %% result in a {K, V} tuple
press_method(),
boolean(),
- list()|not_present) -> list()|not_present.
+ list()|not_present) ->
+ list(leveled_codec:ledger_kv())|
+ not_present|leveled_codec:ledger_kv().
%% @doc
%% Acc should start as not_present if LedgerKey is a key, and a list if
%% LedgerKey is false
-
check_blocks([], _BlockPointer, _BlockLengths, _PosBinLength,
_LedgerKeyToCheck, _PressMethod, _IdxModDate, not_present) ->
not_present;
@@ -1693,24 +1699,20 @@ check_blocks([Pos|Rest], BlockPointer, BlockLengths, PosBinLength,
PosBinLength,
BlockNumber,
additional_offset(IdxModDate)),
- BlockL = deserialise_block(BlockBin, PressMethod),
- {K, V} = lists:nth(BlockPos, BlockL),
- case K of
- LedgerKeyToCheck ->
+ R = fetchfrom_rawblock(BlockPos, deserialise_block(BlockBin, PressMethod)),
+ case {R, LedgerKeyToCheck} of
+ {{K, V}, K} ->
{K, V};
+ {{K, V}, false} ->
+ check_blocks(Rest, BlockPointer,
+ BlockLengths, PosBinLength,
+ LedgerKeyToCheck, PressMethod, IdxModDate,
+ [{K, V}|Acc]);
_ ->
- case LedgerKeyToCheck of
- false ->
- check_blocks(Rest, BlockPointer,
- BlockLengths, PosBinLength,
- LedgerKeyToCheck, PressMethod, IdxModDate,
- [{K, V}|Acc]);
- _ ->
- check_blocks(Rest, BlockPointer,
- BlockLengths, PosBinLength,
- LedgerKeyToCheck, PressMethod, IdxModDate,
- Acc)
- end
+ check_blocks(Rest, BlockPointer,
+ BlockLengths, PosBinLength,
+ LedgerKeyToCheck, PressMethod, IdxModDate,
+ Acc)
end.
-spec additional_offset(boolean()) -> pos_integer().
@@ -2029,10 +2031,13 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
false ->
Block
end,
- case fetchend_rawblock(BlockList) of
- {LastKey, _LV} when StartKey > LastKey ->
+ case fetchends_rawblock(BlockList) of
+ {_, LastKey} when StartKey > LastKey ->
+ %% This includes the case when LastKey is
+ %% not_present due to corruption in the BlockList
+ %% as tuple is > not_present.
{Acc, true};
- {LastKey, _LV} ->
+ {_, LastKey} ->
{_LDrop, RKeep} = lists:splitwith(LTrimFun,
BlockList),
case leveled_codec:endkey_passed(EndKey,
@@ -2043,9 +2048,7 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
{Acc ++ LKeep, false};
false ->
{Acc ++ RKeep, true}
- end;
- _ ->
- {Acc, true}
+ end
end;
{_ , false} ->
{Acc, false}
@@ -2073,39 +2076,15 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
MidBlock:B3L/binary,
Block4:B4L/binary, Block5:B5L/binary>> = Blocks,
BlocksToCheck =
- case B3L of
- 0 ->
- [Block1, Block2];
- _ ->
- MidBlockList =
- deserialise_block(MidBlock, PressMethod),
- {MidFirst, _} = lists:nth(1, MidBlockList),
- {MidLast, _} = lists:last(MidBlockList),
- Split = {StartKey > MidLast,
- StartKey >= MidFirst,
- leveled_codec:endkey_passed(EndKey,
- MidFirst),
- leveled_codec:endkey_passed(EndKey,
- MidLast)},
- case Split of
- {true, _, _, _} ->
- [Block4, Block5];
- {false, true, false, true} ->
- [MidBlockList];
- {false, true, false, false} ->
- [MidBlockList, Block4, Block5];
- {false, false, true, true} ->
- [Block1, Block2];
- {false, false, false, true} ->
- [Block1, Block2, MidBlockList];
- _ ->
- [Block1, Block2, MidBlockList, Block4, Block5]
- end
- end,
- {Acc, _Continue} = lists:foldl(BlockCheckFun, {[], true}, BlocksToCheck),
+ blocks_required({StartKey, EndKey},
+ [Block1, Block2, MidBlock, Block4, Block5],
+ PressMethod),
+ {Acc, _Continue} =
+ lists:foldl(BlockCheckFun, {[], true}, BlocksToCheck),
{Acc, none};
{{Header, _Blocks}, SegList} ->
- {BlockLengths, _LMD, BlockIdx} = extract_header(Header, IdxModDate),
+ {BlockLengths, _LMD, BlockIdx} =
+ extract_header(Header, IdxModDate),
PosList = find_pos(BlockIdx, SegList, [], 0),
KVL = check_blocks(PosList,
FullBin,
@@ -2121,6 +2100,41 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey,
end.
+blocks_required({StartKey, EndKey}, [B1, B2, MidBlock, B4, B5], PressMethod) ->
+ MidBlockList = deserialise_block(MidBlock, PressMethod),
+ filter_blocks_required(fetchends_rawblock(MidBlockList),
+ {StartKey, EndKey},
+ [B1, B2, MidBlockList, B4, B5]).
+
+filter_blocks_required({not_present, not_present}, _RangeKeys, AllBlocks) ->
+ AllBlocks;
+filter_blocks_required({_MidFirst, MidLast}, {StartKey, _EndKey},
+ [_Block1, _Block2, _MidBlockList, Block4, Block5])
+ when StartKey > MidLast ->
+ [Block4, Block5];
+filter_blocks_required({MidFirst, MidLast}, {StartKey, EndKey},
+ [_Block1, _Block2, MidBlockList, Block4, Block5])
+ when StartKey >= MidFirst ->
+ NoneAfter = leveled_codec:endkey_passed(EndKey, MidLast),
+ case NoneAfter of
+ true ->
+ [MidBlockList];
+ false ->
+ [MidBlockList, Block4, Block5]
+ end;
+filter_blocks_required({MidFirst, MidLast}, {_StartKey, EndKey},
+ [Block1, Block2, MidBlockList, Block4, Block5]) ->
+ AllBefore = leveled_codec:endkey_passed(EndKey, MidFirst),
+ NoneAfter = leveled_codec:endkey_passed(EndKey, MidLast),
+ case {AllBefore, NoneAfter} of
+ {true, true} ->
+ [Block1, Block2];
+ {false, true} ->
+ [Block1, Block2, MidBlockList];
+ {false, false} ->
+ [Block1, Block2, MidBlockList, Block4, Block5]
+ end.
+
crc_check_slot(FullBin) ->
<
fetch_value(Rest, BlockLengths, Blocks, Key, PressMethod)
end.
-fetchfrom_rawblock(_BlockPos, []) ->
+-spec fetchfrom_rawblock(pos_integer(), list(leveled_codec:ledger_kv())) ->
+ not_present|leveled_codec:ledger_kv().
+%% @doc
+%% Fetch from a deserialised block, but accounting for potential corruption
+%% in that block which may lead to it returning as an empty list if that
+%% corruption is detected by the deserialising function
+fetchfrom_rawblock(BlockPos, RawBlock) when BlockPos > length(RawBlock) ->
+ %% Capture the slightly more general case than this being an empty list
+ %% in case of some other unexpected misalignement that would otherwise
+ %% crash the leveled_sst file process
not_present;
fetchfrom_rawblock(BlockPos, RawBlock) ->
lists:nth(BlockPos, RawBlock).
-fetchend_rawblock([]) ->
- not_present;
-fetchend_rawblock(RawBlock) ->
- lists:last(RawBlock).
+-spec fetchends_rawblock(list(leveled_codec:ledger_kv())) ->
+ {not_present, not_present}|
+ {leveled_codec:ledger_key(), leveled_codec:ledger_key()}.
+%% @doc
+%% Fetch the first and last key from a block, and not_present if the block
+%% is empty (rather than crashing)
+fetchends_rawblock([]) ->
+ {not_present, not_present};
+fetchends_rawblock(RawBlock) ->
+ {element(1, lists:nth(1, RawBlock)),
+ element(1, lists:last(RawBlock))}.
revert_position(Pos) ->
@@ -3540,7 +3570,115 @@ stop_whenstarter_stopped_testto() ->
end
end,
?assertMatch(false, lists:foldl(TestFun, true, [10000, 2000, 2000, 2000])).
+
+corrupted_block_range_test() ->
+ corrupted_block_rangetester(native, 100),
+ corrupted_block_rangetester(lz4, 100),
+ corrupted_block_rangetester(none, 100).
+
+corrupted_block_rangetester(PressMethod, TestCount) ->
+ N = 100,
+ KVL1 = lists:ukeysort(1, generate_randomkeys(1, N, 1, 2)),
+ RandomRangesFun =
+ fun(_X) ->
+ SKint = leveled_rand:uniform(90) + 1,
+ EKint = min(N, leveled_rand:uniform(N - SKint)),
+ SK = element(1, lists:nth(SKint, KVL1)),
+ EK = element(1, lists:nth(EKint, KVL1)),
+ {SK, EK}
+ end,
+ RandomRanges = lists:map(RandomRangesFun, lists:seq(1, TestCount)),
+ B1 = serialise_block(lists:sublist(KVL1, 1, 20), PressMethod),
+ B2 = serialise_block(lists:sublist(KVL1, 21, 20), PressMethod),
+ MidBlock = serialise_block(lists:sublist(KVL1, 41, 20), PressMethod),
+ B4 = serialise_block(lists:sublist(KVL1, 61, 20), PressMethod),
+ B5 = serialise_block(lists:sublist(KVL1, 81, 20), PressMethod),
+ CorruptBlockFun =
+ fun(Block) ->
+ case leveled_rand:uniform(10) < 2 of
+ true ->
+ flip_byte(Block, 0 , byte_size(Block));
+ false ->
+ Block
+ end
+ end,
+ CheckFun =
+ fun({SK, EK}) ->
+ InputBlocks =
+ lists:map(CorruptBlockFun, [B1, B2, MidBlock, B4, B5]),
+ BR = blocks_required({SK, EK}, InputBlocks, PressMethod),
+ ?assertMatch(true, length(BR) =< 5),
+ BlockListFun =
+ fun(B) ->
+ case is_binary(B) of
+ true ->
+ deserialise_block(B, PressMethod);
+ false ->
+ B
+ end
+ end,
+ BRL = lists:flatten(lists:map(BlockListFun, BR)),
+ lists:foreach(fun({_K, _V}) -> ok end, BRL)
+ end,
+ lists:foreach(CheckFun, RandomRanges).
+
+corrupted_block_fetch_test() ->
+ corrupted_block_fetch_tester(native),
+ corrupted_block_fetch_tester(lz4),
+ corrupted_block_fetch_tester(none).
+
+corrupted_block_fetch_tester(PressMethod) ->
+ KC = 120,
+ KVL1 = lists:ukeysort(1, generate_randomkeys(1, KC, 1, 2)),
+
+ {{Header, SlotBin, _HashL, _LastKey}, _BT} =
+ generate_binary_slot(lookup, KVL1, PressMethod, false, no_timing),
+ <> = Header,
+ HS = byte_size(Header),
+
+ <> = SlotBin,
+
+ CorruptB3 = flip_byte(B3, 0 , B3L),
+ CorruptSlotBin =
+ <>,
+
+ CheckFun =
+ fun(N, {AccHit, AccMiss}) ->
+ PosL = [min(0, leveled_rand:uniform(N - 2)), N - 1],
+ {LK, LV} = lists:nth(N, KVL1),
+ {BlockLengths, 0, PosBinIndex} =
+ extract_header(Header, false),
+ R = check_blocks(PosL,
+ CorruptSlotBin,
+ BlockLengths,
+ byte_size(PosBinIndex),
+ LK,
+ PressMethod,
+ false,
+ not_present),
+ case R of
+ not_present ->
+ {AccHit, AccMiss + 1};
+ {LK, LV} ->
+ {AccHit + 1, AccMiss}
+ end
+ end,
+ {_HitCount, MissCount} =
+ lists:foldl(CheckFun, {0, 0}, lists:seq(16, length(KVL1))),
+ ExpectedMisses = element(2, ?LOOK_BLOCKSIZE),
+ ?assertMatch(ExpectedMisses, MissCount).
+
receive_fun() ->
receive