Timeout long-running snapshots
Add logic to time out long-running snapshots.
This commit is contained in:
parent
400f65f557
commit
4e9fa2a206
6 changed files with 86 additions and 20 deletions
|
@ -57,6 +57,7 @@
|
||||||
snapshot_query,
|
snapshot_query,
|
||||||
bookies_mem :: tuple(),
|
bookies_mem :: tuple(),
|
||||||
source_penciller :: pid(),
|
source_penciller :: pid(),
|
||||||
|
snapshot_longrunning = true :: boolean(),
|
||||||
levelzero_cointoss = false :: boolean()}).
|
levelzero_cointoss = false :: boolean()}).
|
||||||
|
|
||||||
-record(iclerk_options,
|
-record(iclerk_options,
|
||||||
|
|
|
@ -631,9 +631,21 @@ snapshot_store(LedgerCache0, Penciller, Inker, SnapType, Query) ->
|
||||||
LedgerCache#ledger_cache.index,
|
LedgerCache#ledger_cache.index,
|
||||||
LedgerCache#ledger_cache.min_sqn,
|
LedgerCache#ledger_cache.min_sqn,
|
||||||
LedgerCache#ledger_cache.max_sqn},
|
LedgerCache#ledger_cache.max_sqn},
|
||||||
|
LongRunning =
|
||||||
|
case Query of
|
||||||
|
undefined ->
|
||||||
|
true;
|
||||||
|
no_lookup ->
|
||||||
|
true;
|
||||||
|
_ ->
|
||||||
|
% If a specific query has been defined, then not expected
|
||||||
|
% to be long running
|
||||||
|
false
|
||||||
|
end,
|
||||||
PCLopts = #penciller_options{start_snapshot = true,
|
PCLopts = #penciller_options{start_snapshot = true,
|
||||||
source_penciller = Penciller,
|
source_penciller = Penciller,
|
||||||
snapshot_query = Query,
|
snapshot_query = Query,
|
||||||
|
snapshot_longrunning = LongRunning,
|
||||||
bookies_mem = BookiesMem},
|
bookies_mem = BookiesMem},
|
||||||
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
|
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
|
||||||
case SnapType of
|
case SnapType of
|
||||||
|
|
|
@ -135,6 +135,12 @@
|
||||||
{info, "Garbage collection on manifest removes key for filename ~s"}},
|
{info, "Garbage collection on manifest removes key for filename ~s"}},
|
||||||
{"P0037",
|
{"P0037",
|
||||||
{debug, "Merging of penciller L0 tree from size ~w complete"}},
|
{debug, "Merging of penciller L0 tree from size ~w complete"}},
|
||||||
|
{"P0038",
|
||||||
|
{info, "Timeout of snapshot with pid=~w at SQN=~w at TS ~w "
|
||||||
|
++ "set to timeout=~w"}},
|
||||||
|
{"P0039",
|
||||||
|
{info, "Failed to relase pid=~w "
|
||||||
|
++ "leaving SnapshotCount=~w and MinSQN=~w"}},
|
||||||
|
|
||||||
{"PC001",
|
{"PC001",
|
||||||
{info, "Penciller's clerk ~w started with owner ~w"}},
|
{info, "Penciller's clerk ~w started with owner ~w"}},
|
||||||
|
|
|
@ -185,7 +185,7 @@
|
||||||
pcl_close/1,
|
pcl_close/1,
|
||||||
pcl_doom/1,
|
pcl_doom/1,
|
||||||
pcl_releasesnapshot/2,
|
pcl_releasesnapshot/2,
|
||||||
pcl_registersnapshot/4,
|
pcl_registersnapshot/5,
|
||||||
pcl_getstartupsequencenumber/1]).
|
pcl_getstartupsequencenumber/1]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
|
@ -214,7 +214,8 @@
|
||||||
-define(COIN_SIDECOUNT, 5).
|
-define(COIN_SIDECOUNT, 5).
|
||||||
-define(SLOW_FETCH, 20000).
|
-define(SLOW_FETCH, 20000).
|
||||||
-define(ITERATOR_SCANWIDTH, 4).
|
-define(ITERATOR_SCANWIDTH, 4).
|
||||||
-define(SNAPSHOT_TIMEOUT, 3600).
|
-define(SNAPSHOT_TIMEOUT_LONG, 3600).
|
||||||
|
-define(SNAPSHOT_TIMEOUT_SHORT, 600).
|
||||||
|
|
||||||
-record(state, {manifest, % a manifest record from the leveled_manifest module
|
-record(state, {manifest, % a manifest record from the leveled_manifest module
|
||||||
persisted_sqn = 0 :: integer(), % The highest SQN persisted
|
persisted_sqn = 0 :: integer(), % The highest SQN persisted
|
||||||
|
@ -305,9 +306,9 @@ pcl_confirmdelete(Pid, FileName, FilePid) ->
|
||||||
pcl_getstartupsequencenumber(Pid) ->
|
pcl_getstartupsequencenumber(Pid) ->
|
||||||
gen_server:call(Pid, get_startup_sqn, infinity).
|
gen_server:call(Pid, get_startup_sqn, infinity).
|
||||||
|
|
||||||
pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem) ->
|
pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem, LR) ->
|
||||||
gen_server:call(Pid,
|
gen_server:call(Pid,
|
||||||
{register_snapshot, Snapshot, Query, BookiesMem},
|
{register_snapshot, Snapshot, Query, BookiesMem, LR},
|
||||||
infinity).
|
infinity).
|
||||||
|
|
||||||
pcl_releasesnapshot(Pid, Snapshot) ->
|
pcl_releasesnapshot(Pid, Snapshot) ->
|
||||||
|
@ -332,7 +333,12 @@ init([PCLopts]) ->
|
||||||
PCLopts#penciller_options.bookies_mem} of
|
PCLopts#penciller_options.bookies_mem} of
|
||||||
{undefined, true, Query, BookiesMem} ->
|
{undefined, true, Query, BookiesMem} ->
|
||||||
SrcPenciller = PCLopts#penciller_options.source_penciller,
|
SrcPenciller = PCLopts#penciller_options.source_penciller,
|
||||||
{ok, State} = pcl_registersnapshot(SrcPenciller, self(), Query, BookiesMem),
|
LongRunning = PCLopts#penciller_options.snapshot_longrunning,
|
||||||
|
{ok, State} = pcl_registersnapshot(SrcPenciller,
|
||||||
|
self(),
|
||||||
|
Query,
|
||||||
|
BookiesMem,
|
||||||
|
LongRunning),
|
||||||
leveled_log:log("P0001", [self()]),
|
leveled_log:log("P0001", [self()]),
|
||||||
{ok, State#state{is_snapshot=true,
|
{ok, State#state{is_snapshot=true,
|
||||||
source_penciller=SrcPenciller}};
|
source_penciller=SrcPenciller}};
|
||||||
|
@ -444,16 +450,24 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
|
||||||
{reply, Acc, State#state{levelzero_astree = L0AsList}};
|
{reply, Acc, State#state{levelzero_astree = L0AsList}};
|
||||||
handle_call(get_startup_sqn, _From, State) ->
|
handle_call(get_startup_sqn, _From, State) ->
|
||||||
{reply, State#state.persisted_sqn, State};
|
{reply, State#state.persisted_sqn, State};
|
||||||
handle_call({register_snapshot, Snapshot, Query, BookiesMem}, _From, State) ->
|
handle_call({register_snapshot, Snapshot, Query, BookiesMem, LR}, _From, State) ->
|
||||||
% Register and load a snapshot
|
% Register and load a snapshot
|
||||||
%
|
%
|
||||||
% For setup of the snapshot to be efficient should pass a query
|
% For setup of the snapshot to be efficient should pass a query
|
||||||
% of (StartKey, EndKey) - this will avoid a full copy of the penciller's
|
% of (StartKey, EndKey) - this will avoid a full copy of the penciller's
|
||||||
% memory being required to be transferred to the clone. However, this
|
% memory being required to be transferred to the clone. However, this
|
||||||
% will not be a valid clone for fetch
|
% will not be a valid clone for fetch
|
||||||
|
Timeout =
|
||||||
|
case LR of
|
||||||
|
true ->
|
||||||
|
?SNAPSHOT_TIMEOUT_LONG;
|
||||||
|
false ->
|
||||||
|
?SNAPSHOT_TIMEOUT_SHORT
|
||||||
|
end,
|
||||||
|
|
||||||
Manifest0 = leveled_pmanifest:add_snapshot(State#state.manifest,
|
Manifest0 = leveled_pmanifest:add_snapshot(State#state.manifest,
|
||||||
Snapshot,
|
Snapshot,
|
||||||
?SNAPSHOT_TIMEOUT),
|
Timeout),
|
||||||
|
|
||||||
{BookieIncrTree, BookieIdx, MinSQN, MaxSQN} = BookiesMem,
|
{BookieIncrTree, BookieIdx, MinSQN, MaxSQN} = BookiesMem,
|
||||||
LM1Cache =
|
LM1Cache =
|
||||||
|
|
|
@ -54,6 +54,7 @@
|
||||||
-define(MAX_LEVELS, 8).
|
-define(MAX_LEVELS, 8).
|
||||||
-define(TREE_TYPE, idxt).
|
-define(TREE_TYPE, idxt).
|
||||||
-define(TREE_WIDTH, 8).
|
-define(TREE_WIDTH, 8).
|
||||||
|
-define(PHANTOM_PID, r2d_fail).
|
||||||
|
|
||||||
-record(manifest, {levels,
|
-record(manifest, {levels,
|
||||||
% an array of lists or trees representing the manifest
|
% an array of lists or trees representing the manifest
|
||||||
|
@ -266,9 +267,7 @@ mergefile_selector(Manifest, LevelIdx) ->
|
||||||
ME.
|
ME.
|
||||||
|
|
||||||
add_snapshot(Manifest, Pid, Timeout) ->
|
add_snapshot(Manifest, Pid, Timeout) ->
|
||||||
{MegaNow, SecNow, _} = os:timestamp(),
|
SnapEntry = {Pid, Manifest#manifest.manifest_sqn, seconds_now(), Timeout},
|
||||||
TimeToTimeout = MegaNow * 1000000 + SecNow + Timeout,
|
|
||||||
SnapEntry = {Pid, Manifest#manifest.manifest_sqn, TimeToTimeout},
|
|
||||||
SnapList0 = [SnapEntry|Manifest#manifest.snapshots],
|
SnapList0 = [SnapEntry|Manifest#manifest.snapshots],
|
||||||
ManSQN = Manifest#manifest.manifest_sqn,
|
ManSQN = Manifest#manifest.manifest_sqn,
|
||||||
case Manifest#manifest.min_snapshot_sqn of
|
case Manifest#manifest.min_snapshot_sqn of
|
||||||
|
@ -283,22 +282,34 @@ add_snapshot(Manifest, Pid, Timeout) ->
|
||||||
|
|
||||||
release_snapshot(Manifest, Pid) ->
|
release_snapshot(Manifest, Pid) ->
|
||||||
FilterFun =
|
FilterFun =
|
||||||
fun({P, SQN, TS}, {Acc, MinSQN}) ->
|
fun({P, SQN, ST, TO}, {Acc, MinSQN, Found}) ->
|
||||||
case P of
|
case P of
|
||||||
Pid ->
|
Pid ->
|
||||||
{Acc, MinSQN};
|
{Acc, MinSQN, true};
|
||||||
_ ->
|
_ ->
|
||||||
{[{P, SQN, TS}|Acc], min(SQN, MinSQN)}
|
case seconds_now() > (ST + TO) of
|
||||||
|
true ->
|
||||||
|
leveled_log:log("P0038", [P, SQN, ST, TO]),
|
||||||
|
{Acc, MinSQN, Found};
|
||||||
|
false ->
|
||||||
|
{[{P, SQN, ST, TO}|Acc], min(SQN, MinSQN), Found}
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
{SnapList0, MinSnapSQN} = lists:foldl(FilterFun,
|
{SnapList0, MinSnapSQN, Hit} = lists:foldl(FilterFun,
|
||||||
{[], infinity},
|
{[], infinity, false},
|
||||||
Manifest#manifest.snapshots),
|
Manifest#manifest.snapshots),
|
||||||
|
case Hit of
|
||||||
|
false ->
|
||||||
|
leveled_log:log("P0039", [Pid, length(SnapList0), MinSnapSQN]);
|
||||||
|
true ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
case SnapList0 of
|
case SnapList0 of
|
||||||
[] ->
|
[] ->
|
||||||
Manifest#manifest{snapshots = SnapList0,
|
Manifest#manifest{snapshots = SnapList0,
|
||||||
min_snapshot_sqn = 0};
|
min_snapshot_sqn = 0};
|
||||||
_ ->
|
_ ->
|
||||||
leveled_log:log("P0004", [SnapList0]),
|
leveled_log:log("P0004", [SnapList0]),
|
||||||
Manifest#manifest{snapshots = SnapList0,
|
Manifest#manifest{snapshots = SnapList0,
|
||||||
min_snapshot_sqn = MinSnapSQN}
|
min_snapshot_sqn = MinSnapSQN}
|
||||||
|
@ -317,7 +328,10 @@ ready_to_delete(Manifest, Filename) ->
|
||||||
PDs = dict:erase(Filename, Manifest#manifest.pending_deletes),
|
PDs = dict:erase(Filename, Manifest#manifest.pending_deletes),
|
||||||
{true, Manifest#manifest{pending_deletes = PDs}};
|
{true, Manifest#manifest{pending_deletes = PDs}};
|
||||||
_N ->
|
_N ->
|
||||||
{false, Manifest}
|
% If failed to delete then we should release a phantom pid
|
||||||
|
% in case this is necessary to timeout any snapshots
|
||||||
|
% This will also trigger a log
|
||||||
|
{false, release_snapshot(Manifest, ?PHANTOM_PID)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_for_work(Manifest, Thresholds) ->
|
check_for_work(Manifest, Thresholds) ->
|
||||||
|
@ -627,6 +641,10 @@ open_manifestfile(RootPath, [TopManSQN|Rest]) ->
|
||||||
open_manifestfile(RootPath, Rest)
|
open_manifestfile(RootPath, Rest)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% Return the current OS wall-clock time as a whole number of seconds.
%% The value is built from the mega-second and second parts of
%% os:timestamp/0; the microsecond part is discarded.
seconds_now() ->
    {MegaSecs, Secs, _MicroSecs} = os:timestamp(),
    (MegaSecs * 1000000) + Secs.
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% Test
|
%%% Test
|
||||||
|
@ -974,5 +992,18 @@ snapshot_release_test() ->
|
||||||
?assertMatch(true, Bool19).
|
?assertMatch(true, Bool19).
|
||||||
|
|
||||||
|
|
||||||
|
%% Exercise the two ways a snapshot entry leaves the manifest's snapshot
%% list: an explicit release by its own pid, and removal because its
%% timeout has passed when release_snapshot/2 is next called (here with
%% ?PHANTOM_PID, which was never registered).
snapshot_timeout_test() ->
    BaseMan = element(7, initial_setup()),
    % Long timeout: the entry stays until it is explicitly released
    LongMan = add_snapshot(BaseMan, "pid_a1", 3600),
    ?assertEqual(1, length(LongMan#manifest.snapshots)),
    ReleasedMan = release_snapshot(LongMan, "pid_a1"),
    ?assertEqual(0, length(ReleasedMan#manifest.snapshots)),
    % Zero timeout: the entry is only dropped once the clock, measured in
    % whole seconds, has moved strictly past the registration time - hence
    % the sleep before the sweep
    ShortMan = add_snapshot(ReleasedMan, "pid_a1", 0),
    timer:sleep(2001),
    ?assertEqual(1, length(ShortMan#manifest.snapshots)),
    SweptMan = release_snapshot(ShortMan, ?PHANTOM_PID),
    ?assertEqual(0, length(SweptMan#manifest.snapshots)).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-endif.
|
-endif.
|
|
@ -410,7 +410,9 @@ delete_pending(timeout, State) ->
|
||||||
ok = leveled_penciller:pcl_confirmdelete(State#state.penciller,
|
ok = leveled_penciller:pcl_confirmdelete(State#state.penciller,
|
||||||
State#state.filename,
|
State#state.filename,
|
||||||
self()),
|
self()),
|
||||||
{next_state, delete_pending, State, ?DELETE_TIMEOUT};
|
% If the next thing is another timeout - may be long-running snapshot, so
|
||||||
|
% back-off
|
||||||
|
{next_state, delete_pending, State, random:uniform(10) * ?DELETE_TIMEOUT};
|
||||||
delete_pending(close, State) ->
|
delete_pending(close, State) ->
|
||||||
leveled_log:log("SST07", [State#state.filename]),
|
leveled_log:log("SST07", [State#state.filename]),
|
||||||
ok = file:close(State#state.handle),
|
ok = file:close(State#state.handle),
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue