From 27dc026176ec89895c10a4a65c37548ff838e5e8 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Wed, 6 Jul 2016 10:52:47 +0100 Subject: [PATCH] Write a SFT File With some initial test support --- src/leveled_sft.erl | 226 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 207 insertions(+), 19 deletions(-) diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index c55e8ce..ef26814 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -168,13 +168,21 @@ -define(DIVISOR_BITS, 13). -define(DIVISOR, 8092). -define(COMPRESSION_LEVEL, 1). +-define(HEADER_LENGTH, 56). -record(state, {version = ?CURRENT_VERSION :: tuple(), - slot_index = gb_trees:empty() :: gb_trees:tree(), + slot_index :: list(), next_position :: integer(), smallest_sqn :: integer(), - largest_sqn :: integer()}). + highest_sqn :: integer(), + smallest_key :: string(), + highest_key :: string(), + slots_pointer :: integer(), + index_pointer :: integer(), + filter_pointer :: integer(), + summary_pointer :: integer(), + summary_length :: integer()}). %% Start a bare file with an initial header and no further details @@ -186,10 +194,25 @@ create_file(Handle) -> Header = create_header(initial), {ok, _} = file:position(Handle, bof), ok = file:write(Handle, Header), - {ok, StartPos} = file:position(Handle), + {ok, StartPos} = file:position(Handle, cur), FileMD = #state{next_position=StartPos}, {Handle, FileMD}. + +%% The 56-byte header is made up of +%% - 1 byte version (major 5 bits, minor 3 bits) - default 0.1 +%% - 1 byte options (currently undefined) +%% - 1 byte Block Size - the expected number of keys in each block +%% - 1 byte Block Count - the expected number of blocks in each slot +%% - 2 byte Slot Count - the maximum number of slots in the file +%% - 6 bytes - spare +%% - 4 bytes - Blocks length +%% - 4 bytes - Slot Index length +%% - 4 bytes - Slot Filter length +%% - 4 bytes - Table summary length +%% - 24 bytes - spare +%% - 4 bytes - CRC32 + create_header(initial) -> {Major, Minor} = ?CURRENT_VERSION, Version = <>, @@ -203,13 +226,95 @@ create_header(initial) -> CRC32 = erlang:crc32(H1), <

