Close in stages - waiting for releases (#411)
* Close in stages - waiting for releases Have a consistent approach to closing the inker and the penciller - so that the close can be interrupted by releasing of snapshots. Then any unreleased snapshots are closed before shutdown - with a 10s pause to give queries a short opportunity to finish. This should address some issues, primarily seen (but very rarely) in test, whereby post-rebuild destruction of parallel AAE keystores causes the crashing of aae_folds. The primary benefit is that an attempt to release a snapshot that has in fact already finished does not cause a crash of the database on normal stop. This was primarily an issue when shutdown is delayed by an ongoing journal compaction job. * Boost default test budget for EQC * Update test to use correct type * Update following review Avoid filtering out exited PIDs when closing snapshots by catching the exit exception when the Pid is down
This commit is contained in:
parent
bc87273c76
commit
7a5cf251b3
6 changed files with 256 additions and 88 deletions
|
@ -133,6 +133,11 @@
|
||||||
-define(JOURNAL_FILEX, "cdb").
|
-define(JOURNAL_FILEX, "cdb").
|
||||||
-define(PENDING_FILEX, "pnd").
|
-define(PENDING_FILEX, "pnd").
|
||||||
-define(TEST_KC, {[], infinity}).
|
-define(TEST_KC, {[], infinity}).
|
||||||
|
-define(SHUTDOWN_PAUSE, 10000).
|
||||||
|
% How long to wait for snapshots to be released on shutdown
|
||||||
|
% before forcing closure of snapshots
|
||||||
|
% 10s may not be long enough for all snapshots, but avoids crashes of
|
||||||
|
% short-lived queries racing with the shutdown
|
||||||
|
|
||||||
-record(state, {manifest = [] :: list(),
|
-record(state, {manifest = [] :: list(),
|
||||||
manifest_sqn = 0 :: integer(),
|
manifest_sqn = 0 :: integer(),
|
||||||
|
@ -281,6 +286,18 @@ ink_confirmdelete(Pid, ManSQN, CDBpid) ->
|
||||||
ink_close(Pid) ->
|
ink_close(Pid) ->
|
||||||
gen_server:call(Pid, close, infinity).
|
gen_server:call(Pid, close, infinity).
|
||||||
|
|
||||||
|
-spec ink_snapclose(pid()) -> ok.
|
||||||
|
%% @doc
|
||||||
|
%% Specifically to be used when closing snapshots on shutdown, will handle a
|
||||||
|
%% scenario where a snapshot has already exited
|
||||||
|
ink_snapclose(Pid) ->
|
||||||
|
try
|
||||||
|
ink_close(Pid)
|
||||||
|
catch
|
||||||
|
exit:{noproc, _CallDetails} ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
-spec ink_doom(pid()) -> {ok, [{string(), string(), string(), string()}]}.
|
-spec ink_doom(pid()) -> {ok, [{string(), string(), string(), string()}]}.
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Test function used to close a file, and return all file paths (potentially
|
%% Test function used to close a file, and return all file paths (potentially
|
||||||
|
@ -654,33 +671,23 @@ handle_call({check_sqn, LedgerSQN}, _From, State) ->
|
||||||
end;
|
end;
|
||||||
handle_call(get_journalsqn, _From, State) ->
|
handle_call(get_journalsqn, _From, State) ->
|
||||||
{reply, {ok, State#state.journal_sqn}, State};
|
{reply, {ok, State#state.journal_sqn}, State};
|
||||||
handle_call(close, _From, State) ->
|
handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true ->
|
||||||
case State#state.is_snapshot of
|
ok = ink_releasesnapshot(State#state.source_inker, self()),
|
||||||
true ->
|
|
||||||
ok = ink_releasesnapshot(State#state.source_inker, self());
|
|
||||||
false ->
|
|
||||||
leveled_log:log(i0005, [close]),
|
|
||||||
leveled_log:log(
|
|
||||||
i0006, [State#state.journal_sqn, State#state.manifest_sqn]),
|
|
||||||
ok = leveled_iclerk:clerk_stop(State#state.clerk),
|
|
||||||
shutdown_snapshots(State#state.registered_snapshots),
|
|
||||||
shutdown_manifest(State#state.manifest)
|
|
||||||
end,
|
|
||||||
{stop, normal, ok, State};
|
{stop, normal, ok, State};
|
||||||
handle_call(doom, _From, State) ->
|
handle_call(ShutdownType, From, State)
|
||||||
FPs = [filepath(State#state.root_path, journal_dir),
|
when ShutdownType == close; ShutdownType == doom ->
|
||||||
filepath(State#state.root_path, manifest_dir),
|
case ShutdownType of
|
||||||
filepath(State#state.root_path, journal_compact_dir),
|
doom ->
|
||||||
filepath(State#state.root_path, journal_waste_dir)],
|
leveled_log:log(i0018, []);
|
||||||
leveled_log:log(i0018, []),
|
_ ->
|
||||||
|
ok
|
||||||
leveled_log:log(i0005, [doom]),
|
end,
|
||||||
|
leveled_log:log(i0005, [ShutdownType]),
|
||||||
leveled_log:log(
|
leveled_log:log(
|
||||||
i0006, [State#state.journal_sqn, State#state.manifest_sqn]),
|
i0006, [State#state.journal_sqn, State#state.manifest_sqn]),
|
||||||
ok = leveled_iclerk:clerk_stop(State#state.clerk),
|
ok = leveled_iclerk:clerk_stop(State#state.clerk),
|
||||||
shutdown_snapshots(State#state.registered_snapshots),
|
gen_server:cast(self(), {maybe_defer_shutdown, ShutdownType, From}),
|
||||||
shutdown_manifest(State#state.manifest),
|
{noreply, State}.
|
||||||
{stop, normal, {ok, FPs}, State}.
|
|
||||||
|
|
||||||
|
|
||||||
handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) ->
|
handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) ->
|
||||||
|
@ -766,8 +773,39 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
|
||||||
ok = leveled_log:remove_forcedlogs(ForcedLogs),
|
ok = leveled_log:remove_forcedlogs(ForcedLogs),
|
||||||
CDBopts = State#state.cdb_options,
|
CDBopts = State#state.cdb_options,
|
||||||
CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
|
CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
|
||||||
{noreply, State#state{cdb_options = CDBopts0}}.
|
{noreply, State#state{cdb_options = CDBopts0}};
|
||||||
|
handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) ->
|
||||||
|
case length(State#state.registered_snapshots) of
|
||||||
|
0 ->
|
||||||
|
ok;
|
||||||
|
N ->
|
||||||
|
% Whilst this process sleeps, then any remaining snapshots may
|
||||||
|
% release and have their release messages queued before the
|
||||||
|
% complete_shutdown cast is sent
|
||||||
|
leveled_log:log(i0026, [N]),
|
||||||
|
timer:sleep(?SHUTDOWN_PAUSE)
|
||||||
|
end,
|
||||||
|
gen_server:cast(self(), {complete_shutdown, ShutdownType, From}),
|
||||||
|
{noreply, State};
|
||||||
|
handle_cast({complete_shutdown, ShutdownType, From}, State) ->
|
||||||
|
lists:foreach(
|
||||||
|
fun(SnapPid) -> ok = ink_snapclose(SnapPid) end,
|
||||||
|
lists:map(
|
||||||
|
fun(Snapshot) -> element(1, Snapshot) end,
|
||||||
|
State#state.registered_snapshots)),
|
||||||
|
shutdown_manifest(State#state.manifest),
|
||||||
|
case ShutdownType of
|
||||||
|
doom ->
|
||||||
|
FPs =
|
||||||
|
[filepath(State#state.root_path, journal_dir),
|
||||||
|
filepath(State#state.root_path, manifest_dir),
|
||||||
|
filepath(State#state.root_path, journal_compact_dir),
|
||||||
|
filepath(State#state.root_path, journal_waste_dir)],
|
||||||
|
gen_server:reply(From, {ok, FPs});
|
||||||
|
close ->
|
||||||
|
gen_server:reply(From, ok)
|
||||||
|
end,
|
||||||
|
{stop, normal, State}.
|
||||||
|
|
||||||
%% handle the bookie stopping and stop this snapshot
|
%% handle the bookie stopping and stop this snapshot
|
||||||
handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info},
|
handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info},
|
||||||
|
@ -789,6 +827,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
|
|
||||||
-spec start_from_file(inker_options()) -> {ok, ink_state()}.
|
-spec start_from_file(inker_options()) -> {ok, ink_state()}.
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Start an Inker from the state on disk (i.e. not a snapshot).
|
%% Start an Inker from the state on disk (i.e. not a snapshot).
|
||||||
|
@ -854,13 +893,6 @@ start_from_file(InkOpts) ->
|
||||||
clerk = Clerk}}.
|
clerk = Clerk}}.
|
||||||
|
|
||||||
|
|
||||||
-spec shutdown_snapshots(list(registered_snapshot())) -> ok.
|
|
||||||
%% @doc
|
|
||||||
%% Shutdown any snapshots before closing the store
|
|
||||||
shutdown_snapshots(Snapshots) ->
|
|
||||||
lists:foreach(fun({Snap, _TS, _SQN}) -> ok = ink_close(Snap) end,
|
|
||||||
Snapshots).
|
|
||||||
|
|
||||||
-spec shutdown_manifest(leveled_imanifest:manifest()) -> ok.
|
-spec shutdown_manifest(leveled_imanifest:manifest()) -> ok.
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Shutdown all files in the manifest
|
%% Shutdown all files in the manifest
|
||||||
|
@ -1603,4 +1635,28 @@ loop() ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
close_no_crash_test_() ->
|
||||||
|
{timeout, 60, fun close_no_crash_tester/0}.
|
||||||
|
|
||||||
|
close_no_crash_tester() ->
|
||||||
|
RootPath = "test/test_area/journal",
|
||||||
|
build_dummy_journal(),
|
||||||
|
CDBopts = #cdb_options{max_size=300000, binary_mode=true},
|
||||||
|
{ok, Inker} =
|
||||||
|
ink_start(
|
||||||
|
#inker_options{
|
||||||
|
root_path=RootPath,
|
||||||
|
cdb_options=CDBopts,
|
||||||
|
compression_method=native,
|
||||||
|
compress_on_receipt=true}),
|
||||||
|
|
||||||
|
SnapOpts =
|
||||||
|
#inker_options{
|
||||||
|
start_snapshot=true, bookies_pid = self(), source_inker=Inker},
|
||||||
|
{ok, InkSnap} = ink_snapstart(SnapOpts),
|
||||||
|
|
||||||
|
exit(InkSnap, kill),
|
||||||
|
ok = ink_close(Inker),
|
||||||
|
clean_testdir(RootPath).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -130,6 +130,8 @@
|
||||||
{info, <<"Archiving filename ~s as unused at startup">>},
|
{info, <<"Archiving filename ~s as unused at startup">>},
|
||||||
p0041 =>
|
p0041 =>
|
||||||
{info, <<"Penciller manifest switched from SQN ~w to ~w">>},
|
{info, <<"Penciller manifest switched from SQN ~w to ~w">>},
|
||||||
|
p0042 =>
|
||||||
|
{info, <<"Deferring shutdown due to snapshot_count=~w">>},
|
||||||
pc001 =>
|
pc001 =>
|
||||||
{info, <<"Penciller's clerk ~w started with owner ~w">>},
|
{info, <<"Penciller's clerk ~w started with owner ~w">>},
|
||||||
pc005 =>
|
pc005 =>
|
||||||
|
@ -242,6 +244,8 @@
|
||||||
{info, <<"Prompted roll at NewSQN=~w">>},
|
{info, <<"Prompted roll at NewSQN=~w">>},
|
||||||
i0025 =>
|
i0025 =>
|
||||||
{warn, <<"Journal SQN of ~w is below Ledger SQN of ~w anti-entropy will be required">>},
|
{warn, <<"Journal SQN of ~w is below Ledger SQN of ~w anti-entropy will be required">>},
|
||||||
|
i0026 =>
|
||||||
|
{info, <<"Deferring shutdown due to snapshot_count=~w">>},
|
||||||
ic001 =>
|
ic001 =>
|
||||||
{info, <<"Closed for reason ~w so maybe leaving garbage">>},
|
{info, <<"Closed for reason ~w so maybe leaving garbage">>},
|
||||||
ic002 =>
|
ic002 =>
|
||||||
|
|
|
@ -222,6 +222,11 @@
|
||||||
-define(TIMING_SAMPLECOUNTDOWN, 10000).
|
-define(TIMING_SAMPLECOUNTDOWN, 10000).
|
||||||
-define(TIMING_SAMPLESIZE, 100).
|
-define(TIMING_SAMPLESIZE, 100).
|
||||||
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
|
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
|
||||||
|
-define(SHUTDOWN_PAUSE, 10000).
|
||||||
|
% How long to wait for snapshots to be released on shutdown
|
||||||
|
% before forcing closure of snapshots
|
||||||
|
% 10s may not be long enough for all snapshots, but avoids crashes of
|
||||||
|
% short-lived queries racing with the shutdown
|
||||||
|
|
||||||
-record(state, {manifest ::
|
-record(state, {manifest ::
|
||||||
leveled_pmanifest:manifest() | undefined | redacted,
|
leveled_pmanifest:manifest() | undefined | redacted,
|
||||||
|
@ -548,7 +553,19 @@ pcl_persistedsqn(Pid) ->
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Close the penciller neatly, trying to persist to disk anything in the memory
|
%% Close the penciller neatly, trying to persist to disk anything in the memory
|
||||||
pcl_close(Pid) ->
|
pcl_close(Pid) ->
|
||||||
gen_server:call(Pid, close, 60000).
|
gen_server:call(Pid, close, infinity).
|
||||||
|
|
||||||
|
-spec pcl_snapclose(pid()) -> ok.
|
||||||
|
%% @doc
|
||||||
|
%% Specifically to be used when closing snapshots on shutdown, will handle a
|
||||||
|
%% scenario where a snapshot has already exited
|
||||||
|
pcl_snapclose(Pid) ->
|
||||||
|
try
|
||||||
|
pcl_close(Pid)
|
||||||
|
catch
|
||||||
|
exit:{noproc, _CallDetails} ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
-spec pcl_doom(pid()) -> {ok, list()}.
|
-spec pcl_doom(pid()) -> {ok, list()}.
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -556,7 +573,7 @@ pcl_close(Pid) ->
|
||||||
%% Return a list of filepaths from where files exist for this penciller (should
|
%% Return a list of filepaths from where files exist for this penciller (should
|
||||||
%% the calling process wish to erase the store).
|
%% the calling process wish to erase the store).
|
||||||
pcl_doom(Pid) ->
|
pcl_doom(Pid) ->
|
||||||
gen_server:call(Pid, doom, 60000).
|
gen_server:call(Pid, doom, infinity).
|
||||||
|
|
||||||
-spec pcl_checkbloomtest(pid(), tuple()) -> boolean().
|
-spec pcl_checkbloomtest(pid(), tuple()) -> boolean().
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -888,7 +905,7 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning},
|
||||||
handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true ->
|
handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true ->
|
||||||
ok = pcl_releasesnapshot(State#state.source_penciller, self()),
|
ok = pcl_releasesnapshot(State#state.source_penciller, self()),
|
||||||
{stop, normal, ok, State};
|
{stop, normal, ok, State};
|
||||||
handle_call(close, _From, State) ->
|
handle_call(close, From, State) ->
|
||||||
% Level 0 files lie outside of the manifest, and so if there is no L0
|
% Level 0 files lie outside of the manifest, and so if there is no L0
|
||||||
% file present it is safe to write the current contents of memory. If
|
% file present it is safe to write the current contents of memory. If
|
||||||
% there is a L0 file present - then the memory can be dropped (it is
|
% there is a L0 file present - then the memory can be dropped (it is
|
||||||
|
@ -917,17 +934,13 @@ handle_call(close, _From, State) ->
|
||||||
false ->
|
false ->
|
||||||
leveled_log:log(p0010, [State#state.levelzero_size])
|
leveled_log:log(p0010, [State#state.levelzero_size])
|
||||||
end,
|
end,
|
||||||
shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),
|
gen_server:cast(self(), {maybe_defer_shutdown, close, From}),
|
||||||
{stop, normal, ok, State};
|
{noreply, State};
|
||||||
handle_call(doom, _From, State) ->
|
handle_call(doom, From, State) ->
|
||||||
leveled_log:log(p0030, []),
|
leveled_log:log(p0030, []),
|
||||||
ok = leveled_pclerk:clerk_close(State#state.clerk),
|
ok = leveled_pclerk:clerk_close(State#state.clerk),
|
||||||
|
gen_server:cast(self(), {maybe_defer_shutdown, doom, From}),
|
||||||
shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),
|
{noreply, State};
|
||||||
|
|
||||||
ManifestFP = State#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
|
|
||||||
FilesFP = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/",
|
|
||||||
{stop, normal, {ok, [ManifestFP, FilesFP]}, State};
|
|
||||||
handle_call({checkbloom_fortest, Key, Hash}, _From, State) ->
|
handle_call({checkbloom_fortest, Key, Hash}, _From, State) ->
|
||||||
Manifest = State#state.manifest,
|
Manifest = State#state.manifest,
|
||||||
FoldFun =
|
FoldFun =
|
||||||
|
@ -977,8 +990,8 @@ handle_cast({manifest_change, Manifest}, State) ->
|
||||||
work_ongoing=false}}
|
work_ongoing=false}}
|
||||||
end;
|
end;
|
||||||
handle_cast({release_snapshot, Snapshot}, State) ->
|
handle_cast({release_snapshot, Snapshot}, State) ->
|
||||||
Manifest0 = leveled_pmanifest:release_snapshot(State#state.manifest,
|
Manifest0 =
|
||||||
Snapshot),
|
leveled_pmanifest:release_snapshot(State#state.manifest, Snapshot),
|
||||||
leveled_log:log(p0003, [Snapshot]),
|
leveled_log:log(p0003, [Snapshot]),
|
||||||
{noreply, State#state{manifest=Manifest0}};
|
{noreply, State#state{manifest=Manifest0}};
|
||||||
handle_cast({confirm_delete, PDFN, FilePid}, State=#state{is_snapshot=Snap})
|
handle_cast({confirm_delete, PDFN, FilePid}, State=#state{is_snapshot=Snap})
|
||||||
|
@ -1138,7 +1151,34 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
|
||||||
ok = leveled_log:remove_forcedlogs(ForcedLogs),
|
ok = leveled_log:remove_forcedlogs(ForcedLogs),
|
||||||
SSTopts = State#state.sst_options,
|
SSTopts = State#state.sst_options,
|
||||||
SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
|
SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
|
||||||
{noreply, State#state{sst_options = SSTopts0}}.
|
{noreply, State#state{sst_options = SSTopts0}};
|
||||||
|
handle_cast({maybe_defer_shutdown, ShutdownType, From}, State) ->
|
||||||
|
case length(leveled_pmanifest:snapshot_pids(State#state.manifest)) of
|
||||||
|
0 ->
|
||||||
|
ok;
|
||||||
|
N ->
|
||||||
|
% Whilst this process sleeps, then any remaining snapshots may
|
||||||
|
% release and have their release messages queued before the
|
||||||
|
% complete_shutdown cast is sent
|
||||||
|
leveled_log:log(p0042, [N]),
|
||||||
|
timer:sleep(?SHUTDOWN_PAUSE)
|
||||||
|
end,
|
||||||
|
gen_server:cast(self(), {complete_shutdown, ShutdownType, From}),
|
||||||
|
{noreply, State};
|
||||||
|
handle_cast({complete_shutdown, ShutdownType, From}, State) ->
|
||||||
|
lists:foreach(
|
||||||
|
fun(Snap) -> ok = pcl_snapclose(Snap) end,
|
||||||
|
leveled_pmanifest:snapshot_pids(State#state.manifest)),
|
||||||
|
shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),
|
||||||
|
case ShutdownType of
|
||||||
|
doom ->
|
||||||
|
ManifestFP = State#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
|
||||||
|
FilesFP = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/",
|
||||||
|
gen_server:reply(From, {ok, [ManifestFP, FilesFP]});
|
||||||
|
close ->
|
||||||
|
gen_server:reply(From, ok)
|
||||||
|
end,
|
||||||
|
{stop, normal, State}.
|
||||||
|
|
||||||
|
|
||||||
%% handle the bookie stopping and stop this snapshot
|
%% handle the bookie stopping and stop this snapshot
|
||||||
|
@ -1177,8 +1217,8 @@ sst_rootpath(RootPath) ->
|
||||||
FP.
|
FP.
|
||||||
|
|
||||||
sst_filename(ManSQN, Level, Count) ->
|
sst_filename(ManSQN, Level, Count) ->
|
||||||
lists:flatten(io_lib:format("./~w_~w_~w" ++ ?SST_FILEX,
|
lists:flatten(
|
||||||
[ManSQN, Level, Count])).
|
io_lib:format("./~w_~w_~w" ++ ?SST_FILEX, [ManSQN, Level, Count])).
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -2010,6 +2050,34 @@ format_status_test() ->
|
||||||
?assertMatch(redacted, ST#state.levelzero_astree),
|
?assertMatch(redacted, ST#state.levelzero_astree),
|
||||||
clean_testdir(RootPath).
|
clean_testdir(RootPath).
|
||||||
|
|
||||||
|
close_no_crash_test_() ->
|
||||||
|
{timeout, 60, fun close_no_crash_tester/0}.
|
||||||
|
|
||||||
|
close_no_crash_tester() ->
|
||||||
|
RootPath = "test/test_area/ledger_close",
|
||||||
|
clean_testdir(RootPath),
|
||||||
|
{ok, PCL} =
|
||||||
|
pcl_start(
|
||||||
|
#penciller_options{
|
||||||
|
root_path=RootPath,
|
||||||
|
max_inmemory_tablesize=1000,
|
||||||
|
sst_options=#sst_options{}}),
|
||||||
|
{ok, PclSnap} =
|
||||||
|
pcl_snapstart(
|
||||||
|
#penciller_options{
|
||||||
|
start_snapshot = true,
|
||||||
|
snapshot_query = undefined,
|
||||||
|
bookies_mem = {empty_cache, empty_index, 1, 1},
|
||||||
|
source_penciller = PCL,
|
||||||
|
snapshot_longrunning = true,
|
||||||
|
bookies_pid = self()
|
||||||
|
}
|
||||||
|
),
|
||||||
|
exit(PclSnap, kill),
|
||||||
|
ok = pcl_close(PCL),
|
||||||
|
clean_testdir(RootPath).
|
||||||
|
|
||||||
|
|
||||||
simple_server_test() ->
|
simple_server_test() ->
|
||||||
RootPath = "test/test_area/ledger",
|
RootPath = "test/test_area/ledger",
|
||||||
clean_testdir(RootPath),
|
clean_testdir(RootPath),
|
||||||
|
|
|
@ -45,7 +45,8 @@
|
||||||
is_basement/2,
|
is_basement/2,
|
||||||
levelzero_present/1,
|
levelzero_present/1,
|
||||||
check_bloom/3,
|
check_bloom/3,
|
||||||
report_manifest_level/2
|
report_manifest_level/2,
|
||||||
|
snapshot_pids/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
|
@ -86,22 +87,22 @@
|
||||||
|
|
||||||
-record(manifest, {levels,
|
-record(manifest, {levels,
|
||||||
% an array of lists or trees representing the manifest
|
% an array of lists or trees representing the manifest
|
||||||
manifest_sqn = 0 :: integer(),
|
manifest_sqn = 0 :: non_neg_integer(),
|
||||||
% The current manifest SQN
|
% The current manifest SQN
|
||||||
snapshots :: list() | undefined,
|
snapshots = []
|
||||||
|
:: list(snapshot()),
|
||||||
% A list of snapshots (i.e. clones)
|
% A list of snapshots (i.e. clones)
|
||||||
min_snapshot_sqn = 0 :: integer(),
|
min_snapshot_sqn = 0 :: integer(),
|
||||||
% The smallest snapshot manifest SQN in the snapshot
|
% The smallest snapshot manifest SQN in the snapshot
|
||||||
% list
|
% list
|
||||||
pending_deletes, % OTP16 does not like defining type
|
pending_deletes = dict:new() :: dict:dict(),
|
||||||
% a dictionary mapping keys (filenames) to SQN when the
|
basement :: non_neg_integer(),
|
||||||
% deletion was made, and the original Manifest Entry
|
|
||||||
basement :: integer(),
|
|
||||||
% Currently the lowest level (the largest number)
|
% Currently the lowest level (the largest number)
|
||||||
blooms :: any() % actually a dict but OTP 16 compatability
|
blooms :: dict:dict()
|
||||||
% A dictionary mapping PIDs to bloom filters
|
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-type snapshot() ::
|
||||||
|
{pid(), non_neg_integer(), pos_integer(), pos_integer()}.
|
||||||
-type manifest() :: #manifest{}.
|
-type manifest() :: #manifest{}.
|
||||||
-type manifest_entry() :: #manifest_entry{}.
|
-type manifest_entry() :: #manifest_entry{}.
|
||||||
-type manifest_owner() :: pid()|list().
|
-type manifest_owner() :: pid()|list().
|
||||||
|
@ -169,8 +170,8 @@ open_manifest(RootPath) ->
|
||||||
%% by a snapshot
|
%% by a snapshot
|
||||||
copy_manifest(Manifest) ->
|
copy_manifest(Manifest) ->
|
||||||
% Copy the manifest ensuring anything only the master process should care
|
% Copy the manifest ensuring anything only the master process should care
|
||||||
% about is switched to undefined
|
% about is switched to be empty
|
||||||
Manifest#manifest{snapshots = undefined, pending_deletes = undefined}.
|
Manifest#manifest{snapshots = [], pending_deletes = dict:new()}.
|
||||||
|
|
||||||
-spec load_manifest(
|
-spec load_manifest(
|
||||||
manifest(),
|
manifest(),
|
||||||
|
@ -533,13 +534,12 @@ mergefile_selector(Manifest, LevelIdx, {grooming, ScoringFun}) ->
|
||||||
%% @doc
|
%% @doc
|
||||||
%% When the clerk returns an updated manifest to the penciller, the penciller
|
%% When the clerk returns an updated manifest to the penciller, the penciller
|
||||||
%% should restore its view of the snapshots to that manifest. Snapshots can
|
%% should restore its view of the snapshots to that manifest. Snapshots can
|
||||||
%% be received in parallel to the manifest ebing updated, so the updated
|
%% be received in parallel to the manifest being updated, so the updated
|
||||||
%% manifest must not trample over any accrued state in the manifest.
|
%% manifest must not trample over any accrued state in the manifest.
|
||||||
merge_snapshot(PencillerManifest, ClerkManifest) ->
|
merge_snapshot(PencillerManifest, ClerkManifest) ->
|
||||||
ClerkManifest#manifest{snapshots =
|
ClerkManifest#manifest{
|
||||||
PencillerManifest#manifest.snapshots,
|
snapshots = PencillerManifest#manifest.snapshots,
|
||||||
min_snapshot_sqn =
|
min_snapshot_sqn = PencillerManifest#manifest.min_snapshot_sqn}.
|
||||||
PencillerManifest#manifest.min_snapshot_sqn}.
|
|
||||||
|
|
||||||
-spec add_snapshot(manifest(), pid()|atom(), integer()) -> manifest().
|
-spec add_snapshot(manifest(), pid()|atom(), integer()) -> manifest().
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -693,6 +693,11 @@ check_bloom(Manifest, FP, Hash) ->
|
||||||
true
|
true
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec snapshot_pids(manifest()) -> list(pid()).
|
||||||
|
%% @doc
|
||||||
|
%% Return a list of snapshot pids - so that they can be closed on shutdown
|
||||||
|
snapshot_pids(Manifest) ->
|
||||||
|
lists:map(fun(S) -> element(1, S) end, Manifest#manifest.snapshots).
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% Internal Functions
|
%%% Internal Functions
|
||||||
|
@ -1383,6 +1388,10 @@ ready_to_delete_combined(Manifest, Filename) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
snapshot_release_test() ->
|
snapshot_release_test() ->
|
||||||
|
PidA1 = spawn(fun() -> ok end),
|
||||||
|
PidA2 = spawn(fun() -> ok end),
|
||||||
|
PidA3 = spawn(fun() -> ok end),
|
||||||
|
PidA4 = spawn(fun() -> ok end),
|
||||||
Man6 = element(7, initial_setup()),
|
Man6 = element(7, initial_setup()),
|
||||||
E1 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"},
|
E1 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"},
|
||||||
end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"},
|
end_key={i, "Bucket1", {"Idx1", "Fld9"}, "K93"},
|
||||||
|
@ -1400,35 +1409,35 @@ snapshot_release_test() ->
|
||||||
owner="pid_z3",
|
owner="pid_z3",
|
||||||
bloom=none},
|
bloom=none},
|
||||||
|
|
||||||
Man7 = add_snapshot(Man6, pid_a1, 3600),
|
Man7 = add_snapshot(Man6, PidA1, 3600),
|
||||||
Man8 = remove_manifest_entry(Man7, 2, 1, E1),
|
Man8 = remove_manifest_entry(Man7, 2, 1, E1),
|
||||||
Man9 = add_snapshot(Man8, pid_a2, 3600),
|
Man9 = add_snapshot(Man8, PidA2, 3600),
|
||||||
Man10 = remove_manifest_entry(Man9, 3, 1, E2),
|
Man10 = remove_manifest_entry(Man9, 3, 1, E2),
|
||||||
Man11 = add_snapshot(Man10, pid_a3, 3600),
|
Man11 = add_snapshot(Man10, PidA3, 3600),
|
||||||
Man12 = remove_manifest_entry(Man11, 4, 1, E3),
|
Man12 = remove_manifest_entry(Man11, 4, 1, E3),
|
||||||
Man13 = add_snapshot(Man12, pid_a4, 3600),
|
Man13 = add_snapshot(Man12, PidA4, 3600),
|
||||||
|
|
||||||
?assertMatch(false, element(1, ready_to_delete_combined(Man8, "Z1"))),
|
?assertMatch(false, element(1, ready_to_delete_combined(Man8, "Z1"))),
|
||||||
?assertMatch(false, element(1, ready_to_delete_combined(Man10, "Z2"))),
|
?assertMatch(false, element(1, ready_to_delete_combined(Man10, "Z2"))),
|
||||||
?assertMatch(false, element(1, ready_to_delete_combined(Man12, "Z3"))),
|
?assertMatch(false, element(1, ready_to_delete_combined(Man12, "Z3"))),
|
||||||
|
|
||||||
Man14 = release_snapshot(Man13, pid_a1),
|
Man14 = release_snapshot(Man13, PidA1),
|
||||||
?assertMatch(false, element(1, ready_to_delete_combined(Man14, "Z2"))),
|
?assertMatch(false, element(1, ready_to_delete_combined(Man14, "Z2"))),
|
||||||
?assertMatch(false, element(1, ready_to_delete_combined(Man14, "Z3"))),
|
?assertMatch(false, element(1, ready_to_delete_combined(Man14, "Z3"))),
|
||||||
{Bool14, Man15} = ready_to_delete_combined(Man14, "Z1"),
|
{Bool14, Man15} = ready_to_delete_combined(Man14, "Z1"),
|
||||||
?assertMatch(true, Bool14),
|
?assertMatch(true, Bool14),
|
||||||
|
|
||||||
%This doesn't change anything - released snapshot not the min
|
%This doesn't change anything - released snapshot not the min
|
||||||
Man16 = release_snapshot(Man15, pid_a4),
|
Man16 = release_snapshot(Man15, PidA4),
|
||||||
?assertMatch(false, element(1, ready_to_delete_combined(Man16, "Z2"))),
|
?assertMatch(false, element(1, ready_to_delete_combined(Man16, "Z2"))),
|
||||||
?assertMatch(false, element(1, ready_to_delete_combined(Man16, "Z3"))),
|
?assertMatch(false, element(1, ready_to_delete_combined(Man16, "Z3"))),
|
||||||
|
|
||||||
Man17 = release_snapshot(Man16, pid_a2),
|
Man17 = release_snapshot(Man16, PidA2),
|
||||||
?assertMatch(false, element(1, ready_to_delete_combined(Man17, "Z3"))),
|
?assertMatch(false, element(1, ready_to_delete_combined(Man17, "Z3"))),
|
||||||
{Bool17, Man18} = ready_to_delete_combined(Man17, "Z2"),
|
{Bool17, Man18} = ready_to_delete_combined(Man17, "Z2"),
|
||||||
?assertMatch(true, Bool17),
|
?assertMatch(true, Bool17),
|
||||||
|
|
||||||
Man19 = release_snapshot(Man18, pid_a3),
|
Man19 = release_snapshot(Man18, PidA3),
|
||||||
|
|
||||||
io:format("MinSnapSQN ~w~n", [Man19#manifest.min_snapshot_sqn]),
|
io:format("MinSnapSQN ~w~n", [Man19#manifest.min_snapshot_sqn]),
|
||||||
|
|
||||||
|
@ -1437,12 +1446,13 @@ snapshot_release_test() ->
|
||||||
|
|
||||||
|
|
||||||
snapshot_timeout_test() ->
|
snapshot_timeout_test() ->
|
||||||
|
PidA1 = spawn(fun() -> ok end),
|
||||||
Man6 = element(7, initial_setup()),
|
Man6 = element(7, initial_setup()),
|
||||||
Man7 = add_snapshot(Man6, pid_a1, 3600),
|
Man7 = add_snapshot(Man6, PidA1, 3600),
|
||||||
?assertMatch(1, length(Man7#manifest.snapshots)),
|
?assertMatch(1, length(Man7#manifest.snapshots)),
|
||||||
Man8 = release_snapshot(Man7, pid_a1),
|
Man8 = release_snapshot(Man7, PidA1),
|
||||||
?assertMatch(0, length(Man8#manifest.snapshots)),
|
?assertMatch(0, length(Man8#manifest.snapshots)),
|
||||||
Man9 = add_snapshot(Man8, pid_a1, 0),
|
Man9 = add_snapshot(Man8, PidA1, 1),
|
||||||
timer:sleep(2001),
|
timer:sleep(2001),
|
||||||
?assertMatch(1, length(Man9#manifest.snapshots)),
|
?assertMatch(1, length(Man9#manifest.snapshots)),
|
||||||
Man10 = release_snapshot(Man9, ?PHANTOM_PID),
|
Man10 = release_snapshot(Man9, ?PHANTOM_PID),
|
||||||
|
@ -1474,12 +1484,7 @@ potential_issue_test() ->
|
||||||
{idxt,0,{{},{0,nil}}},
|
{idxt,0,{{},{0,nil}}},
|
||||||
{idxt,0,{{},{0,nil}}},
|
{idxt,0,{{},{0,nil}}},
|
||||||
[]}},
|
[]}},
|
||||||
19,[],0,
|
19, [], 0, dict:new(), 2, dict:new()},
|
||||||
{dict,0,16,16,8,80,48,
|
|
||||||
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
|
|
||||||
{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
|
|
||||||
2,
|
|
||||||
dict:new()},
|
|
||||||
Range1 = range_lookup(Manifest,
|
Range1 = range_lookup(Manifest,
|
||||||
1,
|
1,
|
||||||
{o_rkv, "Bucket", null, null},
|
{o_rkv, "Bucket", null, null},
|
||||||
|
|
|
@ -452,7 +452,7 @@ fetchput_snapshot(_Config) ->
|
||||||
|
|
||||||
% Now loads lots of new objects
|
% Now loads lots of new objects
|
||||||
|
|
||||||
GenList = [20002, 40002, 60002, 80002, 100002],
|
GenList = [20002, 40002, 60002, 80002, 100002, 120002, 140002, 160002],
|
||||||
CLs2 = testutil:load_objects(20000, GenList, Bookie2, TestObject,
|
CLs2 = testutil:load_objects(20000, GenList, Bookie2, TestObject,
|
||||||
fun testutil:generate_smallobjects/2),
|
fun testutil:generate_smallobjects/2),
|
||||||
io:format("Loaded significant numbers of new objects~n"),
|
io:format("Loaded significant numbers of new objects~n"),
|
||||||
|
@ -508,7 +508,6 @@ fetchput_snapshot(_Config) ->
|
||||||
testutil:check_forlist(Bookie2, lists:nth(length(CLs3), CLs3)),
|
testutil:check_forlist(Bookie2, lists:nth(length(CLs3), CLs3)),
|
||||||
testutil:check_forlist(Bookie2, lists:nth(1, CLs3)),
|
testutil:check_forlist(Bookie2, lists:nth(1, CLs3)),
|
||||||
|
|
||||||
|
|
||||||
{ok, FNsC} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
|
{ok, FNsC} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
|
||||||
io:format("FNsA ~w FNsB ~w FNsC ~w~n",
|
io:format("FNsA ~w FNsB ~w FNsC ~w~n",
|
||||||
[length(FNsA), length(FNsB), length(FNsC)]),
|
[length(FNsA), length(FNsB), length(FNsC)]),
|
||||||
|
@ -523,9 +522,35 @@ fetchput_snapshot(_Config) ->
|
||||||
{B1Size, B1Count} = testutil:check_bucket_stats(Bookie2, "Bucket1"),
|
{B1Size, B1Count} = testutil:check_bucket_stats(Bookie2, "Bucket1"),
|
||||||
{BSize, BCount} = testutil:check_bucket_stats(Bookie2, "Bucket"),
|
{BSize, BCount} = testutil:check_bucket_stats(Bookie2, "Bucket"),
|
||||||
true = BSize > 0,
|
true = BSize > 0,
|
||||||
true = BCount == 120000,
|
true = BCount == 180000,
|
||||||
|
|
||||||
ok = leveled_bookie:book_close(Bookie2),
|
io:format("Shutdown with overhanging snapshot~n"),
|
||||||
|
|
||||||
|
{ok, SnpPCL1, SnpJrnl1} =
|
||||||
|
leveled_bookie:book_snapshot(Bookie2, store, undefined, true),
|
||||||
|
{ok, SnpPCL2, SnpJrnl2} =
|
||||||
|
leveled_bookie:book_snapshot(Bookie2, store, undefined, true),
|
||||||
|
|
||||||
|
TestPid = self(),
|
||||||
|
spawn(
|
||||||
|
fun() ->
|
||||||
|
ok = leveled_bookie:book_close(Bookie2),
|
||||||
|
TestPid ! ok
|
||||||
|
end),
|
||||||
|
|
||||||
|
timer:sleep(5000),
|
||||||
|
ok = leveled_penciller:pcl_close(SnpPCL1),
|
||||||
|
ok = leveled_inker:ink_close(SnpJrnl1),
|
||||||
|
true = is_process_alive(SnpPCL2),
|
||||||
|
true = is_process_alive(SnpJrnl2),
|
||||||
|
|
||||||
|
io:format("Time for close to complete is 2 * 10s~n"),
|
||||||
|
io:format("Both Inker and Penciller will have snapshot delay~n"),
|
||||||
|
|
||||||
|
receive ok -> ok end,
|
||||||
|
|
||||||
|
false = is_process_alive(SnpPCL2),
|
||||||
|
false = is_process_alive(SnpJrnl2),
|
||||||
testutil:reset_filestructure().
|
testutil:reset_filestructure().
|
||||||
|
|
||||||
|
|
||||||
|
@ -628,7 +653,9 @@ load_and_count(JournalSize, BookiesMemSize, PencillerMemSize) ->
|
||||||
ok = leveled_bookie:book_close(Bookie1),
|
ok = leveled_bookie:book_close(Bookie1),
|
||||||
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
|
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
|
||||||
{_, 300000} = testutil:check_bucket_stats(Bookie2, "Bucket"),
|
{_, 300000} = testutil:check_bucket_stats(Bookie2, "Bucket"),
|
||||||
|
|
||||||
ok = leveled_bookie:book_close(Bookie2),
|
ok = leveled_bookie:book_close(Bookie2),
|
||||||
|
|
||||||
ManifestFP =
|
ManifestFP =
|
||||||
leveled_pmanifest:filepath(filename:join(RootPath, ?LEDGER_FP),
|
leveled_pmanifest:filepath(filename:join(RootPath, ?LEDGER_FP),
|
||||||
manifest),
|
manifest),
|
||||||
|
@ -691,11 +718,13 @@ load_and_count_withdelete(_Config) ->
|
||||||
lists:seq(1, 20)),
|
lists:seq(1, 20)),
|
||||||
not_found = testutil:book_riakget(Bookie1, BucketD, KeyD),
|
not_found = testutil:book_riakget(Bookie1, BucketD, KeyD),
|
||||||
ok = leveled_bookie:book_close(Bookie1),
|
ok = leveled_bookie:book_close(Bookie1),
|
||||||
|
|
||||||
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
|
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
|
||||||
testutil:check_formissingobject(Bookie2, BucketD, KeyD),
|
testutil:check_formissingobject(Bookie2, BucketD, KeyD),
|
||||||
testutil:check_formissingobject(Bookie2, "Bookie1", "MissingKey0123"),
|
testutil:check_formissingobject(Bookie2, "Bookie1", "MissingKey0123"),
|
||||||
{_BSize, 0} = testutil:check_bucket_stats(Bookie2, BucketD),
|
{_BSize, 0} = testutil:check_bucket_stats(Bookie2, BucketD),
|
||||||
ok = leveled_bookie:book_close(Bookie2),
|
ok = leveled_bookie:book_close(Bookie2),
|
||||||
|
|
||||||
testutil:reset_filestructure().
|
testutil:reset_filestructure().
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,8 @@
|
||||||
start_opts = []
|
start_opts = []
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-define(NUMTESTS, 1000).
|
-define(NUMTESTS, 10000).
|
||||||
|
-define(TIME_BUDGET, 300).
|
||||||
-define(QC_OUT(P),
|
-define(QC_OUT(P),
|
||||||
eqc:on_output(fun(Str, Args) ->
|
eqc:on_output(fun(Str, Args) ->
|
||||||
io:format(user, Str, Args) end, P)).
|
io:format(user, Str, Args) end, P)).
|
||||||
|
@ -49,7 +50,12 @@
|
||||||
-type state() :: #state{}.
|
-type state() :: #state{}.
|
||||||
|
|
||||||
eqc_test_() ->
|
eqc_test_() ->
|
||||||
{timeout, 60, ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(50, ?QC_OUT(prop_db()))))}.
|
{timeout,
|
||||||
|
?TIME_BUDGET + 10,
|
||||||
|
?_assertEqual(
|
||||||
|
true,
|
||||||
|
eqc:quickcheck(
|
||||||
|
eqc:testing_time(?TIME_BUDGET, ?QC_OUT(prop_db()))))}.
|
||||||
|
|
||||||
run() ->
|
run() ->
|
||||||
run(?NUMTESTS).
|
run(?NUMTESTS).
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue