Tidy-up GC on compaction
Make sure we hibernate any CDB files after we score them, as they may not be used for sometime, and there may be garbage binary references present.
This commit is contained in:
parent
7862a6c523
commit
da1ecc144a
2 changed files with 56 additions and 32 deletions
|
@ -114,7 +114,8 @@
|
|||
cdb_destroy/1,
|
||||
cdb_deletepending/1,
|
||||
cdb_deletepending/3,
|
||||
cdb_isrolling/1]).
|
||||
cdb_isrolling/1,
|
||||
cdb_clerkcomplete/1]).
|
||||
|
||||
-export([finished_rolling/1,
|
||||
hashtable_calc/2]).
|
||||
|
@ -410,6 +411,14 @@ cdb_keycheck(Pid, Key) ->
|
|||
cdb_isrolling(Pid) ->
|
||||
gen_fsm:sync_send_all_state_event(Pid, cdb_isrolling, infinity).
|
||||
|
||||
-spec cdb_clerkcomplete(pid()) -> ok.
|
||||
%% @doc
|
||||
%% When an Inker's clerk has finished with a CDB process, then it will call
|
||||
%% complete. Currently this will prompt hibernation, as the CDB process may
|
||||
%% not be needed for a period.
|
||||
cdb_clerkcomplete(Pid) ->
|
||||
gen_fsm:send_all_state_event(Pid, clerk_complete).
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% gen_server callbacks
|
||||
|
@ -620,33 +629,40 @@ reader({get_positions, SampleSize, Index, Acc}, _From, State) ->
|
|||
_ ->
|
||||
{reply, lists:sublist(UpdAcc, SampleSize), reader, State}
|
||||
end;
|
||||
reader({direct_fetch, PositionList, Info}, _From, State) ->
|
||||
reader({direct_fetch, PositionList, Info}, From, State) ->
|
||||
H = State#state.handle,
|
||||
FilterFalseKey = fun(Tpl) -> case element(1, Tpl) of
|
||||
FilterFalseKey =
|
||||
fun(Tpl) ->
|
||||
case element(1, Tpl) of
|
||||
false ->
|
||||
false;
|
||||
_Key ->
|
||||
{true, Tpl}
|
||||
end end,
|
||||
Reply =
|
||||
end
|
||||
end,
|
||||
|
||||
case Info of
|
||||
key_only ->
|
||||
FM = lists:filtermap(
|
||||
fun(P) ->
|
||||
FilterFalseKey(extract_key(H, P)) end,
|
||||
PositionList),
|
||||
lists:map(fun(T) -> element(1, T) end, FM);
|
||||
MapFun = fun(T) -> element(1, T) end,
|
||||
{reply, lists:map(MapFun, FM), reader, State};
|
||||
key_size ->
|
||||
lists:filtermap(
|
||||
fun(P) ->
|
||||
FilterFalseKey(extract_key_size(H, P)) end,
|
||||
PositionList);
|
||||
FilterFun = fun(P) -> FilterFalseKey(extract_key_size(H, P)) end,
|
||||
{reply, lists:filtermap(FilterFun, PositionList), reader, State};
|
||||
key_value_check ->
|
||||
BM = State#state.binary_mode,
|
||||
lists:map(fun(P) -> extract_key_value_check(H, P, BM) end,
|
||||
PositionList)
|
||||
end,
|
||||
{reply, Reply, reader, State};
|
||||
MapFun = fun(P) -> extract_key_value_check(H, P, BM) end,
|
||||
% direct_fetch will occur in batches, so it doesn't make sense to
|
||||
% hibernate the process that is likely to be used again. However,
|
||||
% a significant amount of unused binary references may have
|
||||
% accumulated, so push a GC at this point
|
||||
gen_fsm:reply(From, lists:map(MapFun, PositionList)),
|
||||
garbage_collect(),
|
||||
{next_state, reader, State}
|
||||
end;
|
||||
reader(cdb_complete, _From, State) ->
|
||||
leveled_log:log("CDB05", [State#state.filename, reader, cdb_ccomplete]),
|
||||
ok = file:close(State#state.handle),
|
||||
|
@ -752,6 +768,10 @@ handle_sync_event({cdb_scan, FilterFun, Acc, StartPos},
|
|||
% reference counters for this process here manually. The cdb process
|
||||
% may be inactive for a period after the scan, and so GC may not kick in
|
||||
% otherwise
|
||||
%
|
||||
% garbage_collect/0 is used in preference to hibernate, as we're generally
|
||||
% scanning in batches at startup - so the process will be needed straight
|
||||
% away.
|
||||
gen_fsm:reply(From, {LastPosition, Acc2}),
|
||||
garbage_collect(),
|
||||
{next_state, StateName, State};
|
||||
|
@ -790,8 +810,8 @@ handle_sync_event(cdb_close, _From, StateName, State) ->
|
|||
file:close(State#state.handle),
|
||||
{stop, normal, ok, State}.
|
||||
|
||||
handle_event(_Msg, StateName, State) ->
|
||||
{next_state, StateName, State}.
|
||||
handle_event(clerk_complete, StateName, State) ->
|
||||
{next_state, StateName, State, hibernate}.
|
||||
|
||||
handle_info(_Msg, StateName, State) ->
|
||||
{next_state, StateName, State}.
|
||||
|
@ -2668,8 +2688,6 @@ getpositions_sample_test() ->
|
|||
|
||||
|
||||
nonsense_coverage_test() ->
|
||||
{ok, Pid} = gen_fsm:start_link(?MODULE, [#cdb_options{}], []),
|
||||
ok = gen_fsm:send_all_state_event(Pid, nonsense),
|
||||
?assertMatch({next_state, reader, #state{}}, handle_info(nonsense,
|
||||
reader,
|
||||
#state{})),
|
||||
|
|
|
@ -547,7 +547,8 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) ->
|
|||
end.
|
||||
|
||||
|
||||
fetch_inbatches([], _BatchSize, _CDB, CheckedList) ->
|
||||
fetch_inbatches([], _BatchSize, CDB, CheckedList) ->
|
||||
ok = leveled_cdb:cdb_clerkcomplete(CDB),
|
||||
CheckedList;
|
||||
fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) ->
|
||||
{Batch, Tail} = if
|
||||
|
@ -698,6 +699,11 @@ compact_files([Batch|T], CDBopts, ActiveJournal0,
|
|||
ActiveJournal0,
|
||||
ManSlice0,
|
||||
PressMethod),
|
||||
% The inker's clerk will no longer need these (potentially large) binaries,
|
||||
% so force garbage collection at this point. This will mean when we roll
|
||||
% each CDB file there will be no remaining references to the binaries that
|
||||
% have been transferred and the memory can immediately be cleared
|
||||
garbage_collect(),
|
||||
compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, MaxSQN,
|
||||
RStrategy, PressMethod, ManSlice1).
|
||||
|
||||
|
@ -762,7 +768,7 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
|
|||
% strategy
|
||||
[KVC0|Acc];
|
||||
{false, retain} ->
|
||||
% If we have a retain startegy, it can't be
|
||||
% If we have a retain strategy, it can't be
|
||||
% discarded - but the value part is no longer
|
||||
% required as this version has been replaced
|
||||
{JK0, JV0} =
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue