Take sample timings from CDB files

Periodically get a CDB file process to take samples of how long fetching keys/values takes - and record  those samples
This commit is contained in:
Martin Sumner 2017-11-20 14:58:43 +00:00
parent 9892d26e60
commit 8a43539090
2 changed files with 181 additions and 59 deletions

View file

@ -97,6 +97,8 @@
-define(WRITE_OPS, [binary, raw, read, write]).
-define(PENDING_ROLL_WAIT, 30).
-define(DELETE_TIMEOUT, 10000).
-define(TIMING_SAMPLECOUNTDOWN, 2000).
-define(TIMING_SAMPLESIZE, 100).
-record(state, {hashtree,
last_position :: integer() | undefined,
@ -110,9 +112,18 @@
inker :: pid() | undefined,
deferred_delete = false :: boolean(),
waste_path :: string() | undefined,
sync_strategy = none}).
sync_strategy = none,
timings = no_timing :: cdb_timings(),
timings_countdown = 0 :: integer()}).
-record(cdb_timings, {sample_count = 0 :: integer(),
sample_cyclecount = 0 :: integer(),
sample_indextime = 0 :: integer(),
sample_fetchtime = 0 :: integer(),
fetchloop_starttime :: erlang:timestamp()}).
-type cdb_options() :: #cdb_options{}.
-type cdb_timings() :: no_timing|#cdb_timings{}.
@ -523,22 +534,27 @@ rolling({delete_pending, ManSQN, Inker}, State) ->
State#state{delete_point=ManSQN, inker=Inker, deferred_delete=true}}.
reader({get_kv, Key}, _From, State) ->
{reply,
{UpdTimings, Result} =
get_withcache(State#state.handle,
Key,
State#state.hash_index,
State#state.binary_mode),
reader,
State};
reader({key_check, Key}, _From, State) ->
State#state.binary_mode,
State#state.timings),
{UpdTimings0, CountDown} =
update_statetimings(UpdTimings, State#state.timings_countdown),
{reply,
Result,
reader,
State#state{timings = UpdTimings0, timings_countdown = CountDown}};
reader({key_check, Key}, _From, State) ->
{no_timing, Result} =
get_withcache(State#state.handle,
Key,
State#state.hash_index,
loose_presence,
State#state.binary_mode),
reader,
State};
State#state.binary_mode,
State#state.timings),
{reply, Result, reader, State};
reader({get_positions, SampleSize, Index, Acc}, _From, State) ->
{Pos, Count} = element(Index + 1, State#state.hash_index),
UpdAcc = scan_index_returnpositions(State#state.handle, Pos, Count, Acc),
@ -597,24 +613,28 @@ reader({delete_pending, ManSQN, Inker}, State) ->
delete_pending({get_kv, Key}, _From, State) ->
{reply,
{UpdTimings, Result} =
get_withcache(State#state.handle,
Key,
State#state.hash_index,
State#state.binary_mode),
State#state.binary_mode,
State#state.timings),
{UpdTimings0, CountDown} =
update_statetimings(UpdTimings, State#state.timings_countdown),
{reply,
Result,
delete_pending,
State,
State#state{timings = UpdTimings0, timings_countdown = CountDown},
?DELETE_TIMEOUT};
delete_pending({key_check, Key}, _From, State) ->
{reply,
{no_timing, Result} =
get_withcache(State#state.handle,
Key,
State#state.hash_index,
loose_presence,
State#state.binary_mode),
delete_pending,
State,
?DELETE_TIMEOUT}.
State#state.binary_mode,
no_timing),
{reply, Result, delete_pending, State, ?DELETE_TIMEOUT}.
delete_pending(timeout, State=#state{delete_point=ManSQN}) when ManSQN > 0 ->
case is_process_alive(State#state.inker) of
@ -835,40 +855,60 @@ mput(Handle, KVList, {LastPosition, HashTree0}, BinaryMode, MaxSize) ->
end.
-spec get_withcache(file:io_device(), any(), tuple(), boolean()) -> tuple().
-spec get_withcache(file:io_device(),
any(),
tuple(),
boolean(),
cdb_timings())
-> {cdb_timings(), missing|probably|tuple()}.
%% @doc
%%
%% Using a cache of the Index array - get a K/V pair from the file using the
%% Key
get_withcache(Handle, Key, Cache, BinaryMode) ->
get(Handle, Key, Cache, true, BinaryMode).
%% Key. should return an updated timings object (if timings are being taken)
%% along with the result (which may be missing if the no matching entry is
%% found, or probably in QuickCheck scenarios)
get_withcache(Handle, Key, Cache, BinaryMode, Timings) ->
get(Handle, Key, Cache, true, BinaryMode, Timings).
get_withcache(Handle, Key, Cache, QuickCheck, BinaryMode) ->
get(Handle, Key, Cache, QuickCheck, BinaryMode).
get_withcache(Handle, Key, Cache, QuickCheck, BinaryMode, Timings) ->
get(Handle, Key, Cache, QuickCheck, BinaryMode, Timings).
get(FileNameOrHandle, Key, BinaryMode) ->
get(FileNameOrHandle, Key, no_cache, true, BinaryMode).
{no_timing, R} =
get(FileNameOrHandle, Key, no_cache, true, BinaryMode, no_timing),
R.
-spec get(list()|file:io_device(),
any(), no_cache|tuple(),
loose_presence|any(), boolean())
-> tuple()|probably|missing.
loose_presence|any(),
boolean(),
cdb_timings())
-> {cdb_timings(), tuple()|probably|missing}.
%% @doc
%%
%% Get a K/V pair from the file using the Key. QuickCheck can be set to
%% loose_presence if all is required is a loose check of presence (that the
%% Key is probably present as there is a hash in the hash table which matches
%% that Key)
get(FileName, Key, Cache, QuickCheck, BinaryMode) when is_list(FileName) ->
%%
%% Timings also passed in and can be updated based on results
get(FileName, Key, Cache, QuickCheck, BinaryMode, Timings)
when is_list(FileName) ->
{ok, Handle} = file:open(FileName,[binary, raw, read]),
get(Handle, Key, Cache, QuickCheck, BinaryMode);
get(Handle, Key, Cache, QuickCheck, BinaryMode) when is_tuple(Handle) ->
get(Handle, Key, Cache, QuickCheck, BinaryMode, Timings);
get(Handle, Key, Cache, QuickCheck, BinaryMode, Timings)
when is_tuple(Handle) ->
SW = os:timestamp(),
Hash = hash(Key),
Index = hash_to_index(Hash),
{HashTable, Count} = get_index(Handle, Index, Cache),
% If the count is 0 for that index - key must be missing
case Count of
0 ->
missing;
{no_timing, missing};
_ ->
% Get starting slot in hashtable
{ok, FirstHashPosition} = file:position(Handle, {bof, HashTable}),
@ -880,12 +920,14 @@ get(Handle, Key, Cache, QuickCheck, BinaryMode) when is_tuple(Handle) ->
?DWORD_SIZE),
% Split list around starting slot.
{L1, L2} = lists:split(Slot, LocList),
UpdTimings = update_indextimings(Timings, SW),
search_hash_table(Handle,
lists:append(L2, L1),
Hash,
Key,
QuickCheck,
BinaryMode)
BinaryMode,
UpdTimings)
end.
get_index(Handle, Index, no_cache) ->
@ -1261,6 +1303,18 @@ read_integerpairs(<<Int1:32, Int2:32, Rest/binary>>, Pairs) ->
Pairs ++ [{endian_flip(Int1),
endian_flip(Int2)}]).
search_hash_table(Handle, Entries, Hash, Key,
QuickCheck, BinaryMode, Timings) ->
search_hash_table(Handle, Entries, Hash, Key,
QuickCheck, BinaryMode, Timings, 0).
-spec search_hash_table(file:io_device(), list(), integer(), any(),
loose_presence|boolean(), boolean(),
cdb_timings(), integer()) ->
{cdb_timings(), missing|probably|tuple()}.
%% @doc
%%
%% Seach the hash table for the matching hash and key. Be prepared for
%% multiple keys to have the same hash value.
%%
@ -1268,21 +1322,19 @@ read_integerpairs(<<Int1:32, Int2:32, Rest/binary>>, Pairs) ->
%% true - check the CRC before returning key & value
%% false - don't check the CRC before returning key & value
%% loose_presence - confirm that the hash of the key is present
search_hash_table(Handle, Entries, Hash, Key, QuickCheck, BinaryMode) ->
search_hash_table(Handle, Entries, Hash, Key, QuickCheck, BinaryMode, 0).
search_hash_table(_Handle, [], Hash, _Key,
_QuickCheck, _BinaryMode, CycleCount) ->
_QuickCheck, _BinaryMode, _Timings, CycleCount) ->
log_cyclecount(CycleCount, Hash, missing),
missing;
{no_timing, missing};
search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key,
QuickCheck, BinaryMode, CycleCount) ->
QuickCheck, BinaryMode, Timings, CycleCount) ->
{ok, _} = file:position(Handle, Entry),
{StoredHash, DataLoc} = read_next_2_integers(Handle),
case StoredHash of
Hash ->
KV = case QuickCheck of
KV =
case QuickCheck of
loose_presence ->
probably;
_ ->
@ -1296,17 +1348,18 @@ search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key,
Key,
QuickCheck,
BinaryMode,
Timings,
CycleCount + 1);
_ ->
UpdTimings = update_fetchtimings(Timings, CycleCount),
log_cyclecount(CycleCount, Hash, found),
KV
{UpdTimings, KV}
end;
%0 ->
% % Hash is 0 so key must be missing as 0 found before Hash matched
% missing;
_ ->
search_hash_table(Handle, RestOfEntries, Hash, Key,
QuickCheck, BinaryMode, CycleCount + 1)
QuickCheck, BinaryMode,
Timings,
CycleCount + 1)
end.
log_cyclecount(CycleCount, Hash, Result) ->
@ -1317,6 +1370,69 @@ log_cyclecount(CycleCount, Hash, Result) ->
ok
end.
-spec update_fetchtimings(no_timing|cdb_timings(), integer()) ->
no_timing|cdb_timings().
%% @doc
%%
%% Update the timings record if sample timings currently being taken
%% (otherwise the timngs record will be set to no_timing)
update_fetchtimings(no_timing, _CycleCount) ->
no_timing;
update_fetchtimings(Timings, CycleCount) ->
FetchTime =
timer:now_diff(os:timestamp(),
Timings#cdb_timings.fetchloop_starttime),
Timings#cdb_timings{sample_fetchtime =
Timings#cdb_timings.sample_fetchtime + FetchTime,
sample_cyclecount =
Timings#cdb_timings.sample_cyclecount + CycleCount,
sample_count =
Timings#cdb_timings.sample_count + 1}.
-spec update_indextimings(no_timing|cdb_timings(), erlang:timestamp()) ->
no_timing|cdb_timings().
%% @doc
%%
%% Update the timings record with the time spent looking up the position
%% list to check from the index
update_indextimings(no_timing, _SW) ->
no_timing;
update_indextimings(Timings, SW) ->
IdxTime = timer:now_diff(os:timestamp(), SW),
Timings#cdb_timings{sample_indextime =
Timings#cdb_timings.sample_indextime
+ IdxTime,
fetchloop_starttime =
os:timestamp()}.
-spec update_statetimings(cdb_timings(), integer())
-> {cdb_timings(), integer()}.
%% @doc
%%
%% The timings state is either in countdown to the next set of samples of
%% we are actively collecting a sample. Active collection take place
%% when the countdown is 0. Once the sample has reached the expected count
%% then there is a log of that sample, and the countdown is restarted.
%%
%% Outside of sample windows the timings object should be set to the atom
%% no_timing. no_timing is a valid state for the cdb_timings type.
update_statetimings(no_timing, 0) ->
{#cdb_timings{}, 0};
update_statetimings(Timings, 0) ->
case Timings#cdb_timings.sample_count of
SC when SC >= ?TIMING_SAMPLESIZE ->
leveled_log:log("CDBnn", [Timings#cdb_timings.sample_count,
Timings#cdb_timings.sample_cyclecount,
Timings#cdb_timings.sample_fetchtime,
Timings#cdb_timings.sample_indextime]),
{no_timing, leveled_rand:uniform(?TIMING_SAMPLECOUNTDOWN)};
_SC ->
{Timings, 0}
end;
update_statetimings(no_timing, N) ->
{no_timing, N - 1}.
% Write Key and Value tuples into the CDB. Each tuple consists of a
% 4 byte key length, a 4 byte value length, the actual key followed
% by the value.
@ -1849,10 +1965,12 @@ search_hash_table_findinslot_test() ->
io:format("Slot 2 has Hash ~w Position ~w~n", [ReadH4, ReadP4]),
?assertMatch(0, ReadH4),
?assertMatch({"key1", "value1"}, get(Handle, Key1, false)),
?assertMatch(probably, get(Handle, Key1,
no_cache, loose_presence, false)),
?assertMatch(missing, get(Handle, "Key99",
no_cache, loose_presence, false)),
?assertMatch({no_timing, probably},
get(Handle, Key1,
no_cache, loose_presence, false, no_timing)),
?assertMatch({no_timing, missing},
get(Handle, "Key99",
no_cache, loose_presence, false, no_timing)),
{ok, _} = file:position(Handle, FirstHashPosition),
FlipH3 = endian_flip(ReadH3),
FlipP3 = endian_flip(ReadP3),

View file

@ -327,7 +327,11 @@
{info, "After ~w PUTs total write time is ~w total sync time is ~w "
++ "and max write time is ~w and max sync time is ~w"}},
{"CDB18",
{info, "Handled return and write of hashtable"}}
{info, "Handled return and write of hashtable"}},
{"CDB19",
{info, "Sample timings in microseconds for sample_count=~w"
++ "totals of cycle_count=~w "
++ "fetch_time=~w index_time=~w "}}
]).