diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index d040723..e8be244 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -136,6 +136,7 @@ -export_type([tag/0, key/0, + sqn/0, object_spec/0, segment_hash/0, ledger_status/0, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 653c2a8..d41ad7a 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -749,12 +749,14 @@ handle_call({fetch, Key, Hash, UseL0Index}, _From, State) -> {reply, R, State#state{timings=UpdTimings0, timings_countdown=CountDown}}; handle_call({check_sqn, Key, Hash, SQN}, _From, State) -> {reply, - compare_to_sqn(plain_fetch_mem(Key, - Hash, - State#state.manifest, - State#state.levelzero_cache, - State#state.levelzero_index), - SQN), + compare_to_sqn( + fetch_sqn( + Key, + Hash, + State#state.manifest, + State#state.levelzero_cache, + State#state.levelzero_index), + SQN), State}; handle_call({fetch_keys, StartKey, EndKey, @@ -1456,14 +1458,18 @@ timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, Timings) -> {R, UpdTimings}. --spec plain_fetch_mem(tuple(), {integer(), integer()}, - leveled_pmanifest:manifest(), list(), - leveled_pmem:index_array()) -> not_present|tuple(). +-spec fetch_sqn( + leveled_codec:ledger_key(), + leveled_codec:segment_hash(), + leveled_pmanifest:manifest(), + list(), + leveled_pmem:index_array()) -> + not_present|leveled_codec:ledger_kv()|leveled_codec:ledger_sqn(). %% @doc %% Fetch the result from the penciller, starting by looking in the memory, %% and if it is not found looking down level by level through the LSM tree. -plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> - R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, fun sst_get/4), +fetch_sqn(Key, Hash, Manifest, L0Cache, L0Index) -> + R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, fun sst_getsqn/4), element(1, R). fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, FetchFun) -> @@ -1516,8 +1522,8 @@ timed_sst_get(PID, Key, Hash, Level) -> T0 = timer:now_diff(os:timestamp(), SW), log_slowfetch(T0, R, PID, Level, ?SLOW_FETCH). -sst_get(PID, Key, Hash, Level) -> - leveled_sst:sst_get(PID, Key, Hash). +sst_getsqn(PID, Key, Hash, _Level) -> + leveled_sst:sst_getsqn(PID, Key, Hash). log_slowfetch(T0, R, PID, Level, FetchTolerance) -> case {T0, R} of @@ -1532,29 +1538,26 @@ log_slowfetch(T0, R, PID, Level, FetchTolerance) -> end. --spec compare_to_sqn(tuple()|not_present, integer()) -> sqn_check(). +-spec compare_to_sqn( + leveled_codec:ledger_kv()|leveled_codec:sqn()|not_present, + integer()) -> sqn_check(). %% @doc %% Check to see if the SQN in the penciller is after the SQN expected for an %% object (used to allow the journal to check compaction status from a cache %% of the ledger - objects with a more recent sequence number can be compacted). +compare_to_sqn(not_present, _SQN) -> + missing; +compare_to_sqn(ObjSQN, SQN) when is_integer(ObjSQN), ObjSQN > SQN -> + replaced; +compare_to_sqn(ObjSQN, _SQN) when is_integer(ObjSQN) -> + % Normally we would expect the SQN to be equal here, but + % this also allows for the Journal to have a more advanced + % value. We return true here as we wouldn't want to + % compact thta more advanced value, but this may cause + % confusion in snapshots. + current; compare_to_sqn(Obj, SQN) -> - case Obj of - not_present -> - missing; - Obj -> - SQNToCompare = leveled_codec:strip_to_seqonly(Obj), - if - SQNToCompare > SQN -> - replaced; - true -> - % Normally we would expect the SQN to be equal here, but - % this also allows for the Journal to have a more advanced - % value. We return true here as we wouldn't want to - % compact thta more advanced value, but this may cause - % confusion in snapshots. - current - end - end. + compare_to_sqn(leveled_codec:strip_to_seqonly(Obj), SQN). %%%============================================================================ diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 98bc8aa..55c1d17 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -96,6 +96,7 @@ -define(TOMB_COUNT, true). -define(USE_SET_FOR_SPEED, 64). -define(STARTUP_TIMEOUT, 10000). +-define(HIBERNATE_TIMEOUT, 60000). -include_lib("eunit/include/eunit.hrl"). @@ -119,6 +120,7 @@ sst_open/4, sst_get/2, sst_get/3, + sst_getsqn/3, sst_expandpointer/5, sst_getmaxsequencenumber/1, sst_setfordelete/2, @@ -434,6 +436,15 @@ sst_get(Pid, LedgerKey) -> sst_get(Pid, LedgerKey, Hash) -> gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity). +-spec sst_getsqn(pid(), + leveled_codec:ledger_key(), + leveled_codec:segment_hash()) -> leveled_codec:sqn()|not_present. +%% @doc +%% Return a SQN for the key or not_present if the key is not in +%% the store (with the magic hash precalculated). +sst_getsqn(Pid, LedgerKey, Hash) -> + gen_fsm:sync_send_event(Pid, {get_sqn, LedgerKey, Hash}, infinity). + -spec sst_getmaxsequencenumber(pid()) -> integer(). %% @doc %% Get the maximume sequence number for this SST file @@ -695,6 +706,11 @@ starting({sst_returnslot, FetchedSlot, FetchFun, SlotCount}, State) -> State#state{new_slots = FetchedSlots}} end. +reader({get_sqn, LedgerKey, Hash}, _From, State) -> + % Get a KV value and potentially take sample timings + {Result, UpdState, _UpdTimings} = + fetch(LedgerKey, Hash, State, no_timing), + {reply, sqn_only(Result), reader, UpdState, ?HIBERNATE_TIMEOUT}; reader({get_kv, LedgerKey, Hash}, _From, State) -> % Get a KV value and potentially take sample timings {Result, UpdState, UpdTimings} = @@ -800,6 +816,11 @@ reader({switch_levels, NewLevel}, State) -> {next_state, reader, State#state{level = NewLevel}, hibernate}. +delete_pending({get_sqn, LedgerKey, Hash}, _From, State) -> + % Get a KV value and potentially take sample timings + {Result, UpdState, _UpdTimings} = + fetch(LedgerKey, Hash, State, no_timing), + {reply, sqn_only(Result), delete_pending, UpdState, ?DELETE_TIMEOUT}; delete_pending({get_kv, LedgerKey, Hash}, _From, State) -> {Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing), {reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT}; @@ -1094,6 +1115,12 @@ member_check(Hash, {sets, HashSet}) -> member_check(_Miss, _Checker) -> false. +-spec sqn_only(leveled_codec:ledger_kv()|not_present) + -> leveled_codec:sqn()|not_present. +sqn_only(not_present) -> + not_present; +sqn_only(KV) -> + leveled_codec:strip_to_seqonly(KV). extract_hash({SegHash, _ExtraHash}) when is_integer(SegHash) -> tune_hash(SegHash); @@ -1113,7 +1140,7 @@ fetch_from_cache(CacheHash, Cache) -> -spec add_to_cache( non_neg_integer(), - leveled_codec:ledger_kv() + leveled_codec:ledger_kv(), fetch_cache()) -> fetch_cache(). add_to_cache(_CacheHash, _KV, no_cache) -> no_cache;