From cc16f90c9cc28a327f278415cbc2b4b23f7f92b7 Mon Sep 17 00:00:00 2001
From: martinsumner
Date: Fri, 10 Jun 2016 19:09:55 +0100
Subject: [PATCH] SFT file continued

Writing of a slot
---
 src/leveled_sft.erl | 209 +++++++++++++++++++++++++++++++++++++-------
 1 file changed, 178 insertions(+), 31 deletions(-)

diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl
index cb253dc..9835fd0 100644
--- a/src/leveled_sft.erl
+++ b/src/leveled_sft.erl
@@ -14,16 +14,18 @@
 %%
 %% All keys are not equal in sft files, keys are only expected in a specific
 %% series of formats
-%% - {o, Bucket, Key, State} - Object Keys
-%% - {i, Bucket, IndexName, IndexTerm, Key, State} - Postings
+%% - {o, Bucket, Key} - Object Keys
+%% - {i, Bucket, IndexName, IndexTerm, Key} - Postings
 %% The {Bucket, Key} part of all types of keys are hashed for segment filters.
 %% For Postings the {Bucket, IndexName, IndexTerm} is also hashed. This
 %% causes a false positive on lookup of a segment, but allows for the presence
 %% of specific index terms to be checked
 %%
-%% The objects stored are a tuple of {Key, State, Value}, where
+%% The objects stored are a tuple of {Key, SequenceNumber, State, Value}, where
 %% Key - as above
-%% State - {SequenceNumber, active|tomb, ExpiryTimestamp | infinity}
+%% SequenceNumber - monotonically increasing counter of addition to the nursery
+%% log
+%% State - {active|tomb, ExpiryTimestamp | infinity}
 %% Value - null (all postings) | [Object Metadata] (all object keys)
 %% Keys should be unique in files. If more than two keys are candidate for
 %% the same file the highest sequence number should be chosen. If the file
@@ -149,7 +151,8 @@
         serialise_segment_filter/1,
         check_for_segments/3,
         speedtest_check_forsegment/4,
-        generate_randomsegfilter/1]).
+        generate_randomsegfilter/1,
+        create_slot/3]).
 
 -include_lib("eunit/include/eunit.hrl").
 
@@ -163,6 +166,7 @@
 -define(MAX_SEG_HASH, 1048576).
 -define(DIVISOR_BITS, 13).
 -define(DIVISOR, 8092).
+-define(COMPRESSION_LEVEL, 1).
 
 
 -record(state, {version = ?CURRENT_VERSION :: tuple(),
@@ -216,9 +220,12 @@ create_header(initial) ->
 
 %% Do we need to check here that KeyList1 and KeyList2 are not just a [pointer]
 %% Otherwise the pointer will never be expanded
+%%
+%% Also this should return a partial block if the KeyLists have been exhausted
+%% before the block is full
 
 create_block(KeyList1, KeyList2, Level) ->
-    create_block(KeyList1, KeyList2, [], {0, 0}, [], Level).
+    create_block(KeyList1, KeyList2, [], {infinity, 0}, [], Level).
 
 create_block(KeyList1, KeyList2,
                 BlockKeyList, {LSN, HSN}, SegmentList, _) when
@@ -231,7 +238,6 @@ create_block(KeyList1, KeyList2,
                 BlockKeyList, {LSN, HSN}, SegmentList, Level) ->
     case key_dominates(KeyList1, KeyList2, Level) of
         {{next_key, TopKey}, Rem1, Rem2} ->
-            io:format("TopKey is ~w~n", [TopKey]),
             {UpdLSN, UpdHSN} = update_sequencenumbers(TopKey, LSN, HSN),
             NewBlockKeyList = lists:append(BlockKeyList,
                                             [TopKey]),
@@ -241,7 +247,6 @@ create_block(KeyList1, KeyList2,
                             NewBlockKeyList, {UpdLSN, UpdHSN},
                             NewSegmentList, Level);
         {skipped_key, Rem1, Rem2} ->
-            io:format("Key is skipped~n"),
             create_block(Rem1, Rem2,
                             BlockKeyList, {LSN, HSN},
                             SegmentList, Level)
@@ -261,24 +266,61 @@ create_block(KeyList1, KeyList2,
 %% - Remainder of any KeyLists used to make the slot
 
-%% create_slot(KeyList1, KeyList2, Level)
-%%     create_slot(KeyList1, KeyList2, Level, ?BLOCK_COUNT, null, <<>>, <<>>, []).
+create_slot(KeyList1, KeyList2, Level) ->
+    create_slot(KeyList1, KeyList2, Level, ?BLOCK_COUNT, [], <<>>, [],
+                    {null, infinity, 0, null, full}).
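For orientation, a sketch of how a caller might drive this new create_slot/3
entry point and unpack its result (the input keylists are illustrative, not
part of this patch; the clause structure it relies on follows below):

    {SlotIndex, SlotSummary, RemKL1, RemKL2} =
        create_slot(KeyList1, KeyList2, 1),
    %% SlotIndex holds what a file index needs to route a fetch to this slot
    {LowKey, SegFilter, SerialisedSlot, LengthList} = SlotIndex,
    %% SlotSummary carries the sequence number bounds, the last key, and
    %% whether the slot filled all ?BLOCK_COUNT blocks or ended partial
    {{LSN, HSN}, LastKey, Status} = SlotSummary.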
-%% create_slot(KL1, KL2, Level, 0, LowKey, SegFilter, SerialisedSlot,
-%%                                LengthList, {LSN, HSN}, LastKey) ->
-%%    {{LowKey, SegFilter, SerialisedSlot, LengthList},
-%%        {{LSN, HSN}, LastKey, full, KL1, KL2}};
-%% create_slot(KL1, KL2, Level, BlockCount, LowKey, SegFilter, SerialisedSlot,
-%%                LengthList, {LSN, HSN}, LastKey) ->
-%%    BlockDetails = create_block(KeyList1, KeyList2, Level),
-%%    {BlockKeyList, Status, {LSN, HSN}, SegmentList, KL1, KL2} = BlockDetails,
-%%    SerialisedBlock = serialise_block(BlockKeyList),
-%%    <<SerialisedSlot/binary, SerialisedBlock/binary>>,
-
-%% case Status of
-%%    full ->
-
+%% Keep adding blocks to the slot until either the block count is reached or
+%% there is a partial block
+create_slot(KL1, KL2, _, 0, SegLists, SerialisedSlot, LengthList,
+                                {LowKey, LSN, HSN, LastKey, Status}) ->
+    {{LowKey, generate_segment_filter(SegLists), SerialisedSlot, LengthList},
+        {{LSN, HSN}, LastKey, Status},
+        KL1, KL2};
+create_slot(KL1, KL2, _, _, SegLists, SerialisedSlot, LengthList,
+                                {LowKey, LSN, HSN, LastKey, partial}) ->
+    {{LowKey, generate_segment_filter(SegLists), SerialisedSlot, LengthList},
+        {{LSN, HSN}, LastKey, partial},
+        KL1, KL2};
+create_slot(KL1, KL2, Level, BlockCount, SegLists, SerialisedSlot, LengthList,
+                                {LowKey, LSN, HSN, LastKey, _Status}) ->
+    {BlockKeyList, Status,
+        {LSNb, HSNb},
+        SegmentList, KL1b, KL2b} = create_block(KL1, KL2, Level),
+    case LowKey of
+        null ->
+            [NewLowKeyV|_] = BlockKeyList,
+            TrackingMetadata = {strip_to_keyonly(NewLowKeyV),
+                                min(LSN, LSNb), max(HSN, HSNb),
+                                strip_to_keyonly(last(BlockKeyList,
+                                                        {last, LastKey})),
+                                Status};
+        _ ->
+            TrackingMetadata = {LowKey,
+                                min(LSN, LSNb), max(HSN, HSNb),
+                                strip_to_keyonly(last(BlockKeyList,
+                                                        {last, LastKey})),
+                                Status}
+    end,
+    SerialisedBlock = serialise_block(BlockKeyList),
+    BlockLength = bit_size(SerialisedBlock),
+    SerialisedSlot2 = <<SerialisedSlot/binary, SerialisedBlock/binary>>,
+    create_slot(KL1b, KL2b, Level, BlockCount - 1, SegLists ++ [SegmentList],
+                    SerialisedSlot2, LengthList ++ [BlockLength],
+                    TrackingMetadata).
+
+
+last([], {last, LastKey}) -> {keyonly, LastKey};
+last([E|Es], PrevLast) -> last(E, Es, PrevLast).
+
+last(_, [E|Es], PrevLast) -> last(E, Es, PrevLast);
+last(E, [], _) -> E.
+
+strip_to_keyonly({keyonly, K}) -> K;
+strip_to_keyonly({K, _, _, _}) -> K.
+
+serialise_block(BlockKeyList) ->
+    term_to_binary(BlockKeyList, [{compressed, ?COMPRESSION_LEVEL}]).
 
 
 %% Compare the keys at the head of the list, and either skip that "best" key or
@@ -390,6 +432,12 @@ update_sequencenumbers({_, _, _, _}, LSN, HSN) ->
 %% This is more space efficient than the equivalent bloom filter and avoids
 %% the calculation of many hash functions.
 
+generate_segment_filter([SegL1, []]) ->
+    generate_segment_filter({SegL1, [], [], []});
+generate_segment_filter([SegL1, SegL2, []]) ->
+    generate_segment_filter({SegL1, SegL2, [], []});
+generate_segment_filter([SegL1, SegL2, SegL3, SegL4]) ->
+    generate_segment_filter({SegL1, SegL2, SegL3, SegL4});
 generate_segment_filter(SegLists) ->
     generate_segment_filter(merge_seglists(SegLists),
                             [],
@@ -441,8 +489,8 @@ merge_seglists({SegList1, SegList2, SegList3, SegList4}) ->
     Stage4 = lists:foldl(fun(X, Acc) -> [{X, 3}|Acc] end, Stage3, SegList4),
     lists:sort(Stage4).
 
-hash_for_segmentid(Key) ->
-    erlang:phash2(Key).
+hash_for_segmentid(KV) ->
+    erlang:phash2(strip_to_keyonly(KV), ?MAX_SEG_HASH).
 
 
 %% Check for a given list of segments in the filter, returning in normal
@@ -616,6 +664,35 @@ generate_randomsegfilter(BlockSize) ->
                                   Block4})).
 
 
+generate_randomkeys(Count) ->
+    generate_randomkeys(Count, []).
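Note that the LengthList built up in create_slot above records each block's
size in bits, which is what would let a reader cut one block back out of a
serialised slot without touching its neighbours. A minimal sketch of that
read path, assuming the slot layout in this patch (fetch_block/3 is a
hypothetical helper, not code from this commit):

    fetch_block(SerialisedSlot, LengthList, BlockIndex) ->
        %% bits to skip: the summed bit-lengths of the preceding blocks
        %% (BlockIndex is 0-based, as in check_for_segments results)
        Skip = lists:sum(lists:sublist(LengthList, BlockIndex)),
        Size = lists:nth(BlockIndex + 1, LengthList),
        <<_:Skip/bitstring, SerialisedBlock:Size/bitstring, _/bitstring>> =
            SerialisedSlot,
        %% reverses serialise_block/1; safe because each block is a whole
        %% binary, so Size is always a multiple of 8
        binary_to_term(SerialisedBlock).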
+ +generate_randomkeys(0, Acc) -> + Acc; +generate_randomkeys(Count, Acc) -> + RandKey = {{o, + lists:concat(["Bucket", random:uniform(1024)]), + lists:concat(["Key", random:uniform(1024)])}, + random:uniform(1024*1024), + {active, infinity}, null}, + generate_randomkeys(Count - 1, [RandKey|Acc]). + +generate_sequentialkeys(Count, Start) -> + generate_sequentialkeys(Count + Start, Start, []). + +generate_sequentialkeys(Target, Incr, Acc) when Incr =:= Target -> + Acc; +generate_sequentialkeys(Target, Incr, Acc) -> + KeyStr = string:right(integer_to_list(Incr), 8, $0), + NextKey = {{o, + "BucketSeq", + lists:concat(["Key", KeyStr])}, + 5, + {active, infinity}, null}, + generate_sequentialkeys(Target, Incr + 1, [NextKey|Acc]). + + + simple_create_block_test() -> KeyList1 = [{{o, "Bucket1", "Key1"}, 1, {active, infinity}, null}, {{o, "Bucket1", "Key3"}, 2, {active, infinity}, null}], @@ -629,7 +706,6 @@ simple_create_block_test() -> [H2|T2] = T1, ?assertMatch(H2, {{o, "Bucket1", "Key2"}, 3, {active, infinity}, null}), ?assertMatch(T2, [{{o, "Bucket1", "Key3"}, 2, {active, infinity}, null}]), - io:format("SN is ~w~n", [SN]), ?assertMatch(SN, {1,3}). dominate_create_block_test() -> @@ -645,7 +721,7 @@ dominate_create_block_test() -> ?assertMatch(K2, {{o, "Bucket1", "Key2"}, 3, {tomb, infinity}, null}), ?assertMatch(SN, {1,3}). -alternating_create_block_test() -> +sample_keylist() -> KeyList1 = [{{o, "Bucket1", "Key1"}, 1, {active, infinity}, null}, {{o, "Bucket1", "Key3"}, 1, {active, infinity}, null}, {{o, "Bucket1", "Key5"}, 1, {active, infinity}, null}, @@ -675,14 +751,17 @@ alternating_create_block_test() -> {{o, "Bucket2", "Key6"}, 1, {active, infinity}, null}, {{o, "Bucket2", "Key8"}, 1, {active, infinity}, null}, {{o, "Bucket3", "Key2"}, 1, {active, infinity}, null}, - {{o, "Bucket3", "Key4"}, 1, {active, infinity}, null}, - {{o, "Bucket3", "Key6"}, 1, {active, infinity}, null}, + {{o, "Bucket3", "Key4"}, 3, {active, infinity}, null}, + {{o, "Bucket3", "Key6"}, 2, {active, infinity}, null}, {{o, "Bucket3", "Key8"}, 1, {active, infinity}, null}], + {KeyList1, KeyList2}. + +alternating_create_block_test() -> + {KeyList1, KeyList2} = sample_keylist(), {MergedKeyList, ListStatus, _, _, _, _} = create_block(KeyList1, KeyList2, 1), BlockSize = length(MergedKeyList), - io:format("Block size is ~w~n", [BlockSize]), ?assertMatch(BlockSize, 32), ?assertMatch(ListStatus, full), K1 = lists:nth(1, MergedKeyList), @@ -738,4 +817,72 @@ merge_seglists_test() -> ?assertMatch(R8, {maybe_present, [0]}), R9 = check_for_segments(SegBin, [1024*1024 - 1], false), ?assertMatch(R9, not_present). + +createslot_stage1_test() -> + {KeyList1, KeyList2} = sample_keylist(), + Out = create_slot(KeyList1, KeyList2, 1), + {{LowKey, SegFilter, _SerialisedSlot, _LengthList}, + {{LSN, HSN}, LastKey, Status}, + KL1, KL2} = Out, + ?assertMatch(LowKey, {o, "Bucket1", "Key1"}), + ?assertMatch(LastKey, {o, "Bucket4", "Key1"}), + ?assertMatch(Status, partial), + ?assertMatch(KL1, []), + ?assertMatch(KL2, []), + R0 = check_for_segments(serialise_segment_filter(SegFilter), + [hash_for_segmentid({keyonly, {o, "Bucket1", "Key1"}})], + true), + ?assertMatch(R0, {maybe_present, [0]}), + R1 = check_for_segments(serialise_segment_filter(SegFilter), + [hash_for_segmentid({keyonly, {o, "Bucket1", "Key99"}})], + true), + ?assertMatch(R1, not_present), + ?assertMatch(LSN, 1), + ?assertMatch(HSN, 3). 
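A note on the hashes probed in the test above: hash_for_segmentid/1 strips
both accepted tuple forms down to the bare key before hashing, so a full
key/metadata tuple and its {keyonly, K} form always land in the same segment.
For example, an identity that holds under this patch (not itself a test in
the commit):

    KV = {{o, "Bucket1", "Key1"}, 1, {active, infinity}, null},
    true = hash_for_segmentid(KV) =:=
            hash_for_segmentid({keyonly, {o, "Bucket1", "Key1"}}).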
+ +createslot_stage2_test() -> + Out = create_slot(lists:sort(generate_randomkeys(100)), + lists:sort(generate_randomkeys(100)), + 1), + {{_LowKey, _SegFilter, SerialisedSlot, LengthList}, + {{_LSN, _HSN}, _LastKey, Status}, + _KL1, _KL2} = Out, + ?assertMatch(Status, full), + Sum1 = lists:foldl(fun(X, Sum) -> Sum + X end, 0, LengthList), + Sum2 = bit_size(SerialisedSlot), + ?assertMatch(Sum1, Sum2). + + +createslot_stage3_test() -> + Out = create_slot(lists:sort(generate_sequentialkeys(100, 1)), + lists:sort(generate_sequentialkeys(100, 101)), + 1), + {{LowKey, SegFilter, SerialisedSlot, LengthList}, + {{_LSN, _HSN}, LastKey, Status}, + KL1, KL2} = Out, + ?assertMatch(Status, full), + Sum1 = lists:foldl(fun(X, Sum) -> Sum + X end, 0, LengthList), + Sum2 = bit_size(SerialisedSlot), + ?assertMatch(Sum1, Sum2), + ?assertMatch(LowKey, {o, "BucketSeq", "Key00000001"}), + ?assertMatch(LastKey, {o, "BucketSeq", "Key00000128"}), + ?assertMatch(KL1, []), + Rem = length(KL2), + ?assertMatch(Rem, 72), + R0 = check_for_segments(serialise_segment_filter(SegFilter), + [hash_for_segmentid({keyonly, {o, "BucketSeq", "Key00000100"}})], + true), + ?assertMatch(R0, {maybe_present, [3]}), + R1 = check_for_segments(serialise_segment_filter(SegFilter), + [hash_for_segmentid({keyonly, {o, "Bucket1", "Key99"}})], + true), + ?assertMatch(R1, not_present), + R2 = check_for_segments(serialise_segment_filter(SegFilter), + [hash_for_segmentid({keyonly, {o, "BucketSeq", "Key00000040"}})], + true), + ?assertMatch(R2, {maybe_present, [1]}), + R3 = check_for_segments(serialise_segment_filter(SegFilter), + [hash_for_segmentid({keyonly, {o, "BucketSeq", "Key00000004"}})], + true), + ?assertMatch(R3, {maybe_present, [0]}).
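The stage tests exercise the segment filter and the serialised slot
separately; on a read path the two would combine roughly as below. This is a
sketch only: scan_block/2 and the fetch_block/3 helper sketched earlier are
hypothetical, and later commits may shape the real lookup differently.

    lookup_in_slot(Key, {SegFilter, SerialisedSlot, LengthList}) ->
        Hash = hash_for_segmentid({keyonly, Key}),
        case check_for_segments(serialise_segment_filter(SegFilter),
                                [Hash],
                                true) of
            not_present ->
                not_found;
            {maybe_present, BlockIndexes} ->
                %% deserialise only the blocks the filter flagged,
                %% stopping at the first block that yields the key
                lists:foldl(fun(Idx, not_found) ->
                                    Block = fetch_block(SerialisedSlot,
                                                        LengthList,
                                                        Idx),
                                    scan_block(Block, Key);
                                (_Idx, Found) ->
                                    Found
                            end,
                            not_found,
                            BlockIndexes)
        end.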