Mas i370 d31 sstmemory (#373)
* Don't use fetch_cache below the page_cache level * Don't time fetches due to SQN checks SQN checks are all background processes * Hibernate on SQN check SQN check in the penciller is used for journal (all object) folds, but mainly for journal compaction. Use this to trigger hibernation where SST files stay quiet after the compaction check. * Add catch for hibernate timeout * Scale cache_size with level Based on volume testing. Relatively speaking, far higher value to be gained from caches at higher levels (lower numbered levels). The cache at lower levels are proportionally much less efficient. so cache more at higher levels, where there is value, and less at lower levels where there is more cost relative to value. * OTP 24 fix to cherry-pick * Make minimal change to previous setup Making significant change appears to not have had the expected positive improvement - so a more minimal change is proposed. The assumption is that the cache only really gets used for double reads in the write path (e.g. where the application reads before a write) - and so a large cache make minimal difference, but no cache still has a downside. * Introduce new types * Mas i370 d30 sstmemory (#374) * Don't time fetches due to SQN checks SQN checks are all background processes * Hibernate on SQN check SQN check in the penciller is used for journal (all object) folds, but mainly for journal compaction. Use this to trigger hibernation where SST files stay quiet after the compaction check. * Add catch for hibernate timeout * Scale cache_size with level Based on volume testing. Relatively speaking, far higher value to be gained from caches at higher levels (lower numbered levels). The cache at lower levels are proportionally much less efficient. so cache more at higher levels, where there is value, and less at lower levels where there is more cost relative to value. * Make minimal change to previous setup Making significant change appears to not have had the expected positive improvement - so a more minimal change is proposed. The assumption is that the cache only really gets used for double reads in the write path (e.g. where the application reads before a write) - and so a large cache make minimal difference, but no cache still has a downside. * Introduce new types * More memory management Clear blockindex_cache on timeout, and manually GC on pclerk after work. * Add further garbage collection prompt After fetching level zero, significant change in references in the penciller memory, so prompt a garbage_collect() at this point.
This commit is contained in:
parent
75edb7293d
commit
f8485210ed
4 changed files with 293 additions and 74 deletions
|
@ -136,6 +136,7 @@
|
||||||
|
|
||||||
-export_type([tag/0,
|
-export_type([tag/0,
|
||||||
key/0,
|
key/0,
|
||||||
|
sqn/0,
|
||||||
object_spec/0,
|
object_spec/0,
|
||||||
segment_hash/0,
|
segment_hash/0,
|
||||||
ledger_status/0,
|
ledger_status/0,
|
||||||
|
|
|
@ -146,6 +146,11 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
|
||||||
|
|
||||||
handle_info(timeout, State) ->
|
handle_info(timeout, State) ->
|
||||||
request_work(State),
|
request_work(State),
|
||||||
|
% When handling work, the clerk can collect a large number of binary
|
||||||
|
% references, so proactively GC this process before receiving any future
|
||||||
|
% work. In under pressure clusters, clerks with large binary memory
|
||||||
|
% footprints can occur.
|
||||||
|
garbage_collect(),
|
||||||
{noreply, State, ?MAX_TIMEOUT}.
|
{noreply, State, ?MAX_TIMEOUT}.
|
||||||
|
|
||||||
terminate(Reason, _State) ->
|
terminate(Reason, _State) ->
|
||||||
|
|
|
@ -396,7 +396,7 @@ pcl_fetchlevelzero(Pid, Slot, ReturnFun) ->
|
||||||
%% The Key needs to be hashable (i.e. have a tag which indicates that the key
|
%% The Key needs to be hashable (i.e. have a tag which indicates that the key
|
||||||
%% can be looked up) - index entries are not hashable for example.
|
%% can be looked up) - index entries are not hashable for example.
|
||||||
%%
|
%%
|
||||||
%% If the hash is already knonw, call pcl_fetch/3 as segment_hash is a
|
%% If the hash is already known, call pcl_fetch/3 as segment_hash is a
|
||||||
%% relatively expensive hash function
|
%% relatively expensive hash function
|
||||||
pcl_fetch(Pid, Key) ->
|
pcl_fetch(Pid, Key) ->
|
||||||
Hash = leveled_codec:segment_hash(Key),
|
Hash = leveled_codec:segment_hash(Key),
|
||||||
|
@ -749,12 +749,14 @@ handle_call({fetch, Key, Hash, UseL0Index}, _From, State) ->
|
||||||
{reply, R, State#state{timings=UpdTimings0, timings_countdown=CountDown}};
|
{reply, R, State#state{timings=UpdTimings0, timings_countdown=CountDown}};
|
||||||
handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
|
handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
|
||||||
{reply,
|
{reply,
|
||||||
compare_to_sqn(plain_fetch_mem(Key,
|
compare_to_sqn(
|
||||||
Hash,
|
fetch_sqn(
|
||||||
State#state.manifest,
|
Key,
|
||||||
State#state.levelzero_cache,
|
Hash,
|
||||||
State#state.levelzero_index),
|
State#state.manifest,
|
||||||
SQN),
|
State#state.levelzero_cache,
|
||||||
|
State#state.levelzero_index),
|
||||||
|
SQN),
|
||||||
State};
|
State};
|
||||||
handle_call({fetch_keys,
|
handle_call({fetch_keys,
|
||||||
StartKey, EndKey,
|
StartKey, EndKey,
|
||||||
|
@ -1066,8 +1068,19 @@ handle_cast(work_for_clerk, State) ->
|
||||||
Backlog = N > ?WORKQUEUE_BACKLOG_TOLERANCE,
|
Backlog = N > ?WORKQUEUE_BACKLOG_TOLERANCE,
|
||||||
leveled_log:log("P0024", [N, Backlog]),
|
leveled_log:log("P0024", [N, Backlog]),
|
||||||
[TL|_Tail] = WL,
|
[TL|_Tail] = WL,
|
||||||
ok = leveled_pclerk:clerk_push(State#state.clerk,
|
ok =
|
||||||
{TL, State#state.manifest}),
|
leveled_pclerk:clerk_push(
|
||||||
|
State#state.clerk, {TL, State#state.manifest}),
|
||||||
|
case TL of
|
||||||
|
0 ->
|
||||||
|
% Just written a L0 so as LoopState now rewritten,
|
||||||
|
% garbage collect to free as much as possible as
|
||||||
|
% soon as possible
|
||||||
|
garbage_collect();
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
|
||||||
{noreply,
|
{noreply,
|
||||||
State#state{work_backlog=Backlog, work_ongoing=true}}
|
State#state{work_backlog=Backlog, work_ongoing=true}}
|
||||||
end;
|
end;
|
||||||
|
@ -1450,22 +1463,27 @@ roll_memory(State, true) ->
|
||||||
%% the result tuple includes the level at which the result was found.
|
%% the result tuple includes the level at which the result was found.
|
||||||
timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, Timings) ->
|
timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, Timings) ->
|
||||||
SW = os:timestamp(),
|
SW = os:timestamp(),
|
||||||
{R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
|
{R, Level} =
|
||||||
|
fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, fun timed_sst_get/4),
|
||||||
UpdTimings = update_timings(SW, Timings, R, Level),
|
UpdTimings = update_timings(SW, Timings, R, Level),
|
||||||
{R, UpdTimings}.
|
{R, UpdTimings}.
|
||||||
|
|
||||||
|
|
||||||
-spec plain_fetch_mem(tuple(), {integer(), integer()},
|
-spec fetch_sqn(
|
||||||
leveled_pmanifest:manifest(), list(),
|
leveled_codec:ledger_key(),
|
||||||
leveled_pmem:index_array()) -> not_present|tuple().
|
leveled_codec:segment_hash(),
|
||||||
|
leveled_pmanifest:manifest(),
|
||||||
|
list(),
|
||||||
|
leveled_pmem:index_array()) ->
|
||||||
|
not_present|leveled_codec:ledger_kv()|leveled_codec:ledger_sqn().
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Fetch the result from the penciller, starting by looking in the memory,
|
%% Fetch the result from the penciller, starting by looking in the memory,
|
||||||
%% and if it is not found looking down level by level through the LSM tree.
|
%% and if it is not found looking down level by level through the LSM tree.
|
||||||
plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
|
fetch_sqn(Key, Hash, Manifest, L0Cache, L0Index) ->
|
||||||
R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
|
R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, fun sst_getsqn/4),
|
||||||
element(1, R).
|
element(1, R).
|
||||||
|
|
||||||
fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
|
fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, FetchFun) ->
|
||||||
PosList =
|
PosList =
|
||||||
case L0Index of
|
case L0Index of
|
||||||
none ->
|
none ->
|
||||||
|
@ -1476,7 +1494,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
|
||||||
L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache),
|
L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache),
|
||||||
case L0Check of
|
case L0Check of
|
||||||
{false, not_found} ->
|
{false, not_found} ->
|
||||||
fetch(Key, Hash, Manifest, 0, fun timed_sst_get/4);
|
fetch(Key, Hash, Manifest, 0, FetchFun);
|
||||||
{true, KV} ->
|
{true, KV} ->
|
||||||
{KV, memory}
|
{KV, memory}
|
||||||
end.
|
end.
|
||||||
|
@ -1515,6 +1533,9 @@ timed_sst_get(PID, Key, Hash, Level) ->
|
||||||
T0 = timer:now_diff(os:timestamp(), SW),
|
T0 = timer:now_diff(os:timestamp(), SW),
|
||||||
log_slowfetch(T0, R, PID, Level, ?SLOW_FETCH).
|
log_slowfetch(T0, R, PID, Level, ?SLOW_FETCH).
|
||||||
|
|
||||||
|
sst_getsqn(PID, Key, Hash, _Level) ->
|
||||||
|
leveled_sst:sst_getsqn(PID, Key, Hash).
|
||||||
|
|
||||||
log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
|
log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
|
||||||
case {T0, R} of
|
case {T0, R} of
|
||||||
{T, R} when T < FetchTolerance ->
|
{T, R} when T < FetchTolerance ->
|
||||||
|
@ -1528,29 +1549,26 @@ log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
-spec compare_to_sqn(tuple()|not_present, integer()) -> sqn_check().
|
-spec compare_to_sqn(
|
||||||
|
leveled_codec:ledger_kv()|leveled_codec:sqn()|not_present,
|
||||||
|
integer()) -> sqn_check().
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Check to see if the SQN in the penciller is after the SQN expected for an
|
%% Check to see if the SQN in the penciller is after the SQN expected for an
|
||||||
%% object (used to allow the journal to check compaction status from a cache
|
%% object (used to allow the journal to check compaction status from a cache
|
||||||
%% of the ledger - objects with a more recent sequence number can be compacted).
|
%% of the ledger - objects with a more recent sequence number can be compacted).
|
||||||
|
compare_to_sqn(not_present, _SQN) ->
|
||||||
|
missing;
|
||||||
|
compare_to_sqn(ObjSQN, SQN) when is_integer(ObjSQN), ObjSQN > SQN ->
|
||||||
|
replaced;
|
||||||
|
compare_to_sqn(ObjSQN, _SQN) when is_integer(ObjSQN) ->
|
||||||
|
% Normally we would expect the SQN to be equal here, but
|
||||||
|
% this also allows for the Journal to have a more advanced
|
||||||
|
% value. We return true here as we wouldn't want to
|
||||||
|
% compact thta more advanced value, but this may cause
|
||||||
|
% confusion in snapshots.
|
||||||
|
current;
|
||||||
compare_to_sqn(Obj, SQN) ->
|
compare_to_sqn(Obj, SQN) ->
|
||||||
case Obj of
|
compare_to_sqn(leveled_codec:strip_to_seqonly(Obj), SQN).
|
||||||
not_present ->
|
|
||||||
missing;
|
|
||||||
Obj ->
|
|
||||||
SQNToCompare = leveled_codec:strip_to_seqonly(Obj),
|
|
||||||
if
|
|
||||||
SQNToCompare > SQN ->
|
|
||||||
replaced;
|
|
||||||
true ->
|
|
||||||
% Normally we would expect the SQN to be equal here, but
|
|
||||||
% this also allows for the Journal to have a more advanced
|
|
||||||
% value. We return true here as we wouldn't want to
|
|
||||||
% compact thta more advanced value, but this may cause
|
|
||||||
% confusion in snapshots.
|
|
||||||
current
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
|
@ -86,7 +86,6 @@
|
||||||
-define(TREE_SIZE, 4).
|
-define(TREE_SIZE, 4).
|
||||||
-define(TIMING_SAMPLECOUNTDOWN, 20000).
|
-define(TIMING_SAMPLECOUNTDOWN, 20000).
|
||||||
-define(TIMING_SAMPLESIZE, 100).
|
-define(TIMING_SAMPLESIZE, 100).
|
||||||
-define(CACHE_SIZE, 32).
|
|
||||||
-define(BLOCK_LENGTHS_LENGTH, 20).
|
-define(BLOCK_LENGTHS_LENGTH, 20).
|
||||||
-define(LMD_LENGTH, 4).
|
-define(LMD_LENGTH, 4).
|
||||||
-define(FLIPPER32, 4294967295).
|
-define(FLIPPER32, 4294967295).
|
||||||
|
@ -97,6 +96,12 @@
|
||||||
-define(USE_SET_FOR_SPEED, 64).
|
-define(USE_SET_FOR_SPEED, 64).
|
||||||
-define(STARTUP_TIMEOUT, 10000).
|
-define(STARTUP_TIMEOUT, 10000).
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-define(HIBERNATE_TIMEOUT, 5000).
|
||||||
|
-else.
|
||||||
|
-define(HIBERNATE_TIMEOUT, 60000).
|
||||||
|
-endif.
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
-export([init/1,
|
-export([init/1,
|
||||||
|
@ -119,6 +124,7 @@
|
||||||
sst_open/4,
|
sst_open/4,
|
||||||
sst_get/2,
|
sst_get/2,
|
||||||
sst_get/3,
|
sst_get/3,
|
||||||
|
sst_getsqn/3,
|
||||||
sst_expandpointer/5,
|
sst_expandpointer/5,
|
||||||
sst_getmaxsequencenumber/1,
|
sst_getmaxsequencenumber/1,
|
||||||
sst_setfordelete/2,
|
sst_setfordelete/2,
|
||||||
|
@ -184,6 +190,14 @@
|
||||||
-type sst_summary()
|
-type sst_summary()
|
||||||
:: #summary{}.
|
:: #summary{}.
|
||||||
-type blockindex_cache() :: array:array().
|
-type blockindex_cache() :: array:array().
|
||||||
|
-type fetch_cache()
|
||||||
|
:: array:array()|no_cache.
|
||||||
|
-type cache_size()
|
||||||
|
:: no_cache|4|32|64.
|
||||||
|
-type cache_hash()
|
||||||
|
:: no_cache|non_neg_integer().
|
||||||
|
-type level()
|
||||||
|
:: non_neg_integer().
|
||||||
|
|
||||||
%% yield_blockquery is used to determine if the work necessary to process a
|
%% yield_blockquery is used to determine if the work necessary to process a
|
||||||
%% range query beyond the fetching the slot should be managed from within
|
%% range query beyond the fetching the slot should be managed from within
|
||||||
|
@ -206,34 +220,33 @@
|
||||||
timings = no_timing :: sst_timings(),
|
timings = no_timing :: sst_timings(),
|
||||||
timings_countdown = 0 :: integer(),
|
timings_countdown = 0 :: integer(),
|
||||||
starting_pid :: pid()|undefined,
|
starting_pid :: pid()|undefined,
|
||||||
fetch_cache = array:new([{size, ?CACHE_SIZE}]) ::
|
fetch_cache = no_cache :: fetch_cache() | redacted,
|
||||||
array:array() | redacted,
|
|
||||||
new_slots :: list()|undefined,
|
new_slots :: list()|undefined,
|
||||||
deferred_startup_tuple :: tuple()|undefined,
|
deferred_startup_tuple :: tuple()|undefined,
|
||||||
level :: non_neg_integer()|undefined,
|
level :: level()|undefined,
|
||||||
tomb_count = not_counted
|
tomb_count = not_counted
|
||||||
:: non_neg_integer()|not_counted,
|
:: non_neg_integer()|not_counted,
|
||||||
high_modified_date :: non_neg_integer()|undefined}).
|
high_modified_date :: non_neg_integer()|undefined}).
|
||||||
|
|
||||||
-record(sst_timings,
|
-record(sst_timings,
|
||||||
{sample_count = 0 :: integer(),
|
{sample_count = 0 :: integer(),
|
||||||
index_query_time = 0 :: integer(),
|
index_query_time = 0 :: integer(),
|
||||||
lookup_cache_time = 0 :: integer(),
|
lookup_cache_time = 0 :: integer(),
|
||||||
slot_index_time = 0 :: integer(),
|
slot_index_time = 0 :: integer(),
|
||||||
fetch_cache_time = 0 :: integer(),
|
fetch_cache_time = 0 :: integer(),
|
||||||
slot_fetch_time = 0 :: integer(),
|
slot_fetch_time = 0 :: integer(),
|
||||||
noncached_block_time = 0 :: integer(),
|
noncached_block_time = 0 :: integer(),
|
||||||
lookup_cache_count = 0 :: integer(),
|
lookup_cache_count = 0 :: integer(),
|
||||||
slot_index_count = 0 :: integer(),
|
slot_index_count = 0 :: integer(),
|
||||||
fetch_cache_count = 0 :: integer(),
|
fetch_cache_count = 0 :: integer(),
|
||||||
slot_fetch_count = 0 :: integer(),
|
slot_fetch_count = 0 :: integer(),
|
||||||
noncached_block_count = 0 :: integer()}).
|
noncached_block_count = 0 :: integer()}).
|
||||||
|
|
||||||
-record(build_timings,
|
-record(build_timings,
|
||||||
{slot_hashlist = 0 :: integer(),
|
{slot_hashlist = 0 :: integer(),
|
||||||
slot_serialise = 0 :: integer(),
|
slot_serialise = 0 :: integer(),
|
||||||
slot_finish = 0 :: integer(),
|
slot_finish = 0 :: integer(),
|
||||||
fold_toslot = 0 :: integer()}).
|
fold_toslot = 0 :: integer()}).
|
||||||
|
|
||||||
-type sst_state() :: #state{}.
|
-type sst_state() :: #state{}.
|
||||||
-type sst_timings() :: no_timing|#sst_timings{}.
|
-type sst_timings() :: no_timing|#sst_timings{}.
|
||||||
|
@ -245,7 +258,7 @@
|
||||||
%%% API
|
%%% API
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
-spec sst_open(string(), string(), sst_options(), non_neg_integer())
|
-spec sst_open(string(), string(), sst_options(), level())
|
||||||
-> {ok, pid(),
|
-> {ok, pid(),
|
||||||
{leveled_codec:ledger_key(), leveled_codec:ledger_key()},
|
{leveled_codec:ledger_key(), leveled_codec:ledger_key()},
|
||||||
binary()}.
|
binary()}.
|
||||||
|
@ -267,7 +280,7 @@ sst_open(RootPath, Filename, OptsSST, Level) ->
|
||||||
{ok, Pid, {SK, EK}, Bloom}
|
{ok, Pid, {SK, EK}, Bloom}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec sst_new(string(), string(), integer(),
|
-spec sst_new(string(), string(), level(),
|
||||||
list(leveled_codec:ledger_kv()),
|
list(leveled_codec:ledger_kv()),
|
||||||
integer(), sst_options())
|
integer(), sst_options())
|
||||||
-> {ok, pid(),
|
-> {ok, pid(),
|
||||||
|
@ -310,7 +323,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
|
||||||
-spec sst_newmerge(string(), string(),
|
-spec sst_newmerge(string(), string(),
|
||||||
list(leveled_codec:ledger_kv()|sst_pointer()),
|
list(leveled_codec:ledger_kv()|sst_pointer()),
|
||||||
list(leveled_codec:ledger_kv()|sst_pointer()),
|
list(leveled_codec:ledger_kv()|sst_pointer()),
|
||||||
boolean(), integer(),
|
boolean(), level(),
|
||||||
integer(), sst_options())
|
integer(), sst_options())
|
||||||
-> empty|{ok, pid(),
|
-> empty|{ok, pid(),
|
||||||
{{list(leveled_codec:ledger_kv()),
|
{{list(leveled_codec:ledger_kv()),
|
||||||
|
@ -331,7 +344,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
|
||||||
%% file is not added to the manifest.
|
%% file is not added to the manifest.
|
||||||
sst_newmerge(RootPath, Filename,
|
sst_newmerge(RootPath, Filename,
|
||||||
KVL1, KVL2, IsBasement, Level,
|
KVL1, KVL2, IsBasement, Level,
|
||||||
MaxSQN, OptsSST) ->
|
MaxSQN, OptsSST) when Level > 0 ->
|
||||||
sst_newmerge(RootPath, Filename,
|
sst_newmerge(RootPath, Filename,
|
||||||
KVL1, KVL2, IsBasement, Level,
|
KVL1, KVL2, IsBasement, Level,
|
||||||
MaxSQN, OptsSST, ?INDEX_MODDATE, ?TOMB_COUNT).
|
MaxSQN, OptsSST, ?INDEX_MODDATE, ?TOMB_COUNT).
|
||||||
|
@ -433,6 +446,15 @@ sst_get(Pid, LedgerKey) ->
|
||||||
sst_get(Pid, LedgerKey, Hash) ->
|
sst_get(Pid, LedgerKey, Hash) ->
|
||||||
gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity).
|
gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity).
|
||||||
|
|
||||||
|
-spec sst_getsqn(pid(),
|
||||||
|
leveled_codec:ledger_key(),
|
||||||
|
leveled_codec:segment_hash()) -> leveled_codec:sqn()|not_present.
|
||||||
|
%% @doc
|
||||||
|
%% Return a SQN for the key or not_present if the key is not in
|
||||||
|
%% the store (with the magic hash precalculated).
|
||||||
|
sst_getsqn(Pid, LedgerKey, Hash) ->
|
||||||
|
gen_fsm:sync_send_event(Pid, {get_sqn, LedgerKey, Hash}, infinity).
|
||||||
|
|
||||||
-spec sst_getmaxsequencenumber(pid()) -> integer().
|
-spec sst_getmaxsequencenumber(pid()) -> integer().
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Get the maximume sequence number for this SST file
|
%% Get the maximume sequence number for this SST file
|
||||||
|
@ -540,7 +562,7 @@ starting({sst_open, RootPath, Filename, OptsSST, Level}, _From, State) ->
|
||||||
{reply,
|
{reply,
|
||||||
{ok, {Summary#summary.first_key, Summary#summary.last_key}, Bloom},
|
{ok, {Summary#summary.first_key, Summary#summary.last_key}, Bloom},
|
||||||
reader,
|
reader,
|
||||||
UpdState#state{level = Level}};
|
UpdState#state{level = Level, fetch_cache = new_cache(Level)}};
|
||||||
starting({sst_new,
|
starting({sst_new,
|
||||||
RootPath, Filename, Level,
|
RootPath, Filename, Level,
|
||||||
{SlotList, FirstKey}, MaxSQN,
|
{SlotList, FirstKey}, MaxSQN,
|
||||||
|
@ -580,7 +602,8 @@ starting({sst_new,
|
||||||
UpdState#state{blockindex_cache = BlockIndex,
|
UpdState#state{blockindex_cache = BlockIndex,
|
||||||
high_modified_date = HighModDate,
|
high_modified_date = HighModDate,
|
||||||
starting_pid = StartingPID,
|
starting_pid = StartingPID,
|
||||||
level = Level}};
|
level = Level,
|
||||||
|
fetch_cache = new_cache(Level)}};
|
||||||
starting({sst_newlevelzero, RootPath, Filename,
|
starting({sst_newlevelzero, RootPath, Filename,
|
||||||
Penciller, MaxSQN,
|
Penciller, MaxSQN,
|
||||||
OptsSST, IdxModDate}, _From, State) ->
|
OptsSST, IdxModDate}, _From, State) ->
|
||||||
|
@ -588,7 +611,10 @@ starting({sst_newlevelzero, RootPath, Filename,
|
||||||
{RootPath, Filename, Penciller, MaxSQN, OptsSST,
|
{RootPath, Filename, Penciller, MaxSQN, OptsSST,
|
||||||
IdxModDate},
|
IdxModDate},
|
||||||
{reply, ok, starting,
|
{reply, ok, starting,
|
||||||
State#state{deferred_startup_tuple = DeferredStartupTuple, level = 0}};
|
State#state{
|
||||||
|
deferred_startup_tuple = DeferredStartupTuple,
|
||||||
|
level = 0,
|
||||||
|
fetch_cache = new_cache(0)}};
|
||||||
starting(close, _From, State) ->
|
starting(close, _From, State) ->
|
||||||
% No file should have been created, so nothing to close.
|
% No file should have been created, so nothing to close.
|
||||||
{stop, normal, ok, State}.
|
{stop, normal, ok, State}.
|
||||||
|
@ -694,6 +720,11 @@ starting({sst_returnslot, FetchedSlot, FetchFun, SlotCount}, State) ->
|
||||||
State#state{new_slots = FetchedSlots}}
|
State#state{new_slots = FetchedSlots}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
reader({get_sqn, LedgerKey, Hash}, _From, State) ->
|
||||||
|
% Get a KV value and potentially take sample timings
|
||||||
|
{Result, UpdState, _UpdTimings} =
|
||||||
|
fetch(LedgerKey, Hash, State, no_timing),
|
||||||
|
{reply, sqn_only(Result), reader, UpdState, ?HIBERNATE_TIMEOUT};
|
||||||
reader({get_kv, LedgerKey, Hash}, _From, State) ->
|
reader({get_kv, LedgerKey, Hash}, _From, State) ->
|
||||||
% Get a KV value and potentially take sample timings
|
% Get a KV value and potentially take sample timings
|
||||||
{Result, UpdState, UpdTimings} =
|
{Result, UpdState, UpdTimings} =
|
||||||
|
@ -795,10 +826,31 @@ reader(close, _From, State) ->
|
||||||
ok = file:close(State#state.handle),
|
ok = file:close(State#state.handle),
|
||||||
{stop, normal, ok, State}.
|
{stop, normal, ok, State}.
|
||||||
|
|
||||||
|
reader(timeout, State) ->
|
||||||
|
FreshFetchCache = new_cache(State#state.level),
|
||||||
|
Summary = State#state.summary,
|
||||||
|
FreshBlockIndexCache = new_blockindex_cache(Summary#summary.size),
|
||||||
|
{next_state,
|
||||||
|
reader,
|
||||||
|
State#state{
|
||||||
|
fetch_cache = FreshFetchCache,
|
||||||
|
blockindex_cache = FreshBlockIndexCache},
|
||||||
|
hibernate};
|
||||||
reader({switch_levels, NewLevel}, State) ->
|
reader({switch_levels, NewLevel}, State) ->
|
||||||
{next_state, reader, State#state{level = NewLevel}, hibernate}.
|
FreshCache = new_cache(NewLevel),
|
||||||
|
{next_state,
|
||||||
|
reader,
|
||||||
|
State#state{
|
||||||
|
level = NewLevel,
|
||||||
|
fetch_cache = FreshCache},
|
||||||
|
hibernate}.
|
||||||
|
|
||||||
|
|
||||||
|
delete_pending({get_sqn, LedgerKey, Hash}, _From, State) ->
|
||||||
|
% Get a KV value and potentially take sample timings
|
||||||
|
{Result, UpdState, _UpdTimings} =
|
||||||
|
fetch(LedgerKey, Hash, State, no_timing),
|
||||||
|
{reply, sqn_only(Result), delete_pending, UpdState, ?DELETE_TIMEOUT};
|
||||||
delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
|
delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
|
||||||
{Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing),
|
{Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing),
|
||||||
{reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT};
|
{reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT};
|
||||||
|
@ -1093,14 +1145,72 @@ member_check(Hash, {sets, HashSet}) ->
|
||||||
member_check(_Miss, _Checker) ->
|
member_check(_Miss, _Checker) ->
|
||||||
false.
|
false.
|
||||||
|
|
||||||
|
-spec sqn_only(leveled_codec:ledger_kv()|not_present)
|
||||||
|
-> leveled_codec:sqn()|not_present.
|
||||||
|
sqn_only(not_present) ->
|
||||||
|
not_present;
|
||||||
|
sqn_only(KV) ->
|
||||||
|
leveled_codec:strip_to_seqonly(KV).
|
||||||
|
|
||||||
extract_hash({SegHash, _ExtraHash}) when is_integer(SegHash) ->
|
extract_hash({SegHash, _ExtraHash}) when is_integer(SegHash) ->
|
||||||
tune_hash(SegHash);
|
tune_hash(SegHash);
|
||||||
extract_hash(NotHash) ->
|
extract_hash(NotHash) ->
|
||||||
NotHash.
|
NotHash.
|
||||||
|
|
||||||
cache_hash({_SegHash, ExtraHash}) when is_integer(ExtraHash) ->
|
|
||||||
ExtraHash band (?CACHE_SIZE - 1).
|
-spec new_cache(level()) -> fetch_cache().
|
||||||
|
new_cache(Level) ->
|
||||||
|
case cache_size(Level) of
|
||||||
|
no_cache ->
|
||||||
|
no_cache;
|
||||||
|
CacheSize ->
|
||||||
|
array:new([{size, CacheSize}])
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec cache_hash(leveled_codec:segment_hash(), non_neg_integer()) ->
|
||||||
|
cache_hash().
|
||||||
|
cache_hash({_SegHash, ExtraHash}, Level) when is_integer(ExtraHash) ->
|
||||||
|
case cache_size(Level) of
|
||||||
|
no_cache -> no_cache;
|
||||||
|
CH -> ExtraHash band (CH - 1)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @doc
|
||||||
|
%% The lower the level, the bigger the memory cost of supporting the cache,
|
||||||
|
%% as each level has more files than the previous level. Load tests with
|
||||||
|
%% any sort of pareto distribution show far better cost/benefit ratios for
|
||||||
|
%% cache at higher levels.
|
||||||
|
-spec cache_size(level()) -> cache_size().
|
||||||
|
cache_size(N) when N < 3 ->
|
||||||
|
64;
|
||||||
|
cache_size(3) ->
|
||||||
|
32;
|
||||||
|
cache_size(4) ->
|
||||||
|
32;
|
||||||
|
cache_size(5) ->
|
||||||
|
4;
|
||||||
|
cache_size(6) ->
|
||||||
|
4;
|
||||||
|
cache_size(_LowerLevel) ->
|
||||||
|
no_cache.
|
||||||
|
|
||||||
|
-spec fetch_from_cache(
|
||||||
|
cache_hash(),
|
||||||
|
fetch_cache()) -> undefined|leveled_codec:ledger_kv().
|
||||||
|
fetch_from_cache(_CacheHash, no_cache) ->
|
||||||
|
undefined;
|
||||||
|
fetch_from_cache(CacheHash, Cache) ->
|
||||||
|
array:get(CacheHash, Cache).
|
||||||
|
|
||||||
|
-spec add_to_cache(
|
||||||
|
non_neg_integer(),
|
||||||
|
leveled_codec:ledger_kv(),
|
||||||
|
fetch_cache()) -> fetch_cache().
|
||||||
|
add_to_cache(_CacheHash, _KV, no_cache) ->
|
||||||
|
no_cache;
|
||||||
|
add_to_cache(CacheHash, KV, FetchCache) ->
|
||||||
|
array:set(CacheHash, KV, FetchCache).
|
||||||
|
|
||||||
|
|
||||||
-spec tune_hash(non_neg_integer()) -> non_neg_integer().
|
-spec tune_hash(non_neg_integer()) -> non_neg_integer().
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -1237,8 +1347,8 @@ fetch(LedgerKey, Hash, State, Timings0) ->
|
||||||
{SW3, Timings3} =
|
{SW3, Timings3} =
|
||||||
update_timings(SW2, Timings2, slot_index, true),
|
update_timings(SW2, Timings2, slot_index, true),
|
||||||
FetchCache = State#state.fetch_cache,
|
FetchCache = State#state.fetch_cache,
|
||||||
CacheHash = cache_hash(Hash),
|
CacheHash = cache_hash(Hash, State#state.level),
|
||||||
case array:get(CacheHash, FetchCache) of
|
case fetch_from_cache(CacheHash, FetchCache) of
|
||||||
{LedgerKey, V} ->
|
{LedgerKey, V} ->
|
||||||
{_SW4, Timings4} =
|
{_SW4, Timings4} =
|
||||||
update_timings(SW3,
|
update_timings(SW3,
|
||||||
|
@ -1258,7 +1368,7 @@ fetch(LedgerKey, Hash, State, Timings0) ->
|
||||||
IdxModDate,
|
IdxModDate,
|
||||||
not_present),
|
not_present),
|
||||||
FetchCache0 =
|
FetchCache0 =
|
||||||
array:set(CacheHash, Result, FetchCache),
|
add_to_cache(CacheHash, Result, FetchCache),
|
||||||
{_SW4, Timings4} =
|
{_SW4, Timings4} =
|
||||||
update_timings(SW3,
|
update_timings(SW3,
|
||||||
Timings3,
|
Timings3,
|
||||||
|
@ -1350,7 +1460,7 @@ compress_level(Level, _PressMethod) when Level < ?COMPRESS_AT_LEVEL ->
|
||||||
compress_level(_Level, PressMethod) ->
|
compress_level(_Level, PressMethod) ->
|
||||||
PressMethod.
|
PressMethod.
|
||||||
|
|
||||||
-spec maxslots_level(non_neg_integer(), pos_integer()) -> pos_integer().
|
-spec maxslots_level(level(), pos_integer()) -> pos_integer().
|
||||||
maxslots_level(Level, MaxSlotCount) when Level < ?DOUBLESIZE_LEVEL ->
|
maxslots_level(Level, MaxSlotCount) when Level < ?DOUBLESIZE_LEVEL ->
|
||||||
MaxSlotCount;
|
MaxSlotCount;
|
||||||
maxslots_level(_Level, MaxSlotCount) ->
|
maxslots_level(_Level, MaxSlotCount) ->
|
||||||
|
@ -1384,8 +1494,9 @@ write_file(RootPath, Filename, SummaryBin, SlotsBin,
|
||||||
|
|
||||||
read_file(Filename, State, LoadPageCache) ->
|
read_file(Filename, State, LoadPageCache) ->
|
||||||
{Handle, FileVersion, SummaryBin} =
|
{Handle, FileVersion, SummaryBin} =
|
||||||
open_reader(filename:join(State#state.root_path, Filename),
|
open_reader(
|
||||||
LoadPageCache),
|
filename:join(State#state.root_path, Filename),
|
||||||
|
LoadPageCache),
|
||||||
UpdState0 = imp_fileversion(FileVersion, State),
|
UpdState0 = imp_fileversion(FileVersion, State),
|
||||||
{Summary, Bloom, SlotList, TombCount} =
|
{Summary, Bloom, SlotList, TombCount} =
|
||||||
read_table_summary(SummaryBin, UpdState0#state.tomb_count),
|
read_table_summary(SummaryBin, UpdState0#state.tomb_count),
|
||||||
|
@ -3619,11 +3730,77 @@ additional_range_test() ->
|
||||||
% R8 = sst_getkvrange(P1, element(1, PastEKV), element(1, PastEKV), 2),
|
% R8 = sst_getkvrange(P1, element(1, PastEKV), element(1, PastEKV), 2),
|
||||||
% ?assertMatch([], R8).
|
% ?assertMatch([], R8).
|
||||||
|
|
||||||
|
simple_switchcache_test() ->
|
||||||
|
{RP, Filename} = {?TEST_AREA, "simple_switchcache_test"},
|
||||||
|
KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 2, 1, 20),
|
||||||
|
KVList1 = lists:sublist(lists:ukeysort(1, KVList0), ?LOOK_SLOTSIZE),
|
||||||
|
[{FirstKey, _FV}|_Rest] = KVList1,
|
||||||
|
{LastKey, _LV} = lists:last(KVList1),
|
||||||
|
{ok, OpenP4, {FirstKey, LastKey}, Bloom} =
|
||||||
|
testsst_new(RP, Filename, 4, KVList1, length(KVList1), native),
|
||||||
|
lists:foreach(fun({K, V}) ->
|
||||||
|
?assertMatch({K, V}, sst_get(OpenP4, K))
|
||||||
|
end,
|
||||||
|
KVList1),
|
||||||
|
ok = sst_switchlevels(OpenP4, 5),
|
||||||
|
lists:foreach(fun({K, V}) ->
|
||||||
|
?assertMatch({K, V}, sst_get(OpenP4, K))
|
||||||
|
end,
|
||||||
|
KVList1),
|
||||||
|
lists:foreach(fun({K, V}) ->
|
||||||
|
?assertMatch({K, V}, sst_get(OpenP4, K))
|
||||||
|
end,
|
||||||
|
KVList1),
|
||||||
|
gen_fsm:send_event(OpenP4, timeout),
|
||||||
|
lists:foreach(fun({K, V}) ->
|
||||||
|
?assertMatch({K, V}, sst_get(OpenP4, K))
|
||||||
|
end,
|
||||||
|
KVList1),
|
||||||
|
ok = sst_close(OpenP4),
|
||||||
|
OptsSST = #sst_options{press_method=native,
|
||||||
|
log_options=leveled_log:get_opts()},
|
||||||
|
{ok, OpenP5, {FirstKey, LastKey}, Bloom} =
|
||||||
|
sst_open(RP, Filename ++ ".sst", OptsSST, 5),
|
||||||
|
lists:foreach(fun({K, V}) ->
|
||||||
|
?assertMatch({K, V}, sst_get(OpenP5, K))
|
||||||
|
end,
|
||||||
|
KVList1),
|
||||||
|
lists:foreach(fun({K, V}) ->
|
||||||
|
?assertMatch({K, V}, sst_get(OpenP5, K))
|
||||||
|
end,
|
||||||
|
KVList1),
|
||||||
|
gen_fsm:send_event(OpenP5, timeout),
|
||||||
|
lists:foreach(fun({K, V}) ->
|
||||||
|
?assertMatch({K, V}, sst_get(OpenP5, K))
|
||||||
|
end,
|
||||||
|
KVList1),
|
||||||
|
ok = sst_switchlevels(OpenP5, 6),
|
||||||
|
lists:foreach(fun({K, V}) ->
|
||||||
|
?assertMatch({K, V}, sst_get(OpenP5, K))
|
||||||
|
end,
|
||||||
|
KVList1),
|
||||||
|
gen_fsm:send_event(OpenP5, timeout),
|
||||||
|
lists:foreach(fun({K, V}) ->
|
||||||
|
?assertMatch({K, V}, sst_get(OpenP5, K))
|
||||||
|
end,
|
||||||
|
KVList1),
|
||||||
|
ok = sst_switchlevels(OpenP5, 7),
|
||||||
|
lists:foreach(fun({K, V}) ->
|
||||||
|
?assertMatch({K, V}, sst_get(OpenP5, K))
|
||||||
|
end,
|
||||||
|
KVList1),
|
||||||
|
gen_fsm:send_event(OpenP5, timeout),
|
||||||
|
lists:foreach(fun({K, V}) ->
|
||||||
|
?assertMatch({K, V}, sst_get(OpenP5, K))
|
||||||
|
end,
|
||||||
|
KVList1),
|
||||||
|
ok = sst_close(OpenP5),
|
||||||
|
ok = file:delete(filename:join(RP, Filename ++ ".sst")).
|
||||||
|
|
||||||
|
|
||||||
simple_persisted_slotsize_test() ->
|
simple_persisted_slotsize_test() ->
|
||||||
simple_persisted_slotsize_tester(fun testsst_new/6).
|
simple_persisted_slotsize_tester(fun testsst_new/6).
|
||||||
|
|
||||||
|
|
||||||
simple_persisted_slotsize_tester(SSTNewFun) ->
|
simple_persisted_slotsize_tester(SSTNewFun) ->
|
||||||
{RP, Filename} = {?TEST_AREA, "simple_slotsize_test"},
|
{RP, Filename} = {?TEST_AREA, "simple_slotsize_test"},
|
||||||
KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 2, 1, 20),
|
KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 2, 1, 20),
|
||||||
|
@ -3640,6 +3817,24 @@ simple_persisted_slotsize_tester(SSTNewFun) ->
|
||||||
ok = sst_close(Pid),
|
ok = sst_close(Pid),
|
||||||
ok = file:delete(filename:join(RP, Filename ++ ".sst")).
|
ok = file:delete(filename:join(RP, Filename ++ ".sst")).
|
||||||
|
|
||||||
|
reader_hibernate_test_() ->
|
||||||
|
{timeout, 90, fun reader_hibernate_tester/0}.
|
||||||
|
|
||||||
|
reader_hibernate_tester() ->
|
||||||
|
{RP, Filename} = {?TEST_AREA, "readerhibernate_test"},
|
||||||
|
KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 32, 1, 20),
|
||||||
|
KVList1 = lists:ukeysort(1, KVList0),
|
||||||
|
[{FirstKey, FV}|_Rest] = KVList1,
|
||||||
|
{LastKey, _LV} = lists:last(KVList1),
|
||||||
|
{ok, Pid, {FirstKey, LastKey}, _Bloom} =
|
||||||
|
testsst_new(RP, Filename, 1, KVList1, length(KVList1), native),
|
||||||
|
?assertMatch({FirstKey, FV}, sst_get(Pid, FirstKey)),
|
||||||
|
SQN = leveled_codec:strip_to_seqonly({FirstKey, FV}),
|
||||||
|
?assertMatch(
|
||||||
|
SQN,
|
||||||
|
sst_getsqn(Pid, FirstKey, leveled_codec:segment_hash(FirstKey))),
|
||||||
|
timer:sleep(?HIBERNATE_TIMEOUT + 1000),
|
||||||
|
?assertMatch({FirstKey, FV}, sst_get(Pid, FirstKey)).
|
||||||
|
|
||||||
delete_pending_test_() ->
|
delete_pending_test_() ->
|
||||||
{timeout, 30, fun delete_pending_tester/0}.
|
{timeout, 30, fun delete_pending_tester/0}.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue