diff --git a/include/leveled.hrl b/include/leveled.hrl index 64f0dfe..1c3b17e 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -53,6 +53,9 @@ root_path :: string() | undefined, cdb_options :: #cdb_options{} | undefined, start_snapshot = false :: boolean(), + %% so a snapshot can monitor the bookie and + %% terminate when it does + bookies_pid :: pid() | undefined, source_inker :: pid() | undefined, reload_strategy = [] :: list(), waste_retention_period :: integer() | undefined, @@ -67,6 +70,9 @@ max_inmemory_tablesize :: integer() | undefined, start_snapshot = false :: boolean(), snapshot_query, + %% so a snapshot can monitor the bookie and + %% terminate when it does + bookies_pid :: pid() | undefined, bookies_mem :: tuple() | undefined, source_penciller :: pid() | undefined, snapshot_longrunning = true :: boolean(), diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 901f6b8..971e3df 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -931,12 +931,14 @@ snapshot_store(LedgerCache, Penciller, Inker, SnapType, Query, LongRunning) -> source_penciller = Penciller, snapshot_query = Query, snapshot_longrunning = LongRunning, + bookies_pid = self(), bookies_mem = BookiesMem}, {ok, LedgerSnapshot} = leveled_penciller:pcl_snapstart(PCLopts), case SnapType of store -> InkerOpts = #inker_options{start_snapshot=true, - source_inker=Inker}, + bookies_pid = self(), + source_inker=Inker}, {ok, JournalSnapshot} = leveled_inker:ink_snapstart(InkerOpts), {ok, LedgerSnapshot, JournalSnapshot}; ledger -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index a03494d..50544ae 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -140,6 +140,7 @@ cdb_options :: #cdb_options{} | undefined, clerk :: pid() | undefined, compaction_pending = false :: boolean(), + bookie_monref :: reference() | undefined, is_snapshot = false :: boolean(), compression_method = native :: lz4|native, compress_on_receipt = false :: boolean(), @@ -426,12 +427,17 @@ init([InkerOpts]) -> case {InkerOpts#inker_options.root_path, InkerOpts#inker_options.start_snapshot} of {undefined, true} -> + %% monitor the bookie, and close the snapshot when bookie + %% exits + BookieMonitor = erlang:monitor(process, InkerOpts#inker_options.bookies_pid), + SrcInker = InkerOpts#inker_options.source_inker, {Manifest, ActiveJournalDB} = ink_registersnapshot(SrcInker, self()), {ok, #state{manifest=Manifest, active_journaldb=ActiveJournalDB, source_inker=SrcInker, + bookie_monref=BookieMonitor, is_snapshot=true}}; %% Need to do something about timeout {_RootPath, false} -> @@ -570,6 +576,12 @@ handle_cast({release_snapshot, Snapshot}, State) -> leveled_log:log("I0004", [length(Rs)]), {noreply, State#state{registered_snapshots=Rs}}. +%% handle the bookie stopping and stop this snapshot +handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info}, + State=#state{bookie_monref = BookieMonRef}) -> + %% Monitor only registered on snapshots + ok = ink_releasesnapshot(State#state.source_inker, self()), + {stop, normal, State}; handle_info(_Info, State) -> {noreply, State}. @@ -1299,4 +1311,43 @@ coverage_cheat_test() -> {noreply, _State0} = handle_info(timeout, #state{}), {ok, _State1} = code_change(null, #state{}, null). +handle_down_test() -> + RootPath = "../test/journal", + build_dummy_journal(), + CDBopts = #cdb_options{max_size=300000, binary_mode=true}, + {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, + cdb_options=CDBopts, + compression_method=native, + compress_on_receipt=true}), + + FakeBookie = spawn(fun loop/0), + + Mon = erlang:monitor(process, FakeBookie), + + SnapOpts = #inker_options{start_snapshot=true, + bookies_pid = FakeBookie, + source_inker=Ink1}, + + {ok, Snap1} = ink_snapstart(SnapOpts), + + FakeBookie ! stop, + + receive + {'DOWN', Mon, process, FakeBookie, normal} -> + %% Now we know that inker should have received this too! + %% (better than timer:sleep/1) + ok + end, + + ?assertEqual(undefined, erlang:process_info(Snap1)), + + ink_close(Ink1), + clean_testdir(RootPath). + +loop() -> + receive + stop -> + ok + end. + -endif. diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index ca0a5b6..aa63e69 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -257,6 +257,7 @@ is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), source_penciller :: pid() | undefined, + bookie_monref :: reference() | undefined, levelzero_astree :: list() | undefined, work_ongoing = false :: boolean(), % i.e. compaction work @@ -583,6 +584,10 @@ init([PCLopts]) -> {undefined, _Snapshot=true, Query, BookiesMem} -> SrcPenciller = PCLopts#penciller_options.source_penciller, LongRunning = PCLopts#penciller_options.snapshot_longrunning, + %% monitor the bookie, and close the snapshot when bookie + %% exits + BookieMonitor = erlang:monitor(process, PCLopts#penciller_options.bookies_pid), + {ok, State} = pcl_registersnapshot(SrcPenciller, self(), Query, @@ -590,7 +595,8 @@ init([PCLopts]) -> LongRunning), leveled_log:log("P0001", [self()]), {ok, State#state{is_snapshot=true, - source_penciller=SrcPenciller}}; + bookie_monref = BookieMonitor, + source_penciller=SrcPenciller}}; {_RootPath, _Snapshot=false, _Q, _BM} -> start_from_file(PCLopts) end. @@ -943,6 +949,11 @@ handle_cast(work_for_clerk, State) -> end. +%% handle the bookie stopping and stop this snapshot +handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info}, + State=#state{bookie_monref = BookieMonRef}) -> + ok = pcl_releasesnapshot(State#state.source_penciller, self()), + {stop, normal, State}; handle_info(_Info, State) -> {noreply, State}.