Add timeout for inker snapshots

So that they can also be released if they silently crash without closing neatly.
This commit is contained in:
Martin Sumner 2018-12-14 13:53:36 +00:00
parent 8bf36214e1
commit 2741c46daa
4 changed files with 47 additions and 14 deletions

View file

@ -151,6 +151,7 @@
is_snapshot = false :: boolean(),
compression_method = native :: lz4|native,
compress_on_receipt = false :: boolean(),
snap_timeout :: pos_integer() | undefined, % in seconds
source_inker :: pid() | undefined}).
@ -541,6 +542,7 @@ handle_call({fold,
end;
handle_call({register_snapshot, Requestor}, _From , State) ->
Rs = [{Requestor,
os:timestamp(),
State#state.manifest_sqn}|State#state.registered_snapshots],
leveled_log:log("I0002", [Requestor, State#state.manifest_sqn]),
{reply, {State#state.manifest,
@ -548,13 +550,28 @@ handle_call({register_snapshot, Requestor}, _From , State) ->
State#state.journal_sqn},
State#state{registered_snapshots=Rs}};
handle_call({confirm_delete, ManSQN}, _From, State) ->
% Check there are no snapshots that may be aware of the file process that
% is waiting to delete itself.
CheckSQNFun =
fun({_R, SnapSQN}, Bool) ->
fun({_R, _TS, SnapSQN}, Bool) ->
% If the Snapshot SQN was at the same point the file was set to
% delete (or after), then the snapshot would not have been told
% of the file, and the snapshot should not hold up its deletion
(SnapSQN >= ManSQN) and Bool
end,
CheckSnapshotExpiryFun =
fun({_R, TS, _SnapSQN}) ->
Expiry = leveled_util:integer_time(TS) + State#state.snap_timeout,
% If Expiry has passed this will be false, and the snapshot
% will be removed from the list of registered snapshots and
% so will not longer block deletes
leveled_util:integer_now() < Expiry
end,
RegisteredSnapshots0 =
lists:filter(CheckSnapshotExpiryFun, State#state.registered_snapshots),
{reply,
lists:foldl(CheckSQNFun, true, State#state.registered_snapshots),
State};
lists:foldl(CheckSQNFun, true, RegisteredSnapshots0),
State#state{registered_snapshots = RegisteredSnapshots0}};
handle_call(get_manifest, _From, State) ->
{reply, leveled_imanifest:to_list(State#state.manifest), State};
handle_call({update_manifest,
@ -791,6 +808,8 @@ start_from_file(InkOpts) ->
MRL_CompactPerc = InkOpts#inker_options.maxrunlength_compactionperc,
PressMethod = InkOpts#inker_options.compression_method,
PressOnReceipt = InkOpts#inker_options.compress_on_receipt,
SnapTimeout = InkOpts#inker_options.snaptimeout_long,
IClerkOpts =
#iclerk_options{inker = self(),
cdb_options=IClerkCDBOpts,
@ -799,8 +818,7 @@ start_from_file(InkOpts) ->
compression_method = PressMethod,
max_run_length = MRL,
singlefile_compactionperc = SFL_CompactPerc,
maxrunlength_compactionperc = MRL_CompactPerc
},
maxrunlength_compactionperc = MRL_CompactPerc},
{ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts),
@ -821,6 +839,7 @@ start_from_file(InkOpts) ->
cdb_options = CDBopts,
compression_method = PressMethod,
compress_on_receipt = PressOnReceipt,
snap_timeout = SnapTimeout,
clerk = Clerk}}.