Hibernate on SQN check

SQN check in the penciller is used for journal (all object) folds, but mainly for journal compaction.  Use this to trigger hibernation where SST files stay quiet after the compaction check.
This commit is contained in:
Martin Sumner 2022-03-10 11:23:53 +00:00
parent fb490b9af7
commit eedd09a23d
3 changed files with 63 additions and 32 deletions

View file

@ -136,6 +136,7 @@
-export_type([tag/0, -export_type([tag/0,
key/0, key/0,
sqn/0,
object_spec/0, object_spec/0,
segment_hash/0, segment_hash/0,
ledger_status/0, ledger_status/0,

View file

@ -749,12 +749,14 @@ handle_call({fetch, Key, Hash, UseL0Index}, _From, State) ->
{reply, R, State#state{timings=UpdTimings0, timings_countdown=CountDown}}; {reply, R, State#state{timings=UpdTimings0, timings_countdown=CountDown}};
handle_call({check_sqn, Key, Hash, SQN}, _From, State) -> handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
{reply, {reply,
compare_to_sqn(plain_fetch_mem(Key, compare_to_sqn(
Hash, fetch_sqn(
State#state.manifest, Key,
State#state.levelzero_cache, Hash,
State#state.levelzero_index), State#state.manifest,
SQN), State#state.levelzero_cache,
State#state.levelzero_index),
SQN),
State}; State};
handle_call({fetch_keys, handle_call({fetch_keys,
StartKey, EndKey, StartKey, EndKey,
@ -1456,14 +1458,18 @@ timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, Timings) ->
{R, UpdTimings}. {R, UpdTimings}.
-spec plain_fetch_mem(tuple(), {integer(), integer()}, -spec fetch_sqn(
leveled_pmanifest:manifest(), list(), leveled_codec:ledger_key(),
leveled_pmem:index_array()) -> not_present|tuple(). leveled_codec:segment_hash(),
leveled_pmanifest:manifest(),
list(),
leveled_pmem:index_array()) ->
not_present|leveled_codec:ledger_kv()|leveled_codec:ledger_sqn().
%% @doc %% @doc
%% Fetch the result from the penciller, starting by looking in the memory, %% Fetch the result from the penciller, starting by looking in the memory,
%% and if it is not found looking down level by level through the LSM tree. %% and if it is not found looking down level by level through the LSM tree.
plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> fetch_sqn(Key, Hash, Manifest, L0Cache, L0Index) ->
R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, fun sst_get/4), R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, fun sst_getsqn/4),
element(1, R). element(1, R).
fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, FetchFun) -> fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, FetchFun) ->
@ -1516,8 +1522,8 @@ timed_sst_get(PID, Key, Hash, Level) ->
T0 = timer:now_diff(os:timestamp(), SW), T0 = timer:now_diff(os:timestamp(), SW),
log_slowfetch(T0, R, PID, Level, ?SLOW_FETCH). log_slowfetch(T0, R, PID, Level, ?SLOW_FETCH).
sst_get(PID, Key, Hash, Level) -> sst_getsqn(PID, Key, Hash, _Level) ->
leveled_sst:sst_get(PID, Key, Hash). leveled_sst:sst_getsqn(PID, Key, Hash).
log_slowfetch(T0, R, PID, Level, FetchTolerance) -> log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
case {T0, R} of case {T0, R} of
@ -1532,29 +1538,26 @@ log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
end. end.
-spec compare_to_sqn(tuple()|not_present, integer()) -> sqn_check(). -spec compare_to_sqn(
leveled_codec:ledger_kv()|leveled_codec:sqn()|not_present,
integer()) -> sqn_check().
%% @doc %% @doc
%% Check to see if the SQN in the penciller is after the SQN expected for an %% Check to see if the SQN in the penciller is after the SQN expected for an
%% object (used to allow the journal to check compaction status from a cache %% object (used to allow the journal to check compaction status from a cache
%% of the ledger - objects with a more recent sequence number can be compacted). %% of the ledger - objects with a more recent sequence number can be compacted).
compare_to_sqn(not_present, _SQN) ->
missing;
compare_to_sqn(ObjSQN, SQN) when is_integer(ObjSQN), ObjSQN > SQN ->
replaced;
compare_to_sqn(ObjSQN, _SQN) when is_integer(ObjSQN) ->
% Normally we would expect the SQN to be equal here, but
% this also allows for the Journal to have a more advanced
% value. We return true here as we wouldn't want to
% compact thta more advanced value, but this may cause
% confusion in snapshots.
current;
compare_to_sqn(Obj, SQN) -> compare_to_sqn(Obj, SQN) ->
case Obj of compare_to_sqn(leveled_codec:strip_to_seqonly(Obj), SQN).
not_present ->
missing;
Obj ->
SQNToCompare = leveled_codec:strip_to_seqonly(Obj),
if
SQNToCompare > SQN ->
replaced;
true ->
% Normally we would expect the SQN to be equal here, but
% this also allows for the Journal to have a more advanced
% value. We return true here as we wouldn't want to
% compact thta more advanced value, but this may cause
% confusion in snapshots.
current
end
end.
%%%============================================================================ %%%============================================================================

View file

@ -96,6 +96,7 @@
-define(TOMB_COUNT, true). -define(TOMB_COUNT, true).
-define(USE_SET_FOR_SPEED, 64). -define(USE_SET_FOR_SPEED, 64).
-define(STARTUP_TIMEOUT, 10000). -define(STARTUP_TIMEOUT, 10000).
-define(HIBERNATE_TIMEOUT, 60000).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -119,6 +120,7 @@
sst_open/4, sst_open/4,
sst_get/2, sst_get/2,
sst_get/3, sst_get/3,
sst_getsqn/3,
sst_expandpointer/5, sst_expandpointer/5,
sst_getmaxsequencenumber/1, sst_getmaxsequencenumber/1,
sst_setfordelete/2, sst_setfordelete/2,
@ -434,6 +436,15 @@ sst_get(Pid, LedgerKey) ->
sst_get(Pid, LedgerKey, Hash) -> sst_get(Pid, LedgerKey, Hash) ->
gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity). gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity).
-spec sst_getsqn(pid(),
leveled_codec:ledger_key(),
leveled_codec:segment_hash()) -> leveled_codec:sqn()|not_present.
%% @doc
%% Return a SQN for the key or not_present if the key is not in
%% the store (with the magic hash precalculated).
sst_getsqn(Pid, LedgerKey, Hash) ->
gen_fsm:sync_send_event(Pid, {get_sqn, LedgerKey, Hash}, infinity).
-spec sst_getmaxsequencenumber(pid()) -> integer(). -spec sst_getmaxsequencenumber(pid()) -> integer().
%% @doc %% @doc
%% Get the maximume sequence number for this SST file %% Get the maximume sequence number for this SST file
@ -695,6 +706,11 @@ starting({sst_returnslot, FetchedSlot, FetchFun, SlotCount}, State) ->
State#state{new_slots = FetchedSlots}} State#state{new_slots = FetchedSlots}}
end. end.
reader({get_sqn, LedgerKey, Hash}, _From, State) ->
% Get a KV value and potentially take sample timings
{Result, UpdState, _UpdTimings} =
fetch(LedgerKey, Hash, State, no_timing),
{reply, sqn_only(Result), reader, UpdState, ?HIBERNATE_TIMEOUT};
reader({get_kv, LedgerKey, Hash}, _From, State) -> reader({get_kv, LedgerKey, Hash}, _From, State) ->
% Get a KV value and potentially take sample timings % Get a KV value and potentially take sample timings
{Result, UpdState, UpdTimings} = {Result, UpdState, UpdTimings} =
@ -800,6 +816,11 @@ reader({switch_levels, NewLevel}, State) ->
{next_state, reader, State#state{level = NewLevel}, hibernate}. {next_state, reader, State#state{level = NewLevel}, hibernate}.
delete_pending({get_sqn, LedgerKey, Hash}, _From, State) ->
% Get a KV value and potentially take sample timings
{Result, UpdState, _UpdTimings} =
fetch(LedgerKey, Hash, State, no_timing),
{reply, sqn_only(Result), delete_pending, UpdState, ?DELETE_TIMEOUT};
delete_pending({get_kv, LedgerKey, Hash}, _From, State) -> delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
{Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing), {Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing),
{reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT}; {reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT};
@ -1094,6 +1115,12 @@ member_check(Hash, {sets, HashSet}) ->
member_check(_Miss, _Checker) -> member_check(_Miss, _Checker) ->
false. false.
-spec sqn_only(leveled_codec:ledger_kv()|not_present)
-> leveled_codec:sqn()|not_present.
sqn_only(not_present) ->
not_present;
sqn_only(KV) ->
leveled_codec:strip_to_seqonly(KV).
extract_hash({SegHash, _ExtraHash}) when is_integer(SegHash) -> extract_hash({SegHash, _ExtraHash}) when is_integer(SegHash) ->
tune_hash(SegHash); tune_hash(SegHash);
@ -1113,7 +1140,7 @@ fetch_from_cache(CacheHash, Cache) ->
-spec add_to_cache( -spec add_to_cache(
non_neg_integer(), non_neg_integer(),
leveled_codec:ledger_kv() leveled_codec:ledger_kv(),
fetch_cache()) -> fetch_cache(). fetch_cache()) -> fetch_cache().
add_to_cache(_CacheHash, _KV, no_cache) -> add_to_cache(_CacheHash, _KV, no_cache) ->
no_cache; no_cache;