From 5bac389d0cd8fb3f19d03502903900049b8feb9f Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 1 Dec 2017 14:15:13 +0000 Subject: [PATCH] Switch to CRC check at Block Level Previously done at Slot Level - but Blocks were still read from disk after the Slot CRC had been checked. This seems safer. It requires an extra CRC check for every fetch. However, CRC chekcing smaller binaries during the buld process appears to be beneficial to performance. Hoped this will be an enabler to turning off compression at Levels 0 and 1 to improve performance (wihtout having a compensating issues with reduced CRC performance) --- src/leveled_sst.erl | 292 +++++++++++++++++++++++++++----------------- 1 file changed, 182 insertions(+), 110 deletions(-) diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 165d7ad..d6c18a8 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -79,6 +79,8 @@ -define(TIMING_SAMPLECOUNTDOWN, 10000). -define(TIMING_SAMPLESIZE, 100). -define(CACHE_SIZE, 32). +-define(BLOCK_LENGTHS_LENGTH, 20). +-define(FLIPPER32, 4294967295). -include_lib("eunit/include/eunit.hrl"). @@ -658,23 +660,22 @@ fetch(LedgerKey, Hash, State, Timings0) -> CachedBlockIdx = array:get(SlotID - 1, State#state.blockindex_cache), {SW2, Timings2} = update_timings(SW1, Timings1, lookup_cache, true), + BL = ?BLOCK_LENGTHS_LENGTH, case CachedBlockIdx of none -> SlotBin = read_slot(State#state.handle, Slot), - {Result, BlockLengths, BlockIdx} = + {Result, Header} = binaryslot_get(SlotBin, LedgerKey, Hash, PressMethod), BlockIndexCache = - array:set(SlotID - 1, - <>, - State#state.blockindex_cache), + array:set(SlotID - 1, Header, State#state.blockindex_cache), {_SW3, Timings3} = update_timings(SW2, Timings2, noncached_block, false), {Result, State#state{blockindex_cache = BlockIndexCache}, Timings3}; - <> -> - PosList = find_pos(BlockIdx, extra_hash(Hash), [], 0), + <> -> + PosList = find_pos(PosBin, extra_hash(Hash), [], 0), case PosList of [] -> {_SW3, Timings3} = @@ -700,6 +701,7 @@ fetch(LedgerKey, Hash, State, Timings0) -> State#state.handle, StartPos, BlockLengths, + byte_size(PosBin), LedgerKey, PressMethod, not_present), @@ -865,12 +867,12 @@ build_table_summary(SlotIndex, _Level, FirstKey, SlotCount, MaxSQN, Bloom) -> SummBin = term_to_binary({Summary, Bloom, lists:reverse(SlotIndex)}, ?BINARY_SETTINGS), - SummCRC = erlang:crc32(SummBin), + SummCRC = hmac(SummBin), <>. read_table_summary(BinWithCheck) -> <> = BinWithCheck, - CRCCheck = erlang:crc32(SummBin), + CRCCheck = hmac(SummBin), if CRCCheck == SummCRC -> % If not might it might be possible to rebuild from all the slots @@ -938,24 +940,46 @@ generate_filenames(RootFilename) -> %% checks serialise_block(Term, lz4) -> {ok, Bin} = lz4:pack(term_to_binary(Term)), - Bin; + CRC32 = hmac(Bin), + <>; serialise_block(Term, native) -> - term_to_binary(Term, ?BINARY_SETTINGS). + Bin = term_to_binary(Term, ?BINARY_SETTINGS), + CRC32 = hmac(Bin), + <>. -spec deserialise_block(binary(), press_methods()) -> any(). %% @doc %% Convert binary to term %% Function split out to make it easier to experiment with different -%% compression methods. Also, perhaps standardise applictaion of CRC -%% checks -deserialise_block(Bin, lz4) -> +%% compression methods. +%% +%% If CRC check fails we treat all the data as missing +deserialise_block(Bin, PressMethod) -> + BinS = byte_size(Bin) - 4, + <> = Bin, + case hmac(TermBin) of + CRC32 -> + deserialise_checkedblock(TermBin, PressMethod); + _ -> + [] + end. + +deserialise_checkedblock(Bin, lz4) -> {ok, Bin0} = lz4:unpack(Bin), binary_to_term(Bin0); -deserialise_block(Bin, native) -> +deserialise_checkedblock(Bin, native) -> binary_to_term(Bin). +-spec hmac(binary()|integer()) -> integer(). +%% @doc +%% Perform a CRC check on an input +hmac(Bin) when is_binary(Bin) -> + erlang:crc32(Bin); +hmac(Int) when is_integer(Int) -> + Int bxor ?FLIPPER32. + %%%============================================================================ %%% SlotIndex Implementation %%%============================================================================ @@ -1130,45 +1154,45 @@ generate_binary_slot(Lookup, KVL, PressMethod, BuildTimings0) -> BuildTimings2 = update_buildtimings(SW1, BuildTimings1, slot_serialise), SW2 = os:timestamp(), - B1P = byte_size(PosBinIndex), + B1P = byte_size(PosBinIndex) + ?BLOCK_LENGTHS_LENGTH, + CheckB1P = hmac(B1P), B1L = byte_size(B1), B2L = byte_size(B2), B3L = byte_size(B3), B4L = byte_size(B4), B5L = byte_size(B5), - Lengths = <>, - SlotBin = <>, - CRC32 = erlang:crc32(SlotBin), - FullBin = <>, + B5L:32/integer, + PosBinIndex/binary>>, + CheckH = hmac(Header), + SlotBin = <>, {LastKey, _LV} = lists:last(KVL), BuildTimings3 = update_buildtimings(SW2, BuildTimings2, slot_finish), - {{<>, FullBin, HashL, LastKey}, - BuildTimings3}. + {{Header, SlotBin, HashL, LastKey}, BuildTimings3}. % Acc should start as not_present if LedgerKey is a key, and a list if % LedgerKey is false -check_blocks([], _Handle, _StartPos, _BlockLengths, +check_blocks([], _Handle, _StartPos, _BlockLengths, _PosBinLength, _LedgerKeyToCheck, _PressMethod, Acc) -> Acc; -check_blocks([Pos|Rest], Handle, StartPos, - BlockLengths, LedgerKeyToCheck, PressMethod, Acc) -> +check_blocks([Pos|Rest], Handle, StartPos, BlockLengths, PosBinLength, + LedgerKeyToCheck, PressMethod, Acc) -> {BlockNumber, BlockPos} = revert_position(Pos), BlockBin = read_block(Handle, StartPos, BlockLengths, + PosBinLength, BlockNumber), BlockL = deserialise_block(BlockBin, PressMethod), {K, V} = lists:nth(BlockPos, BlockL), @@ -1180,20 +1204,22 @@ check_blocks([Pos|Rest], Handle, StartPos, false -> Acc ++ [{K, V}]; _ -> - check_blocks(Rest, Handle, StartPos, BlockLengths, + check_blocks(Rest, Handle, StartPos, + BlockLengths, PosBinLength, LedgerKeyToCheck, PressMethod, Acc) end end. -read_block(Handle, StartPos, BlockLengths, BlockID) -> - {BlockPos, Offset, Length} = block_offsetandlength(BlockLengths, BlockID), +read_block(Handle, StartPos, BlockLengths, PosBinLength, BlockID) -> + {Offset, Length} = block_offsetandlength(BlockLengths, BlockID), {ok, BlockBin} = file:pread(Handle, StartPos - + BlockPos + Offset - + 28, - % 4-byte CRC, 4 byte pos, 5x4 byte lengths + + PosBinLength + + 32, + % 4-byte CRC, 4-byte pos, + % 4-byte CRC, 5x4 byte lengths Length), BlockBin. @@ -1260,6 +1286,7 @@ read_slots(Handle, SlotList, {SegList, BlockIndexCache}, PressMethod) -> BinMapFun = fun(Pointer, Acc) -> {SP, _L, ID, _SK, _EK} = pointer_mapfun(Pointer), + BL = ?BLOCK_LENGTHS_LENGTH, case array:get(ID - 1, BlockIndexCache) of none -> % If there is an attempt to use the seg list query and the @@ -1270,8 +1297,8 @@ read_slots(Handle, SlotList, {SegList, BlockIndexCache}, PressMethod) -> read_length_list(Handle, [LengthDetails]), MapFun = binarysplit_mapfun(MultiSlotBin, StartPos), Acc ++ [MapFun(LengthDetails)]; - <> -> - % If there is a BlockIndex cached then we cna use it to + <> -> + % If there is a BlockIndex cached then we can use it to % check to see if any of the expected segments are % present without lifting the slot off disk. Also the % fact that we know position can be used to filter out @@ -1279,9 +1306,14 @@ read_slots(Handle, SlotList, {SegList, BlockIndexCache}, PressMethod) -> case find_pos(BlockIdx, SegList, [], 0) of [] -> Acc; - PL -> - Acc ++ check_blocks(PL, Handle, SP, BlockLengths, - false, PressMethod, []) + PositionList -> + Acc ++ + check_blocks(PositionList, + Handle, SP, + BlockLengths, + byte_size(BlockIdx), + false, PressMethod, + []) end end end, @@ -1319,19 +1351,17 @@ read_length_list(Handle, LengthList) -> binaryslot_get(FullBin, Key, Hash, PressMethod) -> case crc_check_slot(FullBin) of - {BlockLengths, Rest} -> - <> = BlockLengths, - <> = Rest, + {Header, Blocks} -> + BL = ?BLOCK_LENGTHS_LENGTH, + <> = Header, PosList = find_pos(PosBinIndex, extra_hash(Hash), [], 0), {fetch_value(PosList, BlockLengths, Blocks, Key, PressMethod), - BlockLengths, - PosBinIndex}; + Header}; crc_wonky -> {not_present, - none, none} end. @@ -1349,14 +1379,13 @@ binaryslot_tolist(FullBin, PressMethod) -> {Out, _Rem} = case crc_check_slot(FullBin) of - {BlockLengths, RestBin} -> - < + <> = BlockLengths, - <<_PosBinIndex:B1P/binary, Blocks/binary>> = RestBin, + B5L:32/integer, + _PosBinIndex/binary>> = Header, lists:foldl(BlockFetchFun, {[], Blocks}, [B1L, B2L, B3L, B4L, B5L]); @@ -1382,17 +1411,16 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey, PressMethod) -> % scenario is hard to do in concise code BlocksToCheck = case crc_check_slot(FullBin) of - {BlockLengths, RestBin} -> - < + <> = BlockLengths, - <<_PosBinIndex:B1P/binary, - Block1:B1L/binary, Block2:B2L/binary, + B5L:32/integer, + _PosBinIndex/binary>> = Header, + <> = RestBin, + Block4:B4L/binary, Block5:B5L/binary>> = Blocks, case B3L of 0 -> [Block1, Block2]; @@ -1440,11 +1468,10 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey, PressMethod) -> false -> Block end, - {LastKey, _LV} = lists:last(BlockList), - case StartKey > LastKey of - true -> + case fetchend_rawblock(BlockList) of + {LastKey, _LV} when StartKey > LastKey -> {Acc, true}; - false -> + {LastKey, _LV} -> {_LDrop, RKeep} = lists:splitwith(LTrimFun, BlockList), case leveled_codec:endkey_passed(EndKey, LastKey) of @@ -1453,7 +1480,9 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey, PressMethod) -> {Acc ++ LKeep, false}; false -> {Acc ++ RKeep, true} - end + end; + _ -> + {Acc, true} end; {_ , false} -> {Acc, false} @@ -1465,46 +1494,47 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey, PressMethod) -> crc_check_slot(FullBin) -> - <> = FullBin, - case erlang:crc32(SlotBin) of - CRC32 -> - <> = SlotBin, - {BlockLengths, Rest}; + <> = FullBin, + case {hmac(Header), hmac(PosBL)} of + {CRC32H, CRC32PBL} -> + {Header, Blocks}; _ -> leveled_log:log("SST09", []), crc_wonky end. block_offsetandlength(BlockLengths, BlockID) -> - <> = BlockLengths, case BlockID of 1 -> - <> = BlockLengths0, - {BlocksPos, 0, B1L}; + <> = BlockLengths, + {0, B1L}; 2 -> - <> = BlockLengths0, - {BlocksPos, B1L, B2L}; + <> = BlockLengths, + {B1L, B2L}; 3 -> <> = BlockLengths0, - {BlocksPos, B1L + B2L, B3L}; + _BR/binary>> = BlockLengths, + {B1L + B2L, B3L}; 4 -> <> = BlockLengths0, - {BlocksPos, B1L + B2L + B3L, B4L}; + _BR/binary>> = BlockLengths, + {B1L + B2L + B3L, B4L}; 5 -> <> = BlockLengths0, - {BlocksPos, B1L + B2L + B3L + B4L, B5L} + B5L:32/integer>> = BlockLengths, + {B1L + B2L + B3L + B4L, B5L} end. extra_hash({SegHash, _ExtraHash}) when is_integer(SegHash) -> @@ -1530,19 +1560,26 @@ fetch_value([], _BlockLengths, _Blocks, _Key, _PressMethod) -> not_present; fetch_value([Pos|Rest], BlockLengths, Blocks, Key, PressMethod) -> {BlockNumber, BlockPos} = revert_position(Pos), - {_BlockPos, - Offset, - Length} = block_offsetandlength(BlockLengths, BlockNumber), + {Offset, Length} = block_offsetandlength(BlockLengths, BlockNumber), <<_Pre:Offset/binary, Block:Length/binary, _Rest/binary>> = Blocks, - BlockL = deserialise_block(Block, PressMethod), - {K, V} = lists:nth(BlockPos, BlockL), - case K of - Key -> + RawBlock = deserialise_block(Block, PressMethod), + case fetchfrom_rawblock(BlockPos, RawBlock) of + {K, V} when K == Key -> {K, V}; _ -> fetch_value(Rest, BlockLengths, Blocks, Key, PressMethod) end. +fetchfrom_rawblock(_BlockPos, []) -> + not_present; +fetchfrom_rawblock(BlockPos, RawBlock) -> + lists:nth(BlockPos, RawBlock). + +fetchend_rawblock([]) -> + not_present; +fetchend_rawblock(RawBlock) -> + lists:last(RawBlock). + revert_position(Pos) -> {SideBlockSize, MidBlockSize} = ?LOOK_BLOCKSIZE, @@ -2123,7 +2160,7 @@ indexed_list_mixedkeys2_test() -> IdxKeys2 = lists:ukeysort(1, generate_indexkeys(30)), % this isn't actually ordered correctly Keys = IdxKeys1 ++ KVL1 ++ IdxKeys2, - {{_PosBinIndex1, FullBin, _HL, _LK}, no_timing} = + {{_Header, FullBin, _HL, _LK}, no_timing} = generate_binary_slot(lookup, Keys, native, no_timing), lists:foreach(fun({K, V}) -> MH = leveled_codec:segment_hash(K), @@ -2134,10 +2171,10 @@ indexed_list_mixedkeys2_test() -> indexed_list_allindexkeys_test() -> Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), ?LOOK_SLOTSIZE), - {{PosBinIndex1, FullBin, _HL, _LK}, no_timing} = + {{Header, FullBin, _HL, _LK}, no_timing} = generate_binary_slot(lookup, Keys, native, no_timing), EmptySlotSize = ?LOOK_SLOTSIZE - 1, - ?assertMatch(<<_BL:24/binary, EmptySlotSize:8/integer>>, PosBinIndex1), + ?assertMatch(<<_BL:20/binary, EmptySlotSize:8/integer>>, Header), % SW = os:timestamp(), BinToList = binaryslot_tolist(FullBin, native), % io:format(user, @@ -2149,9 +2186,9 @@ indexed_list_allindexkeys_test() -> indexed_list_allindexkeys_nolookup_test() -> Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(1000)), ?NOLOOK_SLOTSIZE), - {{PosBinIndex1, FullBin, _HL, _LK}, no_timing} = + {{Header, FullBin, _HL, _LK}, no_timing} = generate_binary_slot(no_lookup, Keys, native, no_timing), - ?assertMatch(<<_BL:24/binary, 127:8/integer>>, PosBinIndex1), + ?assertMatch(<<_BL:20/binary, 127:8/integer>>, Header), % SW = os:timestamp(), BinToList = binaryslot_tolist(FullBin, native), % io:format(user, @@ -2163,10 +2200,10 @@ indexed_list_allindexkeys_nolookup_test() -> indexed_list_allindexkeys_trimmed_test() -> Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), ?LOOK_SLOTSIZE), - {{PosBinIndex1, FullBin, _HL, _LK}, no_timing} = + {{Header, FullBin, _HL, _LK}, no_timing} = generate_binary_slot(lookup, Keys, native, no_timing), EmptySlotSize = ?LOOK_SLOTSIZE - 1, - ?assertMatch(<<_BL:24/binary, EmptySlotSize:8/integer>>, PosBinIndex1), + ?assertMatch(<<_BL:20/binary, EmptySlotSize:8/integer>>, Header), ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, {i, "Bucket", @@ -2204,29 +2241,64 @@ indexed_list_mixedkeys_bitflip_test() -> KVL0 = lists:ukeysort(1, generate_randomkeys(1, 50, 1, 4)), KVL1 = lists:sublist(KVL0, 33), Keys = lists:ukeysort(1, generate_indexkeys(60) ++ KVL1), - {{_PosBinIndex1, FullBin, _HL, LK}, no_timing} = + {{Header, SlotBin, _HL, LK}, no_timing} = generate_binary_slot(lookup, Keys, native, no_timing), + ?assertMatch(LK, element(1, lists:last(Keys))), - - FullBin0 = flip_byte(FullBin), - - {TestK1, _TestV1} = lists:nth(20, KVL1), - MH1 = leveled_codec:segment_hash(TestK1), - test_binary_slot(FullBin0, TestK1, MH1, not_present), - ToList = binaryslot_tolist(FullBin0, native), - ?assertMatch([], ToList), + <> = Header, + + TestKey1 = element(1, lists:nth(1, KVL1)), + TestKey2 = element(1, lists:nth(33, KVL1)), + MH1 = leveled_codec:segment_hash(TestKey1), + MH2 = leveled_codec:segment_hash(TestKey2), + + test_binary_slot(SlotBin, TestKey1, MH1, lists:nth(1, KVL1)), + test_binary_slot(SlotBin, TestKey2, MH2, lists:nth(33, KVL1)), + ToList = binaryslot_tolist(SlotBin, native), + ?assertMatch(Keys, ToList), + + [Pos1] = find_pos(PosBin, extra_hash(MH1), [], 0), + [Pos2] = find_pos(PosBin, extra_hash(MH2), [], 0), + {BN1, _BP1} = revert_position(Pos1), + {BN2, _BP2} = revert_position(Pos2), + {Offset1, Length1} = block_offsetandlength(Header, BN1), + {Offset2, Length2} = block_offsetandlength(Header, BN2), + + SlotBin1 = flip_byte(SlotBin, byte_size(Header) + 12 + Offset1, Length1), + SlotBin2 = flip_byte(SlotBin, byte_size(Header) + 12 + Offset2, Length2), + + test_binary_slot(SlotBin2, TestKey1, MH1, lists:nth(1, KVL1)), + test_binary_slot(SlotBin1, TestKey2, MH2, lists:nth(33, KVL1)), + + test_binary_slot(SlotBin1, TestKey1, MH1, not_present), + test_binary_slot(SlotBin2, TestKey2, MH2, not_present), + + ToList1 = binaryslot_tolist(SlotBin1, native), + ToList2 = binaryslot_tolist(SlotBin2, native), + + ?assertMatch(true, is_list(ToList1)), + ?assertMatch(true, is_list(ToList2)), + ?assertMatch(true, length(ToList1) > 0), + ?assertMatch(true, length(ToList2) > 0), + ?assertMatch(true, length(ToList1) < length(Keys)), + ?assertMatch(true, length(ToList2) < length(Keys)), + + SlotBin3 = flip_byte(SlotBin, byte_size(Header) + 12, B1L), {SK1, _} = lists:nth(10, Keys), - {EK1, _} = lists:nth(50, Keys), - O1 = binaryslot_trimmedlist(FullBin0, SK1, EK1, native), - ?assertMatch(0, length(O1)), + {EK1, _} = lists:nth(20, Keys), + O1 = binaryslot_trimmedlist(SlotBin3, SK1, EK1, native), ?assertMatch([], O1). -flip_byte(Binary) -> - L = byte_size(Binary), - Byte1 = leveled_rand:uniform(L) - 1, +flip_byte(Binary, Offset, Length) -> + Byte1 = leveled_rand:uniform(Length) + Offset - 1, <> = Binary, case A of 0 -> @@ -2238,7 +2310,7 @@ flip_byte(Binary) -> test_binary_slot(FullBin, Key, Hash, ExpectedValue) -> % SW = os:timestamp(), - {ReturnedValue, _BLs, _Idx} = binaryslot_get(FullBin, Key, Hash, native), + {ReturnedValue, _Header} = binaryslot_get(FullBin, Key, Hash, native), ?assertMatch(ExpectedValue, ReturnedValue). % io:format(user, "Fetch success in ~w microseconds ~n", % [timer:now_diff(os:timestamp(), SW)]). @@ -2589,7 +2661,7 @@ nonsense_coverage_test() -> handle_sync_event("hello", self(), reader, #state{})), SampleBin = <<0:128/integer>>, - FlippedBin = flip_byte(SampleBin), + FlippedBin = flip_byte(SampleBin, 0, 16), ?assertMatch(false, FlippedBin == SampleBin). hashmatching_bytreesize_test() ->