From 6e56b569b8852a528c0fd72aa4b15214ca818d61 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Fri, 12 Aug 2016 01:05:59 +0100 Subject: [PATCH] Auto-merge Allow for the clerk to merge continuously is no activity for the penciller to prompt. The penciller now must also correctly lock the manifest - to stop races between the creation of ne wL0 files and the completion of work by the clerk --- src/leveled_clerk.erl | 65 +++++++--- src/leveled_penciller.erl | 249 +++++++++++++++++++++++++------------- src/leveled_sft.erl | 8 +- 3 files changed, 221 insertions(+), 101 deletions(-) diff --git a/src/leveled_clerk.erl b/src/leveled_clerk.erl index bf5e252..1a181b2 100644 --- a/src/leveled_clerk.erl +++ b/src/leveled_clerk.erl @@ -20,7 +20,11 @@ -include_lib("eunit/include/eunit.hrl"). --record(state, {owner :: pid()}). +-define(INACTIVITY_TIMEOUT, 2000). +-define(HAPPYTIME_MULTIPLIER, 5). + +-record(state, {owner :: pid(), + in_backlog = false :: boolean()}). %%%============================================================================ %%% API @@ -47,20 +51,17 @@ handle_call({register, Owner}, _From, State) -> {reply, ok, State#state{owner=Owner}}. handle_cast(penciller_prompt, State) -> - case leveled_penciller:pcl_workforclerk(State#state.owner) of - none -> - io:format("Work prompted but none needed~n"), - {noreply, State}; - WI -> - {NewManifest, FilesToDelete} = merge(WI), - UpdWI = WI#penciller_work{new_manifest=NewManifest, - unreferenced_files=FilesToDelete}, - leveled_penciller:pcl_requestmanifestchange(State#state.owner, - UpdWI), - mark_for_delete(FilesToDelete, State#state.owner), - {noreply, State} - end. + Timeout = requestandhandle_work(State), + {noreply, State, Timeout}. +handle_info(timeout, State) -> + %% The pcl prompt will cause a penciller_prompt, to re-trigger timeout + case leveled_penciller:pcl_prompt(State#state.owner) of + ok -> + {noreply, State}; + pause -> + {noreply, State} + end; handle_info(_Info, State) -> {noreply, State}. @@ -75,6 +76,27 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================ +requestandhandle_work(State) -> + case leveled_penciller:pcl_workforclerk(State#state.owner) of + {none, Backlog} -> + io:format("Work prompted but none needed~n"), + case Backlog of + false -> + ?INACTIVITY_TIMEOUT * ?HAPPYTIME_MULTIPLIER; + _ -> + ?INACTIVITY_TIMEOUT + end; + {WI, _} -> + {NewManifest, FilesToDelete} = merge(WI), + UpdWI = WI#penciller_work{new_manifest=NewManifest, + unreferenced_files=FilesToDelete}, + leveled_penciller:pcl_requestmanifestchange(State#state.owner, + UpdWI), + mark_for_delete(FilesToDelete, State#state.owner), + ?INACTIVITY_TIMEOUT + end. + + merge(WI) -> SrcLevel = WI#penciller_work.src_level, {SrcF, UpdMFest1} = select_filetomerge(SrcLevel, @@ -106,6 +128,8 @@ merge(WI) -> %% %% TODO: need to think still about simply renaming when at %% lower level + io:format("File ~s to simply switch levels to level ~w~n", + [SrcF#manifest_entry.filename, SrcLevel + 1]), [SrcF]; _ -> perform_merge({SrcF#manifest_entry.owner, @@ -130,7 +154,13 @@ merge(WI) -> [binary, raw, write]), ok = file:write(Handle, term_to_binary(UpdMFest2)), ok = file:close(Handle), - {UpdMFest2, Candidates} + case lists:member(SrcF, MergedFiles) of + true -> + {UpdMFest2, Candidates}; + false -> + %% Can rub out src file as it is not part of output + {UpdMFest2, Candidates ++ [SrcF]} + end end. @@ -298,7 +328,10 @@ merge_file_test() -> {ok, PidL2_4, _} = leveled_sft:sft_new("../test/KL4_L2.sft", KL4_L2, [], 2), Result = perform_merge({PidL1_1, "../test/KL1_L1.sft"}, - [PidL2_1, PidL2_2, PidL2_3, PidL2_4], + [#manifest_entry{owner=PidL2_1}, + #manifest_entry{owner=PidL2_2}, + #manifest_entry{owner=PidL2_3}, + #manifest_entry{owner=PidL2_4}], 2, {"../test/", 99}), lists:foreach(fun(ManEntry) -> {o, B1, K1} = ManEntry#manifest_entry.start_key, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index fb7432d..44e70c0 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -159,7 +159,8 @@ pcl_fetch/2, pcl_workforclerk/1, pcl_requestmanifestchange/2, - pcl_confirmdelete/2]). + pcl_confirmdelete/2, + pcl_prompt/1]). -include_lib("eunit/include/eunit.hrl"). @@ -181,26 +182,26 @@ -record(state, {manifest = [] :: list(), ongoing_work = [] :: list(), manifest_sqn = 0 :: integer(), - levelzero_sqn = 0 :: integer(), registered_iterators = [] :: list(), unreferenced_files = [] :: list(), - root_path = "../test/" :: string(), + root_path = "../test" :: string(), table_size = 0 :: integer(), clerk :: pid(), levelzero_pending = {false, [], none} :: tuple(), - memtable}). + memtable, + backlog = false :: boolean()}). %%%============================================================================ %%% API %%%============================================================================ - + pcl_new() -> gen_server:start(?MODULE, [], []). -pcl_start(_RootDir) -> - %% TODO - %% Need to call startup to rebuild from disk +pcl_start(RootDir) -> + {ok, Pid} = gen_server:start(?MODULE, [], []), + gen_server:call(Pid, {load, RootDir}, infinity), ok. pcl_pushmem(Pid, DumpList) -> @@ -219,6 +220,9 @@ pcl_requestmanifestchange(Pid, WorkItem) -> pcl_confirmdelete(Pid, FileName) -> gen_server:call(Pid, {confirm_delete, FileName}). +pcl_prompt(Pid) -> + gen_server:call(Pid, prompt_compaction). + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ @@ -229,79 +233,39 @@ init([]) -> {ok, #state{memtable=TID, clerk=Clerk}}. handle_call({push_mem, DumpList}, _From, State) -> - {TableSize, Manifest, L0Pend} = case State#state.levelzero_pending of - {true, Remainder, {StartKey, EndKey, Pid}} -> - %% Need to handle not error scenarios? - %% N.B. Sync call - so will be ready - {ok, SrcFN} = leveled_sft:sft_checkready(Pid), - %% Reset ETS, but re-insert any remainder - true = ets:delete_all_objects(State#state.memtable), - true = ets:insert(State#state.memtable, Remainder), - ManifestEntry = #manifest_entry{start_key=StartKey, - end_key=EndKey, - owner=Pid, - filename=SrcFN}, - {length(Remainder), - lists:keystore(0, - 1, - State#state.manifest, - {0, [ManifestEntry]}), - ?L0PEND_RESET}; - {false, _, _} -> - {State#state.table_size, - State#state.manifest, - State#state.levelzero_pending}; - Unexpected -> - io:format("Unexpected value of ~w~n", [Unexpected]), - error - end, - %% Prompt clerk to ask about work - do this for every push_mem - ok = leveled_clerk:clerk_prompt(State#state.clerk, penciller), - case do_push_to_mem(DumpList, TableSize, State#state.memtable) of - {twist, ApproxTableSize} -> - {reply, ok, State#state{table_size=ApproxTableSize, - manifest=Manifest, - levelzero_pending=L0Pend}}; - {roll, ApproxTableSize} -> - case {get_item(0, Manifest, []), L0Pend} of - {[], ?L0PEND_RESET} -> - L0SN = State#state.levelzero_sqn + 1, - FileName = State#state.root_path - ++ ?FILES_FP ++ "/" - ++ integer_to_list(L0SN) ++ "_0_0", - Dump = ets:tab2list(State#state.memtable), - L0_SFT = leveled_sft:sft_new(FileName, - Dump, - [], - 0, - #sft_options{wait=false}), - {ok, L0Pid, Reply} = L0_SFT, - {{KL1Rem, []}, L0StartKey, L0EndKey} = Reply, - {reply, ok, State#state{levelzero_pending={true, - KL1Rem, - {L0StartKey, - L0EndKey, - L0Pid}}, - table_size=ApproxTableSize, - levelzero_sqn=L0SN}}; - _ -> - io:format("Memory has exceeded limit but L0 file is still - awaiting compaction ~n"), - {reply, pause, State#state{table_size=ApproxTableSize, - manifest=Manifest, - levelzero_pending=L0Pend}} - end + case push_to_memory(DumpList, State) of + {ok, UpdState} -> + {reply, ok, UpdState}; + {{pause, Reason, Details}, UpdState} -> + io:format("Excess work due to - " ++ Reason, Details), + {reply, pause, UpdState#state{backlog=true}} end; handle_call({fetch, Key}, _From, State) -> {reply, fetch(Key, State#state.manifest, State#state.memtable), State}; handle_call(work_for_clerk, From, State) -> {UpdState, Work} = return_work(State, From), - {reply, Work, UpdState}; + {reply, {Work, UpdState#state.backlog}, UpdState}; handle_call({confirm_delete, FileName}, _From, State) -> Reply = confirm_delete(FileName, State#state.unreferenced_files, State#state.registered_iterators), - {reply, Reply, State}. + {reply, Reply, State}; +handle_call({load, RootDir}, _From, State) -> + {Manifest, ManifestSQN} = load_manifest(RootDir), + {UpdManifest, MaxSQN} = load_allsft(RootDir, Manifest), + {UpdMaxSQN} = load_levelzero(RootDir, MaxSQN), + {reply, UpdMaxSQN, State#state{root_path=RootDir, + manifest=UpdManifest, + manifest_sqn=ManifestSQN}}; +handle_call(prompt_compaction, _From, State) -> + case push_to_memory([], State) of + {ok, UpdState} -> + {reply, ok, UpdState#state{backlog=false}}; + {{pause, Reason, Details}, UpdState} -> + io:format("Excess work due to - " ++ Reason, Details), + {reply, pause, UpdState#state{backlog=true}} + end. + handle_cast({manifest_change, WI}, State) -> {ok, UpdState} = commit_manifest_change(WI, State), @@ -321,6 +285,81 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================ +push_to_memory(DumpList, State) -> + {TableSize, UpdState} = case State#state.levelzero_pending of + {true, Remainder, {StartKey, EndKey, Pid}} -> + %% Need to handle not error scenarios? + %% N.B. Sync call - so will be ready + {ok, SrcFN} = leveled_sft:sft_checkready(Pid), + %% Reset ETS, but re-insert any remainder + true = ets:delete_all_objects(State#state.memtable), + true = ets:insert(State#state.memtable, Remainder), + ManifestEntry = #manifest_entry{start_key=StartKey, + end_key=EndKey, + owner=Pid, + filename=SrcFN}, + {length(Remainder), + State#state{manifest=lists:keystore(0, + 1, + State#state.manifest, + {0, [ManifestEntry]}), + levelzero_pending=?L0PEND_RESET}}; + {false, _, _} -> + {State#state.table_size, State} + end, + + %% Prompt clerk to ask about work - do this for every push_mem + ok = leveled_clerk:clerk_prompt(UpdState#state.clerk, penciller), + + case do_push_to_mem(DumpList, TableSize, UpdState#state.memtable) of + {twist, ApproxTableSize} -> + {ok, UpdState#state{table_size=ApproxTableSize}}; + {roll, ApproxTableSize} -> + L0 = get_item(0, UpdState#state.manifest, []), + case {L0, manifest_locked(UpdState)} of + {[], false} -> + MSN = UpdState#state.manifest_sqn + 1, + FileName = UpdState#state.root_path + ++ "/" ++ ?FILES_FP ++ "/" + ++ integer_to_list(MSN) ++ "_0_0", + Dump = ets:tab2list(UpdState#state.memtable), + L0_SFT = leveled_sft:sft_new(FileName, + Dump, + [], + 0, + #sft_options{wait=false}), + {ok, L0Pid, Reply} = L0_SFT, + {{KL1Rem, []}, L0StartKey, L0EndKey} = Reply, + Backlog = length(KL1Rem), + Rsp = + if + Backlog > ?MAX_TABLESIZE -> + {pause, + "Backlog of ~w in memory table~n", + [Backlog]}; + true -> + ok + end, + {Rsp, + UpdState#state{levelzero_pending={true, + KL1Rem, + {L0StartKey, + L0EndKey, + L0Pid}}, + table_size=ApproxTableSize, + manifest_sqn=MSN}}; + {[], true} -> + {{pause, + "L0 file write blocked by change at sqn=~w~n", + [UpdState#state.manifest_sqn]}, + UpdState#state{table_size=ApproxTableSize}}; + _ -> + {{pause, + "L0 file write blocked by L0 file in manifest~n", + []}, + UpdState#state{table_size=ApproxTableSize}} + end + end. fetch(Key, Manifest, TID) -> case ets:lookup(TID, Key) of @@ -373,6 +412,22 @@ do_push_to_mem(DumpList, TableSize, MemTable) -> end. +%% Manifest lock - don't have two changes to the manifest happening +%% concurrently + +manifest_locked(State) -> + if + length(State#state.ongoing_work) > 0 -> + true; + true -> + case State#state.levelzero_pending of + {true, _, _} -> + true; + _ -> + false + end + end. + %% Work out what the current work queue should be %% @@ -387,11 +442,11 @@ return_work(State, From) -> case length(WorkQueue) of L when L > 0 -> [{SrcLevel, Manifest}|OtherWork] = WorkQueue, - io:format("Work at Level ~w to be scheduled for ~w - with ~w queue items outstanding~n", + io:format("Work at Level ~w to be scheduled for ~w with ~w " ++ + "queue items outstanding~n", [SrcLevel, From, length(OtherWork)]), - case State#state.ongoing_work of - [] -> + case {manifest_locked(State), State#state.ongoing_work} of + {false, _} -> %% No work currently outstanding %% Can allocate work NextSQN = State#state.manifest_sqn + 1, @@ -409,15 +464,20 @@ return_work(State, From) -> ledger_filepath = FP, manifest_file = ManFile}, {State#state{ongoing_work=[WI]}, WI}; - [OutstandingWork] -> + {true, [OutstandingWork]} -> %% Still awaiting a response - io:format("Ongoing work requested by ~w but work - outstanding from Level ~w and Clerk ~w - at sequence number ~w~n", + io:format("Ongoing work requested by ~w " ++ + "but work outstanding from Level ~w " ++ + "and Clerk ~w at sequence number ~w~n", [From, OutstandingWork#penciller_work.src_level, OutstandingWork#penciller_work.clerk, OutstandingWork#penciller_work.next_sqn]), + {State, none}; + {true, _} -> + %% Manifest locked + io:format("Manifest locked but no work outstanding " ++ + "with clerk~n"), {State, none} end; _ -> @@ -443,10 +503,8 @@ maybe_append_work(WorkQ, Level, Manifest, io:format("Outstanding compaction work items of ~w at level ~w~n", [FileCount - MaxFiles, Level]), lists:append(WorkQ, [{Level, Manifest}]); -maybe_append_work(WorkQ, Level, _Manifest, - _MaxFiles, FileCount) -> - io:format("No compaction work due to file count ~w at level ~w~n", - [FileCount, Level]), +maybe_append_work(WorkQ, _Level, _Manifest, + _MaxFiles, _FileCount) -> WorkQ. @@ -544,6 +602,29 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredIterators) -> end end. + +%% load_manifest(RootDir), +%% {UpdManifest, MaxSQN} = load_allsft(RootDir, Manifest), +%% Level0SQN, UpdMaxSQN} = load_levelzero(RootDir, MaxSQN) + +load_manifest(_RootDir) -> + {{}, 0}. + +load_allsft(_RootDir, _Manifest) -> + %% Manifest has been persisted with PIDs that are no longer + %% valid, roll through each entry opening files and replacing Pids in + %% Manifest + {{}, 0}. + +load_levelzero(_RootDir, _MaxSQN) -> + %% When loading L0 manifest make sure that the lowest sequence number in + %% the L0 manifest is bigger than the highest in all levels below + %% - not True + %% ... what about key remainders? + %% ... need to rethink L0 + {0, 0}. + + %%%============================================================================ %%% Test %%%============================================================================ diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index cde8313..80d3fd4 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -161,6 +161,7 @@ sft_checkready/1, sft_getfilename/1, sft_setfordelete/2, + sft_getmaxsequencenumber/1, strip_to_keyonly/1, generate_randomkeys/1]). @@ -260,6 +261,9 @@ sft_checkready(Pid) -> sft_getfilename(Pid) -> gen_server:call(Pid, get_filename, infinty). +sft_getmaxsequencenumber(Pid) -> + gen_server:call(Pid, get_maxsqn, infinity). + %%%============================================================================ %%% API helper functions %%%============================================================================ @@ -394,7 +398,9 @@ handle_call({set_for_delete, Penciller}, _From, State) -> ok, State#state{ready_for_delete=true, penciller=Penciller}, - ?DELETE_TIMEOUT}. + ?DELETE_TIMEOUT}; +handle_call(get_maxsqn, _From, State) -> + {reply, State#state.highest_sqn, State}. handle_cast(_Msg, State) -> {noreply, State}.