Merge pull request #113 from martinsumner/mas-i111-ssttimings
Mas i111 ssttimings
commit 0e51627539

6 changed files with 581 additions and 320 deletions
src/leveled_bookie.erl

@@ -83,6 +83,8 @@
 -define(RECENT_AAE, false).
 -define(COMPRESSION_METHOD, lz4).
 -define(COMPRESSION_POINT, on_receipt).
+-define(TIMING_SAMPLESIZE, 100).
+-define(TIMING_SAMPLECOUNTDOWN, 10000).
 
 -record(ledger_cache, {mem :: ets:tab(),
                         loader = leveled_tree:empty(?CACHE_TYPE)
@@ -99,13 +101,34 @@
                 ledger_cache = #ledger_cache{},
                 is_snapshot :: boolean() | undefined,
                 slow_offer = false :: boolean(),
-                put_timing :: tuple() | undefined,
-                get_timing :: tuple() | undefined}).
+                puttiming_countdown = 0 :: integer(),
+                gettiming_countdown = 0 :: integer(),
+                foldtiming_countdown = 0 :: integer(),
+                get_timings = no_timing :: get_timings(),
+                put_timings = no_timing :: put_timings(),
+                fold_timings = no_timing :: fold_timings()}).
 
 
+-record(get_timings, {sample_count = 0 :: integer(),
+                        head_time = 0 :: integer(),
+                        body_time = 0 :: integer(),
+                        fetch_count = 0 :: integer()}).
+
+-record(put_timings, {sample_count = 0 :: integer(),
+                        mem_time = 0 :: integer(),
+                        ink_time = 0 :: integer(),
+                        total_size = 0 :: integer()}).
+
+-record(fold_timings, {sample_count = 0 :: integer(),
+                        setup_time = 0 :: integer()}).
+
+
 -type book_state() :: #state{}.
 -type sync_mode() :: sync|none|riak_sync.
 -type ledger_cache() :: #ledger_cache{}.
+-type get_timings() :: no_timing|#get_timings{}.
+-type put_timings() :: no_timing|#put_timings{}.
+-type fold_timings() :: no_timing|#fold_timings{}.
 
 %%%============================================================================
 %%% API
@@ -467,12 +490,13 @@ init([Opts]) ->
 
 handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
-    SW = os:timestamp(),
+    SW0 = os:timestamp(),
     {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
                                                 LedgerKey,
                                                 Object,
                                                 {IndexSpecs, TTL}),
-    T0 = timer:now_diff(os:timestamp(), SW),
+    {SW1, Timings1} =
+        update_timings(SW0, {put, {inker, ObjSize}}, State#state.put_timings),
     Changes = preparefor_ledgercache(no_type_assigned,
                                         LedgerKey,
                                         SQN,
@@ -481,8 +505,10 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
                                         {IndexSpecs, TTL},
                                         State#state.recent_aae),
     Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
-    T1 = timer:now_diff(os:timestamp(), SW) - T0,
-    PutTimes = leveled_log:put_timing(bookie, State#state.put_timing, T0, T1),
+    {_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1),
+    {Timings, CountDown} =
+        update_statetimings(put, Timings2, State#state.puttiming_countdown),
     % If the previous push to memory was returned then punish this PUT with a
     % delay.  If the back-pressure in the Penciller continues, these delays
     % will become more frequent
@@ -492,53 +518,66 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
         false ->
             gen_server:reply(From, ok)
     end,
-    maybe_longrunning(SW, overall_put),
+    maybe_longrunning(SW0, overall_put),
     case maybepush_ledgercache(State#state.cache_size,
                                 Cache0,
                                 State#state.penciller) of
         {ok, NewCache} ->
-            {noreply, State#state{ledger_cache=NewCache,
-                                    put_timing=PutTimes,
-                                    slow_offer=false}};
+            {noreply, State#state{ledger_cache = NewCache,
+                                    put_timings = Timings,
+                                    puttiming_countdown = CountDown,
+                                    slow_offer = false}};
         {returned, NewCache} ->
-            {noreply, State#state{ledger_cache=NewCache,
-                                    put_timing=PutTimes,
-                                    slow_offer=true}}
+            {noreply, State#state{ledger_cache = NewCache,
+                                    put_timings = Timings,
+                                    puttiming_countdown = CountDown,
+                                    slow_offer = true}}
     end;
 handle_call({get, Bucket, Key, Tag}, _From, State) ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     SWh = os:timestamp(),
-    case fetch_head(LedgerKey,
-                    State#state.penciller,
-                    State#state.ledger_cache) of
-        not_present ->
-            GT0 = leveled_log:get_timing(State#state.get_timing,
-                                            SWh,
-                                            head_not_present),
-            {reply, not_found, State#state{get_timing=GT0}};
-        Head ->
-            GT0 = leveled_log:get_timing(State#state.get_timing,
-                                            SWh,
-                                            head_found),
-            SWg = os:timestamp(),
-            {Seqn, Status, _MH, _MD} = leveled_codec:striphead_to_details(Head),
-            case Status of
-                tomb ->
-                    {reply, not_found, State};
-                {active, TS} ->
-                    Active = TS >= leveled_codec:integer_now(),
-                    Object = fetch_value(State#state.inker, {LedgerKey, Seqn}),
-                    GT1 = leveled_log:get_timing(GT0, SWg, fetch),
-                    case {Active, Object} of
-                        {_, not_present} ->
-                            {reply, not_found, State#state{get_timing=GT1}};
-                        {true, Object} ->
-                            {reply, {ok, Object}, State#state{get_timing=GT1}};
-                        _ ->
-                            {reply, not_found, State#state{get_timing=GT1}}
-                    end
-            end
-    end;
+    HeadResult =
+        case fetch_head(LedgerKey,
+                        State#state.penciller,
+                        State#state.ledger_cache) of
+            not_present ->
+                not_found;
+            Head ->
+                {Seqn, Status, _MH, _MD} =
+                    leveled_codec:striphead_to_details(Head),
+                case Status of
+                    tomb ->
+                        not_found;
+                    {active, TS} ->
+                        case TS >= leveled_codec:integer_now() of
+                            false ->
+                                not_found;
+                            true ->
+                                {LedgerKey, Seqn}
+                        end
+                end
+        end,
+    {SWb, Timings1} =
+        update_timings(SWh, {get, head}, State#state.get_timings),
+    {Reply, Timings2} =
+        case HeadResult of
+            not_found ->
+                {not_found, Timings1};
+            {LK, SQN} ->
+                Object = fetch_value(State#state.inker, {LK, SQN}),
+                {_SW, UpdTimingsB} =
+                    update_timings(SWb, {get, body}, Timings1),
+                case Object of
+                    not_present ->
+                        {not_found, UpdTimingsB};
+                    _ ->
+                        {{ok, Object}, UpdTimingsB}
+                end
+        end,
+    {Timings, CountDown} =
+        update_statetimings(get, Timings2, State#state.gettiming_countdown),
+    {reply, Reply, State#state{get_timings = Timings,
+                                gettiming_countdown = CountDown}};
 handle_call({head, Bucket, Key, Tag}, _From, State) ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     case fetch_head(LedgerKey,
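Note how the new PUT and GET paths thread a {Timestamp, Timings} accumulator through each stage: update_timings/3 adds the elapsed microseconds for one stage into the sample and hands back a fresh timestamp, so consecutive stages are measured over disjoint intervals. A minimal standalone sketch of the pattern (the stage_one/stage_two helpers are hypothetical placeholders, not code from this PR):

    %% Minimal sketch of the two-stage measurement used above; stage_one and
    %% stage_two stand in for (e.g.) the inker write and ledger-cache update.
    two_stage_example() ->
        SW0 = os:timestamp(),
        ok = stage_one(),
        T0 = timer:now_diff(os:timestamp(), SW0),   % stage one, microseconds
        SW1 = os:timestamp(),                       % restart the clock
        ok = stage_two(),
        T1 = timer:now_diff(os:timestamp(), SW1),   % stage two only
        {T0, T1}.

    stage_one() -> ok.
    stage_two() -> ok.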
@@ -569,7 +608,14 @@ handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) ->
     Reply = snapshot_store(State, SnapType, Query, LongRunning),
     {reply, Reply, State};
 handle_call({return_runner, QueryType}, _From, State) ->
-    {reply, get_runner(State, QueryType), State};
+    SW = os:timestamp(),
+    Runner = get_runner(State, QueryType),
+    {_SW, Timings1} =
+        update_timings(SW, {fold, setup}, State#state.fold_timings),
+    {Timings, CountDown} =
+        update_statetimings(fold, Timings1, State#state.foldtiming_countdown),
+    {reply, Runner, State#state{fold_timings = Timings,
+                                foldtiming_countdown = CountDown}};
 handle_call({compact_journal, Timeout}, _From, State) ->
     ok = leveled_inker:ink_compactjournal(State#state.inker,
                                             self(),
@@ -1181,6 +1227,109 @@ delete_path(DirPath) ->
     [file:delete(filename:join([DirPath, File])) || File <- Files],
     file:del_dir(DirPath).
 
+
+%%%============================================================================
+%%% Timing Functions
+%%%============================================================================
+
+-spec update_statetimings(put|get|fold,
+                            put_timings()|get_timings()|fold_timings(),
+                            integer())
+                -> {put_timings()|get_timings()|fold_timings(), integer()}.
+%% @doc
+%%
+%% The timings state is either in countdown to the next set of samples or
+%% we are actively collecting a sample.  Active collection takes place
+%% when the countdown is 0.  Once the sample has reached the expected count
+%% then there is a log of that sample, and the countdown is restarted.
+%%
+%% Outside of sample windows the timings object should be set to the atom
+%% no_timing.  no_timing is a valid state for each timings type.
+update_statetimings(put, no_timing, 0) ->
+    {#put_timings{}, 0};
+update_statetimings(get, no_timing, 0) ->
+    {#get_timings{}, 0};
+update_statetimings(fold, no_timing, 0) ->
+    {#fold_timings{}, 0};
+update_statetimings(put, Timings, 0) ->
+    case Timings#put_timings.sample_count of
+        SC when SC >= ?TIMING_SAMPLESIZE ->
+            log_timings(put, Timings),
+            {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)};
+        _SC ->
+            {Timings, 0}
+    end;
+update_statetimings(get, Timings, 0) ->
+    case Timings#get_timings.sample_count of
+        SC when SC >= ?TIMING_SAMPLESIZE ->
+            log_timings(get, Timings),
+            {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)};
+        _SC ->
+            {Timings, 0}
+    end;
+update_statetimings(fold, Timings, 0) ->
+    case Timings#fold_timings.sample_count of
+        SC when SC >= (?TIMING_SAMPLESIZE div 10) ->
+            log_timings(fold, Timings),
+            {no_timing,
+                leveled_rand:uniform(2 * (?TIMING_SAMPLECOUNTDOWN div 10))};
+        _SC ->
+            {Timings, 0}
+    end;
+update_statetimings(_, no_timing, N) ->
+    {no_timing, N - 1}.
+
+log_timings(put, Timings) ->
+    leveled_log:log("B0015", [Timings#put_timings.sample_count,
+                                Timings#put_timings.mem_time,
+                                Timings#put_timings.ink_time,
+                                Timings#put_timings.total_size]);
+log_timings(get, Timings) ->
+    leveled_log:log("B0016", [Timings#get_timings.sample_count,
+                                Timings#get_timings.head_time,
+                                Timings#get_timings.body_time,
+                                Timings#get_timings.fetch_count]);
+log_timings(fold, Timings) ->
+    leveled_log:log("B0017", [Timings#fold_timings.sample_count,
+                                Timings#fold_timings.setup_time]).
+
+
+update_timings(_SW, _Stage, no_timing) ->
+    {no_timing, no_timing};
+update_timings(SW, {put, Stage}, Timings) ->
+    Timer = timer:now_diff(os:timestamp(), SW),
+    Timings0 =
+        case Stage of
+            {inker, ObjectSize} ->
+                INT = Timings#put_timings.ink_time + Timer,
+                TSZ = Timings#put_timings.total_size + ObjectSize,
+                Timings#put_timings{ink_time = INT, total_size = TSZ};
+            mem ->
+                PCT = Timings#put_timings.mem_time + Timer,
+                CNT = Timings#put_timings.sample_count + 1,
+                Timings#put_timings{mem_time = PCT, sample_count = CNT}
+        end,
+    {os:timestamp(), Timings0};
+update_timings(SW, {get, head}, Timings) ->
+    Timer = timer:now_diff(os:timestamp(), SW),
+    GHT = Timings#get_timings.head_time + Timer,
+    CNT = Timings#get_timings.sample_count + 1,
+    Timings0 = Timings#get_timings{head_time = GHT, sample_count = CNT},
+    {os:timestamp(), Timings0};
+update_timings(SW, {get, body}, Timings) ->
+    Timer = timer:now_diff(os:timestamp(), SW),
+    GBT = Timings#get_timings.body_time + Timer,
+    FCNT = Timings#get_timings.fetch_count + 1,
+    Timings0 = Timings#get_timings{body_time = GBT, fetch_count = FCNT},
+    {no_timing, Timings0};
+update_timings(SW, {fold, setup}, Timings) ->
+    Timer = timer:now_diff(os:timestamp(), SW),
+    FST = Timings#fold_timings.setup_time + Timer,
+    CNT = Timings#fold_timings.sample_count + 1,
+    Timings0 = Timings#fold_timings{setup_time = FST, sample_count = CNT},
+    {no_timing, Timings0}.
+
 %%%============================================================================
 %%% Test
 %%%============================================================================
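The update_statetimings/3 clauses above implement a three-state sampler: counting down (no_timing with a positive countdown), collecting (a timings record with countdown 0), and rolling over (sample full, so log it and draw a fresh random countdown). A condensed, hypothetical sketch of the same cycle, with the sample reduced to a bare counter where the real records also accumulate stage timings:

    %% Hypothetical condensed version of the sampling cycle (not PR code).
    sample_cycle(no_timing, 0) ->
        {0, 0};                                  % open a sample window
    sample_cycle(SampleCount, 0) when SampleCount >= 100 ->
        % sample full (?TIMING_SAMPLESIZE); log it, restart the countdown
        {no_timing, rand:uniform(2 * 10000)};    % 2 * ?TIMING_SAMPLECOUNTDOWN
    sample_cycle(SampleCount, 0) ->
        {SampleCount, 0};                        % keep collecting
    sample_cycle(no_timing, N) ->
        {no_timing, N - 1}.                      % still counting down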
src/leveled_cdb.erl

@@ -97,7 +97,7 @@
 -define(WRITE_OPS, [binary, raw, read, write]).
 -define(PENDING_ROLL_WAIT, 30).
 -define(DELETE_TIMEOUT, 10000).
--define(TIMING_SAMPLECOUNTDOWN, 2000).
+-define(TIMING_SAMPLECOUNTDOWN, 1000).
 -define(TIMING_SAMPLESIZE, 100).
 
 -record(state, {hashtree,
@@ -1437,7 +1437,7 @@ update_statetimings(Timings, 0) ->
                             Timings#cdb_timings.sample_cyclecount,
                             Timings#cdb_timings.sample_fetchtime,
                             Timings#cdb_timings.sample_indextime]),
-            {no_timing, leveled_rand:uniform(?TIMING_SAMPLECOUNTDOWN)};
+            {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)};
         _SC ->
             {Timings, 0}
     end;
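The leveled_cdb change pairs a halved macro (2000 down to 1000) with a doubled multiplier inside the uniform draw, so the mean gap between samples is unchanged while the spread of that gap roughly doubles, matching the 2 * ?TIMING_SAMPLECOUNTDOWN convention used in the other modules in this PR. Roughly:

    %% Expected countdown between samples (values from this hunk):
    %% old: leveled_rand:uniform(2000)     -> mean (2000 + 1) / 2 ~ 1000 fetches
    %% new: leveled_rand:uniform(2 * 1000) -> mean (2000 + 1) / 2 ~ 1000 fetches
    %% Same average sampling rate; the wider range staggers when concurrent
    %% stores take their samples.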
src/leveled_log.erl

@@ -9,18 +9,9 @@
 
 -export([log/2,
             log_timer/3,
-            log_randomtimer/4,
-            put_timing/4,
-            head_timing/4,
-            get_timing/3,
-            sst_timing/3]).
+            log_randomtimer/4]).
 
--define(PUT_LOGPOINT, 10000).
--define(HEAD_LOGPOINT, 20000).
--define(GET_LOGPOINT, 20000).
--define(SST_LOGPOINT, 20000).
 -define(LOG_LEVEL, [info, warn, error, critical]).
--define(SAMPLE_RATE, 16).
 
 -define(LOGBASE, [
 
@@ -53,13 +44,20 @@
         {info, "Bucket list finds non-binary Bucket ~w"}},
     {"B0011",
         {warn, "Call to destroy the store and so all files to be removed"}},
-    {"B0012",
-        {info, "After ~w PUTs total inker time is ~w total ledger time is ~w "
-                ++ "and max inker time is ~w and max ledger time is ~w"}},
     {"B0013",
         {warn, "Long running task took ~w microseconds with task of type ~w"}},
-    {"B0014",
-        {info, "Get timing for result ~w is sample ~w total ~w and max ~w"}},
+    {"B0015",
+        {info, "Put timing with sample_count=~w and mem_time=~w ink_time=~w"
+                ++ " with total_object_size=~w"}},
+    {"B0016",
+        {info, "Get timing with sample_count=~w and head_time=~w body_time=~w"
+                ++ " with fetch_count=~w"}},
+    {"B0017",
+        {info, "Fold timing with sample_count=~w and setup_time=~w"}},
+
+    {"R0001",
+        {debug, "Object fold to process batch of ~w objects"}},
+
     {"P0001",
         {debug, "Ledger snapshot ~w registered"}},
@@ -123,7 +121,12 @@
         {info, "Completion of update to levelzero"
                 ++ " with cache size status ~w ~w"}},
     {"P0032",
-        {info, "Head timing for result ~w is sample ~w total ~w and max ~w"}},
+        {info, "Fetch head timing with sample_count=~w and level timings of"
+                ++ " foundmem_time=~w found0_time=~w found1_time=~w"
+                ++ " found2_time=~w foundlower_time=~w missed_time=~w"
+                ++ " with counts of"
+                ++ " foundmem_count=~w found0_count=~w found1_count=~w"
+                ++ " found2_count=~w foundlower_count=~w missed_count=~w"}},
     {"P0033",
         {error, "Corrupted manifest file at path ~s to be ignored "
                 ++ "due to error ~w"}},
@@ -191,6 +194,43 @@
         {info, "Prompting deletions at ManifestSQN=~w"}},
     {"PC022",
         {info, "Storing reference to deletions at ManifestSQN=~w"}},
+    {"PM002",
+        {info, "Completed dump of L0 cache to list of size ~w"}},
+
+    {"SST01",
+        {info, "SST timing for result ~w is sample ~w total ~w and max ~w"}},
+    {"SST02",
+        {error, "False result returned from SST with filename ~s as "
+                ++ "slot ~w has failed crc check"}},
+    {"SST03",
+        {info, "Opening SST file with filename ~s slots ~w and"
+                ++ " max sqn ~w"}},
+    {"SST04",
+        {info, "Exit called for reason ~w on filename ~s"}},
+    {"SST05",
+        {warn, "Rename rogue filename ~s to ~s"}},
+    {"SST06",
+        {debug, "File ~s has been set for delete"}},
+    {"SST07",
+        {info, "Exit called and now clearing ~s"}},
+    {"SST08",
+        {info, "Completed creation of ~s at level ~w with max sqn ~w"}},
+    {"SST09",
+        {warn, "Read request exposes slot with bad CRC"}},
+    {"SST10",
+        {debug, "Expansion sought to support pointer to pid ~w status ~w"}},
+    {"SST11",
+        {info, "Level zero creation timings in microseconds "
+                ++ "pmem_fetch=~w merge_lists=~w build_slots=~w "
+                ++ "build_summary=~w read_switch=~w"}},
+    {"SST12",
+        {info, "SST Timings for sample_count=~w"
+                ++ " at timing points index_query_time=~w"
+                ++ " tiny_bloom_time=~w slot_index_time=~w slot_fetch_time=~w"
+                ++ " noncached_block_fetch_time=~w"
+                ++ " exiting at points tiny_bloom=~w slot_index=~w"
+                ++ " slot_fetch=~w noncached_block_fetch=~w"}},
+
+
     {"I0001",
         {info, "Unexpected failure to fetch value for Key=~w SQN=~w "
@@ -262,32 +302,6 @@
         {warn, "File with name ~s to be ignored in manifest as scanning for "
                 ++ "first key returned empty - maybe corrupted"}},
-
-    {"PM002",
-        {info, "Completed dump of L0 cache to list of size ~w"}},
-
-    {"SST01",
-        {info, "SST timing for result ~w is sample ~w total ~w and max ~w"}},
-    {"SST02",
-        {error, "False result returned from SST with filename ~s as "
-                ++ "slot ~w has failed crc check"}},
-    {"SST03",
-        {info, "Opening SST file with filename ~s slots ~w and"
-                ++ " max sqn ~w"}},
-    {"SST04",
-        {info, "Exit called for reason ~w on filename ~s"}},
-    {"SST05",
-        {warn, "Rename rogue filename ~s to ~s"}},
-    {"SST06",
-        {debug, "File ~s has been set for delete"}},
-    {"SST07",
-        {info, "Exit called and now clearing ~s"}},
-    {"SST08",
-        {info, "Completed creation of ~s at level ~w with max sqn ~w"}},
-    {"SST09",
-        {warn, "Read request exposes slot with bad CRC"}},
-    {"SST10",
-        {debug, "Expansion sought to support pointer to pid ~w status ~w"}},
-
     {"CDB01",
         {info, "Opening file for writing with filename ~s"}},
     {"CDB02",
@@ -330,10 +344,7 @@
     {"CDB19",
         {info, "Sample timings in microseconds for sample_count=~w "
                 ++ "with totals of cycle_count=~w "
-                ++ "fetch_time=~w index_time=~w"}},
-
-    {"R0001",
-        {debug, "Object fold to process batch of ~w objects"}}
+                ++ "fetch_time=~w index_time=~w"}}
     ]).
 
 
@@ -393,179 +404,6 @@ log_randomtimer(LogReference, Subs, StartTime, RandomProb) ->
             ok
     end.
 
-%% Make a log of put timings split out by actor - one log for every
-%% PUT_LOGPOINT puts
-
-put_timing(_Actor, undefined, T0, T1) ->
-    {1, {T0, T1}, {T0, T1}};
-put_timing(Actor, {?PUT_LOGPOINT, {Total0, Total1}, {Max0, Max1}}, T0, T1) ->
-    RN = leveled_rand:uniform(?PUT_LOGPOINT),
-    case RN > ?PUT_LOGPOINT div 2 of
-        true ->
-            % log at the timing point less than half the time
-            LogRef =
-                case Actor of
-                    bookie -> "B0012" %;
-                    % inker -> "I0019";
-                    % journal -> "CDB17"
-                end,
-            log(LogRef, [?PUT_LOGPOINT, Total0, Total1, Max0, Max1]),
-            put_timing(Actor, undefined, T0, T1);
-        false ->
-            % Log some other random time
-            put_timing(Actor, {RN, {Total0, Total1}, {Max0, Max1}}, T0, T1)
-    end;
-put_timing(_Actor, {N, {Total0, Total1}, {Max0, Max1}}, T0, T1) ->
-    {N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}.
-
-%% Make a log of penciller head timings split out by level and result - one
-%% log for every HEAD_LOGPOINT puts
-%% Returns a tuple of {Count, TimingDict} to be stored on the process state
-head_timing(undefined, SW, Level, R) ->
-    T0 = timer:now_diff(os:timestamp(), SW),
-    head_timing_int(undefined, T0, Level, R);
-head_timing({N, HeadTimingD}, SW, Level, R) ->
-    case N band (?SAMPLE_RATE - 1) of
-        0 ->
-            T0 = timer:now_diff(os:timestamp(), SW),
-            head_timing_int({N, HeadTimingD}, T0, Level, R);
-        _ ->
-            % Not to be sampled this time
-            {N + 1, HeadTimingD}
-    end.
-
-head_timing_int(undefined, T0, Level, R) ->
-    Key = head_key(R, Level),
-    NewDFun = fun(K, Acc) ->
-                case K of
-                    Key ->
-                        dict:store(K, [1, T0, T0], Acc);
-                    _ ->
-                        dict:store(K, [0, 0, 0], Acc)
-                end end,
-    {1, lists:foldl(NewDFun, dict:new(), head_keylist())};
-head_timing_int({?HEAD_LOGPOINT, HeadTimingD}, T0, Level, R) ->
-    RN = leveled_rand:uniform(?HEAD_LOGPOINT),
-    case RN > ?HEAD_LOGPOINT div 2 of
-        true ->
-            % log at the timing point less than half the time
-            LogFun = fun(K) -> log("P0032", [K|dict:fetch(K, HeadTimingD)]) end,
-            lists:foreach(LogFun, head_keylist()),
-            head_timing_int(undefined, T0, Level, R);
-        false ->
-            % Log some other time - reset to RN not 0 to stagger logs out over
-            % time between the vnodes
-            head_timing_int({RN, HeadTimingD}, T0, Level, R)
-    end;
-head_timing_int({N, HeadTimingD}, T0, Level, R) ->
-    Key = head_key(R, Level),
-    [Count0, Total0, Max0] = dict:fetch(Key, HeadTimingD),
-    {N + 1,
-        dict:store(Key, [Count0 + 1, Total0 + T0, max(Max0, T0)],
-                    HeadTimingD)}.
-
-head_key(not_present, _Level) ->
-    not_present;
-head_key(found, 0) ->
-    found_0;
-head_key(found, 1) ->
-    found_1;
-head_key(found, 2) ->
-    found_2;
-head_key(found, Level) when Level > 2 ->
-    found_lower.
-
-head_keylist() ->
-    [not_present, found_lower, found_0, found_1, found_2].
-
-sst_timing(undefined, SW, TimerType) ->
-    T0 = timer:now_diff(os:timestamp(), SW),
-    gen_timing_int(undefined,
-                    T0,
-                    TimerType,
-                    fun sst_keylist/0,
-                    ?SST_LOGPOINT,
-                    "SST01");
-sst_timing({N, SSTTimerD}, SW, TimerType) ->
-    case N band (?SAMPLE_RATE - 1) of
-        0 ->
-            T0 = timer:now_diff(os:timestamp(), SW),
-            gen_timing_int({N, SSTTimerD},
-                            T0,
-                            TimerType,
-                            fun sst_keylist/0,
-                            ?SST_LOGPOINT,
-                            "SST01");
-        _ ->
-            % Not to be sampled this time
-            {N + 1, SSTTimerD}
-    end.
-
-sst_keylist() ->
-    [tiny_bloom, slot_bloom, slot_fetch].
-
-get_timing(undefined, SW, TimerType) ->
-    T0 = timer:now_diff(os:timestamp(), SW),
-    gen_timing_int(undefined,
-                    T0,
-                    TimerType,
-                    fun get_keylist/0,
-                    ?GET_LOGPOINT,
-                    "B0014");
-get_timing({N, GetTimerD}, SW, TimerType) ->
-    case N band (?SAMPLE_RATE - 1) of
-        0 ->
-            T0 = timer:now_diff(os:timestamp(), SW),
-            gen_timing_int({N, GetTimerD},
-                            T0,
-                            TimerType,
-                            fun get_keylist/0,
-                            ?GET_LOGPOINT,
-                            "B0014");
-        _ ->
-            % Not to be sampled this time
-            {N + 1, GetTimerD}
-    end.
-
-get_keylist() ->
-    [head_not_present, head_found, fetch].
-
-gen_timing_int(undefined, T0, TimerType, KeyListFun, _LogPoint, _LogRef) ->
-    NewDFun = fun(K, Acc) ->
-                case K of
-                    TimerType ->
-                        dict:store(K, [1, T0, T0], Acc);
-                    _ ->
-                        dict:store(K, [0, 0, 0], Acc)
-                end end,
-    {1, lists:foldl(NewDFun, dict:new(), KeyListFun())};
-gen_timing_int({LogPoint, TimerD}, T0, TimerType, KeyListFun, LogPoint,
-                                                                LogRef) ->
-    RN = leveled_rand:uniform(LogPoint),
-    case RN > LogPoint div 2 of
-        true ->
-            % log at the timing point less than half the time
-            LogFun = fun(K) -> log(LogRef, [K|dict:fetch(K, TimerD)]) end,
-            lists:foreach(LogFun, KeyListFun()),
-            gen_timing_int(undefined, T0, TimerType,
-                            KeyListFun, LogPoint, LogRef);
-        false ->
-            % Log some other time - reset to RN not 0 to stagger logs out over
-            % time between the vnodes
-            gen_timing_int({RN, TimerD}, T0, TimerType,
-                            KeyListFun, LogPoint, LogRef)
-    end;
-gen_timing_int({N, TimerD}, T0, TimerType, _KeyListFun, _LogPoint, _LogRef) ->
-    [Count0, Total0, Max0] = dict:fetch(TimerType, TimerD),
-    {N + 1,
-        dict:store(TimerType,
-                    [Count0 + 1, Total0 + T0, max(Max0, T0)],
-                    TimerD)}.
-
-
 format_time() ->
     format_time(localtime_ms()).
@@ -591,17 +429,6 @@ log_test() ->
     log("D0001", []),
     log_timer("D0001", [], os:timestamp()).
 
-head_timing_test() ->
-    SW = os:timestamp(),
-    HeadTimer0 = lists:foldl(fun(_X, Acc) -> head_timing(Acc, SW, 2, found) end,
-                                undefined,
-                                lists:seq(0, 47)),
-    HeadTimer1 = head_timing(HeadTimer0, SW, 3, found),
-    {N, D} = HeadTimer1,
-    ?assertMatch(49, N),
-    ?assertMatch(3, lists:nth(1, dict:fetch(found_2, D))),
-    ?assertMatch(1, lists:nth(1, dict:fetch(found_lower, D))).
-
 log_warn_test() ->
     ok = log("G0001", [], [warn, error]),
     ok = log("G8888", [], [info, warn, error]),
src/leveled_penciller.erl

@@ -219,6 +219,8 @@
 -define(ITERATOR_SCANWIDTH, 4).
 -define(SNAPSHOT_TIMEOUT_LONG, 3600).
 -define(SNAPSHOT_TIMEOUT_SHORT, 600).
+-define(TIMING_SAMPLECOUNTDOWN, 10000).
+-define(TIMING_SAMPLESIZE, 100).
 
 -record(state, {manifest, % a manifest record from the leveled_manifest module
                 persisted_sqn = 0 :: integer(), % The highest SQN persisted
@@ -244,10 +246,26 @@
                 work_ongoing = false :: boolean(), % i.e. compaction work
                 work_backlog = false :: boolean(), % i.e. compaction work
 
-                head_timing :: tuple() | undefined,
+                timings = no_timing :: pcl_timings(),
+                timings_countdown = 0 :: integer(),
 
                 compression_method = native :: lz4|native}).
 
+-record(pcl_timings,
+            {sample_count = 0 :: integer(),
+                foundmem_time = 0 :: integer(),
+                found0_time = 0 :: integer(),
+                found1_time = 0 :: integer(),
+                found2_time = 0 :: integer(),
+                foundlower_time = 0 :: integer(),
+                missed_time = 0 :: integer(),
+                foundmem_count = 0 :: integer(),
+                found0_count = 0 :: integer(),
+                found1_count = 0 :: integer(),
+                found2_count = 0 :: integer(),
+                foundlower_count = 0 :: integer(),
+                missed_count = 0 :: integer()}).
+
 -type penciller_options() :: #penciller_options{}.
 -type bookies_memory() :: {tuple()|empty_cache,
                             % array:array()|empty_array,
@@ -255,6 +273,7 @@
                             integer()|infinity,
                             integer()}.
 -type pcl_state() :: #state{}.
+-type pcl_timings() :: no_timing|#pcl_timings{}.
 
 %%%============================================================================
 %%% API
@@ -303,11 +322,11 @@ pcl_pushmem(Pid, LedgerCache) ->
 %% The return value will be a leveled_skiplist that forms that part of the
 %% cache
 pcl_fetchlevelzero(Pid, Slot) ->
-    %% Timeout to cause crash of L0 file when it can't get the close signal
-    %% as it is deadlocked making this call.
-    %%
-    %% If the timeout gets hit outside of close scenario the Penciller will
-    %% be stuck in L0 pending
+    % Timeout to cause crash of L0 file when it can't get the close signal
+    % as it is deadlocked making this call.
+    %
+    % If the timeout gets hit outside of close scenario the Penciller will
+    % be stuck in L0 pending
     gen_server:call(Pid, {fetch_levelzero, Slot}, 60000).
 
 -spec pcl_fetch(pid(), tuple()) -> {tuple(), tuple()}|not_present.
@@ -555,13 +574,15 @@ handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}},
                             State)}
     end;
 handle_call({fetch, Key, Hash}, _From, State) ->
-    {R, HeadTimer} = timed_fetch_mem(Key,
-                                        Hash,
-                                        State#state.manifest,
-                                        State#state.levelzero_cache,
-                                        State#state.levelzero_index,
-                                        State#state.head_timing),
-    {reply, R, State#state{head_timing=HeadTimer}};
+    {R, UpdTimings} = timed_fetch_mem(Key,
+                                        Hash,
+                                        State#state.manifest,
+                                        State#state.levelzero_cache,
+                                        State#state.levelzero_index,
+                                        State#state.timings),
+    {UpdTimings0, CountDown} =
+        update_statetimings(UpdTimings, State#state.timings_countdown),
+    {reply, R, State#state{timings=UpdTimings0, timings_countdown=CountDown}};
 handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
     {reply,
         compare_to_sqn(plain_fetch_mem(Key,
@@ -1059,17 +1080,11 @@ roll_memory(State, true) ->
     {ok, Constructor, _} = R,
     Constructor.
 
-timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) ->
+timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, Timings) ->
     SW = os:timestamp(),
     {R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
-    UpdHeadTimer =
-        case R of
-            not_present ->
-                leveled_log:head_timing(HeadTimer, SW, Level, not_present);
-            _ ->
-                leveled_log:head_timing(HeadTimer, SW, Level, found)
-        end,
-    {R, UpdHeadTimer}.
+    UpdTimings = update_timings(SW, Timings, R, Level),
+    {R, UpdTimings}.
 
 plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
     R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
@@ -1082,7 +1097,7 @@ fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
         {false, not_found} ->
             fetch(Key, Hash, Manifest, 0, fun timed_sst_get/4);
         {true, KV} ->
-            {KV, 0}
+            {KV, memory}
     end.
 
 fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
@@ -1404,6 +1419,89 @@ find_nextkey(QueryArray, LCnt,
 
 
+
+%%%============================================================================
+%%% Timing Functions
+%%%============================================================================
+
+-spec update_statetimings(pcl_timings(), integer())
+                                            -> {pcl_timings(), integer()}.
+%% @doc
+%%
+%% The timings state is either in countdown to the next set of samples or
+%% we are actively collecting a sample.  Active collection takes place
+%% when the countdown is 0.  Once the sample has reached the expected count
+%% then there is a log of that sample, and the countdown is restarted.
+%%
+%% Outside of sample windows the timings object should be set to the atom
+%% no_timing.  no_timing is a valid state for the pcl_timings type.
+update_statetimings(no_timing, 0) ->
+    {#pcl_timings{}, 0};
+update_statetimings(Timings, 0) ->
+    case Timings#pcl_timings.sample_count of
+        SC when SC >= ?TIMING_SAMPLESIZE ->
+            log_timings(Timings),
+            {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)};
+        _SC ->
+            {Timings, 0}
+    end;
+update_statetimings(no_timing, N) ->
+    {no_timing, N - 1}.
+
+log_timings(Timings) ->
+    leveled_log:log("P0032", [Timings#pcl_timings.sample_count,
+                                Timings#pcl_timings.foundmem_time,
+                                Timings#pcl_timings.found0_time,
+                                Timings#pcl_timings.found1_time,
+                                Timings#pcl_timings.found2_time,
+                                Timings#pcl_timings.foundlower_time,
+                                Timings#pcl_timings.missed_time,
+                                Timings#pcl_timings.foundmem_count,
+                                Timings#pcl_timings.found0_count,
+                                Timings#pcl_timings.found1_count,
+                                Timings#pcl_timings.found2_count,
+                                Timings#pcl_timings.foundlower_count,
+                                Timings#pcl_timings.missed_count]).
+
+-spec update_timings(erlang:timestamp(), pcl_timings(),
+                        not_found|tuple(), integer()|basement)
+                                                        -> pcl_timings().
+%% @doc
+%%
+%% update the timings record unless the current record object is the atom
+%% no_timing.
+update_timings(_SW, no_timing, _Result, _Stage) ->
+    no_timing;
+update_timings(SW, Timings, Result, Stage) ->
+    Timer = timer:now_diff(os:timestamp(), SW),
+    SC = Timings#pcl_timings.sample_count + 1,
+    Timings0 = Timings#pcl_timings{sample_count = SC},
+    case {Result, Stage} of
+        {not_present, _} ->
+            NFT = Timings#pcl_timings.missed_time + Timer,
+            NFC = Timings#pcl_timings.missed_count + 1,
+            Timings0#pcl_timings{missed_time = NFT, missed_count = NFC};
+        {_, memory} ->
+            PMT = Timings#pcl_timings.foundmem_time + Timer,
+            PMC = Timings#pcl_timings.foundmem_count + 1,
+            Timings0#pcl_timings{foundmem_time = PMT, foundmem_count = PMC};
+        {_, 0} ->
+            L0T = Timings#pcl_timings.found0_time + Timer,
+            L0C = Timings#pcl_timings.found0_count + 1,
+            Timings0#pcl_timings{found0_time = L0T, found0_count = L0C};
+        {_, 1} ->
+            L1T = Timings#pcl_timings.found1_time + Timer,
+            L1C = Timings#pcl_timings.found1_count + 1,
+            Timings0#pcl_timings{found1_time = L1T, found1_count = L1C};
+        {_, 2} ->
+            L2T = Timings#pcl_timings.found2_time + Timer,
+            L2C = Timings#pcl_timings.found2_count + 1,
+            Timings0#pcl_timings{found2_time = L2T, found2_count = L2C};
+        _ ->
+            LLT = Timings#pcl_timings.foundlower_time + Timer,
+            LLC = Timings#pcl_timings.foundlower_count + 1,
+            Timings0#pcl_timings{foundlower_time = LLT, foundlower_count = LLC}
+    end.
+
 %%%============================================================================
 %%% Test
 %%%============================================================================
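In the Penciller version the Stage argument is the level at which the fetch was resolved: the atom memory for the level-zero cache, integers 0-2 for the top levels, anything lower (including basement) falls into the foundlower bucket, and not_present lands in missed. A sketch of one sampled fetch feeding the machinery, mirroring timed_fetch_mem/6 and the fetch handler above (not code from the PR):

    %% Sketch: accumulate elapsed time into the bucket for this Result/Level,
    %% then let the state machine decide whether the sample is complete.
    time_one_fetch(Timings, CountDown, FetchFun) ->
        SW = os:timestamp(),
        {Result, Level} = FetchFun(),
        UpdTimings = update_timings(SW, Timings, Result, Level),
        update_statetimings(UpdTimings, CountDown).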
@@ -1831,6 +1929,19 @@ checkready(Pid) ->
             timeout
     end.
 
+timings_test() ->
+    SW = os:timestamp(),
+    timer:sleep(1),
+    T0 = update_timings(SW, #pcl_timings{}, {"K", "V"}, 2),
+    timer:sleep(1),
+    T1 = update_timings(SW, T0, {"K", "V"}, 3),
+    T2 = update_timings(SW, T1, {"K", "V"}, basement),
+    ?assertMatch(3, T2#pcl_timings.sample_count),
+    ?assertMatch(true, T2#pcl_timings.foundlower_time > T2#pcl_timings.found2_time),
+    ?assertMatch(1, T2#pcl_timings.found2_count),
+    ?assertMatch(2, T2#pcl_timings.foundlower_count).
+
 coverage_cheat_test() ->
     {noreply, _State0} = handle_info(timeout, #state{}),
     {ok, _State1} = code_change(null, #state{}, null).
src/leveled_sst.erl

@@ -76,6 +76,8 @@
 -define(DELETE_TIMEOUT, 10000).
 -define(TREE_TYPE, idxt).
 -define(TREE_SIZE, 4).
+-define(TIMING_SAMPLECOUNTDOWN, 10000).
+-define(TIMING_SAMPLESIZE, 100).
 
 -include_lib("eunit/include/eunit.hrl").
 
@@ -134,15 +136,29 @@
 -record(state,
         {summary,
             handle :: file:fd() | undefined,
-            sst_timings :: tuple() | undefined,
             penciller :: pid() | undefined,
             root_path,
             filename,
            yield_blockquery = false :: boolean(),
            blockindex_cache,
-            compression_method = native :: press_methods()}).
+            compression_method = native :: press_methods(),
+            timings = no_timing :: sst_timings(),
+            timings_countdown = 0 :: integer()}).
 
+-record(sst_timings,
+        {sample_count = 0 :: integer(),
+            index_query_time = 0 :: integer(),
+            tiny_bloom_time = 0 :: integer(),
+            slot_index_time = 0 :: integer(),
+            slot_fetch_time = 0 :: integer(),
+            noncached_block_time = 0 :: integer(),
+            tiny_bloom_count = 0 :: integer(),
+            slot_index_count = 0 :: integer(),
+            slot_fetch_count = 0 :: integer(),
+            noncached_block_count = 0 :: integer()}).
+
 -type sst_state() :: #state{}.
+-type sst_timings() :: no_timing|#sst_timings{}.
 
 %%%============================================================================
 %%% API
@@ -421,27 +437,43 @@ starting({sst_new,
 starting({sst_newlevelzero, RootPath, Filename,
             Slots, FetchFun, Penciller, MaxSQN,
             PressMethod}, State) ->
-    SW = os:timestamp(),
+    SW0 = os:timestamp(),
     KVList = leveled_pmem:to_list(Slots, FetchFun),
+    Time0 = timer:now_diff(os:timestamp(), SW0),
+
+    SW1 = os:timestamp(),
     {[], [], SlotList, FirstKey} = merge_lists(KVList, PressMethod),
+    Time1 = timer:now_diff(os:timestamp(), SW1),
+
+    SW2 = os:timestamp(),
     {SlotCount,
         SlotIndex,
         BlockIndex,
         SlotsBin} = build_all_slots(SlotList, PressMethod),
+    Time2 = timer:now_diff(os:timestamp(), SW2),
+
+    SW3 = os:timestamp(),
     SummaryBin = build_table_summary(SlotIndex,
                                         0,
                                         FirstKey,
                                         SlotCount,
                                         MaxSQN),
+    Time3 = timer:now_diff(os:timestamp(), SW3),
+
+    SW4 = os:timestamp(),
     ActualFilename =
         write_file(RootPath, Filename, SummaryBin, SlotsBin, PressMethod),
     UpdState = read_file(ActualFilename,
                             State#state{root_path = RootPath,
                                         yield_blockquery = true}),
     Summary = UpdState#state.summary,
+    Time4 = timer:now_diff(os:timestamp(), SW4),
+
     leveled_log:log_timer("SST08",
                             [ActualFilename, 0, Summary#summary.max_sqn],
-                            SW),
+                            SW0),
+    leveled_log:log("SST11", [Time0, Time1, Time2, Time3, Time4]),
+
     case Penciller of
         undefined ->
             {next_state,
@@ -459,10 +491,15 @@ starting({sst_newlevelzero, RootPath, Filename,
 
 
 reader({get_kv, LedgerKey, Hash}, _From, State) ->
-    SW = os:timestamp(),
-    {Result, Stage, _SlotID, UpdState} = fetch(LedgerKey, Hash, State),
-    UpdTimings = leveled_log:sst_timing(State#state.sst_timings, SW, Stage),
-    {reply, Result, reader, UpdState#state{sst_timings = UpdTimings}};
+    % Get a KV value and potentially take sample timings
+    {Result, UpdState, UpdTimings} =
+        fetch(LedgerKey, Hash, State, State#state.timings),
+
+    {UpdTimings0, CountDown} =
+        update_statetimings(UpdTimings, State#state.timings_countdown),
+
+    {reply, Result, reader, UpdState#state{timings = UpdTimings0,
+                                            timings_countdown = CountDown}};
 reader({get_kvrange, StartKey, EndKey, ScanWidth, SlotList}, _From, State) ->
     {SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
                                                         EndKey,
@@ -497,8 +534,8 @@ reader(get_maxsequencenumber, _From, State) ->
     Summary = State#state.summary,
     {reply, Summary#summary.max_sqn, reader, State};
 reader(print_timings, _From, State) ->
-    io:format(user, "~nTimings of ~w~n", [State#state.sst_timings]),
-    {reply, ok, reader, State#state{sst_timings = undefined}};
+    log_timings(State#state.timings),
+    {reply, ok, reader, State};
 reader({set_for_delete, Penciller}, _From, State) ->
     leveled_log:log("SST06", [State#state.filename]),
     {reply,
@@ -521,7 +558,7 @@ reader(close, _From, State) ->
 
 
 delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
-    {Result, _Stage, _SlotID, UpdState} = fetch(LedgerKey, Hash, State),
+    {Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing),
     {reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT};
 delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SlotList},
                 _From, State) ->
@@ -591,16 +628,34 @@ code_change(_OldVsn, StateName, State, _Extra) ->
 %%% Internal Functions
 %%%============================================================================
 
-fetch(LedgerKey, Hash, State) ->
+-spec fetch(tuple(),
+            {integer(), integer()}|integer(),
+            sst_state(), sst_timings())
+                        -> {not_present|tuple(), sst_state(), sst_timings()}.
+%% @doc
+%%
+%% Fetch a key from the store, potentially taking timings.  Result should be
+%% not_present if the key is not in the store.
+fetch(LedgerKey, Hash, State, Timings0) ->
+    SW0 = os:timestamp(),
+
     Summary = State#state.summary,
     PressMethod = State#state.compression_method,
     Slot = lookup_slot(LedgerKey, Summary#summary.index),
+
+    {SW1, Timings1} = update_timings(SW0, Timings0, index_query, true),
+
     SlotID = Slot#slot_index_value.slot_id,
     Bloom = Slot#slot_index_value.bloom,
     case leveled_tinybloom:check_hash(Hash, Bloom) of
         false ->
-            {not_present, tiny_bloom, SlotID, State};
+            {_SW2, Timings2} =
+                update_timings(SW1, Timings1, tiny_bloom, false),
+            {not_present, State, Timings2};
         true ->
+            {SW2, Timings2} =
+                update_timings(SW1, Timings1, tiny_bloom, true),
+
             CachedBlockIdx = array:get(SlotID - 1,
                                         State#state.blockindex_cache),
             case CachedBlockIdx of
@@ -612,10 +667,11 @@ fetch(LedgerKey, Hash, State) ->
                                         <<BlockLengths/binary,
                                             BlockIdx/binary>>,
                                         State#state.blockindex_cache),
+                    {_SW3, Timings3} =
+                        update_timings(SW2, Timings2, noncached_block, false),
                     {Result,
-                        slot_fetch,
-                        Slot#slot_index_value.slot_id,
-                        State#state{blockindex_cache = BlockIndexCache}};
+                        State#state{blockindex_cache = BlockIndexCache},
+                        Timings3};
                 <<BlockLengths:24/binary, BlockIdx/binary>> ->
                     PosList = find_pos(BlockIdx,
                                         extra_hash(Hash),
@@ -623,8 +679,18 @@ fetch(LedgerKey, Hash, State) ->
                                         0),
                     case PosList of
                         [] ->
-                            {not_present, slot_bloom, SlotID, State};
+                            {_SW3, Timings3} =
+                                update_timings(SW2,
+                                                Timings2,
+                                                slot_index,
+                                                false),
+                            {not_present, State, Timings3};
                         _ ->
+                            {SW3, Timings3} =
+                                update_timings(SW2,
+                                                Timings2,
+                                                slot_index,
+                                                true),
                             StartPos = Slot#slot_index_value.start_position,
                             Result =
                                 check_blocks(PosList,
@@ -634,7 +700,12 @@ fetch(LedgerKey, Hash, State) ->
                                                 LedgerKey,
                                                 PressMethod,
                                                 not_present),
-                            {Result, slot_fetch, SlotID, State}
+                            {_SW4, Timings4} =
+                                update_timings(SW3,
+                                                Timings3,
+                                                slot_fetch,
+                                                false),
+                            {Result, State, Timings4}
                     end
             end
     end.
@@ -1722,6 +1793,97 @@ expand_list_by_pointer({next, ManEntry, StartKey, EndKey},
                                         Width, SegList),
     ExpPointer ++ Tail.
 
+
+%%%============================================================================
+%%% Timing Functions
+%%%============================================================================
+
+-spec update_statetimings(sst_timings(), integer())
+                                            -> {sst_timings(), integer()}.
+%% @doc
+%%
+%% The timings state is either in countdown to the next set of samples or
+%% we are actively collecting a sample.  Active collection takes place
+%% when the countdown is 0.  Once the sample has reached the expected count
+%% then there is a log of that sample, and the countdown is restarted.
+%%
+%% Outside of sample windows the timings object should be set to the atom
+%% no_timing.  no_timing is a valid state for the sst_timings type.
+update_statetimings(no_timing, 0) ->
+    {#sst_timings{}, 0};
+update_statetimings(Timings, 0) ->
+    case Timings#sst_timings.sample_count of
+        SC when SC >= ?TIMING_SAMPLESIZE ->
+            log_timings(Timings),
+            {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)};
+        _SC ->
+            {Timings, 0}
+    end;
+update_statetimings(no_timing, N) ->
+    {no_timing, N - 1}.
+
+log_timings(no_timing) ->
+    ok;
+log_timings(Timings) ->
+    leveled_log:log("SST12", [Timings#sst_timings.sample_count,
+                                Timings#sst_timings.index_query_time,
+                                Timings#sst_timings.tiny_bloom_time,
+                                Timings#sst_timings.slot_index_time,
+                                Timings#sst_timings.slot_fetch_time,
+                                Timings#sst_timings.noncached_block_time,
+                                Timings#sst_timings.tiny_bloom_count,
+                                Timings#sst_timings.slot_index_count,
+                                Timings#sst_timings.slot_fetch_count,
+                                Timings#sst_timings.noncached_block_count]).
+
+
+update_timings(_SW, no_timing, _Stage, _Continue) ->
+    {no_timing, no_timing};
+update_timings(SW, Timings, Stage, Continue) ->
+    Timer = timer:now_diff(os:timestamp(), SW),
+    Timings0 =
+        case Stage of
+            index_query ->
+                IQT = Timings#sst_timings.index_query_time,
+                Timings#sst_timings{index_query_time = IQT + Timer};
+            tiny_bloom ->
+                TBT = Timings#sst_timings.tiny_bloom_time,
+                Timings#sst_timings{tiny_bloom_time = TBT + Timer};
+            slot_index ->
+                SIT = Timings#sst_timings.slot_index_time,
+                Timings#sst_timings{slot_index_time = SIT + Timer};
+            slot_fetch ->
+                SFT = Timings#sst_timings.slot_fetch_time,
+                Timings#sst_timings{slot_fetch_time = SFT + Timer};
+            noncached_block ->
+                NCT = Timings#sst_timings.noncached_block_time,
+                Timings#sst_timings{noncached_block_time = NCT + Timer}
+        end,
+    case Continue of
+        true ->
+            {os:timestamp(), Timings0};
+        false ->
+            Timings1 =
+                case Stage of
+                    tiny_bloom ->
+                        TBC = Timings#sst_timings.tiny_bloom_count,
+                        Timings0#sst_timings{tiny_bloom_count = TBC + 1};
+                    slot_index ->
+                        SIC = Timings#sst_timings.slot_index_count,
+                        Timings0#sst_timings{slot_index_count = SIC + 1};
+                    slot_fetch ->
+                        SFC = Timings#sst_timings.slot_fetch_count,
+                        Timings0#sst_timings{slot_fetch_count = SFC + 1};
+                    noncached_block ->
+                        NCC = Timings#sst_timings.noncached_block_count,
+                        Timings0#sst_timings{noncached_block_count = NCC + 1}
+                end,
+            SC = Timings1#sst_timings.sample_count,
+            {no_timing, Timings1#sst_timings{sample_count = SC + 1}}
+    end.
+
+
 %%%============================================================================
 %%% Test
 %%%============================================================================
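The Continue flag distinguishes intermediate timing points from exit points: true hands back a fresh timestamp so the next stage can be measured, while false increments the matching exit counter plus sample_count and returns no_timing in place of a timestamp. The timings_test/0 added below exercises exactly this. A sketch of a fetch that survives every check, mirroring fetch/4 above (not code from the PR):

    %% Sketch: three intermediate stages with Continue = true keep the clock
    %% running; the exit stage with false closes out this sampled fetch.
    timed_hit_path(Timings0) ->
        SW0 = os:timestamp(),
        {SW1, T1} = update_timings(SW0, Timings0, index_query, true),
        {SW2, T2} = update_timings(SW1, T1, tiny_bloom, true),
        {SW3, T3} = update_timings(SW2, T2, slot_index, true),
        {no_timing, T4} = update_timings(SW3, T3, slot_fetch, false),
        T4.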
@@ -1962,7 +2124,7 @@ indexed_list_mixedkeys_bitflip_test() ->
 
 flip_byte(Binary) ->
     L = byte_size(Binary),
-    Byte1 = leveled_rand:uniform(L),
+    Byte1 = leveled_rand:uniform(L) - 1,
     <<PreB1:Byte1/binary, A:8/integer, PostByte1/binary>> = Binary,
     case A of
         0 ->
@@ -2369,7 +2531,19 @@ check_segment_match(PosBinIndex1, KVL, TreeSize) ->
     end,
     lists:foreach(CheckFun, KVL).
 
+timings_test() ->
+    SW = os:timestamp(),
+    timer:sleep(1),
+    {no_timing, T0} = update_timings(SW, #sst_timings{}, tiny_bloom, false),
+    {no_timing, T1} = update_timings(SW, T0, slot_index, false),
+    {no_timing, T2} = update_timings(SW, T1, slot_fetch, false),
+    {no_timing, T3} = update_timings(SW, T2, noncached_block, false),
+    timer:sleep(1),
+    {_, T4} = update_timings(SW, T3, tiny_bloom, true),
+    ?assertMatch(4, T4#sst_timings.sample_count),
+    ?assertMatch(1, T4#sst_timings.tiny_bloom_count),
+    ?assertMatch(true, T4#sst_timings.tiny_bloom_time >
+                        T3#sst_timings.tiny_bloom_time).
+
 -endif.
end-to-end test suite (handoff test)

@@ -281,7 +281,7 @@ handoff(_Config) ->
     % Start the first database, load a test object, close it, start it again
     StartOpts1 = [{root_path, RootPathA},
                     {max_pencillercachesize, 16000},
-                    {sync_strategy, riak_sync}],
+                    {sync_strategy, sync}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
 
     % Add some non-Riak objects in - which should be ignored in folds.