From f868be80862b5333a88b35bd8f0cb47c05b105ee Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Sun, 15 Jan 2017 00:52:43 +0000 Subject: [PATCH] Change Penciller to Clerk handoff correct the previous change to make sure that deletes are not confirmed when work is outstanding, but also make sure that the clerk only ever casts the Penciller so no deadlocks can ensue. --- src/leveled_pclerk.erl | 51 +++++++++++------------- src/leveled_penciller.erl | 81 ++++++++++++++++++++++----------------- 2 files changed, 69 insertions(+), 63 deletions(-) diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 1233a1b..c255943 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -37,12 +37,13 @@ -export([ clerk_new/2, clerk_prompt/1, + clerk_push/2, clerk_close/1 ]). -include_lib("eunit/include/eunit.hrl"). --define(MAX_TIMEOUT, 1000). +-define(MAX_TIMEOUT, 2000). -define(MIN_TIMEOUT, 200). -record(state, {owner :: pid(), @@ -61,6 +62,9 @@ clerk_new(Owner, Manifest) -> clerk_prompt(Pid) -> gen_server:cast(Pid, prompt). +clerk_push(Pid, Work) -> + gen_server:cast(Pid, {push_work, Work}). + clerk_close(Pid) -> gen_server:call(Pid, close, 20000). @@ -77,18 +81,14 @@ handle_call(close, _From, State) -> {stop, normal, ok, State}. handle_cast(prompt, State) -> - handle_info(timeout, State). - + handle_info(timeout, State); +handle_cast({push_work, Work}, State) -> + handle_work(Work, State), + {noreply, State, ?MIN_TIMEOUT}. handle_info(timeout, State) -> - case requestandhandle_work(State) of - false -> - {noreply, State, ?MAX_TIMEOUT}; - true -> - % No timeout now as will wait for call to return manifest - % change - {noreply, State, ?MIN_TIMEOUT} - end. + request_work(State), + {noreply, State, ?MAX_TIMEOUT}. terminate(Reason, _State) -> leveled_log:log("PC005", [self(), Reason]). @@ -101,24 +101,19 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================ -requestandhandle_work(State) -> - case leveled_penciller:pcl_workforclerk(State#state.owner) of - none -> - leveled_log:log("PC006", []), - false; - {SrcLevel, Manifest} -> - {UpdManifest, EntriesToDelete} = merge(SrcLevel, - Manifest, - State#state.root_path), - leveled_log:log("PC007", []), - ok = leveled_penciller:pcl_manifestchange(State#state.owner, - UpdManifest), - ok = leveled_manifest:save_manifest(UpdManifest, - State#state.root_path), - ok = notify_deletions(EntriesToDelete, State#state.owner), - true - end. +request_work(State) -> + ok = leveled_penciller:pcl_workforclerk(State#state.owner). +handle_work({SrcLevel, Manifest}, State) -> + {UpdManifest, EntriesToDelete} = merge(SrcLevel, + Manifest, + State#state.root_path), + leveled_log:log("PC007", []), + ok = leveled_penciller:pcl_manifestchange(State#state.owner, + UpdManifest), + ok = leveled_manifest:save_manifest(UpdManifest, + State#state.root_path), + ok = notify_deletions(EntriesToDelete, State#state.owner). merge(SrcLevel, Manifest, RootPath) -> Src = leveled_manifest:mergefile_selector(Manifest, SrcLevel), diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index d506264..b340f7d 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -289,7 +289,7 @@ pcl_checksequencenumber(Pid, Key, SQN) -> end. pcl_workforclerk(Pid) -> - gen_server:call(Pid, work_for_clerk, infinity). + gen_server:cast(Pid, work_for_clerk). pcl_manifestchange(Pid, Manifest) -> gen_server:cast(Pid, {manifest_change, Manifest}). @@ -428,30 +428,6 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, MaxKeys), {reply, Acc, State#state{levelzero_astree = L0AsList}}; -handle_call(work_for_clerk, _From, State) -> - case State#state.levelzero_pending of - true -> - {reply, none, State}; - false -> - {WL, WC} = leveled_manifest:check_for_work(State#state.manifest, - ?LEVEL_SCALEFACTOR), - case WC of - 0 -> - {reply, none, State#state{work_backlog=false}}; - N when N > ?WORKQUEUE_BACKLOG_TOLERANCE -> - leveled_log:log("P0024", [N, true]), - [TL|_Tail] = WL, - {reply, - {TL, State#state.manifest}, - State#state{work_backlog=true, work_ongoing=true}}; - N -> - leveled_log:log("P0024", [N, false]), - [TL|_Tail] = WL, - {reply, - {TL, State#state.manifest}, - State#state{work_backlog=false, work_ongoing=true}} - end - end; handle_call(get_startup_sqn, _From, State) -> {reply, State#state.persisted_sqn, State}; handle_call({register_snapshot, Snapshot}, _From, State) -> @@ -493,15 +469,24 @@ handle_cast({release_snapshot, Snapshot}, State) -> {noreply, State#state{manifest=Manifest0}}; handle_cast({confirm_delete, Filename}, State=#state{is_snapshot=Snap}) when Snap == false -> - R2D = leveled_manifest:ready_to_delete(State#state.manifest, Filename), - case R2D of - {true, Pid} -> - leveled_log:log("P0005", [Filename]), - ok = leveled_sst:sst_deleteconfirmed(Pid), - Man0 = leveled_manifest:delete_confirmed(State#state.manifest, - Filename), - {noreply, State#state{manifest=Man0}}; - {false, _Pid} -> + case State#state.work_ongoing of + false -> + R2D = leveled_manifest:ready_to_delete(State#state.manifest, + Filename), + case R2D of + {true, Pid} -> + leveled_log:log("P0005", [Filename]), + ok = leveled_sst:sst_deleteconfirmed(Pid), + M0 = leveled_manifest:delete_confirmed(State#state.manifest, + Filename), + {noreply, State#state{manifest=M0}}; + {false, _Pid} -> + {noreply, State} + end; + true -> + % If there is ongoing work, then we can't safely update the pidmap + % as any change will be reverted when the manifest is passed back + % from the Clerk {noreply, State} end; handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> @@ -524,7 +509,33 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> levelzero_constructor=undefined, levelzero_size=0, manifest=UpdMan, - persisted_sqn=State#state.ledger_sqn}}. + persisted_sqn=State#state.ledger_sqn}}; +handle_cast(work_for_clerk, State) -> + case State#state.levelzero_pending of + true -> + {noreply, State}; + false -> + {WL, WC} = leveled_manifest:check_for_work(State#state.manifest, + ?LEVEL_SCALEFACTOR), + case WC of + 0 -> + {noreply, State#state{work_backlog=false}}; + N when N > ?WORKQUEUE_BACKLOG_TOLERANCE -> + leveled_log:log("P0024", [N, true]), + [TL|_Tail] = WL, + ok = leveled_pclerk:clerk_push(State#state.clerk, + {TL, State#state.manifest}), + {noreply, + State#state{work_backlog=true, work_ongoing=true}}; + N -> + leveled_log:log("P0024", [N, false]), + [TL|_Tail] = WL, + ok = leveled_pclerk:clerk_push(State#state.clerk, + {TL, State#state.manifest}), + {noreply, + State#state{work_backlog=false, work_ongoing=true}} + end + end. handle_info(_Info, State) ->