Completing KeyLists on a block boundary
Handle when writing a block empties the Key Lists but the block is full - don't go-on and create a second empty block
This commit is contained in:
parent
cc16f90c9c
commit
eedc296314
1 changed files with 107 additions and 4 deletions
|
@ -160,6 +160,7 @@
|
||||||
-define(DWORD_SIZE, 8).
|
-define(DWORD_SIZE, 8).
|
||||||
-define(CURRENT_VERSION, {0,1}).
|
-define(CURRENT_VERSION, {0,1}).
|
||||||
-define(SLOT_COUNT, 256).
|
-define(SLOT_COUNT, 256).
|
||||||
|
-define(SLOT_GROUPWRITE_COUNT, 32).
|
||||||
-define(BLOCK_SIZE, 32).
|
-define(BLOCK_SIZE, 32).
|
||||||
-define(BLOCK_COUNT, 4).
|
-define(BLOCK_COUNT, 4).
|
||||||
-define(FOOTERPOS_HEADERPOS, 2).
|
-define(FOOTERPOS_HEADERPOS, 2).
|
||||||
|
@ -202,6 +203,72 @@ create_header(initial) ->
|
||||||
CRC32 = erlang:crc32(H1),
|
CRC32 = erlang:crc32(H1),
|
||||||
<<H1/binary, CRC32:32/integer>>.
|
<<H1/binary, CRC32:32/integer>>.
|
||||||
|
|
||||||
|
%% Take a file handle at the sart position (after creating the header) and then
|
||||||
|
%% write the Key lists to the file slot by slot.
|
||||||
|
%%
|
||||||
|
%% Slots are created then written in bulk to impove I/O efficiency. Slots will
|
||||||
|
%% be written in groups of 32
|
||||||
|
|
||||||
|
|
||||||
|
write_group(Handle, KL1, KL2, SlotIndex, SerialisedSlots, Level, WriteFun) ->
|
||||||
|
write_group(Handle, KL1, KL2, {0, 0},
|
||||||
|
SlotIndex, SerialisedSlots,
|
||||||
|
{infinity, 0}, null, {last, null}, Level, WriteFun).
|
||||||
|
|
||||||
|
|
||||||
|
write_group(Handle, KL1, KL2, {SlotCount, SlotTotal},
|
||||||
|
SlotIndex, SerialisedSlots,
|
||||||
|
{LSN, HSN}, LowKey, LastKey, Level, WriteFun)
|
||||||
|
when SlotCount =:= ?SLOT_GROUPWRITE_COUNT ->
|
||||||
|
UpdHandle = WriteFun(slot , {Handle, SerialisedSlots}),
|
||||||
|
case maxslots_bylevel(SlotTotal, Level) of
|
||||||
|
reached ->
|
||||||
|
UpdHandle;
|
||||||
|
continue ->
|
||||||
|
write_group(UpdHandle, KL1, KL2, 0,
|
||||||
|
SlotIndex, <<>>,
|
||||||
|
{LSN, HSN}, LowKey, LastKey, Level, WriteFun)
|
||||||
|
end;
|
||||||
|
write_group(Handle, KL1, KL2, {SlotCount, SlotTotal},
|
||||||
|
SlotIndex, SerialisedSlots,
|
||||||
|
{LSN, HSN}, LowKey, LastKey, Level, WriteFun) ->
|
||||||
|
SlotOutput = create_slot(KL1, KL2, Level),
|
||||||
|
{{LowKey_Slot, SegFilter, SerialisedSlot, LengthList},
|
||||||
|
{{LSN_Slot, HSN_Slot}, LastKey_Slot, Status},
|
||||||
|
KL1rem, KL2rem} = SlotOutput,
|
||||||
|
UpdSlotIndex = lists:append(SlotIndex,
|
||||||
|
[{LowKey_Slot, SegFilter, LengthList}]),
|
||||||
|
UpdSlots = <<SerialisedSlots/binary, SerialisedSlot/binary>>,
|
||||||
|
SNExtremes = {min(LSN_Slot, LSN), max(HSN_Slot, HSN)},
|
||||||
|
FinalKey = case LastKey_Slot of null -> LastKey; _ -> LastKey_Slot end,
|
||||||
|
FirstKey = case LowKey of null -> LowKey_Slot; _ -> LowKey end,
|
||||||
|
case Status of
|
||||||
|
partial ->
|
||||||
|
UpdHandle = WriteFun(slot , {Handle, UpdSlots}),
|
||||||
|
WriteFun(finalise, {UpdHandle, UpdSlotIndex, SNExtremes,
|
||||||
|
{FirstKey, FinalKey}});
|
||||||
|
full ->
|
||||||
|
write_group(Handle, KL1rem, KL2rem, {SlotCount + 1, SlotTotal + 1},
|
||||||
|
UpdSlotIndex, UpdSlots,
|
||||||
|
SNExtremes, FirstKey, FinalKey, Level, WriteFun)
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
sftwrite_function(slot, {Handle, _SerialisedSlots}) ->
|
||||||
|
Handle;
|
||||||
|
sftwrite_function(finalise,
|
||||||
|
{Handle, _UpdSlotIndex, _SNExtremes, _KeyExtremes}) ->
|
||||||
|
Handle.
|
||||||
|
|
||||||
|
maxslots_bylevel(SlotTotal, _Level) ->
|
||||||
|
case SlotTotal of
|
||||||
|
?SLOT_COUNT ->
|
||||||
|
reached;
|
||||||
|
X when X < ?SLOT_COUNT ->
|
||||||
|
continue
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
%% Take two potentially overlapping lists of keys and output a Block,
|
%% Take two potentially overlapping lists of keys and output a Block,
|
||||||
%% together with:
|
%% together with:
|
||||||
%% - block status (full, partial)
|
%% - block status (full, partial)
|
||||||
|
@ -230,7 +297,12 @@ create_block(KeyList1, KeyList2, Level) ->
|
||||||
create_block(KeyList1, KeyList2,
|
create_block(KeyList1, KeyList2,
|
||||||
BlockKeyList, {LSN, HSN}, SegmentList, _)
|
BlockKeyList, {LSN, HSN}, SegmentList, _)
|
||||||
when length(BlockKeyList)==?BLOCK_SIZE ->
|
when length(BlockKeyList)==?BLOCK_SIZE ->
|
||||||
{BlockKeyList, full, {LSN, HSN}, SegmentList, KeyList1, KeyList2};
|
case {KeyList1, KeyList2} of
|
||||||
|
{[], []} ->
|
||||||
|
{BlockKeyList, complete, {LSN, HSN}, SegmentList, [], []};
|
||||||
|
_ ->
|
||||||
|
{BlockKeyList, full, {LSN, HSN}, SegmentList, KeyList1, KeyList2}
|
||||||
|
end;
|
||||||
create_block([], [],
|
create_block([], [],
|
||||||
BlockKeyList, {LSN, HSN}, SegmentList, _) ->
|
BlockKeyList, {LSN, HSN}, SegmentList, _) ->
|
||||||
{BlockKeyList, partial, {LSN, HSN}, SegmentList, [], []};
|
{BlockKeyList, partial, {LSN, HSN}, SegmentList, [], []};
|
||||||
|
@ -283,6 +355,11 @@ create_slot(KL1, KL2, _, _, SegLists, SerialisedSlot, LengthList,
|
||||||
{{LowKey, generate_segment_filter(SegLists), SerialisedSlot, LengthList},
|
{{LowKey, generate_segment_filter(SegLists), SerialisedSlot, LengthList},
|
||||||
{{LSN, HSN}, LastKey, partial},
|
{{LSN, HSN}, LastKey, partial},
|
||||||
KL1, KL2};
|
KL1, KL2};
|
||||||
|
create_slot(KL1, KL2, _, _, SegLists, SerialisedSlot, LengthList,
|
||||||
|
{LowKey, LSN, HSN, LastKey, complete}) ->
|
||||||
|
{{LowKey, generate_segment_filter(SegLists), SerialisedSlot, LengthList},
|
||||||
|
{{LSN, HSN}, LastKey, partial},
|
||||||
|
KL1, KL2};
|
||||||
create_slot(KL1, KL2, Level, BlockCount, SegLists, SerialisedSlot, LengthList,
|
create_slot(KL1, KL2, Level, BlockCount, SegLists, SerialisedSlot, LengthList,
|
||||||
{LowKey, LSN, HSN, LastKey, _Status}) ->
|
{LowKey, LSN, HSN, LastKey, _Status}) ->
|
||||||
{BlockKeyList, Status,
|
{BlockKeyList, Status,
|
||||||
|
@ -304,10 +381,11 @@ create_slot(KL1, KL2, Level, BlockCount, SegLists, SerialisedSlot, LengthList,
|
||||||
Status}
|
Status}
|
||||||
end,
|
end,
|
||||||
SerialisedBlock = serialise_block(BlockKeyList),
|
SerialisedBlock = serialise_block(BlockKeyList),
|
||||||
|
% io:format("Serialised Block to be added ~w based on BlockKeyList ~w~n", [SerialisedBlock, BlockKeyList]),
|
||||||
BlockLength = bit_size(SerialisedBlock),
|
BlockLength = bit_size(SerialisedBlock),
|
||||||
SerialisedSlot2 = <<SerialisedSlot/binary, SerialisedBlock/binary>>,
|
SerialisedSlot2 = <<SerialisedSlot/binary, SerialisedBlock/binary>>,
|
||||||
create_slot(KL1b, KL2b, Level, BlockCount - 1, SegLists ++ [SegmentList],
|
create_slot(KL1b, KL2b, Level, BlockCount - 1, SegLists ++ [SegmentList],
|
||||||
SerialisedSlot2, LengthList ++ [BlockLength], TrackingMetadata).
|
SerialisedSlot2, LengthList ++ [BlockLength], TrackingMetadata).
|
||||||
|
|
||||||
|
|
||||||
last([], {last, LastKey}) -> {keyonly, LastKey};
|
last([], {last, LastKey}) -> {keyonly, LastKey};
|
||||||
|
@ -432,6 +510,8 @@ update_sequencenumbers({_, _, _, _}, LSN, HSN) ->
|
||||||
%% This is more space efficient than the equivalent bloom filter and avoids
|
%% This is more space efficient than the equivalent bloom filter and avoids
|
||||||
%% the calculation of many hash functions.
|
%% the calculation of many hash functions.
|
||||||
|
|
||||||
|
generate_segment_filter([SegL1]) ->
|
||||||
|
generate_segment_filter({SegL1, [], [], []});
|
||||||
generate_segment_filter([SegL1, []]) ->
|
generate_segment_filter([SegL1, []]) ->
|
||||||
generate_segment_filter({SegL1, [], [], []});
|
generate_segment_filter({SegL1, [], [], []});
|
||||||
generate_segment_filter([SegL1, SegL2, []]) ->
|
generate_segment_filter([SegL1, SegL2, []]) ->
|
||||||
|
@ -763,13 +843,16 @@ alternating_create_block_test() ->
|
||||||
1),
|
1),
|
||||||
BlockSize = length(MergedKeyList),
|
BlockSize = length(MergedKeyList),
|
||||||
?assertMatch(BlockSize, 32),
|
?assertMatch(BlockSize, 32),
|
||||||
?assertMatch(ListStatus, full),
|
?assertMatch(ListStatus, complete),
|
||||||
K1 = lists:nth(1, MergedKeyList),
|
K1 = lists:nth(1, MergedKeyList),
|
||||||
?assertMatch(K1, {{o, "Bucket1", "Key1"}, 1, {active, infinity}, null}),
|
?assertMatch(K1, {{o, "Bucket1", "Key1"}, 1, {active, infinity}, null}),
|
||||||
K11 = lists:nth(11, MergedKeyList),
|
K11 = lists:nth(11, MergedKeyList),
|
||||||
?assertMatch(K11, {{o, "Bucket1", "Key9b"}, 1, {active, infinity}, null}),
|
?assertMatch(K11, {{o, "Bucket1", "Key9b"}, 1, {active, infinity}, null}),
|
||||||
K32 = lists:nth(32, MergedKeyList),
|
K32 = lists:nth(32, MergedKeyList),
|
||||||
?assertMatch(K32, {{o, "Bucket4", "Key1"}, 1, {active, infinity}, null}).
|
?assertMatch(K32, {{o, "Bucket4", "Key1"}, 1, {active, infinity}, null}),
|
||||||
|
HKey = {{o, "Bucket1", "Key0"}, 1, {active, infinity}, null},
|
||||||
|
{_, ListStatus2, _, _, _, _} = create_block([HKey|KeyList1], KeyList2, 1),
|
||||||
|
?assertMatch(ListStatus2, full).
|
||||||
|
|
||||||
|
|
||||||
merge_seglists_test() ->
|
merge_seglists_test() ->
|
||||||
|
@ -886,3 +969,23 @@ createslot_stage3_test() ->
|
||||||
[hash_for_segmentid({keyonly, {o, "BucketSeq", "Key00000004"}})],
|
[hash_for_segmentid({keyonly, {o, "BucketSeq", "Key00000004"}})],
|
||||||
true),
|
true),
|
||||||
?assertMatch(R3, {maybe_present, [0]}).
|
?assertMatch(R3, {maybe_present, [0]}).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
testwrite_function(slot, {Handle, SerialisedSlots}) ->
|
||||||
|
lists:append(Handle, [SerialisedSlots]);
|
||||||
|
testwrite_function(finalise, {Handle, UpdSlotIndex, SNExtremes, KeyExtremes}) ->
|
||||||
|
{Handle, UpdSlotIndex, SNExtremes, KeyExtremes}.
|
||||||
|
|
||||||
|
writegroup_stage1_test() ->
|
||||||
|
{KL1, KL2} = sample_keylist(),
|
||||||
|
Output = write_group([], KL1, KL2, [], <<>>, 1, fun testwrite_function/2),
|
||||||
|
{Handle, UpdSlotIndex, SNExtremes, KeyExtremes} = Output,
|
||||||
|
?assertMatch(SNExtremes, {1,3}),
|
||||||
|
?assertMatch(KeyExtremes, {{o, "Bucket1", "Key1"}, {o, "Bucket4", "Key1"}}),
|
||||||
|
[TopIndex|[]] = UpdSlotIndex,
|
||||||
|
{TopKey, _SegFilter, LengthList} = TopIndex,
|
||||||
|
?assertMatch(TopKey, {o, "Bucket1", "Key1"}),
|
||||||
|
TotalLength = lists:foldl(fun(X, Acc) -> Acc + X end, 0, LengthList),
|
||||||
|
ActualLength = lists:foldl(fun(X, Acc) -> Acc + bit_size(X) end, 0, Handle),
|
||||||
|
?assertMatch(TotalLength, ActualLength).
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue