Mas i363 d31 (#364)

* Reduce size of state on terminate (#363)

Otherwise large volume of keys will be written on failure of the process

* Add format_status to leveled_sst
This commit is contained in:
Martin Sumner 2021-10-27 13:42:53 +01:00 committed by GitHub
parent beb8ba14a5
commit 43ec9a4eab
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 11 deletions

View file

@ -167,7 +167,8 @@
handle_cast/2, handle_cast/2,
handle_info/2, handle_info/2,
terminate/2, terminate/2,
code_change/3]). code_change/3,
format_status/2]).
-export([ -export([
pcl_snapstart/1, pcl_snapstart/1,
@ -240,7 +241,8 @@
-define(TIMING_SAMPLESIZE, 100). -define(TIMING_SAMPLESIZE, 100).
-define(OPEN_LASTMOD_RANGE, {0, infinity}). -define(OPEN_LASTMOD_RANGE, {0, infinity}).
-record(state, {manifest, % a manifest record from the leveled_manifest module -record(state, {manifest ::
leveled_pmanifest:manifest() | undefined | redacted,
persisted_sqn = 0 :: integer(), % The highest SQN persisted persisted_sqn = 0 :: integer(), % The highest SQN persisted
ledger_sqn = 0 :: integer(), % The highest SQN added to L0 ledger_sqn = 0 :: integer(), % The highest SQN added to L0
@ -250,18 +252,18 @@
levelzero_pending = false :: boolean(), levelzero_pending = false :: boolean(),
levelzero_constructor :: pid() | undefined, levelzero_constructor :: pid() | undefined,
levelzero_cache = [] :: levelzero_cache(), levelzero_cache = [] :: levelzero_cache() | redacted,
levelzero_size = 0 :: integer(), levelzero_size = 0 :: integer(),
levelzero_maxcachesize :: integer() | undefined, levelzero_maxcachesize :: integer() | undefined,
levelzero_cointoss = false :: boolean(), levelzero_cointoss = false :: boolean(),
levelzero_index, % An array levelzero_index :: array:array() | undefined | redacted,
is_snapshot = false :: boolean(), is_snapshot = false :: boolean(),
snapshot_fully_loaded = false :: boolean(), snapshot_fully_loaded = false :: boolean(),
snapshot_time :: pos_integer() | undefined, snapshot_time :: pos_integer() | undefined,
source_penciller :: pid() | undefined, source_penciller :: pid() | undefined,
bookie_monref :: reference() | undefined, bookie_monref :: reference() | undefined,
levelzero_astree :: list() | undefined, levelzero_astree :: list() | undefined | redacted,
work_ongoing = false :: boolean(), % i.e. compaction work work_ongoing = false :: boolean(), % i.e. compaction work
work_backlog = false :: boolean(), % i.e. compaction work work_backlog = false :: boolean(), % i.e. compaction work
@ -336,7 +338,7 @@
%% When starting a clone a query can also be passed. This prevents the whole %% When starting a clone a query can also be passed. This prevents the whole
%% Level Zero memory space from being copied to the snapshot, instead the %% Level Zero memory space from being copied to the snapshot, instead the
%% query is run against the level zero space and just the query results are %% query is run against the level zero space and just the query results are
%5 copied into the clone. %% copied into the clone.
pcl_start(PCLopts) -> pcl_start(PCLopts) ->
gen_server:start_link(?MODULE, [leveled_log:get_opts(), PCLopts], []). gen_server:start_link(?MODULE, [leveled_log:get_opts(), PCLopts], []).
@ -1111,6 +1113,14 @@ terminate(Reason, _State=#state{is_snapshot=Snap}) when Snap == true ->
terminate(Reason, _State) -> terminate(Reason, _State) ->
leveled_log:log("P0011", [Reason]). leveled_log:log("P0011", [Reason]).
format_status(normal, [_PDict, State]) ->
State;
format_status(terminate, [_PDict, State]) ->
State#state{manifest = redacted,
levelzero_cache = redacted,
levelzero_index = redacted,
levelzero_astree = redacted}.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -2060,6 +2070,23 @@ shutdown_when_compact(Pid) ->
io:format("No outstanding compaction work for ~w~n", [Pid]), io:format("No outstanding compaction work for ~w~n", [Pid]),
pcl_close(Pid). pcl_close(Pid).
format_status_test() ->
RootPath = "test/test_area/ledger",
clean_testdir(RootPath),
{ok, PCL} =
pcl_start(#penciller_options{root_path=RootPath,
max_inmemory_tablesize=1000,
sst_options=#sst_options{}}),
{status, PCL, {module, gen_server}, SItemL} = sys:get_status(PCL),
S = lists:keyfind(state, 1, lists:nth(5, SItemL)),
true = is_integer(array:size(element(2, S#state.manifest))),
ST = format_status(terminate, [dict:new(), S]),
?assertMatch(redacted, ST#state.manifest),
?assertMatch(redacted, ST#state.levelzero_cache),
?assertMatch(redacted, ST#state.levelzero_index),
?assertMatch(redacted, ST#state.levelzero_astree),
clean_testdir(RootPath).
simple_server_test() -> simple_server_test() ->
RootPath = "test/test_area/ledger", RootPath = "test/test_area/ledger",
clean_testdir(RootPath), clean_testdir(RootPath),

View file

@ -105,6 +105,7 @@
handle_info/3, handle_info/3,
terminate/3, terminate/3,
code_change/4, code_change/4,
format_status/2,
starting/2, starting/2,
starting/3, starting/3,
reader/2, reader/2,
@ -182,8 +183,7 @@
:: {binary(), binary(), list(integer()), leveled_codec:ledger_key()}. :: {binary(), binary(), list(integer()), leveled_codec:ledger_key()}.
-type sst_summary() -type sst_summary()
:: #summary{}. :: #summary{}.
-type blockindex_cache() -type blockindex_cache() :: array:array().
:: any(). % An array but OTP 16 types
%% yield_blockquery is used to determine if the work necessary to process a %% yield_blockquery is used to determine if the work necessary to process a
%% range query beyond the fetching the slot should be managed from within %% range query beyond the fetching the slot should be managed from within
@ -199,13 +199,15 @@
root_path, root_path,
filename, filename,
yield_blockquery = false :: boolean(), yield_blockquery = false :: boolean(),
blockindex_cache :: blockindex_cache()|undefined, blockindex_cache ::
blockindex_cache() | undefined |redacted,
compression_method = native :: press_method(), compression_method = native :: press_method(),
index_moddate = ?INDEX_MODDATE :: boolean(), index_moddate = ?INDEX_MODDATE :: boolean(),
timings = no_timing :: sst_timings(), timings = no_timing :: sst_timings(),
timings_countdown = 0 :: integer(), timings_countdown = 0 :: integer(),
starting_pid :: pid()|undefined, starting_pid :: pid()|undefined,
fetch_cache = array:new([{size, ?CACHE_SIZE}]), fetch_cache = array:new([{size, ?CACHE_SIZE}]) ::
array:array() | redacted,
new_slots :: list()|undefined, new_slots :: list()|undefined,
deferred_startup_tuple :: tuple()|undefined, deferred_startup_tuple :: tuple()|undefined,
level :: non_neg_integer()|undefined, level :: non_neg_integer()|undefined,
@ -886,6 +888,12 @@ terminate(Reason, _StateName, State) ->
code_change(_OldVsn, StateName, State, _Extra) -> code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}. {ok, StateName, State}.
format_status(normal, [_PDict, State]) ->
State;
format_status(terminate, [_PDict, State]) ->
State#state{blockindex_cache = redacted,
fetch_cache = redacted}.
%%%============================================================================ %%%============================================================================
%%% External Functions %%% External Functions
@ -1962,7 +1970,7 @@ binarysplit_mapfun(MultiSlotBin, StartPos) ->
-spec read_slots(file:io_device(), list(), -spec read_slots(file:io_device(), list(),
{false|list(), non_neg_integer(), binary()}, {false|list(), non_neg_integer(), blockindex_cache()},
press_method(), boolean()) -> press_method(), boolean()) ->
{boolean(), list(binaryslot_element())}. {boolean(), list(binaryslot_element())}.
%% @doc %% @doc
@ -3644,6 +3652,24 @@ delete_pending_tester() ->
timer:sleep(?DELETE_TIMEOUT + 1000), timer:sleep(?DELETE_TIMEOUT + 1000),
?assertMatch(false, is_process_alive(Pid)). ?assertMatch(false, is_process_alive(Pid)).
fetch_status_test() ->
{RP, Filename} = {?TEST_AREA, "fetchstatus_test"},
KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 4, 1, 20),
KVList1 = lists:ukeysort(1, KVList0),
[{FirstKey, _FV}|_Rest] = KVList1,
{LastKey, _LV} = lists:last(KVList1),
{ok, Pid, {FirstKey, LastKey}, _Bloom} =
testsst_new(RP, Filename, 1, KVList1, length(KVList1), native),
{status, Pid, {module, gen_fsm}, SItemL} = sys:get_status(Pid),
S = lists:keyfind(state, 1, lists:nth(5, SItemL)),
true = is_integer(array:size(S#state.fetch_cache)),
true = is_integer(array:size(S#state.blockindex_cache)),
ST = format_status(terminate, [dict:new(), S]),
?assertMatch(redacted, ST#state.blockindex_cache),
?assertMatch(redacted, ST#state.fetch_cache),
ok = sst_close(Pid),
ok = file:delete(filename:join(RP, Filename ++ ".sst")).
simple_persisted_test_() -> simple_persisted_test_() ->
{timeout, 60, fun simple_persisted_test_bothformats/0}. {timeout, 60, fun simple_persisted_test_bothformats/0}.