Add put timing points

This commit is contained in:
martinsumner 2016-12-20 23:11:50 +00:00
parent 299e8e6de3
commit 060ce2e263
5 changed files with 74 additions and 14 deletions

View file

@ -150,6 +150,7 @@
-define(CHECKJOURNAL_PROB, 0.2). -define(CHECKJOURNAL_PROB, 0.2).
-define(CACHE_SIZE_JITTER, 25). -define(CACHE_SIZE_JITTER, 25).
-define(JOURNAL_SIZE_JITTER, 20). -define(JOURNAL_SIZE_JITTER, 20).
-define(PUT_TIMING_LOGPOINT, 10000).
-record(ledger_cache, {skiplist = leveled_skiplist:empty(true) :: tuple(), -record(ledger_cache, {skiplist = leveled_skiplist:empty(true) :: tuple(),
min_sqn = infinity :: integer()|infinity, min_sqn = infinity :: integer()|infinity,
@ -160,7 +161,8 @@
cache_size :: integer(), cache_size :: integer(),
ledger_cache = #ledger_cache{}, ledger_cache = #ledger_cache{},
is_snapshot :: boolean(), is_snapshot :: boolean(),
slow_offer = false :: boolean()}). slow_offer = false :: boolean(),
put_timing = {0, {0, 0}, {0, 0}} :: tuple()}).
%%%============================================================================ %%%============================================================================
@ -262,10 +264,12 @@ init([Opts]) ->
handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) -> handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
SW = os:timestamp(),
{ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker, {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
LedgerKey, LedgerKey,
Object, Object,
{IndexSpecs, TTL}), {IndexSpecs, TTL}),
T0 = timer:now_diff(os:timestamp(), SW),
Changes = preparefor_ledgercache(no_type_assigned, Changes = preparefor_ledgercache(no_type_assigned,
LedgerKey, LedgerKey,
SQN, SQN,
@ -273,6 +277,8 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
ObjSize, ObjSize,
{IndexSpecs, TTL}), {IndexSpecs, TTL}),
Cache0 = addto_ledgercache(Changes, State#state.ledger_cache), Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
T1 = timer:now_diff(os:timestamp(), SW) - T0,
PutTimings = update_put_timings(State#state.put_timing, T0, T1),
% If the previous push to memory was returned then punish this PUT with a % If the previous push to memory was returned then punish this PUT with a
% delay. If the back-pressure in the Penciller continues, these delays % delay. If the back-pressure in the Penciller continues, these delays
% will beocme more frequent % will beocme more frequent
@ -286,9 +292,13 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
Cache0, Cache0,
State#state.penciller) of State#state.penciller) of
{ok, NewCache} -> {ok, NewCache} ->
{noreply, State#state{ledger_cache=NewCache, slow_offer=false}}; {noreply, State#state{ledger_cache=NewCache,
put_timing=PutTimings,
slow_offer=false}};
{returned, NewCache} -> {returned, NewCache} ->
{noreply, State#state{ledger_cache=NewCache, slow_offer=true}} {noreply, State#state{ledger_cache=NewCache,
put_timing=PutTimings,
slow_offer=true}}
end; end;
handle_call({get, Bucket, Key, Tag}, _From, State) -> handle_call({get, Bucket, Key, Tag}, _From, State) ->
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
@ -454,6 +464,15 @@ push_ledgercache(Penciller, Cache) ->
%%% Internal functions %%% Internal functions
%%%============================================================================ %%%============================================================================
update_put_timings({?PUT_TIMING_LOGPOINT, {Total0, Total1}, {Max0, Max1}},
T0, T1) ->
leveled_log:log("B0012",
[?PUT_TIMING_LOGPOINT, Total0, Total1, Max0, Max1]),
{1, {T0, T1}, {T0, T1}};
update_put_timings({N, {Total0, Total1}, {Max0, Max1}}, T0, T1) ->
{N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}.
cache_size(LedgerCache) -> cache_size(LedgerCache) ->
leveled_skiplist:size(LedgerCache#ledger_cache.skiplist). leveled_skiplist:size(LedgerCache#ledger_cache.skiplist).

View file

@ -95,6 +95,7 @@
-define(WRITE_OPS, [binary, raw, read, write]). -define(WRITE_OPS, [binary, raw, read, write]).
-define(PENDING_ROLL_WAIT, 30). -define(PENDING_ROLL_WAIT, 30).
-define(DELETE_TIMEOUT, 10000). -define(DELETE_TIMEOUT, 10000).
-define(PUT_TIMING_LOGPOINT, 10000).
-record(state, {hashtree, -record(state, {hashtree,
last_position :: integer(), last_position :: integer(),
@ -108,7 +109,8 @@
inker :: pid(), inker :: pid(),
deferred_delete = false :: boolean(), deferred_delete = false :: boolean(),
waste_path :: string(), waste_path :: string(),
sync_strategy = none}). sync_strategy = none,
put_timing = {0, {0, 0}, {0, 0}} :: tuple()}).
%%%============================================================================ %%%============================================================================
@ -256,12 +258,14 @@ writer({key_check, Key}, _From, State) ->
writer, writer,
State}; State};
writer({put_kv, Key, Value}, _From, State) -> writer({put_kv, Key, Value}, _From, State) ->
SW = os:timestamp(),
Result = put(State#state.handle, Result = put(State#state.handle,
Key, Key,
Value, Value,
{State#state.last_position, State#state.hashtree}, {State#state.last_position, State#state.hashtree},
State#state.binary_mode, State#state.binary_mode,
State#state.max_size), State#state.max_size),
T0 = timer:now_diff(os:timestamp(), SW),
case Result of case Result of
roll -> roll ->
%% Key and value could not be written %% Key and value could not be written
@ -274,10 +278,13 @@ writer({put_kv, Key, Value}, _From, State) ->
_ -> _ ->
ok ok
end, end,
T1 = timer:now_diff(os:timestamp(), SW) - T0,
Timings = update_put_timings(State#state.put_timing, T0, T1),
{reply, ok, writer, State#state{handle=UpdHandle, {reply, ok, writer, State#state{handle=UpdHandle,
last_position=NewPosition, last_position=NewPosition,
last_key=Key, last_key=Key,
hashtree=HashTree}} hashtree=HashTree,
put_timing=Timings}}
end; end;
writer({mput_kv, []}, _From, State) -> writer({mput_kv, []}, _From, State) ->
{reply, ok, writer, State}; {reply, ok, writer, State};
@ -772,6 +779,14 @@ hashtable_calc(HashTree, StartPos) ->
%% Internal functions %% Internal functions
%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%
update_put_timings({?PUT_TIMING_LOGPOINT, {Total0, Total1}, {Max0, Max1}},
T0, T1) ->
leveled_log:log("CDB17",
[?PUT_TIMING_LOGPOINT, Total0, Total1, Max0, Max1]),
{1, {T0, T1}, {T0, T1}};
update_put_timings({N, {Total0, Total1}, {Max0, Max1}}, T0, T1) ->
{N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}.
determine_new_filename(Filename) -> determine_new_filename(Filename) ->
filename:rootname(Filename, ".pnd") ++ ".cdb". filename:rootname(Filename, ".pnd") ++ ".cdb".

