diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 8dc9511..cdcbcce 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -11,7 +11,6 @@ log_timer/3, log_randomtimer/4, put_timing/4, - head_timing/4, get_timing/3]). -define(PUT_LOGPOINT, 10000). @@ -124,7 +123,12 @@ {info, "Completion of update to levelzero" ++ " with cache size status ~w ~w"}}, {"P0032", - {info, "Head timing for result ~w is sample ~w total ~w and max ~w"}}, + {info, "Fetch head timing with sample_count=~w and level timings of" + ++ " foundmem_time=~w found0_time=~w found1_time=~w" + ++ " found2_time=~w foundlower_time=~w missed_time=~w" + ++ " with counts of" + ++ " foundmem_count=~w found0_count=~w found1_count=~w" + ++ " found2_count=~w foundlower_count=~w missed_count=~w"}}, {"P0033", {error, "Corrupted manifest file at path ~s to be ignored " ++ "due to error ~w"}}, @@ -427,65 +431,6 @@ put_timing(Actor, {?PUT_LOGPOINT, {Total0, Total1}, {Max0, Max1}}, T0, T1) -> put_timing(_Actor, {N, {Total0, Total1}, {Max0, Max1}}, T0, T1) -> {N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}. -%% Make a log of penciller head timings split out by level and result - one -%% log for every HEAD_LOGPOINT puts -%% Returns a tuple of {Count, TimingDict} to be stored on the process state -head_timing(undefined, SW, Level, R) -> - T0 = timer:now_diff(os:timestamp(), SW), - head_timing_int(undefined, T0, Level, R); -head_timing({N, HeadTimingD}, SW, Level, R) -> - case N band (?SAMPLE_RATE - 1) of - 0 -> - T0 = timer:now_diff(os:timestamp(), SW), - head_timing_int({N, HeadTimingD}, T0, Level, R); - _ -> - % Not to be sampled this time - {N + 1, HeadTimingD} - end. 
- -head_timing_int(undefined, T0, Level, R) -> - Key = head_key(R, Level), - NewDFun = fun(K, Acc) -> - case K of - Key -> - dict:store(K, [1, T0, T0], Acc); - _ -> - dict:store(K, [0, 0, 0], Acc) - end end, - {1, lists:foldl(NewDFun, dict:new(), head_keylist())}; -head_timing_int({?HEAD_LOGPOINT, HeadTimingD}, T0, Level, R) -> - RN = leveled_rand:uniform(?HEAD_LOGPOINT), - case RN > ?HEAD_LOGPOINT div 2 of - true -> - % log at the timing point less than half the time - LogFun = fun(K) -> log("P0032", [K|dict:fetch(K, HeadTimingD)]) end, - lists:foreach(LogFun, head_keylist()), - head_timing_int(undefined, T0, Level, R); - false -> - % Log some other time - reset to RN not 0 to stagger logs out over - % time between the vnodes - head_timing_int({RN, HeadTimingD}, T0, Level, R) - end; -head_timing_int({N, HeadTimingD}, T0, Level, R) -> - Key = head_key(R, Level), - [Count0, Total0, Max0] = dict:fetch(Key, HeadTimingD), - {N + 1, - dict:store(Key, [Count0 + 1, Total0 + T0, max(Max0, T0)], - HeadTimingD)}. - -head_key(not_present, _Level) -> - not_present; -head_key(found, 0) -> - found_0; -head_key(found, 1) -> - found_1; -head_key(found, 2) -> - found_2; -head_key(found, Level) when Level > 2 -> - found_lower. - -head_keylist() -> - [not_present, found_lower, found_0, found_1, found_2]. get_timing(undefined, SW, TimerType) -> T0 = timer:now_diff(os:timestamp(), SW), @@ -572,17 +517,6 @@ log_test() -> log("D0001", []), log_timer("D0001", [], os:timestamp()). -head_timing_test() -> - SW = os:timestamp(), - HeadTimer0 = lists:foldl(fun(_X, Acc) -> head_timing(Acc, SW, 2, found) end, - undefined, - lists:seq(0, 47)), - HeadTimer1 = head_timing(HeadTimer0, SW, 3, found), - {N, D} = HeadTimer1, - ?assertMatch(49, N), - ?assertMatch(3, lists:nth(1, dict:fetch(found_2, D))), - ?assertMatch(1, lists:nth(1, dict:fetch(found_lower, D))). 
- log_warn_test() -> ok = log("G0001", [], [warn, error]), ok = log("G8888", [], [info, warn, error]), diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index a2523bc..4a9365d 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -219,6 +219,8 @@ -define(ITERATOR_SCANWIDTH, 4). -define(SNAPSHOT_TIMEOUT_LONG, 3600). -define(SNAPSHOT_TIMEOUT_SHORT, 600). +-define(TIMING_SAMPLECOUNTDOWN, 10000). +-define(TIMING_SAMPLESIZE, 100). -record(state, {manifest, % a manifest record from the leveled_manifest module persisted_sqn = 0 :: integer(), % The highest SQN persisted @@ -244,10 +246,26 @@ work_ongoing = false :: boolean(), % i.e. compaction work work_backlog = false :: boolean(), % i.e. compaction work - head_timing :: tuple() | undefined, - + timings = no_timing :: pcl_timings(), + timings_countdown = 0 :: integer(), + compression_method = native :: lz4|native}). +-record(pcl_timings, + {sample_count = 0 :: integer(), + foundmem_time = 0 :: integer(), + found0_time = 0 :: integer(), + found1_time = 0 :: integer(), + found2_time = 0 :: integer(), + foundlower_time = 0 :: integer(), + missed_time = 0 :: integer(), + foundmem_count = 0 :: integer(), + found0_count = 0 :: integer(), + found1_count = 0 :: integer(), + found2_count = 0 :: integer(), + foundlower_count = 0 :: integer(), + missed_count = 0 :: integer()}). + -type penciller_options() :: #penciller_options{}. -type bookies_memory() :: {tuple()|empty_cache, % array:array()|empty_array, @@ -255,6 +273,7 @@ integer()|infinity, integer()}. -type pcl_state() :: #state{}. +-type pcl_timings() :: no_timing|#pcl_timings{}. 
%%%============================================================================ %%% API @@ -303,11 +322,11 @@ pcl_pushmem(Pid, LedgerCache) -> %% The return value will be a leveled_skiplist that forms that part of the %% cache pcl_fetchlevelzero(Pid, Slot) -> - %% Timeout to cause crash of L0 file when it can't get the close signal - %% as it is deadlocked making this call. - %% - %% If the timeout gets hit outside of close scenario the Penciller will - %% be stuck in L0 pending + % Timeout to cause crash of L0 file when it can't get the close signal + % as it is deadlocked making this call. + % + % If the timeout gets hit outside of close scenario the Penciller will + % be stuck in L0 pending gen_server:call(Pid, {fetch_levelzero, Slot}, 60000). -spec pcl_fetch(pid(), tuple()) -> {tuple(), tuple()}|not_present. @@ -555,13 +574,15 @@ handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}}, State)} end; handle_call({fetch, Key, Hash}, _From, State) -> - {R, HeadTimer} = timed_fetch_mem(Key, + {R, UpdTimings} = timed_fetch_mem(Key, Hash, State#state.manifest, State#state.levelzero_cache, State#state.levelzero_index, - State#state.head_timing), - {reply, R, State#state{head_timing=HeadTimer}}; + State#state.timings), + {UpdTimings0, CountDown} = + update_statetimings(UpdTimings, State#state.timings_countdown), + {reply, R, State#state{timings=UpdTimings0, timings_countdown=CountDown}}; handle_call({check_sqn, Key, Hash, SQN}, _From, State) -> {reply, compare_to_sqn(plain_fetch_mem(Key, @@ -1059,17 +1080,11 @@ roll_memory(State, true) -> {ok, Constructor, _} = R, Constructor. 
-timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) -> +timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, Timings) -> SW = os:timestamp(), {R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index), - UpdHeadTimer = - case R of - not_present -> - leveled_log:head_timing(HeadTimer, SW, Level, not_present); - _ -> - leveled_log:head_timing(HeadTimer, SW, Level, found) - end, - {R, UpdHeadTimer}. + UpdTimings = update_timings(SW, Timings, R, Level), + {R, UpdTimings}. plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index), @@ -1082,7 +1097,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> {false, not_found} -> fetch(Key, Hash, Manifest, 0, fun timed_sst_get/4); {true, KV} -> - {KV, 0} + {KV, memory} end. fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) -> @@ -1404,6 +1419,89 @@ find_nextkey(QueryArray, LCnt, +%%%============================================================================ +%%% Timing Functions +%%%============================================================================ + +-spec update_statetimings(pcl_timings(), integer()) -> {pcl_timings(), integer()}. +%% @doc +%% +%% The timings state is either in countdown to the next set of samples or +%% we are actively collecting a sample. Active collection takes place +%% when the countdown is 0. Once the sample has reached the expected count +%% then there is a log of that sample, and the countdown is restarted. +%% +%% Outside of sample windows the timings object should be set to the atom +%% no_timing. no_timing is a valid state for the pcl_timings type. +update_statetimings(no_timing, 0) -> + {#pcl_timings{}, 0}; +update_statetimings(Timings, 0) -> + case Timings#pcl_timings.sample_count of + SC when SC >= ?TIMING_SAMPLESIZE -> + log_timings(Timings), + {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)}; + _SC -> + {Timings, 0} + end; +update_statetimings(no_timing, N) -> + {no_timing, N - 1}.
+
+%% Emit the P0032 log entry for a completed sample. The argument order must
+%% stay in step with the P0032 format string: sample_count first, then the
+%% six accumulated times (mem, 0, 1, 2, lower, missed), then the six counts
+%% in the same order.
+log_timings(Timings) ->
+    leveled_log:log("P0032", [Timings#pcl_timings.sample_count,
+                                Timings#pcl_timings.foundmem_time,
+                                Timings#pcl_timings.found0_time,
+                                Timings#pcl_timings.found1_time,
+                                Timings#pcl_timings.found2_time,
+                                Timings#pcl_timings.foundlower_time,
+                                Timings#pcl_timings.missed_time,
+                                Timings#pcl_timings.foundmem_count,
+                                Timings#pcl_timings.found0_count,
+                                Timings#pcl_timings.found1_count,
+                                Timings#pcl_timings.found2_count,
+                                Timings#pcl_timings.foundlower_count,
+                                Timings#pcl_timings.missed_count]).
+
+-spec update_timings(erlang:timestamp(), pcl_timings(),
+                        not_present|tuple(), memory|integer()|basement)
+                                                    -> pcl_timings().
+%% @doc
+%%
+%% Update the timings record unless the current record object is the atom
+%% no_timing, in which case no_timing is returned and no clock is read.
+%%
+%% SW is the timestamp taken before the fetch; the elapsed time since SW (as
+%% given by timer:now_diff/2) is added to exactly one of the *_time fields
+%% and the matching *_count field is incremented, selected as follows:
+%%   - Result is not_present       -> missed (whatever the Stage)
+%%   - Stage is memory             -> foundmem
+%%   - Stage is 0, 1 or 2          -> found0, found1, found2
+%%   - any other Stage             -> foundlower
+%% sample_count is incremented on every call that is passed a record.
+update_timings(_SW, no_timing, _Result, _Stage) ->
+    no_timing;
+update_timings(SW, Timings, Result, Stage) ->
+    Timer = timer:now_diff(os:timestamp(), SW),
+    SC = Timings#pcl_timings.sample_count + 1,
+    Timings0 = Timings#pcl_timings{sample_count = SC},
+    % The not_present clause must come first, so that a miss is recorded as
+    % a miss regardless of the stage reported alongside it
+    case {Result, Stage} of
+        {not_present, _} ->
+            NFT = Timings#pcl_timings.missed_time + Timer,
+            NFC = Timings#pcl_timings.missed_count + 1,
+            Timings0#pcl_timings{missed_time = NFT, missed_count = NFC};
+        {_, memory} ->
+            PMT = Timings#pcl_timings.foundmem_time + Timer,
+            PMC = Timings#pcl_timings.foundmem_count + 1,
+            Timings0#pcl_timings{foundmem_time = PMT, foundmem_count = PMC};
+        {_, 0} ->
+            L0T = Timings#pcl_timings.found0_time + Timer,
+            L0C = Timings#pcl_timings.found0_count + 1,
+            Timings0#pcl_timings{found0_time = L0T, found0_count = L0C};
+        {_, 1} ->
+            L1T = Timings#pcl_timings.found1_time + Timer,
+            L1C = Timings#pcl_timings.found1_count + 1,
+            Timings0#pcl_timings{found1_time = L1T, found1_count = L1C};
+        {_, 2} ->
+            L2T = Timings#pcl_timings.found2_time + Timer,
+            L2C = Timings#pcl_timings.found2_count + 1,
+            Timings0#pcl_timings{found2_time = L2T, found2_count = L2C};
+        _ ->
+            % Found at any level below 2
+            LLT = Timings#pcl_timings.foundlower_time + Timer,
+            LLC = Timings#pcl_timings.foundlower_count + 1,
+            Timings0#pcl_timings{foundlower_time = LLT,
+                                    foundlower_count = LLC}
+    end.
+
 %%%============================================================================
 %%% Test