Compress L0 only

Doing at L1 has a negative impact as tests draw on.  Also improve head time tracking
This commit is contained in:
Martin Sumner 2017-12-04 10:49:42 +00:00
parent 1f5d5033a4
commit f8ceedc9bb
2 changed files with 72 additions and 24 deletions

View file

@ -101,12 +101,14 @@
ledger_cache = #ledger_cache{},
is_snapshot :: boolean() | undefined,
slow_offer = false :: boolean(),
puttiming_countdown = 0 :: integer(),
gettiming_countdown = 0 :: integer(),
foldtiming_countdown = 0 :: integer(),
put_countdown = 0 :: integer(),
get_countdown = 0 :: integer(),
fold_countdown = 0 :: integer(),
head_countdown = 0 :: integer(),
get_timings = no_timing :: get_timings(),
put_timings = no_timing :: put_timings(),
fold_timings = no_timing :: fold_timings()}).
fold_timings = no_timing :: fold_timings(),
head_timings = no_timing :: head_timings()}).
-record(get_timings, {sample_count = 0 :: integer(),
@ -114,6 +116,10 @@
body_time = 0 :: integer(),
fetch_count = 0 :: integer()}).
-record(head_timings, {sample_count = 0 :: integer(),
pcl_time = 0 :: integer(),
buildhead_time = 0 :: integer()}).
-record(put_timings, {sample_count = 0 :: integer(),
mem_time = 0 :: integer(),
ink_time = 0 :: integer(),
@ -129,6 +135,8 @@
-type get_timings() :: no_timing|#get_timings{}.
-type put_timings() :: no_timing|#put_timings{}.
-type fold_timings() :: no_timing|#fold_timings{}.
-type head_timings() :: no_timing|#head_timings{}.
-type timing_types() :: head|get|put|fold.
%%%============================================================================
%%% API
@ -508,7 +516,7 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
{_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1),
{Timings, CountDown} =
update_statetimings(put, Timings2, State#state.puttiming_countdown),
update_statetimings(put, Timings2, State#state.put_countdown),
% If the previous push to memory was returned then punish this PUT with a
% delay. If the back-pressure in the Penciller continues, these delays
% will beocme more frequent
@ -525,12 +533,12 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
{ok, NewCache} ->
{noreply, State#state{ledger_cache = NewCache,
put_timings = Timings,
puttiming_countdown = CountDown,
put_countdown = CountDown,
slow_offer = false}};
{returned, NewCache} ->
{noreply, State#state{ledger_cache = NewCache,
put_timings = Timings,
puttiming_countdown = CountDown,
put_countdown = CountDown,
slow_offer = true}}
end;
handle_call({get, Bucket, Key, Tag}, _From, State) ->
@ -575,14 +583,13 @@ handle_call({get, Bucket, Key, Tag}, _From, State) ->
end
end,
{Timings, CountDown} =
update_statetimings(get, Timings2, State#state.gettiming_countdown),
update_statetimings(get, Timings2, State#state.get_countdown),
{reply, Reply, State#state{get_timings = Timings,
gettiming_countdown = CountDown}};
get_countdown = CountDown}};
handle_call({head, Bucket, Key, Tag}, _From, State) ->
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
case fetch_head(LedgerKey,
State#state.penciller,
State#state.ledger_cache) of
SWp = os:timestamp(),
LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
case fetch_head(LK, State#state.penciller, State#state.ledger_cache) of
not_present ->
{reply, not_found, State};
Head ->
@ -592,10 +599,21 @@ handle_call({head, Bucket, Key, Tag}, _From, State) ->
{_SeqN, {active, TS}, _MH, MD} ->
case TS >= leveled_codec:integer_now() of
true ->
OMD =
leveled_codec:build_metadata_object(LedgerKey,
MD),
{reply, {ok, OMD}, State};
{SWr, UpdTimingsP} =
update_timings(SWp,
{head, pcl},
State#state.head_timings),
OMD = leveled_codec:build_metadata_object(LK, MD),
{_SW, UpdTimingsR} =
update_timings(SWr, {head, rsp}, UpdTimingsP),
{UpdTimings, CountDown} =
update_statetimings(head,
UpdTimingsR,
State#state.head_countdown),
{reply,
{ok, OMD},
State#state{head_timings = UpdTimings,
head_countdown = CountDown}};
false ->
{reply, not_found, State}
end
@ -613,9 +631,9 @@ handle_call({return_runner, QueryType}, _From, State) ->
{_SW, Timings1} =
update_timings(SW, {fold, setup}, State#state.fold_timings),
{Timings, CountDown} =
update_statetimings(fold, Timings1, State#state.foldtiming_countdown),
update_statetimings(fold, Timings1, State#state.fold_countdown),
{reply, Runner, State#state{fold_timings = Timings,
foldtiming_countdown = CountDown}};
fold_countdown = CountDown}};
handle_call({compact_journal, Timeout}, _From, State) ->
ok = leveled_inker:ink_compactjournal(State#state.inker,
self(),
@ -1233,10 +1251,12 @@ delete_path(DirPath) ->
%%% Timing Functions
%%%============================================================================
-spec update_statetimings(put|get|fold,
put_timings()|get_timings()|fold_timings(),
-spec update_statetimings(timing_types(),
put_timings()|get_timings()|fold_timings()|head_timings(),
integer())
-> {put_timings()|get_timings()|fold_timings(), integer()}.
->
{put_timings()|get_timings()|fold_timings()|head_timings(),
integer()}.
%% @doc
%%
%% The timings state is either in countdown to the next set of samples of
@ -1246,12 +1266,22 @@ delete_path(DirPath) ->
%%
%% Outside of sample windows the timings object should be set to the atom
%% no_timing. no_timing is a valid state for each timings type.
update_statetimings(head, no_timing, 0) ->
{#head_timings{}, 0};
update_statetimings(put, no_timing, 0) ->
{#put_timings{}, 0};
update_statetimings(get, no_timing, 0) ->
{#get_timings{}, 0};
update_statetimings(fold, no_timing, 0) ->
{#fold_timings{}, 0};
update_statetimings(head, Timings, 0) ->
case Timings#head_timings.sample_count of
SC when SC >= ?TIMING_SAMPLESIZE ->
log_timings(head, Timings),
{no_timing, leveled_rand:uniform(10 * ?TIMING_SAMPLECOUNTDOWN)};
_SC ->
{Timings, 0}
end;
update_statetimings(put, Timings, 0) ->
case Timings#put_timings.sample_count of
SC when SC >= ?TIMING_SAMPLESIZE ->
@ -1280,6 +1310,11 @@ update_statetimings(fold, Timings, 0) ->
update_statetimings(_, no_timing, N) ->
{no_timing, N - 1}.
log_timings(head, Timings) ->
leveled_log:log("B0018",
[Timings#head_timings.sample_count,
Timings#head_timings.pcl_time,
Timings#head_timings.buildhead_time]);
log_timings(put, Timings) ->
leveled_log:log("B0015", [Timings#put_timings.sample_count,
Timings#put_timings.mem_time,
@ -1297,6 +1332,19 @@ log_timings(fold, Timings) ->
update_timings(_SW, _Stage, no_timing) ->
{no_timing, no_timing};
update_timings(SW, {head, Stage}, Timings) ->
Timer = timer:now_diff(os:timestamp(), SW),
Timings0 =
case Stage of
pcl ->
PCT = Timings#head_timings.pcl_time + Timer,
Timings#head_timings{pcl_time = PCT};
rsp ->
BHT = Timings#head_timings.buildhead_time + Timer,
CNT = Timings#head_timings.sample_count + 1,
Timings#head_timings{buildhead_time = BHT, sample_count = CNT}
end,
{os:timestamp(), Timings0};
update_timings(SW, {put, Stage}, Timings) ->
Timer = timer:now_diff(os:timestamp(), SW),
Timings0 =

View file

@ -81,7 +81,7 @@
-define(CACHE_SIZE, 32).
-define(BLOCK_LENGTHS_LENGTH, 20).
-define(FLIPPER32, 4294967295).
-define(COMPRESS_AT_LEVEL, 2).
-define(COMPRESS_AT_LEVEL, 1).
-include_lib("eunit/include/eunit.hrl").