Refactor timing point management
For Penciller and timing head requests.
This commit is contained in:
parent
58946a7f98
commit
3ef550d9f8
2 changed files with 124 additions and 92 deletions
|
@ -11,7 +11,6 @@
|
||||||
log_timer/3,
|
log_timer/3,
|
||||||
log_randomtimer/4,
|
log_randomtimer/4,
|
||||||
put_timing/4,
|
put_timing/4,
|
||||||
head_timing/4,
|
|
||||||
get_timing/3]).
|
get_timing/3]).
|
||||||
|
|
||||||
-define(PUT_LOGPOINT, 10000).
|
-define(PUT_LOGPOINT, 10000).
|
||||||
|
@ -124,7 +123,12 @@
|
||||||
{info, "Completion of update to levelzero"
|
{info, "Completion of update to levelzero"
|
||||||
++ " with cache size status ~w ~w"}},
|
++ " with cache size status ~w ~w"}},
|
||||||
{"P0032",
|
{"P0032",
|
||||||
{info, "Head timing for result ~w is sample ~w total ~w and max ~w"}},
|
{info, "Fetch head timing with sample_count=~w and level timings of"
|
||||||
|
++ " foundmem_time=~w found0_time=~w found1_time=~w"
|
||||||
|
++ " found2_time=~w foundlower_time=~w missed_time=~w"
|
||||||
|
++ " with counts of"
|
||||||
|
++ " foundmem_count=~w found0_count=~w found1_count=~w"
|
||||||
|
++ " found2_count=~w foundlower_count=~w missed_count=~w"}},
|
||||||
{"P0033",
|
{"P0033",
|
||||||
{error, "Corrupted manifest file at path ~s to be ignored "
|
{error, "Corrupted manifest file at path ~s to be ignored "
|
||||||
++ "due to error ~w"}},
|
++ "due to error ~w"}},
|
||||||
|
@ -427,65 +431,6 @@ put_timing(Actor, {?PUT_LOGPOINT, {Total0, Total1}, {Max0, Max1}}, T0, T1) ->
|
||||||
put_timing(_Actor, {N, {Total0, Total1}, {Max0, Max1}}, T0, T1) ->
|
put_timing(_Actor, {N, {Total0, Total1}, {Max0, Max1}}, T0, T1) ->
|
||||||
{N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}.
|
{N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}.
|
||||||
|
|
||||||
%% Sampled timing of penciller head requests, split out by the result of the
%% fetch and the level at which it was resolved.
%%
%% The first call (accumulator still undefined) is always timed.  After that
%% a call is timed only when the running count N has none of the bits in
%% (?SAMPLE_RATE - 1) set; every other call just increments N.
%%
%% SW is the os:timestamp() taken before the fetch started.
%% Returns the {Count, TimingDict} tuple to be stored on the process state
%% and passed back in on the next call.
head_timing(undefined, SW, Level, R) ->
    Elapsed = timer:now_diff(os:timestamp(), SW),
    head_timing_int(undefined, Elapsed, Level, R);
head_timing({N, _HeadTimingD} = HeadTimer, SW, Level, R)
                                    when N band (?SAMPLE_RATE - 1) =:= 0 ->
    Elapsed = timer:now_diff(os:timestamp(), SW),
    head_timing_int(HeadTimer, Elapsed, Level, R);
head_timing({N, HeadTimingD}, _SW, _Level, _R) ->
    % Not to be sampled this time - count the request without timing it
    {N + 1, HeadTimingD}.
|
|
||||||
|
|
||||||
%% Fold one sampled timing T0 into the {Count, TimingDict} accumulator.
%%
%% - undefined accumulator: build a fresh dict holding a
%%   [Count, Total, Max] entry for every key in head_keylist(), with the
%%   key for this request seeded from T0, and start the count at 1.
%% - count equal to ?HEAD_LOGPOINT: draw a random number; if it is in the
%%   upper half of the range log every entry under "P0032" and start a new
%%   accumulator, otherwise carry on from the random count.
%% - otherwise: bump the count and update the entry for this request.
head_timing_int(undefined, T0, Level, R) ->
    SampledKey = head_key(R, Level),
    InitEntry =
        fun(K) when K =:= SampledKey -> {K, [1, T0, T0]};
           (K) -> {K, [0, 0, 0]}
        end,
    {1, dict:from_list(lists:map(InitEntry, head_keylist()))};
