Auto-merge

Allow for the clerk to merge continuously if there is no activity for the
penciller to prompt.

The penciller now must also correctly lock the manifest - to stop races
between the creation of new L0 files and the completion of work by the
clerk.
This commit is contained in:
martinsumner 2016-08-12 01:05:59 +01:00
parent c269eb3c52
commit 6e56b569b8
3 changed files with 221 additions and 101 deletions

View file

@ -159,7 +159,8 @@
pcl_fetch/2,
pcl_workforclerk/1,
pcl_requestmanifestchange/2,
pcl_confirmdelete/2]).
pcl_confirmdelete/2,
pcl_prompt/1]).
-include_lib("eunit/include/eunit.hrl").
@ -181,26 +182,26 @@
-record(state, {manifest = [] :: list(),
ongoing_work = [] :: list(),
manifest_sqn = 0 :: integer(),
levelzero_sqn = 0 :: integer(),
registered_iterators = [] :: list(),
unreferenced_files = [] :: list(),
root_path = "../test/" :: string(),
root_path = "../test" :: string(),
table_size = 0 :: integer(),
clerk :: pid(),
levelzero_pending = {false, [], none} :: tuple(),
memtable}).
memtable,
backlog = false :: boolean()}).
%%%============================================================================
%%% API
%%%============================================================================
%% Start a new penciller gen_server (not linked to the caller), using this
%% module as the callback module, with no init arguments and no start options.
%% Returns whatever gen_server:start/3 returns.
pcl_new() ->
    InitArgs = [],
    StartOpts = [],
    gen_server:start(?MODULE, InitArgs, StartOpts).
pcl_start(_RootDir) ->
%% TODO
%% Need to call startup to rebuild from disk
%% Start a penciller process and make a synchronous {load, RootDir} call to
%% it with an infinite timeout, then return ok.
%% The reply to the load call is discarded.
%% NOTE(review): the Pid of the started process is not returned, so the
%% caller has no handle on the penciller it has just started - confirm
%% whether this is intended.
pcl_start(RootDir) ->
{ok, Pid} = gen_server:start(?MODULE, [], []),
gen_server:call(Pid, {load, RootDir}, infinity),
ok.
pcl_pushmem(Pid, DumpList) ->
@ -219,6 +220,9 @@ pcl_requestmanifestchange(Pid, WorkItem) ->
%% Synchronously send a {confirm_delete, FileName} request to the penciller
%% at Pid and return its reply.
pcl_confirmdelete(Pid, FileName) ->
    Request = {confirm_delete, FileName},
    gen_server:call(Pid, Request).
%% Synchronously send a prompt_compaction request to the penciller at Pid
%% and return its reply (ok or pause, per the prompt_compaction handler).
pcl_prompt(Pid) ->
    Request = prompt_compaction,
    gen_server:call(Pid, Request).
