Mas p401 coverage (#404)

* refactor leveled_sst from gen_fsm to gen_statem

* format_status/2 takes State and State Data
but this function is deprecated... it is included for backward compatibility
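
For reference, a minimal sketch of the two callback shapes in gen_statem; the bodies here are illustrative, not this module's exact implementation:

    %% Deprecated since OTP 25, retained for backward compatibility
    format_status(_Opt, [_PDict, StateName, StateData]) ->
        [{data, [{"State", {StateName, StateData}}]}].

    %% OTP 25+ replacement: takes and returns a status map
    format_status(Status) when is_map(Status) ->
        Status.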

* refactor leveled_cdb from gen_fsm to gen_statem
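
The client API translates one-to-one, as the diff below shows throughout:

    %% gen_fsm:start_link/3                -> gen_statem:start_link/3
    %% gen_fsm:sync_send_event/2,3         -> gen_statem:call/2,3
    %% gen_fsm:send_event/2                -> gen_statem:cast/2
    %% gen_fsm:sync_send_all_state_event/3 -> gen_statem:call/3
    %% gen_fsm:send_all_state_event/2      -> gen_statem:cast/2
    %% gen_fsm:reply/2                     -> gen_statem:reply/2
    %%
    %% Under callback_mode() -> state_functions, the two gen_fsm callback
    %% arities per state collapse into one:
    %%   writer(Event, From, StateData)    -> writer({call, From}, Event, StateData)
    %%   writer(Event, StateData)          -> writer(cast, Event, StateData)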

* disable the now-irrelevant suppression of gen_fsm deprecation warnings

* Remove unnecessary code paths

Only handle messages, especially info messages, in the states where they can actually arrive.
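
A sketch of the idea, with hypothetical state and message names - each state function accepts only the events that can genuinely arrive in that state, so anything unexpected fails fast rather than being swallowed by a catch-all handler:

    %% The one info message this state can receive is handled explicitly ...
    reader(info, {'DOWN', _Ref, process, _Pid, _Reason}, State) ->
        {keep_state, State};
    %% ... calls are handled as normal; there is no catch-all clause, so any
    %% other message crashes the process
    reader({call, From}, {get_kv, Key}, State) ->
        {keep_state_and_data, [{reply, From, do_get(Key, State)}]}.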

* Mas i1820 offlinedeserialisation cbo (#403)

* Log report GC Info by manifest level
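
The per-level report samples each file process with process_info/2; roughly (a sketch of the approach taken in leveled_pmanifest, see the diff below):

    gc_stats(FilePid) ->
        {garbage_collection_info, GCI} =
            process_info(FilePid, garbage_collection_info),
        %% These are summed per level, then averaged and logged (pc025)
        [proplists:get_value(K, GCI) ||
            K <- [heap_block_size, heap_size, recent_size, bin_vheap_size]].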

* Hibernate on range query

Hibernate if the Block Index Cache is not full and we're not yielding.
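
In gen_statem this is an extra action alongside the reply; a sketch with hypothetical helper and state names:

    reader({call, From}, {fetch_range, StartKey, EndKey}, State) ->
        Reply = {reply, From, fetch_range(StartKey, EndKey, State)},
        case bic_full(State) of
            false ->
                %% Cache not fully built, so the heap is worth reclaiming
                {keep_state_and_data, [Reply, hibernate]};
            true ->
                {keep_state_and_data, [Reply]}
        end.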

* Spawn to deserialise blocks offline

The hypothesis is that the heap growth required by continual binary_to_term calls to deserialise blocks is wasting memory - so do this memory-intensive task in a short-lived process.
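
A sketch of the pattern with hypothetical names: the binary_to_term work runs in a linked, short-lived worker, so its heap is reclaimed wholesale on exit instead of accumulating in the long-lived file process.

    deserialise_offline(SlotBin) ->
        Owner = self(),
        %% spawn_link: a failure in the worker crashes the owner too,
        %% preserving the existing failure behaviour
        Worker =
            spawn_link(fun() -> Owner ! {blocks, self(), binary_to_term(SlotBin)} end),
        receive
            {blocks, Worker, Blocks} -> Blocks
        end.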

* Start with hibernate_after option
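
hibernate_after is a standard gen_statem start option; the timeout below is illustrative only:

    {ok, Pid} =
        gen_statem:start_link(?MODULE, [Opts], [{hibernate_after, 5000}]).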

* Always build BIC

Testing indicates that the BIC itself is not a primary memory issue - the underlying issue is a lack of garbage collection and a growing heap.

This change enhances the offline deserialisation patch so that:
- get_sqn & get_kv are standardised to build the BIC, and hibernate when it is built.
- the offline pid is linked, so a failure crashes this process (as would happen now).

* Standardise spawning for check_block/3

Now deserialise in both parts of the code.

* Only spawn for check_block if cache not full
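
Continuing the earlier sketch (helper names hypothetical), the spawn is now gated on the state of the Block Index Cache:

    check_block(BlockBin, State) ->
        case bic_full(State) of
            true ->
                %% Cache complete: deserialise in-process
                binary_to_term(BlockBin);
            false ->
                %% Cache still filling: use the short-lived worker to
                %% keep this process's heap small
                deserialise_offline(BlockBin)
        end.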

* Update following review

* Standardise formatting

Make the test more reliable: show that there is no new compaction after the third compaction.

* Update comments

---------

Co-authored-by: Thomas Arts <thomas.arts@quviq.com>
Martin Sumner 2023-03-13 11:46:08 +00:00 committed by GitHub
parent e06d2a538f
commit 3d3d284805
8 changed files with 1255 additions and 1253 deletions

rebar.config

@@ -2,7 +2,6 @@
{erl_opts,
[warnings_as_errors,
{platform_define, "^2[0-5]{1}", fsm_deprecated},
{platform_define, "^2[2-5]{1}", if_check}]}.
{xref_checks, [undefined_function_calls,undefined_functions]}.

leveled_cdb.erl

@@ -47,37 +47,20 @@
-module(leveled_cdb).
-behaviour(gen_fsm).
-behaviour(gen_statem).
-include("include/leveled.hrl").
-ifdef(fsm_deprecated).
-compile({nowarn_deprecated_function,
[{gen_fsm, start_link, 3},
{gen_fsm, sync_send_event, 3},
{gen_fsm, sync_send_event, 2},
{gen_fsm, send_event, 2},
{gen_fsm, sync_send_all_state_event, 3},
{gen_fsm, send_all_state_event, 2},
{gen_fsm, reply, 2}]}).
-endif.
-export([init/1,
handle_sync_event/4,
handle_event/3,
handle_info/3,
callback_mode/0,
terminate/3,
code_change/4,
starting/3,
code_change/4]).
%% states
-export([starting/3,
writer/3,
writer/2,
rolling/2,
rolling/3,
reader/3,
reader/2,
delete_pending/3,
delete_pending/2]).
delete_pending/3]).
-export([cdb_open_writer/1,
cdb_open_writer/2,
@@ -177,8 +160,8 @@ cdb_open_writer(Filename) ->
%% hashtree cached in memory (the file will need to be scanned to build the
%% hashtree)
cdb_open_writer(Filename, Opts) ->
{ok, Pid} = gen_fsm:start_link(?MODULE, [Opts], []),
ok = gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity),
{ok, Pid} = gen_statem:start_link(?MODULE, [Opts], []),
ok = gen_statem:call(Pid, {open_writer, Filename}, infinity),
{ok, Pid}.
-spec cdb_reopen_reader(string(), binary(), cdb_options()) -> {ok, pid()}.
@@ -192,12 +175,12 @@ cdb_open_writer(Filename, Opts) ->
%% determine when scans over a file have completed.
cdb_reopen_reader(Filename, LastKey, CDBopts) ->
{ok, Pid} =
gen_fsm:start_link(?MODULE,
[CDBopts#cdb_options{binary_mode=true}],
[]),
ok = gen_fsm:sync_send_event(Pid,
{open_reader, Filename, LastKey},
infinity),
gen_statem:start_link(?MODULE,
[CDBopts#cdb_options{binary_mode=true}],
[]),
ok = gen_statem:call(Pid,
{open_reader, Filename, LastKey},
infinity),
{ok, Pid}.
-spec cdb_open_reader(string()) -> {ok, pid()}.
@@ -215,15 +198,15 @@ cdb_open_reader(Filename) ->
%% to discover the LastKey.
%% Allows non-default cdb_options to be passed
cdb_open_reader(Filename, Opts) ->
{ok, Pid} = gen_fsm:start_link(?MODULE, [Opts], []),
ok = gen_fsm:sync_send_event(Pid, {open_reader, Filename}, infinity),
{ok, Pid} = gen_statem:start_link(?MODULE, [Opts], []),
ok = gen_statem:call(Pid, {open_reader, Filename}, infinity),
{ok, Pid}.
-spec cdb_get(pid(), any()) -> {any(), any()}|missing.
%% @doc
%% Extract a Key and Value from a CDB file by passing in a Key.
cdb_get(Pid, Key) ->
gen_fsm:sync_send_event(Pid, {get_kv, Key}, infinity).
gen_statem:call(Pid, {get_kv, Key}, infinity).
-spec cdb_put(pid(), any(), any()) -> ok|roll.
%% @doc
@@ -241,8 +224,7 @@ cdb_put(Pid, Key, Value) ->
%% See cdb_put/3. Addition of force-sync option, to be used when sync mode is
%% none to force a sync to disk on this particular put.
cdb_put(Pid, Key, Value, Sync) ->
gen_fsm:sync_send_event(Pid, {put_kv, Key, Value, Sync}, infinity).
gen_statem:call(Pid, {put_kv, Key, Value, Sync}, infinity).
-spec cdb_mput(pid(), list()) -> ok|roll.
%% @doc
@@ -253,7 +235,7 @@ cdb_put(Pid, Key, Value, Sync) ->
%% It may be preferable to respond to roll by trying individual PUTs until
%% roll is returned again
cdb_mput(Pid, KVList) ->
gen_fsm:sync_send_event(Pid, {mput_kv, KVList}, infinity).
gen_statem:call(Pid, {mput_kv, KVList}, infinity).
-spec cdb_getpositions(pid(), integer()|all) -> list().
%% @doc
@@ -296,9 +278,8 @@ cdb_getpositions(Pid, SampleSize) ->
end.
cdb_getpositions_fromidx(Pid, SampleSize, Index, Acc) ->
gen_fsm:sync_send_event(Pid,
{get_positions, SampleSize, Index, Acc},
infinity).
gen_statem:call(Pid,
{get_positions, SampleSize, Index, Acc}, infinity).
-spec cdb_directfetch(pid(), list(), key_only|key_size|key_value_check) ->
list().
@@ -307,26 +288,26 @@ cdb_getpositions_fromidx(Pid, SampleSize, Index, Acc) ->
%% key_value_check (with the check part indicating if the CRC is correct for
%% the value)
cdb_directfetch(Pid, PositionList, Info) ->
gen_fsm:sync_send_event(Pid, {direct_fetch, PositionList, Info}, infinity).
gen_statem:call(Pid, {direct_fetch, PositionList, Info}, infinity).
-spec cdb_close(pid()) -> ok.
%% @doc
%% RONSEAL (does exactly what it says on the tin)
cdb_close(Pid) ->
gen_fsm:sync_send_all_state_event(Pid, cdb_close, infinity).
gen_statem:call(Pid, cdb_close, infinity).
-spec cdb_deleteconfirmed(pid()) -> ok.
%% @doc
%% Delete has been confirmed, so close (state should be delete_pending)
cdb_deleteconfirmed(Pid) ->
gen_fsm:send_event(Pid, delete_confirmed).
gen_statem:cast(Pid, delete_confirmed).
-spec cdb_complete(pid()) -> {ok, string()}.
%% @doc
%% Persists the hashtable to the end of the file, to close it for further
%% writing then exit. Returns the filename that was saved.
cdb_complete(Pid) ->
gen_fsm:sync_send_event(Pid, cdb_complete, infinity).
gen_statem:call(Pid, cdb_complete, infinity).
-spec cdb_roll(pid()) -> ok.
%% @doc
@@ -335,7 +316,7 @@ cdb_complete(Pid) ->
%% rolling state whilst the hashtable is being written, and will become a
%% reader (read-only) CDB file process on completion
cdb_roll(Pid) ->
gen_fsm:send_event(Pid, cdb_roll).
gen_statem:cast(Pid, cdb_roll).
-spec cdb_returnhashtable(pid(), list(), binary()) -> ok.
%% @doc
@@ -345,20 +326,20 @@ cdb_roll(Pid) ->
%% [{Index, CurrPos, IndexLength}] which can be used to locate the slices of
%% the hashtree within that binary
cdb_returnhashtable(Pid, IndexList, HashTreeBin) ->
gen_fsm:sync_send_event(Pid, {return_hashtable, IndexList, HashTreeBin}, infinity).
gen_statem:call(Pid, {return_hashtable, IndexList, HashTreeBin}, infinity).
-spec cdb_checkhashtable(pid()) -> boolean().
%% @doc
%% Has the hashtable been written for this file?
cdb_checkhashtable(Pid) ->
% only used in tests - so OK to be sync_send_event/2
gen_fsm:sync_send_event(Pid, check_hashtable).
% only used in tests - so OK to be call
gen_statem:call(Pid, check_hashtable).
-spec cdb_destroy(pid()) -> ok.
%% @doc
%% If the file is in a delete_pending state close (and will destroy)
cdb_destroy(Pid) ->
gen_fsm:send_event(Pid, destroy).
gen_statem:cast(Pid, destroy).
cdb_deletepending(Pid) ->
% Only used in unit tests
@@ -373,7 +354,7 @@ cdb_deletepending(Pid) ->
%% Passing no_poll means there's no inker to poll, and the process will close
%% on timeout rather than poll.
cdb_deletepending(Pid, ManSQN, Inker) ->
gen_fsm:send_event(Pid, {delete_pending, ManSQN, Inker}).
gen_statem:cast(Pid, {delete_pending, ManSQN, Inker}).
-spec cdb_scan(
pid(), filter_fun(), any(), integer()|undefined)
@@ -386,42 +367,39 @@ cdb_deletepending(Pid, ManSQN, Inker) ->
%% LastPosition could be the atom complete when the last key processed was at
%% the end of the file. last_key must be defined in LoopState.
cdb_scan(Pid, FilterFun, InitAcc, StartPosition) ->
gen_fsm:sync_send_all_state_event(Pid,
{cdb_scan,
FilterFun,
InitAcc,
StartPosition},
infinity).
gen_statem:call(Pid,
{cdb_scan, FilterFun, InitAcc, StartPosition},
infinity).
-spec cdb_lastkey(pid()) -> any().
%% @doc
%% Get the last key to be added to the file (which will have the highest
%% sequence number)
cdb_lastkey(Pid) ->
gen_fsm:sync_send_all_state_event(Pid, cdb_lastkey, infinity).
gen_statem:call(Pid, cdb_lastkey, infinity).
-spec cdb_firstkey(pid()) -> any().
cdb_firstkey(Pid) ->
gen_fsm:sync_send_all_state_event(Pid, cdb_firstkey, infinity).
gen_statem:call(Pid, cdb_firstkey, infinity).
-spec cdb_filename(pid()) -> string().
%% @doc
%% Get the filename of the database
cdb_filename(Pid) ->
gen_fsm:sync_send_all_state_event(Pid, cdb_filename, infinity).
gen_statem:call(Pid, cdb_filename, infinity).
-spec cdb_keycheck(pid(), any()) -> probably|missing.
%% @doc
%% Check to see if the key is probably present, will return either
%% probably or missing. Does not do a definitive check
cdb_keycheck(Pid, Key) ->
gen_fsm:sync_send_event(Pid, {key_check, Key}, infinity).
gen_statem:call(Pid, {key_check, Key}, infinity).
-spec cdb_isrolling(pid()) -> boolean().
%% @doc
%% Check to see if a cdb file is still rolling
cdb_isrolling(Pid) ->
gen_fsm:sync_send_all_state_event(Pid, cdb_isrolling, infinity).
gen_statem:call(Pid, cdb_isrolling, infinity).
-spec cdb_clerkcomplete(pid()) -> ok.
%% @doc
@@ -429,20 +407,20 @@ cdb_isrolling(Pid) ->
%% complete. Currently this will prompt hibernation, as the CDB process may
%% not be needed for a period.
cdb_clerkcomplete(Pid) ->
gen_fsm:send_all_state_event(Pid, clerk_complete).
gen_statem:cast(Pid, clerk_complete).
-spec cdb_getcachedscore(pid(), erlang:timestamp()) -> undefined|float().
%% @doc
%% Return the cached score for a CDB file
cdb_getcachedscore(Pid, Now) ->
gen_fsm:sync_send_all_state_event(Pid, {get_cachedscore, Now}, infinity).
gen_statem:call(Pid, {get_cachedscore, Now}, infinity).
-spec cdb_putcachedscore(pid(), float()) -> ok.
%% @doc
%% Store the cached score for a CDB file
cdb_putcachedscore(Pid, Score) ->
gen_fsm:sync_send_all_state_event(Pid, {put_cachedscore, Score}, infinity).
gen_statem:call(Pid, {put_cachedscore, Score}, infinity).
@@ -475,7 +453,10 @@ init([Opts]) ->
log_options=Opts#cdb_options.log_options,
monitor=Opts#cdb_options.monitor}}.
starting({open_writer, Filename}, _From, State) ->
callback_mode() ->
state_functions.
starting({call, From}, {open_writer, Filename}, State) ->
leveled_log:save(State#state.log_options),
leveled_log:log(cdb01, [Filename]),
{LastPosition, HashTree, LastKey} = open_active_file(Filename),
@@ -489,8 +470,8 @@ starting({open_writer, Filename}, _From, State) ->
last_key=LastKey,
filename=Filename,
hashtree=HashTree},
{reply, ok, writer, State0, hibernate};
starting({open_reader, Filename}, _From, State) ->
{next_state, writer, State0, [{reply, From, ok}, hibernate]};
starting({call, From}, {open_reader, Filename}, State) ->
leveled_log:save(State#state.log_options),
leveled_log:log(cdb02, [Filename]),
{Handle, Index, LastKey} = open_for_readonly(Filename, false),
@@ -498,8 +479,8 @@ starting({open_reader, Filename}, _From, State) ->
last_key=LastKey,
filename=Filename,
hash_index=Index},
{reply, ok, reader, State0, hibernate};
starting({open_reader, Filename, LastKey}, _From, State) ->
{next_state, reader, State0, [{reply, From, ok}, hibernate]};
starting({call, From}, {open_reader, Filename, LastKey}, State) ->
leveled_log:save(State#state.log_options),
leveled_log:log(cdb02, [Filename]),
{Handle, Index, LastKey} = open_for_readonly(Filename, LastKey),
@@ -507,30 +488,33 @@ starting({open_reader, Filename, LastKey}, _From, State) ->
last_key=LastKey,
filename=Filename,
hash_index=Index},
{reply, ok, reader, State0, hibernate}.
{next_state, reader, State0, [{reply, From, ok}, hibernate]}.
writer({get_kv, Key}, _From, State) ->
{reply,
get_mem(Key,
State#state.handle,
State#state.hashtree,
State#state.binary_mode),
writer,
State};
writer({key_check, Key}, _From, State) ->
{reply,
get_mem(Key,
State#state.handle,
State#state.hashtree,
State#state.binary_mode,
loose_presence),
writer,
State};
writer({put_kv, Key, Value, Sync}, _From, State) ->
writer({call, From}, {get_kv, Key}, State) ->
{keep_state_and_data,
[{reply,
From,
get_mem(
Key,
State#state.handle,
State#state.hashtree,
State#state.binary_mode)}]};
writer({call, From}, {key_check, Key}, State) ->
{keep_state_and_data,
[{reply,
From,
get_mem(
Key,
State#state.handle,
State#state.hashtree,
State#state.binary_mode,
loose_presence)}]};
writer({call, From}, {put_kv, Key, Value, Sync}, State) ->
NewCount = State#state.current_count + 1,
case NewCount >= State#state.max_count of
true ->
{reply, roll, writer, State};
{keep_state_and_data, [{reply, From, roll}]};
false ->
Result = put(State#state.handle,
Key,
@@ -542,7 +526,7 @@ writer({put_kv, Key, Value, Sync}, _From, State) ->
case Result of
roll ->
%% Key and value could not be written
{reply, roll, writer, State};
{keep_state_and_data, [{reply, From, roll}]};
{UpdHandle, NewPosition, HashTree} ->
ok =
case {State#state.sync_strategy, Sync} of
@@ -553,22 +537,25 @@ writer({put_kv, Key, Value, Sync}, _From, State) ->
_ ->
ok
end,
{reply, ok, writer, State#state{handle=UpdHandle,
current_count=NewCount,
last_position=NewPosition,
last_key=Key,
hashtree=HashTree}}
{keep_state,
State#state{
handle=UpdHandle,
current_count=NewCount,
last_position=NewPosition,
last_key=Key,
hashtree=HashTree},
[{reply, From, ok}]}
end
end;
writer({mput_kv, []}, _From, State) ->
{reply, ok, writer, State};
writer({mput_kv, KVList}, _From, State) ->
writer({call, From}, {mput_kv, []}, _State) ->
{keep_state_and_data, [{reply, From, ok}]};
writer({call, From}, {mput_kv, KVList}, State) ->
NewCount = State#state.current_count + length(KVList),
TooMany = NewCount >= State#state.max_count,
NotEmpty = State#state.current_count > 0,
case (TooMany and NotEmpty) of
true ->
{reply, roll, writer, State};
{keep_state_and_data, [{reply, From, roll}]};
false ->
Result = mput(State#state.handle,
KVList,
@@ -578,50 +565,58 @@ writer({mput_kv, KVList}, _From, State) ->
case Result of
roll ->
%% Keys and values could not be written
{reply, roll, writer, State};
{keep_state_and_data, [{reply, From, roll}]};
{UpdHandle, NewPosition, HashTree, LastKey} ->
{reply, ok, writer, State#state{handle=UpdHandle,
current_count=NewCount,
last_position=NewPosition,
last_key=LastKey,
hashtree=HashTree}}
{keep_state,
State#state{
handle=UpdHandle,
current_count=NewCount,
last_position=NewPosition,
last_key=LastKey,
hashtree=HashTree},
[{reply, From, ok}]}
end
end;
writer(cdb_complete, _From, State) ->
writer({call, From}, cdb_complete, State) ->
NewName = determine_new_filename(State#state.filename),
ok = close_file(State#state.handle,
State#state.hashtree,
State#state.last_position),
ok = rename_for_read(State#state.filename, NewName),
{stop, normal, {ok, NewName}, State}.
writer(cdb_roll, State) ->
ok = leveled_iclerk:clerk_hashtablecalc(State#state.hashtree,
State#state.last_position,
self()),
{stop_and_reply, normal, [{reply, From, {ok, NewName}}]};
writer({call, From}, Event, State) ->
handle_sync_event(Event, From, State);
writer(cast, cdb_roll, State) ->
ok =
leveled_iclerk:clerk_hashtablecalc(
State#state.hashtree, State#state.last_position, self()),
{next_state, rolling, State}.
rolling({get_kv, Key}, _From, State) ->
{reply,
get_mem(Key,
State#state.handle,
State#state.hashtree,
State#state.binary_mode),
rolling,
State};
rolling({key_check, Key}, _From, State) ->
{reply,
get_mem(Key,
State#state.handle,
State#state.hashtree,
State#state.binary_mode,
loose_presence),
rolling,
State};
rolling({get_positions, _SampleSize, _Index, SampleAcc}, _From, State) ->
{reply, SampleAcc, rolling, State};
rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) ->
rolling({call, From}, {get_kv, Key}, State) ->
{keep_state_and_data,
[{reply,
From,
get_mem(
Key,
State#state.handle,
State#state.hashtree,
State#state.binary_mode)}]};
rolling({call, From}, {key_check, Key}, State) ->
{keep_state_and_data,
[{reply,
From,
get_mem(
Key,
State#state.handle,
State#state.hashtree,
State#state.binary_mode,
loose_presence)}]};
rolling({call, From},
{get_positions, _SampleSize, _Index, SampleAcc},
_State) ->
{keep_state_and_data, [{reply, From, SampleAcc}]};
rolling({call, From}, {return_hashtable, IndexList, HashTreeBin}, State) ->
SW = os:timestamp(),
Handle = State#state.handle,
{ok, BasePos} = file:position(Handle, State#state.last_position),
@@ -632,36 +627,38 @@ rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) ->
ok = rename_for_read(State#state.filename, NewName),
leveled_log:log(cdb03, [NewName]),
ets:delete(State#state.hashtree),
{NewHandle, Index, LastKey} = open_for_readonly(NewName,
State#state.last_key),
{NewHandle, Index, LastKey} =
open_for_readonly(NewName, State#state.last_key),
State0 = State#state{handle=NewHandle,
last_key=LastKey,
filename=NewName,
hash_index=Index},
case State#state.deferred_delete of
true ->
{reply, ok, delete_pending, State0};
{next_state, delete_pending, State0, [{reply, From, ok}]};
false ->
leveled_log:log_timer(cdb18, [], SW),
{reply, ok, reader, State0, hibernate}
{next_state, reader, State0, [{reply, From, ok}, hibernate]}
end;
rolling(check_hashtable, _From, State) ->
{reply, false, rolling, State}.
rolling({delete_pending, ManSQN, Inker}, State) ->
{next_state,
rolling,
rolling({call, From}, check_hashtable, _State) ->
{keep_state_and_data, [{reply, From, false}]};
rolling({call, From}, cdb_isrolling, _State) ->
{keep_state_and_data, [{reply, From, true}]};
rolling({call, From}, Event, State) ->
handle_sync_event(Event, From, State);
rolling(cast, {delete_pending, ManSQN, Inker}, State) ->
{keep_state,
State#state{delete_point=ManSQN, inker=Inker, deferred_delete=true}}.
reader({get_kv, Key}, _From, State) ->
reader({call, From}, {get_kv, Key}, State) ->
Result =
get_withcache(State#state.handle,
Key,
State#state.hash_index,
State#state.binary_mode,
State#state.monitor),
{reply, Result, reader, State};
reader({key_check, Key}, _From, State) ->
{keep_state_and_data, [{reply, From, Result}]};
reader({call, From}, {key_check, Key}, State) ->
Result =
get_withcache(State#state.handle,
Key,
@@ -669,17 +666,18 @@ reader({key_check, Key}, _From, State) ->
loose_presence,
State#state.binary_mode,
{no_monitor, 0}),
{reply, Result, reader, State};
reader({get_positions, SampleSize, Index, Acc}, _From, State) ->
{keep_state_and_data, [{reply, From, Result}]};
reader({call, From}, {get_positions, SampleSize, Index, Acc}, State) ->
{Pos, Count} = element(Index + 1, State#state.hash_index),
UpdAcc = scan_index_returnpositions(State#state.handle, Pos, Count, Acc),
case SampleSize of
all ->
{reply, UpdAcc, reader, State};
{keep_state_and_data, [{reply, From, UpdAcc}]};
_ ->
{reply, lists:sublist(UpdAcc, SampleSize), reader, State}
{keep_state_and_data,
[{reply, From, lists:sublist(UpdAcc, SampleSize)}]}
end;
reader({direct_fetch, PositionList, Info}, From, State) ->
reader({call, From}, {direct_fetch, PositionList, Info}, State) ->
H = State#state.handle,
FilterFalseKey =
fun(Tpl) ->
@@ -698,10 +696,12 @@ reader({direct_fetch, PositionList, Info}, From, State) ->
FilterFalseKey(extract_key(H, P)) end,
PositionList),
MapFun = fun(T) -> element(1, T) end,
{reply, lists:map(MapFun, FM), reader, State};
{keep_state_and_data,
[{reply, From, lists:map(MapFun, FM)}]};
key_size ->
FilterFun = fun(P) -> FilterFalseKey(extract_key_size(H, P)) end,
{reply, lists:filtermap(FilterFun, PositionList), reader, State};
{keep_state_and_data,
[{reply, From, lists:filtermap(FilterFun, PositionList)}]};
key_value_check ->
BM = State#state.binary_mode,
MapFun = fun(P) -> extract_key_value_check(H, P, BM) end,
@@ -709,38 +709,41 @@ reader({direct_fetch, PositionList, Info}, From, State) ->
% hibernate the process that is likely to be used again. However,
% a significant amount of unused binary references may have
% accumulated, so push a GC at this point
gen_fsm:reply(From, lists:map(MapFun, PositionList)),
gen_statem:reply(From, lists:map(MapFun, PositionList)),
garbage_collect(),
{next_state, reader, State}
{keep_state_and_data, []}
end;
reader(cdb_complete, _From, State) ->
reader({call, From}, cdb_complete, State) ->
leveled_log:log(cdb05, [State#state.filename, reader, cdb_ccomplete]),
ok = file:close(State#state.handle),
{stop, normal, {ok, State#state.filename}, State#state{handle=undefined}};
reader(check_hashtable, _From, State) ->
{reply, true, reader, State}.
reader({delete_pending, 0, no_poll}, State) ->
{next_state,
delete_pending,
State#state{delete_point=0}};
reader({delete_pending, ManSQN, Inker}, State) ->
{stop_and_reply,
normal,
[{reply, From, {ok, State#state.filename}}],
State#state{handle=undefined}};
reader({call, From}, check_hashtable, _State) ->
{keep_state_and_data, [{reply, From, true}]};
reader({call, From}, Event, State) ->
handle_sync_event(Event, From, State);
reader(cast, {delete_pending, 0, no_poll}, State) ->
{next_state, delete_pending, State#state{delete_point=0}};
reader(cast, {delete_pending, ManSQN, Inker}, State) ->
{next_state,
delete_pending,
State#state{delete_point=ManSQN, inker=Inker},
?DELETE_TIMEOUT}.
?DELETE_TIMEOUT};
reader(cast, clerk_complete, _State) ->
{keep_state_and_data, [hibernate]}.
delete_pending({get_kv, Key}, _From, State) ->
delete_pending({call, From}, {get_kv, Key}, State) ->
Result =
get_withcache(State#state.handle,
Key,
State#state.hash_index,
State#state.binary_mode,
State#state.monitor),
{reply, Result, delete_pending, State, ?DELETE_TIMEOUT};
delete_pending({key_check, Key}, _From, State) ->
{keep_state_and_data, [{reply, From, Result}, ?DELETE_TIMEOUT]};
delete_pending({call, From}, {key_check, Key}, State) ->
Result =
get_withcache(State#state.handle,
Key,
@@ -748,41 +751,44 @@ delete_pending({key_check, Key}, _From, State) ->
loose_presence,
State#state.binary_mode,
{no_monitor, 0}),
{reply, Result, delete_pending, State, ?DELETE_TIMEOUT}.
delete_pending(timeout, State=#state{delete_point=ManSQN}) when ManSQN > 0 ->
{keep_state_and_data, [{reply, From, Result}, ?DELETE_TIMEOUT]};
delete_pending({call, From}, cdb_close, State) ->
leveled_log:log(cdb05, [State#state.filename, delete_pending, cdb_close]),
close_pendingdelete(State#state.handle,
State#state.filename,
State#state.waste_path),
{stop_and_reply, normal, [{reply, From, ok}]};
delete_pending(cast, delete_confirmed, State=#state{delete_point=ManSQN}) ->
leveled_log:log(cdb04, [State#state.filename, ManSQN]),
close_pendingdelete(State#state.handle,
State#state.filename,
State#state.waste_path),
{stop, normal};
delete_pending(cast, destroy, State) ->
leveled_log:log(cdb05, [State#state.filename, delete_pending, destroy]),
close_pendingdelete(State#state.handle,
State#state.filename,
State#state.waste_path),
{stop, normal};
delete_pending(
timeout, _, State=#state{delete_point=ManSQN}) when ManSQN > 0 ->
case is_process_alive(State#state.inker) of
true ->
ok =
leveled_inker:ink_confirmdelete(State#state.inker,
ManSQN,
self()),
{next_state, delete_pending, State, ?DELETE_TIMEOUT};
{keep_state_and_data, [?DELETE_TIMEOUT]};
false ->
leveled_log:log(cdb04, [State#state.filename, ManSQN]),
close_pendingdelete(State#state.handle,
State#state.filename,
State#state.waste_path),
{stop, normal, State}
end;
delete_pending(delete_confirmed, State=#state{delete_point=ManSQN}) ->
leveled_log:log(cdb04, [State#state.filename, ManSQN]),
close_pendingdelete(State#state.handle,
State#state.filename,
State#state.waste_path),
{stop, normal, State};
delete_pending(destroy, State) ->
leveled_log:log(cdb05, [State#state.filename, delete_pending, destroy]),
close_pendingdelete(State#state.handle,
State#state.filename,
State#state.waste_path),
{stop, normal, State}.
{stop, normal}
end.
handle_sync_event({cdb_scan, FilterFun, Acc, StartPos},
From,
StateName,
State) ->
handle_sync_event({cdb_scan, FilterFun, Acc, StartPos}, From, State) ->
{ok, EndPos0} = file:position(State#state.handle, eof),
{ok, StartPos0} =
case StartPos of
@@ -814,12 +820,12 @@ handle_sync_event({cdb_scan, FilterFun, Acc, StartPos},
% garbage_collect/0 is used in preference to hibernate, as we're generally
% scanning in batches at startup - so the process will be needed straight
% away.
gen_fsm:reply(From, {LastPosition, Acc2}),
gen_statem:reply(From, {LastPosition, Acc2}),
garbage_collect(),
{next_state, StateName, State};
handle_sync_event(cdb_lastkey, _From, StateName, State) ->
{reply, State#state.last_key, StateName, State};
handle_sync_event(cdb_firstkey, _From, StateName, State) ->
{keep_state_and_data, []};
handle_sync_event(cdb_lastkey, From, State) ->
{keep_state_and_data, [{reply, From, State#state.last_key}]};
handle_sync_event(cdb_firstkey, From, State) ->
{ok, EOFPos} = file:position(State#state.handle, eof),
FilterFun = fun(Key, _V, _P, _O, _Fun) -> {stop, Key} end,
FirstKey =
@@ -835,13 +841,12 @@ handle_sync_event(cdb_firstkey, _From, StateName, State) ->
State#state.last_key),
FirstScanKey
end,
{reply, FirstKey, StateName, State};
handle_sync_event(cdb_filename, _From, StateName, State) ->
{reply, State#state.filename, StateName, State};
handle_sync_event(cdb_isrolling, _From, StateName, State) ->
{reply, StateName == rolling, StateName, State};
handle_sync_event({get_cachedscore, {NowMega, NowSecs, _}},
_From, StateName, State) ->
{keep_state_and_data, [{reply, From, FirstKey}]};
handle_sync_event(cdb_filename, From, State) ->
{keep_state_and_data, [{reply, From, State#state.filename}]};
handle_sync_event(cdb_isrolling, From, _State) ->
{keep_state_and_data, [{reply, From, false}]};
handle_sync_event({get_cachedscore, {NowMega, NowSecs, _}}, From, State) ->
ScoreToReturn =
case State#state.cached_score of
undefined ->
@@ -855,30 +860,19 @@ handle_sync_event({get_cachedscore, {NowMega, NowSecs, _}},
Score
end
end,
{reply, ScoreToReturn, StateName, State};
handle_sync_event({put_cachedscore, Score}, _From, StateName, State) ->
{reply, ok, StateName, State#state{cached_score = {Score,os:timestamp()}}};
handle_sync_event(cdb_close, _From, delete_pending, State) ->
leveled_log:log(cdb05, [State#state.filename, delete_pending, cdb_close]),
close_pendingdelete(State#state.handle,
State#state.filename,
State#state.waste_path),
{stop, normal, ok, State};
handle_sync_event(cdb_close, _From, _StateName, State) ->
{keep_state_and_data, [{reply, From, ScoreToReturn}]};
handle_sync_event({put_cachedscore, Score}, From, State) ->
{keep_state,
State#state{cached_score = {Score,os:timestamp()}},
[{reply, From, ok}]};
handle_sync_event(cdb_close, From, State) ->
file:close(State#state.handle),
{stop, normal, ok, State}.
handle_event(clerk_complete, StateName, State) ->
{next_state, StateName, State, hibernate}.
handle_info(_Msg, StateName, State) ->
{next_state, StateName, State}.
{stop_and_reply, normal, [{reply, From, ok}]}.
terminate(_Reason, _StateName, _State) ->
ok.
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
@@ -2731,9 +2725,6 @@ getpositions_sample_test() ->
nonsense_coverage_test() ->
?assertMatch({next_state, reader, #state{}}, handle_info(nonsense,
reader,
#state{})),
?assertMatch({ok, reader, #state{}}, code_change(nonsense,
reader,
#state{},

leveled_log.erl

@@ -168,6 +168,8 @@
{info, <<"At level=~w file_count=~w avg_mem=~w file with most memory fn=~s p=~w mem=~w">>},
pc024 =>
{info, <<"Grooming compaction picked file with tomb_count=~w">>},
pc025 =>
{info, <<"At level=~w file_count=~w average words for heap_block_size=~w heap_size=~w recent_size=~w bin_vheap_size=~w">>},
pm002 =>
{info, <<"Completed dump of L0 cache to list of l0cache_size=~w">>},
sst03 =>

leveled_pclerk.erl

@@ -200,11 +200,15 @@ handle_work(
list(leveled_pmanifest:manifest_entry())}.
merge(SrcLevel, Manifest, RootPath, OptsSST) ->
case leveled_pmanifest:report_manifest_level(Manifest, SrcLevel + 1) of
{0, 0, undefined} ->
{0, 0, undefined, 0, 0, 0, 0} ->
ok;
{FCnt, AvgMem, {MaxFN, MaxP, MaxMem}} ->
leveled_log:log(pc023,
[SrcLevel + 1, FCnt, AvgMem, MaxFN, MaxP, MaxMem])
{FCnt, MnMem, {MaxFN, MaxP, MaxMem}, MnHBS, MnHS, MnLHS, MnBVHS} ->
leveled_log:log(
pc023,
[SrcLevel + 1, FCnt, MnMem, MaxFN, MaxP, MaxMem]),
leveled_log:log(
pc025,
[SrcLevel + 1, FCnt, MnHBS, MnHS, MnLHS, MnBVHS])
end,
SelectMethod =
case leveled_rand:uniform(100) of

leveled_penciller.erl

@@ -498,7 +498,7 @@ pcl_workforclerk(Pid) ->
-spec pcl_manifestchange(pid(), leveled_pmanifest:manifest()) -> ok.
%% @doc
%% Provide a manifest record (i.e. the output of the leveled_pmanifest module)
%% that is required to beocme the new manifest.
%% that is required to become the new manifest.
pcl_manifestchange(Pid, Manifest) ->
gen_server:cast(Pid, {manifest_change, Manifest}).

leveled_pmanifest.erl

@@ -266,11 +266,16 @@ remove_manifest(RootPath, GC_SQN) ->
end.
-spec report_manifest_level(manifest(), non_neg_integer()) ->
{non_neg_integer(),
non_neg_integer(),
{string(), pid(), non_neg_integer()} |
undefined}.
-spec report_manifest_level(
manifest(), non_neg_integer()) ->
{non_neg_integer(),
non_neg_integer(),
{string(), pid(), non_neg_integer()} |
undefined,
non_neg_integer(),
non_neg_integer(),
non_neg_integer(),
non_neg_integer()}.
%% @doc
%% Report on a level in the manifest
%% - How many files in the level
@@ -287,7 +292,7 @@ report_manifest_level(Manifest, LevelIdx) ->
{leveled_tree:tsize(Level), leveled_tree:to_list(Level)}
end,
AccMemFun =
fun(MaybeME, {MemAcc, Max}) ->
fun(MaybeME, {MemAcc, Max, HBSAcc, HSAcc, LHSAcc, BVHSAcc}) ->
ME = get_manifest_entry(MaybeME),
P = ME#manifest_entry.owner,
{memory, PM} = process_info(P, memory),
@@ -298,15 +303,26 @@ report_manifest_level(Manifest, LevelIdx) ->
_ ->
{ME#manifest_entry.filename, P, PM}
end,
{MemAcc + PM, UpdMax}
{garbage_collection_info, GCI} =
process_info(P, garbage_collection_info),
HBS = proplists:get_value(heap_block_size, GCI),
HS = proplists:get_value(heap_size, GCI),
LHS = proplists:get_value(recent_size, GCI),
BVHS = proplists:get_value(bin_vheap_size, GCI),
{MemAcc + PM, UpdMax,
HBSAcc + HBS, HSAcc + HS, LHSAcc + LHS, BVHSAcc + BVHS}
end,
case LevelSize of
0 ->
{0, 0, undefined};
{0, 0, undefined, 0, 0, 0, 0};
_ ->
{TotalMem, BiggestMem} =
lists:foldl(AccMemFun, {0, undefined}, LevelList),
{LevelSize, TotalMem div LevelSize, BiggestMem}
{TotalMem, BiggestMem, TotalHBS, TotalHS, TotalLHS, TotalBVBS} =
lists:foldl(AccMemFun, {0, undefined, 0, 0, 0, 0}, LevelList),
{LevelSize, TotalMem div LevelSize, BiggestMem,
TotalHBS div LevelSize,
TotalHS div LevelSize,
TotalLHS div LevelSize,
TotalBVBS div LevelSize}
end.

leveled_sst.erl: file diff suppressed because it is too large


@@ -68,6 +68,7 @@ replace_everything(_Config) ->
testutil:put_altered_indexed_objects(Book1, BKT, KSpcL1),
ok = testutil:check_indexed_objects(Book1, BKT, KSpcL2, V2),
compact_and_wait(Book1, 1000),
compact_and_wait(Book1, 1000),
{ok, FileList1} = file:list_dir(CompPath),
io:format("Number of files after compaction ~w~n", [length(FileList1)]),
compact_and_wait(Book1, 1000),