head_timing_int({?HEAD_LOGPOINT, HeadTimingD}, T0, Level, R) ->
    RN = leveled_rand:uniform(?HEAD_LOGPOINT),
    if
        RN > ?HEAD_LOGPOINT div 2 ->
            % Log now, then begin a new sample with this timing
            lists:foreach(
                fun(K) -> log("P0032", [K|dict:fetch(K, HeadTimingD)]) end,
                head_keylist()),
            head_timing_int(undefined, T0, Level, R);
        true ->
            % Log some other time - the count restarts from RN rather than
            % 0 so that logs are staggered over time between the vnodes
            head_timing_int({RN, HeadTimingD}, T0, Level, R)
    end;
head_timing_int({N, HeadTimingD}, T0, Level, R) ->
    Key = head_key(R, Level),
    [PrevCount, PrevTotal, PrevMax] = dict:fetch(Key, HeadTimingD),
    UpdEntry = [PrevCount + 1, PrevTotal + T0, max(PrevMax, T0)],
    {N + 1, dict:store(Key, UpdEntry, HeadTimingD)}.
|
|
||||||
|
|
||||||
%% Map the result of a fetch and the level it was resolved at onto the key
%% used in the head timing dictionary.  A miss maps to not_present whatever
%% the level.  A hit at level 0, 1 or 2 has its own key, and a hit at any
%% level comparing greater than 2 is grouped under found_lower.
head_key(found, 0) -> found_0;
head_key(found, 1) -> found_1;
head_key(found, 2) -> found_2;
head_key(found, Level) when Level > 2 -> found_lower;
head_key(not_present, _Level) -> not_present.
|
|
||||||
|
|
||||||
%% The full set of keys held in the head timing dictionary - one for each
%% value head_key/2 can return.  head_timing_int/4 folds over this list to
%% initialise the dictionary and iterates it again when logging.
head_keylist() ->
    [not_present, found_lower, found_0, found_1, found_2].
|
|
||||||
|
|
||||||
get_timing(undefined, SW, TimerType) ->
|
get_timing(undefined, SW, TimerType) ->
|
||||||
T0 = timer:now_diff(os:timestamp(), SW),
|
T0 = timer:now_diff(os:timestamp(), SW),
|
||||||
|
@ -572,17 +517,6 @@ log_test() ->
|
||||||
log("D0001", []),
|
log("D0001", []),
|
||||||
log_timer("D0001", [], os:timestamp()).
|
log_timer("D0001", [], os:timestamp()).
|
||||||
|
|
||||||
%% Run 48 level-2 hits and then a single level-3 hit through head_timing/4,
%% and check the overall request count plus how many of those requests were
%% actually sampled into the found_2 and found_lower entries.
head_timing_test() ->
    Start = os:timestamp(),
    Level2Hit = fun(_Seq, Timer) -> head_timing(Timer, Start, 2, found) end,
    AfterLevel2 = lists:foldl(Level2Hit, undefined, lists:seq(1, 48)),
    {Count, TimingD} = head_timing(AfterLevel2, Start, 3, found),
    ?assertMatch(49, Count),
    ?assertMatch(3, hd(dict:fetch(found_2, TimingD))),
    ?assertMatch(1, hd(dict:fetch(found_lower, TimingD))).
