From da1ecc144a91d0e56733a4ad639648e178202aa3 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 19 Jul 2019 13:30:53 +0100 Subject: [PATCH] Tidy-up GC on compaction Make sure we hibernate any CDB files after we score them, as they may not be used for sometime, and there may be garbage binary references present. --- src/leveled_cdb.erl | 78 ++++++++++++++++++++++++++---------------- src/leveled_iclerk.erl | 10 ++++-- 2 files changed, 56 insertions(+), 32 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index a24feb4..5c8caba 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -114,7 +114,8 @@ cdb_destroy/1, cdb_deletepending/1, cdb_deletepending/3, - cdb_isrolling/1]). + cdb_isrolling/1, + cdb_clerkcomplete/1]). -export([finished_rolling/1, hashtable_calc/2]). @@ -410,6 +411,14 @@ cdb_keycheck(Pid, Key) -> cdb_isrolling(Pid) -> gen_fsm:sync_send_all_state_event(Pid, cdb_isrolling, infinity). +-spec cdb_clerkcomplete(pid()) -> ok. +%% @doc +%% When an Inker's clerk has finished with a CDB process, then it will call +%% complete. Currently this will prompt hibernation, as the CDB process may +%% not be needed for a period. +cdb_clerkcomplete(Pid) -> + gen_fsm:send_all_state_event(Pid, clerk_complete). + %%%============================================================================ %%% gen_server callbacks @@ -620,33 +629,40 @@ reader({get_positions, SampleSize, Index, Acc}, _From, State) -> _ -> {reply, lists:sublist(UpdAcc, SampleSize), reader, State} end; -reader({direct_fetch, PositionList, Info}, _From, State) -> +reader({direct_fetch, PositionList, Info}, From, State) -> H = State#state.handle, - FilterFalseKey = fun(Tpl) -> case element(1, Tpl) of - false -> - false; - _Key -> - {true, Tpl} - end end, - Reply = - case Info of - key_only -> - FM = lists:filtermap( - fun(P) -> - FilterFalseKey(extract_key(H, P)) end, - PositionList), - lists:map(fun(T) -> element(1, T) end, FM); - key_size -> - lists:filtermap( - fun(P) -> - FilterFalseKey(extract_key_size(H, P)) end, - PositionList); - key_value_check -> - BM = State#state.binary_mode, - lists:map(fun(P) -> extract_key_value_check(H, P, BM) end, - PositionList) + FilterFalseKey = + fun(Tpl) -> + case element(1, Tpl) of + false -> + false; + _Key -> + {true, Tpl} + end end, - {reply, Reply, reader, State}; + + case Info of + key_only -> + FM = lists:filtermap( + fun(P) -> + FilterFalseKey(extract_key(H, P)) end, + PositionList), + MapFun = fun(T) -> element(1, T) end, + {reply, lists:map(MapFun, FM), reader, State}; + key_size -> + FilterFun = fun(P) -> FilterFalseKey(extract_key_size(H, P)) end, + {reply, lists:filtermap(FilterFun, PositionList), reader, State}; + key_value_check -> + BM = State#state.binary_mode, + MapFun = fun(P) -> extract_key_value_check(H, P, BM) end, + % direct_fetch will occur in batches, so it doesn't make sense to + % hibernate the process that is likely to be used again. However, + % a significant amount of unused binary references may have + % accumulated, so push a GC at this point + gen_fsm:reply(From, lists:map(MapFun, PositionList)), + garbage_collect(), + {next_state, reader, State} + end; reader(cdb_complete, _From, State) -> leveled_log:log("CDB05", [State#state.filename, reader, cdb_ccomplete]), ok = file:close(State#state.handle), @@ -752,6 +768,10 @@ handle_sync_event({cdb_scan, FilterFun, Acc, StartPos}, % reference counters for this process here manually. The cdb process % may be inactive for a period after the scan, and so GC may not kick in % otherwise + % + % garbage_collect/0 is used in preference to hibernate, as we're generally + % scanning in batches at startup - so the process will be needed straight + % away. gen_fsm:reply(From, {LastPosition, Acc2}), garbage_collect(), {next_state, StateName, State}; @@ -790,8 +810,8 @@ handle_sync_event(cdb_close, _From, StateName, State) -> file:close(State#state.handle), {stop, normal, ok, State}. -handle_event(_Msg, StateName, State) -> - {next_state, StateName, State}. +handle_event(clerk_complete, StateName, State) -> + {next_state, StateName, State, hibernate}. handle_info(_Msg, StateName, State) -> {next_state, StateName, State}. @@ -2668,8 +2688,6 @@ getpositions_sample_test() -> nonsense_coverage_test() -> - {ok, Pid} = gen_fsm:start_link(?MODULE, [#cdb_options{}], []), - ok = gen_fsm:send_all_state_event(Pid, nonsense), ?assertMatch({next_state, reader, #state{}}, handle_info(nonsense, reader, #state{})), diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index bf0cca6..5ac8f3c 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -547,7 +547,8 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) -> end. -fetch_inbatches([], _BatchSize, _CDB, CheckedList) -> +fetch_inbatches([], _BatchSize, CDB, CheckedList) -> + ok = leveled_cdb:cdb_clerkcomplete(CDB), CheckedList; fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) -> {Batch, Tail} = if @@ -698,6 +699,11 @@ compact_files([Batch|T], CDBopts, ActiveJournal0, ActiveJournal0, ManSlice0, PressMethod), + % The inker's clerk will no longer need these (potentially large) binaries, + % so force garbage collection at this point. This will mean when we roll + % each CDB file there will be no remaining references to the binaries that + % have been transferred and the memory can immediately be cleared + garbage_collect(), compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, MaxSQN, RStrategy, PressMethod, ManSlice1). @@ -762,7 +768,7 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> % strategy [KVC0|Acc]; {false, retain} -> - % If we have a retain startegy, it can't be + % If we have a retain strategy, it can't be % discarded - but the value part is no longer % required as this version has been replaced {JK0, JV0} =