Change Penciller to Clerk handoff
correct the previous change to make sure that deletes are not confirmed when work is outstanding, but also make sure that the clerk only ever casts the Penciller so no deadlocks can ensue.
This commit is contained in:
parent
9577d24be0
commit
f868be8086
2 changed files with 69 additions and 63 deletions
|
@ -37,12 +37,13 @@
|
||||||
-export([
|
-export([
|
||||||
clerk_new/2,
|
clerk_new/2,
|
||||||
clerk_prompt/1,
|
clerk_prompt/1,
|
||||||
|
clerk_push/2,
|
||||||
clerk_close/1
|
clerk_close/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
-define(MAX_TIMEOUT, 1000).
|
-define(MAX_TIMEOUT, 2000).
|
||||||
-define(MIN_TIMEOUT, 200).
|
-define(MIN_TIMEOUT, 200).
|
||||||
|
|
||||||
-record(state, {owner :: pid(),
|
-record(state, {owner :: pid(),
|
||||||
|
@ -61,6 +62,9 @@ clerk_new(Owner, Manifest) ->
|
||||||
clerk_prompt(Pid) ->
|
clerk_prompt(Pid) ->
|
||||||
gen_server:cast(Pid, prompt).
|
gen_server:cast(Pid, prompt).
|
||||||
|
|
||||||
|
clerk_push(Pid, Work) ->
|
||||||
|
gen_server:cast(Pid, {push_work, Work}).
|
||||||
|
|
||||||
clerk_close(Pid) ->
|
clerk_close(Pid) ->
|
||||||
gen_server:call(Pid, close, 20000).
|
gen_server:call(Pid, close, 20000).
|
||||||
|
|
||||||
|
@ -77,18 +81,14 @@ handle_call(close, _From, State) ->
|
||||||
{stop, normal, ok, State}.
|
{stop, normal, ok, State}.
|
||||||
|
|
||||||
handle_cast(prompt, State) ->
|
handle_cast(prompt, State) ->
|
||||||
handle_info(timeout, State).
|
handle_info(timeout, State);
|
||||||
|
handle_cast({push_work, Work}, State) ->
|
||||||
|
handle_work(Work, State),
|
||||||
|
{noreply, State, ?MIN_TIMEOUT}.
|
||||||
|
|
||||||
handle_info(timeout, State) ->
|
handle_info(timeout, State) ->
|
||||||
case requestandhandle_work(State) of
|
request_work(State),
|
||||||
false ->
|
{noreply, State, ?MAX_TIMEOUT}.
|
||||||
{noreply, State, ?MAX_TIMEOUT};
|
|
||||||
true ->
|
|
||||||
% No timeout now as will wait for call to return manifest
|
|
||||||
% change
|
|
||||||
{noreply, State, ?MIN_TIMEOUT}
|
|
||||||
end.
|
|
||||||
|
|
||||||
terminate(Reason, _State) ->
|
terminate(Reason, _State) ->
|
||||||
leveled_log:log("PC005", [self(), Reason]).
|
leveled_log:log("PC005", [self(), Reason]).
|
||||||
|
@ -101,24 +101,19 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
requestandhandle_work(State) ->
|
request_work(State) ->
|
||||||
case leveled_penciller:pcl_workforclerk(State#state.owner) of
|
ok = leveled_penciller:pcl_workforclerk(State#state.owner).
|
||||||
none ->
|
|
||||||
leveled_log:log("PC006", []),
|
|
||||||
false;
|
|
||||||
{SrcLevel, Manifest} ->
|
|
||||||
{UpdManifest, EntriesToDelete} = merge(SrcLevel,
|
|
||||||
Manifest,
|
|
||||||
State#state.root_path),
|
|
||||||
leveled_log:log("PC007", []),
|
|
||||||
ok = leveled_penciller:pcl_manifestchange(State#state.owner,
|
|
||||||
UpdManifest),
|
|
||||||
ok = leveled_manifest:save_manifest(UpdManifest,
|
|
||||||
State#state.root_path),
|
|
||||||
ok = notify_deletions(EntriesToDelete, State#state.owner),
|
|
||||||
true
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
handle_work({SrcLevel, Manifest}, State) ->
|
||||||
|
{UpdManifest, EntriesToDelete} = merge(SrcLevel,
|
||||||
|
Manifest,
|
||||||
|
State#state.root_path),
|
||||||
|
leveled_log:log("PC007", []),
|
||||||
|
ok = leveled_penciller:pcl_manifestchange(State#state.owner,
|
||||||
|
UpdManifest),
|
||||||
|
ok = leveled_manifest:save_manifest(UpdManifest,
|
||||||
|
State#state.root_path),
|
||||||
|
ok = notify_deletions(EntriesToDelete, State#state.owner).
|
||||||
|
|
||||||
merge(SrcLevel, Manifest, RootPath) ->
|
merge(SrcLevel, Manifest, RootPath) ->
|
||||||
Src = leveled_manifest:mergefile_selector(Manifest, SrcLevel),
|
Src = leveled_manifest:mergefile_selector(Manifest, SrcLevel),
|
||||||
|
|
|
@ -289,7 +289,7 @@ pcl_checksequencenumber(Pid, Key, SQN) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
pcl_workforclerk(Pid) ->
|
pcl_workforclerk(Pid) ->
|
||||||
gen_server:call(Pid, work_for_clerk, infinity).
|
gen_server:cast(Pid, work_for_clerk).
|
||||||
|
|
||||||
pcl_manifestchange(Pid, Manifest) ->
|
pcl_manifestchange(Pid, Manifest) ->
|
||||||
gen_server:cast(Pid, {manifest_change, Manifest}).
|
gen_server:cast(Pid, {manifest_change, Manifest}).
|
||||||
|
@ -428,30 +428,6 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
|
||||||
MaxKeys),
|
MaxKeys),
|
||||||
|
|
||||||
{reply, Acc, State#state{levelzero_astree = L0AsList}};
|
{reply, Acc, State#state{levelzero_astree = L0AsList}};
|
||||||
handle_call(work_for_clerk, _From, State) ->
|
|
||||||
case State#state.levelzero_pending of
|
|
||||||
true ->
|
|
||||||
{reply, none, State};
|
|
||||||
false ->
|
|
||||||
{WL, WC} = leveled_manifest:check_for_work(State#state.manifest,
|
|
||||||
?LEVEL_SCALEFACTOR),
|
|
||||||
case WC of
|
|
||||||
0 ->
|
|
||||||
{reply, none, State#state{work_backlog=false}};
|
|
||||||
N when N > ?WORKQUEUE_BACKLOG_TOLERANCE ->
|
|
||||||
leveled_log:log("P0024", [N, true]),
|
|
||||||
[TL|_Tail] = WL,
|
|
||||||
{reply,
|
|
||||||
{TL, State#state.manifest},
|
|
||||||
State#state{work_backlog=true, work_ongoing=true}};
|
|
||||||
N ->
|
|
||||||
leveled_log:log("P0024", [N, false]),
|
|
||||||
[TL|_Tail] = WL,
|
|
||||||
{reply,
|
|
||||||
{TL, State#state.manifest},
|
|
||||||
State#state{work_backlog=false, work_ongoing=true}}
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
handle_call(get_startup_sqn, _From, State) ->
|
handle_call(get_startup_sqn, _From, State) ->
|
||||||
{reply, State#state.persisted_sqn, State};
|
{reply, State#state.persisted_sqn, State};
|
||||||
handle_call({register_snapshot, Snapshot}, _From, State) ->
|
handle_call({register_snapshot, Snapshot}, _From, State) ->
|
||||||
|
@ -493,15 +469,24 @@ handle_cast({release_snapshot, Snapshot}, State) ->
|
||||||
{noreply, State#state{manifest=Manifest0}};
|
{noreply, State#state{manifest=Manifest0}};
|
||||||
handle_cast({confirm_delete, Filename}, State=#state{is_snapshot=Snap})
|
handle_cast({confirm_delete, Filename}, State=#state{is_snapshot=Snap})
|
||||||
when Snap == false ->
|
when Snap == false ->
|
||||||
R2D = leveled_manifest:ready_to_delete(State#state.manifest, Filename),
|
case State#state.work_ongoing of
|
||||||
case R2D of
|
false ->
|
||||||
{true, Pid} ->
|
R2D = leveled_manifest:ready_to_delete(State#state.manifest,
|
||||||
leveled_log:log("P0005", [Filename]),
|
Filename),
|
||||||
ok = leveled_sst:sst_deleteconfirmed(Pid),
|
case R2D of
|
||||||
Man0 = leveled_manifest:delete_confirmed(State#state.manifest,
|
{true, Pid} ->
|
||||||
Filename),
|
leveled_log:log("P0005", [Filename]),
|
||||||
{noreply, State#state{manifest=Man0}};
|
ok = leveled_sst:sst_deleteconfirmed(Pid),
|
||||||
{false, _Pid} ->
|
M0 = leveled_manifest:delete_confirmed(State#state.manifest,
|
||||||
|
Filename),
|
||||||
|
{noreply, State#state{manifest=M0}};
|
||||||
|
{false, _Pid} ->
|
||||||
|
{noreply, State}
|
||||||
|
end;
|
||||||
|
true ->
|
||||||
|
% If there is ongoing work, then we can't safely update the pidmap
|
||||||
|
% as any change will be reverted when the manifest is passed back
|
||||||
|
% from the Clerk
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) ->
|
handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) ->
|
||||||
|
@ -524,7 +509,33 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) ->
|
||||||
levelzero_constructor=undefined,
|
levelzero_constructor=undefined,
|
||||||
levelzero_size=0,
|
levelzero_size=0,
|
||||||
manifest=UpdMan,
|
manifest=UpdMan,
|
||||||
persisted_sqn=State#state.ledger_sqn}}.
|
persisted_sqn=State#state.ledger_sqn}};
|
||||||
|
handle_cast(work_for_clerk, State) ->
|
||||||
|
case State#state.levelzero_pending of
|
||||||
|
true ->
|
||||||
|
{noreply, State};
|
||||||
|
false ->
|
||||||
|
{WL, WC} = leveled_manifest:check_for_work(State#state.manifest,
|
||||||
|
?LEVEL_SCALEFACTOR),
|
||||||
|
case WC of
|
||||||
|
0 ->
|
||||||
|
{noreply, State#state{work_backlog=false}};
|
||||||
|
N when N > ?WORKQUEUE_BACKLOG_TOLERANCE ->
|
||||||
|
leveled_log:log("P0024", [N, true]),
|
||||||
|
[TL|_Tail] = WL,
|
||||||
|
ok = leveled_pclerk:clerk_push(State#state.clerk,
|
||||||
|
{TL, State#state.manifest}),
|
||||||
|
{noreply,
|
||||||
|
State#state{work_backlog=true, work_ongoing=true}};
|
||||||
|
N ->
|
||||||
|
leveled_log:log("P0024", [N, false]),
|
||||||
|
[TL|_Tail] = WL,
|
||||||
|
ok = leveled_pclerk:clerk_push(State#state.clerk,
|
||||||
|
{TL, State#state.manifest}),
|
||||||
|
{noreply,
|
||||||
|
State#state{work_backlog=false, work_ongoing=true}}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue