From 43ec9a4eab02123e147318232654ed58e2ceea64 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 27 Oct 2021 13:42:53 +0100 Subject: [PATCH] Mas i363 d31 (#364) * Reduce size of state on terminate (#363) Otherwise large volume of keys will be written on failure of the process * Add format_status to leveled_sst --- src/leveled_penciller.erl | 39 +++++++++++++++++++++++++++++++++------ src/leveled_sst.erl | 36 +++++++++++++++++++++++++++++++----- 2 files changed, 64 insertions(+), 11 deletions(-) diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index a756ad3..96187f5 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -167,7 +167,8 @@ handle_cast/2, handle_info/2, terminate/2, - code_change/3]). + code_change/3, + format_status/2]). -export([ pcl_snapstart/1, @@ -240,7 +241,8 @@ -define(TIMING_SAMPLESIZE, 100). -define(OPEN_LASTMOD_RANGE, {0, infinity}). --record(state, {manifest, % a manifest record from the leveled_manifest module +-record(state, {manifest :: + leveled_pmanifest:manifest() | undefined | redacted, persisted_sqn = 0 :: integer(), % The highest SQN persisted ledger_sqn = 0 :: integer(), % The highest SQN added to L0 @@ -250,18 +252,18 @@ levelzero_pending = false :: boolean(), levelzero_constructor :: pid() | undefined, - levelzero_cache = [] :: levelzero_cache(), + levelzero_cache = [] :: levelzero_cache() | redacted, levelzero_size = 0 :: integer(), levelzero_maxcachesize :: integer() | undefined, levelzero_cointoss = false :: boolean(), - levelzero_index, % An array + levelzero_index :: array:array() | undefined | redacted, is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), snapshot_time :: pos_integer() | undefined, source_penciller :: pid() | undefined, bookie_monref :: reference() | undefined, - levelzero_astree :: list() | undefined, + levelzero_astree :: list() | undefined | redacted, work_ongoing = false :: boolean(), % i.e. compaction work work_backlog = false :: boolean(), % i.e. compaction work @@ -336,7 +338,7 @@ %% When starting a clone a query can also be passed. This prevents the whole %% Level Zero memory space from being copied to the snapshot, instead the %% query is run against the level zero space and just the query results are -%5 copied into the clone. +%% copied into the clone. pcl_start(PCLopts) -> gen_server:start_link(?MODULE, [leveled_log:get_opts(), PCLopts], []). @@ -1111,6 +1113,14 @@ terminate(Reason, _State=#state{is_snapshot=Snap}) when Snap == true -> terminate(Reason, _State) -> leveled_log:log("P0011", [Reason]). +format_status(normal, [_PDict, State]) -> + State; +format_status(terminate, [_PDict, State]) -> + State#state{manifest = redacted, + levelzero_cache = redacted, + levelzero_index = redacted, + levelzero_astree = redacted}. + code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -2060,6 +2070,23 @@ shutdown_when_compact(Pid) -> io:format("No outstanding compaction work for ~w~n", [Pid]), pcl_close(Pid). +format_status_test() -> + RootPath = "test/test_area/ledger", + clean_testdir(RootPath), + {ok, PCL} = + pcl_start(#penciller_options{root_path=RootPath, + max_inmemory_tablesize=1000, + sst_options=#sst_options{}}), + {status, PCL, {module, gen_server}, SItemL} = sys:get_status(PCL), + S = lists:keyfind(state, 1, lists:nth(5, SItemL)), + true = is_integer(array:size(element(2, S#state.manifest))), + ST = format_status(terminate, [dict:new(), S]), + ?assertMatch(redacted, ST#state.manifest), + ?assertMatch(redacted, ST#state.levelzero_cache), + ?assertMatch(redacted, ST#state.levelzero_index), + ?assertMatch(redacted, ST#state.levelzero_astree), + clean_testdir(RootPath). + simple_server_test() -> RootPath = "test/test_area/ledger", clean_testdir(RootPath), diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 2545559..abb41d7 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -105,6 +105,7 @@ handle_info/3, terminate/3, code_change/4, + format_status/2, starting/2, starting/3, reader/2, @@ -182,8 +183,7 @@ :: {binary(), binary(), list(integer()), leveled_codec:ledger_key()}. -type sst_summary() :: #summary{}. --type blockindex_cache() - :: any(). % An array but OTP 16 types +-type blockindex_cache() :: array:array(). %% yield_blockquery is used to determine if the work necessary to process a %% range query beyond the fetching the slot should be managed from within @@ -199,13 +199,15 @@ root_path, filename, yield_blockquery = false :: boolean(), - blockindex_cache :: blockindex_cache()|undefined, + blockindex_cache :: + blockindex_cache() | undefined |redacted, compression_method = native :: press_method(), index_moddate = ?INDEX_MODDATE :: boolean(), timings = no_timing :: sst_timings(), timings_countdown = 0 :: integer(), starting_pid :: pid()|undefined, - fetch_cache = array:new([{size, ?CACHE_SIZE}]), + fetch_cache = array:new([{size, ?CACHE_SIZE}]) :: + array:array() | redacted, new_slots :: list()|undefined, deferred_startup_tuple :: tuple()|undefined, level :: non_neg_integer()|undefined, @@ -886,6 +888,12 @@ terminate(Reason, _StateName, State) -> code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}. +format_status(normal, [_PDict, State]) -> + State; +format_status(terminate, [_PDict, State]) -> + State#state{blockindex_cache = redacted, + fetch_cache = redacted}. + %%%============================================================================ %%% External Functions @@ -1962,7 +1970,7 @@ binarysplit_mapfun(MultiSlotBin, StartPos) -> -spec read_slots(file:io_device(), list(), - {false|list(), non_neg_integer(), binary()}, + {false|list(), non_neg_integer(), blockindex_cache()}, press_method(), boolean()) -> {boolean(), list(binaryslot_element())}. %% @doc @@ -3644,6 +3652,24 @@ delete_pending_tester() -> timer:sleep(?DELETE_TIMEOUT + 1000), ?assertMatch(false, is_process_alive(Pid)). +fetch_status_test() -> + {RP, Filename} = {?TEST_AREA, "fetchstatus_test"}, + KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 4, 1, 20), + KVList1 = lists:ukeysort(1, KVList0), + [{FirstKey, _FV}|_Rest] = KVList1, + {LastKey, _LV} = lists:last(KVList1), + {ok, Pid, {FirstKey, LastKey}, _Bloom} = + testsst_new(RP, Filename, 1, KVList1, length(KVList1), native), + {status, Pid, {module, gen_fsm}, SItemL} = sys:get_status(Pid), + S = lists:keyfind(state, 1, lists:nth(5, SItemL)), + true = is_integer(array:size(S#state.fetch_cache)), + true = is_integer(array:size(S#state.blockindex_cache)), + ST = format_status(terminate, [dict:new(), S]), + ?assertMatch(redacted, ST#state.blockindex_cache), + ?assertMatch(redacted, ST#state.fetch_cache), + ok = sst_close(Pid), + ok = file:delete(filename:join(RP, Filename ++ ".sst")). + simple_persisted_test_() -> {timeout, 60, fun simple_persisted_test_bothformats/0}.