Refactor writing SFT Files
Previously the code involved very high-arity functions which were hard to follow. This has been simplified somewhat by introducing a writer record to keep track of the various accumulators, along with a general refactoring to better separate the logical stages of building a file. A sketch of the pattern follows the commit metadata below.
parent addd4c89d0
commit 1f56501499
1 changed file with 248 additions and 379 deletions
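As a sketch of the change in shape (signatures abridged from the diff below, not compilable on their own): the old write_keys threaded a dozen positional accumulators, while the new one threads a single #writer{} record.

%% Before: one positional argument per accumulator
write_keys(Handle, KL1, KL2, {SlotCount, SlotTotal},
           SlotIndex, SerialisedSlots, Bloom,
           {LSN, HSN}, LowKey, LastKey, LevelR, WriteFun) -> ...

%% After: the accumulators travel together in one record
write_keys(Handle, KL1, KL2, LevelR, WriteFun, WriteState = #writer{}) -> ...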
@@ -214,6 +214,13 @@
        oversized_file = false :: boolean(),
        penciller :: pid()}).

%% Helper object when writing a file to keep track of various accumulators
-record(writer, {slot_index = [] :: list(),
                    slot_binary = <<>> :: binary(),
                    bloom = leveled_tinybloom:empty(?BLOOM_WIDTH),
                    min_sqn = infinity :: integer()|infinity,
                    max_sqn = 0 :: integer(),
                    last_key = {last, null}}).

%%%============================================================================
%%% API
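One detail worth noting about the defaults (the snippet is an illustration, not code from the diff): min_sqn starts at the atom infinity and max_sqn at 0, so the first sequence number folded in wins both comparisons — in Erlang's term order any number sorts before any atom, so min(infinity, SQN) is always SQN.

W0 = #writer{},
SQN = 42,
W1 = W0#writer{min_sqn = min(W0#writer.min_sqn, SQN),  % min(infinity, 42) -> 42
               max_sqn = max(W0#writer.max_sqn, SQN)}. % max(0, 42) -> 42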
@@ -532,13 +539,12 @@ complete_file(Handle, FileMD, KL1, KL2, LevelR) ->
    complete_file(Handle, FileMD, KL1, KL2, LevelR, false).

complete_file(Handle, FileMD, KL1, KL2, LevelR, Rename) ->
    EmptyBloom = leveled_tinybloom:empty(?BLOOM_WIDTH),
    {ok, KeyRemainders} = write_keys(Handle,
                                        maybe_expand_pointer(KL1),
                                        maybe_expand_pointer(KL2),
                                        [], <<>>, EmptyBloom,
                                        LevelR,
                                        fun sftwrite_function/2),
                                        fun sftwrite_function/2,
                                        #writer{}),
    {ReadHandle, UpdFileMD} = case Rename of
        false ->
            open_file(FileMD);
@@ -570,27 +576,33 @@ rename_file(OldName, NewName) ->
%% A key out of range may fail

fetch_keyvalue(Handle, FileMD, Key) ->
    {_NearestKey, {FilterLen, PointerF},
        {LengthList, PointerB}} = get_nearestkey(FileMD#state.slot_index, Key),
    {ok, SegFilter} = file:pread(Handle,
                                    PointerF + FileMD#state.filter_pointer,
                                    FilterLen),
    SegID = hash_for_segmentid({keyonly, Key}),
    case check_for_segments(SegFilter, [SegID], true) of
        {maybe_present, BlockList} ->
            fetch_keyvalue_fromblock(BlockList,
                                        Key,
                                        LengthList,
                                        Handle,
                                        PointerB + FileMD#state.slots_pointer);
        not_present ->
    case get_nearestkey(FileMD#state.slot_index, Key) of
        not_found ->
            not_present;
        error_so_maybe_present ->
            fetch_keyvalue_fromblock(lists:seq(0,length(LengthList)),
                                        Key,
                                        LengthList,
                                        Handle,
                                        PointerB + FileMD#state.slots_pointer)
        {_NearestKey, {FilterLen, PointerF}, {LengthList, PointerB}} ->
            FilterPointer = PointerF + FileMD#state.filter_pointer,
            {ok, SegFilter} = file:pread(Handle,
                                            FilterPointer,
                                            FilterLen),
            SegID = hash_for_segmentid({keyonly, Key}),
            case check_for_segments(SegFilter, [SegID], true) of
                {maybe_present, BlockList} ->
                    BlockPointer = PointerB + FileMD#state.slots_pointer,
                    fetch_keyvalue_fromblock(BlockList,
                                                Key,
                                                LengthList,
                                                Handle,
                                                BlockPointer);
                not_present ->
                    not_present;
                error_so_maybe_present ->
                    BlockPointer = PointerB + FileMD#state.slots_pointer,
                    fetch_keyvalue_fromblock(lists:seq(0,length(LengthList)),
                                                Key,
                                                LengthList,
                                                Handle,
                                                BlockPointer)
            end
    end.
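For orientation, a hypothetical caller (the key is illustrative) sees only two shapes of result from fetch_keyvalue/3, with the filter outcomes resolved internally:

case fetch_keyvalue(Handle, FileMD, {o, "Bucket1", "Key8", null}) of
    not_present ->
        io:format("key not in file~n");
    {Key, Value} ->
        io:format("found ~w -> ~w~n", [Key, Value])
end.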

%% Fetches a range of keys returning a list of {Key, SeqN} tuples

@@ -767,100 +779,59 @@ get_nextkeyaftermatch([_KTuple|T], KeyToFind, PrevV) ->
%% write the Key lists to the file slot by slot.
%%
%% Slots are created then written in bulk to improve I/O efficiency. Slots will
%% be written in groups of 32
%% be written in groups

write_keys(Handle,
            KL1, KL2,
            SlotIndex, SerialisedSlots, InitialBloom,
            LevelR, WriteFun) ->
    write_keys(Handle,
                KL1, KL2,
                {0, 0},
                SlotIndex, SerialisedSlots, InitialBloom,
                {infinity, 0}, null, {last, null},
                LevelR, WriteFun).
write_keys(Handle, KL1, KL2, LevelR, WriteFun, WriteState) ->
    write_keys(Handle, KL1, KL2, LevelR, WriteFun, WriteState, {0, 0, []}).


write_keys(Handle,
            KL1, KL2,
            {SlotCount, SlotTotal},
            SlotIndex, SerialisedSlots, Bloom,
            {LSN, HSN}, LowKey, LastKey,
            LevelR, WriteFun)
        when SlotCount =:= ?SLOT_GROUPWRITE_COUNT ->
    UpdHandle = WriteFun(slots , {Handle, SerialisedSlots}),
    case maxslots_bylevel(SlotTotal, LevelR#level.level) of
write_keys(Handle, KL1, KL2, LevelR, WriteFun, WState,
            {SlotC, SlotT, SlotLists})
        when SlotC =:= ?SLOT_GROUPWRITE_COUNT ->
    WState0 = lists:foldl(fun finalise_slot/2, WState, SlotLists),
    Handle0 = WriteFun(slots, {Handle, WState0#writer.slot_binary}),
    case maxslots_bylevel(SlotT, LevelR#level.level) of
        reached ->
            {complete_keywrite(UpdHandle,
                                SlotIndex,
                                {{LSN, HSN}, {LowKey, LastKey}, Bloom},
                                WriteFun),
                {KL1, KL2}};
            {complete_keywrite(Handle0, WState0, WriteFun), {KL1, KL2}};
        continue ->
            write_keys(UpdHandle,
                        KL1, KL2,
                        {0, SlotTotal},
                        SlotIndex, <<>>, Bloom,
                        {LSN, HSN}, LowKey, LastKey,
                        LevelR, WriteFun)
            write_keys(Handle0, KL1, KL2, LevelR, WriteFun,
                        WState0#writer{slot_binary = <<>>}, {0, SlotT, []})
    end;
write_keys(Handle,
            KL1, KL2,
            {SlotCount, SlotTotal},
            SlotIndex, SerialisedSlots, Bloom,
            {LSN, HSN}, LowKey, LastKey,
            LevelR, WriteFun) ->
    SlotOutput = create_slot(KL1, KL2, LevelR, Bloom),
    {{LowKey_Slot, SegFilter, SerialisedSlot, LengthList},
        {{LSN_Slot, HSN_Slot}, LastKey_Slot, Status},
        UpdBloom,
        KL1rem, KL2rem} = SlotOutput,
    UpdSlotIndex = lists:append(SlotIndex,
                                [{LowKey_Slot, SegFilter, LengthList}]),
    UpdSlots = <<SerialisedSlots/binary, SerialisedSlot/binary>>,
    SNExtremes = {min(LSN_Slot, LSN), max(HSN_Slot, HSN)},
    FinalKey = case LastKey_Slot of
                    null -> LastKey;
                    _ -> LastKey_Slot
                end,
    FirstKey = case LowKey of
                    null -> LowKey_Slot;
                    _ -> LowKey
                end,
write_keys(Handle, KL1, KL2, LevelR, WriteFun, WState,
            {SlotC, SlotT, SlotLists}) ->
    {Status, BlockKeyLists} = create_slot(KL1, KL2, LevelR),
    case Status of
        partial ->
            UpdHandle = WriteFun(slots , {Handle, UpdSlots}),
            {complete_keywrite(UpdHandle,
                                UpdSlotIndex,
                                {SNExtremes, {FirstKey, FinalKey}, UpdBloom},
                                WriteFun),
                {KL1rem, KL2rem}};
        full ->
            write_keys(Handle,
                        KL1rem, KL2rem,
                        {SlotCount + 1, SlotTotal + 1},
                        UpdSlotIndex, UpdSlots, UpdBloom,
                        SNExtremes, FirstKey, FinalKey,
                        LevelR, WriteFun);
        complete ->
            UpdHandle = WriteFun(slots , {Handle, UpdSlots}),
            {complete_keywrite(UpdHandle,
                                UpdSlotIndex,
                                {SNExtremes, {FirstKey, FinalKey}, UpdBloom},
                                WriteFun),
                {KL1rem, KL2rem}}
        S when S == complete; S == partial ->
            WState0 =
                case BlockKeyLists of
                    [[]] ->
                        WState;
                    _ ->
                        lists:foldl(fun finalise_slot/2,
                                    WState,
                                    SlotLists ++ [BlockKeyLists])
                end,
            Handle0 = WriteFun(slots, {Handle, WState0#writer.slot_binary}),
            {complete_keywrite(Handle0, WState0, WriteFun), {[], []}};
        {full, KL1Rem, KL2Rem} ->
            write_keys(Handle, KL1Rem, KL2Rem, LevelR, WriteFun, WState,
                        {SlotC + 1, SlotT, SlotLists ++ [BlockKeyLists]})
    end.

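The group-write rule above can be read in isolation; a minimal sketch of the flush-when-full step (simplified signature, not the module's actual function):

%% Once ?SLOT_GROUPWRITE_COUNT slots have been finalised, flush the
%% accumulated slot binary in a single write and reset the accumulator.
maybe_flush(Handle, WriteFun, WState, SlotCount)
        when SlotCount >= ?SLOT_GROUPWRITE_COUNT ->
    Handle0 = WriteFun(slots, {Handle, WState#writer.slot_binary}),
    {Handle0, WState#writer{slot_binary = <<>>}, 0};
maybe_flush(Handle, _WriteFun, WState, SlotCount) ->
    {Handle, WState, SlotCount}.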
complete_keywrite(Handle,
                    SlotIndex,
                    {SNExtremes, {FirstKey, FinalKey}, Bloom},
                    WriteFun) ->
    ConvSlotIndex = convert_slotindex(SlotIndex),
complete_keywrite(Handle, WriteState, WriteFun) ->
    FirstKey =
        case length(WriteState#writer.slot_index) of
            0 ->
                null;
            _ ->
                element(1, lists:nth(1, WriteState#writer.slot_index))
        end,
    ConvSlotIndex = convert_slotindex(WriteState#writer.slot_index),
    WriteFun(finalise, {Handle,
                        ConvSlotIndex,
                        {SNExtremes, {FirstKey, FinalKey}, Bloom}}).

                        {{WriteState#writer.min_sqn, WriteState#writer.max_sqn},
                            {FirstKey, WriteState#writer.last_key},
                            WriteState#writer.bloom}}).

%% Take a slot index, and remove the SegFilters replacing with pointers
%% Return a tuple of the accumulated slot filters, and a pointer-based
@@ -877,9 +848,8 @@ convert_slotindex(SlotIndex) ->
                        {LengthList, PointerB}}]),
                    PointerF + FilterLen,
                    PointerB + lists:sum(LengthList)} end,
    {SlotFilters, PointerIndex, _FLength, _BLength} = lists:foldl(SlotFun,
                                                                    {<<>>, [], 0, 0},
                                                                    SlotIndex),
    {SlotFilters, PointerIndex, _FLength, _BLength} =
        lists:foldl(SlotFun, {<<>>, [], 0, 0}, SlotIndex),
    {SlotFilters, PointerIndex}.

sftwrite_function(slots, {Handle, SerialisedSlots}) ->
@@ -927,159 +897,116 @@ maxslots_bylevel(SlotTotal, _Level) ->


%% Take two potentially overlapping lists of keys and output a Block,
%% together with:
%% - block status (full, partial)
%% - the lowest and highest sequence numbers in the block
%% - the list of segment IDs in the block
%% - the remainders of the lists
%% The Key lists must be sorted in key order. The last key in a list may be
%% a pointer to request more keys for the file (otherwise it is assumed there
%% are no more keys)
%%
%% Level also to be passed in
%% This is either an integer (to be ignored) or {floor, os:timestamp()}
%% if this is the basement level of the LevelDB database and expired keys
%% and tombstones should be reaped
%% Take two potentially overlapping lists of keys and produce a block-sized
%% list of keys in the correct order. Outputs:
%% - Status of
%% - - all_complete (no more keys and block is complete)
%% - - partial (no more keys and block is not complete)
%% - - {block_full, Rem1, Rem2} the block is complete but there is a remainder
%%     of keys

create_block(KeyList1, KeyList2, LevelR) ->
    create_block(KeyList1, KeyList2, LevelR, []).


%% Do we need to check here that KeyList1 and KeyList2 are not just a [pointer]
%% Otherwise the pointer will never be expanded
%%
%% Also this should return a partial block if the KeyLists have been exhausted
%% but the block is full

create_block(KeyList1, KeyList2, LevelR, Bloom) ->
    create_block(KeyList1, KeyList2, [], {infinity, 0}, [], LevelR, Bloom).

create_block(KeyList1, KeyList2,
                BlockKeyList, {LSN, HSN}, SegmentList, _LevelR, Bloom)
create_block([], [], _LevelR, BlockKeyList)
                                when length(BlockKeyList)==?BLOCK_SIZE ->
    case {KeyList1, KeyList2} of
        {[], []} ->
            {lists:reverse(BlockKeyList),
                complete,
                {LSN, HSN},
                SegmentList,
                Bloom,
                [], []};
        _ ->
            {lists:reverse(BlockKeyList),
                full,
                {LSN, HSN},
                SegmentList,
                Bloom,
                KeyList1, KeyList2}
    end;
create_block([], [], BlockKeyList, {LSN, HSN}, SegmentList, _LevelR, Bloom) ->
    {lists:reverse(BlockKeyList),
        partial,
        {LSN, HSN},
        SegmentList,
        Bloom,
        [], []};
create_block(KeyList1, KeyList2,
                BlockKeyList, {LSN, HSN}, SegmentList, LevelR, Bloom) ->
    case key_dominates(KeyList1,
                        KeyList2,
    {all_complete, lists:reverse(BlockKeyList)};
create_block([], [], _LevelR, BlockKeyList) ->
    {partial, lists:reverse(BlockKeyList)};
create_block(KeyList1, KeyList2, _LevelR, BlockKeyList)
                                when length(BlockKeyList)==?BLOCK_SIZE ->
    {{block_full, KeyList1, KeyList2}, lists:reverse(BlockKeyList)};
create_block(KeyList1, KeyList2, LevelR, BlockKeyList) ->
    case key_dominates(KeyList1, KeyList2,
                        {LevelR#level.is_basement, LevelR#level.timestamp}) of
        {{next_key, TopKey}, Rem1, Rem2} ->
            {_K, V} = TopKey,
            {SQN, _St, MH, _MD} = leveled_codec:striphead_to_details(V),
            {UpdLSN, UpdHSN} = update_sequencenumbers(SQN, LSN, HSN),
            UpdBloom = leveled_tinybloom:enter({hash, MH}, Bloom),
            NewBlockKeyList = [TopKey|BlockKeyList],
            NewSegmentList = [hash_for_segmentid(TopKey)|SegmentList],
            create_block(Rem1, Rem2,
                            NewBlockKeyList, {UpdLSN, UpdHSN},
                            NewSegmentList, LevelR, UpdBloom);
            create_block(Rem1, Rem2, LevelR, [TopKey|BlockKeyList]);
        {skipped_key, Rem1, Rem2} ->
            create_block(Rem1, Rem2,
                            BlockKeyList, {LSN, HSN},
                            SegmentList, LevelR, Bloom)
            create_block(Rem1, Rem2, LevelR, BlockKeyList)
    end.
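To make the three statuses concrete, a hypothetical walk-through (KL40 is an invented list of 40 keys, LevelR a bound #level{} record, and ?BLOCK_SIZE taken to be 32, as the tests further down suggest):

%% The first call fills a block and hands back the unconsumed keys
{{block_full, Rem1, Rem2}, Block} = create_block(KL40, [], LevelR),
%% length(Block) == 32; Rem1 and Rem2 carry the remaining 8 keys.
%% A second call on the remainder cannot fill a block:
{partial, _Tail} = create_block(Rem1, Rem2, LevelR).
%% all_complete arises only when the keys run out exactly at ?BLOCK_SIZE.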

%% create_slot should simply output a list of BlockKeyLists no bigger than
%% the BlockCount, and the status (with key remainders if not complete)

create_slot(KL1, KL2, LevelR) ->
    create_slot(KL1, KL2, LevelR, ?BLOCK_COUNT, []).

create_slot(KL1, KL2, LevelR, BlockCount, BlockKeyLists) ->
    {Status, KeyList} = create_block(KL1, KL2, LevelR),
    case {Status, BlockCount - 1} of
        {partial, _N} ->
            {partial, BlockKeyLists ++ [KeyList]};
        {all_complete, 0} ->
            {complete, BlockKeyLists ++ [KeyList]};
        {all_complete, _N} ->
            % From the perspective of the slot it is partially complete
            {partial, BlockKeyLists ++ [KeyList]};
        {{block_full, KL1Rem, KL2Rem}, 0} ->
            {{full, KL1Rem, KL2Rem}, BlockKeyLists ++ [KeyList]};
        {{block_full, KL1Rem, KL2Rem}, N} ->
            create_slot(KL1Rem, KL2Rem, LevelR, N, BlockKeyLists ++ [KeyList])
    end.
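In summary, the slot status is a direct lift of the final block status (shapes as matched by the clauses above, with KL1/KL2 as in the tests below):

{Status, BlockKeyLists} = create_slot(KL1, KL2, #level{level=1}).
%% every block full, keys left over         -> Status = {full, KL1Rem, KL2Rem}
%% keys exhausted before the last block     -> Status = partial
%% keys exhausted exactly on the last block -> Status = complete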


%% Should return an index entry in the Slot Index. Each entry consists of:
%% - Start Key
%% - SegmentIDFilter for the slot (will eventually be replaced with a pointer)
%% - Serialised Slot (will eventually be replaced with a pointer)
%% - Length for each Block within the Serialised Slot
%% Additional information will also be provided
%% - {Low Seq Number, High Seq Number} within the slot
%% - End Key
%% - Whether the slot is full or partially filled
%% - Remainder of any KeyLists used to make the slot
%% Fold over the List of BlockKeys updating the writer record
finalise_slot(BlockKeyLists, WriteState) ->
    BlockFolder =
        fun(KV, {AccMinSQN, AccMaxSQN, Bloom, SegmentIDList}) ->
            {SQN, Hash} = leveled_codec:strip_to_seqnhashonly(KV),
            {min(AccMinSQN, SQN),
                max(AccMaxSQN, SQN),
                leveled_tinybloom:enter({hash, Hash}, Bloom),
                [hash_for_segmentid(KV)|SegmentIDList]}
        end,
    SlotFolder =
        fun(BlockKeyList,
                {MinSQN, MaxSQN, Bloom, SegLists, KVBinary, Lengths}) ->
            {BlockMinSQN, BlockMaxSQN, UpdBloom, Segs} =
                lists:foldr(BlockFolder,
                            {infinity, 0, Bloom, []},
                            BlockKeyList),
            SerialisedBlock = serialise_block(BlockKeyList),
            {min(MinSQN, BlockMinSQN),
                max(MaxSQN, BlockMaxSQN),
                UpdBloom,
                SegLists ++ [Segs],
                <<KVBinary/binary, SerialisedBlock/binary>>,
                Lengths ++ [byte_size(SerialisedBlock)]}
        end,

    {SlotMinSQN,
        SlotMaxSQN,
        SlotUpdBloom,
        SlotSegLists,
        SlotBinary,
        BlockLengths} =
            lists:foldl(SlotFolder,
                        {WriteState#writer.min_sqn,
                            WriteState#writer.max_sqn,
                            WriteState#writer.bloom,
                            [],
                            WriteState#writer.slot_binary,
                            []},
                        BlockKeyLists),

    FirstSlotKey = leveled_codec:strip_to_keyonly(lists:nth(1,
                                                    lists:nth(1,
                                                        BlockKeyLists))),
    LastSlotKV = lists:last(lists:last(BlockKeyLists)),
    SegFilter = generate_segment_filter(SlotSegLists),
    UpdSlotIndex = lists:append(WriteState#writer.slot_index,
                                [{FirstSlotKey, SegFilter, BlockLengths}]),

    #writer{slot_index = UpdSlotIndex,
            slot_binary = SlotBinary,
            bloom = SlotUpdBloom,
            min_sqn = SlotMinSQN,
            max_sqn = SlotMaxSQN,
            last_key = leveled_codec:strip_to_keyonly(LastSlotKV)}.


create_slot(KeyList1, KeyList2, Level, Bloom) ->
    create_slot(KeyList1, KeyList2, Level, ?BLOCK_COUNT, Bloom,
                    [], <<>>, [],
                    {null, infinity, 0, null, full}).

%% Keep adding blocks to the slot until either the block count is reached or
%% there is a partial block

create_slot(KL1, KL2, _, 0, Bloom,
                SegLists, SerialisedSlot, LengthList,
                {LowKey, LSN, HSN, LastKey, Status}) ->
    {{LowKey, generate_segment_filter(SegLists), SerialisedSlot, LengthList},
        {{LSN, HSN}, LastKey, Status},
        Bloom,
        KL1, KL2};
create_slot(KL1, KL2, _, _, Bloom,
                SegLists, SerialisedSlot, LengthList,
                {LowKey, LSN, HSN, LastKey, partial}) ->
    {{LowKey, generate_segment_filter(SegLists), SerialisedSlot, LengthList},
        {{LSN, HSN}, LastKey, partial},
        Bloom,
        KL1, KL2};
create_slot(KL1, KL2, _, _, Bloom,
                SegLists, SerialisedSlot, LengthList,
                {LowKey, LSN, HSN, LastKey, complete}) ->
    {{LowKey, generate_segment_filter(SegLists), SerialisedSlot, LengthList},
        {{LSN, HSN}, LastKey, partial},
        Bloom,
        KL1, KL2};
create_slot(KL1, KL2, LevelR, BlockCount, Bloom,
                SegLists, SerialisedSlot, LengthList,
                {LowKey, LSN, HSN, LastKey, _Status}) ->
    {BlockKeyList, Status,
        {LSNb, HSNb},
        SegmentList,
        UpdBloom,
        KL1b, KL2b} = create_block(KL1, KL2, LevelR, Bloom),
    TrackingMetadata = case {LowKey, BlockKeyList} of
        {null, []} ->
            {null, LSN, HSN, LastKey, Status};
        {null, _} ->
            [NewLowKeyV|_] = BlockKeyList,
            NewLastKey = last_key(BlockKeyList, {keyonly, LastKey}),
            {leveled_codec:strip_to_keyonly(NewLowKeyV),
                min(LSN, LSNb), max(HSN, HSNb),
                leveled_codec:strip_to_keyonly(NewLastKey),
                Status};
        {_, _} ->
            NewLastKey = last_key(BlockKeyList, {keyonly, LastKey}),
            {LowKey,
                min(LSN, LSNb), max(HSN, HSNb),
                leveled_codec:strip_to_keyonly(NewLastKey),
                Status}
    end,
    SerialisedBlock = serialise_block(BlockKeyList),
    BlockLength = byte_size(SerialisedBlock),
    SerialisedSlot2 = <<SerialisedSlot/binary, SerialisedBlock/binary>>,
    SegList2 = SegLists ++ [SegmentList],
    create_slot(KL1b, KL2b, LevelR, BlockCount - 1, UpdBloom,
                SegList2, SerialisedSlot2, LengthList ++ [BlockLength],
                TrackingMetadata).

last_key([], LastKey) ->
    LastKey;
last_key(BlockKeyList, _LastKey) ->
    lists:last(BlockKeyList).

serialise_block(BlockKeyList) ->
    term_to_binary(BlockKeyList, [{compressed, ?COMPRESSION_LEVEL}]).
@@ -1164,17 +1091,6 @@ pointer_append_queryresults(Results, QueryPid) ->
            lists:append(Acc, [{next, QueryPid, StartKey}])
    end.


%% Update the sequence numbers
update_sequencenumbers(SN, infinity, 0) ->
    {SN, SN};
update_sequencenumbers(SN, LSN, HSN) when SN < LSN ->
    {SN, HSN};
update_sequencenumbers(SN, LSN, HSN) when SN > HSN ->
    {LSN, SN};
update_sequencenumbers(_SN, LSN, HSN) ->
    {LSN, HSN}.


%% The Segment filter is a compressed filter representing the keys in a
%% given slot. The filter is a delta-compressed list of integers using rice
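As an aside on that encoding (invented values, not data from the module): the segment hashes in a slot are sorted and stored as differences, keeping the integers small before they are Rice-encoded.

Hashes = [12, 45, 200, 65000],
{Deltas, _} = lists:mapfoldl(fun(H, Prev) -> {H - Prev, H} end, 0, Hashes),
%% Deltas =:= [12, 33, 155, 64800]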
@@ -1464,21 +1380,18 @@ simple_create_block_test() ->
                    {2, {active, infinity}, no_lookup, null}}],
    KeyList2 = [{{o, "Bucket1", "Key2", null},
                    {3, {active, infinity}, no_lookup, null}}],
    BlockOutput = create_block(KeyList1,
                                KeyList2,
                                #level{level=1},
                                leveled_tinybloom:empty(4)),
    {MergedKeyList, ListStatus, SN, _, _, _, _} = BlockOutput,
    ?assertMatch(partial, ListStatus),
    [H1|T1] = MergedKeyList,
    {Status, BlockKeyList} = create_block(KeyList1,
                                            KeyList2,
                                            #level{level=1}),
    ?assertMatch(partial, Status),
    [H1|T1] = BlockKeyList,
    ?assertMatch({{o, "Bucket1", "Key1", null},
                    {1, {active, infinity}, no_lookup, null}}, H1),
    [H2|T2] = T1,
    ?assertMatch({{o, "Bucket1", "Key2", null},
                    {3, {active, infinity}, no_lookup, null}}, H2),
    ?assertMatch([{{o, "Bucket1", "Key3", null},
                    {2, {active, infinity}, no_lookup, null}}], T2),
    ?assertMatch(SN, {1,3}).
                    {2, {active, infinity}, no_lookup, null}}], T2).

dominate_create_block_test() ->
    KeyList1 = [{{o, "Bucket1", "Key1", null},
@@ -1487,16 +1400,13 @@ dominate_create_block_test() ->
                    {2, {active, infinity}, no_lookup, null}}],
    KeyList2 = [{{o, "Bucket1", "Key2", null},
                    {3, {tomb, infinity}, no_lookup, null}}],
    BlockOutput = create_block(KeyList1,
                                KeyList2,
                                #level{level=1},
                                leveled_tinybloom:empty(4)),
    {MergedKeyList, ListStatus, SN, _, _, _, _} = BlockOutput,
    ?assertMatch(partial, ListStatus),
    [K1, K2] = MergedKeyList,
    {Status, BlockKeyList} = create_block(KeyList1,
                                            KeyList2,
                                            #level{level=1}),
    ?assertMatch(partial, Status),
    [K1, K2] = BlockKeyList,
    ?assertMatch(K1, lists:nth(1, KeyList1)),
    ?assertMatch(K2, lists:nth(1, KeyList2)),
    ?assertMatch(SN, {1,3}).
    ?assertMatch(K2, lists:nth(1, KeyList2)).

sample_keylist() ->
    KeyList1 =
@@ -1537,26 +1447,21 @@ sample_keylist() ->

alternating_create_block_test() ->
    {KeyList1, KeyList2} = sample_keylist(),
    BlockOutput = create_block(KeyList1,
                                KeyList2,
                                #level{level=1},
                                leveled_tinybloom:empty(4)),
    {MergedKeyList, ListStatus, _SN, _, _, _, _} = BlockOutput,
    BlockSize = length(MergedKeyList),
    {Status, BlockKeyList} = create_block(KeyList1,
                                            KeyList2,
                                            #level{level=1}),
    BlockSize = length(BlockKeyList),
    ?assertMatch(BlockSize, 32),
    ?assertMatch(ListStatus, complete),
    K1 = lists:nth(1, MergedKeyList),
    ?assertMatch(all_complete, Status),
    K1 = lists:nth(1, BlockKeyList),
    ?assertMatch(K1, {{o, "Bucket1", "Key1", null}, {1, {active, infinity}, 0, null}}),
    K11 = lists:nth(11, MergedKeyList),
    K11 = lists:nth(11, BlockKeyList),
    ?assertMatch(K11, {{o, "Bucket1", "Key9b", null}, {1, {active, infinity}, 0, null}}),
    K32 = lists:nth(32, MergedKeyList),
    K32 = lists:nth(32, BlockKeyList),
    ?assertMatch(K32, {{o, "Bucket4", "Key1", null}, {1, {active, infinity}, 0, null}}),
    HKey = {{o, "Bucket1", "Key0", null}, {1, {active, infinity}, 0, null}},
    {_, LStatus2, _, _, _, _, _} = create_block([HKey|KeyList1],
                                                KeyList2,
                                                #level{level=1},
                                                leveled_tinybloom:empty(4)),
    ?assertMatch(full, LStatus2).
    {Status2, _} = create_block([HKey|KeyList1], KeyList2, #level{level=1}),
    ?assertMatch(block_full, element(1, Status2)).


merge_seglists_test() ->
@@ -1693,113 +1598,78 @@ merge_seglists_test() ->

createslot_stage1_test() ->
    {KeyList1, KeyList2} = sample_keylist(),
    Out = create_slot(KeyList1,
                        KeyList2,
                        #level{level=1},
                        leveled_tinybloom:empty(4)),
    {{LowKey, SegFilter, _SerialisedSlot, _LengthList},
        {{LSN, HSN}, LastKey, Status},
        _UpdBloom,
        KL1, KL2} = Out,
    ?assertMatch(LowKey, {o, "Bucket1", "Key1", null}),
    ?assertMatch(LastKey, {o, "Bucket4", "Key1", null}),
    ?assertMatch(Status, partial),
    ?assertMatch(KL1, []),
    ?assertMatch(KL2, []),
    {Status, BlockKeyLists} = create_slot(KeyList1, KeyList2, #level{level=1}),
    WState = finalise_slot(BlockKeyLists, #writer{}),

    ?assertMatch({o, "Bucket4", "Key1", null}, WState#writer.last_key),
    ?assertMatch(partial, Status),

    %% Writer state has the SlotIndex which includes the segment filter
    SegFilter = element(2, lists:nth(1, WState#writer.slot_index)),

    R0 = check_for_segments(serialise_segment_filter(SegFilter),
            [hash_for_segmentid({keyonly, {o, "Bucket1", "Key1", null}})],
            true),
    ?assertMatch(R0, {maybe_present, [0]}),
    ?assertMatch({maybe_present, [0]}, R0),
    R1 = check_for_segments(serialise_segment_filter(SegFilter),
            [hash_for_segmentid({keyonly, {o, "Bucket1", "Key99", null}})],
            true),
    ?assertMatch(R1, not_present),
    ?assertMatch(LSN, 1),
    ?assertMatch(HSN, 3).
    ?assertMatch(not_present, R1),
    ?assertMatch(1, WState#writer.min_sqn),
    ?assertMatch(3, WState#writer.max_sqn).


createslot_stage2_test() ->
    Out = create_slot(lists:sort(generate_randomkeys(100)),
                        lists:sort(generate_randomkeys(100)),
                        #level{level=1},
                        leveled_tinybloom:empty(4)),
    {{_LowKey, _SegFilter, SerialisedSlot, LengthList},
        {{_LSN, _HSN}, _LastKey, Status},
        _UpdBloom,
        _KL1, _KL2} = Out,
    ?assertMatch(Status, full),
    Sum1 = lists:foldl(fun(X, Sum) -> Sum + X end, 0, LengthList),
    Sum2 = byte_size(SerialisedSlot),
    {Status, BlockKeyLists} = create_slot(lists:sort(generate_randomkeys(100)),
                                            lists:sort(generate_randomkeys(100)),
                                            #level{level=1}),
    WState = finalise_slot(BlockKeyLists, #writer{}),
    LengthList = element(3, lists:nth(1, WState#writer.slot_index)),

    ?assertMatch(full, element(1, Status)),
    Sum1 = lists:sum(LengthList),
    Sum2 = byte_size(WState#writer.slot_binary),
    ?assertMatch(Sum1, Sum2).


createslot_stage3_test() ->
    Out = create_slot(lists:sort(generate_sequentialkeys(100, 1)),
                        lists:sort(generate_sequentialkeys(100, 101)),
                        #level{level=1},
                        leveled_tinybloom:empty(4)),
    {{LowKey, SegFilter, SerialisedSlot, LengthList},
        {{_LSN, _HSN}, LastKey, Status},
        _UpdBloom,
        KL1, KL2} = Out,
    ?assertMatch(Status, full),
    Sum1 = lists:foldl(fun(X, Sum) -> Sum + X end, 0, LengthList),
    Sum2 = byte_size(SerialisedSlot),
    {Status, BlockKeyLists} = create_slot(lists:sort(generate_sequentialkeys(100, 1)),
                                            lists:sort(generate_sequentialkeys(100, 101)),
                                            #level{level=1}),
    WState = finalise_slot(BlockKeyLists, #writer{}),
    {FirstKey, SegFilter, LengthList} = lists:nth(1, WState#writer.slot_index),

    ?assertMatch(full, element(1, Status)),
    Sum1 = lists:sum(LengthList),
    Sum2 = byte_size(WState#writer.slot_binary),
    ?assertMatch(Sum1, Sum2),
    ?assertMatch(LowKey, {o, "BucketSeq", "Key00000001", null}),
    ?assertMatch(LastKey, {o, "BucketSeq", "Key00000128", null}),
    ?assertMatch(KL1, []),
    Rem = length(KL2),
    ?assertMatch({o, "BucketSeq", "Key00000001", null}, FirstKey),
    ?assertMatch({o, "BucketSeq", "Key00000128", null}, WState#writer.last_key),
    ?assertMatch([], element(2, Status)),
    Rem = length(element(3, Status)),
    ?assertMatch(Rem, 72),
    R0 = check_for_segments(serialise_segment_filter(SegFilter),
            [hash_for_segmentid({keyonly,
                                    {o, "BucketSeq", "Key00000100", null}})],
            true),
    ?assertMatch(R0, {maybe_present, [3]}),
    ?assertMatch({maybe_present, [3]}, R0),
    R1 = check_for_segments(serialise_segment_filter(SegFilter),
            [hash_for_segmentid({keyonly,
                                    {o, "Bucket1", "Key99", null}})],
            true),
    ?assertMatch(R1, not_present),
    ?assertMatch(not_present, R1),
    R2 = check_for_segments(serialise_segment_filter(SegFilter),
            [hash_for_segmentid({keyonly,
                                    {o, "BucketSeq", "Key00000040", null}})],
            true),
    ?assertMatch(R2, {maybe_present, [1]}),
    ?assertMatch({maybe_present, [1]}, R2),
    R3 = check_for_segments(serialise_segment_filter(SegFilter),
            [hash_for_segmentid({keyonly,
                                    {o, "BucketSeq", "Key00000004", null}})],
            true),
    ?assertMatch(R3, {maybe_present, [0]}).
    ?assertMatch({maybe_present, [0]}, R3).


testwrite_function(slots, {Handle, SerialisedSlots}) ->
    lists:append(Handle, [SerialisedSlots]);
testwrite_function(finalise,
                    {Handle, C_SlotIndex, {SNExtremes, KeyExtremes, Bloom}}) ->
    {Handle, C_SlotIndex, SNExtremes, KeyExtremes, Bloom}.

writekeys_stage1_test() ->
    {KL1, KL2} = sample_keylist(),
    {FunOut, {_KL1Rem, _KL2Rem}} = write_keys([],
                                                KL1, KL2,
                                                [], <<>>,
                                                leveled_tinybloom:empty(4),
                                                #level{level=1},
                                                fun testwrite_function/2),
    {Handle, {_, PointerIndex}, SNExtremes, KeyExtremes, _Bloom} = FunOut,
    ?assertMatch(SNExtremes, {1,3}),
    ?assertMatch(KeyExtremes, {{o, "Bucket1", "Key1", null},
                                {o, "Bucket4", "Key1", null}}),
    [TopIndex|[]] = PointerIndex,
    {TopKey, _SegFilter, {LengthList, _Total}} = TopIndex,
    ?assertMatch(TopKey, {o, "Bucket1", "Key1", null}),
    TotalLength = lists:foldl(fun(X, Acc) -> Acc + X end,
                                0, LengthList),
    ActualLength = lists:foldl(fun(X, Acc) -> Acc + byte_size(X) end,
                                0, Handle),
    ?assertMatch(TotalLength, ActualLength).

initial_create_header_test() ->
    Output = create_header(initial),
    ?assertMatch(?HEADER_LEN, byte_size(Output)).
@@ -1811,13 +1681,13 @@ initial_create_file_test() ->
    {UpdHandle, UpdFileMD, {[], []}} = complete_file(Handle, FileMD,
                                                        KL1, KL2,
                                                        #level{level=1}),

    io:format("Slot Index of UpdFileMD ~w~n", [UpdFileMD#state.slot_index]),
    Result1 = fetch_keyvalue(UpdHandle, UpdFileMD, {o, "Bucket1", "Key8", null}),
    io:format("Result is ~w~n", [Result1]),
    ?assertMatch(Result1, {{o, "Bucket1", "Key8", null},
                            {1, {active, infinity}, 0, null}}),
    ?assertMatch({{o, "Bucket1", "Key8", null},
                    {1, {active, infinity}, 0, null}}, Result1),
    Result2 = fetch_keyvalue(UpdHandle, UpdFileMD, {o, "Bucket1", "Key88", null}),
    io:format("Result is ~w~n", [Result2]),
    ?assertMatch(Result2, not_present),
    ?assertMatch(not_present, Result2),
    ok = file:close(UpdHandle),
    ok = file:delete(Filename).

@@ -1834,8 +1704,8 @@ big_create_file_test() ->
    [{K2, {Sq2, St2, MH2, V2}}|_] = KL2,
    Result1 = fetch_keyvalue(Handle, FileMD, K1),
    Result2 = fetch_keyvalue(Handle, FileMD, K2),
    ?assertMatch(Result1, {K1, {Sq1, St1, MH1, V1}}),
    ?assertMatch(Result2, {K2, {Sq2, St2, MH2, V2}}),
    ?assertMatch({K1, {Sq1, St1, MH1, V1}}, Result1),
    ?assertMatch({K2, {Sq2, St2, MH2, V2}}, Result2),
    SubList = lists:sublist(KL2, 1000),
    lists:foreach(fun(KV) ->
                        {Kn, _} = KV,
@@ -1997,9 +1867,8 @@ big_iterator_test() ->
                                {o, "Bucket0000", "Key0000", null},
                                {o, "Bucket9999", "Key9999", null},
                                256),
    NumFoundKeys1 = length(Result1),
    NumAddedKeys = 10000 - length(KL1Rem),
    ?assertMatch(NumFoundKeys1, NumAddedKeys),
    ?assertMatch(NumAddedKeys, length(Result1)),
    {partial,
        Result2,
        _} = fetch_range_keysonly(Handle,