View file

@ -472,11 +472,11 @@ stringcheck_test() ->
%% Test below proved that the overhead of performing hashes was trivial %% Test below proved that the overhead of performing hashes was trivial
%% Maybe 5 microseconds per hash %% Maybe 5 microseconds per hash
hashperf_test() -> %hashperf_test() ->
OL = lists:map(fun(_X) -> crypto:rand_bytes(8192) end, lists:seq(1, 10000)), % OL = lists:map(fun(_X) -> crypto:rand_bytes(8192) end, lists:seq(1, 10000)),
SW = os:timestamp(), % SW = os:timestamp(),
_HL = lists:map(fun(Obj) -> erlang:phash2(Obj) end, OL), % _HL = lists:map(fun(Obj) -> erlang:phash2(Obj) end, OL),
io:format(user, "10000 object hashes in ~w microseconds~n", % io:format(user, "10000 object hashes in ~w microseconds~n",
[timer:now_diff(os:timestamp(), SW)]). % [timer:now_diff(os:timestamp(), SW)]).
-endif. -endif.

View file

@ -126,6 +126,7 @@
-define(PENDING_FILEX, "pnd"). -define(PENDING_FILEX, "pnd").
-define(LOADING_PAUSE, 1000). -define(LOADING_PAUSE, 1000).
-define(LOADING_BATCH, 1000). -define(LOADING_BATCH, 1000).
-define(PUT_TIMING_LOGPOINT, 10000).
-record(state, {manifest = [] :: list(), -record(state, {manifest = [] :: list(),
manifest_sqn = 0 :: integer(), manifest_sqn = 0 :: integer(),
@ -138,7 +139,8 @@
clerk :: pid(), clerk :: pid(),
compaction_pending = false :: boolean(), compaction_pending = false :: boolean(),
is_snapshot = false :: boolean(), is_snapshot = false :: boolean(),
source_inker :: pid()}). source_inker :: pid(),
put_timing = {0, {0, 0}, {0, 0}} ::tuple()}).
%%%============================================================================ %%%============================================================================
@ -414,15 +416,21 @@ start_from_file(InkOpts) ->
put_object(LedgerKey, Object, KeyChanges, State) -> put_object(LedgerKey, Object, KeyChanges, State) ->
NewSQN = State#state.journal_sqn + 1, NewSQN = State#state.journal_sqn + 1,
SW= os:timestamp(),
{JournalKey, JournalBin} = leveled_codec:to_inkerkv(LedgerKey, {JournalKey, JournalBin} = leveled_codec:to_inkerkv(LedgerKey,
NewSQN, NewSQN,
Object, Object,
KeyChanges), KeyChanges),
T0 = timer:now_diff(os:timestamp(), SW),
case leveled_cdb:cdb_put(State#state.active_journaldb, case leveled_cdb:cdb_put(State#state.active_journaldb,
JournalKey, JournalKey,
JournalBin) of JournalBin) of
ok -> ok ->
{ok, State#state{journal_sqn=NewSQN}, byte_size(JournalBin)}; T1 = timer:now_diff(os:timestamp(), SW) - T0,
UpdPutTimings = update_put_timings(State#state.put_timing, T0, T1),
{ok,
State#state{journal_sqn=NewSQN, put_timing=UpdPutTimings},
byte_size(JournalBin)};
roll -> roll ->
SW = os:timestamp(), SW = os:timestamp(),
CDBopts = State#state.cdb_options, CDBopts = State#state.cdb_options,
@ -742,6 +750,15 @@ initiate_penciller_snapshot(Bookie) ->
MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap), MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
{LedgerSnap, MaxSQN}. {LedgerSnap, MaxSQN}.
update_put_timings({?PUT_TIMING_LOGPOINT, {Total0, Total1}, {Max0, Max1}},
T0, T1) ->
leveled_log:log("I0019",
[?PUT_TIMING_LOGPOINT, Total0, Total1, Max0, Max1]),
{1, {T0, T1}, {T0, T1}};
update_put_timings({N, {Total0, Total1}, {Max0, Max1}}, T0, T1) ->
{N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}.
%%%============================================================================ %%%============================================================================
%%% Test %%% Test
%%%============================================================================ %%%============================================================================

View file

@ -40,6 +40,9 @@
{info, "Bucket list finds non-binary Bucket ~w"}}, {info, "Bucket list finds non-binary Bucket ~w"}},
{"B0011", {"B0011",
{warn, "Call to destroy the store and so all files to be removed"}}, {warn, "Call to destroy the store and so all files to be removed"}},
{"B0012",
{info, "After ~w PUTs total inker time is ~w total ledger time is ~w "
++ "and max inker time is ~w and max ledger time is ~w"}},
{"P0001", {"P0001",
{info, "Ledger snapshot ~w registered"}}, {info, "Ledger snapshot ~w registered"}},
@ -176,6 +179,9 @@
{info, "At SQN=~w journal has filename ~s"}}, {info, "At SQN=~w journal has filename ~s"}},
{"I0018", {"I0018",
{warn, "We're doomed - intention recorded to destroy all files"}}, {warn, "We're doomed - intention recorded to destroy all files"}},
{"I0019",
{info, "After ~w PUTs total prepare time is ~w total cdb time is ~w "
++ "and max prepare time is ~w and max cdb time is ~w"}},
{"IC001", {"IC001",
{info, "Closed for reason ~w so maybe leaving garbage"}}, {info, "Closed for reason ~w so maybe leaving garbage"}},
@ -271,7 +277,10 @@
{info, "Cycle count of ~w in hashtable search higher than expected" {info, "Cycle count of ~w in hashtable search higher than expected"
++ " in search for hash ~w with result ~w"}}, ++ " in search for hash ~w with result ~w"}},
{"CDB16", {"CDB16",
{info, "CDB scan from start ~w in file with end ~w and last_key ~w"}} {info, "CDB scan from start ~w in file with end ~w and last_key ~w"}},
{"CDB17",
{info, "After ~w PUTs total write time is ~w total sync time is ~w "
++ "and max write time is ~w and max sync time is ~w"}}
])). ])).