>. + +%% Take a file handle with a previously created header and complete it based on +%% the two key lists KL1 and KL2 + +complete_file(Handle, FileMD, KL1, KL2, Level) -> + {UpdHandle, + PointerList, + {LowSQN, HighSQN}, + {LowKey, HighKey}} = write_group(Handle, KL1, KL2, [], <<>>, Level, + fun sftwrite_function/2), + {ok, HeaderLengths} = file:pread(UpdHandle, 12, 16), + <> = HeaderLengths, + {UpdHandle, FileMD#state{slot_index=PointerList, + smallest_sqn=LowSQN, + highest_sqn=HighSQN, + smallest_key=LowKey, + highest_key=HighKey, + slots_pointer=?HEADER_LENGTH, + index_pointer=?HEADER_LENGTH + Blnth, + filter_pointer=?HEADER_LENGTH + Blnth + Ilnth, + summary_pointer=?HEADER_LENGTH + Blnth + Ilnth + Flnth, + summary_length=Slnth}}. + +%% Fetch a Key and Value from a file, returns +%% {value, KV} or not_present + +fetch_keyvalue(Handle, FileMD, Key) -> + {_NearestKey, {FilterLen, PointerF}, + {LengthList, PointerB}} = get_nearestkey(FileMD#state.slot_index, Key), + {ok, SegFilter} = file:pread(Handle, + PointerF + FileMD#state.filter_pointer, + FilterLen), + SegID = hash_for_segmentid({keyonly, Key}), + case check_for_segments(SegFilter, [SegID], true) of + {maybe_present, BlockList} -> + fetch_keyvalue_fromblock(BlockList, + Key, + LengthList, + Handle, + PointerB + FileMD#state.slots_pointer); + not_present -> + not_present; + error_so_maybe_present -> + fetch_keyvalue_fromblock(lists:seq(0,3), + Key, + LengthList, + Handle, + PointerB + FileMD#state.slots_pointer) + end. + + +fetch_keyvalue_fromblock([], _Key, _LengthList, _Handle, _StartOfSlot) -> + not_present; +fetch_keyvalue_fromblock([BlockNumber|T], Key, LengthList, Handle, StartOfSlot) -> + Start = lists:sum(lists:sublist(LengthList, BlockNumber)), + Length = lists:nth(BlockNumber + 1, LengthList), + {ok, BlockToCheckBin} = file:pread(Handle, Start + StartOfSlot, Length), + BlockToCheck = binary_to_term(BlockToCheckBin), + Result = lists:keyfind(Key, 1, BlockToCheck), + case Result of + false -> + fetch_keyvalue_fromblock(T, Key, LengthList, Handle, StartOfSlot); + KV -> + KV + end. + + + + +get_nearestkey(KVList, Key) -> + get_nearestkey(KVList, Key, not_found). + +get_nearestkey([], _KeyToFind, PrevV) -> + PrevV; +get_nearestkey([{K, _FilterInfo, _SlotInfo}|_T], KeyToFind, PrevV) when K > KeyToFind -> + PrevV; +get_nearestkey([Result|T], KeyToFind, _) -> + get_nearestkey(T, KeyToFind, Result). + + %% Take a file handle at the sart position (after creating the header) and then %% write the Key lists to the file slot by slot. %% %% Slots are created then written in bulk to impove I/O efficiency. Slots will %% be written in groups of 32 - write_group(Handle, KL1, KL2, SlotIndex, SerialisedSlots, Level, WriteFun) -> write_group(Handle, KL1, KL2, {0, 0}, SlotIndex, SerialisedSlots, @@ -220,7 +325,7 @@ write_group(Handle, KL1, KL2, {SlotCount, SlotTotal}, SlotIndex, SerialisedSlots, {LSN, HSN}, LowKey, LastKey, Level, WriteFun) when SlotCount =:= ?SLOT_GROUPWRITE_COUNT -> - UpdHandle = WriteFun(slot , {Handle, SerialisedSlots}), + UpdHandle = WriteFun(slots , {Handle, SerialisedSlots}), case maxslots_bylevel(SlotTotal, Level) of reached -> UpdHandle; @@ -244,9 +349,14 @@ write_group(Handle, KL1, KL2, {SlotCount, SlotTotal}, FirstKey = case LowKey of null -> LowKey_Slot; _ -> LowKey end, case Status of partial -> - UpdHandle = WriteFun(slot , {Handle, UpdSlots}), - WriteFun(finalise, {UpdHandle, UpdSlotIndex, SNExtremes, - {FirstKey, FinalKey}}); + UpdHandle = WriteFun(slots , {Handle, UpdSlots}), + ConvSlotIndex = convert_slotindex(UpdSlotIndex), + FinHandle = WriteFun(finalise, {UpdHandle, + ConvSlotIndex, + SNExtremes, + {FirstKey, FinalKey}}), + {_, PointerIndex} = ConvSlotIndex, + {FinHandle, PointerIndex, SNExtremes, {FirstKey, FinalKey}}; full -> write_group(Handle, KL1rem, KL2rem, {SlotCount + 1, SlotTotal + 1}, UpdSlotIndex, UpdSlots, @@ -254,11 +364,58 @@ write_group(Handle, KL1, KL2, {SlotCount, SlotTotal}, end. -sftwrite_function(slot, {Handle, _SerialisedSlots}) -> +%% Take a slot index, and remove the SegFilters replacing with pointers +%% Return a tuple of the accumulated slot filters, and a pointer-based slot-index + +convert_slotindex(SlotIndex) -> + SlotFun = fun({LowKey, SegFilter, LengthList}, + {FilterAcc, SlotIndexAcc, PointerF, PointerB}) -> + FilterOut = serialise_segment_filter(SegFilter), + FilterLen = byte_size(FilterOut), + {<>, + lists:append(SlotIndexAcc, [{LowKey, + {FilterLen, PointerF}, + {LengthList, PointerB}}]), + PointerF + FilterLen, + PointerB + lists:sum(LengthList)} end, + {SlotFilters, PointerIndex, _FLength, _BLength} = lists:foldl(SlotFun, + {<<>>, [], 0, 0}, + SlotIndex), + {SlotFilters, PointerIndex}. + +sftwrite_function(slots, {Handle, SerialisedSlots}) -> + ok = file:write(Handle, SerialisedSlots), Handle; sftwrite_function(finalise, - {Handle, _UpdSlotIndex, _SNExtremes, _KeyExtremes}) -> - Handle. + {Handle, + {SlotFilters, PointerIndex}, + _SNExtremes, + _KeyExtremes}) -> + {ok, Position} = file:position(Handle, cur), + + SlotsLength = Position - ?HEADER_LENGTH, + SerialisedIndex = term_to_binary(PointerIndex), + IndexLength = byte_size(SerialisedIndex), + + %% Write Index + ok = file:write(Handle, SerialisedIndex), + %% Write Filter + ok = file:write(Handle, SlotFilters), + %% Write Lengths into header + ok = file:pwrite(Handle, 12, <>), + Handle; +sftwrite_function(finalise, + {Handle, + SlotIndex, + SNExtremes, + KeyExtremes}) -> + {SlotFilters, PointerIndex} = convert_slotindex(SlotIndex), + sftwrite_function(finalise, + {Handle, + {SlotFilters, PointerIndex}, + SNExtremes, + KeyExtremes}). maxslots_bylevel(SlotTotal, _Level) -> case SlotTotal of @@ -269,6 +426,7 @@ maxslots_bylevel(SlotTotal, _Level) -> end. + %% Take two potentially overlapping lists of keys and output a Block, %% together with: %% - block status (full, partial) @@ -551,9 +709,16 @@ serialise_segment_filter({DeltaList, TopHashes}) -> <> end, - lists:foldl(F, HeaderBin, DeltaList). + pad_binary(lists:foldl(F, HeaderBin, DeltaList)). +pad_binary(BitString) -> + Pad = 8 - bit_size(BitString) rem 8, + case Pad of + 8 -> BitString; + _ -> <> + end. + buildexponent(Exponent) -> buildexponent(Exponent, <<0:1>>). @@ -880,7 +1045,8 @@ merge_seglists_test() -> 2:2, 1708:13, 2:2>>, ExpectedResult = <>, + ExpectedDeltas/bitstring, + 0:7/integer>>, ?assertMatch(SegBin, ExpectedResult), R1 = check_for_segments(SegBin, [100], true), ?assertMatch(R1,{maybe_present, [0]}), @@ -972,20 +1138,42 @@ createslot_stage3_test() -> -testwrite_function(slot, {Handle, SerialisedSlots}) -> +testwrite_function(slots, {Handle, SerialisedSlots}) -> lists:append(Handle, [SerialisedSlots]); -testwrite_function(finalise, {Handle, UpdSlotIndex, SNExtremes, KeyExtremes}) -> - {Handle, UpdSlotIndex, SNExtremes, KeyExtremes}. +testwrite_function(finalise, {Handle, ConvSlotIndex, SNExtremes, KeyExtremes}) -> + {Handle, ConvSlotIndex, SNExtremes, KeyExtremes}. writegroup_stage1_test() -> {KL1, KL2} = sample_keylist(), Output = write_group([], KL1, KL2, [], <<>>, 1, fun testwrite_function/2), - {Handle, UpdSlotIndex, SNExtremes, KeyExtremes} = Output, + {{Handle, {_, PointerIndex}, SNExtremes, KeyExtremes}, + PointerIndex, SNExtremes, KeyExtremes} = Output, ?assertMatch(SNExtremes, {1,3}), ?assertMatch(KeyExtremes, {{o, "Bucket1", "Key1"}, {o, "Bucket4", "Key1"}}), - [TopIndex|[]] = UpdSlotIndex, - {TopKey, _SegFilter, LengthList} = TopIndex, + [TopIndex|[]] = PointerIndex, + {TopKey, _SegFilter, {LengthList, _Total}} = TopIndex, ?assertMatch(TopKey, {o, "Bucket1", "Key1"}), TotalLength = lists:foldl(fun(X, Acc) -> Acc + X end, 0, LengthList), ActualLength = lists:foldl(fun(X, Acc) -> Acc + bit_size(X) end, 0, Handle), ?assertMatch(TotalLength, ActualLength). + +initial_create_header_test() -> + Output = create_header(initial), + ?assertMatch(?HEADER_LENGTH, byte_size(Output)). + +initial_create_file_test() -> + Filename = "test1.sft", + {KL1, KL2} = sample_keylist(), + {Handle, FileMD} = create_file(Filename), + {UpdHandle, UpdFileMD} = complete_file(Handle, FileMD, KL1, KL2, 1), + Result1 = fetch_keyvalue(UpdHandle, UpdFileMD, {o, "Bucket1", "Key8"}), + io:format("Result is ~w~n", [Result1]), + ?assertMatch(Result1, {{o, "Bucket1", "Key8"}, 1, {active, infinity}, null}), + Result2 = fetch_keyvalue(UpdHandle, UpdFileMD, {o, "Bucket1", "Key88"}), + io:format("Result is ~w~n", [Result2]), + ?assertMatch(Result2, not_present), + ok = file:close(UpdHandle), + ok = file:delete(Filename). + +%% big_create_file_test() -> + \ No newline at end of file