diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 1d2b1c0..78fbce5 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -404,20 +404,22 @@ fetch(LedgerKey, Hash, State) -> State#state.blockindex_cache), case CachedBlockIdx of none -> + io:format("Looking for key without cache~n"), SlotBin = read_slot(State#state.handle, Slot), - {Result, BlockIdx} = binaryslot_get(SlotBin, - LedgerKey, - Hash, - none), + {Result, + BlockLengths, + BlockIdx} = binaryslot_get(SlotBin, LedgerKey, Hash), BlockIndexCache = array:set(SlotID - 1, - BlockIdx, + <>, State#state.blockindex_cache), {Result, slot_fetch, Slot#slot_index_value.slot_id, State#state{blockindex_cache = BlockIndexCache}}; - _ -> - PosList = find_pos(CachedBlockIdx, + <> -> + io:format("Looking for key with cache~n"), + PosList = find_pos(BlockIdx, double_hash(Hash, LedgerKey), [], 0), @@ -425,12 +427,12 @@ fetch(LedgerKey, Hash, State) -> [] -> {not_present, slot_bloom, SlotID, State}; _ -> - SlotBin = read_slot(State#state.handle, Slot), - Result = binaryslot_get(SlotBin, - LedgerKey, - Hash, - {true, PosList}), - {element(1, Result), slot_fetch, SlotID, State} + Result = check_blocks(PosList, + State#state.handle, + Slot, + BlockLengths, + LedgerKey), + {Result, slot_fetch, SlotID, State} end end end. @@ -607,45 +609,6 @@ build_all_slots(KVL, SC, Pos, SlotID, SlotIdx, BlockIdxA, SlotsBin) -> array:set(SlotID - 1, BlockIndex, BlockIdxA), <>). -read_slot(Handle, Slot) -> - {ok, SlotBin} = file:pread(Handle, - Slot#slot_index_value.start_position, - Slot#slot_index_value.length), - SlotBin. - -read_slots(Handle, SlotList) -> - PointerMapFun = - fun(Pointer) -> - {Slot, SK, EK} = - case Pointer of - {pointer, _Pid, Slot0, SK0, EK0} -> - {Slot0, SK0, EK0}; - {pointer, Slot0, SK0, EK0} -> - {Slot0, SK0, EK0} - end, - - {Slot#slot_index_value.start_position, - Slot#slot_index_value.length, - SK, - EK} - end, - - LengthList = lists:map(PointerMapFun, SlotList), - StartPos = element(1, lists:nth(1, LengthList)), - EndPos = element(1, lists:last(LengthList)) - + element(2, lists:last(LengthList)), - {ok, MultiSlotBin} = file:pread(Handle, StartPos, EndPos - StartPos), - - BinSplitMapFun = - fun({SP, L, SK, EK}) -> - Start = SP - StartPos, - <<_Pre:Start/binary, - SlotBin:L/binary, - _Post/binary>> = MultiSlotBin, - {SlotBin, SK, EK} - end, - - lists:map(BinSplitMapFun, LengthList). generate_filenames(RootFilename) -> Ext = filename:extension(RootFilename), @@ -808,7 +771,7 @@ generate_binary_slot(KVL) -> B2L = byte_size(B2), B3L = byte_size(B3), B4L = byte_size(B4), - Lengths = < CRC32 = erlang:crc32(SlotBin), FullBin = <>, - {PosBinIndex1, FullBin, HashL}. + {<>, FullBin, HashL}. -binaryslot_get(FullBin, Key, Hash, CachedPosLookup) -> +check_blocks([], _Handle, _Slot, _BlockLengths, _LedgerKey) -> + not_present; +check_blocks([Pos|Rest], Handle, Slot, BlockLengths, LedgerKey) -> + {BlockNumber, BlockPos} = revert_position(Pos), + io:format("Checking BlockNumber ~w in BlockPos ~w~n", + [BlockNumber, BlockPos]), + BlockBin = read_block(Handle, Slot, BlockLengths, BlockNumber), + BlockL = binary_to_term(BlockBin), + {K, V} = lists:nth(BlockPos, BlockL), + case K of + LedgerKey -> + io:format("Key mismatch in check_blocks~n"), + {K, V}; + _ -> + check_blocks(Rest, Handle, Slot, BlockLengths, LedgerKey) + end. + + +read_block(Handle, Slot, BlockLengths, BlockID) -> + {BlockPos, Offset, Length} = block_offsetandlength(BlockLengths, BlockID), + io:format("Reading offset ~w Length ~w~n", [Offset, Length]), + {ok, BlockBin} = file:pread(Handle, + Slot#slot_index_value.start_position + + BlockPos + + Offset + + 24, + % 4-byte CRC, 4 byte pos, 4x4 byte lengths + Length), + BlockBin. + +read_slot(Handle, Slot) -> + {ok, SlotBin} = file:pread(Handle, + Slot#slot_index_value.start_position, + Slot#slot_index_value.length), + SlotBin. + +read_slots(Handle, SlotList) -> + PointerMapFun = + fun(Pointer) -> + {Slot, SK, EK} = + case Pointer of + {pointer, _Pid, Slot0, SK0, EK0} -> + {Slot0, SK0, EK0}; + {pointer, Slot0, SK0, EK0} -> + {Slot0, SK0, EK0} + end, + + {Slot#slot_index_value.start_position, + Slot#slot_index_value.length, + SK, + EK} + end, + + LengthList = lists:map(PointerMapFun, SlotList), + StartPos = element(1, lists:nth(1, LengthList)), + EndPos = element(1, lists:last(LengthList)) + + element(2, lists:last(LengthList)), + {ok, MultiSlotBin} = file:pread(Handle, StartPos, EndPos - StartPos), + + BinSplitMapFun = + fun({SP, L, SK, EK}) -> + Start = SP - StartPos, + <<_Pre:Start/binary, + SlotBin:L/binary, + _Post/binary>> = MultiSlotBin, + {SlotBin, SK, EK} + end, + + lists:map(BinSplitMapFun, LengthList). + + +binaryslot_get(FullBin, Key, Hash) -> case crc_check_slot(FullBin) of - {Lengths, Rest} -> - B1P = element(1, Lengths), - case CachedPosLookup of - {true, PosList} -> - <<_PosBinIndex:B1P/binary, Blocks/binary>> = Rest, - {fetch_value(PosList, Lengths, Blocks, Key), none}; - none -> - <> = Rest, - PosList = find_pos(PosBinIndex, - double_hash(Hash, Key), - [], - 0), - {fetch_value(PosList, Lengths, Blocks, Key), PosBinIndex} - end; + {BlockLengths, Rest} -> + <> = BlockLengths, + <> = Rest, + PosList = find_pos(PosBinIndex, + double_hash(Hash, Key), + [], + 0), + {fetch_value(PosList, BlockLengths, Blocks, Key), + BlockLengths, + PosBinIndex}; crc_wonky -> - {not_present, none} + {not_present, + none, + none} end. binaryslot_tolist(FullBin) -> @@ -856,8 +888,12 @@ binaryslot_tolist(FullBin) -> {Out, _Rem} = case crc_check_slot(FullBin) of - {Lengths, RestBin} -> - {B1P, B1L, B2L, B3L, B4L} = Lengths, + {BlockLengths, RestBin} -> + <> = BlockLengths, <<_PosBinIndex:B1P/binary, Blocks/binary>> = RestBin, lists:foldl(BlockFetchFun, {[], Blocks}, [B1L, B2L, B3L, B4L]); crc_wonky -> @@ -908,8 +944,12 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey) -> {Out, _Rem} = case crc_check_slot(FullBin) of - {Lengths, RestBin} -> - {B1P, B1L, B2L, B3L, B4L} = Lengths, + {BlockLengths, RestBin} -> + <> = BlockLengths, <<_PosBinIndex:B1P/binary, Blocks/binary>> = RestBin, lists:foldl(BlockFetchFun, {[], Blocks}, [B1L, B2L, B3L, B4L]); crc_wonky -> @@ -957,65 +997,68 @@ trim_booleans(FirstKey, LastKey, StartKey, EndKey) -> end. - - crc_check_slot(FullBin) -> <> = FullBin, case erlang:crc32(SlotBin) of CRC32 -> - <> = SlotBin, - Lengths = {B1P, B1L, B2L, B3L, B4L}, - {Lengths, Rest}; + <> = SlotBin, + {BlockLengths, Rest}; _ -> leveled_log:log("SST09", []), crc_wonky end. +block_offsetandlength(BlockLengths, BlockID) -> + <> = BlockLengths, + case BlockID of + 1 -> + <> = BlockLengths0, + {BlocksPos, 0, B1L}; + 2 -> + <> = BlockLengths0, + {BlocksPos, B1L, B2L}; + 3 -> + <> = BlockLengths0, + {BlocksPos, B1L + B2L, B3L}; + 4 -> + <> = BlockLengths0, + {BlocksPos, B1L + B2L + B3L, B4L} + end. + double_hash(Hash, Key) -> H2 = erlang:phash2(Key), (Hash bxor H2) band 32767. -fetch_value([], _Lengths, _Blocks, _Key) -> +fetch_value([], _BlockLengths, _Blocks, _Key) -> not_present; -fetch_value([Pos|Rest], Lengths, Blocks, Key) -> - BlockNumber = (Pos div 32) + 1, - BlockPos = (Pos rem 32) + 1, - BlockL = - case BlockNumber of - 1 -> - B1L = element(2, Lengths), - <> = Blocks, - binary_to_term(Block); - 2 -> - B1L = element(2, Lengths), - B2L = element(3, Lengths), - <<_Pass:B1L/binary, Block:B2L/binary, _Rest/binary>> = Blocks, - binary_to_term(Block); - 3 -> - PreL = element(2, Lengths) + element(3, Lengths), - B3L = element(4, Lengths), - <<_Pass:PreL/binary, Block:B3L/binary, _Rest/binary>> = Blocks, - binary_to_term(Block); - 4 -> - {_B1P, B1L, B2L, B3L, B4L} = Lengths, - PreL = B1L + B2L + B3L, - <<_Pass:PreL/binary, Block:B4L/binary>> = Blocks, - binary_to_term(Block) - end, - +fetch_value([Pos|Rest], BlockLengths, Blocks, Key) -> + {BlockNumber, BlockPos} = revert_position(Pos), + {_BlockPos, + Offset, + Length} = block_offsetandlength(BlockLengths, BlockNumber), + <<_Pre:Offset/binary, Block:Length/binary, _Rest/binary>> = Blocks, + BlockL = binary_to_term(Block), {K, V} = lists:nth(BlockPos, BlockL), case K of Key -> + io:format("Key mismatch in fetch_value~n"), {K, V}; _ -> - fetch_value(Rest, Lengths, Blocks, Key) + fetch_value(Rest, BlockLengths, Blocks, Key) end. + +revert_position(Pos) -> + BlockNumber = (Pos div 32) + 1, + BlockPos = (Pos rem 32) + 1, + {BlockNumber, BlockPos}. + find_pos(<<>>, _Hash, PosList, _Count) -> PosList; find_pos(<<1:1/integer, Hash:15/integer, T/binary>>, Hash, PosList, Count) -> @@ -1272,7 +1315,7 @@ indexed_list_mixedkeys2_test() -> indexed_list_allindexkeys_test() -> Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), 128), {PosBinIndex1, FullBin, _HL} = generate_binary_slot(Keys), - ?assertMatch(<<127:8/integer>>, PosBinIndex1), + ?assertMatch(<<_BL:20/binary, 127:8/integer>>, PosBinIndex1), % SW = os:timestamp(), BinToList = binaryslot_tolist(FullBin), % io:format(user, @@ -1285,7 +1328,7 @@ indexed_list_allindexkeys_test() -> indexed_list_allindexkeys_trimmed_test() -> Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), 128), {PosBinIndex1, FullBin, _HL} = generate_binary_slot(Keys), - ?assertMatch(<<127:8/integer>>, PosBinIndex1), + ?assertMatch(<<_BL:20/binary, 127:8/integer>>, PosBinIndex1), ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, {i, "Bucket", @@ -1351,7 +1394,7 @@ indexed_list_mixedkeys_bitflip_test() -> test_binary_slot(FullBin, Key, Hash, ExpectedValue) -> % SW = os:timestamp(), - {ReturnedValue, _} = binaryslot_get(FullBin, Key, Hash, none), + {ReturnedValue, _BLs, _Idx} = binaryslot_get(FullBin, Key, Hash), ?assertMatch(ExpectedValue, ReturnedValue). % io:format(user, "Fetch success in ~w microseconds ~n", % [timer:now_diff(os:timestamp(), SW)]).