diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 3e7b156..8448bd5 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -92,6 +92,7 @@ -define(COMPRESS_AT_LEVEL, 1). -define(INDEX_MODDATE, true). -define(USE_SET_FOR_SPEED, 64). +-define(STARTUP_TIMEOUT, 10000). -include_lib("eunit/include/eunit.hrl"). @@ -103,6 +104,7 @@ code_change/4, starting/2, starting/3, + reader/2, reader/3, delete_pending/2, delete_pending/3]). @@ -185,6 +187,7 @@ index_moddate = ?INDEX_MODDATE :: boolean(), timings = no_timing :: sst_timings(), timings_countdown = 0 :: integer(), + starting_pid :: pid()|undefined, fetch_cache = array:new([{size, ?CACHE_SIZE}])}). -record(sst_timings, @@ -266,7 +269,8 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) -> {SlotList, FK}, MaxSQN, OptsSST0, - IndexModDate}, + IndexModDate, + self()}, infinity) of {ok, {SK, EK}, Bloom} -> {ok, Pid, {SK, EK}, Bloom} @@ -322,7 +326,8 @@ sst_new(RootPath, Filename, {SlotList, FK}, MaxSQN, OptsSST0, - IndexModDate}, + IndexModDate, + self()}, infinity) of {ok, {SK, EK}, Bloom} -> {ok, Pid, {{Rem1, Rem2}, SK, EK}, Bloom} @@ -463,7 +468,7 @@ starting({sst_open, RootPath, Filename, OptsSST}, _From, State) -> starting({sst_new, RootPath, Filename, Level, {SlotList, FirstKey}, MaxSQN, - OptsSST, IdxModDate}, _From, State) -> + OptsSST, IdxModDate, StartingPID}, _From, State) -> SW = os:timestamp(), leveled_log:save(OptsSST#sst_options.log_options), PressMethod = OptsSST#sst_options.press_method, @@ -485,7 +490,9 @@ starting({sst_new, {reply, {ok, {Summary#summary.first_key, Summary#summary.last_key}, Bloom}, reader, - UpdState#state{blockindex_cache = BlockIndex}}. + UpdState#state{blockindex_cache = BlockIndex, + starting_pid = StartingPID}, + ?STARTUP_TIMEOUT}. starting({sst_newlevelzero, RootPath, Filename, Slots, FetchFun, Penciller, MaxSQN, @@ -630,6 +637,10 @@ reader(close, _From, State) -> ok = file:close(State#state.handle), {stop, normal, ok, State}. +reader(timeout, State) -> + true = is_process_alive(State#state.starting_pid), + {next_state, reader, State}. + delete_pending({get_kv, LedgerKey, Hash}, _From, State) -> {Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing),