diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 35887b8..ee94fac 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -150,6 +150,7 @@ -define(CHECKJOURNAL_PROB, 0.2). -define(CACHE_SIZE_JITTER, 25). -define(JOURNAL_SIZE_JITTER, 20). +-define(PUT_TIMING_LOGPOINT, 10000). -record(ledger_cache, {skiplist = leveled_skiplist:empty(true) :: tuple(), min_sqn = infinity :: integer()|infinity, @@ -160,7 +161,8 @@ cache_size :: integer(), ledger_cache = #ledger_cache{}, is_snapshot :: boolean(), - slow_offer = false :: boolean()}). + slow_offer = false :: boolean(), + put_timing = {0, {0, 0}, {0, 0}} :: tuple()}). %%%============================================================================ @@ -262,10 +264,12 @@ init([Opts]) -> handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) -> LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), + SW = os:timestamp(), {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker, LedgerKey, Object, {IndexSpecs, TTL}), + T0 = timer:now_diff(os:timestamp(), SW), Changes = preparefor_ledgercache(no_type_assigned, LedgerKey, SQN, @@ -273,6 +277,8 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) -> ObjSize, {IndexSpecs, TTL}), Cache0 = addto_ledgercache(Changes, State#state.ledger_cache), + T1 = timer:now_diff(os:timestamp(), SW) - T0, + PutTimings = update_put_timings(State#state.put_timing, T0, T1), % If the previous push to memory was returned then punish this PUT with a % delay. If the back-pressure in the Penciller continues, these delays % will beocme more frequent @@ -286,9 +292,13 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) -> Cache0, State#state.penciller) of {ok, NewCache} -> - {noreply, State#state{ledger_cache=NewCache, slow_offer=false}}; + {noreply, State#state{ledger_cache=NewCache, + put_timing=PutTimings, + slow_offer=false}}; {returned, NewCache} -> - {noreply, State#state{ledger_cache=NewCache, slow_offer=true}} + {noreply, State#state{ledger_cache=NewCache, + put_timing=PutTimings, + slow_offer=true}} end; handle_call({get, Bucket, Key, Tag}, _From, State) -> LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), @@ -454,6 +464,15 @@ push_ledgercache(Penciller, Cache) -> %%% Internal functions %%%============================================================================ +update_put_timings({?PUT_TIMING_LOGPOINT, {Total0, Total1}, {Max0, Max1}}, + T0, T1) -> + leveled_log:log("B0012", + [?PUT_TIMING_LOGPOINT, Total0, Total1, Max0, Max1]), + {1, {T0, T1}, {T0, T1}}; +update_put_timings({N, {Total0, Total1}, {Max0, Max1}}, T0, T1) -> + {N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}. + + cache_size(LedgerCache) -> leveled_skiplist:size(LedgerCache#ledger_cache.skiplist). diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 1c8f9d0..2501098 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -95,6 +95,7 @@ -define(WRITE_OPS, [binary, raw, read, write]). -define(PENDING_ROLL_WAIT, 30). -define(DELETE_TIMEOUT, 10000). +-define(PUT_TIMING_LOGPOINT, 10000). -record(state, {hashtree, last_position :: integer(), @@ -108,7 +109,8 @@ inker :: pid(), deferred_delete = false :: boolean(), waste_path :: string(), - sync_strategy = none}). + sync_strategy = none, + put_timing = {0, {0, 0}, {0, 0}} :: tuple()}). %%%============================================================================ @@ -256,12 +258,14 @@ writer({key_check, Key}, _From, State) -> writer, State}; writer({put_kv, Key, Value}, _From, State) -> + SW = os:timestamp(), Result = put(State#state.handle, Key, Value, {State#state.last_position, State#state.hashtree}, State#state.binary_mode, State#state.max_size), + T0 = timer:now_diff(os:timestamp(), SW), case Result of roll -> %% Key and value could not be written @@ -274,10 +278,13 @@ writer({put_kv, Key, Value}, _From, State) -> _ -> ok end, + T1 = timer:now_diff(os:timestamp(), SW) - T0, + Timings = update_put_timings(State#state.put_timing, T0, T1), {reply, ok, writer, State#state{handle=UpdHandle, last_position=NewPosition, last_key=Key, - hashtree=HashTree}} + hashtree=HashTree, + put_timing=Timings}} end; writer({mput_kv, []}, _From, State) -> {reply, ok, writer, State}; @@ -772,6 +779,14 @@ hashtable_calc(HashTree, StartPos) -> %% Internal functions %%%%%%%%%%%%%%%%%%%% +update_put_timings({?PUT_TIMING_LOGPOINT, {Total0, Total1}, {Max0, Max1}}, + T0, T1) -> + leveled_log:log("CDB17", + [?PUT_TIMING_LOGPOINT, Total0, Total1, Max0, Max1]), + {1, {T0, T1}, {T0, T1}}; +update_put_timings({N, {Total0, Total1}, {Max0, Max1}}, T0, T1) -> + {N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}. + determine_new_filename(Filename) -> filename:rootname(Filename, ".pnd") ++ ".cdb". diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 5344f53..b27f5b9 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -472,11 +472,11 @@ stringcheck_test() -> %% Test below proved that the overhead of performing hashes was trivial %% Maybe 5 microseconds per hash -hashperf_test() -> - OL = lists:map(fun(_X) -> crypto:rand_bytes(8192) end, lists:seq(1, 10000)), - SW = os:timestamp(), - _HL = lists:map(fun(Obj) -> erlang:phash2(Obj) end, OL), - io:format(user, "10000 object hashes in ~w microseconds~n", - [timer:now_diff(os:timestamp(), SW)]). +%hashperf_test() -> +% OL = lists:map(fun(_X) -> crypto:rand_bytes(8192) end, lists:seq(1, 10000)), +% SW = os:timestamp(), +% _HL = lists:map(fun(Obj) -> erlang:phash2(Obj) end, OL), +% io:format(user, "10000 object hashes in ~w microseconds~n", +% [timer:now_diff(os:timestamp(), SW)]). -endif. \ No newline at end of file diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 9a37cae..e8fd11e 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -126,6 +126,7 @@ -define(PENDING_FILEX, "pnd"). -define(LOADING_PAUSE, 1000). -define(LOADING_BATCH, 1000). +-define(PUT_TIMING_LOGPOINT, 10000). -record(state, {manifest = [] :: list(), manifest_sqn = 0 :: integer(), @@ -138,7 +139,8 @@ clerk :: pid(), compaction_pending = false :: boolean(), is_snapshot = false :: boolean(), - source_inker :: pid()}). + source_inker :: pid(), + put_timing = {0, {0, 0}, {0, 0}} ::tuple()}). %%%============================================================================ @@ -414,15 +416,21 @@ start_from_file(InkOpts) -> put_object(LedgerKey, Object, KeyChanges, State) -> NewSQN = State#state.journal_sqn + 1, + SW= os:timestamp(), {JournalKey, JournalBin} = leveled_codec:to_inkerkv(LedgerKey, NewSQN, Object, KeyChanges), + T0 = timer:now_diff(os:timestamp(), SW), case leveled_cdb:cdb_put(State#state.active_journaldb, JournalKey, JournalBin) of ok -> - {ok, State#state{journal_sqn=NewSQN}, byte_size(JournalBin)}; + T1 = timer:now_diff(os:timestamp(), SW) - T0, + UpdPutTimings = update_put_timings(State#state.put_timing, T0, T1), + {ok, + State#state{journal_sqn=NewSQN, put_timing=UpdPutTimings}, + byte_size(JournalBin)}; roll -> SW = os:timestamp(), CDBopts = State#state.cdb_options, @@ -742,6 +750,15 @@ initiate_penciller_snapshot(Bookie) -> MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap), {LedgerSnap, MaxSQN}. +update_put_timings({?PUT_TIMING_LOGPOINT, {Total0, Total1}, {Max0, Max1}}, + T0, T1) -> + leveled_log:log("I0019", + [?PUT_TIMING_LOGPOINT, Total0, Total1, Max0, Max1]), + {1, {T0, T1}, {T0, T1}}; +update_put_timings({N, {Total0, Total1}, {Max0, Max1}}, T0, T1) -> + {N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}. + + %%%============================================================================ %%% Test %%%============================================================================ diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 5c36cf7..c4a50bf 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -40,6 +40,9 @@ {info, "Bucket list finds non-binary Bucket ~w"}}, {"B0011", {warn, "Call to destroy the store and so all files to be removed"}}, + {"B0012", + {info, "After ~w PUTs total inker time is ~w total ledger time is ~w " + ++ "and max inker time is ~w and max ledger time is ~w"}}, {"P0001", {info, "Ledger snapshot ~w registered"}}, @@ -176,6 +179,9 @@ {info, "At SQN=~w journal has filename ~s"}}, {"I0018", {warn, "We're doomed - intention recorded to destroy all files"}}, + {"I0019", + {info, "After ~w PUTs total prepare time is ~w total cdb time is ~w " + ++ "and max prepare time is ~w and max cdb time is ~w"}}, {"IC001", {info, "Closed for reason ~w so maybe leaving garbage"}}, @@ -271,7 +277,10 @@ {info, "Cycle count of ~w in hashtable search higher than expected" ++ " in search for hash ~w with result ~w"}}, {"CDB16", - {info, "CDB scan from start ~w in file with end ~w and last_key ~w"}} + {info, "CDB scan from start ~w in file with end ~w and last_key ~w"}}, + {"CDB17", + {info, "After ~w PUTs total write time is ~w total sync time is ~w " + ++ "and max write time is ~w and max sync time is ~w"}} ])).