diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index d040723..e8be244 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -136,6 +136,7 @@
 -export_type([tag/0,
               key/0,
+              sqn/0,
               object_spec/0,
               segment_hash/0,
               ledger_status/0,
diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl
index 53f8cae..c8f494d 100644
--- a/src/leveled_pclerk.erl
+++ b/src/leveled_pclerk.erl
@@ -146,6 +146,11 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
 
 handle_info(timeout, State) ->
     request_work(State),
+    % When handling work, the clerk can collect a large number of binary
+    % references, so proactively garbage collect this process before
+    % receiving any future work.  In clusters under pressure, clerks can
+    % otherwise retain large binary memory footprints.
+    garbage_collect(),
     {noreply, State, ?MAX_TIMEOUT}.
 
 terminate(Reason, _State) ->
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index 96187f5..743cae1 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -396,7 +396,7 @@ pcl_fetchlevelzero(Pid, Slot, ReturnFun) ->
 %% The Key needs to be hashable (i.e. have a tag which indicates that the key
 %% can be looked up) - index entries are not hashable for example.
 %%
-%% If the hash is already knonw, call pcl_fetch/3 as segment_hash is a
+%% If the hash is already known, call pcl_fetch/3 as segment_hash is a
 %% relatively expensive hash function
 pcl_fetch(Pid, Key) ->
     Hash = leveled_codec:segment_hash(Key),
@@ -749,12 +749,14 @@ handle_call({fetch, Key, Hash, UseL0Index}, _From, State) ->
     {reply, R, State#state{timings=UpdTimings0, timings_countdown=CountDown}};
 handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
     {reply,
-        compare_to_sqn(plain_fetch_mem(Key,
-                                        Hash,
-                                        State#state.manifest,
-                                        State#state.levelzero_cache,
-                                        State#state.levelzero_index),
-                        SQN),
+        compare_to_sqn(
+            fetch_sqn(
+                Key,
+                Hash,
+                State#state.manifest,
+                State#state.levelzero_cache,
+                State#state.levelzero_index),
+            SQN),
         State};
 handle_call({fetch_keys,
                 StartKey, EndKey,
@@ -1066,8 +1068,19 @@ handle_cast(work_for_clerk, State) ->
             Backlog = N > ?WORKQUEUE_BACKLOG_TOLERANCE,
             leveled_log:log("P0024", [N, Backlog]),
             [TL|_Tail] = WL,
-            ok = leveled_pclerk:clerk_push(State#state.clerk,
-                                            {TL, State#state.manifest}),
+            ok =
+                leveled_pclerk:clerk_push(
+                    State#state.clerk, {TL, State#state.manifest}),
+            case TL of
+                0 ->
+                    % An L0 file has just been written and the loop state
+                    % rewritten, so garbage collect to free as much memory
+                    % as possible, as soon as possible
+                    garbage_collect();
+                _ ->
+                    ok
+            end,
+
             {noreply,
                 State#state{work_backlog=Backlog, work_ongoing=true}}
     end;
@@ -1450,22 +1463,27 @@ roll_memory(State, true) ->
 %% the result tuple includes the level at which the result was found.
 timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, Timings) ->
     SW = os:timestamp(),
-    {R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
+    {R, Level} =
+        fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, fun timed_sst_get/4),
     UpdTimings = update_timings(SW, Timings, R, Level),
     {R, UpdTimings}.
 
--spec plain_fetch_mem(tuple(), {integer(), integer()},
-                        leveled_pmanifest:manifest(), list(),
-                        leveled_pmem:index_array()) -> not_present|tuple().
+-spec fetch_sqn(
+    leveled_codec:ledger_key(),
+    leveled_codec:segment_hash(),
+    leveled_pmanifest:manifest(),
+    list(),
+    leveled_pmem:index_array()) ->
+        not_present|leveled_codec:ledger_kv()|leveled_codec:sqn().
 %% @doc
 %% Fetch the result from the penciller, starting by looking in the memory,
 %% and if it is not found looking down level by level through the LSM tree.
-plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
-    R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
+fetch_sqn(Key, Hash, Manifest, L0Cache, L0Index) ->
+    R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, fun sst_getsqn/4),
     element(1, R).
 
-fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
+fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, FetchFun) ->
     PosList =
         case L0Index of
             none ->
@@ -1476,7 +1494,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
     L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache),
     case L0Check of
         {false, not_found} ->
-            fetch(Key, Hash, Manifest, 0, fun timed_sst_get/4);
+            fetch(Key, Hash, Manifest, 0, FetchFun);
         {true, KV} ->
             {KV, memory}
     end.
@@ -1515,6 +1533,9 @@ timed_sst_get(PID, Key, Hash, Level) ->
     T0 = timer:now_diff(os:timestamp(), SW),
     log_slowfetch(T0, R, PID, Level, ?SLOW_FETCH).
 
+sst_getsqn(PID, Key, Hash, _Level) ->
+    leveled_sst:sst_getsqn(PID, Key, Hash).
+
 log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
     case {T0, R} of
         {T, R} when T < FetchTolerance ->
@@ -1528,29 +1549,26 @@ log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
     end.
 
 
--spec compare_to_sqn(tuple()|not_present, integer()) -> sqn_check().
+-spec compare_to_sqn(
+    leveled_codec:ledger_kv()|leveled_codec:sqn()|not_present,
+    integer()) -> sqn_check().
 %% @doc
 %% Check to see if the SQN in the penciller is after the SQN expected for an
 %% object (used to allow the journal to check compaction status from a cache
 %% of the ledger - objects with a more recent sequence number can be compacted).
+compare_to_sqn(not_present, _SQN) ->
+    missing;
+compare_to_sqn(ObjSQN, SQN) when is_integer(ObjSQN), ObjSQN > SQN ->
+    replaced;
+compare_to_sqn(ObjSQN, _SQN) when is_integer(ObjSQN) ->
+    % Normally we would expect the SQN to be equal here, but
+    % this also allows for the Journal to have a more advanced
+    % value.  We return current here as we wouldn't want to
+    % compact that more advanced value, but this may cause
+    % confusion in snapshots.
+    current;
 compare_to_sqn(Obj, SQN) ->
-    case Obj of
-        not_present ->
-            missing;
-        Obj ->
-            SQNToCompare = leveled_codec:strip_to_seqonly(Obj),
-            if
-                SQNToCompare > SQN ->
-                    replaced;
-                true ->
-                    % Normally we would expect the SQN to be equal here, but
-                    % this also allows for the Journal to have a more advanced
-                    % value.  We return true here as we wouldn't want to
-                    % compact thta more advanced value, but this may cause
-                    % confusion in snapshots.
-                    current
-            end
-    end.
+    compare_to_sqn(leveled_codec:strip_to_seqonly(Obj), SQN).
 
 
 %%%============================================================================
diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl
index 3e06783..c5577c9 100644
--- a/src/leveled_sst.erl
+++ b/src/leveled_sst.erl
@@ -86,7 +86,6 @@
 -define(TREE_SIZE, 4).
 -define(TIMING_SAMPLECOUNTDOWN, 20000).
 -define(TIMING_SAMPLESIZE, 100).
--define(CACHE_SIZE, 32).
 -define(BLOCK_LENGTHS_LENGTH, 20).
 -define(LMD_LENGTH, 4).
 -define(FLIPPER32, 4294967295).
@@ -97,6 +96,12 @@
 -define(USE_SET_FOR_SPEED, 64).
 -define(STARTUP_TIMEOUT, 10000).
 
+-ifdef(TEST).
+-define(HIBERNATE_TIMEOUT, 5000).
+-else.
+-define(HIBERNATE_TIMEOUT, 60000).
+-endif.
+
 -include_lib("eunit/include/eunit.hrl").
-export([init/1, @@ -119,6 +124,7 @@ sst_open/4, sst_get/2, sst_get/3, + sst_getsqn/3, sst_expandpointer/5, sst_getmaxsequencenumber/1, sst_setfordelete/2, @@ -184,6 +190,14 @@ -type sst_summary() :: #summary{}. -type blockindex_cache() :: array:array(). +-type fetch_cache() + :: array:array()|no_cache. +-type cache_size() + :: no_cache|4|32|64. +-type cache_hash() + :: no_cache|non_neg_integer(). +-type level() + :: non_neg_integer(). %% yield_blockquery is used to determine if the work necessary to process a %% range query beyond the fetching the slot should be managed from within @@ -206,34 +220,33 @@ timings = no_timing :: sst_timings(), timings_countdown = 0 :: integer(), starting_pid :: pid()|undefined, - fetch_cache = array:new([{size, ?CACHE_SIZE}]) :: - array:array() | redacted, + fetch_cache = no_cache :: fetch_cache() | redacted, new_slots :: list()|undefined, deferred_startup_tuple :: tuple()|undefined, - level :: non_neg_integer()|undefined, + level :: level()|undefined, tomb_count = not_counted :: non_neg_integer()|not_counted, high_modified_date :: non_neg_integer()|undefined}). -record(sst_timings, - {sample_count = 0 :: integer(), - index_query_time = 0 :: integer(), - lookup_cache_time = 0 :: integer(), - slot_index_time = 0 :: integer(), - fetch_cache_time = 0 :: integer(), - slot_fetch_time = 0 :: integer(), - noncached_block_time = 0 :: integer(), - lookup_cache_count = 0 :: integer(), - slot_index_count = 0 :: integer(), - fetch_cache_count = 0 :: integer(), - slot_fetch_count = 0 :: integer(), - noncached_block_count = 0 :: integer()}). + {sample_count = 0 :: integer(), + index_query_time = 0 :: integer(), + lookup_cache_time = 0 :: integer(), + slot_index_time = 0 :: integer(), + fetch_cache_time = 0 :: integer(), + slot_fetch_time = 0 :: integer(), + noncached_block_time = 0 :: integer(), + lookup_cache_count = 0 :: integer(), + slot_index_count = 0 :: integer(), + fetch_cache_count = 0 :: integer(), + slot_fetch_count = 0 :: integer(), + noncached_block_count = 0 :: integer()}). -record(build_timings, - {slot_hashlist = 0 :: integer(), - slot_serialise = 0 :: integer(), - slot_finish = 0 :: integer(), - fold_toslot = 0 :: integer()}). + {slot_hashlist = 0 :: integer(), + slot_serialise = 0 :: integer(), + slot_finish = 0 :: integer(), + fold_toslot = 0 :: integer()}). -type sst_state() :: #state{}. -type sst_timings() :: no_timing|#sst_timings{}. @@ -245,7 +258,7 @@ %%% API %%%============================================================================ --spec sst_open(string(), string(), sst_options(), non_neg_integer()) +-spec sst_open(string(), string(), sst_options(), level()) -> {ok, pid(), {leveled_codec:ledger_key(), leveled_codec:ledger_key()}, binary()}. @@ -267,7 +280,7 @@ sst_open(RootPath, Filename, OptsSST, Level) -> {ok, Pid, {SK, EK}, Bloom} end. --spec sst_new(string(), string(), integer(), +-spec sst_new(string(), string(), level(), list(leveled_codec:ledger_kv()), integer(), sst_options()) -> {ok, pid(), @@ -310,7 +323,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) -> -spec sst_newmerge(string(), string(), list(leveled_codec:ledger_kv()|sst_pointer()), list(leveled_codec:ledger_kv()|sst_pointer()), - boolean(), integer(), + boolean(), level(), integer(), sst_options()) -> empty|{ok, pid(), {{list(leveled_codec:ledger_kv()), @@ -331,7 +344,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) -> %% file is not added to the manifest. 
 sst_newmerge(RootPath, Filename,
         KVL1, KVL2, IsBasement, Level,
-        MaxSQN, OptsSST) ->
+        MaxSQN, OptsSST) when Level > 0 ->
     sst_newmerge(RootPath, Filename,
         KVL1, KVL2, IsBasement, Level,
         MaxSQN, OptsSST, ?INDEX_MODDATE, ?TOMB_COUNT).
@@ -433,6 +446,15 @@ sst_get(Pid, LedgerKey) ->
 sst_get(Pid, LedgerKey, Hash) ->
     gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity).
 
+-spec sst_getsqn(pid(),
+    leveled_codec:ledger_key(),
+    leveled_codec:segment_hash()) -> leveled_codec:sqn()|not_present.
+%% @doc
+%% Return the SQN for the key, or not_present if the key is not in
+%% the store (the magic hash must be precalculated).
+sst_getsqn(Pid, LedgerKey, Hash) ->
+    gen_fsm:sync_send_event(Pid, {get_sqn, LedgerKey, Hash}, infinity).
+
 -spec sst_getmaxsequencenumber(pid()) -> integer().
 %% @doc
 %% Get the maximume sequence number for this SST file
@@ -540,7 +562,7 @@ starting({sst_open, RootPath, Filename, OptsSST, Level}, _From, State) ->
     {reply,
         {ok, {Summary#summary.first_key, Summary#summary.last_key}, Bloom},
         reader,
-        UpdState#state{level = Level}};
+        UpdState#state{level = Level, fetch_cache = new_cache(Level)}};
 starting({sst_new,
             RootPath, Filename, Level,
             {SlotList, FirstKey}, MaxSQN,
@@ -580,7 +602,8 @@ starting({sst_new,
         UpdState#state{blockindex_cache = BlockIndex,
                         high_modified_date = HighModDate,
                         starting_pid = StartingPID,
-                        level = Level}};
+                        level = Level,
+                        fetch_cache = new_cache(Level)}};
 starting({sst_newlevelzero, RootPath, Filename,
             Penciller, MaxSQN,
             OptsSST, IdxModDate}, _From, State) ->
@@ -588,7 +611,10 @@ starting({sst_newlevelzero, RootPath, Filename,
         {RootPath, Filename, Penciller, MaxSQN, OptsSST, IdxModDate},
     {reply,
         ok,
         starting,
-        State#state{deferred_startup_tuple = DeferredStartupTuple, level = 0}};
+        State#state{
+            deferred_startup_tuple = DeferredStartupTuple,
+            level = 0,
+            fetch_cache = new_cache(0)}};
 starting(close, _From, State) ->
     % No file should have been created, so nothing to close.
     {stop, normal, ok, State}.
@@ -694,6 +720,11 @@ starting({sst_returnslot, FetchedSlot, FetchFun, SlotCount}, State) ->
                 State#state{new_slots = FetchedSlots}}
     end.
 
+reader({get_sqn, LedgerKey, Hash}, _From, State) ->
+    % Get the SQN of a KV only, without taking sample timings
+    {Result, UpdState, _UpdTimings} =
+        fetch(LedgerKey, Hash, State, no_timing),
+    {reply, sqn_only(Result), reader, UpdState, ?HIBERNATE_TIMEOUT};
 reader({get_kv, LedgerKey, Hash}, _From, State) ->
     % Get a KV value and potentially take sample timings
     {Result, UpdState, UpdTimings} =
@@ -795,10 +826,31 @@ reader(close, _From, State) ->
     ok = file:close(State#state.handle),
     {stop, normal, ok, State}.
 
+reader(timeout, State) ->
+    FreshFetchCache = new_cache(State#state.level),
+    Summary = State#state.summary,
+    FreshBlockIndexCache = new_blockindex_cache(Summary#summary.size),
+    {next_state,
+        reader,
+        State#state{
+            fetch_cache = FreshFetchCache,
+            blockindex_cache = FreshBlockIndexCache},
+        hibernate};
 reader({switch_levels, NewLevel}, State) ->
-    {next_state, reader, State#state{level = NewLevel}, hibernate}.
+    FreshCache = new_cache(NewLevel),
+    {next_state,
+        reader,
+        State#state{
+            level = NewLevel,
+            fetch_cache = FreshCache},
+        hibernate}.
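+
+%% Editor's sketch (illustrative only, not part of the applied change):
+%% the idle-hibernation pattern above relies on every sync reply from the
+%% reader state re-arming the timeout, e.g.:
+%%
+%%   {reply, sqn_only(Result), reader, UpdState, ?HIBERNATE_TIMEOUT}
+%%
+%% If no event arrives within ?HIBERNATE_TIMEOUT, gen_fsm delivers a
+%% timeout event and the reader(timeout, State) clause above swaps in an
+%% empty fetch_cache and blockindex_cache before returning hibernate -
+%% so a cold SST process carries a minimal heap until the next fetch,
+%% when the caches are rebuilt on demand.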
+delete_pending({get_sqn, LedgerKey, Hash}, _From, State) ->
+    % Get the SQN of a KV only, without taking sample timings
+    {Result, UpdState, _UpdTimings} =
+        fetch(LedgerKey, Hash, State, no_timing),
+    {reply, sqn_only(Result), delete_pending, UpdState, ?DELETE_TIMEOUT};
 delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
     {Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing),
     {reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT};
@@ -1093,14 +1145,72 @@ member_check(Hash, {sets, HashSet}) ->
 member_check(_Miss, _Checker) ->
     false.
 
+-spec sqn_only(leveled_codec:ledger_kv()|not_present)
+        -> leveled_codec:sqn()|not_present.
+sqn_only(not_present) ->
+    not_present;
+sqn_only(KV) ->
+    leveled_codec:strip_to_seqonly(KV).
+
 extract_hash({SegHash, _ExtraHash}) when is_integer(SegHash) ->
     tune_hash(SegHash);
 extract_hash(NotHash) ->
     NotHash.
 
-cache_hash({_SegHash, ExtraHash}) when is_integer(ExtraHash) ->
-    ExtraHash band (?CACHE_SIZE - 1).
+
+-spec new_cache(level()) -> fetch_cache().
+new_cache(Level) ->
+    case cache_size(Level) of
+        no_cache ->
+            no_cache;
+        CacheSize ->
+            array:new([{size, CacheSize}])
+    end.
+
+-spec cache_hash(leveled_codec:segment_hash(), non_neg_integer()) ->
+        cache_hash().
+cache_hash({_SegHash, ExtraHash}, Level) when is_integer(ExtraHash) ->
+    case cache_size(Level) of
+        no_cache -> no_cache;
+        CH -> ExtraHash band (CH - 1)
+    end.
+
+%% @doc
+%% The lower the level, the bigger the memory cost of supporting the cache,
+%% as each level has more files than the level above.  Load tests with any
+%% sort of Pareto distribution show far better cost/benefit ratios for
+%% caches at the higher levels.
+-spec cache_size(level()) -> cache_size().
+cache_size(N) when N < 3 ->
+    64;
+cache_size(3) ->
+    32;
+cache_size(4) ->
+    32;
+cache_size(5) ->
+    4;
+cache_size(6) ->
+    4;
+cache_size(_LowerLevel) ->
+    no_cache.
+
+-spec fetch_from_cache(
+    cache_hash(),
+    fetch_cache()) -> undefined|leveled_codec:ledger_kv().
+fetch_from_cache(_CacheHash, no_cache) ->
+    undefined;
+fetch_from_cache(CacheHash, Cache) ->
+    array:get(CacheHash, Cache).
+
+-spec add_to_cache(
+    non_neg_integer(),
+    leveled_codec:ledger_kv(),
+    fetch_cache()) -> fetch_cache().
+add_to_cache(_CacheHash, _KV, no_cache) ->
+    no_cache;
+add_to_cache(CacheHash, KV, FetchCache) ->
+    array:set(CacheHash, KV, FetchCache).
+
 
 -spec tune_hash(non_neg_integer()) -> non_neg_integer().
 %% @doc
@@ -1237,8 +1347,8 @@ fetch(LedgerKey, Hash, State, Timings0) ->
             {SW3, Timings3} = update_timings(SW2, Timings2, slot_index, true),
             FetchCache = State#state.fetch_cache,
-            CacheHash = cache_hash(Hash),
-            case array:get(CacheHash, FetchCache) of
+            CacheHash = cache_hash(Hash, State#state.level),
+            case fetch_from_cache(CacheHash, FetchCache) of
                 {LedgerKey, V} ->
                     {_SW4, Timings4} =
                         update_timings(SW3,
@@ -1257,8 +1367,8 @@ fetch(LedgerKey, Hash, State, Timings0) ->
                                         PressMethod,
                                         IdxModDate,
                                         not_present),
-                            FetchCache0 =
-                                array:set(CacheHash, Result, FetchCache),
+                            FetchCache0 =
+                                add_to_cache(CacheHash, Result, FetchCache),
                             {_SW4, Timings4} =
                                 update_timings(SW3,
                                                 Timings3,
@@ -1350,7 +1460,7 @@ compress_level(Level, _PressMethod) when Level < ?COMPRESS_AT_LEVEL ->
 compress_level(_Level, PressMethod) ->
     PressMethod.
 
--spec maxslots_level(non_neg_integer(), pos_integer()) -> pos_integer().
+-spec maxslots_level(level(), pos_integer()) -> pos_integer().
maxslots_level(Level, MaxSlotCount) when Level < ?DOUBLESIZE_LEVEL -> MaxSlotCount; maxslots_level(_Level, MaxSlotCount) -> @@ -1384,8 +1494,9 @@ write_file(RootPath, Filename, SummaryBin, SlotsBin, read_file(Filename, State, LoadPageCache) -> {Handle, FileVersion, SummaryBin} = - open_reader(filename:join(State#state.root_path, Filename), - LoadPageCache), + open_reader( + filename:join(State#state.root_path, Filename), + LoadPageCache), UpdState0 = imp_fileversion(FileVersion, State), {Summary, Bloom, SlotList, TombCount} = read_table_summary(SummaryBin, UpdState0#state.tomb_count), @@ -3618,12 +3729,78 @@ additional_range_test() -> % Test blows up anyway % R8 = sst_getkvrange(P1, element(1, PastEKV), element(1, PastEKV), 2), % ?assertMatch([], R8). - + +simple_switchcache_test() -> + {RP, Filename} = {?TEST_AREA, "simple_switchcache_test"}, + KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 2, 1, 20), + KVList1 = lists:sublist(lists:ukeysort(1, KVList0), ?LOOK_SLOTSIZE), + [{FirstKey, _FV}|_Rest] = KVList1, + {LastKey, _LV} = lists:last(KVList1), + {ok, OpenP4, {FirstKey, LastKey}, Bloom} = + testsst_new(RP, Filename, 4, KVList1, length(KVList1), native), + lists:foreach(fun({K, V}) -> + ?assertMatch({K, V}, sst_get(OpenP4, K)) + end, + KVList1), + ok = sst_switchlevels(OpenP4, 5), + lists:foreach(fun({K, V}) -> + ?assertMatch({K, V}, sst_get(OpenP4, K)) + end, + KVList1), + lists:foreach(fun({K, V}) -> + ?assertMatch({K, V}, sst_get(OpenP4, K)) + end, + KVList1), + gen_fsm:send_event(OpenP4, timeout), + lists:foreach(fun({K, V}) -> + ?assertMatch({K, V}, sst_get(OpenP4, K)) + end, + KVList1), + ok = sst_close(OpenP4), + OptsSST = #sst_options{press_method=native, + log_options=leveled_log:get_opts()}, + {ok, OpenP5, {FirstKey, LastKey}, Bloom} = + sst_open(RP, Filename ++ ".sst", OptsSST, 5), + lists:foreach(fun({K, V}) -> + ?assertMatch({K, V}, sst_get(OpenP5, K)) + end, + KVList1), + lists:foreach(fun({K, V}) -> + ?assertMatch({K, V}, sst_get(OpenP5, K)) + end, + KVList1), + gen_fsm:send_event(OpenP5, timeout), + lists:foreach(fun({K, V}) -> + ?assertMatch({K, V}, sst_get(OpenP5, K)) + end, + KVList1), + ok = sst_switchlevels(OpenP5, 6), + lists:foreach(fun({K, V}) -> + ?assertMatch({K, V}, sst_get(OpenP5, K)) + end, + KVList1), + gen_fsm:send_event(OpenP5, timeout), + lists:foreach(fun({K, V}) -> + ?assertMatch({K, V}, sst_get(OpenP5, K)) + end, + KVList1), + ok = sst_switchlevels(OpenP5, 7), + lists:foreach(fun({K, V}) -> + ?assertMatch({K, V}, sst_get(OpenP5, K)) + end, + KVList1), + gen_fsm:send_event(OpenP5, timeout), + lists:foreach(fun({K, V}) -> + ?assertMatch({K, V}, sst_get(OpenP5, K)) + end, + KVList1), + ok = sst_close(OpenP5), + ok = file:delete(filename:join(RP, Filename ++ ".sst")). + simple_persisted_slotsize_test() -> simple_persisted_slotsize_tester(fun testsst_new/6). - simple_persisted_slotsize_tester(SSTNewFun) -> {RP, Filename} = {?TEST_AREA, "simple_slotsize_test"}, KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 2, 1, 20), @@ -3640,6 +3817,24 @@ simple_persisted_slotsize_tester(SSTNewFun) -> ok = sst_close(Pid), ok = file:delete(filename:join(RP, Filename ++ ".sst")). +reader_hibernate_test_() -> + {timeout, 90, fun reader_hibernate_tester/0}. 
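+
+%% Editor's sketch (illustrative only, not part of the applied change):
+%% the tester below could additionally assert that hibernation shrank the
+%% process heap, by sampling erlang:process_info/2 either side of the
+%% sleep (process_info/2 does not wake a hibernating process):
+%%
+%%   {total_heap_size, THS0} = erlang:process_info(Pid, total_heap_size),
+%%   timer:sleep(?HIBERNATE_TIMEOUT + 1000),
+%%   {total_heap_size, THS1} = erlang:process_info(Pid, total_heap_size),
+%%   ?assert(THS1 =< THS0).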
+ +reader_hibernate_tester() -> + {RP, Filename} = {?TEST_AREA, "readerhibernate_test"}, + KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 32, 1, 20), + KVList1 = lists:ukeysort(1, KVList0), + [{FirstKey, FV}|_Rest] = KVList1, + {LastKey, _LV} = lists:last(KVList1), + {ok, Pid, {FirstKey, LastKey}, _Bloom} = + testsst_new(RP, Filename, 1, KVList1, length(KVList1), native), + ?assertMatch({FirstKey, FV}, sst_get(Pid, FirstKey)), + SQN = leveled_codec:strip_to_seqonly({FirstKey, FV}), + ?assertMatch( + SQN, + sst_getsqn(Pid, FirstKey, leveled_codec:segment_hash(FirstKey))), + timer:sleep(?HIBERNATE_TIMEOUT + 1000), + ?assertMatch({FirstKey, FV}, sst_get(Pid, FirstKey)). delete_pending_test_() -> {timeout, 30, fun delete_pending_tester/0}.
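
%% Editor's note - expected behaviour of the reworked compare_to_sqn/2 in
%% leveled_penciller.erl, as an illustrative eunit sketch (not part of the
%% patch; assumes only the clauses added above):
%%
%%   compare_to_sqn_behaviour_test() ->
%%       ?assertMatch(missing, compare_to_sqn(not_present, 5)),
%%       %% Ledger SQN ahead of the Journal's expected SQN -> replaced
%%       ?assertMatch(replaced, compare_to_sqn(9, 5)),
%%       ?assertMatch(current, compare_to_sqn(5, 5)),
%%       %% The Journal may be ahead of the ledger - still current
%%       ?assertMatch(current, compare_to_sqn(3, 5)).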