%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
@ -229,79 +233,39 @@ init([]) ->
{ok, #state{memtable=TID, clerk=Clerk}}.
handle_call({push_mem, DumpList}, _From, State) ->
{TableSize, Manifest, L0Pend} = case State#state.levelzero_pending of
{true, Remainder, {StartKey, EndKey, Pid}} ->
%% Need to handle not error scenarios?
%% N.B. Sync call - so will be ready
{ok, SrcFN} = leveled_sft:sft_checkready(Pid),
%% Reset ETS, but re-insert any remainder
true = ets:delete_all_objects(State#state.memtable),
true = ets:insert(State#state.memtable, Remainder),
ManifestEntry = #manifest_entry{start_key=StartKey,
end_key=EndKey,
owner=Pid,
filename=SrcFN},
{length(Remainder),
lists:keystore(0,
1,
State#state.manifest,
{0, [ManifestEntry]}),
?L0PEND_RESET};
{false, _, _} ->
{State#state.table_size,
State#state.manifest,
State#state.levelzero_pending};
Unexpected ->
io:format("Unexpected value of ~w~n", [Unexpected]),
error
end,
%% Prompt clerk to ask about work - do this for every push_mem
ok = leveled_clerk:clerk_prompt(State#state.clerk, penciller),
case do_push_to_mem(DumpList, TableSize, State#state.memtable) of
{twist, ApproxTableSize} ->
{reply, ok, State#state{table_size=ApproxTableSize,
manifest=Manifest,
levelzero_pending=L0Pend}};
{roll, ApproxTableSize} ->
case {get_item(0, Manifest, []), L0Pend} of
{[], ?L0PEND_RESET} ->
L0SN = State#state.levelzero_sqn + 1,
FileName = State#state.root_path
++ ?FILES_FP ++ "/"
++ integer_to_list(L0SN) ++ "_0_0",
Dump = ets:tab2list(State#state.memtable),
L0_SFT = leveled_sft:sft_new(FileName,
Dump,
[],
0,
#sft_options{wait=false}),
{ok, L0Pid, Reply} = L0_SFT,
{{KL1Rem, []}, L0StartKey, L0EndKey} = Reply,
{reply, ok, State#state{levelzero_pending={true,
KL1Rem,
{L0StartKey,
L0EndKey,
L0Pid}},
table_size=ApproxTableSize,
levelzero_sqn=L0SN}};
_ ->
io:format("Memory has exceeded limit but L0 file is still
awaiting compaction ~n"),
{reply, pause, State#state{table_size=ApproxTableSize,
manifest=Manifest,
levelzero_pending=L0Pend}}
end
case push_to_memory(DumpList, State) of
{ok, UpdState} ->
{reply, ok, UpdState};
{{pause, Reason, Details}, UpdState} ->
io:format("Excess work due to - " ++ Reason, Details),
{reply, pause, UpdState#state{backlog=true}}
end;
handle_call({fetch, Key}, _From, State) ->
{reply, fetch(Key, State#state.manifest, State#state.memtable), State};
handle_call(work_for_clerk, From, State) ->
{UpdState, Work} = return_work(State, From),
{reply, Work, UpdState};
{reply, {Work, UpdState#state.backlog}, UpdState};
handle_call({confirm_delete, FileName}, _From, State) ->
Reply = confirm_delete(FileName,
State#state.unreferenced_files,
State#state.registered_iterators),
{reply, Reply, State}.
{reply, Reply, State};
%% {load, RootDir}: rebuild the penciller state for RootDir by loading the
%% manifest, then the files it references, then level zero. Replies with the
%% sequence number returned by load_levelzero/2 and records the root path,
%% manifest and manifest sequence number in the state.
handle_call({load, RootDir}, _From, State) ->
    {Manifest, ManifestSQN} = load_manifest(RootDir),
    {UpdManifest, MaxSQN} = load_allsft(RootDir, Manifest),
    %% load_levelzero/2 returns a 2-tuple; matching it against a 1-tuple
    %% would always fail with badmatch. The first element is not used here.
    {_Level0SQN, UpdMaxSQN} = load_levelzero(RootDir, MaxSQN),
    {reply, UpdMaxSQN, State#state{root_path=RootDir,
                                    manifest=UpdManifest,
                                    manifest_sqn=ManifestSQN}};
handle_call(prompt_compaction, _From, State) ->
case push_to_memory([], State) of
{ok, UpdState} ->
{reply, ok, UpdState#state{backlog=false}};
{{pause, Reason, Details}, UpdState} ->
io:format("Excess work due to - " ++ Reason, Details),
{reply, pause, UpdState#state{backlog=true}}
end.
handle_cast({manifest_change, WI}, State) ->
{ok, UpdState} = commit_manifest_change(WI, State),
@ -321,6 +285,81 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%============================================================================
%% Push DumpList into the in-memory table held in State.
%%
%% Steps, in order:
%%  1. If a level-zero file write is pending, wait for it to be ready and
%%     fold it into the manifest (see absorb_pending_levelzero/1).
%%  2. Prompt the clerk to ask about work - this happens on every push.
%%  3. Push the entries; if do_push_to_mem/3 answers roll, try to start a
%%     new level-zero file (see roll_memory/2).
%%
%% Returns {ok, UpdState} or {{pause, Reason, Details}, UpdState}, where
%% Reason is an io:format/2 format string and Details its argument list.
push_to_memory(DumpList, State) ->
    {TableSize, PreparedState} = absorb_pending_levelzero(State),
    ok = leveled_clerk:clerk_prompt(PreparedState#state.clerk, penciller),
    MemTable = PreparedState#state.memtable,
    case do_push_to_mem(DumpList, TableSize, MemTable) of
        {twist, ApproxSize} ->
            {ok, PreparedState#state{table_size=ApproxSize}};
        {roll, ApproxSize} ->
            roll_memory(ApproxSize, PreparedState)
    end.

%% Complete any pending level-zero file write. Returns {TableSize, State}:
%% with nothing pending this is the current table size and unchanged state.
absorb_pending_levelzero(State) ->
    case State#state.levelzero_pending of
        {true, Remainder, {StartKey, EndKey, Pid}} ->
            %% Sync call - so the file will be ready once this returns
            {ok, SrcFN} = leveled_sft:sft_checkready(Pid),
            %% Reset ETS, but re-insert any remainder
            MemTable = State#state.memtable,
            true = ets:delete_all_objects(MemTable),
            true = ets:insert(MemTable, Remainder),
            L0Entry = #manifest_entry{start_key=StartKey,
                                        end_key=EndKey,
                                        owner=Pid,
                                        filename=SrcFN},
            NewManifest = lists:keystore(0,
                                            1,
                                            State#state.manifest,
                                            {0, [L0Entry]}),
            {length(Remainder),
                State#state{manifest=NewManifest,
                            levelzero_pending=?L0PEND_RESET}};
        {false, _, _} ->
            {State#state.table_size, State}
    end.

%% The memory table needs rolling into a level-zero file. This is only
%% started when level zero of the manifest is empty and the manifest is not
%% locked; otherwise a pause response is returned with the reason.
roll_memory(ApproxSize, State) ->
    LevelZero = get_item(0, State#state.manifest, []),
    case {LevelZero, manifest_locked(State)} of
        {[], false} ->
            NextSQN = State#state.manifest_sqn + 1,
            FileName = State#state.root_path
                        ++ "/" ++ ?FILES_FP ++ "/"
                        ++ integer_to_list(NextSQN) ++ "_0_0",
            Dump = ets:tab2list(State#state.memtable),
            NewSFT = leveled_sft:sft_new(FileName,
                                            Dump,
                                            [],
                                            0,
                                            #sft_options{wait=false}),
            {ok, L0Pid, Reply} = NewSFT,
            {{KeyRemainder, []}, L0StartKey, L0EndKey} = Reply,
            Backlog = length(KeyRemainder),
            Response =
                case Backlog > ?MAX_TABLESIZE of
                    true ->
                        {pause,
                            "Backlog of ~w in memory table~n",
                            [Backlog]};
                    false ->
                        ok
                end,
            Pending = {true, KeyRemainder, {L0StartKey, L0EndKey, L0Pid}},
            {Response,
                State#state{levelzero_pending=Pending,
                            table_size=ApproxSize,
                            manifest_sqn=NextSQN}};
        {[], true} ->
            {{pause,
                    "L0 file write blocked by change at sqn=~w~n",
                    [State#state.manifest_sqn]},
                State#state{table_size=ApproxSize}};
        _ ->
            {{pause,
                    "L0 file write blocked by L0 file in manifest~n",
                    []},
                State#state{table_size=ApproxSize}}
    end.
fetch(Key, Manifest, TID) ->
case ets:lookup(TID, Key) of
@ -373,6 +412,22 @@ do_push_to_mem(DumpList, TableSize, MemTable) ->
end.
%% Manifest lock - don't have two changes to the manifest happening
%% concurrently.
%%
%% Returns true when either there is ongoing work (a non-empty
%% ongoing_work list) or a level-zero file write is pending, and false
%% otherwise. Emptiness is tested by pattern matching rather than
%% length/1, which would walk the whole list just to compare with zero.
manifest_locked(#state{ongoing_work=[_|_]}) ->
    true;
manifest_locked(#state{levelzero_pending={true, _, _}}) ->
    true;
manifest_locked(#state{}) ->
    false.
%% Work out what the current work queue should be
%%
@ -387,11 +442,11 @@ return_work(State, From) ->
case length(WorkQueue) of
L when L > 0 ->
[{SrcLevel, Manifest}|OtherWork] = WorkQueue,
io:format("Work at Level ~w to be scheduled for ~w
with ~w queue items outstanding~n",
io:format("Work at Level ~w to be scheduled for ~w with ~w " ++
"queue items outstanding~n",
[SrcLevel, From, length(OtherWork)]),
case State#state.ongoing_work of
[] ->
case {manifest_locked(State), State#state.ongoing_work} of
{false, _} ->
%% No work currently outstanding
%% Can allocate work
NextSQN = State#state.manifest_sqn + 1,
@ -409,15 +464,20 @@ return_work(State, From) ->
ledger_filepath = FP,
manifest_file = ManFile},
{State#state{ongoing_work=[WI]}, WI};
[OutstandingWork] ->
{true, [OutstandingWork]} ->
%% Still awaiting a response
io:format("Ongoing work requested by ~w but work
outstanding from Level ~w and Clerk ~w
at sequence number ~w~n",
io:format("Ongoing work requested by ~w " ++
"but work outstanding from Level ~w " ++
"and Clerk ~w at sequence number ~w~n",
[From,
OutstandingWork#penciller_work.src_level,
OutstandingWork#penciller_work.clerk,
OutstandingWork#penciller_work.next_sqn]),
{State, none};
{true, _} ->
%% Manifest locked
io:format("Manifest locked but no work outstanding " ++
"with clerk~n"),
{State, none}
end;
_ ->
@ -443,10 +503,8 @@ maybe_append_work(WorkQ, Level, Manifest,
io:format("Outstanding compaction work items of ~w at level ~w~n",
[FileCount - MaxFiles, Level]),
lists:append(WorkQ, [{Level, Manifest}]);
maybe_append_work(WorkQ, Level, _Manifest,
_MaxFiles, FileCount) ->
io:format("No compaction work due to file count ~w at level ~w~n",
[FileCount, Level]),
maybe_append_work(WorkQ, _Level, _Manifest,
_MaxFiles, _FileCount) ->
WorkQ.
@ -544,6 +602,29 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredIterators) ->
end
end.
%% Intended load sequence:
%% {Manifest, ManifestSQN} = load_manifest(RootDir),
%% {UpdManifest, MaxSQN} = load_allsft(RootDir, Manifest),
%% {Level0SQN, UpdMaxSQN} = load_levelzero(RootDir, MaxSQN)
%% Placeholder for loading the persisted manifest found under the root
%% directory. Nothing is read from disk yet: it returns an empty manifest
%% and a manifest sequence number of 0, as {Manifest, ManifestSQN}.
%% The manifest is an empty list (not an empty tuple) to agree with the
%% list() type the state record declares for its manifest field.
load_manifest(_RootDir) ->
    {[], 0}.
%% Placeholder for re-opening every file referenced by the manifest.
%% Returns {UpdManifest, MaxSQN}; currently an empty manifest and 0.
%%
%% The manifest is an empty list rather than an empty tuple because the
%% caller stores it as the state's manifest, which is declared as list()
%% and is later passed to list functions such as lists:keystore/4.
load_allsft(_RootDir, _Manifest) ->
    %% Manifest has been persisted with PIDs that are no longer
    %% valid, roll through each entry opening files and replacing Pids in
    %% Manifest
    {[], 0}.
%% Placeholder for loading level zero. Both arguments are currently ignored
%% and the result is always {0, 0} (used as {Level0SQN, UpdMaxSQN}).
load_levelzero(_RootDir, _MaxSQN) ->
    %% Open design questions carried over from the original notes:
    %% - when loading the L0 manifest, should the lowest sequence number in
    %%   L0 be bigger than the highest in all levels below? (noted as not
    %%   true)
    %% - what about key remainders?
    %% - L0 handling needs a rethink
    Level0SQN = 0,
    UpdMaxSQN = 0,
    {Level0SQN, UpdMaxSQN}.
%%%============================================================================
%%% Test
%%%============================================================================