Yield on query selectively
Still not clear if yielding is the cause of memory problems, but taking it away universally has impacted throughput. At the very least we should continue to yield on high-contention files (those at higher levels), where the processes are more likely to be quickly terminated anyway allowing GC to be invoked.
This commit is contained in:
parent
c46377584f
commit
dd0316eedf
1 changed files with 45 additions and 18 deletions
|
@ -120,12 +120,20 @@
|
|||
size :: integer(),
|
||||
max_sqn :: integer()}).
|
||||
|
||||
%% yield_blockquery is used to detemrine if the work necessary to process a
|
||||
%% range query beyond the fetching the slot should be managed from within
|
||||
%% this process, or should be handled by the calling process.
|
||||
%% Handling within the calling process may lead to extra binary heap garbage
|
||||
%% see Issue 52. Handling within the SST process may lead to contention and
|
||||
%% extra copying. Files at the top of the tree yield, those lower down don't.
|
||||
|
||||
-record(state, {summary,
|
||||
handle :: file:fd(),
|
||||
sst_timings :: tuple(),
|
||||
penciller :: pid(),
|
||||
root_path,
|
||||
filename,
|
||||
yield_blockquery = false :: boolean(),
|
||||
blockindex_cache}).
|
||||
|
||||
|
||||
|
@ -196,9 +204,18 @@ sst_get(Pid, LedgerKey, Hash) ->
|
|||
gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity).
|
||||
|
||||
sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
|
||||
gen_fsm:sync_send_event(Pid,
|
||||
{get_kvrange, StartKey, EndKey, ScanWidth},
|
||||
infinity).
|
||||
case gen_fsm:sync_send_event(Pid,
|
||||
{get_kvrange, StartKey, EndKey, ScanWidth},
|
||||
infinity) of
|
||||
{yield, SlotsToFetchBinList, SlotsToPoint} ->
|
||||
FetchFun =
|
||||
fun({SlotBin, SK, EK}, Acc) ->
|
||||
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
|
||||
end,
|
||||
lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint;
|
||||
Reply ->
|
||||
Reply
|
||||
end.
|
||||
|
||||
sst_getslots(Pid, SlotList) ->
|
||||
SlotBins = gen_fsm:sync_send_event(Pid, {get_slots, SlotList}, infinity),
|
||||
|
@ -261,7 +278,10 @@ starting({sst_new, RootPath, Filename, Level, {SlotList, FirstKey}, MaxSQN},
|
|||
Length,
|
||||
MaxSQN),
|
||||
ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin),
|
||||
UpdState = read_file(ActualFilename, State#state{root_path=RootPath}),
|
||||
YBQ = Level =< 1,
|
||||
UpdState = read_file(ActualFilename,
|
||||
State#state{root_path=RootPath,
|
||||
yield_blockquery=YBQ}),
|
||||
Summary = UpdState#state.summary,
|
||||
leveled_log:log_timer("SST08",
|
||||
[ActualFilename, Level, Summary#summary.max_sqn],
|
||||
|
@ -286,7 +306,9 @@ starting({sst_newlevelzero, RootPath, Filename,
|
|||
SlotCount,
|
||||
MaxSQN),
|
||||
ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin),
|
||||
UpdState = read_file(ActualFilename, State#state{root_path=RootPath}),
|
||||
UpdState = read_file(ActualFilename,
|
||||
State#state{root_path = RootPath,
|
||||
yield_blockquery = true}),
|
||||
Summary = UpdState#state.summary,
|
||||
leveled_log:log_timer("SST08",
|
||||
[ActualFilename, 0, Summary#summary.max_sqn],
|
||||
|
@ -309,18 +331,26 @@ reader({get_kv, LedgerKey, Hash}, _From, State) ->
|
|||
UpdTimings = leveled_log:sst_timing(State#state.sst_timings, SW, Stage),
|
||||
{reply, Result, reader, UpdState#state{sst_timings = UpdTimings}};
|
||||
reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
|
||||
FetchFun =
|
||||
fun({SlotBin, SK, EK}, Acc) ->
|
||||
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
|
||||
end,
|
||||
{SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
|
||||
EndKey,
|
||||
ScanWidth,
|
||||
State),
|
||||
{reply,
|
||||
lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint,
|
||||
reader,
|
||||
State};
|
||||
case State#state.yield_blockquery of
|
||||
true ->
|
||||
{reply,
|
||||
{yield, SlotsToFetchBinList, SlotsToPoint},
|
||||
reader,
|
||||
State};
|
||||
false ->
|
||||
FetchFun =
|
||||
fun({SlotBin, SK, EK}, Acc) ->
|
||||
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
|
||||
end,
|
||||
{reply,
|
||||
lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint,
|
||||
reader,
|
||||
State}
|
||||
end;
|
||||
reader({get_slots, SlotList}, _From, State) ->
|
||||
SlotBins = read_slots(State#state.handle, SlotList),
|
||||
{reply, SlotBins, reader, State};
|
||||
|
@ -355,16 +385,13 @@ delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
|
|||
{Result, _Stage, _SlotID, UpdState} = fetch(LedgerKey, Hash, State),
|
||||
{reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT};
|
||||
delete_pending({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
|
||||
FetchFun =
|
||||
fun({SlotBin, SK, EK}, Acc) ->
|
||||
Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
|
||||
end,
|
||||
{SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
|
||||
EndKey,
|
||||
ScanWidth,
|
||||
State),
|
||||
% Always yield as about to clear and de-reference
|
||||
{reply,
|
||||
lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint,
|
||||
{yield, SlotsToFetchBinList, SlotsToPoint},
|
||||
delete_pending,
|
||||
State,
|
||||
?DELETE_TIMEOUT};
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue