diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 165d7ad..d6c18a8 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -79,6 +79,8 @@ -define(TIMING_SAMPLECOUNTDOWN, 10000). -define(TIMING_SAMPLESIZE, 100). -define(CACHE_SIZE, 32). +-define(BLOCK_LENGTHS_LENGTH, 20). +-define(FLIPPER32, 4294967295). -include_lib("eunit/include/eunit.hrl"). @@ -658,23 +660,22 @@ fetch(LedgerKey, Hash, State, Timings0) -> CachedBlockIdx = array:get(SlotID - 1, State#state.blockindex_cache), {SW2, Timings2} = update_timings(SW1, Timings1, lookup_cache, true), + BL = ?BLOCK_LENGTHS_LENGTH, case CachedBlockIdx of none -> SlotBin = read_slot(State#state.handle, Slot), - {Result, BlockLengths, BlockIdx} = + {Result, Header} = binaryslot_get(SlotBin, LedgerKey, Hash, PressMethod), BlockIndexCache = - array:set(SlotID - 1, - <>, - State#state.blockindex_cache), + array:set(SlotID - 1, Header, State#state.blockindex_cache), {_SW3, Timings3} = update_timings(SW2, Timings2, noncached_block, false), {Result, State#state{blockindex_cache = BlockIndexCache}, Timings3}; - <> -> - PosList = find_pos(BlockIdx, extra_hash(Hash), [], 0), + <> -> + PosList = find_pos(PosBin, extra_hash(Hash), [], 0), case PosList of [] -> {_SW3, Timings3} = @@ -700,6 +701,7 @@ fetch(LedgerKey, Hash, State, Timings0) -> State#state.handle, StartPos, BlockLengths, + byte_size(PosBin), LedgerKey, PressMethod, not_present), @@ -865,12 +867,12 @@ build_table_summary(SlotIndex, _Level, FirstKey, SlotCount, MaxSQN, Bloom) -> SummBin = term_to_binary({Summary, Bloom, lists:reverse(SlotIndex)}, ?BINARY_SETTINGS), - SummCRC = erlang:crc32(SummBin), + SummCRC = hmac(SummBin), <>. read_table_summary(BinWithCheck) -> <> = BinWithCheck, - CRCCheck = erlang:crc32(SummBin), + CRCCheck = hmac(SummBin), if CRCCheck == SummCRC -> % If not might it might be possible to rebuild from all the slots @@ -938,24 +940,46 @@ generate_filenames(RootFilename) -> %% checks serialise_block(Term, lz4) -> {ok, Bin} = lz4:pack(term_to_binary(Term)), - Bin; + CRC32 = hmac(Bin), + <>; serialise_block(Term, native) -> - term_to_binary(Term, ?BINARY_SETTINGS). + Bin = term_to_binary(Term, ?BINARY_SETTINGS), + CRC32 = hmac(Bin), + <>. -spec deserialise_block(binary(), press_methods()) -> any(). %% @doc %% Convert binary to term %% Function split out to make it easier to experiment with different -%% compression methods. Also, perhaps standardise applictaion of CRC -%% checks -deserialise_block(Bin, lz4) -> +%% compression methods. +%% +%% If CRC check fails we treat all the data as missing +deserialise_block(Bin, PressMethod) -> + BinS = byte_size(Bin) - 4, + <> = Bin, + case hmac(TermBin) of + CRC32 -> + deserialise_checkedblock(TermBin, PressMethod); + _ -> + [] + end. + +deserialise_checkedblock(Bin, lz4) -> {ok, Bin0} = lz4:unpack(Bin), binary_to_term(Bin0); -deserialise_block(Bin, native) -> +deserialise_checkedblock(Bin, native) -> binary_to_term(Bin). +-spec hmac(binary()|integer()) -> integer(). +%% @doc +%% Perform a CRC check on an input +hmac(Bin) when is_binary(Bin) -> + erlang:crc32(Bin); +hmac(Int) when is_integer(Int) -> + Int bxor ?FLIPPER32. + %%%============================================================================ %%% SlotIndex Implementation %%%============================================================================ @@ -1130,45 +1154,45 @@ generate_binary_slot(Lookup, KVL, PressMethod, BuildTimings0) -> BuildTimings2 = update_buildtimings(SW1, BuildTimings1, slot_serialise), SW2 = os:timestamp(), - B1P = byte_size(PosBinIndex), + B1P = byte_size(PosBinIndex) + ?BLOCK_LENGTHS_LENGTH, + CheckB1P = hmac(B1P), B1L = byte_size(B1), B2L = byte_size(B2), B3L = byte_size(B3), B4L = byte_size(B4), B5L = byte_size(B5), - Lengths = <>, - SlotBin = <>, - CRC32 = erlang:crc32(SlotBin), - FullBin = <>, + B5L:32/integer, + PosBinIndex/binary>>, + CheckH = hmac(Header), + SlotBin = <>, {LastKey, _LV} = lists:last(KVL), BuildTimings3 = update_buildtimings(SW2, BuildTimings2, slot_finish), - {{<>, FullBin, HashL, LastKey}, - BuildTimings3}. + {{Header, SlotBin, HashL, LastKey}, BuildTimings3}. % Acc should start as not_present if LedgerKey is a key, and a list if % LedgerKey is false -check_blocks([], _Handle, _StartPos, _BlockLengths, +check_blocks([], _Handle, _StartPos, _BlockLengths, _PosBinLength, _LedgerKeyToCheck, _PressMethod, Acc) -> Acc; -check_blocks([Pos|Rest], Handle, StartPos, - BlockLengths, LedgerKeyToCheck, PressMethod, Acc) -> +check_blocks([Pos|Rest], Handle, StartPos, BlockLengths, PosBinLength, + LedgerKeyToCheck, PressMethod, Acc) -> {BlockNumber, BlockPos} = revert_position(Pos), BlockBin = read_block(Handle, StartPos, BlockLengths, + PosBinLength, BlockNumber), BlockL = deserialise_block(BlockBin, PressMethod), {K, V} = lists:nth(BlockPos, BlockL), @@ -1180,20 +1204,22 @@ check_blocks([Pos|Rest], Handle, StartPos, false -> Acc ++ [{K, V}]; _ -> - check_blocks(Rest, Handle, StartPos, BlockLengths, + check_blocks(Rest, Handle, StartPos, + BlockLengths, PosBinLength, LedgerKeyToCheck, PressMethod, Acc) end end. -read_block(Handle, StartPos, BlockLengths, BlockID) -> - {BlockPos, Offset, Length} = block_offsetandlength(BlockLengths, BlockID), +read_block(Handle, StartPos, BlockLengths, PosBinLength, BlockID) -> + {Offset, Length} = block_offsetandlength(BlockLengths, BlockID), {ok, BlockBin} = file:pread(Handle, StartPos - + BlockPos + Offset - + 28, - % 4-byte CRC, 4 byte pos, 5x4 byte lengths + + PosBinLength + + 32, + % 4-byte CRC, 4-byte pos, + % 4-byte CRC, 5x4 byte lengths Length), BlockBin. @@ -1260,6 +1286,7 @@ read_slots(Handle, SlotList, {SegList, BlockIndexCache}, PressMethod) -> BinMapFun = fun(Pointer, Acc) -> {SP, _L, ID, _SK, _EK} = pointer_mapfun(Pointer), + BL = ?BLOCK_LENGTHS_LENGTH, case array:get(ID - 1, BlockIndexCache) of none -> % If there is an attempt to use the seg list query and the @@ -1270,8 +1297,8 @@ read_slots(Handle, SlotList, {SegList, BlockIndexCache}, PressMethod) -> read_length_list(Handle, [LengthDetails]), MapFun = binarysplit_mapfun(MultiSlotBin, StartPos), Acc ++ [MapFun(LengthDetails)]; - <> -> - % If there is a BlockIndex cached then we cna use it to + <> -> + % If there is a BlockIndex cached then we can use it to % check to see if any of the expected segments are % present without lifting the slot off disk. Also the % fact that we know position can be used to filter out @@ -1279,9 +1306,14 @@ read_slots(Handle, SlotList, {SegList, BlockIndexCache}, PressMethod) -> case find_pos(BlockIdx, SegList, [], 0) of [] -> Acc; - PL -> - Acc ++ check_blocks(PL, Handle, SP, BlockLengths, - false, PressMethod, []) + PositionList -> + Acc ++ + check_blocks(PositionList, + Handle, SP, + BlockLengths, + byte_size(BlockIdx), + false, PressMethod, + []) end end end, @@ -1319,19 +1351,17 @@ read_length_list(Handle, LengthList) -> binaryslot_get(FullBin, Key, Hash, PressMethod) -> case crc_check_slot(FullBin) of - {BlockLengths, Rest} -> - <> = BlockLengths, - <> = Rest, + {Header, Blocks} -> + BL = ?BLOCK_LENGTHS_LENGTH, + <> = Header, PosList = find_pos(PosBinIndex, extra_hash(Hash), [], 0), {fetch_value(PosList, BlockLengths, Blocks, Key, PressMethod), - BlockLengths, - PosBinIndex}; + Header}; crc_wonky -> {not_present, - none, none} end. @@ -1349,14 +1379,13 @@ binaryslot_tolist(FullBin, PressMethod) -> {Out, _Rem} = case crc_check_slot(FullBin) of - {BlockLengths, RestBin} -> - < + <> = BlockLengths, - <<_PosBinIndex:B1P/binary, Blocks/binary>> = RestBin, + B5L:32/integer, + _PosBinIndex/binary>> = Header, lists:foldl(BlockFetchFun, {[], Blocks}, [B1L, B2L, B3L, B4L, B5L]); @@ -1382,17 +1411,16 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey, PressMethod) -> % scenario is hard to do in concise code BlocksToCheck = case crc_check_slot(FullBin) of - {BlockLengths, RestBin} -> - < + <> = BlockLengths, - <<_PosBinIndex:B1P/binary, - Block1:B1L/binary, Block2:B2L/binary, + B5L:32/integer, + _PosBinIndex/binary>> = Header, + <> = RestBin, + Block4:B4L/binary, Block5:B5L/binary>> = Blocks, case B3L of 0 -> [Block1, Block2]; @@ -1440,11 +1468,10 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey, PressMethod) -> false -> Block end, - {LastKey, _LV} = lists:last(BlockList), - case StartKey > LastKey of - true -> + case fetchend_rawblock(BlockList) of + {LastKey, _LV} when StartKey > LastKey -> {Acc, true}; - false -> + {LastKey, _LV} -> {_LDrop, RKeep} = lists:splitwith(LTrimFun, BlockList), case leveled_codec:endkey_passed(EndKey, LastKey) of @@ -1453,7 +1480,9 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey, PressMethod) -> {Acc ++ LKeep, false}; false -> {Acc ++ RKeep, true} - end + end; + _ -> + {Acc, true} end; {_ , false} -> {Acc, false} @@ -1465,46 +1494,47 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey, PressMethod) -> crc_check_slot(FullBin) -> - <> = FullBin, - case erlang:crc32(SlotBin) of - CRC32 -> - <> = SlotBin, - {BlockLengths, Rest}; + <> = FullBin, + case {hmac(Header), hmac(PosBL)} of + {CRC32H, CRC32PBL} -> + {Header, Blocks}; _ -> leveled_log:log("SST09", []), crc_wonky end. block_offsetandlength(BlockLengths, BlockID) -> - <> = BlockLengths, case BlockID of 1 -> - <> = BlockLengths0, - {BlocksPos, 0, B1L}; + <> = BlockLengths, + {0, B1L}; 2 -> - <> = BlockLengths0, - {BlocksPos, B1L, B2L}; + <> = BlockLengths, + {B1L, B2L}; 3 -> <> = BlockLengths0, - {BlocksPos, B1L + B2L, B3L}; + _BR/binary>> = BlockLengths, + {B1L + B2L, B3L}; 4 -> <> = BlockLengths0, - {BlocksPos, B1L + B2L + B3L, B4L}; + _BR/binary>> = BlockLengths, + {B1L + B2L + B3L, B4L}; 5 -> <> = BlockLengths0, - {BlocksPos, B1L + B2L + B3L + B4L, B5L} + B5L:32/integer>> = BlockLengths, + {B1L + B2L + B3L + B4L, B5L} end. extra_hash({SegHash, _ExtraHash}) when is_integer(SegHash) -> @@ -1530,19 +1560,26 @@ fetch_value([], _BlockLengths, _Blocks, _Key, _PressMethod) -> not_present; fetch_value([Pos|Rest], BlockLengths, Blocks, Key, PressMethod) -> {BlockNumber, BlockPos} = revert_position(Pos), - {_BlockPos, - Offset, - Length} = block_offsetandlength(BlockLengths, BlockNumber), + {Offset, Length} = block_offsetandlength(BlockLengths, BlockNumber), <<_Pre:Offset/binary, Block:Length/binary, _Rest/binary>> = Blocks, - BlockL = deserialise_block(Block, PressMethod), - {K, V} = lists:nth(BlockPos, BlockL), - case K of - Key -> + RawBlock = deserialise_block(Block, PressMethod), + case fetchfrom_rawblock(BlockPos, RawBlock) of + {K, V} when K == Key -> {K, V}; _ -> fetch_value(Rest, BlockLengths, Blocks, Key, PressMethod) end. +fetchfrom_rawblock(_BlockPos, []) -> + not_present; +fetchfrom_rawblock(BlockPos, RawBlock) -> + lists:nth(BlockPos, RawBlock). + +fetchend_rawblock([]) -> + not_present; +fetchend_rawblock(RawBlock) -> + lists:last(RawBlock). + revert_position(Pos) -> {SideBlockSize, MidBlockSize} = ?LOOK_BLOCKSIZE, @@ -2123,7 +2160,7 @@ indexed_list_mixedkeys2_test() -> IdxKeys2 = lists:ukeysort(1, generate_indexkeys(30)), % this isn't actually ordered correctly Keys = IdxKeys1 ++ KVL1 ++ IdxKeys2, - {{_PosBinIndex1, FullBin, _HL, _LK}, no_timing} = + {{_Header, FullBin, _HL, _LK}, no_timing} = generate_binary_slot(lookup, Keys, native, no_timing), lists:foreach(fun({K, V}) -> MH = leveled_codec:segment_hash(K), @@ -2134,10 +2171,10 @@ indexed_list_mixedkeys2_test() -> indexed_list_allindexkeys_test() -> Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), ?LOOK_SLOTSIZE), - {{PosBinIndex1, FullBin, _HL, _LK}, no_timing} = + {{Header, FullBin, _HL, _LK}, no_timing} = generate_binary_slot(lookup, Keys, native, no_timing), EmptySlotSize = ?LOOK_SLOTSIZE - 1, - ?assertMatch(<<_BL:24/binary, EmptySlotSize:8/integer>>, PosBinIndex1), + ?assertMatch(<<_BL:20/binary, EmptySlotSize:8/integer>>, Header), % SW = os:timestamp(), BinToList = binaryslot_tolist(FullBin, native), % io:format(user, @@ -2149,9 +2186,9 @@ indexed_list_allindexkeys_test() -> indexed_list_allindexkeys_nolookup_test() -> Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(1000)), ?NOLOOK_SLOTSIZE), - {{PosBinIndex1, FullBin, _HL, _LK}, no_timing} = + {{Header, FullBin, _HL, _LK}, no_timing} = generate_binary_slot(no_lookup, Keys, native, no_timing), - ?assertMatch(<<_BL:24/binary, 127:8/integer>>, PosBinIndex1), + ?assertMatch(<<_BL:20/binary, 127:8/integer>>, Header), % SW = os:timestamp(), BinToList = binaryslot_tolist(FullBin, native), % io:format(user, @@ -2163,10 +2200,10 @@ indexed_list_allindexkeys_nolookup_test() -> indexed_list_allindexkeys_trimmed_test() -> Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), ?LOOK_SLOTSIZE), - {{PosBinIndex1, FullBin, _HL, _LK}, no_timing} = + {{Header, FullBin, _HL, _LK}, no_timing} = generate_binary_slot(lookup, Keys, native, no_timing), EmptySlotSize = ?LOOK_SLOTSIZE - 1, - ?assertMatch(<<_BL:24/binary, EmptySlotSize:8/integer>>, PosBinIndex1), + ?assertMatch(<<_BL:20/binary, EmptySlotSize:8/integer>>, Header), ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, {i, "Bucket", @@ -2204,29 +2241,64 @@ indexed_list_mixedkeys_bitflip_test() -> KVL0 = lists:ukeysort(1, generate_randomkeys(1, 50, 1, 4)), KVL1 = lists:sublist(KVL0, 33), Keys = lists:ukeysort(1, generate_indexkeys(60) ++ KVL1), - {{_PosBinIndex1, FullBin, _HL, LK}, no_timing} = + {{Header, SlotBin, _HL, LK}, no_timing} = generate_binary_slot(lookup, Keys, native, no_timing), + ?assertMatch(LK, element(1, lists:last(Keys))), - - FullBin0 = flip_byte(FullBin), - - {TestK1, _TestV1} = lists:nth(20, KVL1), - MH1 = leveled_codec:segment_hash(TestK1), - test_binary_slot(FullBin0, TestK1, MH1, not_present), - ToList = binaryslot_tolist(FullBin0, native), - ?assertMatch([], ToList), + <> = Header, + + TestKey1 = element(1, lists:nth(1, KVL1)), + TestKey2 = element(1, lists:nth(33, KVL1)), + MH1 = leveled_codec:segment_hash(TestKey1), + MH2 = leveled_codec:segment_hash(TestKey2), + + test_binary_slot(SlotBin, TestKey1, MH1, lists:nth(1, KVL1)), + test_binary_slot(SlotBin, TestKey2, MH2, lists:nth(33, KVL1)), + ToList = binaryslot_tolist(SlotBin, native), + ?assertMatch(Keys, ToList), + + [Pos1] = find_pos(PosBin, extra_hash(MH1), [], 0), + [Pos2] = find_pos(PosBin, extra_hash(MH2), [], 0), + {BN1, _BP1} = revert_position(Pos1), + {BN2, _BP2} = revert_position(Pos2), + {Offset1, Length1} = block_offsetandlength(Header, BN1), + {Offset2, Length2} = block_offsetandlength(Header, BN2), + + SlotBin1 = flip_byte(SlotBin, byte_size(Header) + 12 + Offset1, Length1), + SlotBin2 = flip_byte(SlotBin, byte_size(Header) + 12 + Offset2, Length2), + + test_binary_slot(SlotBin2, TestKey1, MH1, lists:nth(1, KVL1)), + test_binary_slot(SlotBin1, TestKey2, MH2, lists:nth(33, KVL1)), + + test_binary_slot(SlotBin1, TestKey1, MH1, not_present), + test_binary_slot(SlotBin2, TestKey2, MH2, not_present), + + ToList1 = binaryslot_tolist(SlotBin1, native), + ToList2 = binaryslot_tolist(SlotBin2, native), + + ?assertMatch(true, is_list(ToList1)), + ?assertMatch(true, is_list(ToList2)), + ?assertMatch(true, length(ToList1) > 0), + ?assertMatch(true, length(ToList2) > 0), + ?assertMatch(true, length(ToList1) < length(Keys)), + ?assertMatch(true, length(ToList2) < length(Keys)), + + SlotBin3 = flip_byte(SlotBin, byte_size(Header) + 12, B1L), {SK1, _} = lists:nth(10, Keys), - {EK1, _} = lists:nth(50, Keys), - O1 = binaryslot_trimmedlist(FullBin0, SK1, EK1, native), - ?assertMatch(0, length(O1)), + {EK1, _} = lists:nth(20, Keys), + O1 = binaryslot_trimmedlist(SlotBin3, SK1, EK1, native), ?assertMatch([], O1). -flip_byte(Binary) -> - L = byte_size(Binary), - Byte1 = leveled_rand:uniform(L) - 1, +flip_byte(Binary, Offset, Length) -> + Byte1 = leveled_rand:uniform(Length) + Offset - 1, <> = Binary, case A of 0 -> @@ -2238,7 +2310,7 @@ flip_byte(Binary) -> test_binary_slot(FullBin, Key, Hash, ExpectedValue) -> % SW = os:timestamp(), - {ReturnedValue, _BLs, _Idx} = binaryslot_get(FullBin, Key, Hash, native), + {ReturnedValue, _Header} = binaryslot_get(FullBin, Key, Hash, native), ?assertMatch(ExpectedValue, ReturnedValue). % io:format(user, "Fetch success in ~w microseconds ~n", % [timer:now_diff(os:timestamp(), SW)]). @@ -2589,7 +2661,7 @@ nonsense_coverage_test() -> handle_sync_event("hello", self(), reader, #state{})), SampleBin = <<0:128/integer>>, - FlippedBin = flip_byte(SampleBin), + FlippedBin = flip_byte(SampleBin, 0, 16), ?assertMatch(false, FlippedBin == SampleBin). hashmatching_bytreesize_test() ->