From 4e9fa2a206df50b06625e853ade8ee1c59c88e0a Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 5 Apr 2017 09:16:01 +0100 Subject: [PATCH] Timeout long-running snapshots Add logic to timeout long-running snapshots. --- include/leveled.hrl | 1 + src/leveled_bookie.erl | 12 +++++++++ src/leveled_log.erl | 6 +++++ src/leveled_penciller.erl | 28 +++++++++++++++----- src/leveled_pmanifest.erl | 55 ++++++++++++++++++++++++++++++--------- src/leveled_sst.erl | 4 ++- 6 files changed, 86 insertions(+), 20 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index 5db22d3..f6b0294 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -57,6 +57,7 @@ snapshot_query, bookies_mem :: tuple(), source_penciller :: pid(), + snapshot_longrunning = true :: boolean(), levelzero_cointoss = false :: boolean()}). -record(iclerk_options, diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 81a5a36..14a2b55 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -631,9 +631,21 @@ snapshot_store(LedgerCache0, Penciller, Inker, SnapType, Query) -> LedgerCache#ledger_cache.index, LedgerCache#ledger_cache.min_sqn, LedgerCache#ledger_cache.max_sqn}, + LongRunning = + case Query of + undefined -> + true; + no_lookup -> + true; + _ -> + % If a specific query has been defined, then not expected + % to be long running + false + end, PCLopts = #penciller_options{start_snapshot = true, source_penciller = Penciller, snapshot_query = Query, + snapshot_longrunning = LongRunning, bookies_mem = BookiesMem}, {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), case SnapType of diff --git a/src/leveled_log.erl b/src/leveled_log.erl index a65b587..d435c4f 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -135,6 +135,12 @@ {info, "Garbage collection on manifest removes key for filename ~s"}}, {"P0037", {debug, "Merging of penciller L0 tree from size ~w complete"}}, + {"P0038", + {info, "Timeout of snapshot with pid=~w at SQN=~w at TS ~w " + ++ "set to timeout=~w"}}, + {"P0039", + {info, "Failed to relase pid=~w " + ++ "leaving SnapshotCount=~w and MinSQN=~w"}}, {"PC001", {info, "Penciller's clerk ~w started with owner ~w"}}, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index fd242f1..9994991 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -185,7 +185,7 @@ pcl_close/1, pcl_doom/1, pcl_releasesnapshot/2, - pcl_registersnapshot/4, + pcl_registersnapshot/5, pcl_getstartupsequencenumber/1]). -export([ @@ -214,7 +214,8 @@ -define(COIN_SIDECOUNT, 5). -define(SLOW_FETCH, 20000). -define(ITERATOR_SCANWIDTH, 4). --define(SNAPSHOT_TIMEOUT, 3600). +-define(SNAPSHOT_TIMEOUT_LONG, 3600). +-define(SNAPSHOT_TIMEOUT_SHORT, 600). -record(state, {manifest, % a manifest record from the leveled_manifest module persisted_sqn = 0 :: integer(), % The highest SQN persisted @@ -305,9 +306,9 @@ pcl_confirmdelete(Pid, FileName, FilePid) -> pcl_getstartupsequencenumber(Pid) -> gen_server:call(Pid, get_startup_sqn, infinity). -pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem) -> +pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem, LR) -> gen_server:call(Pid, - {register_snapshot, Snapshot, Query, BookiesMem}, + {register_snapshot, Snapshot, Query, BookiesMem, LR}, infinity). pcl_releasesnapshot(Pid, Snapshot) -> @@ -332,7 +333,12 @@ init([PCLopts]) -> PCLopts#penciller_options.bookies_mem} of {undefined, true, Query, BookiesMem} -> SrcPenciller = PCLopts#penciller_options.source_penciller, - {ok, State} = pcl_registersnapshot(SrcPenciller, self(), Query, BookiesMem), + LongRunning = PCLopts#penciller_options.snapshot_longrunning, + {ok, State} = pcl_registersnapshot(SrcPenciller, + self(), + Query, + BookiesMem, + LongRunning), leveled_log:log("P0001", [self()]), {ok, State#state{is_snapshot=true, source_penciller=SrcPenciller}}; @@ -444,16 +450,24 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, {reply, Acc, State#state{levelzero_astree = L0AsList}}; handle_call(get_startup_sqn, _From, State) -> {reply, State#state.persisted_sqn, State}; -handle_call({register_snapshot, Snapshot, Query, BookiesMem}, _From, State) -> +handle_call({register_snapshot, Snapshot, Query, BookiesMem, LR}, _From, State) -> % Register and load a snapshot % % For setup of the snapshot to be efficient should pass a query % of (StartKey, EndKey) - this will avoid a fully copy of the penciller's % memory being required to be trasnferred to the clone. However, this % will not be a valid clone for fetch + Timeout = + case LR of + true -> + ?SNAPSHOT_TIMEOUT_LONG; + false -> + ?SNAPSHOT_TIMEOUT_SHORT + end, + Manifest0 = leveled_pmanifest:add_snapshot(State#state.manifest, Snapshot, - ?SNAPSHOT_TIMEOUT), + Timeout), {BookieIncrTree, BookieIdx, MinSQN, MaxSQN} = BookiesMem, LM1Cache = diff --git a/src/leveled_pmanifest.erl b/src/leveled_pmanifest.erl index 5cd3ef0..61e227e 100644 --- a/src/leveled_pmanifest.erl +++ b/src/leveled_pmanifest.erl @@ -54,6 +54,7 @@ -define(MAX_LEVELS, 8). -define(TREE_TYPE, idxt). -define(TREE_WIDTH, 8). +-define(PHANTOM_PID, r2d_fail). -record(manifest, {levels, % an array of lists or trees representing the manifest @@ -266,9 +267,7 @@ mergefile_selector(Manifest, LevelIdx) -> ME. add_snapshot(Manifest, Pid, Timeout) -> - {MegaNow, SecNow, _} = os:timestamp(), - TimeToTimeout = MegaNow * 1000000 + SecNow + Timeout, - SnapEntry = {Pid, Manifest#manifest.manifest_sqn, TimeToTimeout}, + SnapEntry = {Pid, Manifest#manifest.manifest_sqn, seconds_now(), Timeout}, SnapList0 = [SnapEntry|Manifest#manifest.snapshots], ManSQN = Manifest#manifest.manifest_sqn, case Manifest#manifest.min_snapshot_sqn of @@ -283,22 +282,34 @@ add_snapshot(Manifest, Pid, Timeout) -> release_snapshot(Manifest, Pid) -> FilterFun = - fun({P, SQN, TS}, {Acc, MinSQN}) -> + fun({P, SQN, ST, TO}, {Acc, MinSQN, Found}) -> case P of Pid -> - {Acc, MinSQN}; + {Acc, MinSQN, true}; _ -> - {[{P, SQN, TS}|Acc], min(SQN, MinSQN)} + case seconds_now() > (ST + TO) of + true -> + leveled_log:log("P0038", [P, SQN, ST, TO]), + {Acc, MinSQN, Found}; + false -> + {[{P, SQN, ST, TO}|Acc], min(SQN, MinSQN), Found} + end end end, - {SnapList0, MinSnapSQN} = lists:foldl(FilterFun, - {[], infinity}, - Manifest#manifest.snapshots), + {SnapList0, MinSnapSQN, Hit} = lists:foldl(FilterFun, + {[], infinity, false}, + Manifest#manifest.snapshots), + case Hit of + false -> + leveled_log:log("P0039", [Pid, length(SnapList0), MinSnapSQN]); + true -> + ok + end, case SnapList0 of [] -> Manifest#manifest{snapshots = SnapList0, min_snapshot_sqn = 0}; - _ -> + _ -> leveled_log:log("P0004", [SnapList0]), Manifest#manifest{snapshots = SnapList0, min_snapshot_sqn = MinSnapSQN} @@ -317,7 +328,10 @@ ready_to_delete(Manifest, Filename) -> PDs = dict:erase(Filename, Manifest#manifest.pending_deletes), {true, Manifest#manifest{pending_deletes = PDs}}; _N -> - {false, Manifest} + % If failed to delete then we should release a phantom pid + % in case this is necessary to timeout any snapshots + % This wll also trigger a log + {false, release_snapshot(Manifest, ?PHANTOM_PID)} end. check_for_work(Manifest, Thresholds) -> @@ -627,6 +641,10 @@ open_manifestfile(RootPath, [TopManSQN|Rest]) -> open_manifestfile(RootPath, Rest) end. +seconds_now() -> + {MegaNow, SecNow, _} = os:timestamp(), + MegaNow * 1000000 + SecNow. + %%%============================================================================ %%% Test @@ -973,6 +991,19 @@ snapshot_release_test() -> {Bool19, _Man20} = ready_to_delete(Man19, "Z3"), ?assertMatch(true, Bool19). - + +snapshot_timeout_test() -> + Man6 = element(7, initial_setup()), + Man7 = add_snapshot(Man6, "pid_a1", 3600), + ?assertMatch(1, length(Man7#manifest.snapshots)), + Man8 = release_snapshot(Man7, "pid_a1"), + ?assertMatch(0, length(Man8#manifest.snapshots)), + Man9 = add_snapshot(Man8, "pid_a1", 0), + timer:sleep(2001), + ?assertMatch(1, length(Man9#manifest.snapshots)), + Man10 = release_snapshot(Man9, ?PHANTOM_PID), + ?assertMatch(0, length(Man10#manifest.snapshots)). + + -endif. \ No newline at end of file diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 5fd85ca..73896ff 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -410,7 +410,9 @@ delete_pending(timeout, State) -> ok = leveled_penciller:pcl_confirmdelete(State#state.penciller, State#state.filename, self()), - {next_state, delete_pending, State, ?DELETE_TIMEOUT}; + % If the next thing is another timeout - may be long-running snapshot, so + % back-off + {next_state, delete_pending, State, random:uniform(10) * ?DELETE_TIMEOUT}; delete_pending(close, State) -> leveled_log:log("SST07", [State#state.filename]), ok = file:close(State#state.handle),