|
|
||||||
|
|
||||||
log_warn_test() ->
|
log_warn_test() ->
|
||||||
ok = log("G0001", [], [warn, error]),
|
ok = log("G0001", [], [warn, error]),
|
||||||
ok = log("G8888", [], [info, warn, error]),
|
ok = log("G8888", [], [info, warn, error]),
|
||||||
|
|
|
@ -219,6 +219,8 @@
|
||||||
-define(ITERATOR_SCANWIDTH, 4).
|
-define(ITERATOR_SCANWIDTH, 4).
|
||||||
-define(SNAPSHOT_TIMEOUT_LONG, 3600).
|
-define(SNAPSHOT_TIMEOUT_LONG, 3600).
|
||||||
-define(SNAPSHOT_TIMEOUT_SHORT, 600).
|
-define(SNAPSHOT_TIMEOUT_SHORT, 600).
|
||||||
|
-define(TIMING_SAMPLECOUNTDOWN, 10000).
|
||||||
|
-define(TIMING_SAMPLESIZE, 100).
|
||||||
|
|
||||||
-record(state, {manifest, % a manifest record from the leveled_manifest module
|
-record(state, {manifest, % a manifest record from the leveled_manifest module
|
||||||
persisted_sqn = 0 :: integer(), % The highest SQN persisted
|
persisted_sqn = 0 :: integer(), % The highest SQN persisted
|
||||||
|
@ -244,10 +246,26 @@
|
||||||
work_ongoing = false :: boolean(), % i.e. compaction work
|
work_ongoing = false :: boolean(), % i.e. compaction work
|
||||||
work_backlog = false :: boolean(), % i.e. compaction work
|
work_backlog = false :: boolean(), % i.e. compaction work
|
||||||
|
|
||||||
head_timing :: tuple() | undefined,
|
timings = no_timing :: pcl_timings(),
|
||||||
|
timings_countdown = 0 :: integer(),
|
||||||
|
|
||||||
compression_method = native :: lz4|native}).
|
compression_method = native :: lz4|native}).
|
||||||
|
|
||||||
|
%% Accumulator for one sample window of penciller fetch timings.
%% update_timings/4 increments sample_count for every fetch it records, and
%% attributes that fetch to exactly one outcome: found in memory, found at
%% level 0, 1 or 2, found elsewhere (lower), or missed.  For that outcome
%% the *_time field accumulates the elapsed time returned by
%% timer:now_diff/2 (microseconds) and the *_count field is incremented.
-record(pcl_timings,
                {sample_count = 0 :: integer(),
                    % Summed elapsed time per outcome
                    foundmem_time = 0 :: integer(),
                    found0_time = 0 :: integer(),
                    found1_time = 0 :: integer(),
                    found2_time = 0 :: integer(),
                    foundlower_time = 0 :: integer(),
                    missed_time = 0 :: integer(),
                    % Number of sampled fetches per outcome
                    foundmem_count = 0 :: integer(),
                    found0_count = 0 :: integer(),
                    found1_count = 0 :: integer(),
                    found2_count = 0 :: integer(),
                    foundlower_count = 0 :: integer(),
                    missed_count = 0 :: integer()}).
