diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 35887b8..90a0b2a 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -150,6 +150,7 @@
 -define(CHECKJOURNAL_PROB, 0.2).
 -define(CACHE_SIZE_JITTER, 25).
 -define(JOURNAL_SIZE_JITTER, 20).
+-define(LONG_RUNNING, 80000).
 
 -record(ledger_cache, {skiplist = leveled_skiplist:empty(true) :: tuple(),
                         min_sqn = infinity :: integer()|infinity,
@@ -160,7 +161,9 @@
                 cache_size :: integer(),
                 ledger_cache = #ledger_cache{},
                 is_snapshot :: boolean(),
-                slow_offer = false :: boolean()}).
+                slow_offer = false :: boolean(),
+                put_timing :: tuple(),
+                get_timing :: tuple()}).
 
 
 %%%============================================================================
@@ -262,10 +265,12 @@ init([Opts]) ->
 
 handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
+    SW = os:timestamp(),
     {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
                                                 LedgerKey,
                                                 Object,
                                                 {IndexSpecs, TTL}),
+    T0 = timer:now_diff(os:timestamp(), SW),
     Changes = preparefor_ledgercache(no_type_assigned,
                                         LedgerKey,
                                         SQN,
@@ -273,6 +278,8 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
                                         ObjSize,
                                         {IndexSpecs, TTL}),
     Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
+    T1 = timer:now_diff(os:timestamp(), SW) - T0,
+    PutTimes = leveled_log:put_timing(bookie, State#state.put_timing, T0, T1),
     % If the previous push to memory was returned then punish this PUT with a
     % delay.  If the back-pressure in the Penciller continues, these delays
     % will become more frequent
@@ -282,36 +289,50 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
         false ->
             gen_server:reply(From, ok)
     end,
+    maybe_longrunning(SW, overall_put),
     case maybepush_ledgercache(State#state.cache_size,
                                 Cache0,
                                 State#state.penciller) of
         {ok, NewCache} ->
-            {noreply, State#state{ledger_cache=NewCache, slow_offer=false}};
+            {noreply, State#state{ledger_cache=NewCache,
+                                    put_timing=PutTimes,
+                                    slow_offer=false}};
         {returned, NewCache} ->
-            {noreply, State#state{ledger_cache=NewCache, slow_offer=true}}
+            {noreply, State#state{ledger_cache=NewCache,
+                                    put_timing=PutTimes,
+                                    slow_offer=true}}
     end;
 handle_call({get, Bucket, Key, Tag}, _From, State) ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
+    SWh = os:timestamp(),
     case fetch_head(LedgerKey,
                     State#state.penciller,
                     State#state.ledger_cache) of
         not_present ->
-            {reply, not_found, State};
+            GT0 = leveled_log:get_timing(State#state.get_timing,
+                                            SWh,
+                                            head_not_present),
+            {reply, not_found, State#state{get_timing=GT0}};
         Head ->
+            GT0 = leveled_log:get_timing(State#state.get_timing,
+                                            SWh,
+                                            head_found),
+            SWg = os:timestamp(),
             {Seqn, Status, _MH, _MD} = leveled_codec:striphead_to_details(Head),
             case Status of
                 tomb ->
                     {reply, not_found, State};
                 {active, TS} ->
                     Active = TS >= leveled_codec:integer_now(),
-                    case {Active,
-                            fetch_value(LedgerKey, Seqn, State#state.inker)} of
+                    Object = fetch_value(LedgerKey, Seqn, State#state.inker),
+                    GT1 = leveled_log:get_timing(GT0, SWg, fetch),
+                    case {Active, Object} of
                         {_, not_present} ->
-                            {reply, not_found, State};
+                            {reply, not_found, State#state{get_timing=GT1}};
                         {true, Object} ->
-                            {reply, {ok, Object}, State};
+                            {reply, {ok, Object}, State#state{get_timing=GT1}};
                         _ ->
-                            {reply, not_found, State}
+                            {reply, not_found, State#state{get_timing=GT1}}
                     end
             end
     end;
@@ -454,6 +475,16 @@ push_ledgercache(Penciller, Cache) ->
 
 %%%============================================================================
 %%% Internal functions
 %%%============================================================================
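Both handlers above hang their measurements off a single os:timestamp() anchor: in the PUT path T0 covers the journal (inker) write and T1 the ledger-cache update, while the GET path anchors separately for the head lookup (SWh) and the value fetch (SWg). A minimal standalone sketch of the two-phase PUT measurement, with ink_put/0 and cache_update/0 as hypothetical stand-ins for the real calls:

```erlang
-module(put_timing_sketch).
-export([timed_put/0]).

%% Hypothetical stand-ins for leveled_inker:ink_put/4 and
%% addto_ledgercache/2 - not the real API.
ink_put() -> ok.
cache_update() -> ok.

timed_put() ->
    SW = os:timestamp(),
    ok = ink_put(),
    %% T0: microseconds spent on the journal write
    T0 = timer:now_diff(os:timestamp(), SW),
    ok = cache_update(),
    %% T1: microseconds spent on the ledger-cache update alone -
    %% total elapsed time minus the journal phase
    T1 = timer:now_diff(os:timestamp(), SW) - T0,
    {T0, T1}.
```

Note that timer:now_diff/2 returns microseconds, which is why ?LONG_RUNNING at 80000 reads as an 80ms threshold.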
+
+
+maybe_longrunning(SW, Aspect) ->
+    case timer:now_diff(os:timestamp(), SW) of
+        N when N > ?LONG_RUNNING ->
+            leveled_log:log("B0013", [N, Aspect]);
+        _ ->
+            ok
+    end.
+
 cache_size(LedgerCache) ->
     leveled_skiplist:size(LedgerCache#ledger_cache.skiplist).
 
@@ -728,6 +759,7 @@ startup(InkerOpts, PencillerOpts) ->
 
 
 fetch_head(Key, Penciller, LedgerCache) ->
+    SW = os:timestamp(),
     Hash = leveled_codec:magic_hash(Key),
     if
         Hash /= no_lookup ->
@@ -736,20 +768,25 @@ fetch_head(Key, Penciller, LedgerCache) ->
                                             LedgerCache#ledger_cache.skiplist),
             case L0R of
                 {value, Head} ->
+                    maybe_longrunning(SW, local_head),
                     Head;
                 none ->
                     case leveled_penciller:pcl_fetch(Penciller, Key, Hash) of
                         {Key, Head} ->
+                            maybe_longrunning(SW, pcl_head),
                             Head;
                         not_present ->
+                            maybe_longrunning(SW, pcl_head),
                             not_present
                     end
             end
     end.
 
 fetch_value(Key, SQN, Inker) ->
+    SW = os:timestamp(),
     case leveled_inker:ink_fetch(Inker, Key, SQN) of
         {ok, Value} ->
+            maybe_longrunning(SW, inker_fetch),
             Value;
         not_present ->
             not_present
diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl
index 1c8f9d0..0c0f745 100644
--- a/src/leveled_cdb.erl
+++ b/src/leveled_cdb.erl
@@ -108,7 +108,8 @@
                 inker :: pid(),
                 deferred_delete = false :: boolean(),
                 waste_path :: string(),
-                sync_strategy = none}).
+                sync_strategy = none,
+                put_timing :: tuple()}).
 
 
 %%%============================================================================
@@ -256,12 +257,14 @@ writer({key_check, Key}, _From, State) ->
         writer,
         State};
 writer({put_kv, Key, Value}, _From, State) ->
+    SW = os:timestamp(),
     Result = put(State#state.handle,
                     Key,
                     Value,
                     {State#state.last_position, State#state.hashtree},
                     State#state.binary_mode,
                     State#state.max_size),
+    T0 = timer:now_diff(os:timestamp(), SW),
     case Result of
         roll ->
             %% Key and value could not be written
@@ -274,10 +277,15 @@ writer({put_kv, Key, Value}, _From, State) ->
                 _ ->
                     ok
             end,
+            T1 = timer:now_diff(os:timestamp(), SW) - T0,
+            Timings = leveled_log:put_timing(journal,
+                                                State#state.put_timing,
+                                                T0, T1),
             {reply, ok, writer, State#state{handle=UpdHandle,
                                             last_position=NewPosition,
                                             last_key=Key,
-                                            hashtree=HashTree}}
+                                            hashtree=HashTree,
+                                            put_timing=Timings}}
     end;
 writer({mput_kv, []}, _From, State) ->
     {reply, ok, writer, State};
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index ffb3f98..b27f5b9 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -469,4 +469,14 @@ stringcheck_test() ->
     ?assertMatch("Bucket", turn_to_string(<<"Bucket">>)),
     ?assertMatch("bucket", turn_to_string(bucket)).
 
+%% Test below proved that the overhead of performing hashes was trivial
+%% Maybe 5 microseconds per hash
+
+%hashperf_test() ->
+%    OL = lists:map(fun(_X) -> crypto:rand_bytes(8192) end, lists:seq(1, 10000)),
+%    SW = os:timestamp(),
+%    _HL = lists:map(fun(Obj) -> erlang:phash2(Obj) end, OL),
+%    io:format(user, "10000 object hashes in ~w microseconds~n",
+%                [timer:now_diff(os:timestamp(), SW)]).
+
 -endif.
\ No newline at end of file
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index 9a37cae..f56ea20 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -138,7 +138,8 @@
                 clerk :: pid(),
                 compaction_pending = false :: boolean(),
                 is_snapshot = false :: boolean(),
-                source_inker :: pid()}).
+                source_inker :: pid(),
+                put_timing :: tuple()}).
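maybe_longrunning/2 is the same guard dropped at each probe point: keep the timestamp anchor, and only log "B0013" when more than ?LONG_RUNNING microseconds have elapsed. The guard generalises to a wrapper around any closure; a sketch, with io:format/2 standing in for leveled_log:log/2:

```erlang
-define(LONG_RUNNING, 80000).

%% Run Fun() and report the Aspect only if it overruns the threshold.
time_aspect(Aspect, Fun) ->
    SW = os:timestamp(),
    R = Fun(),
    case timer:now_diff(os:timestamp(), SW) of
        N when N > ?LONG_RUNNING ->
            io:format("long running task ~w took ~w microseconds~n",
                        [Aspect, N]);
        _ ->
            ok
    end,
    R.
```

So time_aspect(pcl_head, fun() -> leveled_penciller:pcl_fetch(Penciller, Key, Hash) end) would cover the same path the patch instruments inline in fetch_head/3.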
 %%%============================================================================
 
@@ -414,17 +415,25 @@ start_from_file(InkOpts) ->
 
 put_object(LedgerKey, Object, KeyChanges, State) ->
     NewSQN = State#state.journal_sqn + 1,
+    SW = os:timestamp(),
     {JournalKey, JournalBin} = leveled_codec:to_inkerkv(LedgerKey,
                                                         NewSQN,
                                                         Object,
                                                         KeyChanges),
+    T0 = timer:now_diff(os:timestamp(), SW),
     case leveled_cdb:cdb_put(State#state.active_journaldb,
                                 JournalKey,
                                 JournalBin) of
         ok ->
-            {ok, State#state{journal_sqn=NewSQN}, byte_size(JournalBin)};
+            T1 = timer:now_diff(os:timestamp(), SW) - T0,
+            UpdPutTimes = leveled_log:put_timing(inker,
+                                                    State#state.put_timing,
+                                                    T0, T1),
+            {ok,
+                State#state{journal_sqn=NewSQN, put_timing=UpdPutTimes},
+                byte_size(JournalBin)};
         roll ->
-            SW = os:timestamp(),
+            SWroll = os:timestamp(),
             CDBopts = State#state.cdb_options,
             ManEntry = start_new_activejournal(NewSQN,
                                                 State#state.root_path,
@@ -437,7 +446,7 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
             ok = leveled_cdb:cdb_put(NewJournalP,
                                         JournalKey,
                                         JournalBin),
-            leveled_log:log_timer("I0008", [], SW),
+            leveled_log:log_timer("I0008", [], SWroll),
             {rolling,
                 State#state{journal_sqn=NewSQN,
                                 manifest=NewManifest,
@@ -742,6 +751,7 @@ initiate_penciller_snapshot(Bookie) ->
     MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
     {LedgerSnap, MaxSQN}.
 
+
 %%%============================================================================
 %%% Test
 %%%============================================================================
diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index 5c36cf7..3d7cba3 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -8,9 +8,17 @@
 -include_lib("eunit/include/eunit.hrl").
 
 -export([log/2,
-            log_timer/3]).
+            log_timer/3,
+            put_timing/4,
+            head_timing/4,
+            get_timing/3]).
 
+-define(PUT_TIMING_LOGPOINT, 20000).
+-define(HEAD_TIMING_LOGPOINT, 160000).
+-define(GET_TIMING_LOGPOINT, 160000).
 -define(LOG_LEVEL, [info, warn, error, critical]).
+-define(SAMPLE_RATE, 16#F).
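?SAMPLE_RATE is applied as a bit mask rather than a modulus: a timing is only taken when the low four bits of the request counter are zero, so with 16#F one call in sixteen pays for the extra os:timestamp(). A sketch of the test used by head_timing/4 and get_timing/3 below:

```erlang
-define(SAMPLE_RATE, 16#F).

%% True for one counter value in every sixteen:
%% sampled(0) -> true, sampled(1) .. sampled(15) -> false, sampled(16) -> true.
sampled(N) -> N band ?SAMPLE_RATE =:= 0.
```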
+
 -define(LOGBASE, dict:from_list([
 
     {"G0001",
         {info, "Generic log point"}},
 
@@ -40,6 +48,13 @@
         {info, "Bucket list finds non-binary Bucket ~w"}},
     {"B0011",
         {warn, "Call to destroy the store and so all files to be removed"}},
+    {"B0012",
+        {info, "After ~w PUTs total inker time is ~w total ledger time is ~w "
+                ++ "and max inker time is ~w and max ledger time is ~w"}},
+    {"B0013",
+        {warn, "Long running task took ~w microseconds with task of type ~w"}},
+    {"B0014",
+        {info, "Get timing for result ~w is sample ~w total ~w and max ~w"}},
 
     {"P0001",
         {info, "Ledger snapshot ~w registered"}},
@@ -99,13 +114,15 @@
     {"P0027",
         {info, "Rename of manifest from ~s ~w to ~s ~w"}},
     {"P0028",
-        {info, "Adding cleared file ~s to deletion list"}},
+        {debug, "Adding cleared file ~s to deletion list"}},
     {"P0029",
         {info, "L0 completion confirmed and will transition to not pending"}},
     {"P0030",
         {warn, "We're doomed - intention recorded to destroy all files"}},
     {"P0031",
         {info, "Completion of update to levelzero"}},
+    {"P0032",
+        {info, "Head timing for result ~w is sample ~w total ~w and max ~w"}},
 
     {"PC001",
         {info, "Penciller's clerk ~w started with owner ~w"}},
@@ -137,6 +154,8 @@
         {info, "Empty file ~s to be cleared"}},
     {"PC015",
         {info, "File created"}},
+    {"PC016",
+        {info, "Slow fetch from SFT ~w of ~w microseconds with result ~w"}},
 
     {"I0001",
         {info, "Unexpected failure to fetch value for Key=~w SQN=~w "
@@ -176,6 +195,9 @@
         {info, "At SQN=~w journal has filename ~s"}},
     {"I0018",
         {warn, "We're doomed - intention recorded to destroy all files"}},
+    {"I0019",
+        {info, "After ~w PUTs total prepare time is ~w total cdb time is ~w "
+                ++ "and max prepare time is ~w and max cdb time is ~w"}},
 
     {"IC001",
         {info, "Closed for reason ~w so maybe leaving garbage"}},
@@ -216,7 +238,7 @@
     {"SFT03",
         {info, "File creation of L0 file ~s"}},
     {"SFT04",
-        {info, "File ~s prompting for delete status check"}},
+        {debug, "File ~s prompting for delete status check"}},
     {"SFT05",
         {info, "Exit called for reason ~w on filename ~s"}},
     {"SFT06",
@@ -235,7 +257,8 @@
         {error, "Segment filter failed due to CRC check ~w did not match ~w"}},
     {"SFT13",
         {error, "Segment filter failed due to ~s"}},
-
+    {"SFT14",
+        {debug, "Range fetch from SFT PID ~w"}},
 
     {"CDB01",
         {info, "Opening file for writing with filename ~s"}},
@@ -271,7 +294,10 @@
         {info, "Cycle count of ~w in hashtable search higher than expected"
                 ++ " in search for hash ~w with result ~w"}},
     {"CDB16",
-        {info, "CDB scan from start ~w in file with end ~w and last_key ~w"}}
+        {info, "CDB scan from start ~w in file with end ~w and last_key ~w"}},
+    {"CDB17",
+        {info, "After ~w PUTs total write time is ~w total sync time is ~w "
+                ++ "and max write time is ~w and max sync time is ~w"}}
 
         ])).
 
@@ -304,9 +330,140 @@ log_timer(LogReference, Subs, StartTime) ->
             ok
     end.
 
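For reference, rendering the new B0012 format string shows the shape of line that reaches the log; the values here are illustrative only (a 20000-PUT sample with totals and maxima in microseconds), and I0019 and CDB17 follow the same five-argument pattern:

```erlang
render_b0012() ->
    Fmt = "After ~w PUTs total inker time is ~w total ledger time is ~w "
            ++ "and max inker time is ~w and max ledger time is ~w",
    %% Returns: "After 20000 PUTs total inker time is 560000 total ledger "
    %%          "time is 120000 and max inker time is 9000 and max ledger "
    %%          "time is 1500"
    lists:flatten(io_lib:format(Fmt, [20000, 560000, 120000, 9000, 1500])).
```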
+%% Make a log of put timings split out by actor - one log for every
+%% PUT_TIMING_LOGPOINT puts
+
+put_timing(_Actor, undefined, T0, T1) ->
+    {1, {T0, T1}, {T0, T1}};
+put_timing(Actor, {?PUT_TIMING_LOGPOINT, {Total0, Total1}, {Max0, Max1}},
+                T0, T1) ->
+    RN = random:uniform(?PUT_TIMING_LOGPOINT),
+    case RN > ?PUT_TIMING_LOGPOINT div 2 of
+        true ->
+            % log at the timing point roughly half of the time
+            LogRef =
+                case Actor of
+                    bookie -> "B0012";
+                    inker -> "I0019";
+                    journal -> "CDB17"
+                end,
+            log(LogRef, [?PUT_TIMING_LOGPOINT, Total0, Total1, Max0, Max1]),
+            put_timing(Actor, undefined, T0, T1);
+        false ->
+            % Log some other random time
+            put_timing(Actor, {RN, {Total0, Total1}, {Max0, Max1}}, T0, T1)
+    end;
+put_timing(_Actor, {N, {Total0, Total1}, {Max0, Max1}}, T0, T1) ->
+    {N + 1, {Total0 + T0, Total1 + T1}, {max(Max0, T0), max(Max1, T1)}}.
+
+%% Make a log of penciller head timings split out by level and result - one
+%% log for every HEAD_TIMING_LOGPOINT head fetches
+%% Returns a tuple of {Count, TimingDict} to be stored on the process state
+head_timing(undefined, SW, Level, R) ->
+    T0 = timer:now_diff(os:timestamp(), SW),
+    head_timing_int(undefined, T0, Level, R);
+head_timing({N, HeadTimingD}, SW, Level, R) ->
+    case N band ?SAMPLE_RATE of
+        0 ->
+            T0 = timer:now_diff(os:timestamp(), SW),
+            head_timing_int({N, HeadTimingD}, T0, Level, R);
+        _ ->
+            % Not to be sampled this time
+            {N + 1, HeadTimingD}
+    end.
+
+head_timing_int(undefined, T0, Level, R) ->
+    Key = head_key(R, Level),
+    NewDFun = fun(K, Acc) ->
+                    case K of
+                        Key ->
+                            dict:store(K, [1, T0, T0], Acc);
+                        _ ->
+                            dict:store(K, [0, 0, 0], Acc)
+                    end end,
+    {1, lists:foldl(NewDFun, dict:new(), head_keylist())};
+head_timing_int({?HEAD_TIMING_LOGPOINT, HeadTimingD}, T0, Level, R) ->
+    RN = random:uniform(?HEAD_TIMING_LOGPOINT),
+    case RN > ?HEAD_TIMING_LOGPOINT div 2 of
+        true ->
+            % log at the timing point roughly half of the time
+            LogFun = fun(K) -> log("P0032", [K|dict:fetch(K, HeadTimingD)]) end,
+            lists:foreach(LogFun, head_keylist()),
+            head_timing_int(undefined, T0, Level, R);
+        false ->
+            % Log some other time - reset to RN not 0 to stagger logs out over
+            % time between the vnodes
+            head_timing_int({RN, HeadTimingD}, T0, Level, R)
+    end;
+head_timing_int({N, HeadTimingD}, T0, Level, R) ->
+    Key = head_key(R, Level),
+    [Count0, Total0, Max0] = dict:fetch(Key, HeadTimingD),
+    {N + 1,
+        dict:store(Key, [Count0 + 1, Total0 + T0, max(Max0, T0)],
+                    HeadTimingD)}.
+
+head_key(not_present, _Level) ->
+    not_present;
+head_key(found, 0) ->
+    found_0;
+head_key(found, 1) ->
+    found_1;
+head_key(found, 2) ->
+    found_2;
+head_key(found, Level) when Level > 2 ->
+    found_lower.
+
+head_keylist() ->
+    [not_present, found_lower, found_0, found_1, found_2].
+
+get_timing(undefined, SW, TimerType) ->
+    T0 = timer:now_diff(os:timestamp(), SW),
+    get_timing_int(undefined, T0, TimerType);
+get_timing({N, GetTimerD}, SW, TimerType) ->
+    case N band ?SAMPLE_RATE of
+        0 ->
+            T0 = timer:now_diff(os:timestamp(), SW),
+            get_timing_int({N, GetTimerD}, T0, TimerType);
+        _ ->
+            % Not to be sampled this time
+            {N + 1, GetTimerD}
+    end.
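The put_timing accumulator is {Count, {TotalInk, TotalLedger}, {MaxInk, MaxLedger}}, seeded from undefined on the first call. A worked example with illustrative microsecond pairs:

```erlang
%% Folding in three PUTs of {T0, T1} = {10, 1}, {30, 2} and {20, 4} yields
%% {3, {60, 7}, {30, 4}} - three samples, summed totals, per-phase maxima.
put_timing_example() ->
    lists:foldl(fun({T0, T1}, Acc) ->
                    leveled_log:put_timing(bookie, Acc, T0, T1)
                end,
                undefined,
                [{10, 1}, {30, 2}, {20, 4}]).
```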
+
+get_timing_int(undefined, T0, TimerType) ->
+    NewDFun = fun(K, Acc) ->
+                    case K of
+                        TimerType ->
+                            dict:store(K, [1, T0, T0], Acc);
+                        _ ->
+                            dict:store(K, [0, 0, 0], Acc)
+                    end end,
+    {1, lists:foldl(NewDFun, dict:new(), get_keylist())};
+get_timing_int({?GET_TIMING_LOGPOINT, GetTimerD}, T0, TimerType) ->
+    RN = random:uniform(?GET_TIMING_LOGPOINT),
+    case RN > ?GET_TIMING_LOGPOINT div 2 of
+        true ->
+            % log at the timing point roughly half of the time
+            LogFun = fun(K) -> log("B0014", [K|dict:fetch(K, GetTimerD)]) end,
+            lists:foreach(LogFun, get_keylist()),
+            get_timing_int(undefined, T0, TimerType);
+        false ->
+            % Log some other time - reset to RN not 0 to stagger logs out over
+            % time between the vnodes
+            get_timing_int({RN, GetTimerD}, T0, TimerType)
+    end;
+get_timing_int({N, GetTimerD}, T0, TimerType) ->
+    [Count0, Total0, Max0] = dict:fetch(TimerType, GetTimerD),
+    {N + 1,
+        dict:store(TimerType,
+                    [Count0 + 1, Total0 + T0, max(Max0, T0)],
+                    GetTimerD)}.
+
+get_keylist() ->
+    [head_not_present, head_found, fetch].
 
 %%%============================================================================
 %%% Test
 %%%============================================================================
@@ -320,4 +477,15 @@ log_test() ->
     log("D0001", []),
     log_timer("D0001", [], os:timestamp()).
 
+head_timing_test() ->
+    SW = os:timestamp(),
+    HeadTimer0 = lists:foldl(fun(_X, Acc) -> head_timing(Acc, SW, 2, found) end,
+                                undefined,
+                                lists:seq(0, 47)),
+    HeadTimer1 = head_timing(HeadTimer0, SW, 3, found),
+    {N, D} = HeadTimer1,
+    ?assertMatch(49, N),
+    ?assertMatch(3, lists:nth(1, dict:fetch(found_2, D))),
+    ?assertMatch(1, lists:nth(1, dict:fetch(found_lower, D))).
+
 -endif.
\ No newline at end of file
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index 82dc432..0de9b2b 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -202,6 +202,7 @@
 -define(PROMPT_WAIT_ONL0, 5).
 -define(WORKQUEUE_BACKLOG_TOLERANCE, 4).
 -define(COIN_SIDECOUNT, 5).
+-define(SLOW_FETCH, 20000).
 
 -record(state, {manifest = [] :: list(),
                 manifest_sqn = 0 :: integer(),
@@ -227,7 +228,9 @@
                 levelzero_astree :: list(),
 
                 ongoing_work = [] :: list(),
-                work_backlog = false :: boolean()}).
+                work_backlog = false :: boolean(),
+
+                head_timing :: tuple()}).
 
 
 %%%============================================================================
@@ -366,20 +369,20 @@ handle_call({push_mem, {PushedTree, MinSQN, MaxSQN}},
                             State)}
     end;
 handle_call({fetch, Key, Hash}, _From, State) ->
-    {reply,
-        fetch_mem(Key,
-                    Hash,
-                    State#state.manifest,
-                    State#state.levelzero_cache,
-                    State#state.levelzero_index),
-        State};
+    {R, HeadTimer} = timed_fetch_mem(Key,
+                                        Hash,
+                                        State#state.manifest,
+                                        State#state.levelzero_cache,
+                                        State#state.levelzero_index,
+                                        State#state.head_timing),
+    {reply, R, State#state{head_timing=HeadTimer}};
 handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
     {reply,
-        compare_to_sqn(fetch_mem(Key,
-                                    Hash,
-                                    State#state.manifest,
-                                    State#state.levelzero_cache,
-                                    State#state.levelzero_index),
+        compare_to_sqn(plain_fetch_mem(Key,
+                                        Hash,
+                                        State#state.manifest,
+                                        State#state.levelzero_cache,
+                                        State#state.levelzero_index),
                         SQN),
         State};
 handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
@@ -730,26 +733,40 @@ levelzero_filename(State) ->
         ++ integer_to_list(MSN) ++ "_0_0",
     FileName.
 
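On the penciller the sample carried in the new head_timing state field is {Count, Dict}, with the dict keyed by where the fetch resolved (head_keylist/0) and holding [Count, Total, Max] per key; at the logpoint "P0032" emits one line per key. A sketch of reading a sample back out:

```erlang
%% D is the dict inside an {N, D} sample built by leveled_log:head_timing/4.
summarise_head_sample({_N, D}) ->
    [{K, dict:fetch(K, D)} ||
        K <- [not_present, found_lower, found_0, found_1, found_2]].
```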
+timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) ->
+    SW = os:timestamp(),
+    {R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
+    UpdHeadTimer =
+        case R of
+            not_present ->
+                leveled_log:head_timing(HeadTimer, SW, Level, not_present);
+            _ ->
+                leveled_log:head_timing(HeadTimer, SW, Level, found)
+        end,
+    {R, UpdHeadTimer}.
+
+plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
+    R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
+    element(1, R).
+
 fetch_mem(Key, Hash, Manifest, L0Cache, none) ->
     L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache),
     case L0Check of
         {false, not_found} ->
-            fetch(Key, Hash, Manifest, 0, fun leveled_sft:sft_get/3);
+            fetch(Key, Hash, Manifest, 0, fun timed_sft_get/3);
         {true, KV} ->
-            KV
+            {KV, 0}
     end;
 fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
     case leveled_pmem:check_index(Hash, L0Index) of
         true ->
             fetch_mem(Key, Hash, Manifest, L0Cache, none);
         false ->
-            fetch(Key, Hash, Manifest, 0, fun leveled_sft:sft_get/3)
+            fetch(Key, Hash, Manifest, 0, fun timed_sft_get/3)
     end.
 
 fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
-    not_present;
+    {not_present, basement};
 fetch(Key, Hash, Manifest, Level, FetchFun) ->
     LevelManifest = get_item(Level, Manifest, []),
     case lists:foldl(fun(File, Acc) ->
@@ -770,10 +787,25 @@ fetch(Key, Hash, Manifest, Level, FetchFun) ->
                 not_present ->
                     fetch(Key, Hash, Manifest, Level + 1, FetchFun);
                 ObjectFound ->
-                    ObjectFound
+                    {ObjectFound, Level}
             end
     end.
 
+timed_sft_get(PID, Key, Hash) ->
+    SW = os:timestamp(),
+    R = leveled_sft:sft_get(PID, Key, Hash),
+    T0 = timer:now_diff(os:timestamp(), SW),
+    case {T0, R} of
+        {T, R} when T < ?SLOW_FETCH ->
+            R;
+        {T, not_present} ->
+            leveled_log:log("PC016", [PID, T, not_present]),
+            not_present;
+        {T, R} ->
+            leveled_log:log("PC016", [PID, T, found]),
+            R
+    end.
+
 compare_to_sqn(Obj, SQN) ->
     case Obj of
diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl
index 7cffdd7..e736a47 100644
--- a/src/leveled_sft.erl
+++ b/src/leveled_sft.erl
@@ -644,6 +644,8 @@ acc_list_keysonly(null, empty) ->
     [];
 acc_list_keysonly(null, RList) ->
     RList;
+acc_list_keysonly(R, RList) when is_list(R) ->
+    lists:foldl(fun acc_list_keysonly/2, RList, R);
 acc_list_keysonly(R, RList) ->
     lists:append(RList, [leveled_codec:strip_to_keyseqstatusonly(R)]).
 
@@ -651,6 +653,8 @@ acc_list_kv(null, empty) ->
     [];
 acc_list_kv(null, RList) ->
     RList;
+acc_list_kv(R, RList) when is_list(R) ->
+    RList ++ R;
 acc_list_kv(R, RList) ->
     lists:append(RList, [R]).
 
@@ -713,7 +717,13 @@ fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey,
                         Pointer,
                         Acc) ->
     Block = fetch_block(Handle, LengthList, BlockNumber, Pointer),
-    Results = scan_block(Block, StartKey, EndKey, AccFun, Acc),
+    Results =
+        case maybe_scan_entire_block(Block, StartKey, EndKey) of
+            true ->
+                {partial, AccFun(Block, Acc), StartKey};
+            false ->
+                scan_block(Block, StartKey, EndKey, AccFun, Acc)
+        end,
     case Results of
         {partial, Acc1, StartKey} ->
             %% Move on to the next block
@@ -741,6 +751,25 @@ scan_block([HeadKV|T], StartKey, EndKey, AccFun, Acc) ->
     end.
 
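The new is_list/1 clauses on the accumulators are what make the whole-block path work: scan_block/5 still feeds results one K/V at a time, but when maybe_scan_entire_block/3 (added below) reports the block wholly in range, fetch_range can hand the entire block to AccFun in one call. A sketch of the two shapes acc_list_kv/2 now accepts, assuming local calls inside leveled_sft:

```erlang
acc_shapes() ->
    %% A single K/V is appended as one element ...
    L1 = acc_list_kv({k0, v0}, []),          % [{k0, v0}]
    %% ... while a whole block is concatenated in order.
    acc_list_kv([{k1, v1}, {k2, v2}], L1).   % [{k0, v0}, {k1, v1}, {k2, v2}]
```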
+maybe_scan_entire_block([], _, _) ->
+    true;
+maybe_scan_entire_block(_Block, all, all) ->
+    true;
+maybe_scan_entire_block(Block, StartKey, all) ->
+    [FirstKey|_Tail] = Block,
+    leveled_codec:strip_to_keyonly(FirstKey) > StartKey;
+maybe_scan_entire_block(Block, StartKey, EndKey) ->
+    [FirstKey|_Tail] = Block,
+    LastKey = leveled_codec:strip_to_keyonly(lists:last(Block)),
+    FromStart = leveled_codec:strip_to_keyonly(FirstKey) > StartKey,
+    ToEnd = leveled_codec:endkey_passed(EndKey, LastKey),
+    case {FromStart, ToEnd} of
+        {true, false} ->
+            true;
+        _ ->
+            false
+    end.
+
 fetch_keyvalue_fromblock([], _Key, _LengthList, _Handle, _StartOfSlot) ->
     not_present;
 fetch_keyvalue_fromblock([BlockNmb|T], Key, LengthList, Handle, StartOfSlot) ->
@@ -1100,7 +1129,9 @@ maybe_expand_pointer([H|Tail]) ->
     case H of
         {next, SFTPid, StartKey} ->
             %% io:format("Scanning further on PID ~w ~w~n", [SFTPid, StartKey]),
+            SW = os:timestamp(),
             Acc = sft_getkvrange(SFTPid, StartKey, all, ?MERGE_SCANWIDTH),
+            leveled_log:log_timer("SFT14", [SFTPid], SW),
             lists:append(Acc, Tail);
         _ ->
             [H|Tail]
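maybe_scan_entire_block/3 reduces to: take the block wholesale when its first key is past StartKey and its last key has not passed EndKey, otherwise fall back to the key-by-key scan. A simplified stand-in with plain terms for ledger keys (the real keys are tuples compared with the same term ordering via strip_to_keyonly/1 and endkey_passed/2):

```erlang
block_in_range([FirstKey|_] = Block, StartKey, EndKey) ->
    LastKey = lists:last(Block),
    (FirstKey > StartKey) andalso not (LastKey > EndKey).

%% block_in_range([c, d, f], b, z) -> true   (take the whole block)
%% block_in_range([c, d, f], b, e) -> false  (f has passed e - scan key by key)
```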