diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index edd339d..eb2fced 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -83,6 +83,8 @@ -define(RECENT_AAE, false). -define(COMPRESSION_METHOD, lz4). -define(COMPRESSION_POINT, on_receipt). +-define(TIMING_SAMPLESIZE, 100). +-define(TIMING_SAMPLECOUNTDOWN, 10000). -record(ledger_cache, {mem :: ets:tab(), loader = leveled_tree:empty(?CACHE_TYPE) @@ -99,13 +101,34 @@ ledger_cache = #ledger_cache{}, is_snapshot :: boolean() | undefined, slow_offer = false :: boolean(), - put_timing :: tuple() | undefined, - get_timing :: tuple() | undefined}). + puttiming_countdown = 0 :: integer(), + gettiming_countdown = 0 :: integer(), + foldtiming_countdown = 0 :: integer(), + get_timings = no_timing :: get_timings(), + put_timings = no_timing :: put_timings(), + fold_timings = no_timing :: fold_timings()}). + + +-record(get_timings, {sample_count = 0 :: integer(), + head_time = 0 :: integer(), + body_time = 0 :: integer(), + fetch_count = 0 :: integer()}). + +-record(put_timings, {sample_count = 0 :: integer(), + mem_time = 0 :: integer(), + ink_time = 0 :: integer(), + total_size = 0 :: integer()}). + +-record(fold_timings, {sample_count = 0 :: integer(), + setup_time = 0 :: integer()}). -type book_state() :: #state{}. -type sync_mode() :: sync|none|riak_sync. -type ledger_cache() :: #ledger_cache{}. +-type get_timings() :: no_timing|#get_timings{}. +-type put_timings() :: no_timing|#put_timings{}. +-type fold_timings() :: no_timing|#fold_timings{}. %%%============================================================================ %%% API @@ -467,12 +490,13 @@ init([Opts]) -> handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) -> LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), - SW = os:timestamp(), + SW0 = os:timestamp(), {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker, LedgerKey, Object, {IndexSpecs, TTL}), - T0 = timer:now_diff(os:timestamp(), SW), + {SW1, Timings1} = + update_timings(SW0, {put, {inker, ObjSize}}, State#state.put_timings), Changes = preparefor_ledgercache(no_type_assigned, LedgerKey, SQN, @@ -481,8 +505,10 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) -> {IndexSpecs, TTL}, State#state.recent_aae), Cache0 = addto_ledgercache(Changes, State#state.ledger_cache), - T1 = timer:now_diff(os:timestamp(), SW) - T0, - PutTimes = leveled_log:put_timing(bookie, State#state.put_timing, T0, T1), + {_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1), + + {Timings, CountDown} = + update_statetimings(put, Timings2, State#state.puttiming_countdown), % If the previous push to memory was returned then punish this PUT with a % delay. 
If the back-pressure in the Penciller continues, these delays
     % will become more frequent
@@ -492,53 +518,66 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
         false ->
             gen_server:reply(From, ok)
     end,
-    maybe_longrunning(SW, overall_put),
+    maybe_longrunning(SW0, overall_put),
     case maybepush_ledgercache(State#state.cache_size,
                                 Cache0,
                                 State#state.penciller) of
         {ok, NewCache} ->
-            {noreply, State#state{ledger_cache=NewCache,
-                                    put_timing=PutTimes,
-                                    slow_offer=false}};
+            {noreply, State#state{ledger_cache = NewCache,
+                                    put_timings = Timings,
+                                    puttiming_countdown = CountDown,
+                                    slow_offer = false}};
         {returned, NewCache} ->
-            {noreply, State#state{ledger_cache=NewCache,
-                                    put_timing=PutTimes,
-                                    slow_offer=true}}
+            {noreply, State#state{ledger_cache = NewCache,
+                                    put_timings = Timings,
+                                    puttiming_countdown = CountDown,
+                                    slow_offer = true}}
     end;
 handle_call({get, Bucket, Key, Tag}, _From, State) ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     SWh = os:timestamp(),
-    case fetch_head(LedgerKey,
-                    State#state.penciller,
-                    State#state.ledger_cache) of
-        not_present ->
-            GT0 = leveled_log:get_timing(State#state.get_timing,
-                                            SWh,
-                                            head_not_present),
-            {reply, not_found, State#state{get_timing=GT0}};
-        Head ->
-            GT0 = leveled_log:get_timing(State#state.get_timing,
-                                            SWh,
-                                            head_found),
-            SWg = os:timestamp(),
-            {Seqn, Status, _MH, _MD} = leveled_codec:striphead_to_details(Head),
-            case Status of
-                tomb ->
-                    {reply, not_found, State};
-                {active, TS} ->
-                    Active = TS >= leveled_codec:integer_now(),
-                    Object = fetch_value(State#state.inker, {LedgerKey, Seqn}),
-                    GT1 = leveled_log:get_timing(GT0, SWg, fetch),
-                    case {Active, Object} of
-                        {_, not_present} ->
-                            {reply, not_found, State#state{get_timing=GT1}};
-                        {true, Object} ->
-                            {reply, {ok, Object}, State#state{get_timing=GT1}};
-                        _ ->
-                            {reply, not_found, State#state{get_timing=GT1}}
-                    end
-            end
-    end;
+    HeadResult = 
+        case fetch_head(LedgerKey,
+                        State#state.penciller,
+                        State#state.ledger_cache) of
+            not_present ->
+                not_found;
+            Head ->
+                {Seqn, Status, _MH, _MD} = 
+                    leveled_codec:striphead_to_details(Head),
+                case Status of
+                    tomb ->
+                        not_found;
+                    {active, TS} ->
+                        case TS >= leveled_codec:integer_now() of
+                            false ->
+                                not_found;
+                            true ->
+                                {LedgerKey, Seqn}
+                        end
+                end
+        end,
+    {SWb, Timings1} = 
+        update_timings(SWh, {get, head}, State#state.get_timings),
+    {Reply, Timings2} = 
+        case HeadResult of
+            not_found ->
+                {not_found, Timings1};
+            {LK, SQN} ->
+                Object = fetch_value(State#state.inker, {LK, SQN}),
+                {_SW, UpdTimingsB} = 
+                    update_timings(SWb, {get, body}, Timings1),
+                case Object of
+                    not_present ->
+                        {not_found, UpdTimingsB};
+                    _ ->
+                        {{ok, Object}, UpdTimingsB}
+                end
+        end,
+    {Timings, CountDown} = 
+        update_statetimings(get, Timings2, State#state.gettiming_countdown),
+    {reply, Reply, State#state{get_timings = Timings, 
+                                gettiming_countdown = CountDown}};
 handle_call({head, Bucket, Key, Tag}, _From, State) ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     case fetch_head(LedgerKey,
@@ -569,7 +608,14 @@ handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) ->
     Reply = snapshot_store(State, SnapType, Query, LongRunning),
     {reply, Reply, State};
 handle_call({return_runner, QueryType}, _From, State) ->
-    {reply, get_runner(State, QueryType), State};
+    SW = os:timestamp(),
+    Runner = get_runner(State, QueryType),
+    {_SW, Timings1} = 
+        update_timings(SW, {fold, setup}, State#state.fold_timings),
+    {Timings, CountDown} = 
+        update_statetimings(fold, Timings1, 
+                            State#state.foldtiming_countdown),
+    {reply, Runner, State#state{fold_timings = Timings, 
+                                foldtiming_countdown = CountDown}};
 handle_call({compact_journal, Timeout}, _From, State) ->
     ok = leveled_inker:ink_compactjournal(State#state.inker,
                                             self(),
@@ -1181,6 +1227,109 @@ delete_path(DirPath) ->
     [file:delete(filename:join([DirPath, File])) || File <- Files],
     file:del_dir(DirPath).
 
+
+%%%============================================================================
+%%% Timing Functions
+%%%============================================================================
+
+-spec update_statetimings(put|get|fold, 
+                            put_timings()|get_timings()|fold_timings(), 
+                            integer()) -> 
+                                {put_timings()|get_timings()|fold_timings(), 
+                                    integer()}.
+%% @doc
+%%
+%% The timings state is either in countdown to the next set of samples or
+%% we are actively collecting a sample.  Active collection takes place
+%% when the countdown is 0.  Once the sample has reached the expected count
+%% then there is a log of that sample, and the countdown is restarted.
+%%
+%% Outside of sample windows the timings object should be set to the atom
+%% no_timing.  no_timing is a valid state for each timings type.
+update_statetimings(put, no_timing, 0) ->
+    {#put_timings{}, 0};
+update_statetimings(get, no_timing, 0) ->
+    {#get_timings{}, 0};
+update_statetimings(fold, no_timing, 0) ->
+    {#fold_timings{}, 0};
+update_statetimings(put, Timings, 0) ->
+    case Timings#put_timings.sample_count of 
+        SC when SC >= ?TIMING_SAMPLESIZE ->
+            log_timings(put, Timings),
+            {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)};
+        _SC ->
+            {Timings, 0}
+    end;
+update_statetimings(get, Timings, 0) ->
+    case Timings#get_timings.sample_count of 
+        SC when SC >= ?TIMING_SAMPLESIZE ->
+            log_timings(get, Timings),
+            {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)};
+        _SC ->
+            {Timings, 0}
+    end;
+update_statetimings(fold, Timings, 0) ->
+    case Timings#fold_timings.sample_count of 
+        SC when SC >= (?TIMING_SAMPLESIZE div 10) ->
+            log_timings(fold, Timings),
+            {no_timing, 
+                leveled_rand:uniform(2 * (?TIMING_SAMPLECOUNTDOWN div 10))};
+        _SC ->
+            {Timings, 0}
+    end;
+update_statetimings(_, no_timing, N) ->
+    {no_timing, N - 1}.
+
+log_timings(put, Timings) ->
+    leveled_log:log("B0015", [Timings#put_timings.sample_count, 
+                                Timings#put_timings.mem_time,
+                                Timings#put_timings.ink_time,
+                                Timings#put_timings.total_size]);
+log_timings(get, Timings) ->
+    leveled_log:log("B0016", [Timings#get_timings.sample_count, 
+                                Timings#get_timings.head_time,
+                                Timings#get_timings.body_time,
+                                Timings#get_timings.fetch_count]);
+log_timings(fold, Timings) ->
+    leveled_log:log("B0017", [Timings#fold_timings.sample_count, 
+                                Timings#fold_timings.setup_time]).
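
(Illustration, not part of the patch: the three-clause shape above gives each operation the same sampling lifecycle - serve requests with no_timing while a countdown runs down, then collect ?TIMING_SAMPLESIZE measurements in a fresh record, log the sample, and restart with a randomised countdown. The sketch below threads one sampled PUT through the helpers defined above and below; the function name and the object size of 1000 bytes are invented for the example.)

    demo_put_timing_cycle() ->
        %% A zero countdown with no active sample opens a sample window
        {T0, 0} = update_statetimings(put, no_timing, 0),
        SW0 = os:timestamp(),
        %% ... leveled_inker:ink_put/4 would be timed here ...
        {SW1, T1} = update_timings(SW0, {put, {inker, 1000}}, T0),
        %% ... the ledger cache update would be timed here ...
        {_SW2, T2} = update_timings(SW1, {put, mem}, T1),
        %% Below ?TIMING_SAMPLESIZE samples this continues collecting; at
        %% the threshold it logs B0015 and restarts the random countdown
        {_Timings, _CountDown} = update_statetimings(put, T2, 0).
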
+ + +update_timings(_SW, _Stage, no_timing) -> + {no_timing, no_timing}; +update_timings(SW, {put, Stage}, Timings) -> + Timer = timer:now_diff(os:timestamp(), SW), + Timings0 = + case Stage of + {inker, ObjectSize} -> + INT = Timings#put_timings.ink_time + Timer, + TSZ = Timings#put_timings.total_size + ObjectSize, + Timings#put_timings{ink_time = INT, total_size = TSZ}; + mem -> + PCT = Timings#put_timings.mem_time + Timer, + CNT = Timings#put_timings.sample_count + 1, + Timings#put_timings{mem_time = PCT, sample_count = CNT} + end, + {os:timestamp(), Timings0}; +update_timings(SW, {get, head}, Timings) -> + Timer = timer:now_diff(os:timestamp(), SW), + GHT = Timings#get_timings.head_time + Timer, + CNT = Timings#get_timings.sample_count + 1, + Timings0 = Timings#get_timings{head_time = GHT, sample_count = CNT}, + {os:timestamp(), Timings0}; +update_timings(SW, {get, body}, Timings) -> + Timer = timer:now_diff(os:timestamp(), SW), + GBT = Timings#get_timings.body_time + Timer, + FCNT = Timings#get_timings.fetch_count + 1, + Timings0 = Timings#get_timings{body_time = GBT, fetch_count = FCNT}, + {no_timing, Timings0}; +update_timings(SW, {fold, setup}, Timings) -> + Timer = timer:now_diff(os:timestamp(), SW), + FST = Timings#fold_timings.setup_time + Timer, + CNT = Timings#fold_timings.sample_count + 1, + Timings0 = Timings#fold_timings{setup_time = FST, sample_count = CNT}, + {no_timing, Timings0}. + %%%============================================================================ %%% Test %%%============================================================================ diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 6c71071..fef9ce1 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -97,7 +97,7 @@ -define(WRITE_OPS, [binary, raw, read, write]). -define(PENDING_ROLL_WAIT, 30). -define(DELETE_TIMEOUT, 10000). --define(TIMING_SAMPLECOUNTDOWN, 2000). +-define(TIMING_SAMPLECOUNTDOWN, 1000). -define(TIMING_SAMPLESIZE, 100). -record(state, {hashtree, @@ -1437,7 +1437,7 @@ update_statetimings(Timings, 0) -> Timings#cdb_timings.sample_cyclecount, Timings#cdb_timings.sample_fetchtime, Timings#cdb_timings.sample_indextime]), - {no_timing, leveled_rand:uniform(?TIMING_SAMPLECOUNTDOWN)}; + {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)}; _SC -> {Timings, 0} end; diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 01bfd7f..63eb237 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -9,18 +9,9 @@ -export([log/2, log_timer/3, - log_randomtimer/4, - put_timing/4, - head_timing/4, - get_timing/3, - sst_timing/3]). + log_randomtimer/4]). --define(PUT_LOGPOINT, 10000). --define(HEAD_LOGPOINT, 20000). --define(GET_LOGPOINT, 20000). --define(SST_LOGPOINT, 20000). -define(LOG_LEVEL, [info, warn, error, critical]). --define(SAMPLE_RATE, 16). 
-define(LOGBASE, [ @@ -53,13 +44,20 @@ {info, "Bucket list finds non-binary Bucket ~w"}}, {"B0011", {warn, "Call to destroy the store and so all files to be removed"}}, - {"B0012", - {info, "After ~w PUTs total inker time is ~w total ledger time is ~w " - ++ "and max inker time is ~w and max ledger time is ~w"}}, {"B0013", {warn, "Long running task took ~w microseconds with task of type ~w"}}, - {"B0014", - {info, "Get timing for result ~w is sample ~w total ~w and max ~w"}}, + {"B0015", + {info, "Put timing with sample_count=~w and mem_time=~w ink_time=~w" + ++ " with total_object_size=~w"}}, + {"B0016", + {info, "Get timing with sample_count=~w and head_time=~w body_time=~w" + ++ " with fetch_count=~w"}}, + {"B0017", + {info, "Fold timing with sample_count=~w and setup_time=~w"}}, + + + {"R0001", + {debug, "Object fold to process batch of ~w objects"}}, {"P0001", {debug, "Ledger snapshot ~w registered"}}, @@ -123,7 +121,12 @@ {info, "Completion of update to levelzero" ++ " with cache size status ~w ~w"}}, {"P0032", - {info, "Head timing for result ~w is sample ~w total ~w and max ~w"}}, + {info, "Fetch head timing with sample_count=~w and level timings of" + ++ " foundmem_time=~w found0_time=~w found1_time=~w" + ++ " found2_time=~w foundlower_time=~w missed_time=~w" + ++ " with counts of" + ++ " foundmem_count=~w found0_count=~w found1_count=~w" + ++ " found2_count=~w foundlower_count=~w missed_count=~w"}}, {"P0033", {error, "Corrupted manifest file at path ~s to be ignored " ++ "due to error ~w"}}, @@ -191,7 +194,44 @@ {info, "Prompting deletions at ManifestSQN=~w"}}, {"PC022", {info, "Storing reference to deletions at ManifestSQN=~w"}}, - + {"PM002", + {info, "Completed dump of L0 cache to list of size ~w"}}, + + {"SST01", + {info, "SST timing for result ~w is sample ~w total ~w and max ~w"}}, + {"SST02", + {error, "False result returned from SST with filename ~s as " + ++ "slot ~w has failed crc check"}}, + {"SST03", + {info, "Opening SST file with filename ~s slots ~w and" + ++ " max sqn ~w"}}, + {"SST04", + {info, "Exit called for reason ~w on filename ~s"}}, + {"SST05", + {warn, "Rename rogue filename ~s to ~s"}}, + {"SST06", + {debug, "File ~s has been set for delete"}}, + {"SST07", + {info, "Exit called and now clearing ~s"}}, + {"SST08", + {info, "Completed creation of ~s at level ~w with max sqn ~w"}}, + {"SST09", + {warn, "Read request exposes slot with bad CRC"}}, + {"SST10", + {debug, "Expansion sought to support pointer to pid ~w status ~w"}}, + {"SST11", + {info, "Level zero creation timings in microseconds " + ++ "pmem_fetch=~w merge_lists=~w build_slots=~w " + ++ "build_summary=~w read_switch=~w"}}, + {"SST12", + {info, "SST Timings for sample_count=~w" + ++ " at timing points index_query_time=~w" + ++ " tiny_bloom_time=~w slot_index_time=~w slot_fetch_time=~w" + ++ " noncached_block_fetch_time=~w" + ++ " exiting at points tiny_bloom=~w slot_index=~w" + ++ " slot_fetch=~w noncached_block_fetch=~w"}}, + + {"I0001", {info, "Unexpected failure to fetch value for Key=~w SQN=~w " ++ "with reason ~w"}}, @@ -261,33 +301,7 @@ {"IC013", {warn, "File with name ~s to be ignored in manifest as scanning for " ++ "first key returned empty - maybe corrupted"}}, - - {"PM002", - {info, "Completed dump of L0 cache to list of size ~w"}}, - - {"SST01", - {info, "SST timing for result ~w is sample ~w total ~w and max ~w"}}, - {"SST02", - {error, "False result returned from SST with filename ~s as " - ++ "slot ~w has failed crc check"}}, - {"SST03", - {info, "Opening SST file with filename ~s 
slots ~w and" - ++ " max sqn ~w"}}, - {"SST04", - {info, "Exit called for reason ~w on filename ~s"}}, - {"SST05", - {warn, "Rename rogue filename ~s to ~s"}}, - {"SST06", - {debug, "File ~s has been set for delete"}}, - {"SST07", - {info, "Exit called and now clearing ~s"}}, - {"SST08", - {info, "Completed creation of ~s at level ~w with max sqn ~w"}}, - {"SST09", - {warn, "Read request exposes slot with bad CRC"}}, - {"SST10", - {debug, "Expansion sought to support pointer to pid ~w status ~w"}}, - + {"CDB01", {info, "Opening file for writing with filename ~s"}}, {"CDB02", @@ -330,10 +344,7 @@ {"CDB19", {info, "Sample timings in microseconds for sample_count=~w " ++ "with totals of cycle_count=~w " - ++ "fetch_time=~w index_time=~w"}}, - - {"R0001", - {debug, "Object fold to process batch of ~w objects"}} + ++ "fetch_time=~w index_time=~w"}} ]). @@ -393,179 +404,6 @@ log_randomtimer(LogReference, Subs, StartTime, RandomProb) -> ok end. -%% Make a log of put timings split out by actor - one log for every -%% PUT_LOGPOINT puts - -put_timing(_Actor, undefined, T0, T1) -> - {1, {T0, T1}, {T0, T1}}; -put_timing(Actor, {?PUT_LOGPOINT, {Total0, Total1}, {Max0, Max1}}, T0, T1) -> - RN = leveled_rand:uniform(?PUT_LOGPOINT), - case RN > ?PUT_LOGPOINT div 2 of - true -> - % log at the timing point less than half the time - LogRef = - case Actor of - bookie -> "B0012" %; - % inker -> "I0019"; - % journal -> "CDB17" - end, - log(LogRef, [?PUT_LOGPOINT, Total0, Total1, Max0, Max1]), - put_timing(Actor, undefined, T0, T1); - false -> - % Log some other random time - put_timing(Actor, {RN, {Total0, Total1}, {Max0, Max1}}, T0, T1) - end; -put_timing(_Actor, {N, {Total0, Total1}, {Max0, Max1}}, T0, T1) -> - {N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}. - -%% Make a log of penciller head timings split out by level and result - one -%% log for every HEAD_LOGPOINT puts -%% Returns a tuple of {Count, TimingDict} to be stored on the process state -head_timing(undefined, SW, Level, R) -> - T0 = timer:now_diff(os:timestamp(), SW), - head_timing_int(undefined, T0, Level, R); -head_timing({N, HeadTimingD}, SW, Level, R) -> - case N band (?SAMPLE_RATE - 1) of - 0 -> - T0 = timer:now_diff(os:timestamp(), SW), - head_timing_int({N, HeadTimingD}, T0, Level, R); - _ -> - % Not to be sampled this time - {N + 1, HeadTimingD} - end. - -head_timing_int(undefined, T0, Level, R) -> - Key = head_key(R, Level), - NewDFun = fun(K, Acc) -> - case K of - Key -> - dict:store(K, [1, T0, T0], Acc); - _ -> - dict:store(K, [0, 0, 0], Acc) - end end, - {1, lists:foldl(NewDFun, dict:new(), head_keylist())}; -head_timing_int({?HEAD_LOGPOINT, HeadTimingD}, T0, Level, R) -> - RN = leveled_rand:uniform(?HEAD_LOGPOINT), - case RN > ?HEAD_LOGPOINT div 2 of - true -> - % log at the timing point less than half the time - LogFun = fun(K) -> log("P0032", [K|dict:fetch(K, HeadTimingD)]) end, - lists:foreach(LogFun, head_keylist()), - head_timing_int(undefined, T0, Level, R); - false -> - % Log some other time - reset to RN not 0 to stagger logs out over - % time between the vnodes - head_timing_int({RN, HeadTimingD}, T0, Level, R) - end; -head_timing_int({N, HeadTimingD}, T0, Level, R) -> - Key = head_key(R, Level), - [Count0, Total0, Max0] = dict:fetch(Key, HeadTimingD), - {N + 1, - dict:store(Key, [Count0 + 1, Total0 + T0, max(Max0, T0)], - HeadTimingD)}. 
- -head_key(not_present, _Level) -> - not_present; -head_key(found, 0) -> - found_0; -head_key(found, 1) -> - found_1; -head_key(found, 2) -> - found_2; -head_key(found, Level) when Level > 2 -> - found_lower. - -head_keylist() -> - [not_present, found_lower, found_0, found_1, found_2]. - - -sst_timing(undefined, SW, TimerType) -> - T0 = timer:now_diff(os:timestamp(), SW), - gen_timing_int(undefined, - T0, - TimerType, - fun sst_keylist/0, - ?SST_LOGPOINT, - "SST01"); -sst_timing({N, SSTTimerD}, SW, TimerType) -> - case N band (?SAMPLE_RATE - 1) of - 0 -> - T0 = timer:now_diff(os:timestamp(), SW), - gen_timing_int({N, SSTTimerD}, - T0, - TimerType, - fun sst_keylist/0, - ?SST_LOGPOINT, - "SST01"); - _ -> - % Not to be sampled this time - {N + 1, SSTTimerD} - end. - -sst_keylist() -> - [tiny_bloom, slot_bloom, slot_fetch]. - - -get_timing(undefined, SW, TimerType) -> - T0 = timer:now_diff(os:timestamp(), SW), - gen_timing_int(undefined, - T0, - TimerType, - fun get_keylist/0, - ?GET_LOGPOINT, - "B0014"); -get_timing({N, GetTimerD}, SW, TimerType) -> - case N band (?SAMPLE_RATE - 1) of - 0 -> - T0 = timer:now_diff(os:timestamp(), SW), - gen_timing_int({N, GetTimerD}, - T0, - TimerType, - fun get_keylist/0, - ?GET_LOGPOINT, - "B0014"); - _ -> - % Not to be sampled this time - {N + 1, GetTimerD} - end. - -get_keylist() -> - [head_not_present, head_found, fetch]. - - -gen_timing_int(undefined, T0, TimerType, KeyListFun, _LogPoint, _LogRef) -> - NewDFun = fun(K, Acc) -> - case K of - TimerType -> - dict:store(K, [1, T0, T0], Acc); - _ -> - dict:store(K, [0, 0, 0], Acc) - end end, - {1, lists:foldl(NewDFun, dict:new(), KeyListFun())}; -gen_timing_int({LogPoint, TimerD}, T0, TimerType, KeyListFun, LogPoint, - LogRef) -> - RN = leveled_rand:uniform(LogPoint), - case RN > LogPoint div 2 of - true -> - % log at the timing point less than half the time - LogFun = fun(K) -> log(LogRef, [K|dict:fetch(K, TimerD)]) end, - lists:foreach(LogFun, KeyListFun()), - gen_timing_int(undefined, T0, TimerType, - KeyListFun, LogPoint, LogRef); - false -> - % Log some other time - reset to RN not 0 to stagger logs out over - % time between the vnodes - gen_timing_int({RN, TimerD}, T0, TimerType, - KeyListFun, LogPoint, LogRef) - end; -gen_timing_int({N, TimerD}, T0, TimerType, _KeyListFun, _LogPoint, _LogRef) -> - [Count0, Total0, Max0] = dict:fetch(TimerType, TimerD), - {N + 1, - dict:store(TimerType, - [Count0 + 1, Total0 + T0, max(Max0, T0)], - TimerD)}. - - format_time() -> format_time(localtime_ms()). @@ -591,17 +429,6 @@ log_test() -> log("D0001", []), log_timer("D0001", [], os:timestamp()). -head_timing_test() -> - SW = os:timestamp(), - HeadTimer0 = lists:foldl(fun(_X, Acc) -> head_timing(Acc, SW, 2, found) end, - undefined, - lists:seq(0, 47)), - HeadTimer1 = head_timing(HeadTimer0, SW, 3, found), - {N, D} = HeadTimer1, - ?assertMatch(49, N), - ?assertMatch(3, lists:nth(1, dict:fetch(found_2, D))), - ?assertMatch(1, lists:nth(1, dict:fetch(found_lower, D))). - log_warn_test() -> ok = log("G0001", [], [warn, error]), ok = log("G8888", [], [info, warn, error]), diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index a2523bc..7266e30 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -219,6 +219,8 @@ -define(ITERATOR_SCANWIDTH, 4). -define(SNAPSHOT_TIMEOUT_LONG, 3600). -define(SNAPSHOT_TIMEOUT_SHORT, 600). +-define(TIMING_SAMPLECOUNTDOWN, 10000). +-define(TIMING_SAMPLESIZE, 100). 
-record(state, {manifest, % a manifest record from the leveled_manifest module persisted_sqn = 0 :: integer(), % The highest SQN persisted @@ -244,10 +246,26 @@ work_ongoing = false :: boolean(), % i.e. compaction work work_backlog = false :: boolean(), % i.e. compaction work - head_timing :: tuple() | undefined, - + timings = no_timing :: pcl_timings(), + timings_countdown = 0 :: integer(), + compression_method = native :: lz4|native}). +-record(pcl_timings, + {sample_count = 0 :: integer(), + foundmem_time = 0 :: integer(), + found0_time = 0 :: integer(), + found1_time = 0 :: integer(), + found2_time = 0 :: integer(), + foundlower_time = 0 :: integer(), + missed_time = 0 :: integer(), + foundmem_count = 0 :: integer(), + found0_count = 0 :: integer(), + found1_count = 0 :: integer(), + found2_count = 0 :: integer(), + foundlower_count = 0 :: integer(), + missed_count = 0 :: integer()}). + -type penciller_options() :: #penciller_options{}. -type bookies_memory() :: {tuple()|empty_cache, % array:array()|empty_array, @@ -255,6 +273,7 @@ integer()|infinity, integer()}. -type pcl_state() :: #state{}. +-type pcl_timings() :: no_timing|#pcl_timings{}. %%%============================================================================ %%% API @@ -303,11 +322,11 @@ pcl_pushmem(Pid, LedgerCache) -> %% The return value will be a leveled_skiplist that forms that part of the %% cache pcl_fetchlevelzero(Pid, Slot) -> - %% Timeout to cause crash of L0 file when it can't get the close signal - %% as it is deadlocked making this call. - %% - %% If the timeout gets hit outside of close scenario the Penciller will - %% be stuck in L0 pending + % Timeout to cause crash of L0 file when it can't get the close signal + % as it is deadlocked making this call. + % + % If the timeout gets hit outside of close scenario the Penciller will + % be stuck in L0 pending gen_server:call(Pid, {fetch_levelzero, Slot}, 60000). -spec pcl_fetch(pid(), tuple()) -> {tuple(), tuple()}|not_present. @@ -555,13 +574,15 @@ handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}}, State)} end; handle_call({fetch, Key, Hash}, _From, State) -> - {R, HeadTimer} = timed_fetch_mem(Key, + {R, UpdTimings} = timed_fetch_mem(Key, Hash, State#state.manifest, State#state.levelzero_cache, State#state.levelzero_index, - State#state.head_timing), - {reply, R, State#state{head_timing=HeadTimer}}; + State#state.timings), + {UpdTimings0, CountDown} = + update_statetimings(UpdTimings, State#state.timings_countdown), + {reply, R, State#state{timings=UpdTimings0, timings_countdown=CountDown}}; handle_call({check_sqn, Key, Hash, SQN}, _From, State) -> {reply, compare_to_sqn(plain_fetch_mem(Key, @@ -1059,17 +1080,11 @@ roll_memory(State, true) -> {ok, Constructor, _} = R, Constructor. -timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) -> +timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, Timings) -> SW = os:timestamp(), {R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index), - UpdHeadTimer = - case R of - not_present -> - leveled_log:head_timing(HeadTimer, SW, Level, not_present); - _ -> - leveled_log:head_timing(HeadTimer, SW, Level, found) - end, - {R, UpdHeadTimer}. + UpdTimings = update_timings(SW, Timings, R, Level), + {R, UpdTimings}. 
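
(Illustration, not part of the patch: timed_fetch_mem/6 takes a single timestamp per fetch and hands the result, plus the level it was found at, to update_timings/4 below. fetch_mem/5 now returns the atom memory for level-zero cache hits rather than 0, so the timing buckets can distinguish memory hits from level 0 file hits. The bucket selection reduces to a mapping like this hypothetical helper:)

    %% Illustrative only - mirrors the {Result, Stage} dispatch in
    %% update_timings/4; a hit at level 5 or at basement lands in the
    %% foundlower buckets, a miss in the missed buckets
    classify(not_present, _Level) -> missed;
    classify(_KV, memory)         -> foundmem;
    classify(_KV, 0)              -> found0;
    classify(_KV, 1)              -> found1;
    classify(_KV, 2)              -> found2;
    classify(_KV, _LowerLevel)    -> foundlower.
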
 plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
     R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
@@ -1082,7 +1097,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
         {false, not_found} ->
             fetch(Key, Hash, Manifest, 0, fun timed_sst_get/4);
         {true, KV} ->
-            {KV, 0}
+            {KV, memory}
     end.
 
 fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
@@ -1404,6 +1419,89 @@ find_nextkey(QueryArray, LCnt,
 
 
 
+%%%============================================================================
+%%% Timing Functions
+%%%============================================================================
+
+-spec update_statetimings(pcl_timings(), integer()) 
+                                            -> {pcl_timings(), integer()}.
+%% @doc
+%%
+%% The timings state is either in countdown to the next set of samples or
+%% we are actively collecting a sample.  Active collection takes place
+%% when the countdown is 0.  Once the sample has reached the expected count
+%% then there is a log of that sample, and the countdown is restarted.
+%%
+%% Outside of sample windows the timings object should be set to the atom
+%% no_timing.  no_timing is a valid state for the pcl_timings type.
+update_statetimings(no_timing, 0) ->
+    {#pcl_timings{}, 0};
+update_statetimings(Timings, 0) ->
+    case Timings#pcl_timings.sample_count of 
+        SC when SC >= ?TIMING_SAMPLESIZE ->
+            log_timings(Timings),
+            {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)};
+        _SC ->
+            {Timings, 0}
+    end;
+update_statetimings(no_timing, N) ->
+    {no_timing, N - 1}.
+
+log_timings(Timings) ->
+    leveled_log:log("P0032", [Timings#pcl_timings.sample_count, 
+                                Timings#pcl_timings.foundmem_time,
+                                Timings#pcl_timings.found0_time,
+                                Timings#pcl_timings.found1_time,
+                                Timings#pcl_timings.found2_time,
+                                Timings#pcl_timings.foundlower_time,
+                                Timings#pcl_timings.missed_time,
+                                Timings#pcl_timings.foundmem_count,
+                                Timings#pcl_timings.found0_count,
+                                Timings#pcl_timings.found1_count,
+                                Timings#pcl_timings.found2_count,
+                                Timings#pcl_timings.foundlower_count,
+                                Timings#pcl_timings.missed_count]).
+
+-spec update_timings(erlang:timestamp(), pcl_timings(), 
+                        not_present|tuple(), integer()|memory|basement) 
+                                                            -> pcl_timings().
+%% @doc
+%%
+%% Update the timings record unless the current record object is the atom
+%% no_timing.
+update_timings(_SW, no_timing, _Result, _Stage) -> + no_timing; +update_timings(SW, Timings, Result, Stage) -> + Timer = timer:now_diff(os:timestamp(), SW), + SC = Timings#pcl_timings.sample_count + 1, + Timings0 = Timings#pcl_timings{sample_count = SC}, + case {Result, Stage} of + {not_present, _} -> + NFT = Timings#pcl_timings.missed_time + Timer, + NFC = Timings#pcl_timings.missed_count + 1, + Timings0#pcl_timings{missed_time = NFT, missed_count = NFC}; + {_, memory} -> + PMT = Timings#pcl_timings.foundmem_time + Timer, + PMC = Timings#pcl_timings.foundmem_count + 1, + Timings0#pcl_timings{foundmem_time = PMT, foundmem_count = PMC}; + {_, 0} -> + L0T = Timings#pcl_timings.found0_time + Timer, + L0C = Timings#pcl_timings.found0_count + 1, + Timings0#pcl_timings{found0_time = L0T, found0_count = L0C}; + {_, 1} -> + L1T = Timings#pcl_timings.found1_time + Timer, + L1C = Timings#pcl_timings.found1_count + 1, + Timings0#pcl_timings{found1_time = L1T, found1_count = L1C}; + {_, 2} -> + L2T = Timings#pcl_timings.found2_time + Timer, + L2C = Timings#pcl_timings.found2_count + 1, + Timings0#pcl_timings{found2_time = L2T, found2_count = L2C}; + _ -> + LLT = Timings#pcl_timings.foundlower_time + Timer, + LLC = Timings#pcl_timings.foundlower_count + 1, + Timings0#pcl_timings{foundlower_time = LLT, foundlower_count = LLC} + end. + %%%============================================================================ %%% Test @@ -1831,6 +1929,19 @@ checkready(Pid) -> timeout end. +timings_test() -> + SW = os:timestamp(), + timer:sleep(1), + T0 = update_timings(SW, #pcl_timings{}, {"K", "V"}, 2), + timer:sleep(1), + T1 = update_timings(SW, T0, {"K", "V"}, 3), + T2 = update_timings(SW, T1, {"K", "V"}, basement), + ?assertMatch(3, T2#pcl_timings.sample_count), + ?assertMatch(true, T2#pcl_timings.foundlower_time > T2#pcl_timings.found2_time), + ?assertMatch(1, T2#pcl_timings.found2_count), + ?assertMatch(2, T2#pcl_timings.foundlower_count). + + coverage_cheat_test() -> {noreply, _State0} = handle_info(timeout, #state{}), {ok, _State1} = code_change(null, #state{}, null). diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index e1cfc80..164e251 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -76,6 +76,8 @@ -define(DELETE_TIMEOUT, 10000). -define(TREE_TYPE, idxt). -define(TREE_SIZE, 4). +-define(TIMING_SAMPLECOUNTDOWN, 10000). +-define(TIMING_SAMPLESIZE, 100). -include_lib("eunit/include/eunit.hrl"). @@ -134,15 +136,29 @@ -record(state, {summary, handle :: file:fd() | undefined, - sst_timings :: tuple() | undefined, penciller :: pid() | undefined, root_path, filename, yield_blockquery = false :: boolean(), blockindex_cache, - compression_method = native :: press_methods()}). + compression_method = native :: press_methods(), + timings = no_timing :: sst_timings(), + timings_countdown = 0 :: integer()}). + +-record(sst_timings, + {sample_count = 0 :: integer(), + index_query_time = 0 :: integer(), + tiny_bloom_time = 0 :: integer(), + slot_index_time = 0 :: integer(), + slot_fetch_time = 0 :: integer(), + noncached_block_time = 0 :: integer(), + tiny_bloom_count = 0 :: integer(), + slot_index_count = 0 :: integer(), + slot_fetch_count = 0 :: integer(), + noncached_block_count = 0 :: integer()}). -type sst_state() :: #state{}. +-type sst_timings() :: no_timing|#sst_timings{}. 
%%%============================================================================ %%% API @@ -421,27 +437,43 @@ starting({sst_new, starting({sst_newlevelzero, RootPath, Filename, Slots, FetchFun, Penciller, MaxSQN, PressMethod}, State) -> - SW = os:timestamp(), + SW0 = os:timestamp(), KVList = leveled_pmem:to_list(Slots, FetchFun), + Time0 = timer:now_diff(os:timestamp(), SW0), + + SW1 = os:timestamp(), {[], [], SlotList, FirstKey} = merge_lists(KVList, PressMethod), + Time1 = timer:now_diff(os:timestamp(), SW1), + + SW2 = os:timestamp(), {SlotCount, SlotIndex, BlockIndex, SlotsBin} = build_all_slots(SlotList, PressMethod), + Time2 = timer:now_diff(os:timestamp(), SW2), + + SW3 = os:timestamp(), SummaryBin = build_table_summary(SlotIndex, 0, FirstKey, SlotCount, MaxSQN), + Time3 = timer:now_diff(os:timestamp(), SW3), + + SW4 = os:timestamp(), ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin, PressMethod), UpdState = read_file(ActualFilename, State#state{root_path = RootPath, yield_blockquery = true}), Summary = UpdState#state.summary, + Time4 = timer:now_diff(os:timestamp(), SW4), + leveled_log:log_timer("SST08", [ActualFilename, 0, Summary#summary.max_sqn], - SW), + SW0), + leveled_log:log("SST11", [Time0, Time1, Time2, Time3, Time4]), + case Penciller of undefined -> {next_state, @@ -459,10 +491,15 @@ starting({sst_newlevelzero, RootPath, Filename, reader({get_kv, LedgerKey, Hash}, _From, State) -> - SW = os:timestamp(), - {Result, Stage, _SlotID, UpdState} = fetch(LedgerKey, Hash, State), - UpdTimings = leveled_log:sst_timing(State#state.sst_timings, SW, Stage), - {reply, Result, reader, UpdState#state{sst_timings = UpdTimings}}; + % Get a KV value and potentially take sample timings + {Result, UpdState, UpdTimings} = + fetch(LedgerKey, Hash, State, State#state.timings), + + {UpdTimings0, CountDown} = + update_statetimings(UpdTimings, State#state.timings_countdown), + + {reply, Result, reader, UpdState#state{timings = UpdTimings0, + timings_countdown = CountDown}}; reader({get_kvrange, StartKey, EndKey, ScanWidth, SlotList}, _From, State) -> {SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey, EndKey, @@ -497,8 +534,8 @@ reader(get_maxsequencenumber, _From, State) -> Summary = State#state.summary, {reply, Summary#summary.max_sqn, reader, State}; reader(print_timings, _From, State) -> - io:format(user, "~nTimings of ~w~n", [State#state.sst_timings]), - {reply, ok, reader, State#state{sst_timings = undefined}}; + log_timings(State#state.timings), + {reply, ok, reader, State}; reader({set_for_delete, Penciller}, _From, State) -> leveled_log:log("SST06", [State#state.filename]), {reply, @@ -521,7 +558,7 @@ reader(close, _From, State) -> delete_pending({get_kv, LedgerKey, Hash}, _From, State) -> - {Result, _Stage, _SlotID, UpdState} = fetch(LedgerKey, Hash, State), + {Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing), {reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT}; delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SlotList}, _From, State) -> @@ -591,16 +628,34 @@ code_change(_OldVsn, StateName, State, _Extra) -> %%% Internal Functions %%%============================================================================ -fetch(LedgerKey, Hash, State) -> +-spec fetch(tuple(), + {integer(), integer()}|integer(), + sst_state(), sst_timings()) + -> {not_present|tuple(), sst_state(), sst_timings()}. +%% @doc +%% +%% Fetch a key from the store, potentially taking timings. Result should be +%% not_present if the key is not in the store. 
+fetch(LedgerKey, Hash, State, Timings0) ->
+    SW0 = os:timestamp(),
+
     Summary = State#state.summary,
     PressMethod = State#state.compression_method,
     Slot = lookup_slot(LedgerKey, Summary#summary.index),
+
+    {SW1, Timings1} = update_timings(SW0, Timings0, index_query, true),
+
     SlotID = Slot#slot_index_value.slot_id,
     Bloom = Slot#slot_index_value.bloom,
     case leveled_tinybloom:check_hash(Hash, Bloom) of
         false ->
-            {not_present, tiny_bloom, SlotID, State};
+            {_SW2, Timings2} = 
+                update_timings(SW1, Timings1, tiny_bloom, false),
+            {not_present, State, Timings2};
         true ->
+            {SW2, Timings2} = 
+                update_timings(SW1, Timings1, tiny_bloom, true),
+
             CachedBlockIdx = array:get(SlotID - 1, 
                                         State#state.blockindex_cache),
             case CachedBlockIdx of
@@ -612,10 +667,11 @@
                                 <<BlockLengths/binary, BlockIdx/binary>>,
                                 State#state.blockindex_cache),
+                    {_SW3, Timings3} = 
+                        update_timings(SW2, Timings2, noncached_block, false),
                     {Result, 
-                        slot_fetch, 
-                        Slot#slot_index_value.slot_id,
-                        State#state{blockindex_cache = BlockIndexCache}};
+                        State#state{blockindex_cache = BlockIndexCache},
+                        Timings3};
                 <<BlockLengths:24/binary, BlockIdx/binary>> ->
                     PosList = find_pos(BlockIdx, 
                                         extra_hash(Hash), 
                                         [], 
                                         0),
                     case PosList of 
                         [] ->
-                            {not_present, slot_bloom, SlotID, State};
+                            {_SW3, Timings3} = 
+                                update_timings(SW2, 
+                                                Timings2, 
+                                                slot_index, 
+                                                false),
+                            {not_present, State, Timings3};
                         _ ->
+                            {SW3, Timings3} = 
+                                update_timings(SW2, 
+                                                Timings2, 
+                                                slot_index, 
+                                                true),
                             StartPos = Slot#slot_index_value.start_position,
                             Result = 
                                 check_blocks(PosList,
@@ -634,7 +700,12 @@
                                                 LedgerKey, 
                                                 PressMethod,
                                                 not_present),
-                            {Result, slot_fetch, SlotID, State}
+                            {_SW4, Timings4} = 
+                                update_timings(SW3, 
+                                                Timings3, 
+                                                slot_fetch, 
+                                                false),
+                            {Result, State, Timings4}
                     end
             end
     end.
@@ -1722,6 +1793,97 @@ expand_list_by_pointer({next, ManEntry, StartKey, EndKey}, Width,
                             SegList),
     ExpPointer ++ Tail.
 
+
+
+%%%============================================================================
+%%% Timing Functions
+%%%============================================================================
+
+-spec update_statetimings(sst_timings(), integer()) 
+                                            -> {sst_timings(), integer()}.
+%% @doc
+%%
+%% The timings state is either in countdown to the next set of samples or
+%% we are actively collecting a sample.  Active collection takes place
+%% when the countdown is 0.  Once the sample has reached the expected count
+%% then there is a log of that sample, and the countdown is restarted.
+%%
+%% Outside of sample windows the timings object should be set to the atom
+%% no_timing.  no_timing is a valid state for the sst_timings type.
+update_statetimings(no_timing, 0) ->
+    {#sst_timings{}, 0};
+update_statetimings(Timings, 0) ->
+    case Timings#sst_timings.sample_count of 
+        SC when SC >= ?TIMING_SAMPLESIZE ->
+            log_timings(Timings),
+            {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)};
+        _SC ->
+            {Timings, 0}
+    end;
+update_statetimings(no_timing, N) ->
+    {no_timing, N - 1}.
+
+log_timings(no_timing) ->
+    ok;
+log_timings(Timings) ->
+    leveled_log:log("SST12", [Timings#sst_timings.sample_count,
+                                Timings#sst_timings.index_query_time,
+                                Timings#sst_timings.tiny_bloom_time,
+                                Timings#sst_timings.slot_index_time,
+                                Timings#sst_timings.slot_fetch_time,
+                                Timings#sst_timings.noncached_block_time,
+                                Timings#sst_timings.tiny_bloom_count,
+                                Timings#sst_timings.slot_index_count,
+                                Timings#sst_timings.slot_fetch_count,
+                                Timings#sst_timings.noncached_block_count]).
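
(Illustration, not part of the patch: within fetch/4 each stage passes its end timestamp forward, so the per-stage times are disjoint - update_timings/4 below returns a fresh timestamp when Continue is true, and marks the exit point when Continue is false by bumping that stage's count and the sample count. The hypothetical sketch below shows one sampled GET passing the bloom and slot-index checks and exiting at slot_fetch.)

    demo_sampled_fetch() ->
        SW0 = os:timestamp(),
        {SW1, T1} = update_timings(SW0, #sst_timings{}, index_query, true),
        {SW2, T2} = update_timings(SW1, T1, tiny_bloom, true),
        {SW3, T3} = update_timings(SW2, T2, slot_index, true),
        {no_timing, T4} = update_timings(SW3, T3, slot_fetch, false),
        %% Exactly one exit counter and one sample have been recorded
        1 = T4#sst_timings.slot_fetch_count,
        1 = T4#sst_timings.sample_count,
        0 = T4#sst_timings.tiny_bloom_count.
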
+
+
+update_timings(_SW, no_timing, _Stage, _Continue) ->
+    {no_timing, no_timing};
+update_timings(SW, Timings, Stage, Continue) ->
+    Timer = timer:now_diff(os:timestamp(), SW),
+    Timings0 = 
+        case Stage of 
+            index_query ->
+                IQT = Timings#sst_timings.index_query_time,
+                Timings#sst_timings{index_query_time = IQT + Timer};
+            tiny_bloom ->
+                TBT = Timings#sst_timings.tiny_bloom_time,
+                Timings#sst_timings{tiny_bloom_time = TBT + Timer};
+            slot_index ->
+                SIT = Timings#sst_timings.slot_index_time,
+                Timings#sst_timings{slot_index_time = SIT + Timer};
+            slot_fetch ->
+                SFT = Timings#sst_timings.slot_fetch_time,
+                Timings#sst_timings{slot_fetch_time = SFT + Timer};
+            noncached_block ->
+                NCT = Timings#sst_timings.noncached_block_time,
+                Timings#sst_timings{noncached_block_time = NCT + Timer}
+        end,
+    case Continue of 
+        true ->
+            {os:timestamp(), Timings0};
+        false ->
+            Timings1 = 
+                case Stage of 
+                    tiny_bloom ->
+                        TBC = Timings#sst_timings.tiny_bloom_count,
+                        Timings0#sst_timings{tiny_bloom_count = TBC + 1};
+                    slot_index ->
+                        SIC = Timings#sst_timings.slot_index_count,
+                        Timings0#sst_timings{slot_index_count = SIC + 1};
+                    slot_fetch ->
+                        SFC = Timings#sst_timings.slot_fetch_count,
+                        Timings0#sst_timings{slot_fetch_count = SFC + 1};
+                    noncached_block ->
+                        NCC = Timings#sst_timings.noncached_block_count,
+                        Timings0#sst_timings{noncached_block_count = NCC + 1}
+                end,
+            SC = Timings1#sst_timings.sample_count,
+            {no_timing, Timings1#sst_timings{sample_count = SC + 1}}
+    end.
+
+
 %%%============================================================================
 %%% Test
 %%%============================================================================
@@ -1962,7 +2124,7 @@ indexed_list_mixedkeys_bitflip_test() ->
 
 flip_byte(Binary) ->
     L = byte_size(Binary),
-    Byte1 = leveled_rand:uniform(L),
+    Byte1 = leveled_rand:uniform(L) - 1,
     <<PreB1:Byte1/binary, A:8/integer, PostByte1/binary>> = Binary,
     case A of 
         0 ->
@@ -2369,7 +2531,19 @@ check_segment_match(PosBinIndex1, KVL, TreeSize) ->
         end,
     lists:foreach(CheckFun, KVL).
 
-
+timings_test() ->
+    SW = os:timestamp(),
+    timer:sleep(1),
+    {no_timing, T0} = update_timings(SW, #sst_timings{}, tiny_bloom, false),
+    {no_timing, T1} = update_timings(SW, T0, slot_index, false),
+    {no_timing, T2} = update_timings(SW, T1, slot_fetch, false),
+    {no_timing, T3} = update_timings(SW, T2, noncached_block, false),
+    timer:sleep(1),
+    {_, T4} = update_timings(SW, T3, tiny_bloom, true),
+    ?assertMatch(4, T4#sst_timings.sample_count),
+    ?assertMatch(1, T4#sst_timings.tiny_bloom_count),
+    ?assertMatch(true, T4#sst_timings.tiny_bloom_time > 
+                        T3#sst_timings.tiny_bloom_time).
 
 -endif.
diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl
index 60c2609..1539ccf 100644
--- a/test/end_to_end/riak_SUITE.erl
+++ b/test/end_to_end/riak_SUITE.erl
@@ -281,7 +281,7 @@ handoff(_Config) ->
     % Start the first database, load a test object, close it, start it again
     StartOpts1 = [{root_path, RootPathA},
                     {max_pencillercachesize, 16000},
-                    {sync_strategy, riak_sync}],
+                    {sync_strategy, sync}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     % Add some non-Riak objects in - which should be ignored in folds.