Stop snapshots when the bookie stops
During EQC testing it was found that snapshots are still usable even if the bookie process crashes. This change has snapshots monitor the bookie and close when the bookie process dies.
This commit is contained in:
parent
a1269e5274
commit
ef9ac672e5
4 changed files with 72 additions and 2 deletions
|
@ -53,6 +53,9 @@
|
||||||
root_path :: string() | undefined,
|
root_path :: string() | undefined,
|
||||||
cdb_options :: #cdb_options{} | undefined,
|
cdb_options :: #cdb_options{} | undefined,
|
||||||
start_snapshot = false :: boolean(),
|
start_snapshot = false :: boolean(),
|
||||||
|
%% so a snapshot can monitor the bookie and
|
||||||
|
%% terminate when it does
|
||||||
|
bookies_pid :: pid() | undefined,
|
||||||
source_inker :: pid() | undefined,
|
source_inker :: pid() | undefined,
|
||||||
reload_strategy = [] :: list(),
|
reload_strategy = [] :: list(),
|
||||||
waste_retention_period :: integer() | undefined,
|
waste_retention_period :: integer() | undefined,
|
||||||
|
@ -67,6 +70,9 @@
|
||||||
max_inmemory_tablesize :: integer() | undefined,
|
max_inmemory_tablesize :: integer() | undefined,
|
||||||
start_snapshot = false :: boolean(),
|
start_snapshot = false :: boolean(),
|
||||||
snapshot_query,
|
snapshot_query,
|
||||||
|
%% so a snapshot can monitor the bookie and
|
||||||
|
%% terminate when it does
|
||||||
|
bookies_pid :: pid() | undefined,
|
||||||
bookies_mem :: tuple() | undefined,
|
bookies_mem :: tuple() | undefined,
|
||||||
source_penciller :: pid() | undefined,
|
source_penciller :: pid() | undefined,
|
||||||
snapshot_longrunning = true :: boolean(),
|
snapshot_longrunning = true :: boolean(),
|
||||||
|
|
|
@ -931,11 +931,13 @@ snapshot_store(LedgerCache, Penciller, Inker, SnapType, Query, LongRunning) ->
|
||||||
source_penciller = Penciller,
|
source_penciller = Penciller,
|
||||||
snapshot_query = Query,
|
snapshot_query = Query,
|
||||||
snapshot_longrunning = LongRunning,
|
snapshot_longrunning = LongRunning,
|
||||||
|
bookies_pid = self(),
|
||||||
bookies_mem = BookiesMem},
|
bookies_mem = BookiesMem},
|
||||||
{ok, LedgerSnapshot} = leveled_penciller:pcl_snapstart(PCLopts),
|
{ok, LedgerSnapshot} = leveled_penciller:pcl_snapstart(PCLopts),
|
||||||
case SnapType of
|
case SnapType of
|
||||||
store ->
|
store ->
|
||||||
InkerOpts = #inker_options{start_snapshot=true,
|
InkerOpts = #inker_options{start_snapshot=true,
|
||||||
|
bookies_pid = self(),
|
||||||
source_inker=Inker},
|
source_inker=Inker},
|
||||||
{ok, JournalSnapshot} = leveled_inker:ink_snapstart(InkerOpts),
|
{ok, JournalSnapshot} = leveled_inker:ink_snapstart(InkerOpts),
|
||||||
{ok, LedgerSnapshot, JournalSnapshot};
|
{ok, LedgerSnapshot, JournalSnapshot};
|
||||||
|
|
|
@ -140,6 +140,7 @@
|
||||||
cdb_options :: #cdb_options{} | undefined,
|
cdb_options :: #cdb_options{} | undefined,
|
||||||
clerk :: pid() | undefined,
|
clerk :: pid() | undefined,
|
||||||
compaction_pending = false :: boolean(),
|
compaction_pending = false :: boolean(),
|
||||||
|
bookie_monref :: reference() | undefined,
|
||||||
is_snapshot = false :: boolean(),
|
is_snapshot = false :: boolean(),
|
||||||
compression_method = native :: lz4|native,
|
compression_method = native :: lz4|native,
|
||||||
compress_on_receipt = false :: boolean(),
|
compress_on_receipt = false :: boolean(),
|
||||||
|
@ -426,12 +427,17 @@ init([InkerOpts]) ->
|
||||||
case {InkerOpts#inker_options.root_path,
|
case {InkerOpts#inker_options.root_path,
|
||||||
InkerOpts#inker_options.start_snapshot} of
|
InkerOpts#inker_options.start_snapshot} of
|
||||||
{undefined, true} ->
|
{undefined, true} ->
|
||||||
|
%% monitor the bookie, and close the snapshot when bookie
|
||||||
|
%% exits
|
||||||
|
BookieMonitor = erlang:monitor(process, InkerOpts#inker_options.bookies_pid),
|
||||||
|
|
||||||
SrcInker = InkerOpts#inker_options.source_inker,
|
SrcInker = InkerOpts#inker_options.source_inker,
|
||||||
{Manifest,
|
{Manifest,
|
||||||
ActiveJournalDB} = ink_registersnapshot(SrcInker, self()),
|
ActiveJournalDB} = ink_registersnapshot(SrcInker, self()),
|
||||||
{ok, #state{manifest=Manifest,
|
{ok, #state{manifest=Manifest,
|
||||||
active_journaldb=ActiveJournalDB,
|
active_journaldb=ActiveJournalDB,
|
||||||
source_inker=SrcInker,
|
source_inker=SrcInker,
|
||||||
|
bookie_monref=BookieMonitor,
|
||||||
is_snapshot=true}};
|
is_snapshot=true}};
|
||||||
%% Need to do something about timeout
|
%% Need to do something about timeout
|
||||||
{_RootPath, false} ->
|
{_RootPath, false} ->
|
||||||
|
@ -570,6 +576,12 @@ handle_cast({release_snapshot, Snapshot}, State) ->
|
||||||
leveled_log:log("I0004", [length(Rs)]),
|
leveled_log:log("I0004", [length(Rs)]),
|
||||||
{noreply, State#state{registered_snapshots=Rs}}.
|
{noreply, State#state{registered_snapshots=Rs}}.
|
||||||
|
|
||||||
|
%% handle the bookie stopping and stop this snapshot
|
||||||
|
handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info},
|
||||||
|
State=#state{bookie_monref = BookieMonRef}) ->
|
||||||
|
%% Monitor only registered on snapshots
|
||||||
|
ok = ink_releasesnapshot(State#state.source_inker, self()),
|
||||||
|
{stop, normal, State};
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
@ -1299,4 +1311,43 @@ coverage_cheat_test() ->
|
||||||
{noreply, _State0} = handle_info(timeout, #state{}),
|
{noreply, _State0} = handle_info(timeout, #state{}),
|
||||||
{ok, _State1} = code_change(null, #state{}, null).
|
{ok, _State1} = code_change(null, #state{}, null).
|
||||||
|
|
||||||
|
handle_down_test() ->
|
||||||
|
RootPath = "../test/journal",
|
||||||
|
build_dummy_journal(),
|
||||||
|
CDBopts = #cdb_options{max_size=300000, binary_mode=true},
|
||||||
|
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
|
||||||
|
cdb_options=CDBopts,
|
||||||
|
compression_method=native,
|
||||||
|
compress_on_receipt=true}),
|
||||||
|
|
||||||
|
FakeBookie = spawn(fun loop/0),
|
||||||
|
|
||||||
|
Mon = erlang:monitor(process, FakeBookie),
|
||||||
|
|
||||||
|
SnapOpts = #inker_options{start_snapshot=true,
|
||||||
|
bookies_pid = FakeBookie,
|
||||||
|
source_inker=Ink1},
|
||||||
|
|
||||||
|
{ok, Snap1} = ink_snapstart(SnapOpts),
|
||||||
|
|
||||||
|
FakeBookie ! stop,
|
||||||
|
|
||||||
|
receive
|
||||||
|
{'DOWN', Mon, process, FakeBookie, normal} ->
|
||||||
|
%% Now we know that inker should have received this too!
|
||||||
|
%% (better than timer:sleep/1)
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
|
||||||
|
?assertEqual(undefined, erlang:process_info(Snap1)),
|
||||||
|
|
||||||
|
ink_close(Ink1),
|
||||||
|
clean_testdir(RootPath).
|
||||||
|
|
||||||
|
loop() ->
|
||||||
|
receive
|
||||||
|
stop ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -257,6 +257,7 @@
|
||||||
is_snapshot = false :: boolean(),
|
is_snapshot = false :: boolean(),
|
||||||
snapshot_fully_loaded = false :: boolean(),
|
snapshot_fully_loaded = false :: boolean(),
|
||||||
source_penciller :: pid() | undefined,
|
source_penciller :: pid() | undefined,
|
||||||
|
bookie_monref :: reference() | undefined,
|
||||||
levelzero_astree :: list() | undefined,
|
levelzero_astree :: list() | undefined,
|
||||||
|
|
||||||
work_ongoing = false :: boolean(), % i.e. compaction work
|
work_ongoing = false :: boolean(), % i.e. compaction work
|
||||||
|
@ -583,6 +584,10 @@ init([PCLopts]) ->
|
||||||
{undefined, _Snapshot=true, Query, BookiesMem} ->
|
{undefined, _Snapshot=true, Query, BookiesMem} ->
|
||||||
SrcPenciller = PCLopts#penciller_options.source_penciller,
|
SrcPenciller = PCLopts#penciller_options.source_penciller,
|
||||||
LongRunning = PCLopts#penciller_options.snapshot_longrunning,
|
LongRunning = PCLopts#penciller_options.snapshot_longrunning,
|
||||||
|
%% monitor the bookie, and close the snapshot when bookie
|
||||||
|
%% exits
|
||||||
|
BookieMonitor = erlang:monitor(process, PCLopts#penciller_options.bookies_pid),
|
||||||
|
|
||||||
{ok, State} = pcl_registersnapshot(SrcPenciller,
|
{ok, State} = pcl_registersnapshot(SrcPenciller,
|
||||||
self(),
|
self(),
|
||||||
Query,
|
Query,
|
||||||
|
@ -590,6 +595,7 @@ init([PCLopts]) ->
|
||||||
LongRunning),
|
LongRunning),
|
||||||
leveled_log:log("P0001", [self()]),
|
leveled_log:log("P0001", [self()]),
|
||||||
{ok, State#state{is_snapshot=true,
|
{ok, State#state{is_snapshot=true,
|
||||||
|
bookie_monref = BookieMonitor,
|
||||||
source_penciller=SrcPenciller}};
|
source_penciller=SrcPenciller}};
|
||||||
{_RootPath, _Snapshot=false, _Q, _BM} ->
|
{_RootPath, _Snapshot=false, _Q, _BM} ->
|
||||||
start_from_file(PCLopts)
|
start_from_file(PCLopts)
|
||||||
|
@ -943,6 +949,11 @@ handle_cast(work_for_clerk, State) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
%% handle the bookie stopping and stop this snapshot
|
||||||
|
handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info},
|
||||||
|
State=#state{bookie_monref = BookieMonRef}) ->
|
||||||
|
ok = pcl_releasesnapshot(State#state.source_penciller, self()),
|
||||||
|
{stop, normal, State};
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue