Use hibernate on open or roll to read
CDB files may be opened or rolled and then left untouched for a period, so clean up any memory. Being awoken from hibernation has a cost, but it is a rare event.
This commit is contained in:
parent
478c5b6db0
commit
3c834afa08
1 changed file with 41 additions and 45 deletions
|
@ -436,28 +436,31 @@ starting({open_writer, Filename}, _From, State) ->
|
||||||
{WriteOps, UpdStrategy} = set_writeops(State#state.sync_strategy),
|
{WriteOps, UpdStrategy} = set_writeops(State#state.sync_strategy),
|
||||||
leveled_log:log("CDB13", [WriteOps]),
|
leveled_log:log("CDB13", [WriteOps]),
|
||||||
{ok, Handle} = file:open(Filename, WriteOps),
|
{ok, Handle} = file:open(Filename, WriteOps),
|
||||||
{reply, ok, writer, State#state{handle=Handle,
|
State0 = State#state{handle=Handle,
|
||||||
sync_strategy = UpdStrategy,
|
sync_strategy = UpdStrategy,
|
||||||
last_position=LastPosition,
|
last_position=LastPosition,
|
||||||
last_key=LastKey,
|
last_key=LastKey,
|
||||||
filename=Filename,
|
filename=Filename,
|
||||||
hashtree=HashTree}};
|
hashtree=HashTree}
|
||||||
|
{reply, ok, writer, State0, hibernate};
|
||||||
starting({open_reader, Filename}, _From, State) ->
|
starting({open_reader, Filename}, _From, State) ->
|
||||||
leveled_log:save(State#state.log_options),
|
leveled_log:save(State#state.log_options),
|
||||||
leveled_log:log("CDB02", [Filename]),
|
leveled_log:log("CDB02", [Filename]),
|
||||||
{Handle, Index, LastKey} = open_for_readonly(Filename, false),
|
{Handle, Index, LastKey} = open_for_readonly(Filename, false),
|
||||||
{reply, ok, reader, State#state{handle=Handle,
|
State0 = State#state{handle=Handle,
|
||||||
last_key=LastKey,
|
last_key=LastKey,
|
||||||
filename=Filename,
|
filename=Filename,
|
||||||
hash_index=Index}};
|
hash_index=Index}
|
||||||
|
{reply, ok, reader, State0, hibernate};
|
||||||
starting({open_reader, Filename, LastKey}, _From, State) ->
|
starting({open_reader, Filename, LastKey}, _From, State) ->
|
||||||
leveled_log:save(State#state.log_options),
|
leveled_log:save(State#state.log_options),
|
||||||
leveled_log:log("CDB02", [Filename]),
|
leveled_log:log("CDB02", [Filename]),
|
||||||
{Handle, Index, LastKey} = open_for_readonly(Filename, LastKey),
|
{Handle, Index, LastKey} = open_for_readonly(Filename, LastKey),
|
||||||
{reply, ok, reader, State#state{handle=Handle,
|
State0 = State#state{handle=Handle,
|
||||||
last_key=LastKey,
|
last_key=LastKey,
|
||||||
filename=Filename,
|
filename=Filename,
|
||||||
hash_index=Index}}.
|
hash_index=Index}
|
||||||
|
{reply, ok, reader, State0, hibernate}.
|
||||||
|
|
||||||
writer({get_kv, Key}, _From, State) ->
|
writer({get_kv, Key}, _From, State) ->
|
||||||
{reply,
|
{reply,
|
||||||
|
@ -566,18 +569,16 @@ rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) ->
|
||||||
ets:delete(State#state.hashtree),
|
ets:delete(State#state.hashtree),
|
||||||
{NewHandle, Index, LastKey} = open_for_readonly(NewName,
|
{NewHandle, Index, LastKey} = open_for_readonly(NewName,
|
||||||
State#state.last_key),
|
State#state.last_key),
|
||||||
|
State0 = State#state{handle=NewHandle,
|
||||||
|
last_key=LastKey,
|
||||||
|
filename=NewName,
|
||||||
|
hash_index=Index},
|
||||||
case State#state.deferred_delete of
|
case State#state.deferred_delete of
|
||||||
true ->
|
true ->
|
||||||
{reply, ok, delete_pending, State#state{handle=NewHandle,
|
{reply, ok, delete_pending, State0};
|
||||||
last_key=LastKey,
|
|
||||||
filename=NewName,
|
|
||||||
hash_index=Index}};
|
|
||||||
false ->
|
false ->
|
||||||
leveled_log:log_timer("CDB18", [], SW),
|
leveled_log:log_timer("CDB18", [], SW),
|
||||||
{reply, ok, reader, State#state{handle=NewHandle,
|
{reply, ok, reader, State0, hibernate}
|
||||||
last_key=LastKey,
|
|
||||||
filename=NewName,
|
|
||||||
hash_index=Index}}
|
|
||||||
end;
|
end;
|
||||||
rolling(check_hashtable, _From, State) ->
|
rolling(check_hashtable, _From, State) ->
|
||||||
{reply, false, rolling, State}.
|
{reply, false, rolling, State}.
|
||||||
|
@ -720,24 +721,21 @@ delete_pending(destroy, State) ->
|
||||||
|
|
||||||
|
|
||||||
handle_sync_event({cdb_scan, FilterFun, Acc, StartPos},
|
handle_sync_event({cdb_scan, FilterFun, Acc, StartPos},
|
||||||
_From,
|
From,
|
||||||
StateName,
|
StateName,
|
||||||
State) ->
|
State) ->
|
||||||
{ok, EndPos0} = file:position(State#state.handle, eof),
|
{ok, EndPos0} = file:position(State#state.handle, eof),
|
||||||
{ok, StartPos0} = case StartPos of
|
{ok, StartPos0} =
|
||||||
undefined ->
|
case StartPos of
|
||||||
file:position(State#state.handle,
|
undefined ->
|
||||||
?BASE_POSITION);
|
file:position(State#state.handle, ?BASE_POSITION);
|
||||||
StartPos ->
|
StartPos ->
|
||||||
{ok, StartPos}
|
{ok, StartPos}
|
||||||
end,
|
end,
|
||||||
file:position(State#state.handle, StartPos0),
|
file:position(State#state.handle, StartPos0),
|
||||||
file:advise(State#state.handle,
|
MaybeEnd =
|
||||||
StartPos0,
|
(check_last_key(State#state.last_key) == empty) or
|
||||||
EndPos0 - StartPos0,
|
(StartPos0 >= (EndPos0 - ?DWORD_SIZE)),
|
||||||
sequential),
|
|
||||||
MaybeEnd = (check_last_key(State#state.last_key) == empty) or
|
|
||||||
(StartPos0 >= (EndPos0 - ?DWORD_SIZE)),
|
|
||||||
{LastPosition, Acc2} =
|
{LastPosition, Acc2} =
|
||||||
case MaybeEnd of
|
case MaybeEnd of
|
||||||
true ->
|
true ->
|
||||||
|
@ -749,12 +747,13 @@ handle_sync_event({cdb_scan, FilterFun, Acc, StartPos},
|
||||||
Acc,
|
Acc,
|
||||||
State#state.last_key)
|
State#state.last_key)
|
||||||
end,
|
end,
|
||||||
{ok, LastReadPos} = file:position(State#state.handle, cur),
|
% The scan may have created a lot of binary references, clear up the
|
||||||
file:advise(State#state.handle,
|
% reference counters for this process here manually. The cdb process
|
||||||
StartPos0,
|
% may be inactive for a period after the scan, and so GC may not kick in
|
||||||
LastReadPos - StartPos0,
|
% otherwise
|
||||||
dont_need),
|
gen_fsm:reply(From, {LastPosition, Acc2}),
|
||||||
{reply, {LastPosition, Acc2}, StateName, State};
|
garbage_collect(),
|
||||||
|
{next_state, StateName, State};
|
||||||
handle_sync_event(cdb_lastkey, _From, StateName, State) ->
|
handle_sync_event(cdb_lastkey, _From, StateName, State) ->
|
||||||
{reply, State#state.last_key, StateName, State};
|
{reply, State#state.last_key, StateName, State};
|
||||||
handle_sync_event(cdb_firstkey, _From, StateName, State) ->
|
handle_sync_event(cdb_firstkey, _From, StateName, State) ->
|
||||||
|
@ -1301,7 +1300,6 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) ->
|
||||||
end,
|
end,
|
||||||
% Bring file back to that position
|
% Bring file back to that position
|
||||||
{ok, Position} = file:position(Handle, {bof, Position}),
|
{ok, Position} = file:position(Handle, {bof, Position}),
|
||||||
garbage_collect(),
|
|
||||||
{eof, Output};
|
{eof, Output};
|
||||||
{Key, ValueAsBin, KeyLength, ValueLength} ->
|
{Key, ValueAsBin, KeyLength, ValueLength} ->
|
||||||
NewPosition = case Key of
|
NewPosition = case Key of
|
||||||
|
@ -1317,12 +1315,10 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) ->
|
||||||
Output,
|
Output,
|
||||||
fun extract_valueandsize/1) of
|
fun extract_valueandsize/1) of
|
||||||
{stop, UpdOutput} ->
|
{stop, UpdOutput} ->
|
||||||
garbage_collect(),
|
|
||||||
{Position, UpdOutput};
|
{Position, UpdOutput};
|
||||||
{loop, UpdOutput} ->
|
{loop, UpdOutput} ->
|
||||||
case NewPosition of
|
case NewPosition of
|
||||||
eof ->
|
eof ->
|
||||||
garbage_collect(),
|
|
||||||
{eof, UpdOutput};
|
{eof, UpdOutput};
|
||||||
_ ->
|
_ ->
|
||||||
scan_over_file(Handle,
|
scan_over_file(Handle,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue