parent eedd09a23d
commit 2e0b20a071
3 changed files with 32 additions and 63 deletions
src/leveled_codec.erl

@@ -136,7 +136,6 @@
 -export_type([tag/0,
               key/0,
-              sqn/0,
               object_spec/0,
               segment_hash/0,
               ledger_status/0,
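Dropping sqn/0 here pairs with the penciller changes below: the type was exported so that specs in other modules could be written against leveled_codec:sqn(), and with those specs reverted to plain tuple()/integer() unions the export has no remaining users. As a reminder of the rule (the sqn() definition is assumed for illustration, not copied from the source):

    %% In leveled_codec: a type must be exported before other
    %% modules may reference it in their specs...
    -type sqn() :: pos_integer().    % assumed definition
    -export_type([sqn/0]).

    %% ...otherwise dialyzer reports leveled_codec:sqn() as an
    %% unknown type when it appears in a remote spec such as:
    -spec compare_to_sqn(leveled_codec:sqn() | not_present, integer()) -> atom().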
src/leveled_penciller.erl

@@ -749,14 +749,12 @@ handle_call({fetch, Key, Hash, UseL0Index}, _From, State) ->
     {reply, R, State#state{timings=UpdTimings0, timings_countdown=CountDown}};
 handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
     {reply,
-        compare_to_sqn(
-            fetch_sqn(
-                Key,
-                Hash,
-                State#state.manifest,
-                State#state.levelzero_cache,
-                State#state.levelzero_index),
-            SQN),
+        compare_to_sqn(plain_fetch_mem(Key,
+                                        Hash,
+                                        State#state.manifest,
+                                        State#state.levelzero_cache,
+                                        State#state.levelzero_index),
+                        SQN),
         State};
 handle_call({fetch_keys,
              StartKey, EndKey,
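The effect of this hunk: check_sqn now reuses the ordinary read path (plain_fetch_mem) and compares the sequence number stripped from the fetched object, rather than calling a dedicated SQN-only fetch. A minimal sketch of that shape, with FetchFun and the {Key, Metadata} layout standing in for leveled's internals (treat the details as illustrative, not leveled's API):

    -module(check_sqn_sketch).
    -export([check_sqn/3]).

    %% FetchFun stands in for plain_fetch_mem/5 with its non-key
    %% arguments already bound.
    check_sqn(FetchFun, Key, ExpectedSQN) ->
        case FetchFun(Key) of
            not_present ->
                missing;
            {_Key, Metadata} ->
                %% stand-in for leveled_codec:strip_to_seqonly/1
                ObjSQN = element(1, Metadata),
                case ObjSQN > ExpectedSQN of
                    true  -> replaced;
                    false -> current
                end
        end.

For example, check_sqn(fun(_K) -> not_present end, Key, 5) returns missing.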
@@ -1458,18 +1456,14 @@ timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, Timings) ->
     {R, UpdTimings}.


--spec fetch_sqn(
-    leveled_codec:ledger_key(),
-    leveled_codec:segment_hash(),
-    leveled_pmanifest:manifest(),
-    list(),
-    leveled_pmem:index_array()) ->
-        not_present|leveled_codec:ledger_kv()|leveled_codec:ledger_sqn().
+-spec plain_fetch_mem(tuple(), {integer(), integer()},
+                        leveled_pmanifest:manifest(), list(),
+                        leveled_pmem:index_array()) -> not_present|tuple().
 %% @doc
 %% Fetch the result from the penciller, starting by looking in the memory,
 %% and if it is not found looking down level by level through the LSM tree.
-fetch_sqn(Key, Hash, Manifest, L0Cache, L0Index) ->
-    R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, fun sst_getsqn/4),
+plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
+    R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, fun sst_get/4),
     element(1, R).

 fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, FetchFun) ->
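The @doc comment describes the classic LSM-tree read path: consult the in-memory level first, then each on-disk level in turn, returning the first hit. A self-contained sketch of that loop, with a list of fetch funs as an invented abstraction over leveled's manifest walk:

    -module(lsm_lookup_sketch).
    -export([lookup/2]).

    %% Levels is a list of fetch funs ordered from the in-memory
    %% level (L0) down through the on-disk levels; because higher
    %% levels hold newer data, the first hit is authoritative.
    lookup(_Key, []) ->
        not_present;
    lookup(Key, [FetchFun | LowerLevels]) ->
        case FetchFun(Key) of
            not_present -> lookup(Key, LowerLevels);
            KV -> KV
        end.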
@@ -1522,8 +1516,8 @@ timed_sst_get(PID, Key, Hash, Level) ->
     T0 = timer:now_diff(os:timestamp(), SW),
     log_slowfetch(T0, R, PID, Level, ?SLOW_FETCH).

-sst_getsqn(PID, Key, Hash, _Level) ->
-    leveled_sst:sst_getsqn(PID, Key, Hash).
+sst_get(PID, Key, Hash, _Level) ->
+    leveled_sst:sst_get(PID, Key, Hash).

 log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
     case {T0, R} of
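timed_sst_get/4, shown as context above, wraps the fetch in a microsecond timer and hands the elapsed time to log_slowfetch/5. The pattern in isolation; the 20000 microsecond tolerance is an assumption standing in for leveled's ?SLOW_FETCH macro, whose value is not shown in this diff:

    -module(slow_fetch_sketch).
    -export([timed_call/1]).

    -define(SLOW_FETCH, 20000).  % assumed tolerance, in microseconds

    timed_call(Fun) ->
        SW = os:timestamp(),
        R = Fun(),
        T0 = timer:now_diff(os:timestamp(), SW),  % elapsed microseconds
        case T0 > ?SLOW_FETCH of
            true  -> io:format("slow fetch: ~w us~n", [T0]);
            false -> ok
        end,
        R.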
@@ -1538,26 +1532,29 @@ log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
     end.


--spec compare_to_sqn(
-    leveled_codec:ledger_kv()|leveled_codec:sqn()|not_present,
-    integer()) -> sqn_check().
+-spec compare_to_sqn(tuple()|not_present, integer()) -> sqn_check().
 %% @doc
 %% Check to see if the SQN in the penciller is after the SQN expected for an
 %% object (used to allow the journal to check compaction status from a cache
 %% of the ledger - objects with a more recent sequence number can be compacted).
-compare_to_sqn(not_present, _SQN) ->
-    missing;
-compare_to_sqn(ObjSQN, SQN) when is_integer(ObjSQN), ObjSQN > SQN ->
-    replaced;
-compare_to_sqn(ObjSQN, _SQN) when is_integer(ObjSQN) ->
-    % Normally we would expect the SQN to be equal here, but
-    % this also allows for the Journal to have a more advanced
-    % value. We return current here as we wouldn't want to
-    % compact that more advanced value, but this may cause
-    % confusion in snapshots.
-    current;
 compare_to_sqn(Obj, SQN) ->
-    compare_to_sqn(leveled_codec:strip_to_seqonly(Obj), SQN).
+    case Obj of
+        not_present ->
+            missing;
+        Obj ->
+            SQNToCompare = leveled_codec:strip_to_seqonly(Obj),
+            if
+                SQNToCompare > SQN ->
+                    replaced;
+                true ->
+                    % Normally we would expect the SQN to be equal here, but
+                    % this also allows for the Journal to have a more advanced
+                    % value. We return current here as we wouldn't want to
+                    % compact that more advanced value, but this may cause
+                    % confusion in snapshots.
+                    current
+            end
+    end.


 %%%============================================================================
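The three outcomes of compare_to_sqn/2 are worth pinning down. A sketch using a bare integer in place of the fetched ledger object (leveled derives that integer via leveled_codec:strip_to_seqonly/1):

    -module(sqn_compare_sketch).
    -export([compare/2]).

    compare(not_present, _ExpectedSQN) -> missing;
    compare(ObjSQN, ExpectedSQN) when ObjSQN > ExpectedSQN -> replaced;
    compare(_ObjSQN, _ExpectedSQN) -> current.

    %% compare(not_present, 5) =:= missing   - key absent from the ledger
    %% compare(9, 5)           =:= replaced  - a newer version exists, so the
    %%                                         journal copy may be compacted
    %% compare(5, 5)           =:= current   - this is the live version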
src/leveled_sst.erl

@@ -96,7 +96,6 @@
 -define(TOMB_COUNT, true).
 -define(USE_SET_FOR_SPEED, 64).
 -define(STARTUP_TIMEOUT, 10000).
--define(HIBERNATE_TIMEOUT, 60000).

 -include_lib("eunit/include/eunit.hrl").

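?HIBERNATE_TIMEOUT fed the fifth element of the reply tuple in the reader({get_sqn, ...}) clause removed further down, and this commit appears to leave the macro with no other users. For context, a gen_fsm synchronous state callback may return a timeout in milliseconds; if no event arrives within that window the FSM receives a timeout event. A sketch of the callback shape only (state name and values invented):

    reader(some_event, _From, StateData) ->
        %% reply, stay in reader, and raise 'timeout' after 60s of quiet
        {reply, ok, reader, StateData, 60000}.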
@@ -120,7 +119,6 @@
          sst_open/4,
          sst_get/2,
          sst_get/3,
-         sst_getsqn/3,
          sst_expandpointer/5,
          sst_getmaxsequencenumber/1,
          sst_setfordelete/2,
@@ -436,15 +434,6 @@ sst_get(Pid, LedgerKey) ->
 sst_get(Pid, LedgerKey, Hash) ->
     gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity).

--spec sst_getsqn(pid(),
-    leveled_codec:ledger_key(),
-    leveled_codec:segment_hash()) -> leveled_codec:sqn()|not_present.
-%% @doc
-%% Return a SQN for the key or not_present if the key is not in
-%% the store (with the magic hash precalculated).
-sst_getsqn(Pid, LedgerKey, Hash) ->
-    gen_fsm:sync_send_event(Pid, {get_sqn, LedgerKey, Hash}, infinity).
-
 -spec sst_getmaxsequencenumber(pid()) -> integer().
 %% @doc
 %% Get the maximum sequence number for this SST file
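sst_getsqn/3 was a thin wrapper that sent a synchronous event into the SST file's FSM. With gen_fsm (the OTP behaviour leveled_sst used at this point in its history, since superseded by gen_statem), every state able to receive such an event needs a matching StateName/3 clause, which is why the reader and delete_pending clauses also disappear in the hunks below. A pared-down sketch of the pairing (hypothetical module, behaviour boilerplate omitted):

    -module(fsm_event_sketch).
    -export([start_link/0, get_sqn/2]).
    -export([init/1, reader/3]).

    start_link() ->
        gen_fsm:start_link(?MODULE, [], []).

    %% API wrapper: sends the event and blocks for the reply.
    get_sqn(Pid, Key) ->
        gen_fsm:sync_send_event(Pid, {get_sqn, Key}, infinity).

    init([]) ->
        {ok, reader, #{}}.

    %% State callback: must pattern-match the same event tuple.
    reader({get_sqn, _Key}, _From, StateData) ->
        {reply, not_present, reader, StateData}.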
@@ -706,11 +695,6 @@ starting({sst_returnslot, FetchedSlot, FetchFun, SlotCount}, State) ->
             State#state{new_slots = FetchedSlots}}
     end.

-reader({get_sqn, LedgerKey, Hash}, _From, State) ->
-    % Get a KV value and potentially take sample timings
-    {Result, UpdState, _UpdTimings} =
-        fetch(LedgerKey, Hash, State, no_timing),
-    {reply, sqn_only(Result), reader, UpdState, ?HIBERNATE_TIMEOUT};
 reader({get_kv, LedgerKey, Hash}, _From, State) ->
     % Get a KV value and potentially take sample timings
     {Result, UpdState, UpdTimings} =
@@ -816,11 +800,6 @@ reader({switch_levels, NewLevel}, State) ->
     {next_state, reader, State#state{level = NewLevel}, hibernate}.


-delete_pending({get_sqn, LedgerKey, Hash}, _From, State) ->
-    % Get a KV value and potentially take sample timings
-    {Result, UpdState, _UpdTimings} =
-        fetch(LedgerKey, Hash, State, no_timing),
-    {reply, sqn_only(Result), delete_pending, UpdState, ?DELETE_TIMEOUT};
 delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
     {Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing),
     {reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT};
@@ -1115,12 +1094,6 @@ member_check(Hash, {sets, HashSet}) ->
 member_check(_Miss, _Checker) ->
     false.

--spec sqn_only(leveled_codec:ledger_kv()|not_present)
-        -> leveled_codec:sqn()|not_present.
-sqn_only(not_present) ->
-    not_present;
-sqn_only(KV) ->
-    leveled_codec:strip_to_seqonly(KV).

 extract_hash({SegHash, _ExtraHash}) when is_integer(SegHash) ->
     tune_hash(SegHash);
@@ -1140,7 +1113,7 @@ fetch_from_cache(CacheHash, Cache) ->

 -spec add_to_cache(
     non_neg_integer(),
     leveled_codec:ledger_kv(),
     fetch_cache()) -> fetch_cache().
 add_to_cache(_CacheHash, _KV, no_cache) ->
     no_cache;
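The no_cache clause shown above is the disabled mode of the fetch cache. A sketch of the general idea, a small direct-mapped cache keyed by a hash of the key; the tuple layout and slot arithmetic are invented for illustration, and leveled's actual fetch_cache() representation may differ:

    -module(fetch_cache_sketch).
    -export([new/1, add_to_cache/3]).

    new(Slots) when Slots > 0 ->
        erlang:make_tuple(Slots, undefined).

    %% no_cache disables caching entirely.
    add_to_cache(_CacheHash, _KV, no_cache) ->
        no_cache;
    %% Otherwise overwrite whichever slot the hash maps to.
    add_to_cache(CacheHash, KV, Cache) when is_tuple(Cache) ->
        Slot = (CacheHash rem tuple_size(Cache)) + 1,
        setelement(Slot, Cache, KV).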