diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 1233a1b..c255943 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -37,12 +37,13 @@ -export([ clerk_new/2, clerk_prompt/1, + clerk_push/2, clerk_close/1 ]). -include_lib("eunit/include/eunit.hrl"). --define(MAX_TIMEOUT, 1000). +-define(MAX_TIMEOUT, 2000). -define(MIN_TIMEOUT, 200). -record(state, {owner :: pid(), @@ -61,6 +62,9 @@ clerk_new(Owner, Manifest) -> clerk_prompt(Pid) -> gen_server:cast(Pid, prompt). +clerk_push(Pid, Work) -> + gen_server:cast(Pid, {push_work, Work}). + clerk_close(Pid) -> gen_server:call(Pid, close, 20000). @@ -77,18 +81,14 @@ handle_call(close, _From, State) -> {stop, normal, ok, State}. handle_cast(prompt, State) -> - handle_info(timeout, State). - + handle_info(timeout, State); +handle_cast({push_work, Work}, State) -> + handle_work(Work, State), + {noreply, State, ?MIN_TIMEOUT}. handle_info(timeout, State) -> - case requestandhandle_work(State) of - false -> - {noreply, State, ?MAX_TIMEOUT}; - true -> - % No timeout now as will wait for call to return manifest - % change - {noreply, State, ?MIN_TIMEOUT} - end. + request_work(State), + {noreply, State, ?MAX_TIMEOUT}. terminate(Reason, _State) -> leveled_log:log("PC005", [self(), Reason]). @@ -101,24 +101,19 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================ -requestandhandle_work(State) -> - case leveled_penciller:pcl_workforclerk(State#state.owner) of - none -> - leveled_log:log("PC006", []), - false; - {SrcLevel, Manifest} -> - {UpdManifest, EntriesToDelete} = merge(SrcLevel, - Manifest, - State#state.root_path), - leveled_log:log("PC007", []), - ok = leveled_penciller:pcl_manifestchange(State#state.owner, - UpdManifest), - ok = leveled_manifest:save_manifest(UpdManifest, - State#state.root_path), - ok = notify_deletions(EntriesToDelete, State#state.owner), - true - end. +request_work(State) -> + ok = leveled_penciller:pcl_workforclerk(State#state.owner). +handle_work({SrcLevel, Manifest}, State) -> + {UpdManifest, EntriesToDelete} = merge(SrcLevel, + Manifest, + State#state.root_path), + leveled_log:log("PC007", []), + ok = leveled_penciller:pcl_manifestchange(State#state.owner, + UpdManifest), + ok = leveled_manifest:save_manifest(UpdManifest, + State#state.root_path), + ok = notify_deletions(EntriesToDelete, State#state.owner). merge(SrcLevel, Manifest, RootPath) -> Src = leveled_manifest:mergefile_selector(Manifest, SrcLevel), diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index d506264..b340f7d 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -289,7 +289,7 @@ pcl_checksequencenumber(Pid, Key, SQN) -> end. pcl_workforclerk(Pid) -> - gen_server:call(Pid, work_for_clerk, infinity). + gen_server:cast(Pid, work_for_clerk). pcl_manifestchange(Pid, Manifest) -> gen_server:cast(Pid, {manifest_change, Manifest}). @@ -428,30 +428,6 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, MaxKeys), {reply, Acc, State#state{levelzero_astree = L0AsList}}; -handle_call(work_for_clerk, _From, State) -> - case State#state.levelzero_pending of - true -> - {reply, none, State}; - false -> - {WL, WC} = leveled_manifest:check_for_work(State#state.manifest, - ?LEVEL_SCALEFACTOR), - case WC of - 0 -> - {reply, none, State#state{work_backlog=false}}; - N when N > ?WORKQUEUE_BACKLOG_TOLERANCE -> - leveled_log:log("P0024", [N, true]), - [TL|_Tail] = WL, - {reply, - {TL, State#state.manifest}, - State#state{work_backlog=true, work_ongoing=true}}; - N -> - leveled_log:log("P0024", [N, false]), - [TL|_Tail] = WL, - {reply, - {TL, State#state.manifest}, - State#state{work_backlog=false, work_ongoing=true}} - end - end; handle_call(get_startup_sqn, _From, State) -> {reply, State#state.persisted_sqn, State}; handle_call({register_snapshot, Snapshot}, _From, State) -> @@ -493,15 +469,24 @@ handle_cast({release_snapshot, Snapshot}, State) -> {noreply, State#state{manifest=Manifest0}}; handle_cast({confirm_delete, Filename}, State=#state{is_snapshot=Snap}) when Snap == false -> - R2D = leveled_manifest:ready_to_delete(State#state.manifest, Filename), - case R2D of - {true, Pid} -> - leveled_log:log("P0005", [Filename]), - ok = leveled_sst:sst_deleteconfirmed(Pid), - Man0 = leveled_manifest:delete_confirmed(State#state.manifest, - Filename), - {noreply, State#state{manifest=Man0}}; - {false, _Pid} -> + case State#state.work_ongoing of + false -> + R2D = leveled_manifest:ready_to_delete(State#state.manifest, + Filename), + case R2D of + {true, Pid} -> + leveled_log:log("P0005", [Filename]), + ok = leveled_sst:sst_deleteconfirmed(Pid), + M0 = leveled_manifest:delete_confirmed(State#state.manifest, + Filename), + {noreply, State#state{manifest=M0}}; + {false, _Pid} -> + {noreply, State} + end; + true -> + % If there is ongoing work, then we can't safely update the pidmap + % as any change will be reverted when the manifest is passed back + % from the Clerk {noreply, State} end; handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> @@ -524,7 +509,33 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> levelzero_constructor=undefined, levelzero_size=0, manifest=UpdMan, - persisted_sqn=State#state.ledger_sqn}}. + persisted_sqn=State#state.ledger_sqn}}; +handle_cast(work_for_clerk, State) -> + case State#state.levelzero_pending of + true -> + {noreply, State}; + false -> + {WL, WC} = leveled_manifest:check_for_work(State#state.manifest, + ?LEVEL_SCALEFACTOR), + case WC of + 0 -> + {noreply, State#state{work_backlog=false}}; + N when N > ?WORKQUEUE_BACKLOG_TOLERANCE -> + leveled_log:log("P0024", [N, true]), + [TL|_Tail] = WL, + ok = leveled_pclerk:clerk_push(State#state.clerk, + {TL, State#state.manifest}), + {noreply, + State#state{work_backlog=true, work_ongoing=true}}; + N -> + leveled_log:log("P0024", [N, false]), + [TL|_Tail] = WL, + ok = leveled_pclerk:clerk_push(State#state.clerk, + {TL, State#state.manifest}), + {noreply, + State#state{work_backlog=false, work_ongoing=true}} + end + end. handle_info(_Info, State) ->