From 58946a7f98533c2f3857b8b22ae9caec976ba26a Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 21 Nov 2017 17:00:23 +0000 Subject: [PATCH] Amend SST Timing Capture Use sampling mechansm from CDB timing capture. Do it less though - as far more SST fetches in comparison to CDB fetches. --- src/leveled_cdb.erl | 4 +- src/leveled_log.erl | 107 ++++++++---------- src/leveled_sst.erl | 200 +++++++++++++++++++++++++++++---- test/end_to_end/riak_SUITE.erl | 2 +- 4 files changed, 228 insertions(+), 85 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 6c71071..fef9ce1 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -97,7 +97,7 @@ -define(WRITE_OPS, [binary, raw, read, write]). -define(PENDING_ROLL_WAIT, 30). -define(DELETE_TIMEOUT, 10000). --define(TIMING_SAMPLECOUNTDOWN, 2000). +-define(TIMING_SAMPLECOUNTDOWN, 1000). -define(TIMING_SAMPLESIZE, 100). -record(state, {hashtree, @@ -1437,7 +1437,7 @@ update_statetimings(Timings, 0) -> Timings#cdb_timings.sample_cyclecount, Timings#cdb_timings.sample_fetchtime, Timings#cdb_timings.sample_indextime]), - {no_timing, leveled_rand:uniform(?TIMING_SAMPLECOUNTDOWN)}; + {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)}; _SC -> {Timings, 0} end; diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 01bfd7f..8dc9511 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -12,13 +12,11 @@ log_randomtimer/4, put_timing/4, head_timing/4, - get_timing/3, - sst_timing/3]). + get_timing/3]). -define(PUT_LOGPOINT, 10000). -define(HEAD_LOGPOINT, 20000). -define(GET_LOGPOINT, 20000). --define(SST_LOGPOINT, 20000). -define(LOG_LEVEL, [info, warn, error, critical]). -define(SAMPLE_RATE, 16). @@ -61,6 +59,9 @@ {"B0014", {info, "Get timing for result ~w is sample ~w total ~w and max ~w"}}, + {"R0001", + {debug, "Object fold to process batch of ~w objects"}}, + {"P0001", {debug, "Ledger snapshot ~w registered"}}, {"P0003", @@ -191,7 +192,44 @@ {info, "Prompting deletions at ManifestSQN=~w"}}, {"PC022", {info, "Storing reference to deletions at ManifestSQN=~w"}}, - + {"PM002", + {info, "Completed dump of L0 cache to list of size ~w"}}, + + {"SST01", + {info, "SST timing for result ~w is sample ~w total ~w and max ~w"}}, + {"SST02", + {error, "False result returned from SST with filename ~s as " + ++ "slot ~w has failed crc check"}}, + {"SST03", + {info, "Opening SST file with filename ~s slots ~w and" + ++ " max sqn ~w"}}, + {"SST04", + {info, "Exit called for reason ~w on filename ~s"}}, + {"SST05", + {warn, "Rename rogue filename ~s to ~s"}}, + {"SST06", + {debug, "File ~s has been set for delete"}}, + {"SST07", + {info, "Exit called and now clearing ~s"}}, + {"SST08", + {info, "Completed creation of ~s at level ~w with max sqn ~w"}}, + {"SST09", + {warn, "Read request exposes slot with bad CRC"}}, + {"SST10", + {debug, "Expansion sought to support pointer to pid ~w status ~w"}}, + {"SST11", + {info, "Level zero creation timings in microseconds " + ++ "pmem_fetch=~w merge_lists=~w build_slots=~w " + ++ "build_summary=~w read_switch=~w"}}, + {"SST12", + {info, "SST Timings for sample_count=~w" + ++ " at timing points index_query_time=~w" + ++ " tiny_bloom_time=~w slot_index_time=~w slot_fetch_time=~w" + ++ " noncached_block_fetch_time=~w" + ++ " exiting at points tiny_bloom=~w slot_index=~w" + ++ " slot_fetch=~w noncached_block_fetch=~w"}}, + + {"I0001", {info, "Unexpected failure to fetch value for Key=~w SQN=~w " ++ "with reason ~w"}}, @@ -261,33 +299,7 @@ {"IC013", {warn, "File with name ~s to be ignored in manifest as scanning for " ++ "first key returned empty - maybe corrupted"}}, - - {"PM002", - {info, "Completed dump of L0 cache to list of size ~w"}}, - - {"SST01", - {info, "SST timing for result ~w is sample ~w total ~w and max ~w"}}, - {"SST02", - {error, "False result returned from SST with filename ~s as " - ++ "slot ~w has failed crc check"}}, - {"SST03", - {info, "Opening SST file with filename ~s slots ~w and" - ++ " max sqn ~w"}}, - {"SST04", - {info, "Exit called for reason ~w on filename ~s"}}, - {"SST05", - {warn, "Rename rogue filename ~s to ~s"}}, - {"SST06", - {debug, "File ~s has been set for delete"}}, - {"SST07", - {info, "Exit called and now clearing ~s"}}, - {"SST08", - {info, "Completed creation of ~s at level ~w with max sqn ~w"}}, - {"SST09", - {warn, "Read request exposes slot with bad CRC"}}, - {"SST10", - {debug, "Expansion sought to support pointer to pid ~w status ~w"}}, - + {"CDB01", {info, "Opening file for writing with filename ~s"}}, {"CDB02", @@ -330,10 +342,7 @@ {"CDB19", {info, "Sample timings in microseconds for sample_count=~w " ++ "with totals of cycle_count=~w " - ++ "fetch_time=~w index_time=~w"}}, - - {"R0001", - {debug, "Object fold to process batch of ~w objects"}} + ++ "fetch_time=~w index_time=~w"}} ]). @@ -478,34 +487,6 @@ head_key(found, Level) when Level > 2 -> head_keylist() -> [not_present, found_lower, found_0, found_1, found_2]. - -sst_timing(undefined, SW, TimerType) -> - T0 = timer:now_diff(os:timestamp(), SW), - gen_timing_int(undefined, - T0, - TimerType, - fun sst_keylist/0, - ?SST_LOGPOINT, - "SST01"); -sst_timing({N, SSTTimerD}, SW, TimerType) -> - case N band (?SAMPLE_RATE - 1) of - 0 -> - T0 = timer:now_diff(os:timestamp(), SW), - gen_timing_int({N, SSTTimerD}, - T0, - TimerType, - fun sst_keylist/0, - ?SST_LOGPOINT, - "SST01"); - _ -> - % Not to be sampled this time - {N + 1, SSTTimerD} - end. - -sst_keylist() -> - [tiny_bloom, slot_bloom, slot_fetch]. - - get_timing(undefined, SW, TimerType) -> T0 = timer:now_diff(os:timestamp(), SW), gen_timing_int(undefined, diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index e1cfc80..554edce 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -76,6 +76,8 @@ -define(DELETE_TIMEOUT, 10000). -define(TREE_TYPE, idxt). -define(TREE_SIZE, 4). +-define(TIMING_SAMPLECOUNTDOWN, 10000). +-define(TIMING_SAMPLESIZE, 100). -include_lib("eunit/include/eunit.hrl"). @@ -134,15 +136,29 @@ -record(state, {summary, handle :: file:fd() | undefined, - sst_timings :: tuple() | undefined, penciller :: pid() | undefined, root_path, filename, yield_blockquery = false :: boolean(), blockindex_cache, - compression_method = native :: press_methods()}). + compression_method = native :: press_methods(), + timings = no_timing :: sst_timings(), + timings_countdown = 0 :: integer()}). + +-record(sst_timings, + {sample_count = 0 :: integer(), + index_query_time = 0 :: integer(), + tiny_bloom_time = 0 :: integer(), + slot_index_time = 0 :: integer(), + slot_fetch_time = 0 :: integer(), + noncached_block_time = 0 :: integer(), + tiny_bloom_count = 0 :: integer(), + slot_index_count = 0 :: integer(), + slot_fetch_count = 0 :: integer(), + noncached_block_count = 0 :: integer()}). -type sst_state() :: #state{}. +-type sst_timings() :: no_timing|#sst_timings{}. %%%============================================================================ %%% API @@ -421,27 +437,43 @@ starting({sst_new, starting({sst_newlevelzero, RootPath, Filename, Slots, FetchFun, Penciller, MaxSQN, PressMethod}, State) -> - SW = os:timestamp(), + SW0 = os:timestamp(), KVList = leveled_pmem:to_list(Slots, FetchFun), + Time0 = timer:now_diff(os:timestamp(), SW0), + + SW1 = os:timestamp(), {[], [], SlotList, FirstKey} = merge_lists(KVList, PressMethod), + Time1 = timer:now_diff(os:timestamp(), SW1), + + SW2 = os:timestamp(), {SlotCount, SlotIndex, BlockIndex, SlotsBin} = build_all_slots(SlotList, PressMethod), + Time2 = timer:now_diff(os:timestamp(), SW2), + + SW3 = os:timestamp(), SummaryBin = build_table_summary(SlotIndex, 0, FirstKey, SlotCount, MaxSQN), + Time3 = timer:now_diff(os:timestamp(), SW3), + + SW4 = os:timestamp(), ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin, PressMethod), UpdState = read_file(ActualFilename, State#state{root_path = RootPath, yield_blockquery = true}), Summary = UpdState#state.summary, + Time4 = timer:now_diff(os:timestamp(), SW4), + leveled_log:log_timer("SST08", [ActualFilename, 0, Summary#summary.max_sqn], - SW), + SW0), + leveled_log:log("SST11", [Time0, Time1, Time2, Time3, Time4]), + case Penciller of undefined -> {next_state, @@ -459,10 +491,15 @@ starting({sst_newlevelzero, RootPath, Filename, reader({get_kv, LedgerKey, Hash}, _From, State) -> - SW = os:timestamp(), - {Result, Stage, _SlotID, UpdState} = fetch(LedgerKey, Hash, State), - UpdTimings = leveled_log:sst_timing(State#state.sst_timings, SW, Stage), - {reply, Result, reader, UpdState#state{sst_timings = UpdTimings}}; + % Get a KV value and potentially take sample timings + {Result, UpdState, UpdTimings} = + fetch(LedgerKey, Hash, State, State#state.timings), + + {UpdTimings0, CountDown} = + update_statetimings(UpdTimings, State#state.timings_countdown), + + {reply, Result, reader, UpdState#state{timings = UpdTimings0, + timings_countdown = CountDown}}; reader({get_kvrange, StartKey, EndKey, ScanWidth, SlotList}, _From, State) -> {SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey, EndKey, @@ -497,8 +534,8 @@ reader(get_maxsequencenumber, _From, State) -> Summary = State#state.summary, {reply, Summary#summary.max_sqn, reader, State}; reader(print_timings, _From, State) -> - io:format(user, "~nTimings of ~w~n", [State#state.sst_timings]), - {reply, ok, reader, State#state{sst_timings = undefined}}; + log_timings(State#state.timings), + {reply, ok, reader, State}; reader({set_for_delete, Penciller}, _From, State) -> leveled_log:log("SST06", [State#state.filename]), {reply, @@ -521,7 +558,7 @@ reader(close, _From, State) -> delete_pending({get_kv, LedgerKey, Hash}, _From, State) -> - {Result, _Stage, _SlotID, UpdState} = fetch(LedgerKey, Hash, State), + {Result, UpdState, _Ts} = fetch(LedgerKey, Hash, State, no_timing), {reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT}; delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SlotList}, _From, State) -> @@ -591,16 +628,34 @@ code_change(_OldVsn, StateName, State, _Extra) -> %%% Internal Functions %%%============================================================================ -fetch(LedgerKey, Hash, State) -> +-spec fetch(tuple(), + {integer(), integer()}|integer(), + sst_state(), sst_timings()) + -> {not_present|tuple(), sst_state(), sst_timings()}. +%% @doc +%% +%% Fetch a key from the store, potentially taking timings. Result should be +%% not_present if the key is not in the store. +fetch(LedgerKey, Hash, State, Timings0) -> + SW0 = os:timestamp(), + Summary = State#state.summary, PressMethod = State#state.compression_method, Slot = lookup_slot(LedgerKey, Summary#summary.index), + + {SW1, Timings1} = update_timings(SW0, Timings0, index_query, true), + SlotID = Slot#slot_index_value.slot_id, Bloom = Slot#slot_index_value.bloom, case leveled_tinybloom:check_hash(Hash, Bloom) of false -> - {not_present, tiny_bloom, SlotID, State}; + {_SW2, Timings2} = + update_timings(SW1, Timings1, tiny_bloom, false), + {not_present, State, Timings2}; true -> + {SW2, Timings2} = + update_timings(SW1, Timings1, tiny_bloom, true), + CachedBlockIdx = array:get(SlotID - 1, State#state.blockindex_cache), case CachedBlockIdx of @@ -612,10 +667,11 @@ fetch(LedgerKey, Hash, State) -> <>, State#state.blockindex_cache), + {_SW3, Timings3} = + update_timings(SW2, Timings2, noncached_block, false), {Result, - slot_fetch, - Slot#slot_index_value.slot_id, - State#state{blockindex_cache = BlockIndexCache}}; + State#state{blockindex_cache = BlockIndexCache}, + Timings3}; <> -> PosList = find_pos(BlockIdx, extra_hash(Hash), @@ -623,8 +679,18 @@ fetch(LedgerKey, Hash, State) -> 0), case PosList of [] -> - {not_present, slot_bloom, SlotID, State}; + {_SW3, Timings3} = + update_timings(SW2, + Timings2, + slot_index, + false), + {not_present, State, Timings3}; _ -> + {SW3, Timings3} = + update_timings(SW2, + Timings2, + slot_index, + true), StartPos = Slot#slot_index_value.start_position, Result = check_blocks(PosList, @@ -634,7 +700,12 @@ fetch(LedgerKey, Hash, State) -> LedgerKey, PressMethod, not_present), - {Result, slot_fetch, SlotID, State} + {_SW4, Timings4} = + update_timings(SW3, + Timings3, + slot_fetch, + false), + {Result, State, Timings4} end end end. @@ -1722,6 +1793,97 @@ expand_list_by_pointer({next, ManEntry, StartKey, EndKey}, Width, SegList), ExpPointer ++ Tail. + + +%%%============================================================================ +%%% Timing Functions +%%%============================================================================ + +-spec update_statetimings(sst_timings(), integer()) + -> {sst_timings(), integer()}. +%% @doc +%% +%% The timings state is either in countdown to the next set of samples of +%% we are actively collecting a sample. Active collection take place +%% when the countdown is 0. Once the sample has reached the expected count +%% then there is a log of that sample, and the countdown is restarted. +%% +%% Outside of sample windows the timings object should be set to the atom +%% no_timing. no_timing is a valid state for the cdb_timings type. +update_statetimings(no_timing, 0) -> + {#sst_timings{}, 0}; +update_statetimings(Timings, 0) -> + case Timings#sst_timings.sample_count of + SC when SC >= ?TIMING_SAMPLESIZE -> + log_timings(Timings), + {no_timing, leveled_rand:uniform(2 * ?TIMING_SAMPLECOUNTDOWN)}; + _SC -> + {Timings, 0} + end; +update_statetimings(no_timing, N) -> + {no_timing, N - 1}. + +log_timings(no_timing) -> + ok; +log_timings(Timings) -> + leveled_log:log("SST12", [Timings#sst_timings.sample_count, + Timings#sst_timings.index_query_time, + Timings#sst_timings.tiny_bloom_time, + Timings#sst_timings.slot_index_time, + Timings#sst_timings.slot_fetch_time, + Timings#sst_timings.noncached_block_time, + Timings#sst_timings.tiny_bloom_count, + Timings#sst_timings.slot_index_count, + Timings#sst_timings.slot_fetch_count, + Timings#sst_timings.noncached_block_count]). + + +update_timings(_SW, no_timing, _Stage, _Continue) -> + {no_timing, no_timing}; +update_timings(SW, Timings, Stage, Continue) -> + Timer = timer:now_diff(os:timestamp(), SW), + Timings0 = + case Stage of + index_query -> + IQT = Timings#sst_timings.index_query_time, + Timings#sst_timings{index_query_time = IQT + Timer}; + tiny_bloom -> + TBT = Timings#sst_timings.tiny_bloom_time, + Timings#sst_timings{tiny_bloom_time = TBT + Timer}; + slot_index -> + SIT = Timings#sst_timings.slot_index_time, + Timings#sst_timings{slot_index_time = SIT + Timer}; + slot_fetch -> + SFT = Timings#sst_timings.slot_fetch_time, + Timings#sst_timings{slot_fetch_time = SFT + Timer}; + noncached_block -> + NCT = Timings#sst_timings.noncached_block_time, + Timings#sst_timings{noncached_block_time = NCT + Timer} + end, + case Continue of + true -> + {os:timestamp(), Timings0}; + false -> + Timings1 = + case Stage of + tiny_bloom -> + TBC = Timings#sst_timings.tiny_bloom_count, + Timings0#sst_timings{tiny_bloom_count = TBC + 1}; + slot_index -> + SIC = Timings#sst_timings.slot_index_count, + Timings0#sst_timings{slot_index_count = SIC + 1}; + slot_fetch -> + SFC = Timings#sst_timings.slot_fetch_count, + Timings0#sst_timings{slot_fetch_count = SFC + 1}; + noncached_block -> + NCC = Timings#sst_timings.noncached_block_count, + Timings0#sst_timings{noncached_block_count = NCC + 1} + end, + SC = Timings1#sst_timings.sample_count, + {no_timing, Timings1#sst_timings{sample_count = SC + 1}} + end. + + %%%============================================================================ %%% Test %%%============================================================================ @@ -1962,7 +2124,7 @@ indexed_list_mixedkeys_bitflip_test() -> flip_byte(Binary) -> L = byte_size(Binary), - Byte1 = leveled_rand:uniform(L), + Byte1 = leveled_rand:uniform(L) - 1, <> = Binary, case A of 0 -> diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl index 60c2609..1539ccf 100644 --- a/test/end_to_end/riak_SUITE.erl +++ b/test/end_to_end/riak_SUITE.erl @@ -281,7 +281,7 @@ handoff(_Config) -> % Start the first database, load a test object, close it, start it again StartOpts1 = [{root_path, RootPathA}, {max_pencillercachesize, 16000}, - {sync_strategy, riak_sync}], + {sync_strategy, sync}], {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), % Add some noe Riak objects in - which should be ignored in folds.