Merge pull request #165 from russelldb/rdb/stop-snapshot-cp

Stop snapshots when the bookie stops
This commit is contained in:
Martin Sumner 2018-09-06 12:01:52 +01:00 committed by GitHub
commit 01c16edf1a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 72 additions and 2 deletions

View file

@ -53,6 +53,9 @@
root_path :: string() | undefined,
cdb_options :: #cdb_options{} | undefined,
start_snapshot = false :: boolean(),
%% so a snapshot can monitor the bookie and
%% terminate when it does
bookies_pid :: pid() | undefined,
source_inker :: pid() | undefined,
reload_strategy = [] :: list(),
waste_retention_period :: integer() | undefined,
@ -67,6 +70,9 @@
max_inmemory_tablesize :: integer() | undefined,
start_snapshot = false :: boolean(),
snapshot_query,
%% so a snapshot can monitor the bookie and
%% terminate when it does
bookies_pid :: pid() | undefined,
bookies_mem :: tuple() | undefined,
source_penciller :: pid() | undefined,
snapshot_longrunning = true :: boolean(),

View file

@ -931,11 +931,13 @@ snapshot_store(LedgerCache, Penciller, Inker, SnapType, Query, LongRunning) ->
source_penciller = Penciller,
snapshot_query = Query,
snapshot_longrunning = LongRunning,
bookies_pid = self(),
bookies_mem = BookiesMem},
{ok, LedgerSnapshot} = leveled_penciller:pcl_snapstart(PCLopts),
case SnapType of
store ->
InkerOpts = #inker_options{start_snapshot=true,
bookies_pid = self(),
source_inker=Inker},
{ok, JournalSnapshot} = leveled_inker:ink_snapstart(InkerOpts),
{ok, LedgerSnapshot, JournalSnapshot};

View file

@ -140,6 +140,7 @@
cdb_options :: #cdb_options{} | undefined,
clerk :: pid() | undefined,
compaction_pending = false :: boolean(),
bookie_monref :: reference() | undefined,
is_snapshot = false :: boolean(),
compression_method = native :: lz4|native,
compress_on_receipt = false :: boolean(),
@ -426,12 +427,17 @@ init([InkerOpts]) ->
case {InkerOpts#inker_options.root_path,
InkerOpts#inker_options.start_snapshot} of
{undefined, true} ->
%% monitor the bookie, and close the snapshot when bookie
%% exits
BookieMonitor = erlang:monitor(process, InkerOpts#inker_options.bookies_pid),
SrcInker = InkerOpts#inker_options.source_inker,
{Manifest,
ActiveJournalDB} = ink_registersnapshot(SrcInker, self()),
{ok, #state{manifest=Manifest,
active_journaldb=ActiveJournalDB,
source_inker=SrcInker,
bookie_monref=BookieMonitor,
is_snapshot=true}};
%% Need to do something about timeout
{_RootPath, false} ->
@ -570,6 +576,12 @@ handle_cast({release_snapshot, Snapshot}, State) ->
leveled_log:log("I0004", [length(Rs)]),
{noreply, State#state{registered_snapshots=Rs}}.
%% handle the bookie stopping and stop this snapshot
handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info},
State=#state{bookie_monref = BookieMonRef}) ->
%% Monitor only registered on snapshots
ok = ink_releasesnapshot(State#state.source_inker, self()),
{stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.
@ -1299,4 +1311,43 @@ coverage_cheat_test() ->
{noreply, _State0} = handle_info(timeout, #state{}),
{ok, _State1} = code_change(null, #state{}, null).
handle_down_test() ->
RootPath = "../test/journal",
build_dummy_journal(),
CDBopts = #cdb_options{max_size=300000, binary_mode=true},
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
cdb_options=CDBopts,
compression_method=native,
compress_on_receipt=true}),
FakeBookie = spawn(fun loop/0),
Mon = erlang:monitor(process, FakeBookie),
SnapOpts = #inker_options{start_snapshot=true,
bookies_pid = FakeBookie,
source_inker=Ink1},
{ok, Snap1} = ink_snapstart(SnapOpts),
FakeBookie ! stop,
receive
{'DOWN', Mon, process, FakeBookie, normal} ->
%% Now we know that inker should have received this too!
%% (better than timer:sleep/1)
ok
end,
?assertEqual(undefined, erlang:process_info(Snap1)),
ink_close(Ink1),
clean_testdir(RootPath).
loop() ->
receive
stop ->
ok
end.
-endif.

View file

@ -257,6 +257,7 @@
is_snapshot = false :: boolean(),
snapshot_fully_loaded = false :: boolean(),
source_penciller :: pid() | undefined,
bookie_monref :: reference() | undefined,
levelzero_astree :: list() | undefined,
work_ongoing = false :: boolean(), % i.e. compaction work
@ -583,6 +584,10 @@ init([PCLopts]) ->
{undefined, _Snapshot=true, Query, BookiesMem} ->
SrcPenciller = PCLopts#penciller_options.source_penciller,
LongRunning = PCLopts#penciller_options.snapshot_longrunning,
%% monitor the bookie, and close the snapshot when bookie
%% exits
BookieMonitor = erlang:monitor(process, PCLopts#penciller_options.bookies_pid),
{ok, State} = pcl_registersnapshot(SrcPenciller,
self(),
Query,
@ -590,6 +595,7 @@ init([PCLopts]) ->
LongRunning),
leveled_log:log("P0001", [self()]),
{ok, State#state{is_snapshot=true,
bookie_monref = BookieMonitor,
source_penciller=SrcPenciller}};
{_RootPath, _Snapshot=false, _Q, _BM} ->
start_from_file(PCLopts)
@ -943,6 +949,11 @@ handle_cast(work_for_clerk, State) ->
end.
%% handle the bookie stopping and stop this snapshot
handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info},
State=#state{bookie_monref = BookieMonRef}) ->
ok = pcl_releasesnapshot(State#state.source_penciller, self()),
{stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.