From c664483f03b0fda4c38649f380b72ff00706e09e Mon Sep 17 00:00:00 2001 From: martinsumner Date: Wed, 28 Dec 2016 21:47:05 +0000 Subject: [PATCH] Add basic merge support No generates KV list first, and then creates a new SST --- src/leveled_pclerk.erl | 13 +- src/leveled_sst.erl | 292 +++++++++++++++++++++++++++++++++++------ 2 files changed, 254 insertions(+), 51 deletions(-) diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index b5f8e3f..a0f64d9 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -308,18 +308,11 @@ do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) -> [SrcLevel + 1, FileCounter])), leveled_log:log("PC012", [MSN, FileName]), TS1 = os:timestamp(), - LevelR = case IsB of - true -> - #level{level = SrcLevel + 1, - is_basement = true, - timestamp = leveled_codec:integer_now()}; - false -> - SrcLevel + 1 - end, - {ok, Pid, Reply} = leveled_sft:sft_new(FileName, + {ok, Pid, Reply} = leveled_sst:sst_new(FileName, KL1, KL2, - LevelR), + IsB, + SrcLevel + 1), case Reply of {{[], []}, null, _} -> leveled_log:log("PC013", [FileName]), diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index fd2ffe5..877e42c 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -59,9 +59,11 @@ -include("include/leveled.hrl"). +-define(MAX_SLOTS, 256). -define(SLOT_SIZE, 128). -define(COMPRESSION_LEVEL, 1). -define(LEVEL_BLOOM_SLOTS, [{0, 64}, {1, 48}, {default, 32}]). +-define(MERGE_SCANWIDTH, 16). -define(DISCARD_EXT, ".discarded"). -include_lib("eunit/include/eunit.hrl"). @@ -76,6 +78,8 @@ reader/3]). -export([sst_new/3, + sst_new/5, + sst_newlevelzero/4, sst_open/1, sst_get/2, sst_get/3, @@ -126,27 +130,25 @@ sst_new(Filename, Level, KVList) -> {ok, Pid, {SK, EK}} end. -%sft_newlevelzero(Filename, Slots, FetchFun, Wait, Penciller) -> -% {ok, Pid} = gen_fsm:start(?MODULE, [], []), -% case Wait of -% true -> -% KL1 = leveled_pmem:to_list(Slots, FetchFun), -% Reply = gen_fsm:sync_send_event(Pid, -% {sft_new, -% Filename, -% 0, -% KL1}, -% infinity), -% {ok, Pid, Reply}; -% false -> -% gen_fsm:send_event(Pid, -% {sft_newlevelzero, -% Filename, -% Slots, -% FetchFun, -% Penciller}), -% {ok, Pid, noreply} -% end. +sst_new(Filename, KL1, KL2, IsBasement, Level) -> + {{Rem1, Rem2}, MergedList} = merge_lists(KL1, KL2, {IsBasement, Level}), + {ok, Pid} = gen_fsm:start(?MODULE, [], []), + case gen_fsm:sync_send_event(Pid, + {sst_new, Filename, Level, MergedList}, + infinity) of + {ok, {SK, EK}} -> + {ok, Pid, {{Rem1, Rem2}, SK, EK}} + end. + +sst_newlevelzero(Filename, Slots, FetchFun, Penciller) -> + {ok, Pid} = gen_fsm:start(?MODULE, [], []), + gen_fsm:send_event(Pid, + {sst_newlevelzero, + Filename, + Slots, + FetchFun, + Penciller}), + {ok, Pid, noreply}. sst_get(Pid, LedgerKey) -> sst_get(Pid, LedgerKey, leveled_codec:magic_hash(LedgerKey)). @@ -161,7 +163,7 @@ sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) -> sst_getslots(Pid, SlotList) -> gen_fsm:sync_send_event(Pid, {get_slots, SlotList}, infinity). - + sst_close(Pid) -> gen_fsm:sync_send_event(Pid, close, 2000). @@ -196,6 +198,20 @@ starting({sst_new, Filename, Level, KVList}, _From, State) -> reader, UpdState}. +starting({sst_newlevelzero, Filename, Slots, FetchFun, Penciller}, State) -> + KVList = leveled_pmem:to_list(Slots, FetchFun), + {FirstKey, L, SlotIndex, AllHashes, SlotsBin} = build_all_slots(KVList), + SummaryBin = build_table_summary(SlotIndex, AllHashes, 0, FirstKey, L), + ActualFilename = write_file(Filename, SummaryBin, SlotsBin), + UpdState = read_file(ActualFilename, + State#state{filename=ActualFilename}), + Summary = UpdState#state.summary, + leveled_penciller:pcl_confirml0complete(Penciller, + UpdState#state.filename, + Summary#summary.first_key, + Summary#summary.last_key), + {next_state, reader, UpdState}. + reader({get_kv, LedgerKey, Hash}, _From, State) -> SW = os:timestamp(), {Result, Stage, SlotID} = fetch(LedgerKey, Hash, State), @@ -219,14 +235,11 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> reader, State}; reader({get_slots, SlotList}, _From, State) -> - Handle = State#state.handle, - FetchFun = - fun({pointer, S, SK, EK}, Acc) -> - Acc ++ trim_slot({pointer, Handle, S}, SK, EK) end, - {reply, - lists:foldl(FetchFun, [], SlotList), - reader, - State}; + SlotBins = read_slots(State#state.handle, SlotList), + FoldFun = + fun({SlotBin, SK, EK}, Acc) -> + Acc ++ trim_slot(SlotBin, SK, EK) end, + {reply, lists:foldl(FoldFun, [], SlotBins), reader, State}; reader(print_timings, _From, State) -> io:format(user, "Timings of ~w~n", [State#state.sst_timings]), {reply, ok, reader, State#state{sst_timings = undefined}}; @@ -566,6 +579,36 @@ read_slot(Handle, Slot) -> crc_wonky end. +read_slots(Handle, SlotList) -> + [{pointer, FirstSlot, _SK, _EK}|_Rest] = SlotList, + {pointer, LastSlot, _SK, _EK} = lists:last(SlotList), + StartPos = FirstSlot#slot_index_value.start_position, + Length = LastSlot#slot_index_value.start_position + + LastSlot#slot_index_value.length + - StartPos, + {ok, MultiSlotBin} = file:pread(Handle, StartPos, Length), + read_off_binary(MultiSlotBin, SlotList, []). + +read_off_binary(<<>>, [], SplitBins) -> + SplitBins; +read_off_binary(MultiSlotBin, [TopSlot|Rest], SplitBins) -> + {pointer, Slot, SK, EK} = TopSlot, + Length = Slot#slot_index_value.length - 4, + <> = MultiSlotBin, + case erlang:crc32(SlotBin) of + SlotCRC -> + read_off_binary(RestBin, + Rest, + SplitBins ++ [{SlotBin, SK, EK}]); + _ -> + read_off_binary(RestBin, + Rest, + SplitBins ++ []) + end. + + trim_slot({pointer, Handle, Slot}, all, all) -> case read_slot(Handle, Slot) of crc_wonky -> @@ -623,6 +666,120 @@ generate_filenames(RootFilename) -> filename:join(DN, FP_NOEXT) ++ ".sst"} end. +%%%============================================================================ +%%% Merge Functions +%%%============================================================================ + +%% functions for merging two KV lists with pointers + +%% Compare the keys at the head of the list, and either skip that "best" key or +%% identify as the next key. +%% +%% The logic needs to change if the file is in the basement level, as keys with +%% expired timestamps need not be written at this level +%% +%% The best key is considered to be the lowest key in erlang term order. If +%% there are matching keys then the highest sequence number must be chosen and +%% any lower sequence numbers should be compacted out of existence + +merge_lists(KeyList1, KeyList2, LevelInfo) -> + merge_lists(KeyList1, KeyList2, LevelInfo, [], ?MAX_SLOTS * ?SLOT_SIZE). + +merge_lists([], [], _LevelR, MergedList, _MaxSize) -> + {{[], []}, lists:reverse(MergedList)}; +merge_lists(Rem1, Rem2, _LevelR, MergedList, 0) -> + {{Rem1, Rem2}, lists:reverse(MergedList)}; +merge_lists(KeyList1, KeyList2, {IsBasement, TS}, MergedList, MaxSize) -> + case key_dominates(KeyList1, KeyList2, {IsBasement, TS}) of + {{next_key, TopKey}, Rem1, Rem2} -> + merge_lists(Rem1, + Rem2, + {IsBasement, TS}, + [TopKey|MergedList], + MaxSize - 1); + {skipped_key, Rem1, Rem2} -> + merge_lists(Rem1, Rem2, {IsBasement, TS}, MergedList, MaxSize) + end. + +key_dominates(KL1, KL2, Level) -> + key_dominates_expanded(maybe_expand_pointer(KL1), + maybe_expand_pointer(KL2), + Level). + +key_dominates_expanded([H1|T1], [], Level) -> + case leveled_codec:maybe_reap_expiredkey(H1, Level) of + true -> + {skipped_key, T1, []}; + false -> + {{next_key, H1}, T1, []} + end; +key_dominates_expanded([], [H2|T2], Level) -> + case leveled_codec:maybe_reap_expiredkey(H2, Level) of + true -> + {skipped_key, [], T2}; + false -> + {{next_key, H2}, [], T2} + end; +key_dominates_expanded([H1|T1], [H2|T2], Level) -> + case leveled_codec:key_dominates(H1, H2) of + left_hand_first -> + case leveled_codec:maybe_reap_expiredkey(H1, Level) of + true -> + {skipped_key, T1, [H2|T2]}; + false -> + {{next_key, H1}, T1, [H2|T2]} + end; + right_hand_first -> + case leveled_codec:maybe_reap_expiredkey(H2, Level) of + true -> + {skipped_key, [H1|T1], T2}; + false -> + {{next_key, H2}, [H1|T1], T2} + end; + left_hand_dominant -> + {skipped_key, [H1|T1], T2}; + right_hand_dominant -> + {skipped_key, T1, [H2|T2]} + end. + + +%% When a list is provided it may include a pointer to gain another batch of +%% entries from the same file, or a new batch of entries from another file +%% +%% This resultant list should include the Tail of any pointers added at the +%% end of the list + +maybe_expand_pointer([]) -> + []; +maybe_expand_pointer([{pointer, SFTPid, Slot, StartKey, all}|Tail]) -> + FoldFun = + fun(X, {Pointers, Remainder}) -> + case length(Pointers) of + L when L < ?MERGE_SCANWIDTH -> + case X of + {pointer, SFTPid, S, SK, EK} -> + {Pointers ++ [{pointer, S, SK, EK}], Remainder}; + _ -> + {Pointers, Remainder ++ [X]} + end; + _ -> + {Pointers, Remainder ++ [X]} + end + end, + InitAcc = {[{pointer, Slot, StartKey, all}], []}, + {AccPointers, AccTail} = lists:foldl(FoldFun, InitAcc, Tail), + SW = os:timestamp(), + ExpPointers = sst_getslots(SFTPid, AccPointers), + leveled_log:log_timer("SFT14", [SFTPid], SW), + lists:append(ExpPointers, AccTail); +maybe_expand_pointer([{next, SFTPid, StartKey}|Tail]) -> + ExpPointer = sst_getkvrange(SFTPid, StartKey, all, ?MERGE_SCANWIDTH), + maybe_expand_pointer(ExpPointer ++ Tail); +maybe_expand_pointer(List) -> + List. + + + %%%============================================================================ %%% Test @@ -671,6 +828,53 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) -> BRange). +merge_test() -> + N = 3000, + KVL1 = lists:ukeysort(1, generate_randomkeys(N + 1, N, 1, 20)), + KVL2 = lists:ukeysort(1, generate_randomkeys(1, N, 1, 20)), + KVL3 = lists:ukeymerge(1, KVL1, KVL2), + SW0 = os:timestamp(), + {ok, P1, {FK1, LK1}} = sst_new("../test/level1_src", 1, KVL1), + {ok, P2, {FK2, LK2}} = sst_new("../test/level2_src", 2, KVL2), + ExpFK1 = element(1, lists:nth(1, KVL1)), + ExpLK1 = element(1, lists:last(KVL1)), + ExpFK2 = element(1, lists:nth(1, KVL2)), + ExpLK2 = element(1, lists:last(KVL2)), + ?assertMatch(ExpFK1, FK1), + ?assertMatch(ExpFK2, FK2), + ?assertMatch(ExpLK1, LK1), + ?assertMatch(ExpLK2, LK2), + ML1 = [{next, P1, FK1}], + ML2 = [{next, P2, FK2}], + {ok, P3, {{Rem1, Rem2}, FK3, LK3}} = sst_new("../test/level2_merge", + ML1, + ML2, + false, + 2), + ?assertMatch([], Rem1), + ?assertMatch([], Rem2), + ?assertMatch(true, FK3 == min(FK1, FK2)), + ?assertMatch(true, LK3 == max(LK1, LK2)), + io:format(user, + "Created and merged two files of size ~w in ~w microseconds~n", + [N, timer:now_diff(os:timestamp(), SW0)]), + + SW1 = os:timestamp(), + lists:foreach(fun({K, V}) -> + ?assertMatch({K, V}, sst_get(P3, K)) + end, + KVL3), + io:format(user, + "Checked presence of all ~w objects in ~w microseconds~n", + [length(KVL3), timer:now_diff(os:timestamp(), SW1)]), + + ok = sst_close(P1), + ok = sst_close(P2), + ok = sst_close(P3), + ok = file:delete("../test/level1_src.sst"), + ok = file:delete("../test/level2_src.sst"), + ok = file:delete("../test/level2_merge.sst"). + simple_slotbin_test() -> KVList0 = generate_randomkeys(1, ?SLOT_SIZE * 2, 1, 4), KVList1 = lists:sublist(lists:ukeysort(1, KVList0), 1, ?SLOT_SIZE), @@ -703,7 +907,7 @@ simple_slotbin_test() -> simple_slotbinsummary_test() -> - KVList0 = generate_randomkeys(1, ?SLOT_SIZE * 8 + 100, 1, 4), + KVList0 = generate_randomkeys(1, ?SLOT_SIZE * 16, 1, 20), KVList1 = lists:ukeysort(1, KVList0), [{FirstKey, _V}|_Rest] = KVList1, {FirstKey, _L, SlotIndex, AllHashes, SlotsBin} = build_all_slots(KVList1), @@ -787,23 +991,29 @@ simple_persisted_test() -> {TenthKey, _v10} = lists:nth(10, KVList1), {Three000Key, _v300} = lists:nth(300, KVList1), - io:format("Looking for 291 elements between ~s ~s and ~s ~s~n", - [element(2, TenthKey), - element(3, TenthKey), - element(2, Three000Key), - element(3, Three000Key)]), SubKVList1 = lists:sublist(KVList1, 10, 291), SubKVList1L = length(SubKVList1), FetchList2 = sst_getkvrange(Pid, TenthKey, Three000Key, 2), + ?assertMatch(pointer, element(1, lists:last(FetchList2))), FetchedList2 = lists:foldl(FoldFun, [], FetchList2), - io:format("Found elements between ~s ~s and ~s ~s~n", - [element(2, element(1, lists:nth(1, FetchedList2))), - element(3, element(1, lists:nth(1, FetchedList2))), - element(2, element(1, lists:last(FetchedList2))), - element(3, element(1, lists:last(FetchedList2)))]), ?assertMatch(SubKVList1L, length(FetchedList2)), ?assertMatch(SubKVList1, FetchedList2), + {Eight000Key, _v800} = lists:nth(800, KVList1), + SubKVListA1 = lists:sublist(KVList1, 10, 791), + SubKVListA1L = length(SubKVListA1), + FetchListA2 = sst_getkvrange(Pid, TenthKey, Eight000Key, 2), + ?assertMatch(pointer, element(1, lists:last(FetchListA2))), + FetchedListA2 = lists:foldl(FoldFun, [], FetchListA2), + ?assertMatch(SubKVListA1L, length(FetchedListA2)), + ?assertMatch(SubKVListA1, FetchedListA2), + + FetchListB2 = sst_getkvrange(Pid, TenthKey, Eight000Key, 4), + ?assertMatch(pointer, element(1, lists:last(FetchListB2))), + FetchedListB2 = lists:foldl(FoldFun, [], FetchListB2), + ?assertMatch(SubKVListA1L, length(FetchedListB2)), + ?assertMatch(SubKVListA1, FetchedListB2), + ok = sst_close(Pid), ok = file:delete(Filename ++ ".sst").