Make deletion confirmation an async message (#344)
Currently a sync call is made from cdb to inker wen confirming deletion (checking no snapshots have an outstanding requirement to access the file), whereas an async call is made from sst to penciller to achieve the same thing. Due to the potential of timeouts and crashes - the cdb/inker delete confirmation process is now based on async message passing, making it consistent with the sst/penciller delete confirmation process.
This commit is contained in:
parent
2cb87f3a6e
commit
ec52b16cfd
2 changed files with 50 additions and 41 deletions
|
@ -115,7 +115,8 @@
|
||||||
cdb_isrolling/1,
|
cdb_isrolling/1,
|
||||||
cdb_clerkcomplete/1,
|
cdb_clerkcomplete/1,
|
||||||
cdb_getcachedscore/2,
|
cdb_getcachedscore/2,
|
||||||
cdb_putcachedscore/2]).
|
cdb_putcachedscore/2,
|
||||||
|
cdb_deleteconfirmed/1]).
|
||||||
|
|
||||||
-export([finished_rolling/1,
|
-export([finished_rolling/1,
|
||||||
hashtable_calc/2]).
|
hashtable_calc/2]).
|
||||||
|
@ -323,6 +324,12 @@ cdb_directfetch(Pid, PositionList, Info) ->
|
||||||
cdb_close(Pid) ->
|
cdb_close(Pid) ->
|
||||||
gen_fsm:sync_send_all_state_event(Pid, cdb_close, infinity).
|
gen_fsm:sync_send_all_state_event(Pid, cdb_close, infinity).
|
||||||
|
|
||||||
|
-spec cdb_deleteconfirmed(pid()) -> ok.
|
||||||
|
%% @doc
|
||||||
|
%% Delete has been confirmed, so close (state should be delete_pending)
|
||||||
|
cdb_deleteconfirmed(Pid) ->
|
||||||
|
gen_fsm:send_event(Pid, delete_confirmed).
|
||||||
|
|
||||||
-spec cdb_complete(pid()) -> {ok, string()}.
|
-spec cdb_complete(pid()) -> {ok, string()}.
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Persists the hashtable to the end of the file, to close it for further
|
%% Persists the hashtable to the end of the file, to close it for further
|
||||||
|
@ -762,19 +769,11 @@ delete_pending({key_check, Key}, _From, State) ->
|
||||||
delete_pending(timeout, State=#state{delete_point=ManSQN}) when ManSQN > 0 ->
|
delete_pending(timeout, State=#state{delete_point=ManSQN}) when ManSQN > 0 ->
|
||||||
case is_process_alive(State#state.inker) of
|
case is_process_alive(State#state.inker) of
|
||||||
true ->
|
true ->
|
||||||
case leveled_inker:ink_confirmdelete(State#state.inker, ManSQN) of
|
ok =
|
||||||
true ->
|
leveled_inker:ink_confirmdelete(State#state.inker,
|
||||||
leveled_log:log("CDB04", [State#state.filename, ManSQN]),
|
ManSQN,
|
||||||
close_pendingdelete(State#state.handle,
|
self()),
|
||||||
State#state.filename,
|
{next_state, delete_pending, State, ?DELETE_TIMEOUT};
|
||||||
State#state.waste_path),
|
|
||||||
{stop, normal, State};
|
|
||||||
false ->
|
|
||||||
{next_state,
|
|
||||||
delete_pending,
|
|
||||||
State,
|
|
||||||
?DELETE_TIMEOUT}
|
|
||||||
end;
|
|
||||||
false ->
|
false ->
|
||||||
leveled_log:log("CDB04", [State#state.filename, ManSQN]),
|
leveled_log:log("CDB04", [State#state.filename, ManSQN]),
|
||||||
close_pendingdelete(State#state.handle,
|
close_pendingdelete(State#state.handle,
|
||||||
|
@ -782,6 +781,12 @@ delete_pending(timeout, State=#state{delete_point=ManSQN}) when ManSQN > 0 ->
|
||||||
State#state.waste_path),
|
State#state.waste_path),
|
||||||
{stop, normal, State}
|
{stop, normal, State}
|
||||||
end;
|
end;
|
||||||
|
delete_pending(delete_confirmed, State=#state{delete_point=ManSQN}) ->
|
||||||
|
leveled_log:log("CDB04", [State#state.filename, ManSQN]),
|
||||||
|
close_pendingdelete(State#state.handle,
|
||||||
|
State#state.filename,
|
||||||
|
State#state.waste_path),
|
||||||
|
{stop, normal, State};
|
||||||
delete_pending(destroy, State) ->
|
delete_pending(destroy, State) ->
|
||||||
leveled_log:log("CDB05", [State#state.filename, delete_pending, destroy]),
|
leveled_log:log("CDB05", [State#state.filename, delete_pending, destroy]),
|
||||||
close_pendingdelete(State#state.handle,
|
close_pendingdelete(State#state.handle,
|
||||||
|
|
|
@ -103,7 +103,7 @@
|
||||||
ink_fold/4,
|
ink_fold/4,
|
||||||
ink_loadpcl/5,
|
ink_loadpcl/5,
|
||||||
ink_registersnapshot/2,
|
ink_registersnapshot/2,
|
||||||
ink_confirmdelete/2,
|
ink_confirmdelete/3,
|
||||||
ink_compactjournal/3,
|
ink_compactjournal/3,
|
||||||
ink_clerkcomplete/3,
|
ink_clerkcomplete/3,
|
||||||
ink_compactionpending/1,
|
ink_compactionpending/1,
|
||||||
|
@ -266,12 +266,12 @@ ink_registersnapshot(Pid, Requestor) ->
|
||||||
ink_releasesnapshot(Pid, Snapshot) ->
|
ink_releasesnapshot(Pid, Snapshot) ->
|
||||||
gen_server:cast(Pid, {release_snapshot, Snapshot}).
|
gen_server:cast(Pid, {release_snapshot, Snapshot}).
|
||||||
|
|
||||||
-spec ink_confirmdelete(pid(), integer()) -> boolean().
|
-spec ink_confirmdelete(pid(), integer(), pid()) -> ok.
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Confirm if a Journal CDB file can be deleted, as it has been set to delete
|
%% Confirm if a Journal CDB file can be deleted, as it has been set to delete
|
||||||
%% and is no longer in use by any snapshots
|
%% and is no longer in use by any snapshots
|
||||||
ink_confirmdelete(Pid, ManSQN) ->
|
ink_confirmdelete(Pid, ManSQN, CDBpid) ->
|
||||||
gen_server:call(Pid, {confirm_delete, ManSQN}).
|
gen_server:cast(Pid, {confirm_delete, ManSQN, CDBpid}).
|
||||||
|
|
||||||
-spec ink_close(pid()) -> ok.
|
-spec ink_close(pid()) -> ok.
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -542,29 +542,6 @@ handle_call({register_snapshot, Requestor},
|
||||||
State#state.active_journaldb,
|
State#state.active_journaldb,
|
||||||
State#state.journal_sqn},
|
State#state.journal_sqn},
|
||||||
State#state{registered_snapshots=Rs}};
|
State#state{registered_snapshots=Rs}};
|
||||||
handle_call({confirm_delete, ManSQN}, _From, State) ->
|
|
||||||
% Check there are no snapshots that may be aware of the file process that
|
|
||||||
% is waiting to delete itself.
|
|
||||||
CheckSQNFun =
|
|
||||||
fun({_R, _TS, SnapSQN}, Bool) ->
|
|
||||||
% If the Snapshot SQN was at the same point the file was set to
|
|
||||||
% delete (or after), then the snapshot would not have been told
|
|
||||||
% of the file, and the snapshot should not hold up its deletion
|
|
||||||
(SnapSQN >= ManSQN) and Bool
|
|
||||||
end,
|
|
||||||
CheckSnapshotExpiryFun =
|
|
||||||
fun({_R, TS, _SnapSQN}) ->
|
|
||||||
Expiry = leveled_util:integer_time(TS) + State#state.snap_timeout,
|
|
||||||
% If Expiry has passed this will be false, and the snapshot
|
|
||||||
% will be removed from the list of registered snapshots and
|
|
||||||
% so will not longer block deletes
|
|
||||||
leveled_util:integer_now() < Expiry
|
|
||||||
end,
|
|
||||||
RegisteredSnapshots0 =
|
|
||||||
lists:filter(CheckSnapshotExpiryFun, State#state.registered_snapshots),
|
|
||||||
{reply,
|
|
||||||
lists:foldl(CheckSQNFun, true, RegisteredSnapshots0),
|
|
||||||
State#state{registered_snapshots = RegisteredSnapshots0}};
|
|
||||||
handle_call(get_manifest, _From, State) ->
|
handle_call(get_manifest, _From, State) ->
|
||||||
{reply, leveled_imanifest:to_list(State#state.manifest), State};
|
{reply, leveled_imanifest:to_list(State#state.manifest), State};
|
||||||
handle_call(print_manifest, _From, State) ->
|
handle_call(print_manifest, _From, State) ->
|
||||||
|
@ -734,6 +711,33 @@ handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) ->
|
||||||
manifest_sqn=NewManifestSQN,
|
manifest_sqn=NewManifestSQN,
|
||||||
pending_removals=FilesToDelete,
|
pending_removals=FilesToDelete,
|
||||||
compaction_pending=false}};
|
compaction_pending=false}};
|
||||||
|
handle_cast({confirm_delete, ManSQN, CDB}, State) ->
|
||||||
|
% Check there are no snapshots that may be aware of the file process that
|
||||||
|
% is waiting to delete itself.
|
||||||
|
CheckSQNFun =
|
||||||
|
fun({_R, _TS, SnapSQN}, Bool) ->
|
||||||
|
% If the Snapshot SQN was at the same point the file was set to
|
||||||
|
% delete (or after), then the snapshot would not have been told
|
||||||
|
% of the file, and the snapshot should not hold up its deletion
|
||||||
|
(SnapSQN >= ManSQN) and Bool
|
||||||
|
end,
|
||||||
|
CheckSnapshotExpiryFun =
|
||||||
|
fun({_R, TS, _SnapSQN}) ->
|
||||||
|
Expiry = leveled_util:integer_time(TS) + State#state.snap_timeout,
|
||||||
|
% If Expiry has passed this will be false, and the snapshot
|
||||||
|
% will be removed from the list of registered snapshots and
|
||||||
|
% so will not longer block deletes
|
||||||
|
leveled_util:integer_now() < Expiry
|
||||||
|
end,
|
||||||
|
RegisteredSnapshots0 =
|
||||||
|
lists:filter(CheckSnapshotExpiryFun, State#state.registered_snapshots),
|
||||||
|
case lists:foldl(CheckSQNFun, true, RegisteredSnapshots0) of
|
||||||
|
true ->
|
||||||
|
leveled_cdb:cdb_deleteconfirmed(CDB);
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
{noreply, State#state{registered_snapshots = RegisteredSnapshots0}};
|
||||||
handle_cast({release_snapshot, Snapshot}, State) ->
|
handle_cast({release_snapshot, Snapshot}, State) ->
|
||||||
leveled_log:log("I0003", [Snapshot]),
|
leveled_log:log("I0003", [Snapshot]),
|
||||||
case lists:keydelete(Snapshot, 1, State#state.registered_snapshots) of
|
case lists:keydelete(Snapshot, 1, State#state.registered_snapshots) of
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue