Merge remote-tracking branch 'refs/remotes/origin/mas-etsmem-i52' into mas-sstfiveblocks
commit e59585d733
3 changed files with 56 additions and 17 deletions

@@ -1188,8 +1188,9 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
             case leveled_penciller:pcl_pushmem(Penciller, CacheToLoad) of
                 ok ->
                     Cache0 = #ledger_cache{},
-                    true = ets:delete_all_objects(Tab),
-                    {ok, Cache0#ledger_cache{mem=Tab}};
+                    true = ets:delete(Tab),
+                    NewTab = ets:new(mem, [ordered_set]),
+                    {ok, Cache0#ledger_cache{mem=NewTab}};
                 returned ->
                     {returned, Cache}
             end;

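The hunk above swaps ets:delete_all_objects/1 for deleting the table and opening a fresh one, so the emptied ledger cache starts from a brand-new ETS table; the source branch name (mas-etsmem-i52) suggests the motivation is ETS memory behaviour. A minimal standalone sketch of the same reset pattern, with illustrative module and function names that are not part of the commit:

    -module(ets_reset_sketch).
    -export([reset/1, demo/0]).

    %% Sketch only: drop the old ETS table entirely and hand back a fresh,
    %% empty one, mirroring the ets:delete/1 + ets:new/2 pair in the hunk
    %% above rather than emptying the table in place.
    reset(Tab) ->
        true = ets:delete(Tab),
        ets:new(mem, [ordered_set]).

    demo() ->
        Tab0 = ets:new(mem, [ordered_set]),
        true = ets:insert(Tab0, [{K, K * K} || K <- lists:seq(1, 1000)]),
        Tab1 = reset(Tab0),
        0 = ets:info(Tab1, size),
        Tab1.
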
@@ -364,6 +364,7 @@ rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) ->
     file:close(Handle),
     ok = rename_for_read(State#state.filename, NewName),
     leveled_log:log("CDB03", [NewName]),
+    ets:delete(State#state.hashtree),
     {NewHandle, Index, LastKey} = open_for_readonly(NewName,
                                                         State#state.last_key),
     case State#state.deferred_delete of

@@ -121,12 +121,20 @@
                 size :: integer(),
                 max_sqn :: integer()}).
 
+%% yield_blockquery is used to determine if the work necessary to process a
+%% range query beyond fetching the slot should be managed from within
+%% this process, or should be handled by the calling process.
+%% Handling within the calling process may lead to extra binary heap garbage
+%% see Issue 52. Handling within the SST process may lead to contention and
+%% extra copying. Files at the top of the tree yield, those lower down don't.
+
 -record(state, {summary,
                 handle :: file:fd(),
                 sst_timings :: tuple(),
                 penciller :: pid(),
                 root_path,
                 filename,
+                yield_blockquery = false :: boolean(),
                 blockindex_cache}).
 
 

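The comment block added above describes a reply that can take two shapes depending on yield_blockquery: either the raw slot binaries for the caller to expand, or the already-expanded list. A minimal sketch of the caller-side contract, using a hypothetical expand_reply/2 helper with a TrimFun argument standing in for binaryslot_trimmedlist/3; the real caller-side handling is the sst_getkvrange change in the next hunk:

    -module(yield_sketch).
    -export([expand_reply/2]).

    %% Sketch only: normalise the two reply shapes. If the SST process
    %% yielded, fold the slot binaries in the calling process using the
    %% supplied TrimFun; otherwise the reply is already the expanded
    %% key/value list.
    expand_reply({yield, SlotBins, SlotsToPoint}, TrimFun) ->
        FetchFun =
            fun({SlotBin, SK, EK}, Acc) ->
                Acc ++ TrimFun(SlotBin, SK, EK)
            end,
        lists:foldl(FetchFun, [], SlotBins) ++ SlotsToPoint;
    expand_reply(Reply, _TrimFun) when is_list(Reply) ->
        Reply.
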
@@ -197,15 +205,18 @@ sst_get(Pid, LedgerKey, Hash) ->
     gen_fsm:sync_send_event(Pid, {get_kv, LedgerKey, Hash}, infinity).
 
 sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) ->
-    Reply = gen_fsm:sync_send_event(Pid,
+    case gen_fsm:sync_send_event(Pid,
                                     {get_kvrange, StartKey, EndKey, ScanWidth},
-                                    infinity),
-    FetchFun =
-        fun({SlotBin, SK, EK}, Acc) ->
-            Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
-        end,
-    {SlotsToFetchBinList, SlotsToPoint} = Reply,
-    lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint.
+                                    infinity) of
+        {yield, SlotsToFetchBinList, SlotsToPoint} ->
+            FetchFun =
+                fun({SlotBin, SK, EK}, Acc) ->
+                    Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
+                end,
+            lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint;
+        Reply ->
+            Reply
+    end.
 
 sst_getslots(Pid, SlotList) ->
     SlotBins = gen_fsm:sync_send_event(Pid, {get_slots, SlotList}, infinity),

@@ -268,7 +279,10 @@ starting({sst_new, RootPath, Filename, Level, {SlotList, FirstKey}, MaxSQN},
                                                     Length,
                                                     MaxSQN),
     ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin),
-    UpdState = read_file(ActualFilename, State#state{root_path=RootPath}),
+    YBQ = Level =< 2,
+    UpdState = read_file(ActualFilename,
+                            State#state{root_path=RootPath,
+                                        yield_blockquery=YBQ}),
     Summary = UpdState#state.summary,
     leveled_log:log_timer("SST08",
                             [ActualFilename, Level, Summary#summary.max_sqn],

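The YBQ = Level =< 2 expression above is how "files at the top of the tree yield, those lower down don't" is quantified: levels 0 to 2 yield, deeper levels expand ranges inside the SST process, and the level-zero clause in the next hunk sets yield_blockquery to true outright. The same decision written as a standalone predicate (a sketch; this helper does not exist in the commit):

    -module(yield_level_sketch).
    -export([yield_blockquery_for_level/1]).

    %% Sketch only: the yield decision as a predicate over the level number,
    %% matching YBQ = Level =< 2 in the hunk above. Level zero also yields,
    %% consistent with the sst_newlevelzero clause that follows.
    yield_blockquery_for_level(Level) when is_integer(Level), Level >= 0 ->
        Level =< 2.
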
@@ -293,7 +307,9 @@ starting({sst_newlevelzero, RootPath, Filename,
                                         SlotCount,
                                         MaxSQN),
     ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin),
-    UpdState = read_file(ActualFilename, State#state{root_path=RootPath}),
+    UpdState = read_file(ActualFilename,
+                            State#state{root_path = RootPath,
+                                        yield_blockquery = true}),
     Summary = UpdState#state.summary,
     leveled_log:log_timer("SST08",
                             [ActualFilename, 0, Summary#summary.max_sqn],

@@ -316,10 +332,26 @@ reader({get_kv, LedgerKey, Hash}, _From, State) ->
     UpdTimings = leveled_log:sst_timing(State#state.sst_timings, SW, Stage),
     {reply, Result, reader, UpdState#state{sst_timings = UpdTimings}};
 reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
-    {reply,
-        fetch_range(StartKey, EndKey, ScanWidth, State),
-        reader,
-        State};
+    {SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
+                                                        EndKey,
+                                                        ScanWidth,
+                                                        State),
+    case State#state.yield_blockquery of
+        true ->
+            {reply,
+                {yield, SlotsToFetchBinList, SlotsToPoint},
+                reader,
+                State};
+        false ->
+            FetchFun =
+                fun({SlotBin, SK, EK}, Acc) ->
+                    Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)
+                end,
+            {reply,
+                lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint,
+                reader,
+                State}
+    end;
 reader({get_slots, SlotList}, _From, State) ->
     SlotBins = read_slots(State#state.handle, SlotList),
     {reply, SlotBins, reader, State};

@@ -354,8 +386,13 @@ delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
     {Result, _Stage, _SlotID, UpdState} = fetch(LedgerKey, Hash, State),
     {reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT};
 delete_pending({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) ->
+    {SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
+                                                        EndKey,
+                                                        ScanWidth,
+                                                        State),
+    % Always yield as about to clear and de-reference
     {reply,
-        fetch_range(StartKey, EndKey, ScanWidth, State),
+        {yield, SlotsToFetchBinList, SlotsToPoint},
         delete_pending,
         State,
         ?DELETE_TIMEOUT};