From 3c834afa082f74ae7eb1ba4bfd2d24baaa9e0d7f Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 18 Jul 2019 13:07:48 +0100 Subject: [PATCH] Use hibernate on open or roll to read CDB files may be opened or rolled then left untouched for a period, so clean up any memory. Been awoken from hibernate has a cost, but it is a rare event. --- src/leveled_cdb.erl | 86 +++++++++++++++++++++------------------------ 1 file changed, 41 insertions(+), 45 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index ae1412a..8360a7f 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -436,28 +436,31 @@ starting({open_writer, Filename}, _From, State) -> {WriteOps, UpdStrategy} = set_writeops(State#state.sync_strategy), leveled_log:log("CDB13", [WriteOps]), {ok, Handle} = file:open(Filename, WriteOps), - {reply, ok, writer, State#state{handle=Handle, - sync_strategy = UpdStrategy, - last_position=LastPosition, - last_key=LastKey, - filename=Filename, - hashtree=HashTree}}; + State0 = State#state{handle=Handle, + sync_strategy = UpdStrategy, + last_position=LastPosition, + last_key=LastKey, + filename=Filename, + hashtree=HashTree} + {reply, ok, writer, State0, hibernate}; starting({open_reader, Filename}, _From, State) -> leveled_log:save(State#state.log_options), leveled_log:log("CDB02", [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, false), - {reply, ok, reader, State#state{handle=Handle, - last_key=LastKey, - filename=Filename, - hash_index=Index}}; + State0 = State#state{handle=Handle, + last_key=LastKey, + filename=Filename, + hash_index=Index} + {reply, ok, reader, State0, hibernate}; starting({open_reader, Filename, LastKey}, _From, State) -> leveled_log:save(State#state.log_options), leveled_log:log("CDB02", [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, LastKey), - {reply, ok, reader, State#state{handle=Handle, - last_key=LastKey, - filename=Filename, - hash_index=Index}}. + State0 = State#state{handle=Handle, + last_key=LastKey, + filename=Filename, + hash_index=Index} + {reply, ok, reader, State0, hibernate}. writer({get_kv, Key}, _From, State) -> {reply, @@ -566,18 +569,16 @@ rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) -> ets:delete(State#state.hashtree), {NewHandle, Index, LastKey} = open_for_readonly(NewName, State#state.last_key), + State0 = State#state{handle=NewHandle, + last_key=LastKey, + filename=NewName, + hash_index=Index}, case State#state.deferred_delete of true -> - {reply, ok, delete_pending, State#state{handle=NewHandle, - last_key=LastKey, - filename=NewName, - hash_index=Index}}; + {reply, ok, delete_pending, State0}; false -> leveled_log:log_timer("CDB18", [], SW), - {reply, ok, reader, State#state{handle=NewHandle, - last_key=LastKey, - filename=NewName, - hash_index=Index}} + {reply, ok, reader, State0, hibernate} end; rolling(check_hashtable, _From, State) -> {reply, false, rolling, State}. @@ -720,24 +721,21 @@ delete_pending(destroy, State) -> handle_sync_event({cdb_scan, FilterFun, Acc, StartPos}, - _From, + From, StateName, State) -> {ok, EndPos0} = file:position(State#state.handle, eof), - {ok, StartPos0} = case StartPos of - undefined -> - file:position(State#state.handle, - ?BASE_POSITION); - StartPos -> - {ok, StartPos} - end, + {ok, StartPos0} = + case StartPos of + undefined -> + file:position(State#state.handle, ?BASE_POSITION); + StartPos -> + {ok, StartPos} + end, file:position(State#state.handle, StartPos0), - file:advise(State#state.handle, - StartPos0, - EndPos0 - StartPos0, - sequential), - MaybeEnd = (check_last_key(State#state.last_key) == empty) or - (StartPos0 >= (EndPos0 - ?DWORD_SIZE)), + MaybeEnd = + (check_last_key(State#state.last_key) == empty) or + (StartPos0 >= (EndPos0 - ?DWORD_SIZE)), {LastPosition, Acc2} = case MaybeEnd of true -> @@ -749,12 +747,13 @@ handle_sync_event({cdb_scan, FilterFun, Acc, StartPos}, Acc, State#state.last_key) end, - {ok, LastReadPos} = file:position(State#state.handle, cur), - file:advise(State#state.handle, - StartPos0, - LastReadPos - StartPos0, - dont_need), - {reply, {LastPosition, Acc2}, StateName, State}; + % The scan may have created a lot of binary references, clear up the + % reference counters for this process here manually. The cdb process + % may be inactive for a period after the scan, and so GC may not kick in + % otherwise + gen_fsm:reply(From, {LastPosition, Acc2}), + garbage_collect(), + {next_state, StateName, State}; handle_sync_event(cdb_lastkey, _From, StateName, State) -> {reply, State#state.last_key, StateName, State}; handle_sync_event(cdb_firstkey, _From, StateName, State) -> @@ -1301,7 +1300,6 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) -> end, % Bring file back to that position {ok, Position} = file:position(Handle, {bof, Position}), - garbage_collect(), {eof, Output}; {Key, ValueAsBin, KeyLength, ValueLength} -> NewPosition = case Key of @@ -1317,12 +1315,10 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) -> Output, fun extract_valueandsize/1) of {stop, UpdOutput} -> - garbage_collect(), {Position, UpdOutput}; {loop, UpdOutput} -> case NewPosition of eof -> - garbage_collect(), {eof, UpdOutput}; _ -> scan_over_file(Handle,