From ef9ac672e5630421f293914c8ec2b90aba01b843 Mon Sep 17 00:00:00 2001 From: Russell Brown Date: Thu, 16 Aug 2018 10:37:30 +0100 Subject: [PATCH] Stop snapshots when the bookie stops During EQC testing it was found that snapshots are still usable even if the bookie process crashes. This change has snapshots monitor the bookie and close when the bookie process dies. --- include/leveled.hrl | 6 +++++ src/leveled_bookie.erl | 4 ++- src/leveled_inker.erl | 51 +++++++++++++++++++++++++++++++++++++++ src/leveled_penciller.erl | 13 +++++++++- 4 files changed, 72 insertions(+), 2 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index 64f0dfe..1c3b17e 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -53,6 +53,9 @@ root_path :: string() | undefined, cdb_options :: #cdb_options{} | undefined, start_snapshot = false :: boolean(), + %% so a snapshot can monitor the bookie and + %% terminate when it does + bookies_pid :: pid() | undefined, source_inker :: pid() | undefined, reload_strategy = [] :: list(), waste_retention_period :: integer() | undefined, @@ -67,6 +70,9 @@ max_inmemory_tablesize :: integer() | undefined, start_snapshot = false :: boolean(), snapshot_query, + %% so a snapshot can monitor the bookie and + %% terminate when it does + bookies_pid :: pid() | undefined, bookies_mem :: tuple() | undefined, source_penciller :: pid() | undefined, snapshot_longrunning = true :: boolean(), diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 901f6b8..971e3df 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -931,12 +931,14 @@ snapshot_store(LedgerCache, Penciller, Inker, SnapType, Query, LongRunning) -> source_penciller = Penciller, snapshot_query = Query, snapshot_longrunning = LongRunning, + bookies_pid = self(), bookies_mem = BookiesMem}, {ok, LedgerSnapshot} = leveled_penciller:pcl_snapstart(PCLopts), case SnapType of store -> InkerOpts = #inker_options{start_snapshot=true, - source_inker=Inker}, + 
bookies_pid = self(), + source_inker=Inker}, {ok, JournalSnapshot} = leveled_inker:ink_snapstart(InkerOpts), {ok, LedgerSnapshot, JournalSnapshot}; ledger -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index a03494d..50544ae 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -140,6 +140,7 @@ cdb_options :: #cdb_options{} | undefined, clerk :: pid() | undefined, compaction_pending = false :: boolean(), + bookie_monref :: reference() | undefined, is_snapshot = false :: boolean(), compression_method = native :: lz4|native, compress_on_receipt = false :: boolean(), @@ -426,12 +427,17 @@ init([InkerOpts]) -> case {InkerOpts#inker_options.root_path, InkerOpts#inker_options.start_snapshot} of {undefined, true} -> + %% monitor the bookie, and close the snapshot when bookie + %% exits + BookieMonitor = erlang:monitor(process, InkerOpts#inker_options.bookies_pid), + SrcInker = InkerOpts#inker_options.source_inker, {Manifest, ActiveJournalDB} = ink_registersnapshot(SrcInker, self()), {ok, #state{manifest=Manifest, active_journaldb=ActiveJournalDB, source_inker=SrcInker, + bookie_monref=BookieMonitor, is_snapshot=true}}; %% Need to do something about timeout {_RootPath, false} -> @@ -570,6 +576,12 @@ handle_cast({release_snapshot, Snapshot}, State) -> leveled_log:log("I0004", [length(Rs)]), {noreply, State#state{registered_snapshots=Rs}}. +%% handle the bookie stopping and stop this snapshot +handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info}, + State=#state{bookie_monref = BookieMonRef}) -> + %% Monitor only registered on snapshots + ok = ink_releasesnapshot(State#state.source_inker, self()), + {stop, normal, State}; handle_info(_Info, State) -> {noreply, State}. @@ -1299,4 +1311,43 @@ coverage_cheat_test() -> {noreply, _State0} = handle_info(timeout, #state{}), {ok, _State1} = code_change(null, #state{}, null). 
+handle_down_test() ->
+    RootPath = "../test/journal",
+    build_dummy_journal(),
+    CDBopts = #cdb_options{max_size=300000, binary_mode=true},
+    {ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
+                                           cdb_options=CDBopts,
+                                           compression_method=native,
+                                           compress_on_receipt=true}),
+
+    FakeBookie = spawn(fun loop/0),
+
+    SnapOpts = #inker_options{start_snapshot=true,
+                              bookies_pid = FakeBookie,
+                              source_inker=Ink1},
+
+    {ok, Snap1} = ink_snapstart(SnapOpts),
+
+    %% Monitor the snapshot itself: seeing the bookie's 'DOWN' here would
+    %% not prove the snapshot has already handled its own copy of it.
+    SnapMon = erlang:monitor(process, Snap1),
+
+    FakeBookie ! stop,
+
+    SnapExit = receive
+                   {'DOWN', SnapMon, process, Snap1, Reason} -> Reason
+               after 5000 -> timeout
+               end,
+    ?assertEqual(normal, SnapExit),
+    ?assertEqual(undefined, erlang:process_info(Snap1)),
+
+    ink_close(Ink1),
+    clean_testdir(RootPath).
+
+loop() ->
+    receive
+        stop ->
+            ok
+    end.
+
 -endif. diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index ca0a5b6..aa63e69 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -257,6 +257,7 @@ is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), source_penciller :: pid() | undefined, + bookie_monref :: reference() | undefined, levelzero_astree :: list() | undefined, work_ongoing = false :: boolean(), % i.e. 
compaction work @@ -583,6 +584,10 @@ init([PCLopts]) -> {undefined, _Snapshot=true, Query, BookiesMem} -> SrcPenciller = PCLopts#penciller_options.source_penciller, LongRunning = PCLopts#penciller_options.snapshot_longrunning, + %% monitor the bookie, and close the snapshot when bookie + %% exits + BookieMonitor = erlang:monitor(process, PCLopts#penciller_options.bookies_pid), + {ok, State} = pcl_registersnapshot(SrcPenciller, self(), Query, @@ -590,7 +595,8 @@ init([PCLopts]) -> LongRunning), leveled_log:log("P0001", [self()]), {ok, State#state{is_snapshot=true, - source_penciller=SrcPenciller}}; + bookie_monref = BookieMonitor, + source_penciller=SrcPenciller}}; {_RootPath, _Snapshot=false, _Q, _BM} -> start_from_file(PCLopts) end. @@ -943,6 +949,11 @@ handle_cast(work_for_clerk, State) -> end. +%% handle the bookie stopping and stop this snapshot +handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info}, + State=#state{bookie_monref = BookieMonRef}) -> + ok = pcl_releasesnapshot(State#state.source_penciller, self()), + {stop, normal, State}; handle_info(_Info, State) -> {noreply, State}.