diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index eb2fced..3720ba3 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -101,12 +101,14 @@ ledger_cache = #ledger_cache{}, is_snapshot :: boolean() | undefined, slow_offer = false :: boolean(), - puttiming_countdown = 0 :: integer(), - gettiming_countdown = 0 :: integer(), - foldtiming_countdown = 0 :: integer(), + put_countdown = 0 :: integer(), + get_countdown = 0 :: integer(), + fold_countdown = 0 :: integer(), + head_countdown = 0 :: integer(), get_timings = no_timing :: get_timings(), put_timings = no_timing :: put_timings(), - fold_timings = no_timing :: fold_timings()}). + fold_timings = no_timing :: fold_timings(), + head_timings = no_timing :: head_timings()}). -record(get_timings, {sample_count = 0 :: integer(), @@ -114,6 +116,10 @@ body_time = 0 :: integer(), fetch_count = 0 :: integer()}). +-record(head_timings, {sample_count = 0 :: integer(), + pcl_time = 0 :: integer(), + buildhead_time = 0 :: integer()}). + -record(put_timings, {sample_count = 0 :: integer(), mem_time = 0 :: integer(), ink_time = 0 :: integer(), @@ -129,6 +135,8 @@ -type get_timings() :: no_timing|#get_timings{}. -type put_timings() :: no_timing|#put_timings{}. -type fold_timings() :: no_timing|#fold_timings{}. +-type head_timings() :: no_timing|#head_timings{}. +-type timing_types() :: head|get|put|fold. %%%============================================================================ %%% API @@ -508,7 +516,7 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) -> {_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1), {Timings, CountDown} = - update_statetimings(put, Timings2, State#state.puttiming_countdown), + update_statetimings(put, Timings2, State#state.put_countdown), % If the previous push to memory was returned then punish this PUT with a % delay. If the back-pressure in the Penciller continues, these delays % will beocme more frequent @@ -525,12 +533,12 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) -> {ok, NewCache} -> {noreply, State#state{ledger_cache = NewCache, put_timings = Timings, - puttiming_countdown = CountDown, + put_countdown = CountDown, slow_offer = false}}; {returned, NewCache} -> {noreply, State#state{ledger_cache = NewCache, put_timings = Timings, - puttiming_countdown = CountDown, + put_countdown = CountDown, slow_offer = true}} end; handle_call({get, Bucket, Key, Tag}, _From, State) -> @@ -575,14 +583,13 @@ handle_call({get, Bucket, Key, Tag}, _From, State) -> end end, {Timings, CountDown} = - update_statetimings(get, Timings2, State#state.gettiming_countdown), + update_statetimings(get, Timings2, State#state.get_countdown), {reply, Reply, State#state{get_timings = Timings, - gettiming_countdown = CountDown}}; + get_countdown = CountDown}}; handle_call({head, Bucket, Key, Tag}, _From, State) -> - LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), - case fetch_head(LedgerKey, - State#state.penciller, - State#state.ledger_cache) of + SWp = os:timestamp(), + LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag), + case fetch_head(LK, State#state.penciller, State#state.ledger_cache) of not_present -> {reply, not_found, State}; Head -> @@ -592,10 +599,21 @@ handle_call({head, Bucket, Key, Tag}, _From, State) -> {_SeqN, {active, TS}, _MH, MD} -> case TS >= leveled_codec:integer_now() of true -> - OMD = - leveled_codec:build_metadata_object(LedgerKey, - MD), - {reply, {ok, OMD}, State}; + {SWr, UpdTimingsP} = + update_timings(SWp, + {head, pcl}, + State#state.head_timings), + OMD = leveled_codec:build_metadata_object(LK, MD), + {_SW, UpdTimingsR} = + update_timings(SWr, {head, rsp}, UpdTimingsP), + {UpdTimings, CountDown} = + update_statetimings(head, + UpdTimingsR, + State#state.head_countdown), + {reply, + {ok, OMD}, + State#state{head_timings = UpdTimings, + head_countdown = CountDown}}; false -> {reply, not_found, State} end @@ -613,9 +631,9 @@ handle_call({return_runner, QueryType}, _From, State) -> {_SW, Timings1} = update_timings(SW, {fold, setup}, State#state.fold_timings), {Timings, CountDown} = - update_statetimings(fold, Timings1, State#state.foldtiming_countdown), + update_statetimings(fold, Timings1, State#state.fold_countdown), {reply, Runner, State#state{fold_timings = Timings, - foldtiming_countdown = CountDown}}; + fold_countdown = CountDown}}; handle_call({compact_journal, Timeout}, _From, State) -> ok = leveled_inker:ink_compactjournal(State#state.inker, self(), @@ -1233,10 +1251,12 @@ delete_path(DirPath) -> %%% Timing Functions %%%============================================================================ --spec update_statetimings(put|get|fold, - put_timings()|get_timings()|fold_timings(), - integer()) - -> {put_timings()|get_timings()|fold_timings(), integer()}. +-spec update_statetimings(timing_types(), + put_timings()|get_timings()|fold_timings()|head_timings(), + integer()) + -> + {put_timings()|get_timings()|fold_timings()|head_timings(), + integer()}. %% @doc %% %% The timings state is either in countdown to the next set of samples of @@ -1246,12 +1266,22 @@ delete_path(DirPath) -> %% %% Outside of sample windows the timings object should be set to the atom %% no_timing. no_timing is a valid state for each timings type. +update_statetimings(head, no_timing, 0) -> + {#head_timings{}, 0}; update_statetimings(put, no_timing, 0) -> {#put_timings{}, 0}; update_statetimings(get, no_timing, 0) -> {#get_timings{}, 0}; update_statetimings(fold, no_timing, 0) -> {#fold_timings{}, 0}; +update_statetimings(head, Timings, 0) -> + case Timings#head_timings.sample_count of + SC when SC >= ?TIMING_SAMPLESIZE -> + log_timings(head, Timings), + {no_timing, leveled_rand:uniform(10 * ?TIMING_SAMPLECOUNTDOWN)}; + _SC -> + {Timings, 0} + end; update_statetimings(put, Timings, 0) -> case Timings#put_timings.sample_count of SC when SC >= ?TIMING_SAMPLESIZE -> @@ -1280,6 +1310,11 @@ update_statetimings(fold, Timings, 0) -> update_statetimings(_, no_timing, N) -> {no_timing, N - 1}. +log_timings(head, Timings) -> + leveled_log:log("B0018", + [Timings#head_timings.sample_count, + Timings#head_timings.pcl_time, + Timings#head_timings.buildhead_time]); log_timings(put, Timings) -> leveled_log:log("B0015", [Timings#put_timings.sample_count, Timings#put_timings.mem_time, @@ -1297,6 +1332,19 @@ log_timings(fold, Timings) -> update_timings(_SW, _Stage, no_timing) -> {no_timing, no_timing}; +update_timings(SW, {head, Stage}, Timings) -> + Timer = timer:now_diff(os:timestamp(), SW), + Timings0 = + case Stage of + pcl -> + PCT = Timings#head_timings.pcl_time + Timer, + Timings#head_timings{pcl_time = PCT}; + rsp -> + BHT = Timings#head_timings.buildhead_time + Timer, + CNT = Timings#head_timings.sample_count + 1, + Timings#head_timings{buildhead_time = BHT, sample_count = CNT} + end, + {os:timestamp(), Timings0}; update_timings(SW, {put, Stage}, Timings) -> Timer = timer:now_diff(os:timestamp(), SW), Timings0 = diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 1cd5267..b13d07f 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -81,7 +81,7 @@ -define(CACHE_SIZE, 32). -define(BLOCK_LENGTHS_LENGTH, 20). -define(FLIPPER32, 4294967295). --define(COMPRESS_AT_LEVEL, 2). +-define(COMPRESS_AT_LEVEL, 1). -include_lib("eunit/include/eunit.hrl").