|
||||||
|
|
||||||
-type penciller_options() :: #penciller_options{}.
|
-type penciller_options() :: #penciller_options{}.
|
||||||
-type bookies_memory() :: {tuple()|empty_cache,
|
-type bookies_memory() :: {tuple()|empty_cache,
|
||||||
% array:array()|empty_array,
|
% array:array()|empty_array,
|
||||||
|
@ -255,6 +273,7 @@
|
||||||
integer()|infinity,
|
integer()|infinity,
|
||||||
integer()}.
|
integer()}.
|
||||||
-type pcl_state() :: #state{}.
|
-type pcl_state() :: #state{}.
|
||||||
|
-type pcl_timings() :: no_timing|#pcl_timings{}.
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -303,11 +322,11 @@ pcl_pushmem(Pid, LedgerCache) ->
|
||||||
%% The return value will be a leveled_skiplist that forms that part of the
|
%% The return value will be a leveled_skiplist that forms that part of the
|
||||||
%% cache
|
%% cache
|
||||||
pcl_fetchlevelzero(Pid, Slot) ->
|
pcl_fetchlevelzero(Pid, Slot) ->
|
||||||
%% Timeout to cause crash of L0 file when it can't get the close signal
|
% Timeout to cause crash of L0 file when it can't get the close signal
|
||||||
%% as it is deadlocked making this call.
|
% as it is deadlocked making this call.
|
||||||
%%
|
%
|
||||||
%% If the timeout gets hit outside of close scenario the Penciller will
|
% If the timeout gets hit outside of close scenario the Penciller will
|
||||||
%% be stuck in L0 pending
|
% be stuck in L0 pending
|
||||||
gen_server:call(Pid, {fetch_levelzero, Slot}, 60000).
|
gen_server:call(Pid, {fetch_levelzero, Slot}, 60000).
|
||||||
|
|
||||||
-spec pcl_fetch(pid(), tuple()) -> {tuple(), tuple()}|not_present.
|
-spec pcl_fetch(pid(), tuple()) -> {tuple(), tuple()}|not_present.
|
||||||
|
@ -555,13 +574,15 @@ handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}},
|
||||||
State)}
|
State)}
|
||||||
end;
|
end;
|
||||||
handle_call({fetch, Key, Hash}, _From, State) ->
|
handle_call({fetch, Key, Hash}, _From, State) ->
|
||||||
{R, HeadTimer} = timed_fetch_mem(Key,
|
{R, UpdTimings} = timed_fetch_mem(Key,
|
||||||
Hash,
|
Hash,
|
||||||
State#state.manifest,
|
State#state.manifest,
|
||||||
State#state.levelzero_cache,
|
State#state.levelzero_cache,
|
||||||
State#state.levelzero_index,
|
State#state.levelzero_index,
|
||||||
State#state.head_timing),
|
State#state.timings),
|
||||||
{reply, R, State#state{head_timing=HeadTimer}};
|
{UpdTimings0, CountDown} =
|
||||||
|
update_statetimings(UpdTimings, State#state.timings_countdown),
|
||||||
|
{reply, R, State#state{timings=UpdTimings0, timings_countdown=CountDown}};
|
||||||
handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
|
handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
|
||||||
{reply,
|
{reply,
|
||||||
compare_to_sqn(plain_fetch_mem(Key,
|
compare_to_sqn(plain_fetch_mem(Key,
|
||||||
|
@ -1059,17 +1080,11 @@ roll_memory(State, true) ->
|
||||||
{ok, Constructor, _} = R,
|
{ok, Constructor, _} = R,
|
||||||
Constructor.
|
Constructor.
|
||||||
|
|
||||||
timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) ->
|
timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, Timings) ->
|
||||||
SW = os:timestamp(),
|
SW = os:timestamp(),
|
||||||
{R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
|
{R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
|
||||||
UpdHeadTimer =
|
UpdTimings = update_timings(SW, Timings, R, Level),
|
||||||
case R of
|
{R, UpdTimings}.
|
||||||
not_present ->
|
|
||||||
leveled_log:head_timing(HeadTimer, SW, Level, not_present);
|
|
||||||
_ ->
|
|
||||||
leveled_log:head_timing(HeadTimer, SW, Level, found)
|
|
||||||
end,
|
|
||||||
{R, UpdHeadTimer}.
|
|
||||||
|
|
||||||
plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
|
plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
|
||||||
R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
|
R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
|
||||||
|
@ -1082,7 +1097,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
|
||||||
{false, not_found} ->
|
{false, not_found} ->
|
||||||
fetch(Key, Hash, Manifest, 0, fun timed_sst_get/4);
|
fetch(Key, Hash, Manifest, 0, fun timed_sst_get/4);
|
||||||
{true, KV} ->
|
{true, KV} ->
|
||||||
{KV, 0}
|
{KV, memory}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
|
fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
|
||||||
|
@ -1404,6 +1419,89 @@ find_nextkey(QueryArray, LCnt,
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
%%%============================================================================
|
||||||
|
%%% Timing Functions
|
||||||
|
%%%============================================================================
|
||||||
|
|
||||||
|
-spec update_statetimings(pcl_timings(), integer())
                                            -> {pcl_timings(), integer()}.
%% @doc
%%
%% The timings state is either counting down to the next sample window, or
%% a sample is actively being collected.  Active collection takes place
%% while the countdown is 0.  Once the sample has reached
%% ?TIMING_SAMPLESIZE entries it is logged, the timings revert to no_timing
%% and the countdown is restarted from a random value of up to twice
%% ?TIMING_SAMPLECOUNTDOWN.
%%
%% Outside of sample windows the timings object is the atom no_timing,
%% which is a valid value of the pcl_timings type.
update_statetimings(no_timing, 0) ->
    % Countdown complete - open a new sample window
    {#pcl_timings{}, 0};
update_statetimings(Timings, 0) ->
    Collected = Timings#pcl_timings.sample_count,
    if
        Collected >= ?TIMING_SAMPLESIZE ->
            log_timings(Timings),
            {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)};
        true ->
            % Sample window still open
            {Timings, 0}
    end;
update_statetimings(no_timing, N) ->
    {no_timing, N - 1}.
|
||||||
|
|
||||||
|
%% Write the completed sample to the log as entry "P0032": the sample count
%% first, then the accumulated time for each outcome, then the count for
%% each outcome, in the order memory, level 0, level 1, level 2, lower,
%% missed.
log_timings(Timings) ->
    SampleCount = Timings#pcl_timings.sample_count,
    OutcomeTimes =
        [Timings#pcl_timings.foundmem_time,
            Timings#pcl_timings.found0_time,
            Timings#pcl_timings.found1_time,
            Timings#pcl_timings.found2_time,
            Timings#pcl_timings.foundlower_time,
            Timings#pcl_timings.missed_time],
    OutcomeCounts =
        [Timings#pcl_timings.foundmem_count,
            Timings#pcl_timings.found0_count,
            Timings#pcl_timings.found1_count,
            Timings#pcl_timings.found2_count,
            Timings#pcl_timings.foundlower_count,
            Timings#pcl_timings.missed_count],
    leveled_log:log("P0032", [SampleCount|OutcomeTimes ++ OutcomeCounts]).
|
||||||
|
|
||||||
|
-spec update_timings(erlang:timestamp(), pcl_timings(),
                        not_present|tuple(), integer()|basement|memory)
                                                    -> pcl_timings().
%% @doc
%%
%% Record the outcome of one fetch in the timings record, unless the
%% current timings object is the atom no_timing (i.e. no sample window is
%% open), in which case nothing is measured and no_timing is returned.
%%
%% SW is the os:timestamp() taken before the fetch.  Result is the fetch
%% result (not_present for a miss) and Stage is where the fetch was
%% resolved - the atom memory, or the level.  The elapsed time since SW is
%% added to the *_time field for the matching outcome, and both that
%% outcome's *_count and the overall sample_count are incremented.
update_timings(_SW, no_timing, _Result, _Stage) ->
    no_timing;
update_timings(SW, Timings, Result, Stage) ->
    Timer = timer:now_diff(os:timestamp(), SW),
    SC = Timings#pcl_timings.sample_count + 1,
    Timings0 = Timings#pcl_timings{sample_count = SC},
    case {Result, Stage} of
        {not_present, _} ->
            % A miss is recorded as such regardless of the stage reached
            NFT = Timings#pcl_timings.missed_time + Timer,
            NFC = Timings#pcl_timings.missed_count + 1,
            Timings0#pcl_timings{missed_time = NFT, missed_count = NFC};
        {_, memory} ->
            PMT = Timings#pcl_timings.foundmem_time + Timer,
            PMC = Timings#pcl_timings.foundmem_count + 1,
            Timings0#pcl_timings{foundmem_time = PMT, foundmem_count = PMC};
        {_, 0} ->
            L0T = Timings#pcl_timings.found0_time + Timer,
            L0C = Timings#pcl_timings.found0_count + 1,
            Timings0#pcl_timings{found0_time = L0T, found0_count = L0C};
        {_, 1} ->
            L1T = Timings#pcl_timings.found1_time + Timer,
            L1C = Timings#pcl_timings.found1_count + 1,
            Timings0#pcl_timings{found1_time = L1T, found1_count = L1C};
        {_, 2} ->
            L2T = Timings#pcl_timings.found2_time + Timer,
            L2C = Timings#pcl_timings.found2_count + 1,
            Timings0#pcl_timings{found2_time = L2T, found2_count = L2C};
        _ ->
            % Any other stage for a hit is grouped as "lower"
            LLT = Timings#pcl_timings.foundlower_time + Timer,
            LLC = Timings#pcl_timings.foundlower_count + 1,
            Timings0#pcl_timings{foundlower_time = LLT, foundlower_count = LLC}
    end.
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% Test
|
%%% Test
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue