Merge pull request #253 from martinsumner/mas-i249-sstcloseraces
Mas i249 sstcloseraces
This commit is contained in:
commit
5563510da7
3 changed files with 46 additions and 18 deletions
|
@ -923,13 +923,13 @@ handle_call(close, _From, State) ->
|
||||||
leveled_log:log("P0010", [StatusTuple])
|
leveled_log:log("P0010", [StatusTuple])
|
||||||
end,
|
end,
|
||||||
|
|
||||||
shutdown_manifest(State#state.manifest),
|
shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),
|
||||||
{stop, normal, ok, State};
|
{stop, normal, ok, State};
|
||||||
handle_call(doom, _From, State) ->
|
handle_call(doom, _From, State) ->
|
||||||
leveled_log:log("P0030", []),
|
leveled_log:log("P0030", []),
|
||||||
ok = leveled_pclerk:clerk_close(State#state.clerk),
|
ok = leveled_pclerk:clerk_close(State#state.clerk),
|
||||||
|
|
||||||
shutdown_manifest(State#state.manifest),
|
shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),
|
||||||
|
|
||||||
ManifestFP = State#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
|
ManifestFP = State#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
|
||||||
FilesFP = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/",
|
FilesFP = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/",
|
||||||
|
@ -1183,21 +1183,34 @@ start_from_file(PCLopts) ->
|
||||||
{ok, State0}.
|
{ok, State0}.
|
||||||
|
|
||||||
|
|
||||||
-spec shutdown_manifest(leveled_pmanifest:manifest()) -> ok.
|
-spec shutdown_manifest(leveled_pmanifest:manifest(), pid()|undefined) -> ok.
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Shutdown all the SST files within the manifest
|
%% Shutdown all the SST files within the manifest
|
||||||
shutdown_manifest(Manifest)->
|
shutdown_manifest(Manifest, L0Constructor) ->
|
||||||
EntryCloseFun =
|
EntryCloseFun =
|
||||||
fun(ME) ->
|
fun(ME) ->
|
||||||
|
Owner =
|
||||||
case is_record(ME, manifest_entry) of
|
case is_record(ME, manifest_entry) of
|
||||||
true ->
|
true ->
|
||||||
ok = leveled_sst:sst_close(ME#manifest_entry.owner);
|
ME#manifest_entry.owner;
|
||||||
false ->
|
false ->
|
||||||
{_SK, ME0} = ME,
|
case ME of
|
||||||
ok = leveled_sst:sst_close(ME0#manifest_entry.owner)
|
{_SK, ME0} ->
|
||||||
|
ME0#manifest_entry.owner;
|
||||||
|
ME ->
|
||||||
|
ME
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
leveled_pmanifest:close_manifest(Manifest, EntryCloseFun).
|
ok =
|
||||||
|
case is_pid(Owner) of
|
||||||
|
true ->
|
||||||
|
leveled_sst:sst_close(Owner);
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
leveled_pmanifest:close_manifest(Manifest, EntryCloseFun),
|
||||||
|
EntryCloseFun(L0Constructor).
|
||||||
|
|
||||||
|
|
||||||
-spec archive_files(list(), list()) -> ok.
|
-spec archive_files(list(), list()) -> ok.
|
||||||
|
|
|
@ -92,6 +92,7 @@
|
||||||
-define(COMPRESS_AT_LEVEL, 1).
|
-define(COMPRESS_AT_LEVEL, 1).
|
||||||
-define(INDEX_MODDATE, true).
|
-define(INDEX_MODDATE, true).
|
||||||
-define(USE_SET_FOR_SPEED, 64).
|
-define(USE_SET_FOR_SPEED, 64).
|
||||||
|
-define(STARTUP_TIMEOUT, 10000).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
@ -103,6 +104,7 @@
|
||||||
code_change/4,
|
code_change/4,
|
||||||
starting/2,
|
starting/2,
|
||||||
starting/3,
|
starting/3,
|
||||||
|
reader/2,
|
||||||
reader/3,
|
reader/3,
|
||||||
delete_pending/2,
|
delete_pending/2,
|
||||||
delete_pending/3]).
|
delete_pending/3]).
|
||||||
|
@ -185,6 +187,7 @@
|
||||||
index_moddate = ?INDEX_MODDATE :: boolean(),
|
index_moddate = ?INDEX_MODDATE :: boolean(),
|
||||||
timings = no_timing :: sst_timings(),
|
timings = no_timing :: sst_timings(),
|
||||||
timings_countdown = 0 :: integer(),
|
timings_countdown = 0 :: integer(),
|
||||||
|
starting_pid :: pid()|undefined,
|
||||||
fetch_cache = array:new([{size, ?CACHE_SIZE}])}).
|
fetch_cache = array:new([{size, ?CACHE_SIZE}])}).
|
||||||
|
|
||||||
-record(sst_timings,
|
-record(sst_timings,
|
||||||
|
@ -266,7 +269,8 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
|
||||||
{SlotList, FK},
|
{SlotList, FK},
|
||||||
MaxSQN,
|
MaxSQN,
|
||||||
OptsSST0,
|
OptsSST0,
|
||||||
IndexModDate},
|
IndexModDate,
|
||||||
|
self()},
|
||||||
infinity) of
|
infinity) of
|
||||||
{ok, {SK, EK}, Bloom} ->
|
{ok, {SK, EK}, Bloom} ->
|
||||||
{ok, Pid, {SK, EK}, Bloom}
|
{ok, Pid, {SK, EK}, Bloom}
|
||||||
|
@ -322,7 +326,8 @@ sst_new(RootPath, Filename,
|
||||||
{SlotList, FK},
|
{SlotList, FK},
|
||||||
MaxSQN,
|
MaxSQN,
|
||||||
OptsSST0,
|
OptsSST0,
|
||||||
IndexModDate},
|
IndexModDate,
|
||||||
|
self()},
|
||||||
infinity) of
|
infinity) of
|
||||||
{ok, {SK, EK}, Bloom} ->
|
{ok, {SK, EK}, Bloom} ->
|
||||||
{ok, Pid, {{Rem1, Rem2}, SK, EK}, Bloom}
|
{ok, Pid, {{Rem1, Rem2}, SK, EK}, Bloom}
|
||||||
|
@ -463,7 +468,7 @@ starting({sst_open, RootPath, Filename, OptsSST}, _From, State) ->
|
||||||
starting({sst_new,
|
starting({sst_new,
|
||||||
RootPath, Filename, Level,
|
RootPath, Filename, Level,
|
||||||
{SlotList, FirstKey}, MaxSQN,
|
{SlotList, FirstKey}, MaxSQN,
|
||||||
OptsSST, IdxModDate}, _From, State) ->
|
OptsSST, IdxModDate, StartingPID}, _From, State) ->
|
||||||
SW = os:timestamp(),
|
SW = os:timestamp(),
|
||||||
leveled_log:save(OptsSST#sst_options.log_options),
|
leveled_log:save(OptsSST#sst_options.log_options),
|
||||||
PressMethod = OptsSST#sst_options.press_method,
|
PressMethod = OptsSST#sst_options.press_method,
|
||||||
|
@ -485,7 +490,9 @@ starting({sst_new,
|
||||||
{reply,
|
{reply,
|
||||||
{ok, {Summary#summary.first_key, Summary#summary.last_key}, Bloom},
|
{ok, {Summary#summary.first_key, Summary#summary.last_key}, Bloom},
|
||||||
reader,
|
reader,
|
||||||
UpdState#state{blockindex_cache = BlockIndex}}.
|
UpdState#state{blockindex_cache = BlockIndex,
|
||||||
|
starting_pid = StartingPID},
|
||||||
|
?STARTUP_TIMEOUT}.
|
||||||
|
|
||||||
starting({sst_newlevelzero, RootPath, Filename,
|
starting({sst_newlevelzero, RootPath, Filename,
|
||||||
Slots, FetchFun, Penciller, MaxSQN,
|
Slots, FetchFun, Penciller, MaxSQN,
|
||||||
|
@ -630,6 +637,14 @@ reader(close, _From, State) ->
|
||||||
ok = file:close(State#state.handle),
|
ok = file:close(State#state.handle),
|
||||||
{stop, normal, ok, State}.
|
{stop, normal, ok, State}.
|
||||||
|
|
||||||
|
reader(timeout, State) ->
|
||||||
|
case is_process_alive(State#state.starting_pid) of
|
||||||
|
true ->
|
||||||
|
{next_state, reader, State};
|
||||||
|
false ->
|
||||||
|
{stop, normal, State}
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
|
delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
|
||||||
{Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing),
|
{Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing),
|
||||||
|
|
|
@ -170,7 +170,7 @@ bigsst_littlesst(_Config) ->
|
||||||
RootPath = testutil:reset_filestructure(),
|
RootPath = testutil:reset_filestructure(),
|
||||||
StartOpts1 = [{root_path, RootPath},
|
StartOpts1 = [{root_path, RootPath},
|
||||||
{max_journalsize, 50000000},
|
{max_journalsize, 50000000},
|
||||||
{cache_size, 1000},
|
{cache_size, 500},
|
||||||
{max_pencillercachesize, 16000},
|
{max_pencillercachesize, 16000},
|
||||||
{max_sstslots, 256},
|
{max_sstslots, 256},
|
||||||
{sync_strategy, testutil:sync_strategy()},
|
{sync_strategy, testutil:sync_strategy()},
|
||||||
|
@ -195,7 +195,7 @@ bigsst_littlesst(_Config) ->
|
||||||
ok = leveled_bookie:book_destroy(Bookie2),
|
ok = leveled_bookie:book_destroy(Bookie2),
|
||||||
io:format("Big SST ~w files Little SST ~w files~n",
|
io:format("Big SST ~w files Little SST ~w files~n",
|
||||||
[length(FNS1), length(FNS2)]),
|
[length(FNS1), length(FNS2)]),
|
||||||
true = length(FNS2) > (2 * length(FNS1)).
|
true = length(FNS2) >= (2 * length(FNS1)).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue