Mini Refactor
Thought about the mess, thought about swithcing to a FSM, throught about just sorting a bit of the mess instead.
This commit is contained in:
parent
ad5aebe93e
commit
f58f4d0ea5
1 changed files with 127 additions and 159 deletions
|
@ -232,7 +232,6 @@
|
||||||
terminate/2,
|
terminate/2,
|
||||||
code_change/3,
|
code_change/3,
|
||||||
pcl_start/1,
|
pcl_start/1,
|
||||||
pcl_quickstart/1,
|
|
||||||
pcl_pushmem/2,
|
pcl_pushmem/2,
|
||||||
pcl_fetch/2,
|
pcl_fetch/2,
|
||||||
pcl_checksequencenumber/3,
|
pcl_checksequencenumber/3,
|
||||||
|
@ -292,8 +291,6 @@
|
||||||
%%% API
|
%%% API
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
pcl_quickstart(RootPath) ->
|
|
||||||
pcl_start(#penciller_options{root_path=RootPath}).
|
|
||||||
|
|
||||||
pcl_start(PCLopts) ->
|
pcl_start(PCLopts) ->
|
||||||
gen_server:start(?MODULE, [PCLopts], []).
|
gen_server:start(?MODULE, [PCLopts], []).
|
||||||
|
@ -361,34 +358,139 @@ init([PCLopts]) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
handle_call({push_mem, DumpList}, From, State) ->
|
handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap})
|
||||||
if
|
when Snap == false ->
|
||||||
State#state.is_snapshot == true ->
|
% The process for pushing to memory is as follows
|
||||||
{reply, bad_request, State};
|
% - Check that the inbound list does not contain any Keys with a lower
|
||||||
true ->
|
% sequence number than any existing keys (assess_sqn/1)
|
||||||
writer_call({push_mem, DumpList}, From, State)
|
% - Check that any file that had been sent to be written to L0 previously
|
||||||
|
% is now completed. If it is wipe out the in-memory view as this is now
|
||||||
|
% safely persisted. This will block waiting for this to complete if it
|
||||||
|
% hasn't (checkready_pushmem/1).
|
||||||
|
% - Quick check to see if there is a need to write a L0 file
|
||||||
|
% (quickcheck_pushmem/3). If there clearly isn't, then we can reply, and
|
||||||
|
% then add to memory in the background before updating the loop state
|
||||||
|
% - Push the update into memory (do_pushtomem/3)
|
||||||
|
% - If we haven't got through quickcheck now need to check if there is a
|
||||||
|
% definite need to write a new L0 file (roll_memory/2). If all clear this
|
||||||
|
% will write the file in the background and allow a response to the user.
|
||||||
|
% If not the change has still been made but the the L0 file will not have
|
||||||
|
% been prompted - so the reply does not indicate failure but returns the
|
||||||
|
% atom 'pause' to signal a loose desire for back-pressure to be applied.
|
||||||
|
% The only reason in this case why there should be a pause is if the
|
||||||
|
% manifest is locked pending completion of a manifest change - so reacting
|
||||||
|
% to the pause signal may not be sensible
|
||||||
|
StartWatch = os:timestamp(),
|
||||||
|
case assess_sqn(DumpList) of
|
||||||
|
{MinSQN, MaxSQN} when MaxSQN >= MinSQN,
|
||||||
|
MinSQN >= State#state.ledger_sqn ->
|
||||||
|
MaxTableSize = State#state.memtable_maxsize,
|
||||||
|
{TableSize0, State1} = checkready_pushtomem(State),
|
||||||
|
case quickcheck_pushtomem(DumpList,
|
||||||
|
TableSize0,
|
||||||
|
MaxTableSize) of
|
||||||
|
{twist, TableSize1} ->
|
||||||
|
gen_server:reply(From, ok),
|
||||||
|
io:format("Reply made on push in ~w microseconds~n",
|
||||||
|
[timer:now_diff(os:timestamp(), StartWatch)]),
|
||||||
|
L0Snap = do_pushtomem(DumpList,
|
||||||
|
State1#state.memtable,
|
||||||
|
State1#state.memtable_copy,
|
||||||
|
MaxSQN),
|
||||||
|
io:format("Push completed in ~w microseconds~n",
|
||||||
|
[timer:now_diff(os:timestamp(), StartWatch)]),
|
||||||
|
{noreply,
|
||||||
|
State1#state{memtable_copy=L0Snap,
|
||||||
|
table_size=TableSize1,
|
||||||
|
ledger_sqn=MaxSQN}};
|
||||||
|
{maybe_roll, TableSize1} ->
|
||||||
|
L0Snap = do_pushtomem(DumpList,
|
||||||
|
State1#state.memtable,
|
||||||
|
State1#state.memtable_copy,
|
||||||
|
MaxSQN),
|
||||||
|
|
||||||
|
case roll_memory(State1, MaxTableSize) of
|
||||||
|
{ok, L0Pend, ManSN, TableSize2} ->
|
||||||
|
io:format("Push completed in ~w microseconds~n",
|
||||||
|
[timer:now_diff(os:timestamp(), StartWatch)]),
|
||||||
|
{reply,
|
||||||
|
ok,
|
||||||
|
State1#state{levelzero_pending=L0Pend,
|
||||||
|
table_size=TableSize2,
|
||||||
|
manifest_sqn=ManSN,
|
||||||
|
memtable_copy=L0Snap,
|
||||||
|
ledger_sqn=MaxSQN,
|
||||||
|
backlog=false}};
|
||||||
|
{pause, Reason, Details} ->
|
||||||
|
io:format("Excess work due to - " ++ Reason,
|
||||||
|
Details),
|
||||||
|
{reply,
|
||||||
|
pause,
|
||||||
|
State1#state{backlog=true,
|
||||||
|
memtable_copy=L0Snap,
|
||||||
|
table_size=TableSize1,
|
||||||
|
ledger_sqn=MaxSQN}}
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
{MinSQN, MaxSQN} ->
|
||||||
|
io:format("Mismatch of sequence number expectations with push "
|
||||||
|
++ "having sequence numbers between ~w and ~w "
|
||||||
|
++ "but current sequence number is ~w~n",
|
||||||
|
[MinSQN, MaxSQN, State#state.ledger_sqn]),
|
||||||
|
{reply, refused, State};
|
||||||
|
empty ->
|
||||||
|
io:format("Empty request pushed to Penciller~n"),
|
||||||
|
{reply, ok, State}
|
||||||
end;
|
end;
|
||||||
handle_call({confirm_delete, FileName}, _From, State) ->
|
handle_call({confirm_delete, FileName}, _From, State=#state{is_snapshot=Snap})
|
||||||
if
|
when Snap == false ->
|
||||||
State#state.is_snapshot == true ->
|
Reply = confirm_delete(FileName,
|
||||||
{reply, bad_request, State};
|
State#state.unreferenced_files,
|
||||||
|
State#state.registered_snapshots),
|
||||||
|
case Reply of
|
||||||
true ->
|
true ->
|
||||||
writer_call({confirm_delete, FileName}, _From, State)
|
UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files),
|
||||||
|
{reply, true, State#state{unreferenced_files=UF1}};
|
||||||
|
_ ->
|
||||||
|
{reply, Reply, State}
|
||||||
end;
|
end;
|
||||||
handle_call(prompt_compaction, _From, State) ->
|
handle_call(prompt_compaction, _From, State=#state{is_snapshot=Snap})
|
||||||
|
when Snap == false ->
|
||||||
|
%% If there is a prompt immediately after a L0 async write event then
|
||||||
|
%% there exists the potential for the prompt to stall the database.
|
||||||
|
%% Should only accept prompts if there has been a safe wait from the
|
||||||
|
%% last L0 write event.
|
||||||
|
Proceed = case State#state.levelzero_pending of
|
||||||
|
{true, _Pid, TS} ->
|
||||||
|
TD = timer:now_diff(os:timestamp(),TS),
|
||||||
|
if
|
||||||
|
TD < ?PROMPT_WAIT_ONL0 * 1000000 -> false;
|
||||||
|
true -> true
|
||||||
|
end;
|
||||||
|
?L0PEND_RESET ->
|
||||||
|
true
|
||||||
|
end,
|
||||||
if
|
if
|
||||||
State#state.is_snapshot == true ->
|
Proceed ->
|
||||||
{reply, bad_request, State};
|
{_TableSize, State1} = checkready_pushtomem(State),
|
||||||
|
case roll_memory(State1, State1#state.memtable_maxsize) of
|
||||||
|
{ok, L0Pend, MSN, TableSize} ->
|
||||||
|
io:format("Prompted push completed~n"),
|
||||||
|
{reply, ok, State1#state{levelzero_pending=L0Pend,
|
||||||
|
table_size=TableSize,
|
||||||
|
manifest_sqn=MSN,
|
||||||
|
backlog=false}};
|
||||||
|
{pause, Reason, Details} ->
|
||||||
|
io:format("Excess work due to - " ++ Reason, Details),
|
||||||
|
{reply, pause, State1#state{backlog=true}}
|
||||||
|
end;
|
||||||
true ->
|
true ->
|
||||||
writer_call(prompt_compaction, _From, State)
|
{reply, ok, State#state{backlog=false}}
|
||||||
end;
|
|
||||||
handle_call({manifest_change, WI}, _From, State) ->
|
|
||||||
if
|
|
||||||
State#state.is_snapshot == true ->
|
|
||||||
{reply, bad_request, State};
|
|
||||||
true ->
|
|
||||||
writer_call({manifest_change, WI}, _From, State)
|
|
||||||
end;
|
end;
|
||||||
|
handle_call({manifest_change, WI}, _From, State=#state{is_snapshot=Snap})
|
||||||
|
when Snap == false ->
|
||||||
|
{ok, UpdState} = commit_manifest_change(WI, State),
|
||||||
|
{reply, ok, UpdState};
|
||||||
handle_call({check_sqn, Key, SQN}, _From, State) ->
|
handle_call({check_sqn, Key, SQN}, _From, State) ->
|
||||||
Obj = if
|
Obj = if
|
||||||
State#state.is_snapshot == true ->
|
State#state.is_snapshot == true ->
|
||||||
|
@ -529,138 +631,6 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
|
|
||||||
writer_call({push_mem, DumpList}, From, State0) ->
|
|
||||||
% The process for pushing to memory is as follows
|
|
||||||
% - Check that the inbound list does not contain any Keys with a lower
|
|
||||||
% sequence number than any existing keys (assess_sqn/1)
|
|
||||||
% - Check that any file that had been sent to be written to L0 previously
|
|
||||||
% is now completed. If it is wipe out the in-memory view as this is now
|
|
||||||
% safely persisted. This will block waiting for this to complete if it
|
|
||||||
% hasn't (checkready_pushmem/1).
|
|
||||||
% - Quick check to see if there is a need to write a L0 file
|
|
||||||
% (quickcheck_pushmem/3). If there clearly isn't, then we can reply, and
|
|
||||||
% then add to memory in the background before updating the loop state
|
|
||||||
% - Push the update into memory (do_pushtomem/3)
|
|
||||||
% - If we haven't got through quickcheck now need to check if there is a
|
|
||||||
% definite need to write a new L0 file (roll_memory/2). If all clear this
|
|
||||||
% will write the file in the background and allow a response to the user.
|
|
||||||
% If not the change has still been made but the the L0 file will not have
|
|
||||||
% been prompted - so the reply does not indicate failure but returns the
|
|
||||||
% atom 'pause' to signal a loose desire for back-pressure to be applied.
|
|
||||||
% The only reason in this case why there should be a pause is if the
|
|
||||||
% manifest is locked pending completion of a manifest change - so reacting
|
|
||||||
% to the pause signal may not be sensible
|
|
||||||
StartWatch = os:timestamp(),
|
|
||||||
case assess_sqn(DumpList) of
|
|
||||||
{MinSQN, MaxSQN} when MaxSQN >= MinSQN,
|
|
||||||
MinSQN >= State0#state.ledger_sqn ->
|
|
||||||
MaxTableSize = State0#state.memtable_maxsize,
|
|
||||||
{TableSize0, State1} = checkready_pushtomem(State0),
|
|
||||||
case quickcheck_pushtomem(DumpList,
|
|
||||||
TableSize0,
|
|
||||||
MaxTableSize) of
|
|
||||||
{twist, TableSize1} ->
|
|
||||||
gen_server:reply(From, ok),
|
|
||||||
io:format("Reply made on push in ~w microseconds~n",
|
|
||||||
[timer:now_diff(os:timestamp(), StartWatch)]),
|
|
||||||
L0Snap = do_pushtomem(DumpList,
|
|
||||||
State1#state.memtable,
|
|
||||||
State1#state.memtable_copy,
|
|
||||||
MaxSQN),
|
|
||||||
io:format("Push completed in ~w microseconds~n",
|
|
||||||
[timer:now_diff(os:timestamp(), StartWatch)]),
|
|
||||||
{noreply,
|
|
||||||
State1#state{memtable_copy=L0Snap,
|
|
||||||
table_size=TableSize1,
|
|
||||||
ledger_sqn=MaxSQN}};
|
|
||||||
{maybe_roll, TableSize1} ->
|
|
||||||
L0Snap = do_pushtomem(DumpList,
|
|
||||||
State1#state.memtable,
|
|
||||||
State1#state.memtable_copy,
|
|
||||||
MaxSQN),
|
|
||||||
|
|
||||||
case roll_memory(State1, MaxTableSize) of
|
|
||||||
{ok, L0Pend, ManSN, TableSize2} ->
|
|
||||||
io:format("Push completed in ~w microseconds~n",
|
|
||||||
[timer:now_diff(os:timestamp(), StartWatch)]),
|
|
||||||
{reply,
|
|
||||||
ok,
|
|
||||||
State1#state{levelzero_pending=L0Pend,
|
|
||||||
table_size=TableSize2,
|
|
||||||
manifest_sqn=ManSN,
|
|
||||||
memtable_copy=L0Snap,
|
|
||||||
ledger_sqn=MaxSQN,
|
|
||||||
backlog=false}};
|
|
||||||
{pause, Reason, Details} ->
|
|
||||||
io:format("Excess work due to - " ++ Reason,
|
|
||||||
Details),
|
|
||||||
{reply,
|
|
||||||
pause,
|
|
||||||
State1#state{backlog=true,
|
|
||||||
memtable_copy=L0Snap,
|
|
||||||
table_size=TableSize1,
|
|
||||||
ledger_sqn=MaxSQN}}
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
{MinSQN, MaxSQN} ->
|
|
||||||
io:format("Mismatch of sequence number expectations with push "
|
|
||||||
++ "having sequence numbers between ~w and ~w "
|
|
||||||
++ "but current sequence number is ~w~n",
|
|
||||||
[MinSQN, MaxSQN, State0#state.ledger_sqn]),
|
|
||||||
{reply, refused, State0};
|
|
||||||
empty ->
|
|
||||||
io:format("Empty request pushed to Penciller~n"),
|
|
||||||
{reply, ok, State0}
|
|
||||||
end;
|
|
||||||
writer_call({confirm_delete, FileName}, _From, State) ->
|
|
||||||
Reply = confirm_delete(FileName,
|
|
||||||
State#state.unreferenced_files,
|
|
||||||
State#state.registered_snapshots),
|
|
||||||
case Reply of
|
|
||||||
true ->
|
|
||||||
UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files),
|
|
||||||
{reply, true, State#state{unreferenced_files=UF1}};
|
|
||||||
_ ->
|
|
||||||
{reply, Reply, State}
|
|
||||||
end;
|
|
||||||
writer_call(prompt_compaction, _From, State) ->
|
|
||||||
%% If there is a prompt immediately after a L0 async write event then
|
|
||||||
%% there exists the potential for the prompt to stall the database.
|
|
||||||
%% Should only accept prompts if there has been a safe wait from the
|
|
||||||
%% last L0 write event.
|
|
||||||
Proceed = case State#state.levelzero_pending of
|
|
||||||
{true, _Pid, TS} ->
|
|
||||||
TD = timer:now_diff(os:timestamp(),TS),
|
|
||||||
if
|
|
||||||
TD < ?PROMPT_WAIT_ONL0 * 1000000 -> false;
|
|
||||||
true -> true
|
|
||||||
end;
|
|
||||||
?L0PEND_RESET ->
|
|
||||||
true
|
|
||||||
end,
|
|
||||||
if
|
|
||||||
Proceed ->
|
|
||||||
{_TableSize, State1} = checkready_pushtomem(State),
|
|
||||||
case roll_memory(State1, State1#state.memtable_maxsize) of
|
|
||||||
{ok, L0Pend, MSN, TableSize} ->
|
|
||||||
io:format("Prompted push completed~n"),
|
|
||||||
{reply, ok, State1#state{levelzero_pending=L0Pend,
|
|
||||||
table_size=TableSize,
|
|
||||||
manifest_sqn=MSN,
|
|
||||||
backlog=false}};
|
|
||||||
{pause, Reason, Details} ->
|
|
||||||
io:format("Excess work due to - " ++ Reason, Details),
|
|
||||||
{reply, pause, State1#state{backlog=true}}
|
|
||||||
end;
|
|
||||||
true ->
|
|
||||||
{reply, ok, State#state{backlog=false}}
|
|
||||||
end;
|
|
||||||
writer_call({manifest_change, WI}, _From, State) ->
|
|
||||||
{ok, UpdState} = commit_manifest_change(WI, State),
|
|
||||||
{reply, ok, UpdState}.
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -1286,8 +1256,6 @@ simple_server_test() ->
|
||||||
ok;
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
io:format("Unexpected sequence number on restart ~w~n", [TopSQN]),
|
io:format("Unexpected sequence number on restart ~w~n", [TopSQN]),
|
||||||
ok = pcl_close(PCLr),
|
|
||||||
clean_testdir(RootPath),
|
|
||||||
error
|
error
|
||||||
end,
|
end,
|
||||||
?assertMatch(Check, ok),
|
?assertMatch(Check, ok),
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue