From f436cfd03ed1e6acca5e07eb7f4f40d1ae26d72d Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 21 Nov 2017 23:13:24 +0000 Subject: [PATCH] Add consistent timing points Now all timing points should be made in a consistent fashion --- src/leveled_bookie.erl | 239 +++++++++++++++++++++++++++++++------- src/leveled_log.erl | 110 ++---------------- src/leveled_penciller.erl | 13 +++ src/leveled_sst.erl | 14 ++- 4 files changed, 231 insertions(+), 145 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index edd339d..eb2fced 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -83,6 +83,8 @@ -define(RECENT_AAE, false). -define(COMPRESSION_METHOD, lz4). -define(COMPRESSION_POINT, on_receipt). +-define(TIMING_SAMPLESIZE, 100). +-define(TIMING_SAMPLECOUNTDOWN, 10000). -record(ledger_cache, {mem :: ets:tab(), loader = leveled_tree:empty(?CACHE_TYPE) @@ -99,13 +101,34 @@ ledger_cache = #ledger_cache{}, is_snapshot :: boolean() | undefined, slow_offer = false :: boolean(), - put_timing :: tuple() | undefined, - get_timing :: tuple() | undefined}). + puttiming_countdown = 0 :: integer(), + gettiming_countdown = 0 :: integer(), + foldtiming_countdown = 0 :: integer(), + get_timings = no_timing :: get_timings(), + put_timings = no_timing :: put_timings(), + fold_timings = no_timing :: fold_timings()}). + + +-record(get_timings, {sample_count = 0 :: integer(), + head_time = 0 :: integer(), + body_time = 0 :: integer(), + fetch_count = 0 :: integer()}). + +-record(put_timings, {sample_count = 0 :: integer(), + mem_time = 0 :: integer(), + ink_time = 0 :: integer(), + total_size = 0 :: integer()}). + +-record(fold_timings, {sample_count = 0 :: integer(), + setup_time = 0 :: integer()}). -type book_state() :: #state{}. -type sync_mode() :: sync|none|riak_sync. -type ledger_cache() :: #ledger_cache{}. +-type get_timings() :: no_timing|#get_timings{}. +-type put_timings() :: no_timing|#put_timings{}. +-type fold_timings() :: no_timing|#fold_timings{}. %%%============================================================================ %%% API @@ -467,12 +490,13 @@ init([Opts]) -> handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) -> LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), - SW = os:timestamp(), + SW0 = os:timestamp(), {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker, LedgerKey, Object, {IndexSpecs, TTL}), - T0 = timer:now_diff(os:timestamp(), SW), + {SW1, Timings1} = + update_timings(SW0, {put, {inker, ObjSize}}, State#state.put_timings), Changes = preparefor_ledgercache(no_type_assigned, LedgerKey, SQN, @@ -481,8 +505,10 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) -> {IndexSpecs, TTL}, State#state.recent_aae), Cache0 = addto_ledgercache(Changes, State#state.ledger_cache), - T1 = timer:now_diff(os:timestamp(), SW) - T0, - PutTimes = leveled_log:put_timing(bookie, State#state.put_timing, T0, T1), + {_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1), + + {Timings, CountDown} = + update_statetimings(put, Timings2, State#state.puttiming_countdown), % If the previous push to memory was returned then punish this PUT with a % delay. 
If the back-pressure in the Penciller continues, these delays
     % will become more frequent
@@ -492,53 +518,66 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
         false ->
             gen_server:reply(From, ok)
     end,
-    maybe_longrunning(SW, overall_put),
+    maybe_longrunning(SW0, overall_put),
     case maybepush_ledgercache(State#state.cache_size,
                                 Cache0,
                                 State#state.penciller) of
         {ok, NewCache} ->
-            {noreply, State#state{ledger_cache=NewCache,
-                                    put_timing=PutTimes,
-                                    slow_offer=false}};
+            {noreply, State#state{ledger_cache = NewCache,
+                                    put_timings = Timings,
+                                    puttiming_countdown = CountDown,
+                                    slow_offer = false}};
         {returned, NewCache} ->
-            {noreply, State#state{ledger_cache=NewCache,
-                                    put_timing=PutTimes,
-                                    slow_offer=true}}
+            {noreply, State#state{ledger_cache = NewCache,
+                                    put_timings = Timings,
+                                    puttiming_countdown = CountDown,
+                                    slow_offer = true}}
     end;
 handle_call({get, Bucket, Key, Tag}, _From, State) ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     SWh = os:timestamp(),
-    case fetch_head(LedgerKey,
-                    State#state.penciller,
-                    State#state.ledger_cache) of
-        not_present ->
-            GT0 = leveled_log:get_timing(State#state.get_timing,
-                                            SWh,
-                                            head_not_present),
-            {reply, not_found, State#state{get_timing=GT0}};
-        Head ->
-            GT0 = leveled_log:get_timing(State#state.get_timing,
-                                            SWh,
-                                            head_found),
-            SWg = os:timestamp(),
-            {Seqn, Status, _MH, _MD} = leveled_codec:striphead_to_details(Head),
-            case Status of
-                tomb ->
-                    {reply, not_found, State};
-                {active, TS} ->
-                    Active = TS >= leveled_codec:integer_now(),
-                    Object = fetch_value(State#state.inker, {LedgerKey, Seqn}),
-                    GT1 = leveled_log:get_timing(GT0, SWg, fetch),
-                    case {Active, Object} of
-                        {_, not_present} ->
-                            {reply, not_found, State#state{get_timing=GT1}};
-                        {true, Object} ->
-                            {reply, {ok, Object}, State#state{get_timing=GT1}};
-                        _ ->
-                            {reply, not_found, State#state{get_timing=GT1}}
-                    end
-            end
-    end;
+    HeadResult =
+        case fetch_head(LedgerKey,
+                        State#state.penciller,
+                        State#state.ledger_cache) of
+            not_present ->
+                not_found;
+            Head ->
+                {Seqn, Status, _MH, _MD} =
+                    leveled_codec:striphead_to_details(Head),
+                case Status of
+                    tomb ->
+                        not_found;
+                    {active, TS} ->
+                        case TS >= leveled_codec:integer_now() of
+                            false ->
+                                not_found;
+                            true ->
+                                {LedgerKey, Seqn}
+                        end
+                end
+        end,
+    {SWb, Timings1} =
+        update_timings(SWh, {get, head}, State#state.get_timings),
+    {Reply, Timings2} =
+        case HeadResult of
+            not_found ->
+                {not_found, Timings1};
+            {LK, SQN} ->
+                Object = fetch_value(State#state.inker, {LK, SQN}),
+                {_SW, UpdTimingsB} =
+                    update_timings(SWb, {get, body}, Timings1),
+                case Object of
+                    not_present ->
+                        {not_found, UpdTimingsB};
+                    _ ->
+                        {{ok, Object}, UpdTimingsB}
+                end
+        end,
+    {Timings, CountDown} =
+        update_statetimings(get, Timings2, State#state.gettiming_countdown),
+    {reply, Reply, State#state{get_timings = Timings,
+                                gettiming_countdown = CountDown}};
 handle_call({head, Bucket, Key, Tag}, _From, State) ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     case fetch_head(LedgerKey,
@@ -569,7 +608,14 @@ handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) ->
     Reply = snapshot_store(State, SnapType, Query, LongRunning),
     {reply, Reply, State};
 handle_call({return_runner, QueryType}, _From, State) ->
-    {reply, get_runner(State, QueryType), State};
+    SW = os:timestamp(),
+    Runner = get_runner(State, QueryType),
+    {_SW, Timings1} =
+        update_timings(SW, {fold, setup}, State#state.fold_timings),
+    {Timings, CountDown} =
+        update_statetimings(fold, Timings1, State#state.foldtiming_countdown),
+    {reply, Runner, State#state{fold_timings = Timings,
+                                foldtiming_countdown = CountDown}};
 handle_call({compact_journal, Timeout}, _From, State) ->
     ok = leveled_inker:ink_compactjournal(State#state.inker,
                                             self(),
@@ -1181,6 +1227,109 @@ delete_path(DirPath) ->
     [file:delete(filename:join([DirPath, File])) || File <- Files],
     file:del_dir(DirPath).
+
+
+%%%============================================================================
+%%% Timing Functions
+%%%============================================================================
+
+-spec update_statetimings(put|get|fold,
+                            put_timings()|get_timings()|fold_timings(),
+                            integer())
+                                -> {put_timings()|get_timings()|fold_timings(),
+                                    integer()}.
+%% @doc
+%%
+%% The timings state is either in countdown to the next set of samples, or
+%% we are actively collecting a sample. Active collection takes place when
+%% the countdown is 0. Once the sample has reached the expected count, that
+%% sample is logged and the countdown is restarted.
+%%
+%% Outside of sample windows the timings object should be set to the atom
+%% no_timing. no_timing is a valid state for each timings type.
+update_statetimings(put, no_timing, 0) ->
+    {#put_timings{}, 0};
+update_statetimings(get, no_timing, 0) ->
+    {#get_timings{}, 0};
+update_statetimings(fold, no_timing, 0) ->
+    {#fold_timings{}, 0};
+update_statetimings(put, Timings, 0) ->
+    case Timings#put_timings.sample_count of
+        SC when SC >= ?TIMING_SAMPLESIZE ->
+            log_timings(put, Timings),
+            {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)};
+        _SC ->
+            {Timings, 0}
+    end;
+update_statetimings(get, Timings, 0) ->
+    case Timings#get_timings.sample_count of
+        SC when SC >= ?TIMING_SAMPLESIZE ->
+            log_timings(get, Timings),
+            {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)};
+        _SC ->
+            {Timings, 0}
+    end;
+update_statetimings(fold, Timings, 0) ->
+    case Timings#fold_timings.sample_count of
+        SC when SC >= (?TIMING_SAMPLESIZE div 10) ->
+            log_timings(fold, Timings),
+            {no_timing,
+                leveled_rand:uniform(2 * (?TIMING_SAMPLECOUNTDOWN div 10))};
+        _SC ->
+            {Timings, 0}
+    end;
+update_statetimings(_, no_timing, N) ->
+    {no_timing, N - 1}.
+
+log_timings(put, Timings) ->
+    leveled_log:log("B0015", [Timings#put_timings.sample_count,
+                                Timings#put_timings.mem_time,
+                                Timings#put_timings.ink_time,
+                                Timings#put_timings.total_size]);
+log_timings(get, Timings) ->
+    leveled_log:log("B0016", [Timings#get_timings.sample_count,
+                                Timings#get_timings.head_time,
+                                Timings#get_timings.body_time,
+                                Timings#get_timings.fetch_count]);
+log_timings(fold, Timings) ->
+    leveled_log:log("B0017", [Timings#fold_timings.sample_count,
+                                Timings#fold_timings.setup_time]).
+ + +update_timings(_SW, _Stage, no_timing) -> + {no_timing, no_timing}; +update_timings(SW, {put, Stage}, Timings) -> + Timer = timer:now_diff(os:timestamp(), SW), + Timings0 = + case Stage of + {inker, ObjectSize} -> + INT = Timings#put_timings.ink_time + Timer, + TSZ = Timings#put_timings.total_size + ObjectSize, + Timings#put_timings{ink_time = INT, total_size = TSZ}; + mem -> + PCT = Timings#put_timings.mem_time + Timer, + CNT = Timings#put_timings.sample_count + 1, + Timings#put_timings{mem_time = PCT, sample_count = CNT} + end, + {os:timestamp(), Timings0}; +update_timings(SW, {get, head}, Timings) -> + Timer = timer:now_diff(os:timestamp(), SW), + GHT = Timings#get_timings.head_time + Timer, + CNT = Timings#get_timings.sample_count + 1, + Timings0 = Timings#get_timings{head_time = GHT, sample_count = CNT}, + {os:timestamp(), Timings0}; +update_timings(SW, {get, body}, Timings) -> + Timer = timer:now_diff(os:timestamp(), SW), + GBT = Timings#get_timings.body_time + Timer, + FCNT = Timings#get_timings.fetch_count + 1, + Timings0 = Timings#get_timings{body_time = GBT, fetch_count = FCNT}, + {no_timing, Timings0}; +update_timings(SW, {fold, setup}, Timings) -> + Timer = timer:now_diff(os:timestamp(), SW), + FST = Timings#fold_timings.setup_time + Timer, + CNT = Timings#fold_timings.sample_count + 1, + Timings0 = Timings#fold_timings{setup_time = FST, sample_count = CNT}, + {no_timing, Timings0}. + %%%============================================================================ %%% Test %%%============================================================================ diff --git a/src/leveled_log.erl b/src/leveled_log.erl index cdcbcce..63eb237 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -9,15 +9,9 @@ -export([log/2, log_timer/3, - log_randomtimer/4, - put_timing/4, - get_timing/3]). + log_randomtimer/4]). --define(PUT_LOGPOINT, 10000). --define(HEAD_LOGPOINT, 20000). --define(GET_LOGPOINT, 20000). -define(LOG_LEVEL, [info, warn, error, critical]). --define(SAMPLE_RATE, 16). -define(LOGBASE, [ @@ -50,14 +44,18 @@ {info, "Bucket list finds non-binary Bucket ~w"}}, {"B0011", {warn, "Call to destroy the store and so all files to be removed"}}, - {"B0012", - {info, "After ~w PUTs total inker time is ~w total ledger time is ~w " - ++ "and max inker time is ~w and max ledger time is ~w"}}, {"B0013", {warn, "Long running task took ~w microseconds with task of type ~w"}}, - {"B0014", - {info, "Get timing for result ~w is sample ~w total ~w and max ~w"}}, - + {"B0015", + {info, "Put timing with sample_count=~w and mem_time=~w ink_time=~w" + ++ " with total_object_size=~w"}}, + {"B0016", + {info, "Get timing with sample_count=~w and head_time=~w body_time=~w" + ++ " with fetch_count=~w"}}, + {"B0017", + {info, "Fold timing with sample_count=~w and setup_time=~w"}}, + + {"R0001", {debug, "Object fold to process batch of ~w objects"}}, @@ -406,92 +404,6 @@ log_randomtimer(LogReference, Subs, StartTime, RandomProb) -> ok end. 
-%% Make a log of put timings split out by actor - one log for every -%% PUT_LOGPOINT puts - -put_timing(_Actor, undefined, T0, T1) -> - {1, {T0, T1}, {T0, T1}}; -put_timing(Actor, {?PUT_LOGPOINT, {Total0, Total1}, {Max0, Max1}}, T0, T1) -> - RN = leveled_rand:uniform(?PUT_LOGPOINT), - case RN > ?PUT_LOGPOINT div 2 of - true -> - % log at the timing point less than half the time - LogRef = - case Actor of - bookie -> "B0012" %; - % inker -> "I0019"; - % journal -> "CDB17" - end, - log(LogRef, [?PUT_LOGPOINT, Total0, Total1, Max0, Max1]), - put_timing(Actor, undefined, T0, T1); - false -> - % Log some other random time - put_timing(Actor, {RN, {Total0, Total1}, {Max0, Max1}}, T0, T1) - end; -put_timing(_Actor, {N, {Total0, Total1}, {Max0, Max1}}, T0, T1) -> - {N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}. - - -get_timing(undefined, SW, TimerType) -> - T0 = timer:now_diff(os:timestamp(), SW), - gen_timing_int(undefined, - T0, - TimerType, - fun get_keylist/0, - ?GET_LOGPOINT, - "B0014"); -get_timing({N, GetTimerD}, SW, TimerType) -> - case N band (?SAMPLE_RATE - 1) of - 0 -> - T0 = timer:now_diff(os:timestamp(), SW), - gen_timing_int({N, GetTimerD}, - T0, - TimerType, - fun get_keylist/0, - ?GET_LOGPOINT, - "B0014"); - _ -> - % Not to be sampled this time - {N + 1, GetTimerD} - end. - -get_keylist() -> - [head_not_present, head_found, fetch]. - - -gen_timing_int(undefined, T0, TimerType, KeyListFun, _LogPoint, _LogRef) -> - NewDFun = fun(K, Acc) -> - case K of - TimerType -> - dict:store(K, [1, T0, T0], Acc); - _ -> - dict:store(K, [0, 0, 0], Acc) - end end, - {1, lists:foldl(NewDFun, dict:new(), KeyListFun())}; -gen_timing_int({LogPoint, TimerD}, T0, TimerType, KeyListFun, LogPoint, - LogRef) -> - RN = leveled_rand:uniform(LogPoint), - case RN > LogPoint div 2 of - true -> - % log at the timing point less than half the time - LogFun = fun(K) -> log(LogRef, [K|dict:fetch(K, TimerD)]) end, - lists:foreach(LogFun, KeyListFun()), - gen_timing_int(undefined, T0, TimerType, - KeyListFun, LogPoint, LogRef); - false -> - % Log some other time - reset to RN not 0 to stagger logs out over - % time between the vnodes - gen_timing_int({RN, TimerD}, T0, TimerType, - KeyListFun, LogPoint, LogRef) - end; -gen_timing_int({N, TimerD}, T0, TimerType, _KeyListFun, _LogPoint, _LogRef) -> - [Count0, Total0, Max0] = dict:fetch(TimerType, TimerD), - {N + 1, - dict:store(TimerType, - [Count0 + 1, Total0 + T0, max(Max0, T0)], - TimerD)}. - - format_time() -> format_time(localtime_ms()). diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 4a9365d..7266e30 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -1929,6 +1929,19 @@ checkready(Pid) -> timeout end. +timings_test() -> + SW = os:timestamp(), + timer:sleep(1), + T0 = update_timings(SW, #pcl_timings{}, {"K", "V"}, 2), + timer:sleep(1), + T1 = update_timings(SW, T0, {"K", "V"}, 3), + T2 = update_timings(SW, T1, {"K", "V"}, basement), + ?assertMatch(3, T2#pcl_timings.sample_count), + ?assertMatch(true, T2#pcl_timings.foundlower_time > T2#pcl_timings.found2_time), + ?assertMatch(1, T2#pcl_timings.found2_count), + ?assertMatch(2, T2#pcl_timings.foundlower_count). + + coverage_cheat_test() -> {noreply, _State0} = handle_info(timeout, #state{}), {ok, _State1} = code_change(null, #state{}, null). 
diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 554edce..164e251 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -2531,7 +2531,19 @@ check_segment_match(PosBinIndex1, KVL, TreeSize) -> end, lists:foreach(CheckFun, KVL). - +timings_test() -> + SW = os:timestamp(), + timer:sleep(1), + {no_timing, T0} = update_timings(SW, #sst_timings{}, tiny_bloom, false), + {no_timing, T1} = update_timings(SW, T0, slot_index, false), + {no_timing, T2} = update_timings(SW, T1, slot_fetch, false), + {no_timing, T3} = update_timings(SW, T2, noncached_block, false), + timer:sleep(1), + {_, T4} = update_timings(SW, T3, tiny_bloom, true), + ?assertMatch(4, T4#sst_timings.sample_count), + ?assertMatch(1, T4#sst_timings.tiny_bloom_count), + ?assertMatch(true, T4#sst_timings.tiny_bloom_time > + T3#sst_timings.tiny_bloom_time). -endif.
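
A note on the sampling lifecycle introduced in leveled_bookie above. The new
update_statetimings/3 alternates between a countdown phase (where the timings
state is the atom no_timing) and a sample window (where it is a timings
record). The sketch below is a minimal, self-contained model of that
contract, not part of the patch itself: the module name timing_sketch is
hypothetical, only the get flavour is modelled, and rand:uniform/1 stands in
for leveled_rand:uniform/1.

-module(timing_sketch).
-export([demo/0]).

-define(TIMING_SAMPLESIZE, 100).
-define(TIMING_SAMPLECOUNTDOWN, 10000).

-record(get_timings, {sample_count = 0 :: integer(),
                        head_time = 0 :: integer(),
                        body_time = 0 :: integer(),
                        fetch_count = 0 :: integer()}).

%% Mirrors the get clauses of update_statetimings/3 from the patch.
update_statetimings(no_timing, 0) ->
    %% Countdown complete - open a fresh sample window.
    {#get_timings{}, 0};
update_statetimings(Timings, 0) when is_record(Timings, get_timings) ->
    case Timings#get_timings.sample_count of
        SC when SC >= ?TIMING_SAMPLESIZE ->
            %% Sample full - log it and restart a randomised countdown.
            io:format("would emit log B0016 for ~w~n", [Timings]),
            {no_timing, rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)};
        _SC ->
            {Timings, 0}
    end;
update_statetimings(no_timing, N) ->
    %% Still counting down - nothing is collected on this request.
    {no_timing, N - 1}.

demo() ->
    %% During countdown nothing is collected and the state stays no_timing.
    {no_timing, 9999} = update_statetimings(no_timing, 10000),
    %% At zero, a fresh record opens a sample window.
    {T0, 0} = update_statetimings(no_timing, 0),
    %% Requests then accumulate into the record until the sample is full...
    T1 = T0#get_timings{sample_count = ?TIMING_SAMPLESIZE},
    %% ...at which point the sample is logged and a randomised countdown
    %% (1 to 2 * ?TIMING_SAMPLECOUNTDOWN) staggers the next window.
    {no_timing, CountDown} = update_statetimings(T1, 0),
    true = CountDown >= 1 andalso CountDown =< 2 * ?TIMING_SAMPLECOUNTDOWN,
    ok.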
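
The patch adds a timings_test() to leveled_penciller and leveled_sst, but
leveled_bookie's new helpers gain no equivalent unit test. A matching EUnit
test could look like the sketch below. This is an unverified assumption, not
part of the patch: it presumes the update_timings/3 clauses and record fields
exactly as added above, placed inside the module's existing TEST block (which
already includes eunit.hrl).

timings_test() ->
    SW = os:timestamp(),
    timer:sleep(1),
    %% A PUT samples the inker stage (with object size), then the mem stage.
    {SW1, PT0} = update_timings(SW, {put, {inker, 128}}, #put_timings{}),
    {_SW2, PT1} = update_timings(SW1, {put, mem}, PT0),
    ?assertMatch(1, PT1#put_timings.sample_count),
    ?assertMatch(128, PT1#put_timings.total_size),
    ?assertMatch(true, PT1#put_timings.ink_time > 0),
    %% A GET samples the head fetch, then the body fetch when one happens.
    SWh = os:timestamp(),
    timer:sleep(1),
    {SWb, GT0} = update_timings(SWh, {get, head}, #get_timings{}),
    {no_timing, GT1} = update_timings(SWb, {get, body}, GT0),
    ?assertMatch(1, GT1#get_timings.sample_count),
    ?assertMatch(1, GT1#get_timings.fetch_count),
    ?assertMatch(true, GT1#get_timings.head_time > 0).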