From f8b3101a3af2c03aadbdfdb98b0fdbecf366bfa9 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 15 Jul 2019 13:44:39 +0100 Subject: [PATCH 01/10] Two memory management helpers Two helpers for memory management: 1 - a scan over the cdb file may lead to a lot of binary references being made. So force a GC fater the scan. 2 - the penciller files contain slots that will be frequently read - so advice the page cache to pre-load them on startup. This is in response to unexpected memory mangement issues in a potentially non-conventional setup - where the erlang VM held a lot of memory (that could be GC'd , in preference to the page cache - and consequently disk I/O and request latency were higher than expected. --- src/leveled_cdb.erl | 3 ++ src/leveled_sst.erl | 1 + test/end_to_end/riak_SUITE.erl | 57 +++++++++++++++++++++++++++++++--- 3 files changed, 57 insertions(+), 4 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 586f85a..ae1412a 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -1301,6 +1301,7 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) -> end, % Bring file back to that position {ok, Position} = file:position(Handle, {bof, Position}), + garbage_collect(), {eof, Output}; {Key, ValueAsBin, KeyLength, ValueLength} -> NewPosition = case Key of @@ -1316,10 +1317,12 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) -> Output, fun extract_valueandsize/1) of {stop, UpdOutput} -> + garbage_collect(), {Position, UpdOutput}; {loop, UpdOutput} -> case NewPosition of eof -> + garbage_collect(), {eof, UpdOutput}; _ -> scan_over_file(Handle, diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 6d98103..c5768cf 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -1277,6 +1277,7 @@ open_reader(Filename) -> <> = Lengths, + ok = file:advise(Handle, 9, SlotsLength, will_need), {ok, SummaryBin} = file:pread(Handle, SlotsLength + 9, SummaryLength), {Handle, FileVersion, SummaryBin}. diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl index 7d0d70f..5e82ce7 100644 --- a/test/end_to_end/riak_SUITE.erl +++ b/test/end_to_end/riak_SUITE.erl @@ -1214,8 +1214,8 @@ dollar_bucket_index(_Config) -> bigobject_memorycheck(_Config) -> RootPath = testutil:reset_filestructure(), {ok, Bookie} = leveled_bookie:book_start(RootPath, - 100, - 100000000, + 200, + 1000000000, testutil:sync_strategy()), Bucket = <<"B">>, IndexGen = fun() -> [] end, @@ -1227,7 +1227,7 @@ bigobject_memorycheck(_Config) -> {Obj, Spc} = testutil:set_object(Bucket, Key, Value, IndexGen, []), testutil:book_riakput(Bookie, Obj, Spc) end, - lists:foreach(ObjPutFun, lists:seq(1, 600)), + lists:foreach(ObjPutFun, lists:seq(1, 700)), {ok, _Ink, Pcl} = leveled_bookie:book_returnactors(Bookie), {binary, BL} = process_info(Pcl, binary), {memory, M0} = process_info(Pcl, memory), @@ -1235,5 +1235,54 @@ bigobject_memorycheck(_Config) -> io:format("Pcl binary memory ~w ~w memory ~w~n", [B0, length(BL), M0]), true = B0 < 500 * 4000, true = M0 < 500 * 4000, + % All processes + {_TotalCDBBinMem, _TotalCDBProcesses} = cdb_memory_check(), ok = leveled_bookie:book_close(Bookie), - testutil:reset_filestructure(). \ No newline at end of file + {ok, BookieR} = leveled_bookie:book_start(RootPath, + 2000, + 1000000000, + testutil:sync_strategy()), + {RS_TotalCDBBinMem, _RS_TotalCDBProcesses} = cdb_memory_check(), + true = RS_TotalCDBBinMem < 1024 * 1024, + % No binary object references exist after startup + ok = leveled_bookie:book_close(BookieR), + testutil:reset_filestructure(). + + +cdb_memory_check() -> + TotalCDBProcesses = + lists:filter(fun(P) -> + {dictionary, PD} = + process_info(P, dictionary), + case lists:keyfind('$initial_call', 1, PD) of + {'$initial_call',{leveled_cdb,init,1}} -> + true; + _ -> + false + end + end, + processes()), + TotalCDBBinMem = + lists:foldl(fun(P, Acc) -> + BinMem = calc_total_binary_memory(P), + io:format("Memory for pid ~w is ~w~n", [P, BinMem]), + BinMem + Acc + end, + 0, + TotalCDBProcesses), + io:format("Total binary memory ~w in ~w CDB processes~n", + [TotalCDBBinMem, length(TotalCDBProcesses)]), + {TotalCDBBinMem, TotalCDBProcesses}. + +calc_total_binary_memory(Pid) -> + {binary, BL} = process_info(Pid, binary), + TBM = lists:foldl(fun({_R, Sz, _C}, Acc) -> Acc + Sz end, 0, BL), + case TBM > 1000000 of + true -> + FilteredBL = + lists:filter(fun(BMD) -> element(2, BMD) > 1024 end, BL), + io:format("Big-ref details for ~w ~w~n", [Pid, FilteredBL]); + false -> + ok + end, + TBM. \ No newline at end of file From 478c5b6db0b1e61bf0b5d47f14e5b0da414d090f Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 16 Jul 2019 10:25:49 +0100 Subject: [PATCH 02/10] Load ledger in reverse order Now that the SST files will fadvise on load (to force load into the page cache). The load should take place in reverse order, so that if th eledger is > page_cache, it is the higher levels that will end up in the cache at the expense of the lower levels. --- src/leveled_pmanifest.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/leveled_pmanifest.erl b/src/leveled_pmanifest.erl index 84c02e3..a1cce11 100644 --- a/src/leveled_pmanifest.erl +++ b/src/leveled_pmanifest.erl @@ -152,6 +152,11 @@ copy_manifest(Manifest) -> %% manifest. The PidFun should be able to return the Pid of a file process %% (having started one). The SQNFun will return the max sequence number %% of that file, if passed the Pid that owns it. +%% +%% The manifest is started from the basement first, and then the higher levels +%% as the page cache will be loaded with each file, and it would be +%% preferable to have the higher levels in the cache if memory is insufficient +%% to load each level load_manifest(Manifest, LoadFun, SQNFun) -> UpdateLevelFun = fun(LevelIdx, {AccMaxSQN, AccMan, AccFL}) -> @@ -171,7 +176,7 @@ load_manifest(Manifest, LoadFun, SQNFun) -> end, lists:foldl(UpdateLevelFun, {0, Manifest, []}, - lists:seq(0, Manifest#manifest.basement)). + lists:reverse(lists:seq(0, Manifest#manifest.basement))). -spec close_manifest(manifest(), fun()) -> ok. %% @doc From 3c834afa082f74ae7eb1ba4bfd2d24baaa9e0d7f Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 18 Jul 2019 13:07:48 +0100 Subject: [PATCH 03/10] Use hibernate on open or roll to read CDB files may be opened or rolled then left untouched for a period, so clean up any memory. Been awoken from hibernate has a cost, but it is a rare event. --- src/leveled_cdb.erl | 86 +++++++++++++++++++++------------------------ 1 file changed, 41 insertions(+), 45 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index ae1412a..8360a7f 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -436,28 +436,31 @@ starting({open_writer, Filename}, _From, State) -> {WriteOps, UpdStrategy} = set_writeops(State#state.sync_strategy), leveled_log:log("CDB13", [WriteOps]), {ok, Handle} = file:open(Filename, WriteOps), - {reply, ok, writer, State#state{handle=Handle, - sync_strategy = UpdStrategy, - last_position=LastPosition, - last_key=LastKey, - filename=Filename, - hashtree=HashTree}}; + State0 = State#state{handle=Handle, + sync_strategy = UpdStrategy, + last_position=LastPosition, + last_key=LastKey, + filename=Filename, + hashtree=HashTree} + {reply, ok, writer, State0, hibernate}; starting({open_reader, Filename}, _From, State) -> leveled_log:save(State#state.log_options), leveled_log:log("CDB02", [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, false), - {reply, ok, reader, State#state{handle=Handle, - last_key=LastKey, - filename=Filename, - hash_index=Index}}; + State0 = State#state{handle=Handle, + last_key=LastKey, + filename=Filename, + hash_index=Index} + {reply, ok, reader, State0, hibernate}; starting({open_reader, Filename, LastKey}, _From, State) -> leveled_log:save(State#state.log_options), leveled_log:log("CDB02", [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, LastKey), - {reply, ok, reader, State#state{handle=Handle, - last_key=LastKey, - filename=Filename, - hash_index=Index}}. + State0 = State#state{handle=Handle, + last_key=LastKey, + filename=Filename, + hash_index=Index} + {reply, ok, reader, State0, hibernate}. writer({get_kv, Key}, _From, State) -> {reply, @@ -566,18 +569,16 @@ rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) -> ets:delete(State#state.hashtree), {NewHandle, Index, LastKey} = open_for_readonly(NewName, State#state.last_key), + State0 = State#state{handle=NewHandle, + last_key=LastKey, + filename=NewName, + hash_index=Index}, case State#state.deferred_delete of true -> - {reply, ok, delete_pending, State#state{handle=NewHandle, - last_key=LastKey, - filename=NewName, - hash_index=Index}}; + {reply, ok, delete_pending, State0}; false -> leveled_log:log_timer("CDB18", [], SW), - {reply, ok, reader, State#state{handle=NewHandle, - last_key=LastKey, - filename=NewName, - hash_index=Index}} + {reply, ok, reader, State0, hibernate} end; rolling(check_hashtable, _From, State) -> {reply, false, rolling, State}. @@ -720,24 +721,21 @@ delete_pending(destroy, State) -> handle_sync_event({cdb_scan, FilterFun, Acc, StartPos}, - _From, + From, StateName, State) -> {ok, EndPos0} = file:position(State#state.handle, eof), - {ok, StartPos0} = case StartPos of - undefined -> - file:position(State#state.handle, - ?BASE_POSITION); - StartPos -> - {ok, StartPos} - end, + {ok, StartPos0} = + case StartPos of + undefined -> + file:position(State#state.handle, ?BASE_POSITION); + StartPos -> + {ok, StartPos} + end, file:position(State#state.handle, StartPos0), - file:advise(State#state.handle, - StartPos0, - EndPos0 - StartPos0, - sequential), - MaybeEnd = (check_last_key(State#state.last_key) == empty) or - (StartPos0 >= (EndPos0 - ?DWORD_SIZE)), + MaybeEnd = + (check_last_key(State#state.last_key) == empty) or + (StartPos0 >= (EndPos0 - ?DWORD_SIZE)), {LastPosition, Acc2} = case MaybeEnd of true -> @@ -749,12 +747,13 @@ handle_sync_event({cdb_scan, FilterFun, Acc, StartPos}, Acc, State#state.last_key) end, - {ok, LastReadPos} = file:position(State#state.handle, cur), - file:advise(State#state.handle, - StartPos0, - LastReadPos - StartPos0, - dont_need), - {reply, {LastPosition, Acc2}, StateName, State}; + % The scan may have created a lot of binary references, clear up the + % reference counters for this process here manually. The cdb process + % may be inactive for a period after the scan, and so GC may not kick in + % otherwise + gen_fsm:reply(From, {LastPosition, Acc2}), + garbage_collect(), + {next_state, StateName, State}; handle_sync_event(cdb_lastkey, _From, StateName, State) -> {reply, State#state.last_key, StateName, State}; handle_sync_event(cdb_firstkey, _From, StateName, State) -> @@ -1301,7 +1300,6 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) -> end, % Bring file back to that position {ok, Position} = file:position(Handle, {bof, Position}), - garbage_collect(), {eof, Output}; {Key, ValueAsBin, KeyLength, ValueLength} -> NewPosition = case Key of @@ -1317,12 +1315,10 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) -> Output, fun extract_valueandsize/1) of {stop, UpdOutput} -> - garbage_collect(), {Position, UpdOutput}; {loop, UpdOutput} -> case NewPosition of eof -> - garbage_collect(), {eof, UpdOutput}; _ -> scan_over_file(Handle, From 5a853ee44d2aabffafa5ec9a62c6a06be2a97f3b Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 18 Jul 2019 13:10:11 +0100 Subject: [PATCH 04/10] Hibernate iclerk on completion of compaction Will be inactive for a period. Will also force garbage collection. --- src/leveled_iclerk.erl | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 076d73b..bf0cca6 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -361,13 +361,12 @@ handle_cast(scoring_complete, State) -> ok = CloseFun(FilterServer), ok = leveled_inker:ink_clerkcomplete(State#state.inker, ManifestSlice, - FilesToDelete), - {noreply, State#state{scoring_state = undefined}}; + FilesToDelete); false -> ok = CloseFun(FilterServer), - ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []), - {noreply, State#state{scoring_state = undefined}} - end; + ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []) + end, + {noreply, State#state{scoring_state = undefined}, hibernate}; handle_cast({trim, PersistedSQN, ManifestAsList}, State) -> FilesToDelete = leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList), From 85bfa7fbb41e821fd172202b4a8b2f884ec54f8e Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 18 Jul 2019 13:21:38 +0100 Subject: [PATCH 05/10] Use hibernate not garbage_collect Use hibernation rather than manual garbage_collect calls as per standard recommendation. Hibernate will be default gabage_collect anyway. Maybe help with SST files that naturally go quiet. Plus typos from previous commit in leveled_cdb. --- src/leveled_cdb.erl | 9 +++++---- src/leveled_sst.erl | 6 ++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 8360a7f..a24feb4 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -58,7 +58,8 @@ {gen_fsm, sync_send_event, 2}, {gen_fsm, send_event, 2}, {gen_fsm, sync_send_all_state_event, 3}, - {gen_fsm, send_all_state_event, 2}]}). + {gen_fsm, send_all_state_event, 2}, + {gen_fsm, reply, 2}]}). -endif. -ifdef(slow_test). @@ -441,7 +442,7 @@ starting({open_writer, Filename}, _From, State) -> last_position=LastPosition, last_key=LastKey, filename=Filename, - hashtree=HashTree} + hashtree=HashTree}, {reply, ok, writer, State0, hibernate}; starting({open_reader, Filename}, _From, State) -> leveled_log:save(State#state.log_options), @@ -450,7 +451,7 @@ starting({open_reader, Filename}, _From, State) -> State0 = State#state{handle=Handle, last_key=LastKey, filename=Filename, - hash_index=Index} + hash_index=Index}, {reply, ok, reader, State0, hibernate}; starting({open_reader, Filename, LastKey}, _From, State) -> leveled_log:save(State#state.log_options), @@ -459,7 +460,7 @@ starting({open_reader, Filename, LastKey}, _From, State) -> State0 = State#state{handle=Handle, last_key=LastKey, filename=Filename, - hash_index=Index} + hash_index=Index}, {reply, ok, reader, State0, hibernate}. writer({get_kv, Key}, _From, State) -> diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index c5768cf..512c289 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -717,8 +717,7 @@ reader(close, _From, State) -> {stop, normal, ok, State}. reader({switch_levels, NewLevel}, State) -> - erlang:garbage_collect(self()), - {next_state, reader, State#state{level = NewLevel}}. + {next_state, reader, State#state{level = NewLevel}, hibernate}. delete_pending({get_kv, LedgerKey, Hash}, _From, State) -> @@ -792,8 +791,7 @@ handle_info(tidyup_after_startup, delete_pending, State) -> handle_info(tidyup_after_startup, StateName, State) -> case is_process_alive(State#state.starting_pid) of true -> - erlang:garbage_collect(self()), - {next_state, StateName, State}; + {next_state, StateName, State, hibernate}; false -> {stop, normal, State} end. From 7862a6c523f6e302965d49d0a72e398c65df31d2 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 18 Jul 2019 14:00:19 +0100 Subject: [PATCH 06/10] Change page cache loading by lookup/no_lookup By default load the first 4 levels of the ledger into the page cache of lookup is to be supported, but just levels 0 and 1 otherwise. --- include/leveled.hrl | 3 ++- src/leveled_bookie.erl | 19 ++++++++++++++----- src/leveled_sst.erl | 24 +++++++++++++++++------- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index 9357bd5..2074e46 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -51,7 +51,8 @@ :: leveled_sst:press_method(), log_options = leveled_log:get_opts() :: leveled_log:log_options(), - max_sstslots = 256 :: pos_integer()}). + max_sstslots = 256 :: pos_integer(), + pagecache_level = 1 :: pos_integer()}). -record(inker_options, {cdb_max_size :: integer() | undefined, diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 35be019..c16f3e0 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -127,6 +127,8 @@ -define(OPEN_LASTMOD_RANGE, {0, infinity}). -define(SNAPTIMEOUT_SHORT, 900). % 15 minutes -define(SNAPTIMEOUT_LONG, 43200). % 12 hours +-define(SST_PAGECACHELEVEL_NOLOOKUP, 1). +-define(SST_PAGECACHELEVEL_LOOKUP, 4). -define(OPTION_DEFAULTS, [{root_path, undefined}, {snapshot_bookie, undefined}, @@ -1177,15 +1179,22 @@ init([Opts]) -> ok end, - {HeadOnly, HeadLookup} = + {HeadOnly, HeadLookup, SSTPageCacheLevel} = case proplists:get_value(head_only, Opts) of false -> - {false, true}; + {false, true, ?SST_PAGECACHELEVEL_LOOKUP}; with_lookup -> - {true, true}; + {true, true, ?SST_PAGECACHELEVEL_LOOKUP}; no_lookup -> - {true, false} + {true, false, ?SST_PAGECACHELEVEL_NOLOOKUP} end, + % Override the default page cache level - we want to load into the + % page cache many levels if we intend to support lookups, and only + % levels 0 and 1 otherwise + SSTOpts = PencillerOpts#penciller_options.sst_options, + SSTOpts0 = SSTOpts#sst_options{pagecache_level = SSTPageCacheLevel}, + PencillerOpts0 = + PencillerOpts#penciller_options{sst_options = SSTOpts0}, State0 = #state{cache_size=CacheSize, is_snapshot=false, @@ -1193,7 +1202,7 @@ init([Opts]) -> head_lookup = HeadLookup}, {Inker, Penciller} = - startup(InkerOpts, PencillerOpts, State0), + startup(InkerOpts, PencillerOpts0, State0), NewETS = ets:new(mem, [ordered_set]), leveled_log:log("B0001", [Inker, Penciller]), diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 512c289..e1a9d98 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -486,7 +486,9 @@ init([]) -> starting({sst_open, RootPath, Filename, OptsSST, Level}, _From, State) -> leveled_log:save(OptsSST#sst_options.log_options), {UpdState, Bloom} = - read_file(Filename, State#state{root_path=RootPath}), + read_file(Filename, + State#state{root_path=RootPath}, + OptsSST#sst_options.pagecache_level >= Level), Summary = UpdState#state.summary, {reply, {ok, {Summary#summary.first_key, Summary#summary.last_key}, Bloom}, @@ -509,7 +511,8 @@ starting({sst_new, YBQ = Level =< 2, {UpdState, Bloom} = read_file(ActualFilename, - State#state{root_path=RootPath, yield_blockquery=YBQ}), + State#state{root_path=RootPath, yield_blockquery=YBQ}, + OptsSST#sst_options.pagecache_level >= Level), Summary = UpdState#state.summary, leveled_log:log_timer("SST08", [ActualFilename, Level, Summary#summary.max_sqn], @@ -573,7 +576,8 @@ starting(complete_l0startup, State) -> % Important to empty this from state rather % than carry it through to the next stage new_slots=undefined, - deferred_startup_tuple=undefined}), + deferred_startup_tuple=undefined}, + true), Summary = UpdState#state.summary, Time4 = timer:now_diff(os:timestamp(), SW4), @@ -1215,9 +1219,10 @@ write_file(RootPath, Filename, SummaryBin, SlotsBin, filename:join(RootPath, FinalName)), FinalName. -read_file(Filename, State) -> +read_file(Filename, State, LoadPageCache) -> {Handle, FileVersion, SummaryBin} = - open_reader(filename:join(State#state.root_path, Filename)), + open_reader(filename:join(State#state.root_path, Filename), + LoadPageCache), UpdState0 = imp_fileversion(FileVersion, State), {Summary, Bloom, SlotList} = read_table_summary(SummaryBin), BlockIndexCache = array:new([{size, Summary#summary.size}, @@ -1269,13 +1274,18 @@ imp_fileversion(VersionInt, State) -> end, UpdState1. -open_reader(Filename) -> +open_reader(Filename, LoadPageCache) -> {ok, Handle} = file:open(Filename, [binary, raw, read]), {ok, Lengths} = file:pread(Handle, 0, 9), <> = Lengths, - ok = file:advise(Handle, 9, SlotsLength, will_need), + case LoadPageCache of + true -> + file:advise(Handle, 9, SlotsLength, will_need); + false -> + ok + end, {ok, SummaryBin} = file:pread(Handle, SlotsLength + 9, SummaryLength), {Handle, FileVersion, SummaryBin}. From da1ecc144a91d0e56733a4ad639648e178202aa3 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 19 Jul 2019 13:30:53 +0100 Subject: [PATCH 07/10] Tidy-up GC on compaction Make sure we hibernate any CDB files after we score them, as they may not be used for sometime, and there may be garbage binary references present. --- src/leveled_cdb.erl | 78 ++++++++++++++++++++++++++---------------- src/leveled_iclerk.erl | 10 ++++-- 2 files changed, 56 insertions(+), 32 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index a24feb4..5c8caba 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -114,7 +114,8 @@ cdb_destroy/1, cdb_deletepending/1, cdb_deletepending/3, - cdb_isrolling/1]). + cdb_isrolling/1, + cdb_clerkcomplete/1]). -export([finished_rolling/1, hashtable_calc/2]). @@ -410,6 +411,14 @@ cdb_keycheck(Pid, Key) -> cdb_isrolling(Pid) -> gen_fsm:sync_send_all_state_event(Pid, cdb_isrolling, infinity). +-spec cdb_clerkcomplete(pid()) -> ok. +%% @doc +%% When an Inker's clerk has finished with a CDB process, then it will call +%% complete. Currently this will prompt hibernation, as the CDB process may +%% not be needed for a period. +cdb_clerkcomplete(Pid) -> + gen_fsm:send_all_state_event(Pid, clerk_complete). + %%%============================================================================ %%% gen_server callbacks @@ -620,33 +629,40 @@ reader({get_positions, SampleSize, Index, Acc}, _From, State) -> _ -> {reply, lists:sublist(UpdAcc, SampleSize), reader, State} end; -reader({direct_fetch, PositionList, Info}, _From, State) -> +reader({direct_fetch, PositionList, Info}, From, State) -> H = State#state.handle, - FilterFalseKey = fun(Tpl) -> case element(1, Tpl) of - false -> - false; - _Key -> - {true, Tpl} - end end, - Reply = - case Info of - key_only -> - FM = lists:filtermap( - fun(P) -> - FilterFalseKey(extract_key(H, P)) end, - PositionList), - lists:map(fun(T) -> element(1, T) end, FM); - key_size -> - lists:filtermap( - fun(P) -> - FilterFalseKey(extract_key_size(H, P)) end, - PositionList); - key_value_check -> - BM = State#state.binary_mode, - lists:map(fun(P) -> extract_key_value_check(H, P, BM) end, - PositionList) + FilterFalseKey = + fun(Tpl) -> + case element(1, Tpl) of + false -> + false; + _Key -> + {true, Tpl} + end end, - {reply, Reply, reader, State}; + + case Info of + key_only -> + FM = lists:filtermap( + fun(P) -> + FilterFalseKey(extract_key(H, P)) end, + PositionList), + MapFun = fun(T) -> element(1, T) end, + {reply, lists:map(MapFun, FM), reader, State}; + key_size -> + FilterFun = fun(P) -> FilterFalseKey(extract_key_size(H, P)) end, + {reply, lists:filtermap(FilterFun, PositionList), reader, State}; + key_value_check -> + BM = State#state.binary_mode, + MapFun = fun(P) -> extract_key_value_check(H, P, BM) end, + % direct_fetch will occur in batches, so it doesn't make sense to + % hibernate the process that is likely to be used again. However, + % a significant amount of unused binary references may have + % accumulated, so push a GC at this point + gen_fsm:reply(From, lists:map(MapFun, PositionList)), + garbage_collect(), + {next_state, reader, State} + end; reader(cdb_complete, _From, State) -> leveled_log:log("CDB05", [State#state.filename, reader, cdb_ccomplete]), ok = file:close(State#state.handle), @@ -752,6 +768,10 @@ handle_sync_event({cdb_scan, FilterFun, Acc, StartPos}, % reference counters for this process here manually. The cdb process % may be inactive for a period after the scan, and so GC may not kick in % otherwise + % + % garbage_collect/0 is used in preference to hibernate, as we're generally + % scanning in batches at startup - so the process will be needed straight + % away. gen_fsm:reply(From, {LastPosition, Acc2}), garbage_collect(), {next_state, StateName, State}; @@ -790,8 +810,8 @@ handle_sync_event(cdb_close, _From, StateName, State) -> file:close(State#state.handle), {stop, normal, ok, State}. -handle_event(_Msg, StateName, State) -> - {next_state, StateName, State}. +handle_event(clerk_complete, StateName, State) -> + {next_state, StateName, State, hibernate}. handle_info(_Msg, StateName, State) -> {next_state, StateName, State}. @@ -2668,8 +2688,6 @@ getpositions_sample_test() -> nonsense_coverage_test() -> - {ok, Pid} = gen_fsm:start_link(?MODULE, [#cdb_options{}], []), - ok = gen_fsm:send_all_state_event(Pid, nonsense), ?assertMatch({next_state, reader, #state{}}, handle_info(nonsense, reader, #state{})), diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index bf0cca6..5ac8f3c 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -547,7 +547,8 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) -> end. -fetch_inbatches([], _BatchSize, _CDB, CheckedList) -> +fetch_inbatches([], _BatchSize, CDB, CheckedList) -> + ok = leveled_cdb:cdb_clerkcomplete(CDB), CheckedList; fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) -> {Batch, Tail} = if @@ -698,6 +699,11 @@ compact_files([Batch|T], CDBopts, ActiveJournal0, ActiveJournal0, ManSlice0, PressMethod), + % The inker's clerk will no longer need these (potentially large) binaries, + % so force garbage collection at this point. This will mean when we roll + % each CDB file there will be no remaining references to the binaries that + % have been transferred and the memory can immediately be cleared + garbage_collect(), compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, MaxSQN, RStrategy, PressMethod, ManSlice1). @@ -762,7 +768,7 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> % strategy [KVC0|Acc]; {false, retain} -> - % If we have a retain startegy, it can't be + % If we have a retain strategy, it can't be % discarded - but the value part is no longer % required as this version has been replaced {JK0, JV0} = From c9c577259e541fe1f6f4ee31bd19f12b07846325 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 19 Jul 2019 13:37:27 +0100 Subject: [PATCH 08/10] Need to binary copy the header Otherwise the whole binary is kept in memory ... and the SST memory footprint is much bigger. --- src/leveled_sst.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index e1a9d98..acabb08 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -675,7 +675,7 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod}, {_ID, none} -> Cache; {ID, Header} -> - array:set(ID - 1, Header, Cache) + array:set(ID - 1, binary:copy(Header), Cache) end end, BlockIdxC0 = lists:foldl(FoldFun, State#state.blockindex_cache, BIC), @@ -1064,7 +1064,9 @@ fetch(LedgerKey, Hash, State, Timings0) -> {Result, Header} = binaryslot_get(SlotBin, LedgerKey, Hash, PressMethod, IdxModDate), BlockIndexCache = - array:set(SlotID - 1, Header, State#state.blockindex_cache), + array:set(SlotID - 1, + binary:copy(Header), + State#state.blockindex_cache), {_SW3, Timings3} = update_timings(SW2, Timings2, noncached_block, false), {Result, From 5bef21d9719a561beeaf735e168bd140d9771e38 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 22 Jul 2019 10:35:55 +0100 Subject: [PATCH 09/10] Add unit test to prove binary/copy issue Need to understand why the binary:copy is necessary - unit test now shows this. --- src/leveled_sst.erl | 54 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index acabb08..35b8690 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -2128,7 +2128,7 @@ crc_check_slot(FullBin) -> CRC32H:32/integer, Rest/binary>> = FullBin, PosBL0 = min(PosBL, byte_size(FullBin) - 12), - % If the position has been bit-flipped to beyond the maximum paossible + % If the position has been bit-flipped to beyond the maximum possible % length, use the maximum possible length <> = Rest, case {hmac(Header), hmac(PosBL0)} of @@ -2630,16 +2630,20 @@ generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) -> generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) -> BRand = leveled_rand:uniform(BRange), BNumber = - lists:flatten(io_lib:format("K~4..0B", [BucketLow + BRand])), + lists:flatten(io_lib:format("B~6..0B", [BucketLow + BRand])), KNumber = - lists:flatten(io_lib:format("K~6..0B", [leveled_rand:uniform(1000)])), + lists:flatten(io_lib:format("K~8..0B", [leveled_rand:uniform(1000000)])), LK = leveled_codec:to_ledgerkey("Bucket" ++ BNumber, "Key" ++ KNumber, o), Chunk = leveled_rand:rand_bytes(64), {_B, _K, MV, _H, _LMs} = leveled_codec:generate_ledgerkv(LK, Seqn, Chunk, 64, infinity), + MD = element(4, MV), + ?assertMatch(undefined, element(3, MD)), + MD0 = [{magic_md, [<<0:32/integer>>, base64:encode(Chunk)]}], + MV0 = setelement(4, MV, setelement(3, MD, MD0)), generate_randomkeys(Seqn + 1, Count - 1, - [{LK, MV}|Acc], + [{LK, MV0}|Acc], BucketLow, BRange). @@ -2661,6 +2665,7 @@ generate_indexkey(Term, Count) -> Count, infinity). + form_slot_test() -> % If a skip key happens, mustn't switch to loookup by accident as could be % over the expected size @@ -3243,13 +3248,17 @@ simple_persisted_test_bothformats() -> simple_persisted_tester(fun testsst_new/6). simple_persisted_tester(SSTNewFun) -> + Level = 3, {RP, Filename} = {?TEST_AREA, "simple_test"}, KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 32, 1, 20), KVList1 = lists:ukeysort(1, KVList0), [{FirstKey, _FV}|_Rest] = KVList1, {LastKey, _LV} = lists:last(KVList1), {ok, Pid, {FirstKey, LastKey}, _Bloom} = - SSTNewFun(RP, Filename, 1, KVList1, length(KVList1), native), + SSTNewFun(RP, Filename, Level, KVList1, length(KVList1), native), + + B0 = check_binary_references(Pid), + SW0 = os:timestamp(), lists:foreach(fun({K, V}) -> ?assertMatch({K, V}, sst_get(Pid, K)) @@ -3346,9 +3355,44 @@ simple_persisted_tester(SSTNewFun) -> FetchedListB4 = lists:foldl(FoldFun, [], FetchListB4), ?assertMatch([{Eight000Key, _v800}], FetchedListB4), + B1 = check_binary_references(Pid), + ok = sst_close(Pid), + + io:format(user, "Reopen SST file~n", []), + OptsSST = #sst_options{press_method=native, + log_options=leveled_log:get_opts()}, + {ok, OpenP, {FirstKey, LastKey}, _Bloom} = + sst_open(RP, Filename ++ ".sst", OptsSST, Level), + + B2 = check_binary_references(OpenP), + + lists:foreach(fun({K, V}) -> + ?assertMatch({K, V}, sst_get(OpenP, K)), + ?assertMatch({K, V}, sst_get(OpenP, K)) + end, + KVList1), + + garbage_collect(OpenP), + B3 = check_binary_references(OpenP), + ?assertMatch(0, B2), % Opens with an empty cache + ?assertMatch(true, B3 > B2), % Now has headers in cache + ?assertMatch(false, B3 > B0 * 2), + % Not significantly bigger than when created new + ?assertMatch(false, B3 > B1 * 2), + % Not significantly bigger than when created new + + ok = sst_close(OpenP), ok = file:delete(filename:join(RP, Filename ++ ".sst")). +check_binary_references(Pid) -> + garbage_collect(Pid), + {binary, BinList} = process_info(Pid, binary), + TotalBinMem = + lists:foldl(fun({_R, BM, _RC}, Acc) -> Acc + BM end, 0, BinList), + io:format(user, "Total binary memory ~w~n", [TotalBinMem]), + TotalBinMem. + key_dominates_test() -> KV1 = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, 0, []}}, KV2 = {{o, "Bucket", "Key3", null}, {6, {active, infinity}, 0, []}}, From 90909e7c171f583e400255e2540faad768142208 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 22 Jul 2019 12:16:03 +0100 Subject: [PATCH 10/10] Delete current_counterexample.eqc --- current_counterexample.eqc | Bin 1680 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 current_counterexample.eqc diff --git a/current_counterexample.eqc b/current_counterexample.eqc deleted file mode 100644 index aafab95ece01758cee7f887e999924a2b1ca84bd..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1680 zcmZq9VPIfj%wS4kU@lHA1aUY(9M;_Yl+>IQ2Hu?1vecZ^l&s|V)WYNpX0Wmnu=29R zqD00FmJ|k-sANez%BKS(kwzqBYXF$W}?#K7RE0Fn?&&d)68a#0viu@bxnjgJR?h)Lkx7GS^o!5zjpG>!3 z@0>fMG2g~+*2A|GEVtfm-mbG+^39}{+kVT>e{`GiW1h^Bqy;YbY@L-3oUaLazu9T0 z*RrPF<}Px|mcO}HJQlMyJ$^QwPyBI?Lwi$jLmSKfdyeXbEGavuKfdc!##&aX?vy;I zyK;5>0fE(yhxZ?AJ8-Hq*Q%nucU#Um#ZD_l@AJo(Jz9BuVsY$S_Hg-%y|c5^kM}oQ zpG}w))6CQ=cxmT{YoI{zH)R9{jBYbHxqu>yH77MQClQ#=fNZv;)S{Bi)I=sQCp|U4 zC_OciISZ%(sELu0aij+zFd+DmlNDns128Q^vRoEsa%RSpoLRDg*$EU0kdgtMkTFeQ z!DGU(%l)i)Jj+sCl3#!tTr8Nu#fHa(VHaHNcs$DvDt!}^OF$t8&dlJFk}Z)TD=P~C D{RLI*