Fetch specific block only

Rely on CRC check in zlib.  Still need to catch on failure
This commit is contained in:
martinsumner 2017-03-07 20:19:11 +00:00
parent 3c5740e7bf
commit 04cfb453c4

View file

@ -404,20 +404,22 @@ fetch(LedgerKey, Hash, State) ->
State#state.blockindex_cache),
case CachedBlockIdx of
none ->
io:format("Looking for key without cache~n"),
SlotBin = read_slot(State#state.handle, Slot),
{Result, BlockIdx} = binaryslot_get(SlotBin,
LedgerKey,
Hash,
none),
{Result,
BlockLengths,
BlockIdx} = binaryslot_get(SlotBin, LedgerKey, Hash),
BlockIndexCache = array:set(SlotID - 1,
BlockIdx,
<<BlockLengths/binary,
BlockIdx/binary>>,
State#state.blockindex_cache),
{Result,
slot_fetch,
Slot#slot_index_value.slot_id,
State#state{blockindex_cache = BlockIndexCache}};
_ ->
PosList = find_pos(CachedBlockIdx,
<<BlockLengths:20/binary, BlockIdx/binary>> ->
io:format("Looking for key with cache~n"),
PosList = find_pos(BlockIdx,
double_hash(Hash, LedgerKey),
[],
0),
@ -425,12 +427,12 @@ fetch(LedgerKey, Hash, State) ->
[] ->
{not_present, slot_bloom, SlotID, State};
_ ->
SlotBin = read_slot(State#state.handle, Slot),
Result = binaryslot_get(SlotBin,
LedgerKey,
Hash,
{true, PosList}),
{element(1, Result), slot_fetch, SlotID, State}
Result = check_blocks(PosList,
State#state.handle,
Slot,
BlockLengths,
LedgerKey),
{Result, slot_fetch, SlotID, State}
end
end
end.
@ -607,45 +609,6 @@ build_all_slots(KVL, SC, Pos, SlotID, SlotIdx, BlockIdxA, SlotsBin) ->
array:set(SlotID - 1, BlockIndex, BlockIdxA),
<<SlotsBin/binary, SlotBin/binary>>).
read_slot(Handle, Slot) ->
{ok, SlotBin} = file:pread(Handle,
Slot#slot_index_value.start_position,
Slot#slot_index_value.length),
SlotBin.
read_slots(Handle, SlotList) ->
PointerMapFun =
fun(Pointer) ->
{Slot, SK, EK} =
case Pointer of
{pointer, _Pid, Slot0, SK0, EK0} ->
{Slot0, SK0, EK0};
{pointer, Slot0, SK0, EK0} ->
{Slot0, SK0, EK0}
end,
{Slot#slot_index_value.start_position,
Slot#slot_index_value.length,
SK,
EK}
end,
LengthList = lists:map(PointerMapFun, SlotList),
StartPos = element(1, lists:nth(1, LengthList)),
EndPos = element(1, lists:last(LengthList))
+ element(2, lists:last(LengthList)),
{ok, MultiSlotBin} = file:pread(Handle, StartPos, EndPos - StartPos),
BinSplitMapFun =
fun({SP, L, SK, EK}) ->
Start = SP - StartPos,
<<_Pre:Start/binary,
SlotBin:L/binary,
_Post/binary>> = MultiSlotBin,
{SlotBin, SK, EK}
end,
lists:map(BinSplitMapFun, LengthList).
generate_filenames(RootFilename) ->
Ext = filename:extension(RootFilename),
@ -808,7 +771,7 @@ generate_binary_slot(KVL) ->
B2L = byte_size(B2),
B3L = byte_size(B3),
B4L = byte_size(B4),
Lengths = <<B1P:32/integer,
Lengths = <<B1P:32/integer,
B1L:32/integer,
B2L:32/integer,
B3L:32/integer,
@ -819,27 +782,96 @@ generate_binary_slot(KVL) ->
CRC32 = erlang:crc32(SlotBin),
FullBin = <<CRC32:32/integer, SlotBin/binary>>,
{PosBinIndex1, FullBin, HashL}.
{<<Lengths/binary, PosBinIndex1/binary>>, FullBin, HashL}.
binaryslot_get(FullBin, Key, Hash, CachedPosLookup) ->
check_blocks([], _Handle, _Slot, _BlockLengths, _LedgerKey) ->
not_present;
check_blocks([Pos|Rest], Handle, Slot, BlockLengths, LedgerKey) ->
{BlockNumber, BlockPos} = revert_position(Pos),
io:format("Checking BlockNumber ~w in BlockPos ~w~n",
[BlockNumber, BlockPos]),
BlockBin = read_block(Handle, Slot, BlockLengths, BlockNumber),
BlockL = binary_to_term(BlockBin),
{K, V} = lists:nth(BlockPos, BlockL),
case K of
LedgerKey ->
io:format("Key mismatch in check_blocks~n"),
{K, V};
_ ->
check_blocks(Rest, Handle, Slot, BlockLengths, LedgerKey)
end.
read_block(Handle, Slot, BlockLengths, BlockID) ->
{BlockPos, Offset, Length} = block_offsetandlength(BlockLengths, BlockID),
io:format("Reading offset ~w Length ~w~n", [Offset, Length]),
{ok, BlockBin} = file:pread(Handle,
Slot#slot_index_value.start_position
+ BlockPos
+ Offset
+ 24,
% 4-byte CRC, 4 byte pos, 4x4 byte lengths
Length),
BlockBin.
read_slot(Handle, Slot) ->
{ok, SlotBin} = file:pread(Handle,
Slot#slot_index_value.start_position,
Slot#slot_index_value.length),
SlotBin.
read_slots(Handle, SlotList) ->
PointerMapFun =
fun(Pointer) ->
{Slot, SK, EK} =
case Pointer of
{pointer, _Pid, Slot0, SK0, EK0} ->
{Slot0, SK0, EK0};
{pointer, Slot0, SK0, EK0} ->
{Slot0, SK0, EK0}
end,
{Slot#slot_index_value.start_position,
Slot#slot_index_value.length,
SK,
EK}
end,
LengthList = lists:map(PointerMapFun, SlotList),
StartPos = element(1, lists:nth(1, LengthList)),
EndPos = element(1, lists:last(LengthList))
+ element(2, lists:last(LengthList)),
{ok, MultiSlotBin} = file:pread(Handle, StartPos, EndPos - StartPos),
BinSplitMapFun =
fun({SP, L, SK, EK}) ->
Start = SP - StartPos,
<<_Pre:Start/binary,
SlotBin:L/binary,
_Post/binary>> = MultiSlotBin,
{SlotBin, SK, EK}
end,
lists:map(BinSplitMapFun, LengthList).
binaryslot_get(FullBin, Key, Hash) ->
case crc_check_slot(FullBin) of
{Lengths, Rest} ->
B1P = element(1, Lengths),
case CachedPosLookup of
{true, PosList} ->
<<_PosBinIndex:B1P/binary, Blocks/binary>> = Rest,
{fetch_value(PosList, Lengths, Blocks, Key), none};
none ->
<<PosBinIndex:B1P/binary, Blocks/binary>> = Rest,
PosList = find_pos(PosBinIndex,
double_hash(Hash, Key),
[],
0),
{fetch_value(PosList, Lengths, Blocks, Key), PosBinIndex}
end;
{BlockLengths, Rest} ->
<<B1P:32/integer, _R/binary>> = BlockLengths,
<<PosBinIndex:B1P/binary, Blocks/binary>> = Rest,
PosList = find_pos(PosBinIndex,
double_hash(Hash, Key),
[],
0),
{fetch_value(PosList, BlockLengths, Blocks, Key),
BlockLengths,
PosBinIndex};
crc_wonky ->
{not_present, none}
{not_present,
none,
none}
end.
binaryslot_tolist(FullBin) ->
@ -856,8 +888,12 @@ binaryslot_tolist(FullBin) ->
{Out, _Rem} =
case crc_check_slot(FullBin) of
{Lengths, RestBin} ->
{B1P, B1L, B2L, B3L, B4L} = Lengths,
{BlockLengths, RestBin} ->
<<B1P:32/integer,
B1L:32/integer,
B2L:32/integer,
B3L:32/integer,
B4L:32/integer>> = BlockLengths,
<<_PosBinIndex:B1P/binary, Blocks/binary>> = RestBin,
lists:foldl(BlockFetchFun, {[], Blocks}, [B1L, B2L, B3L, B4L]);
crc_wonky ->
@ -908,8 +944,12 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey) ->
{Out, _Rem} =
case crc_check_slot(FullBin) of
{Lengths, RestBin} ->
{B1P, B1L, B2L, B3L, B4L} = Lengths,
{BlockLengths, RestBin} ->
<<B1P:32/integer,
B1L:32/integer,
B2L:32/integer,
B3L:32/integer,
B4L:32/integer>> = BlockLengths,
<<_PosBinIndex:B1P/binary, Blocks/binary>> = RestBin,
lists:foldl(BlockFetchFun, {[], Blocks}, [B1L, B2L, B3L, B4L]);
crc_wonky ->
@ -957,65 +997,68 @@ trim_booleans(FirstKey, LastKey, StartKey, EndKey) ->
end.
crc_check_slot(FullBin) ->
<<CRC32:32/integer, SlotBin/binary>> = FullBin,
case erlang:crc32(SlotBin) of
CRC32 ->
<<B1P:32/integer,
B1L:32/integer,
B2L:32/integer,
B3L:32/integer,
B4L:32/integer,
Rest/binary>> = SlotBin,
Lengths = {B1P, B1L, B2L, B3L, B4L},
{Lengths, Rest};
<<BlockLengths:20/binary, Rest/binary>> = SlotBin,
{BlockLengths, Rest};
_ ->
leveled_log:log("SST09", []),
crc_wonky
end.
block_offsetandlength(BlockLengths, BlockID) ->
<<BlocksPos:32/integer, BlockLengths0:16/binary>> = BlockLengths,
case BlockID of
1 ->
<<B1L:32/integer, _BR/binary>> = BlockLengths0,
{BlocksPos, 0, B1L};
2 ->
<<B1L:32/integer, B2L:32/integer, _BR/binary>> = BlockLengths0,
{BlocksPos, B1L, B2L};
3 ->
<<B1L:32/integer,
B2L:32/integer,
B3L:32/integer,
_BR/binary>> = BlockLengths0,
{BlocksPos, B1L + B2L, B3L};
4 ->
<<B1L:32/integer,
B2L:32/integer,
B3L:32/integer,
B4L:32/integer>> = BlockLengths0,
{BlocksPos, B1L + B2L + B3L, B4L}
end.
double_hash(Hash, Key) ->
H2 = erlang:phash2(Key),
(Hash bxor H2) band 32767.
fetch_value([], _Lengths, _Blocks, _Key) ->
fetch_value([], _BlockLengths, _Blocks, _Key) ->
not_present;
fetch_value([Pos|Rest], Lengths, Blocks, Key) ->
BlockNumber = (Pos div 32) + 1,
BlockPos = (Pos rem 32) + 1,
BlockL =
case BlockNumber of
1 ->
B1L = element(2, Lengths),
<<Block:B1L/binary, _Rest/binary>> = Blocks,
binary_to_term(Block);
2 ->
B1L = element(2, Lengths),
B2L = element(3, Lengths),
<<_Pass:B1L/binary, Block:B2L/binary, _Rest/binary>> = Blocks,
binary_to_term(Block);
3 ->
PreL = element(2, Lengths) + element(3, Lengths),
B3L = element(4, Lengths),
<<_Pass:PreL/binary, Block:B3L/binary, _Rest/binary>> = Blocks,
binary_to_term(Block);
4 ->
{_B1P, B1L, B2L, B3L, B4L} = Lengths,
PreL = B1L + B2L + B3L,
<<_Pass:PreL/binary, Block:B4L/binary>> = Blocks,
binary_to_term(Block)
end,
fetch_value([Pos|Rest], BlockLengths, Blocks, Key) ->
{BlockNumber, BlockPos} = revert_position(Pos),
{_BlockPos,
Offset,
Length} = block_offsetandlength(BlockLengths, BlockNumber),
<<_Pre:Offset/binary, Block:Length/binary, _Rest/binary>> = Blocks,
BlockL = binary_to_term(Block),
{K, V} = lists:nth(BlockPos, BlockL),
case K of
Key ->
io:format("Key mismatch in fetch_value~n"),
{K, V};
_ ->
fetch_value(Rest, Lengths, Blocks, Key)
fetch_value(Rest, BlockLengths, Blocks, Key)
end.
revert_position(Pos) ->
BlockNumber = (Pos div 32) + 1,
BlockPos = (Pos rem 32) + 1,
{BlockNumber, BlockPos}.
find_pos(<<>>, _Hash, PosList, _Count) ->
PosList;
find_pos(<<1:1/integer, Hash:15/integer, T/binary>>, Hash, PosList, Count) ->
@ -1272,7 +1315,7 @@ indexed_list_mixedkeys2_test() ->
indexed_list_allindexkeys_test() ->
Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), 128),
{PosBinIndex1, FullBin, _HL} = generate_binary_slot(Keys),
?assertMatch(<<127:8/integer>>, PosBinIndex1),
?assertMatch(<<_BL:20/binary, 127:8/integer>>, PosBinIndex1),
% SW = os:timestamp(),
BinToList = binaryslot_tolist(FullBin),
% io:format(user,
@ -1285,7 +1328,7 @@ indexed_list_allindexkeys_test() ->
indexed_list_allindexkeys_trimmed_test() ->
Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), 128),
{PosBinIndex1, FullBin, _HL} = generate_binary_slot(Keys),
?assertMatch(<<127:8/integer>>, PosBinIndex1),
?assertMatch(<<_BL:20/binary, 127:8/integer>>, PosBinIndex1),
?assertMatch(Keys, binaryslot_trimmedlist(FullBin,
{i,
"Bucket",
@ -1351,7 +1394,7 @@ indexed_list_mixedkeys_bitflip_test() ->
test_binary_slot(FullBin, Key, Hash, ExpectedValue) ->
% SW = os:timestamp(),
{ReturnedValue, _} = binaryslot_get(FullBin, Key, Hash, none),
{ReturnedValue, _BLs, _Idx} = binaryslot_get(FullBin, Key, Hash),
?assertMatch(ExpectedValue, ReturnedValue).
% io:format(user, "Fetch success in ~w microseconds ~n",
% [timer:now_diff(os:timestamp(), SW)]).