commit 44cf5788ab
7 changed files with 335 additions and 39 deletions

@@ -150,6 +150,7 @@
-define(CHECKJOURNAL_PROB, 0.2).
-define(CACHE_SIZE_JITTER, 25).
-define(JOURNAL_SIZE_JITTER, 20).
-define(LONG_RUNNING, 80000).

-record(ledger_cache, {skiplist = leveled_skiplist:empty(true) :: tuple(),
                        min_sqn = infinity :: integer()|infinity,

@@ -160,7 +161,9 @@
                cache_size :: integer(),
                ledger_cache = #ledger_cache{},
                is_snapshot :: boolean(),
                slow_offer = false :: boolean()}).
                slow_offer = false :: boolean(),
                put_timing :: tuple(),
                get_timing :: tuple()}).


%%%============================================================================

@@ -262,10 +265,12 @@ init([Opts]) ->

handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
    LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
    SW = os:timestamp(),
    {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
                                                LedgerKey,
                                                Object,
                                                {IndexSpecs, TTL}),
    T0 = timer:now_diff(os:timestamp(), SW),
    Changes = preparefor_ledgercache(no_type_assigned,
                                        LedgerKey,
                                        SQN,

@@ -273,6 +278,8 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
                                        ObjSize,
                                        {IndexSpecs, TTL}),
    Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
    T1 = timer:now_diff(os:timestamp(), SW) - T0,
    PutTimes = leveled_log:put_timing(bookie, State#state.put_timing, T0, T1),
    % If the previous push to memory was returned then punish this PUT with a
    % delay. If the back-pressure in the Penciller continues, these delays
    % will become more frequent

@@ -282,36 +289,50 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
        false ->
            gen_server:reply(From, ok)
    end,
    maybe_longrunning(SW, overall_put),
    case maybepush_ledgercache(State#state.cache_size,
                                Cache0,
                                State#state.penciller) of
        {ok, NewCache} ->
            {noreply, State#state{ledger_cache=NewCache, slow_offer=false}};
            {noreply, State#state{ledger_cache=NewCache,
                                    put_timing=PutTimes,
                                    slow_offer=false}};
        {returned, NewCache} ->
            {noreply, State#state{ledger_cache=NewCache, slow_offer=true}}
            {noreply, State#state{ledger_cache=NewCache,
                                    put_timing=PutTimes,
                                    slow_offer=true}}
    end;
handle_call({get, Bucket, Key, Tag}, _From, State) ->
    LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
    SWh = os:timestamp(),
    case fetch_head(LedgerKey,
                    State#state.penciller,
                    State#state.ledger_cache) of
        not_present ->
            {reply, not_found, State};
            GT0 = leveled_log:get_timing(State#state.get_timing,
                                            SWh,
                                            head_not_present),
            {reply, not_found, State#state{get_timing=GT0}};
        Head ->
            GT0 = leveled_log:get_timing(State#state.get_timing,
                                            SWh,
                                            head_found),
            SWg = os:timestamp(),
            {Seqn, Status, _MH, _MD} = leveled_codec:striphead_to_details(Head),
            case Status of
                tomb ->
                    {reply, not_found, State};
                {active, TS} ->
                    Active = TS >= leveled_codec:integer_now(),
                    case {Active,
                            fetch_value(LedgerKey, Seqn, State#state.inker)} of
                    Object = fetch_value(LedgerKey, Seqn, State#state.inker),
                    GT1 = leveled_log:get_timing(GT0, SWg, fetch),
                    case {Active, Object} of
                        {_, not_present} ->
                            {reply, not_found, State};
                            {reply, not_found, State#state{get_timing=GT1}};
                        {true, Object} ->
                            {reply, {ok, Object}, State};
                            {reply, {ok, Object}, State#state{get_timing=GT1}};
                        _ ->
                            {reply, not_found, State}
                            {reply, not_found, State#state{get_timing=GT1}}
                    end
            end
    end;

@@ -454,6 +475,16 @@ push_ledgercache(Penciller, Cache) ->
%%% Internal functions
%%%============================================================================



maybe_longrunning(SW, Aspect) ->
    case timer:now_diff(os:timestamp(), SW) of
        N when N > ?LONG_RUNNING ->
            leveled_log:log("B0013", [N, Aspect]);
        _ ->
            ok
    end.

cache_size(LedgerCache) ->
    leveled_skiplist:size(LedgerCache#ledger_cache.skiplist).


@@ -728,6 +759,7 @@ startup(InkerOpts, PencillerOpts) ->


fetch_head(Key, Penciller, LedgerCache) ->
    SW = os:timestamp(),
    Hash = leveled_codec:magic_hash(Key),
    if
        Hash /= no_lookup ->

@@ -736,20 +768,25 @@ fetch_head(Key, Penciller, LedgerCache) ->
                                        LedgerCache#ledger_cache.skiplist),
            case L0R of
                {value, Head} ->
                    maybe_longrunning(SW, local_head),
                    Head;
                none ->
                    case leveled_penciller:pcl_fetch(Penciller, Key, Hash) of
                        {Key, Head} ->
                            maybe_longrunning(SW, pcl_head),
                            Head;
                        not_present ->
                            maybe_longrunning(SW, pcl_head),
                            not_present
                    end
            end
    end.

fetch_value(Key, SQN, Inker) ->
    SW = os:timestamp(),
    case leveled_inker:ink_fetch(Inker, Key, SQN) of
        {ok, Value} ->
            maybe_longrunning(SW, inker_fetch),
            Value;
        not_present ->
            not_present

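The PUT handler above now takes two readings: T0 covers the inker (journal) write and T1 the remaining ledger-cache preparation, and both are folded back into the server state via leveled_log:put_timing/4. A minimal, self-contained sketch of that two-stage measurement pattern; the module and function names here are illustrative, not part of the commit:

%% Illustrative only: time two stages of a composite operation the way the
%% bookie PUT handler does (T0 = first stage, T1 = everything after it).
-module(two_stage_timing_sketch).
-export([timed_op/2]).

timed_op(StageOneFun, StageTwoFun) ->
    SW = os:timestamp(),
    R1 = StageOneFun(),
    T0 = timer:now_diff(os:timestamp(), SW),
    R2 = StageTwoFun(),
    T1 = timer:now_diff(os:timestamp(), SW) - T0,
    %% T0/T1 would then be handed to leveled_log:put_timing/4, whose return
    %% value is written back to the process state so totals survive calls
    {R1, R2, T0, T1}.
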
@@ -108,7 +108,8 @@
                inker :: pid(),
                deferred_delete = false :: boolean(),
                waste_path :: string(),
                sync_strategy = none}).
                sync_strategy = none,
                put_timing :: tuple()}).


%%%============================================================================

@@ -256,12 +257,14 @@ writer({key_check, Key}, _From, State) ->
        writer,
        State};
writer({put_kv, Key, Value}, _From, State) ->
    SW = os:timestamp(),
    Result = put(State#state.handle,
                    Key,
                    Value,
                    {State#state.last_position, State#state.hashtree},
                    State#state.binary_mode,
                    State#state.max_size),
    T0 = timer:now_diff(os:timestamp(), SW),
    case Result of
        roll ->
            %% Key and value could not be written

@@ -274,10 +277,15 @@ writer({put_kv, Key, Value}, _From, State) ->
                _ ->
                    ok
            end,
            T1 = timer:now_diff(os:timestamp(), SW) - T0,
            Timings = leveled_log:put_timing(journal,
                                                State#state.put_timing,
                                                T0, T1),
            {reply, ok, writer, State#state{handle=UpdHandle,
                                            last_position=NewPosition,
                                            last_key=Key,
                                            hashtree=HashTree}}
                                            hashtree=HashTree,
                                            put_timing=Timings}}
    end;
writer({mput_kv, []}, _From, State) ->
    {reply, ok, writer, State};

@@ -469,4 +469,14 @@ stringcheck_test() ->
    ?assertMatch("Bucket", turn_to_string(<<"Bucket">>)),
    ?assertMatch("bucket", turn_to_string(bucket)).

%% Test below proved that the overhead of performing hashes was trivial
%% Maybe 5 microseconds per hash

%hashperf_test() ->
%    OL = lists:map(fun(_X) -> crypto:rand_bytes(8192) end, lists:seq(1, 10000)),
%    SW = os:timestamp(),
%    _HL = lists:map(fun(Obj) -> erlang:phash2(Obj) end, OL),
%    io:format(user, "10000 object hashes in ~w microseconds~n",
%                [timer:now_diff(os:timestamp(), SW)]).

-endif.

@@ -138,7 +138,8 @@
                clerk :: pid(),
                compaction_pending = false :: boolean(),
                is_snapshot = false :: boolean(),
                source_inker :: pid()}).
                source_inker :: pid(),
                put_timing :: tuple()}).


%%%============================================================================

@@ -414,17 +415,25 @@ start_from_file(InkOpts) ->

put_object(LedgerKey, Object, KeyChanges, State) ->
    NewSQN = State#state.journal_sqn + 1,
    SW= os:timestamp(),
    {JournalKey, JournalBin} = leveled_codec:to_inkerkv(LedgerKey,
                                                        NewSQN,
                                                        Object,
                                                        KeyChanges),
    T0 = timer:now_diff(os:timestamp(), SW),
    case leveled_cdb:cdb_put(State#state.active_journaldb,
                                JournalKey,
                                JournalBin) of
        ok ->
            {ok, State#state{journal_sqn=NewSQN}, byte_size(JournalBin)};
            T1 = timer:now_diff(os:timestamp(), SW) - T0,
            UpdPutTimes = leveled_log:put_timing(inker,
                                                    State#state.put_timing,
                                                    T0, T1),
            {ok,
                State#state{journal_sqn=NewSQN, put_timing=UpdPutTimes},
                byte_size(JournalBin)};
        roll ->
            SW = os:timestamp(),
            SWroll = os:timestamp(),
            CDBopts = State#state.cdb_options,
            ManEntry = start_new_activejournal(NewSQN,
                                                State#state.root_path,

@@ -437,7 +446,7 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
            ok = leveled_cdb:cdb_put(NewJournalP,
                                        JournalKey,
                                        JournalBin),
            leveled_log:log_timer("I0008", [], SW),
            leveled_log:log_timer("I0008", [], SWroll),
            {rolling,
                State#state{journal_sqn=NewSQN,
                            manifest=NewManifest,

@@ -742,6 +751,7 @@ initiate_penciller_snapshot(Bookie) ->
    MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
    {LedgerSnap, MaxSQN}.


%%%============================================================================
%%% Test
%%%============================================================================

@@ -8,9 +8,17 @@
-include_lib("eunit/include/eunit.hrl").

-export([log/2,
            log_timer/3]).
            log_timer/3,
            put_timing/4,
            head_timing/4,
            get_timing/3]).

-define(PUT_TIMING_LOGPOINT, 20000).
-define(HEAD_TIMING_LOGPOINT, 160000).
-define(GET_TIMING_LOGPOINT, 160000).
-define(LOG_LEVEL, [info, warn, error, critical]).
-define(SAMPLE_RATE, 16#F).

-define(LOGBASE, dict:from_list([

    {"G0001",

@@ -40,6 +48,13 @@
        {info, "Bucket list finds non-binary Bucket ~w"}},
    {"B0011",
        {warn, "Call to destroy the store and so all files to be removed"}},
    {"B0012",
        {info, "After ~w PUTs total inker time is ~w total ledger time is ~w "
            ++ "and max inker time is ~w and max ledger time is ~w"}},
    {"B0013",
        {warn, "Long running task took ~w microseconds with task of type ~w"}},
    {"B0014",
        {info, "Get timing for result ~w is sample ~w total ~w and max ~w"}},

    {"P0001",
        {info, "Ledger snapshot ~w registered"}},

@@ -99,13 +114,15 @@
    {"P0027",
        {info, "Rename of manifest from ~s ~w to ~s ~w"}},
    {"P0028",
        {info, "Adding cleared file ~s to deletion list"}},
        {debug, "Adding cleared file ~s to deletion list"}},
    {"P0029",
        {info, "L0 completion confirmed and will transition to not pending"}},
    {"P0030",
        {warn, "We're doomed - intention recorded to destroy all files"}},
    {"P0031",
        {info, "Completion of update to levelzero"}},
    {"P0032",
        {info, "Head timing for result ~w is sample ~w total ~w and max ~w"}},

    {"PC001",
        {info, "Penciller's clerk ~w started with owner ~w"}},

@@ -137,6 +154,8 @@
        {info, "Empty file ~s to be cleared"}},
    {"PC015",
        {info, "File created"}},
    {"PC016",
        {info, "Slow fetch from SFT ~w of ~w microseconds with result ~w"}},

    {"I0001",
        {info, "Unexpected failure to fetch value for Key=~w SQN=~w "

@@ -176,6 +195,9 @@
        {info, "At SQN=~w journal has filename ~s"}},
    {"I0018",
        {warn, "We're doomed - intention recorded to destroy all files"}},
    {"I0019",
        {info, "After ~w PUTs total prepare time is ~w total cdb time is ~w "
            ++ "and max prepare time is ~w and max cdb time is ~w"}},

    {"IC001",
        {info, "Closed for reason ~w so maybe leaving garbage"}},

@@ -216,7 +238,7 @@
    {"SFT03",
        {info, "File creation of L0 file ~s"}},
    {"SFT04",
        {info, "File ~s prompting for delete status check"}},
        {debug, "File ~s prompting for delete status check"}},
    {"SFT05",
        {info, "Exit called for reason ~w on filename ~s"}},
    {"SFT06",

@@ -235,7 +257,8 @@
        {error, "Segment filter failed due to CRC check ~w did not match ~w"}},
    {"SFT13",
        {error, "Segment filter failed due to ~s"}},

    {"SFT14",
        {debug, "Range fetch from SFT PID ~w"}},

    {"CDB01",
        {info, "Opening file for writing with filename ~s"}},

@@ -271,7 +294,10 @@
        {info, "Cycle count of ~w in hashtable search higher than expected"
            ++ " in search for hash ~w with result ~w"}},
    {"CDB16",
        {info, "CDB scan from start ~w in file with end ~w and last_key ~w"}}
        {info, "CDB scan from start ~w in file with end ~w and last_key ~w"}},
    {"CDB17",
        {info, "After ~w PUTs total write time is ~w total sync time is ~w "
            ++ "and max write time is ~w and max sync time is ~w"}}
    ])).



@@ -304,9 +330,140 @@ log_timer(LogReference, Subs, StartTime) ->
            ok
    end.

%% Make a log of put timings split out by actor - one log for every
%% PUT_TIMING_LOGPOINT puts

put_timing(_Actor, undefined, T0, T1) ->
    {1, {T0, T1}, {T0, T1}};
put_timing(Actor, {?PUT_TIMING_LOGPOINT, {Total0, Total1}, {Max0, Max1}},
                                                                    T0, T1) ->
    RN = random:uniform(?HEAD_TIMING_LOGPOINT),
    case RN > ?HEAD_TIMING_LOGPOINT div 2 of
        true ->
            % log at the timing point less than half the time
            LogRef =
                case Actor of
                    bookie -> "B0012";
                    inker -> "I0019";
                    journal -> "CDB17"
                end,
            log(LogRef, [?PUT_TIMING_LOGPOINT, Total0, Total1, Max0, Max1]),
            put_timing(Actor, undefined, T0, T1);
        false ->
            % Log some other random time
            put_timing(Actor, {RN, {Total0, Total1}, {Max0, Max1}}, T0, T1)
    end;
put_timing(_Actor, {N, {Total0, Total1}, {Max0, Max1}}, T0, T1) ->
    {N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}.

%% Make a log of penciller head timings split out by level and result - one
%% log for every HEAD_TIMING_LOGPOINT puts
%% Returns a tuple of {Count, TimingDict} to be stored on the process state
head_timing(undefined, SW, Level, R) ->
    T0 = timer:now_diff(os:timestamp(), SW),
    head_timing_int(undefined, T0, Level, R);
head_timing({N, HeadTimingD}, SW, Level, R) ->
    case N band ?SAMPLE_RATE of
        0 ->
            T0 = timer:now_diff(os:timestamp(), SW),
            head_timing_int({N, HeadTimingD}, T0, Level, R);
        _ ->
            % Not to be sampled this time
            {N + 1, HeadTimingD}
    end.

head_timing_int(undefined, T0, Level, R) ->
    Key = head_key(R, Level),
    NewDFun = fun(K, Acc) ->
                    case K of
                        Key ->
                            dict:store(K, [1, T0, T0], Acc);
                        _ ->
                            dict:store(K, [0, 0, 0], Acc)
                    end end,
    {1, lists:foldl(NewDFun, dict:new(), head_keylist())};
head_timing_int({?HEAD_TIMING_LOGPOINT, HeadTimingD}, T0, Level, R) ->
    RN = random:uniform(?HEAD_TIMING_LOGPOINT),
    case RN > ?HEAD_TIMING_LOGPOINT div 2 of
        true ->
            % log at the timing point less than half the time
            LogFun = fun(K) -> log("P0032", [K|dict:fetch(K, HeadTimingD)]) end,
            lists:foreach(LogFun, head_keylist()),
            head_timing_int(undefined, T0, Level, R);
        false ->
            % Log some other time - reset to RN not 0 to stagger logs out over
            % time between the vnodes
            head_timing_int({RN, HeadTimingD}, T0, Level, R)
    end;
head_timing_int({N, HeadTimingD}, T0, Level, R) ->
    Key = head_key(R, Level),
    [Count0, Total0, Max0] = dict:fetch(Key, HeadTimingD),
    {N + 1,
        dict:store(Key, [Count0 + 1, Total0 + T0, max(Max0, T0)],
                    HeadTimingD)}.

head_key(not_present, _Level) ->
    not_present;
head_key(found, 0) ->
    found_0;
head_key(found, 1) ->
    found_1;
head_key(found, 2) ->
    found_2;
head_key(found, Level) when Level > 2 ->
    found_lower.

head_keylist() ->
    [not_present, found_lower, found_0, found_1, found_2].



get_timing(undefined, SW, TimerType) ->
    T0 = timer:now_diff(os:timestamp(), SW),
    get_timing_int(undefined, T0, TimerType);
get_timing({N, GetTimerD}, SW, TimerType) ->
    case N band ?SAMPLE_RATE of
        0 ->
            T0 = timer:now_diff(os:timestamp(), SW),
            get_timing_int({N, GetTimerD}, T0, TimerType);
        _ ->
            % Not to be sampled this time
            {N + 1, GetTimerD}
    end.

get_timing_int(undefined, T0, TimerType) ->
    NewDFun = fun(K, Acc) ->
                    case K of
                        TimerType ->
                            dict:store(K, [1, T0, T0], Acc);
                        _ ->
                            dict:store(K, [0, 0, 0], Acc)
                    end end,
    {1, lists:foldl(NewDFun, dict:new(), get_keylist())};
get_timing_int({?GET_TIMING_LOGPOINT, GetTimerD}, T0, TimerType) ->
    RN = random:uniform(?GET_TIMING_LOGPOINT),
    case RN > ?GET_TIMING_LOGPOINT div 2 of
        true ->
            % log at the timing point less than half the time
            LogFun = fun(K) -> log("B0014", [K|dict:fetch(K, GetTimerD)]) end,
            lists:foreach(LogFun, get_keylist()),
            get_timing_int(undefined, T0, TimerType);
        false ->
            % Log some other time - reset to RN not 0 to stagger logs out over
            % time between the vnodes
            get_timing_int({RN, GetTimerD}, T0, TimerType)
    end;
get_timing_int({N, GetTimerD}, T0, TimerType) ->
    [Count0, Total0, Max0] = dict:fetch(TimerType, GetTimerD),
    {N + 1,
        dict:store(TimerType,
                    [Count0 + 1, Total0 + T0, max(Max0, T0)],
                    GetTimerD)}.



get_keylist() ->
    [head_not_present, head_found, fetch].

%%%============================================================================
%%% Test

@@ -320,4 +477,15 @@ log_test() ->
    log("D0001", []),
    log_timer("D0001", [], os:timestamp()).

head_timing_test() ->
    SW = os:timestamp(),
    HeadTimer0 = lists:foldl(fun(_X, Acc) -> head_timing(Acc, SW, 2, found) end,
                                undefined,
                                lists:seq(0, 47)),
    HeadTimer1 = head_timing(HeadTimer0, SW, 3, found),
    {N, D} = HeadTimer1,
    ?assertMatch(49, N),
    ?assertMatch(3, lists:nth(1, dict:fetch(found_2, D))),
    ?assertMatch(1, lists:nth(1, dict:fetch(found_lower, D))).

-endif.

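The three helpers added to this module share one shape: the accumulator starts as undefined, ordinary calls only update a running count, total and maximum, and a summary line is emitted roughly once per logpoint, with a random reset so output from different vnodes is staggered. head_timing/4 and get_timing/3 additionally sample only calls where N band ?SAMPLE_RATE (16#F) is zero, i.e. about one call in sixteen. A hedged worked example of driving put_timing/4 from the shell (the input timings are made up):

%% Feed 100 fake (T0, T1) pairs through the accumulator; no log line appears
%% because ?PUT_TIMING_LOGPOINT (20000) puts have not been reached.
Acc = lists:foldl(fun(N, PutTiming) ->
                        leveled_log:put_timing(bookie, PutTiming, N, 2 * N)
                  end,
                  undefined,
                  lists:seq(1, 100)),
%% Acc is {Count, {TotalT0, TotalT1}, {MaxT0, MaxT1}}
{100, {5050, 10100}, {100, 200}} = Acc.
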
@@ -202,6 +202,7 @@
-define(PROMPT_WAIT_ONL0, 5).
-define(WORKQUEUE_BACKLOG_TOLERANCE, 4).
-define(COIN_SIDECOUNT, 5).
-define(SLOW_FETCH, 20000).

-record(state, {manifest = [] :: list(),
                manifest_sqn = 0 :: integer(),

@@ -227,7 +228,9 @@
                levelzero_astree :: list(),

                ongoing_work = [] :: list(),
                work_backlog = false :: boolean()}).
                work_backlog = false :: boolean(),

                head_timing :: tuple()}).


%%%============================================================================

@@ -366,20 +369,20 @@ handle_call({push_mem, {PushedTree, MinSQN, MaxSQN}},
            State)}
    end;
handle_call({fetch, Key, Hash}, _From, State) ->
    {reply,
        fetch_mem(Key,
                    Hash,
                    State#state.manifest,
                    State#state.levelzero_cache,
                    State#state.levelzero_index),
        State};
    {R, HeadTimer} = timed_fetch_mem(Key,
                                        Hash,
                                        State#state.manifest,
                                        State#state.levelzero_cache,
                                        State#state.levelzero_index,
                                        State#state.head_timing),
    {reply, R, State#state{head_timing=HeadTimer}};
handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
    {reply,
        compare_to_sqn(fetch_mem(Key,
                                    Hash,
                                    State#state.manifest,
                                    State#state.levelzero_cache,
                                    State#state.levelzero_index),
        compare_to_sqn(plain_fetch_mem(Key,
                                        Hash,
                                        State#state.manifest,
                                        State#state.levelzero_cache,
                                        State#state.levelzero_index),
                        SQN),
        State};
handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},

@@ -730,26 +733,40 @@ levelzero_filename(State) ->
                    ++ integer_to_list(MSN) ++ "_0_0",
    FileName.

timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) ->
    SW = os:timestamp(),
    {R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
    UpdHeadTimer =
        case R of
            not_present ->
                leveled_log:head_timing(HeadTimer, SW, Level, not_present);
            _ ->
                leveled_log:head_timing(HeadTimer, SW, Level, found)
        end,
    {R, UpdHeadTimer}.

plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
    R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
    element(1, R).

fetch_mem(Key, Hash, Manifest, L0Cache, none) ->
    L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache),
    case L0Check of
        {false, not_found} ->
            fetch(Key, Hash, Manifest, 0, fun leveled_sft:sft_get/3);
            fetch(Key, Hash, Manifest, 0, fun timed_sft_get/3);
        {true, KV} ->
            KV
            {KV, 0}
    end;
fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
    case leveled_pmem:check_index(Hash, L0Index) of
        true ->
            fetch_mem(Key, Hash, Manifest, L0Cache, none);
        false ->
            fetch(Key, Hash, Manifest, 0, fun leveled_sft:sft_get/3)
            fetch(Key, Hash, Manifest, 0, fun timed_sft_get/3)
    end.

fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
    not_present;
    {not_present, basement};
fetch(Key, Hash, Manifest, Level, FetchFun) ->
    LevelManifest = get_item(Level, Manifest, []),
    case lists:foldl(fun(File, Acc) ->

@@ -770,10 +787,25 @@ fetch(Key, Hash, Manifest, Level, FetchFun) ->
        not_present ->
            fetch(Key, Hash, Manifest, Level + 1, FetchFun);
        ObjectFound ->
            ObjectFound
            {ObjectFound, Level}
        end
    end.

timed_sft_get(PID, Key, Hash) ->
    SW = os:timestamp(),
    R = leveled_sft:sft_get(PID, Key, Hash),
    T0 = timer:now_diff(os:timestamp(), SW),
    case {T0, R} of
        {T, R} when T < ?SLOW_FETCH ->
            R;
        {T, not_present} ->
            leveled_log:log("PC016", [PID, T, not_present]),
            not_present;
        {T, R} ->
            leveled_log:log("PC016", [PID, T, found]),
            R
    end.


compare_to_sqn(Obj, SQN) ->
    case Obj of

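timed_fetch_mem/6 and timed_sft_get/3 above wrap the existing fetch paths so that the level at which a key was found feeds head_timing, and any SFT fetch slower than ?SLOW_FETCH (20000 microseconds) is logged individually via PC016. A generic sketch of that log-only-when-slow wrapper; the names and arguments are illustrative, not part of the commit:

%% Illustrative only: run Fun, and call LogFun with the elapsed time and the
%% result only when the call exceeded SlowMicros. Mirrors timed_sft_get/3.
maybe_log_slow(Fun, SlowMicros, LogFun) ->
    SW = os:timestamp(),
    R = Fun(),
    case timer:now_diff(os:timestamp(), SW) of
        T when T > SlowMicros ->
            LogFun(T, R);
        _ ->
            ok
    end,
    R.
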
@@ -644,6 +644,8 @@ acc_list_keysonly(null, empty) ->
    [];
acc_list_keysonly(null, RList) ->
    RList;
acc_list_keysonly(R, RList) when is_list(R) ->
    lists:foldl(fun acc_list_keysonly/2, RList, R);
acc_list_keysonly(R, RList) ->
    lists:append(RList, [leveled_codec:strip_to_keyseqstatusonly(R)]).

@@ -651,6 +653,8 @@ acc_list_kv(null, empty) ->
    [];
acc_list_kv(null, RList) ->
    RList;
acc_list_kv(R, RList) when is_list(R) ->
    RList ++ R;
acc_list_kv(R, RList) ->
    lists:append(RList, [R]).

@@ -713,7 +717,13 @@ fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey,
                Pointer,
                Acc) ->
    Block = fetch_block(Handle, LengthList, BlockNumber, Pointer),
    Results = scan_block(Block, StartKey, EndKey, AccFun, Acc),
    Results =
        case maybe_scan_entire_block(Block, StartKey, EndKey) of
            true ->
                {partial, AccFun(Block, Acc), StartKey};
            false ->
                scan_block(Block, StartKey, EndKey, AccFun, Acc)
        end,
    case Results of
        {partial, Acc1, StartKey} ->
            %% Move on to the next block

@@ -741,6 +751,25 @@ scan_block([HeadKV|T], StartKey, EndKey, AccFun, Acc) ->
    end.


maybe_scan_entire_block([], _, _) ->
    true;
maybe_scan_entire_block(_Block, all, all) ->
    true;
maybe_scan_entire_block(Block, StartKey, all) ->
    [FirstKey|_Tail] = Block,
    leveled_codec:strip_to_keyonly(FirstKey) > StartKey;
maybe_scan_entire_block(Block, StartKey, EndKey) ->
    [FirstKey|_Tail] = Block,
    LastKey = leveled_codec:strip_to_keyonly(lists:last(Block)),
    FromStart = leveled_codec:strip_to_keyonly(FirstKey) > StartKey,
    ToEnd = leveled_codec:endkey_passed(EndKey, LastKey),
    case {FromStart, ToEnd} of
        {true, false} ->
            true;
        _ ->
            false
    end.

fetch_keyvalue_fromblock([], _Key, _LengthList, _Handle, _StartOfSlot) ->
    not_present;
fetch_keyvalue_fromblock([BlockNmb|T], Key, LengthList, Handle, StartOfSlot) ->

@@ -1100,7 +1129,9 @@ maybe_expand_pointer([H|Tail]) ->
    case H of
        {next, SFTPid, StartKey} ->
            %% io:format("Scanning further on PID ~w ~w~n", [SFTPid, StartKey]),
            SW = os:timestamp(),
            Acc = sft_getkvrange(SFTPid, StartKey, all, ?MERGE_SCANWIDTH),
            leveled_log:log_timer("SFT14", [SFTPid], SW),
            lists:append(Acc, Tail);
        _ ->
            [H|Tail]