Write a SFT File
With some initial test support
This commit is contained in:
parent
eedc296314
commit
27dc026176
1 changed files with 207 additions and 19 deletions
|
@ -168,13 +168,21 @@
|
||||||
-define(DIVISOR_BITS, 13).
|
-define(DIVISOR_BITS, 13).
|
||||||
-define(DIVISOR, 8092).
|
-define(DIVISOR, 8092).
|
||||||
-define(COMPRESSION_LEVEL, 1).
|
-define(COMPRESSION_LEVEL, 1).
|
||||||
|
-define(HEADER_LENGTH, 56).
|
||||||
|
|
||||||
|
|
||||||
-record(state, {version = ?CURRENT_VERSION :: tuple(),
|
-record(state, {version = ?CURRENT_VERSION :: tuple(),
|
||||||
slot_index = gb_trees:empty() :: gb_trees:tree(),
|
slot_index :: list(),
|
||||||
next_position :: integer(),
|
next_position :: integer(),
|
||||||
smallest_sqn :: integer(),
|
smallest_sqn :: integer(),
|
||||||
largest_sqn :: integer()}).
|
highest_sqn :: integer(),
|
||||||
|
smallest_key :: string(),
|
||||||
|
highest_key :: string(),
|
||||||
|
slots_pointer :: integer(),
|
||||||
|
index_pointer :: integer(),
|
||||||
|
filter_pointer :: integer(),
|
||||||
|
summary_pointer :: integer(),
|
||||||
|
summary_length :: integer()}).
|
||||||
|
|
||||||
|
|
||||||
%% Start a bare file with an initial header and no further details
|
%% Start a bare file with an initial header and no further details
|
||||||
|
@ -186,10 +194,25 @@ create_file(Handle) ->
|
||||||
Header = create_header(initial),
|
Header = create_header(initial),
|
||||||
{ok, _} = file:position(Handle, bof),
|
{ok, _} = file:position(Handle, bof),
|
||||||
ok = file:write(Handle, Header),
|
ok = file:write(Handle, Header),
|
||||||
{ok, StartPos} = file:position(Handle),
|
{ok, StartPos} = file:position(Handle, cur),
|
||||||
FileMD = #state{next_position=StartPos},
|
FileMD = #state{next_position=StartPos},
|
||||||
{Handle, FileMD}.
|
{Handle, FileMD}.
|
||||||
|
|
||||||
|
|
||||||
|
%% The 56-byte header is made up of
|
||||||
|
%% - 1 byte version (major 5 bits, minor 3 bits) - default 0.1
|
||||||
|
%% - 1 byte options (currently undefined)
|
||||||
|
%% - 1 byte Block Size - the expected number of keys in each block
|
||||||
|
%% - 1 byte Block Count - the expected number of blocks in each slot
|
||||||
|
%% - 2 byte Slot Count - the maximum number of slots in the file
|
||||||
|
%% - 6 bytes - spare
|
||||||
|
%% - 4 bytes - Blocks length
|
||||||
|
%% - 4 bytes - Slot Index length
|
||||||
|
%% - 4 bytes - Slot Filter length
|
||||||
|
%% - 4 bytes - Table summary length
|
||||||
|
%% - 24 bytes - spare
|
||||||
|
%% - 4 bytes - CRC32
|
||||||
|
|
||||||
create_header(initial) ->
|
create_header(initial) ->
|
||||||
{Major, Minor} = ?CURRENT_VERSION,
|
{Major, Minor} = ?CURRENT_VERSION,
|
||||||
Version = <<Major:5, Minor:3>>,
|
Version = <<Major:5, Minor:3>>,
|
||||||
|
@ -203,13 +226,95 @@ create_header(initial) ->
|
||||||
CRC32 = erlang:crc32(H1),
|
CRC32 = erlang:crc32(H1),
|
||||||
<<H1/binary, CRC32:32/integer>>.
|
<<H1/binary, CRC32:32/integer>>.
|
||||||
|
|
||||||
|
|
||||||
|
%% Take a file handle with a previously created header and complete it based on
|
||||||
|
%% the two key lists KL1 and KL2
|
||||||
|
|
||||||
|
complete_file(Handle, FileMD, KL1, KL2, Level) ->
|
||||||
|
{UpdHandle,
|
||||||
|
PointerList,
|
||||||
|
{LowSQN, HighSQN},
|
||||||
|
{LowKey, HighKey}} = write_group(Handle, KL1, KL2, [], <<>>, Level,
|
||||||
|
fun sftwrite_function/2),
|
||||||
|
{ok, HeaderLengths} = file:pread(UpdHandle, 12, 16),
|
||||||
|
<<Blnth:32/integer,
|
||||||
|
Ilnth:32/integer,
|
||||||
|
Flnth:32/integer,
|
||||||
|
Slnth:32/integer>> = HeaderLengths,
|
||||||
|
{UpdHandle, FileMD#state{slot_index=PointerList,
|
||||||
|
smallest_sqn=LowSQN,
|
||||||
|
highest_sqn=HighSQN,
|
||||||
|
smallest_key=LowKey,
|
||||||
|
highest_key=HighKey,
|
||||||
|
slots_pointer=?HEADER_LENGTH,
|
||||||
|
index_pointer=?HEADER_LENGTH + Blnth,
|
||||||
|
filter_pointer=?HEADER_LENGTH + Blnth + Ilnth,
|
||||||
|
summary_pointer=?HEADER_LENGTH + Blnth + Ilnth + Flnth,
|
||||||
|
summary_length=Slnth}}.
|
||||||
|
|
||||||
|
%% Fetch a Key and Value from a file, returns
|
||||||
|
%% {value, KV} or not_present
|
||||||
|
|
||||||
|
fetch_keyvalue(Handle, FileMD, Key) ->
|
||||||
|
{_NearestKey, {FilterLen, PointerF},
|
||||||
|
{LengthList, PointerB}} = get_nearestkey(FileMD#state.slot_index, Key),
|
||||||
|
{ok, SegFilter} = file:pread(Handle,
|
||||||
|
PointerF + FileMD#state.filter_pointer,
|
||||||
|
FilterLen),
|
||||||
|
SegID = hash_for_segmentid({keyonly, Key}),
|
||||||
|
case check_for_segments(SegFilter, [SegID], true) of
|
||||||
|
{maybe_present, BlockList} ->
|
||||||
|
fetch_keyvalue_fromblock(BlockList,
|
||||||
|
Key,
|
||||||
|
LengthList,
|
||||||
|
Handle,
|
||||||
|
PointerB + FileMD#state.slots_pointer);
|
||||||
|
not_present ->
|
||||||
|
not_present;
|
||||||
|
error_so_maybe_present ->
|
||||||
|
fetch_keyvalue_fromblock(lists:seq(0,3),
|
||||||
|
Key,
|
||||||
|
LengthList,
|
||||||
|
Handle,
|
||||||
|
PointerB + FileMD#state.slots_pointer)
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
fetch_keyvalue_fromblock([], _Key, _LengthList, _Handle, _StartOfSlot) ->
|
||||||
|
not_present;
|
||||||
|
fetch_keyvalue_fromblock([BlockNumber|T], Key, LengthList, Handle, StartOfSlot) ->
|
||||||
|
Start = lists:sum(lists:sublist(LengthList, BlockNumber)),
|
||||||
|
Length = lists:nth(BlockNumber + 1, LengthList),
|
||||||
|
{ok, BlockToCheckBin} = file:pread(Handle, Start + StartOfSlot, Length),
|
||||||
|
BlockToCheck = binary_to_term(BlockToCheckBin),
|
||||||
|
Result = lists:keyfind(Key, 1, BlockToCheck),
|
||||||
|
case Result of
|
||||||
|
false ->
|
||||||
|
fetch_keyvalue_fromblock(T, Key, LengthList, Handle, StartOfSlot);
|
||||||
|
KV ->
|
||||||
|
KV
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
get_nearestkey(KVList, Key) ->
|
||||||
|
get_nearestkey(KVList, Key, not_found).
|
||||||
|
|
||||||
|
get_nearestkey([], _KeyToFind, PrevV) ->
|
||||||
|
PrevV;
|
||||||
|
get_nearestkey([{K, _FilterInfo, _SlotInfo}|_T], KeyToFind, PrevV) when K > KeyToFind ->
|
||||||
|
PrevV;
|
||||||
|
get_nearestkey([Result|T], KeyToFind, _) ->
|
||||||
|
get_nearestkey(T, KeyToFind, Result).
|
||||||
|
|
||||||
|
|
||||||
%% Take a file handle at the sart position (after creating the header) and then
|
%% Take a file handle at the sart position (after creating the header) and then
|
||||||
%% write the Key lists to the file slot by slot.
|
%% write the Key lists to the file slot by slot.
|
||||||
%%
|
%%
|
||||||
%% Slots are created then written in bulk to impove I/O efficiency. Slots will
|
%% Slots are created then written in bulk to impove I/O efficiency. Slots will
|
||||||
%% be written in groups of 32
|
%% be written in groups of 32
|
||||||
|
|
||||||
|
|
||||||
write_group(Handle, KL1, KL2, SlotIndex, SerialisedSlots, Level, WriteFun) ->
|
write_group(Handle, KL1, KL2, SlotIndex, SerialisedSlots, Level, WriteFun) ->
|
||||||
write_group(Handle, KL1, KL2, {0, 0},
|
write_group(Handle, KL1, KL2, {0, 0},
|
||||||
SlotIndex, SerialisedSlots,
|
SlotIndex, SerialisedSlots,
|
||||||
|
@ -220,7 +325,7 @@ write_group(Handle, KL1, KL2, {SlotCount, SlotTotal},
|
||||||
SlotIndex, SerialisedSlots,
|
SlotIndex, SerialisedSlots,
|
||||||
{LSN, HSN}, LowKey, LastKey, Level, WriteFun)
|
{LSN, HSN}, LowKey, LastKey, Level, WriteFun)
|
||||||
when SlotCount =:= ?SLOT_GROUPWRITE_COUNT ->
|
when SlotCount =:= ?SLOT_GROUPWRITE_COUNT ->
|
||||||
UpdHandle = WriteFun(slot , {Handle, SerialisedSlots}),
|
UpdHandle = WriteFun(slots , {Handle, SerialisedSlots}),
|
||||||
case maxslots_bylevel(SlotTotal, Level) of
|
case maxslots_bylevel(SlotTotal, Level) of
|
||||||
reached ->
|
reached ->
|
||||||
UpdHandle;
|
UpdHandle;
|
||||||
|
@ -244,9 +349,14 @@ write_group(Handle, KL1, KL2, {SlotCount, SlotTotal},
|
||||||
FirstKey = case LowKey of null -> LowKey_Slot; _ -> LowKey end,
|
FirstKey = case LowKey of null -> LowKey_Slot; _ -> LowKey end,
|
||||||
case Status of
|
case Status of
|
||||||
partial ->
|
partial ->
|
||||||
UpdHandle = WriteFun(slot , {Handle, UpdSlots}),
|
UpdHandle = WriteFun(slots , {Handle, UpdSlots}),
|
||||||
WriteFun(finalise, {UpdHandle, UpdSlotIndex, SNExtremes,
|
ConvSlotIndex = convert_slotindex(UpdSlotIndex),
|
||||||
{FirstKey, FinalKey}});
|
FinHandle = WriteFun(finalise, {UpdHandle,
|
||||||
|
ConvSlotIndex,
|
||||||
|
SNExtremes,
|
||||||
|
{FirstKey, FinalKey}}),
|
||||||
|
{_, PointerIndex} = ConvSlotIndex,
|
||||||
|
{FinHandle, PointerIndex, SNExtremes, {FirstKey, FinalKey}};
|
||||||
full ->
|
full ->
|
||||||
write_group(Handle, KL1rem, KL2rem, {SlotCount + 1, SlotTotal + 1},
|
write_group(Handle, KL1rem, KL2rem, {SlotCount + 1, SlotTotal + 1},
|
||||||
UpdSlotIndex, UpdSlots,
|
UpdSlotIndex, UpdSlots,
|
||||||
|
@ -254,11 +364,58 @@ write_group(Handle, KL1, KL2, {SlotCount, SlotTotal},
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
sftwrite_function(slot, {Handle, _SerialisedSlots}) ->
|
%% Take a slot index, and remove the SegFilters replacing with pointers
|
||||||
|
%% Return a tuple of the accumulated slot filters, and a pointer-based slot-index
|
||||||
|
|
||||||
|
convert_slotindex(SlotIndex) ->
|
||||||
|
SlotFun = fun({LowKey, SegFilter, LengthList},
|
||||||
|
{FilterAcc, SlotIndexAcc, PointerF, PointerB}) ->
|
||||||
|
FilterOut = serialise_segment_filter(SegFilter),
|
||||||
|
FilterLen = byte_size(FilterOut),
|
||||||
|
{<<FilterAcc/binary, FilterOut/binary>>,
|
||||||
|
lists:append(SlotIndexAcc, [{LowKey,
|
||||||
|
{FilterLen, PointerF},
|
||||||
|
{LengthList, PointerB}}]),
|
||||||
|
PointerF + FilterLen,
|
||||||
|
PointerB + lists:sum(LengthList)} end,
|
||||||
|
{SlotFilters, PointerIndex, _FLength, _BLength} = lists:foldl(SlotFun,
|
||||||
|
{<<>>, [], 0, 0},
|
||||||
|
SlotIndex),
|
||||||
|
{SlotFilters, PointerIndex}.
|
||||||
|
|
||||||
|
sftwrite_function(slots, {Handle, SerialisedSlots}) ->
|
||||||
|
ok = file:write(Handle, SerialisedSlots),
|
||||||
Handle;
|
Handle;
|
||||||
sftwrite_function(finalise,
|
sftwrite_function(finalise,
|
||||||
{Handle, _UpdSlotIndex, _SNExtremes, _KeyExtremes}) ->
|
{Handle,
|
||||||
Handle.
|
{SlotFilters, PointerIndex},
|
||||||
|
_SNExtremes,
|
||||||
|
_KeyExtremes}) ->
|
||||||
|
{ok, Position} = file:position(Handle, cur),
|
||||||
|
|
||||||
|
SlotsLength = Position - ?HEADER_LENGTH,
|
||||||
|
SerialisedIndex = term_to_binary(PointerIndex),
|
||||||
|
IndexLength = byte_size(SerialisedIndex),
|
||||||
|
|
||||||
|
%% Write Index
|
||||||
|
ok = file:write(Handle, SerialisedIndex),
|
||||||
|
%% Write Filter
|
||||||
|
ok = file:write(Handle, SlotFilters),
|
||||||
|
%% Write Lengths into header
|
||||||
|
ok = file:pwrite(Handle, 12, <<SlotsLength:32/integer,
|
||||||
|
IndexLength:32/integer>>),
|
||||||
|
Handle;
|
||||||
|
sftwrite_function(finalise,
|
||||||
|
{Handle,
|
||||||
|
SlotIndex,
|
||||||
|
SNExtremes,
|
||||||
|
KeyExtremes}) ->
|
||||||
|
{SlotFilters, PointerIndex} = convert_slotindex(SlotIndex),
|
||||||
|
sftwrite_function(finalise,
|
||||||
|
{Handle,
|
||||||
|
{SlotFilters, PointerIndex},
|
||||||
|
SNExtremes,
|
||||||
|
KeyExtremes}).
|
||||||
|
|
||||||
maxslots_bylevel(SlotTotal, _Level) ->
|
maxslots_bylevel(SlotTotal, _Level) ->
|
||||||
case SlotTotal of
|
case SlotTotal of
|
||||||
|
@ -269,6 +426,7 @@ maxslots_bylevel(SlotTotal, _Level) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
%% Take two potentially overlapping lists of keys and output a Block,
|
%% Take two potentially overlapping lists of keys and output a Block,
|
||||||
%% together with:
|
%% together with:
|
||||||
%% - block status (full, partial)
|
%% - block status (full, partial)
|
||||||
|
@ -551,9 +709,16 @@ serialise_segment_filter({DeltaList, TopHashes}) ->
|
||||||
<<Acc/bitstring,
|
<<Acc/bitstring,
|
||||||
Exponent/bitstring, Remainder:Factor/integer,
|
Exponent/bitstring, Remainder:Factor/integer,
|
||||||
Block2Bit:2/integer>> end,
|
Block2Bit:2/integer>> end,
|
||||||
lists:foldl(F, HeaderBin, DeltaList).
|
pad_binary(lists:foldl(F, HeaderBin, DeltaList)).
|
||||||
|
|
||||||
|
|
||||||
|
pad_binary(BitString) ->
|
||||||
|
Pad = 8 - bit_size(BitString) rem 8,
|
||||||
|
case Pad of
|
||||||
|
8 -> BitString;
|
||||||
|
_ -> <<BitString/bitstring, 0:Pad/integer>>
|
||||||
|
end.
|
||||||
|
|
||||||
buildexponent(Exponent) ->
|
buildexponent(Exponent) ->
|
||||||
buildexponent(Exponent, <<0:1>>).
|
buildexponent(Exponent, <<0:1>>).
|
||||||
|
|
||||||
|
@ -880,7 +1045,8 @@ merge_seglists_test() ->
|
||||||
2:2, 1708:13, 2:2>>,
|
2:2, 1708:13, 2:2>>,
|
||||||
ExpectedResult = <<ExpectedTopHashes/bitstring,
|
ExpectedResult = <<ExpectedTopHashes/bitstring,
|
||||||
7:16/integer,
|
7:16/integer,
|
||||||
ExpectedDeltas/bitstring>>,
|
ExpectedDeltas/bitstring,
|
||||||
|
0:7/integer>>,
|
||||||
?assertMatch(SegBin, ExpectedResult),
|
?assertMatch(SegBin, ExpectedResult),
|
||||||
R1 = check_for_segments(SegBin, [100], true),
|
R1 = check_for_segments(SegBin, [100], true),
|
||||||
?assertMatch(R1,{maybe_present, [0]}),
|
?assertMatch(R1,{maybe_present, [0]}),
|
||||||
|
@ -972,20 +1138,42 @@ createslot_stage3_test() ->
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
testwrite_function(slot, {Handle, SerialisedSlots}) ->
|
testwrite_function(slots, {Handle, SerialisedSlots}) ->
|
||||||
lists:append(Handle, [SerialisedSlots]);
|
lists:append(Handle, [SerialisedSlots]);
|
||||||
testwrite_function(finalise, {Handle, UpdSlotIndex, SNExtremes, KeyExtremes}) ->
|
testwrite_function(finalise, {Handle, ConvSlotIndex, SNExtremes, KeyExtremes}) ->
|
||||||
{Handle, UpdSlotIndex, SNExtremes, KeyExtremes}.
|
{Handle, ConvSlotIndex, SNExtremes, KeyExtremes}.
|
||||||
|
|
||||||
writegroup_stage1_test() ->
|
writegroup_stage1_test() ->
|
||||||
{KL1, KL2} = sample_keylist(),
|
{KL1, KL2} = sample_keylist(),
|
||||||
Output = write_group([], KL1, KL2, [], <<>>, 1, fun testwrite_function/2),
|
Output = write_group([], KL1, KL2, [], <<>>, 1, fun testwrite_function/2),
|
||||||
{Handle, UpdSlotIndex, SNExtremes, KeyExtremes} = Output,
|
{{Handle, {_, PointerIndex}, SNExtremes, KeyExtremes},
|
||||||
|
PointerIndex, SNExtremes, KeyExtremes} = Output,
|
||||||
?assertMatch(SNExtremes, {1,3}),
|
?assertMatch(SNExtremes, {1,3}),
|
||||||
?assertMatch(KeyExtremes, {{o, "Bucket1", "Key1"}, {o, "Bucket4", "Key1"}}),
|
?assertMatch(KeyExtremes, {{o, "Bucket1", "Key1"}, {o, "Bucket4", "Key1"}}),
|
||||||
[TopIndex|[]] = UpdSlotIndex,
|
[TopIndex|[]] = PointerIndex,
|
||||||
{TopKey, _SegFilter, LengthList} = TopIndex,
|
{TopKey, _SegFilter, {LengthList, _Total}} = TopIndex,
|
||||||
?assertMatch(TopKey, {o, "Bucket1", "Key1"}),
|
?assertMatch(TopKey, {o, "Bucket1", "Key1"}),
|
||||||
TotalLength = lists:foldl(fun(X, Acc) -> Acc + X end, 0, LengthList),
|
TotalLength = lists:foldl(fun(X, Acc) -> Acc + X end, 0, LengthList),
|
||||||
ActualLength = lists:foldl(fun(X, Acc) -> Acc + bit_size(X) end, 0, Handle),
|
ActualLength = lists:foldl(fun(X, Acc) -> Acc + bit_size(X) end, 0, Handle),
|
||||||
?assertMatch(TotalLength, ActualLength).
|
?assertMatch(TotalLength, ActualLength).
|
||||||
|
|
||||||
|
initial_create_header_test() ->
|
||||||
|
Output = create_header(initial),
|
||||||
|
?assertMatch(?HEADER_LENGTH, byte_size(Output)).
|
||||||
|
|
||||||
|
initial_create_file_test() ->
|
||||||
|
Filename = "test1.sft",
|
||||||
|
{KL1, KL2} = sample_keylist(),
|
||||||
|
{Handle, FileMD} = create_file(Filename),
|
||||||
|
{UpdHandle, UpdFileMD} = complete_file(Handle, FileMD, KL1, KL2, 1),
|
||||||
|
Result1 = fetch_keyvalue(UpdHandle, UpdFileMD, {o, "Bucket1", "Key8"}),
|
||||||
|
io:format("Result is ~w~n", [Result1]),
|
||||||
|
?assertMatch(Result1, {{o, "Bucket1", "Key8"}, 1, {active, infinity}, null}),
|
||||||
|
Result2 = fetch_keyvalue(UpdHandle, UpdFileMD, {o, "Bucket1", "Key88"}),
|
||||||
|
io:format("Result is ~w~n", [Result2]),
|
||||||
|
?assertMatch(Result2, not_present),
|
||||||
|
ok = file:close(UpdHandle),
|
||||||
|
ok = file:delete(Filename).
|
||||||
|
|
||||||
|
%% big_create_file_test() ->
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue