diff --git a/current_counterexample.eqc b/current_counterexample.eqc deleted file mode 100644 index aafab95..0000000 Binary files a/current_counterexample.eqc and /dev/null differ diff --git a/include/leveled.hrl b/include/leveled.hrl index 9357bd5..2074e46 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -51,7 +51,8 @@ :: leveled_sst:press_method(), log_options = leveled_log:get_opts() :: leveled_log:log_options(), - max_sstslots = 256 :: pos_integer()}). + max_sstslots = 256 :: pos_integer(), + pagecache_level = 1 :: pos_integer()}). -record(inker_options, {cdb_max_size :: integer() | undefined, diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 35be019..c16f3e0 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -127,6 +127,8 @@ -define(OPEN_LASTMOD_RANGE, {0, infinity}). -define(SNAPTIMEOUT_SHORT, 900). % 15 minutes -define(SNAPTIMEOUT_LONG, 43200). % 12 hours +-define(SST_PAGECACHELEVEL_NOLOOKUP, 1). +-define(SST_PAGECACHELEVEL_LOOKUP, 4). -define(OPTION_DEFAULTS, [{root_path, undefined}, {snapshot_bookie, undefined}, @@ -1177,15 +1179,22 @@ init([Opts]) -> ok end, - {HeadOnly, HeadLookup} = + {HeadOnly, HeadLookup, SSTPageCacheLevel} = case proplists:get_value(head_only, Opts) of false -> - {false, true}; + {false, true, ?SST_PAGECACHELEVEL_LOOKUP}; with_lookup -> - {true, true}; + {true, true, ?SST_PAGECACHELEVEL_LOOKUP}; no_lookup -> - {true, false} + {true, false, ?SST_PAGECACHELEVEL_NOLOOKUP} end, + % Override the default page cache level - we want to load into the + % page cache many levels if we intend to support lookups, and only + % levels 0 and 1 otherwise + SSTOpts = PencillerOpts#penciller_options.sst_options, + SSTOpts0 = SSTOpts#sst_options{pagecache_level = SSTPageCacheLevel}, + PencillerOpts0 = + PencillerOpts#penciller_options{sst_options = SSTOpts0}, State0 = #state{cache_size=CacheSize, is_snapshot=false, @@ -1193,7 +1202,7 @@ init([Opts]) -> head_lookup = HeadLookup}, {Inker, Penciller} = - startup(InkerOpts, PencillerOpts, State0), + startup(InkerOpts, PencillerOpts0, State0), NewETS = ets:new(mem, [ordered_set]), leveled_log:log("B0001", [Inker, Penciller]), diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 586f85a..5c8caba 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -58,7 +58,8 @@ {gen_fsm, sync_send_event, 2}, {gen_fsm, send_event, 2}, {gen_fsm, sync_send_all_state_event, 3}, - {gen_fsm, send_all_state_event, 2}]}). + {gen_fsm, send_all_state_event, 2}, + {gen_fsm, reply, 2}]}). -endif. -ifdef(slow_test). @@ -113,7 +114,8 @@ cdb_destroy/1, cdb_deletepending/1, cdb_deletepending/3, - cdb_isrolling/1]). + cdb_isrolling/1, + cdb_clerkcomplete/1]). -export([finished_rolling/1, hashtable_calc/2]). @@ -409,6 +411,14 @@ cdb_keycheck(Pid, Key) -> cdb_isrolling(Pid) -> gen_fsm:sync_send_all_state_event(Pid, cdb_isrolling, infinity). +-spec cdb_clerkcomplete(pid()) -> ok. +%% @doc +%% When an Inker's clerk has finished with a CDB process, then it will call +%% complete. Currently this will prompt hibernation, as the CDB process may +%% not be needed for a period. +cdb_clerkcomplete(Pid) -> + gen_fsm:send_all_state_event(Pid, clerk_complete). 
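The decision this new pagecache_level field feeds is the comparison OptsSST#sst_options.pagecache_level >= Level in leveled_sst (see the hunks further down). A minimal sketch of that gate in isolation, assuming only the record shape above; should_load_pagecache/2 is an illustrative helper, not a function in leveled:

    -module(pagecache_sketch).
    -export([demo/0]).

    -record(sst_options, {pagecache_level = 1 :: pos_integer()}).

    %% A file's slots are pre-loaded into the OS page cache only when the
    %% file sits at or above the configured ceiling (level 0 is the top).
    should_load_pagecache(#sst_options{pagecache_level = Ceiling}, Level) ->
        Ceiling >= Level.

    demo() ->
        Lookup = #sst_options{pagecache_level = 4},   % ?SST_PAGECACHELEVEL_LOOKUP
        NoLookup = #sst_options{pagecache_level = 1}, % ?SST_PAGECACHELEVEL_NOLOOKUP
        true = should_load_pagecache(Lookup, 3),      % levels 0-4 warmed
        false = should_load_pagecache(Lookup, 5),     % deeper levels read cold
        false = should_load_pagecache(NoLookup, 2),   % no_lookup: levels 0-1 only
        ok.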
+ %%%============================================================================ %%% gen_server callbacks @@ -436,28 +446,31 @@ starting({open_writer, Filename}, _From, State) -> {WriteOps, UpdStrategy} = set_writeops(State#state.sync_strategy), leveled_log:log("CDB13", [WriteOps]), {ok, Handle} = file:open(Filename, WriteOps), - {reply, ok, writer, State#state{handle=Handle, - sync_strategy = UpdStrategy, - last_position=LastPosition, - last_key=LastKey, - filename=Filename, - hashtree=HashTree}}; + State0 = State#state{handle=Handle, + sync_strategy = UpdStrategy, + last_position=LastPosition, + last_key=LastKey, + filename=Filename, + hashtree=HashTree}, + {reply, ok, writer, State0, hibernate}; starting({open_reader, Filename}, _From, State) -> leveled_log:save(State#state.log_options), leveled_log:log("CDB02", [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, false), - {reply, ok, reader, State#state{handle=Handle, - last_key=LastKey, - filename=Filename, - hash_index=Index}}; + State0 = State#state{handle=Handle, + last_key=LastKey, + filename=Filename, + hash_index=Index}, + {reply, ok, reader, State0, hibernate}; starting({open_reader, Filename, LastKey}, _From, State) -> leveled_log:save(State#state.log_options), leveled_log:log("CDB02", [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, LastKey), - {reply, ok, reader, State#state{handle=Handle, - last_key=LastKey, - filename=Filename, - hash_index=Index}}. + State0 = State#state{handle=Handle, + last_key=LastKey, + filename=Filename, + hash_index=Index}, + {reply, ok, reader, State0, hibernate}. writer({get_kv, Key}, _From, State) -> {reply, @@ -566,18 +579,16 @@ rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) -> ets:delete(State#state.hashtree), {NewHandle, Index, LastKey} = open_for_readonly(NewName, State#state.last_key), + State0 = State#state{handle=NewHandle, + last_key=LastKey, + filename=NewName, + hash_index=Index}, case State#state.deferred_delete of true -> - {reply, ok, delete_pending, State#state{handle=NewHandle, - last_key=LastKey, - filename=NewName, - hash_index=Index}}; + {reply, ok, delete_pending, State0}; false -> leveled_log:log_timer("CDB18", [], SW), - {reply, ok, reader, State#state{handle=NewHandle, - last_key=LastKey, - filename=NewName, - hash_index=Index}} + {reply, ok, reader, State0, hibernate} end; rolling(check_hashtable, _From, State) -> {reply, false, rolling, State}. 
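The hibernate element now returned from these clauses is standard gen_fsm behaviour: once the reply has been sent, the process calls erlang:hibernate/3, discarding its call stack and compacting its heap until the next message arrives. A stripped-down FSM showing the same return shape; the module is illustrative, not leveled code:

    -module(hib_sketch).
    -behaviour(gen_fsm).
    -export([start_link/0, open/1]).
    -export([init/1, starting/3,
             handle_event/3, handle_sync_event/4, handle_info/3,
             terminate/3, code_change/4]).

    start_link() ->
        gen_fsm:start_link(?MODULE, [], []).

    open(Pid) ->
        gen_fsm:sync_send_event(Pid, open, infinity).

    init([]) ->
        {ok, starting, #{}}.

    %% Replying with a 5-tuple ending in 'hibernate' makes the process
    %% hibernate after the reply is sent, shrinking its memory footprint
    %% until the next message - useful when a file process may sit idle
    %% for a long period after being opened.
    starting(open, _From, State) ->
        {reply, ok, reader, State, hibernate}.

    handle_event(_Event, StateName, State) -> {next_state, StateName, State}.
    handle_sync_event(_Event, _From, StateName, State) ->
        {reply, ok, StateName, State}.
    handle_info(_Info, StateName, State) -> {next_state, StateName, State}.
    terminate(_Reason, _StateName, _State) -> ok.
    code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.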
@@ -618,33 +629,40 @@ reader({get_positions, SampleSize, Index, Acc}, _From, State) -> _ -> {reply, lists:sublist(UpdAcc, SampleSize), reader, State} end; -reader({direct_fetch, PositionList, Info}, _From, State) -> +reader({direct_fetch, PositionList, Info}, From, State) -> H = State#state.handle, - FilterFalseKey = fun(Tpl) -> case element(1, Tpl) of - false -> - false; - _Key -> - {true, Tpl} - end end, - Reply = - case Info of - key_only -> - FM = lists:filtermap( - fun(P) -> - FilterFalseKey(extract_key(H, P)) end, - PositionList), - lists:map(fun(T) -> element(1, T) end, FM); - key_size -> - lists:filtermap( - fun(P) -> - FilterFalseKey(extract_key_size(H, P)) end, - PositionList); - key_value_check -> - BM = State#state.binary_mode, - lists:map(fun(P) -> extract_key_value_check(H, P, BM) end, - PositionList) + FilterFalseKey = + fun(Tpl) -> + case element(1, Tpl) of + false -> + false; + _Key -> + {true, Tpl} + end end, - {reply, Reply, reader, State}; + + case Info of + key_only -> + FM = lists:filtermap( + fun(P) -> + FilterFalseKey(extract_key(H, P)) end, + PositionList), + MapFun = fun(T) -> element(1, T) end, + {reply, lists:map(MapFun, FM), reader, State}; + key_size -> + FilterFun = fun(P) -> FilterFalseKey(extract_key_size(H, P)) end, + {reply, lists:filtermap(FilterFun, PositionList), reader, State}; + key_value_check -> + BM = State#state.binary_mode, + MapFun = fun(P) -> extract_key_value_check(H, P, BM) end, + % direct_fetch will occur in batches, so it doesn't make sense to + % hibernate the process that is likely to be used again. However, + % a significant amount of unused binary references may have + % accumulated, so push a GC at this point + gen_fsm:reply(From, lists:map(MapFun, PositionList)), + garbage_collect(), + {next_state, reader, State} + end; reader(cdb_complete, _From, State) -> leveled_log:log("CDB05", [State#state.filename, reader, cdb_ccomplete]), ok = file:close(State#state.handle), @@ -720,24 +738,21 @@ delete_pending(destroy, State) -> handle_sync_event({cdb_scan, FilterFun, Acc, StartPos}, - _From, + From, StateName, State) -> {ok, EndPos0} = file:position(State#state.handle, eof), - {ok, StartPos0} = case StartPos of - undefined -> - file:position(State#state.handle, - ?BASE_POSITION); - StartPos -> - {ok, StartPos} - end, + {ok, StartPos0} = + case StartPos of + undefined -> + file:position(State#state.handle, ?BASE_POSITION); + StartPos -> + {ok, StartPos} + end, file:position(State#state.handle, StartPos0), - file:advise(State#state.handle, - StartPos0, - EndPos0 - StartPos0, - sequential), - MaybeEnd = (check_last_key(State#state.last_key) == empty) or - (StartPos0 >= (EndPos0 - ?DWORD_SIZE)), + MaybeEnd = + (check_last_key(State#state.last_key) == empty) or + (StartPos0 >= (EndPos0 - ?DWORD_SIZE)), {LastPosition, Acc2} = case MaybeEnd of true -> @@ -749,12 +764,17 @@ handle_sync_event({cdb_scan, FilterFun, Acc, StartPos}, Acc, State#state.last_key) end, - {ok, LastReadPos} = file:position(State#state.handle, cur), - file:advise(State#state.handle, - StartPos0, - LastReadPos - StartPos0, - dont_need), - {reply, {LastPosition, Acc2}, StateName, State}; + % The scan may have created a lot of binary references, clear up the + % reference counters for this process here manually. The cdb process + % may be inactive for a period after the scan, and so GC may not kick in + % otherwise + % + % garbage_collect/0 is used in preference to hibernate, as we're generally + % scanning in batches at startup - so the process will be needed straight + % away. 
+ gen_fsm:reply(From, {LastPosition, Acc2}), + garbage_collect(), + {next_state, StateName, State}; handle_sync_event(cdb_lastkey, _From, StateName, State) -> {reply, State#state.last_key, StateName, State}; handle_sync_event(cdb_firstkey, _From, StateName, State) -> @@ -790,8 +810,8 @@ handle_sync_event(cdb_close, _From, StateName, State) -> file:close(State#state.handle), {stop, normal, ok, State}. -handle_event(_Msg, StateName, State) -> - {next_state, StateName, State}. +handle_event(clerk_complete, StateName, State) -> + {next_state, StateName, State, hibernate}. handle_info(_Msg, StateName, State) -> {next_state, StateName, State}. @@ -2668,8 +2688,6 @@ getpositions_sample_test() -> nonsense_coverage_test() -> - {ok, Pid} = gen_fsm:start_link(?MODULE, [#cdb_options{}], []), - ok = gen_fsm:send_all_state_event(Pid, nonsense), ?assertMatch({next_state, reader, #state{}}, handle_info(nonsense, reader, #state{})), diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 076d73b..5ac8f3c 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -361,13 +361,12 @@ handle_cast(scoring_complete, State) -> ok = CloseFun(FilterServer), ok = leveled_inker:ink_clerkcomplete(State#state.inker, ManifestSlice, - FilesToDelete), - {noreply, State#state{scoring_state = undefined}}; + FilesToDelete); false -> ok = CloseFun(FilterServer), - ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []), - {noreply, State#state{scoring_state = undefined}} - end; + ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []) + end, + {noreply, State#state{scoring_state = undefined}, hibernate}; handle_cast({trim, PersistedSQN, ManifestAsList}, State) -> FilesToDelete = leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList), @@ -548,7 +547,8 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) -> end. -fetch_inbatches([], _BatchSize, _CDB, CheckedList) -> +fetch_inbatches([], _BatchSize, CDB, CheckedList) -> + ok = leveled_cdb:cdb_clerkcomplete(CDB), CheckedList; fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) -> {Batch, Tail} = if @@ -699,6 +699,11 @@ compact_files([Batch|T], CDBopts, ActiveJournal0, ActiveJournal0, ManSlice0, PressMethod), + % The inker's clerk will no longer need these (potentially large) binaries, + % so force garbage collection at this point. This will mean when we roll + % each CDB file there will be no remaining references to the binaries that + % have been transferred and the memory can immediately be cleared + garbage_collect(), compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, MaxSQN, RStrategy, PressMethod, ManSlice1). @@ -763,7 +768,7 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> % strategy [KVC0|Acc]; {false, retain} -> - % If we have a retain startegy, it can't be + % If we have a retain strategy, it can't be % discarded - but the value part is no longer % required as this version has been replaced {JK0, JV0} = diff --git a/src/leveled_pmanifest.erl b/src/leveled_pmanifest.erl index 84c02e3..a1cce11 100644 --- a/src/leveled_pmanifest.erl +++ b/src/leveled_pmanifest.erl @@ -152,6 +152,11 @@ copy_manifest(Manifest) -> %% manifest. The PidFun should be able to return the Pid of a file process %% (having started one). The SQNFun will return the max sequence number %% of that file, if passed the Pid that owns it. 
+%% +%% The manifest is started from the basement first, and then the higher levels +%% as the page cache will be loaded with each file, and it would be +%% preferable to have the higher levels in the cache if memory is insufficient +%% to load each level load_manifest(Manifest, LoadFun, SQNFun) -> UpdateLevelFun = fun(LevelIdx, {AccMaxSQN, AccMan, AccFL}) -> @@ -171,7 +176,7 @@ load_manifest(Manifest, LoadFun, SQNFun) -> end, lists:foldl(UpdateLevelFun, {0, Manifest, []}, - lists:seq(0, Manifest#manifest.basement)). + lists:reverse(lists:seq(0, Manifest#manifest.basement))). -spec close_manifest(manifest(), fun()) -> ok. %% @doc diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 6d98103..35b8690 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -486,7 +486,9 @@ init([]) -> starting({sst_open, RootPath, Filename, OptsSST, Level}, _From, State) -> leveled_log:save(OptsSST#sst_options.log_options), {UpdState, Bloom} = - read_file(Filename, State#state{root_path=RootPath}), + read_file(Filename, + State#state{root_path=RootPath}, + OptsSST#sst_options.pagecache_level >= Level), Summary = UpdState#state.summary, {reply, {ok, {Summary#summary.first_key, Summary#summary.last_key}, Bloom}, @@ -509,7 +511,8 @@ starting({sst_new, YBQ = Level =< 2, {UpdState, Bloom} = read_file(ActualFilename, - State#state{root_path=RootPath, yield_blockquery=YBQ}), + State#state{root_path=RootPath, yield_blockquery=YBQ}, + OptsSST#sst_options.pagecache_level >= Level), Summary = UpdState#state.summary, leveled_log:log_timer("SST08", [ActualFilename, Level, Summary#summary.max_sqn], @@ -573,7 +576,8 @@ starting(complete_l0startup, State) -> % Important to empty this from state rather % than carry it through to the next stage new_slots=undefined, - deferred_startup_tuple=undefined}), + deferred_startup_tuple=undefined}, + true), Summary = UpdState#state.summary, Time4 = timer:now_diff(os:timestamp(), SW4), @@ -671,7 +675,7 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod}, {_ID, none} -> Cache; {ID, Header} -> - array:set(ID - 1, Header, Cache) + array:set(ID - 1, binary:copy(Header), Cache) end end, BlockIdxC0 = lists:foldl(FoldFun, State#state.blockindex_cache, BIC), @@ -717,8 +721,7 @@ reader(close, _From, State) -> {stop, normal, ok, State}. reader({switch_levels, NewLevel}, State) -> - erlang:garbage_collect(self()), - {next_state, reader, State#state{level = NewLevel}}. + {next_state, reader, State#state{level = NewLevel}, hibernate}. delete_pending({get_kv, LedgerKey, Hash}, _From, State) -> @@ -792,8 +795,7 @@ handle_info(tidyup_after_startup, delete_pending, State) -> handle_info(tidyup_after_startup, StateName, State) -> case is_process_alive(State#state.starting_pid) of true -> - erlang:garbage_collect(self()), - {next_state, StateName, State}; + {next_state, StateName, State, hibernate}; false -> {stop, normal, State} end. @@ -1062,7 +1064,9 @@ fetch(LedgerKey, Hash, State, Timings0) -> {Result, Header} = binaryslot_get(SlotBin, LedgerKey, Hash, PressMethod, IdxModDate), BlockIndexCache = - array:set(SlotID - 1, Header, State#state.blockindex_cache), + array:set(SlotID - 1, + binary:copy(Header), + State#state.blockindex_cache), {_SW3, Timings3} = update_timings(SW2, Timings2, noncached_block, false), {Result, @@ -1217,9 +1221,10 @@ write_file(RootPath, Filename, SummaryBin, SlotsBin, filename:join(RootPath, FinalName)), FinalName. 
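The lists:reverse/1 added to load_manifest/3 above changes only the order in which levels are started: deepest first, so that levels 0 and 1 are faulted into the page cache last and are therefore the most recently used pages if the OS has to evict. A sketch of the ordering with a stand-in fold fun (illustrative module, not leveled code):

    -module(loadorder_sketch).
    -export([demo/0]).

    %% Mirrors the load_manifest/3 change: fold over levels deepest-first,
    %% so the levels most likely to serve lookups are paged in last and
    %% therefore stay warmest in the OS page cache.
    demo() ->
        Basement = 4,
        LoadFun =
            fun(Level, Acc) ->
                io:format("loading level ~w~n", [Level]),
                [Level|Acc]
            end,
        [0, 1, 2, 3, 4] =
            lists:foldl(LoadFun, [], lists:reverse(lists:seq(0, Basement))),
        ok.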
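The binary:copy/1 now applied to Header before it is cached (in the get_kvrange fold and in fetch/4 above) matters because Header is typically a sub-binary into a much larger slot binary, and caching it uncopied would pin the whole slot in memory for the life of the cache entry. A standalone illustration using only OTP functions:

    -module(bincopy_sketch).
    -export([demo/0]).

    %% A sub-binary keeps its parent binary alive; binary:copy/1 detaches it.
    demo() ->
        Slot = crypto:strong_rand_bytes(1024 * 1024),
        <<Header:128/binary, _Blocks/binary>> = Slot,
        Pinned = binary:referenced_byte_size(Header),               % ~1MB parent
        Detached = binary:referenced_byte_size(binary:copy(Header)), % 128 bytes
        io:format("pinned ~w bytes, detached ~w bytes~n", [Pinned, Detached]),
        ok.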
-read_file(Filename, State) ->
+read_file(Filename, State, LoadPageCache) ->
     {Handle, FileVersion, SummaryBin} =
-        open_reader(filename:join(State#state.root_path, Filename)),
+        open_reader(filename:join(State#state.root_path, Filename),
+                    LoadPageCache),
     UpdState0 = imp_fileversion(FileVersion, State),
     {Summary, Bloom, SlotList} = read_table_summary(SummaryBin),
     BlockIndexCache = array:new([{size, Summary#summary.size},
@@ -1271,12 +1276,18 @@ imp_fileversion(VersionInt, State) ->
         end,
     UpdState1.
 
-open_reader(Filename) ->
+open_reader(Filename, LoadPageCache) ->
     {ok, Handle} = file:open(Filename, [binary, raw, read]),
     {ok, Lengths} = file:pread(Handle, 0, 9),
     <<FileVersion:8/integer,
         SlotsLength:32/integer,
         SummaryLength:32/integer>> = Lengths,
+    case LoadPageCache of
+        true ->
+            file:advise(Handle, 9, SlotsLength, will_need);
+        false ->
+            ok
+    end,
     {ok, SummaryBin} = file:pread(Handle, SlotsLength + 9, SummaryLength),
     {Handle, FileVersion, SummaryBin}.
 
@@ -2117,7 +2128,7 @@ crc_check_slot(FullBin) ->
         CRC32H:32/integer,
         Rest/binary>> = FullBin,
     PosBL0 = min(PosBL, byte_size(FullBin) - 12),
-    % If the position has been bit-flipped to beyond the maximum paossible
+    % If the position has been bit-flipped to beyond the maximum possible
     % length, use the maximum possible length
     <<Header:PosBL0/binary, Blocks/binary>> = Rest,
     case {hmac(Header), hmac(PosBL0)} of
@@ -2619,16 +2630,20 @@ generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) ->
 generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
     BRand = leveled_rand:uniform(BRange),
     BNumber =
-        lists:flatten(io_lib:format("K~4..0B", [BucketLow + BRand])),
+        lists:flatten(io_lib:format("B~6..0B", [BucketLow + BRand])),
     KNumber =
-        lists:flatten(io_lib:format("K~6..0B", [leveled_rand:uniform(1000)])),
+        lists:flatten(io_lib:format("K~8..0B", [leveled_rand:uniform(1000000)])),
     LK = leveled_codec:to_ledgerkey("Bucket" ++ BNumber, "Key" ++ KNumber, o),
     Chunk = leveled_rand:rand_bytes(64),
     {_B, _K, MV, _H, _LMs} =
         leveled_codec:generate_ledgerkv(LK, Seqn, Chunk, 64, infinity),
+    MD = element(4, MV),
+    ?assertMatch(undefined, element(3, MD)),
+    MD0 = [{magic_md, [<<0:32/integer>>, base64:encode(Chunk)]}],
+    MV0 = setelement(4, MV, setelement(3, MD, MD0)),
     generate_randomkeys(Seqn + 1,
                         Count - 1,
-                        [{LK, MV}|Acc],
+                        [{LK, MV0}|Acc],
                         BucketLow,
                         BRange).
 
@@ -2650,6 +2665,7 @@ generate_indexkey(Term, Count) ->
                                   Count,
                                   infinity).
 
+
 form_slot_test() ->
     % If a skip key happens, mustn't switch to lookup by accident as could be
     % over the expected size
@@ -3232,13 +3248,17 @@ simple_persisted_test_bothformats() ->
     simple_persisted_tester(fun testsst_new/6).
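file:advise/4 with will_need, as used in open_reader above to pre-fault the slot region (bytes 9 to SlotsLength + 9), is a thin wrapper over posix_fadvise: it is purely advisory, so correctness never depends on the hint being honoured. A minimal usage sketch (illustrative module, warming a whole file rather than a region):

    -module(fadvise_sketch).
    -export([warm/1]).

    %% Ask the OS to start paging a file into the page cache. The hint
    %% may be ignored; subsequent reads work either way.
    warm(Filename) ->
        Size = filelib:file_size(Filename),
        {ok, Handle} = file:open(Filename, [binary, raw, read]),
        ok = file:advise(Handle, 0, Size, will_need),
        {ok, Handle}.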
simple_persisted_tester(SSTNewFun) -> + Level = 3, {RP, Filename} = {?TEST_AREA, "simple_test"}, KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 32, 1, 20), KVList1 = lists:ukeysort(1, KVList0), [{FirstKey, _FV}|_Rest] = KVList1, {LastKey, _LV} = lists:last(KVList1), {ok, Pid, {FirstKey, LastKey}, _Bloom} = - SSTNewFun(RP, Filename, 1, KVList1, length(KVList1), native), + SSTNewFun(RP, Filename, Level, KVList1, length(KVList1), native), + + B0 = check_binary_references(Pid), + SW0 = os:timestamp(), lists:foreach(fun({K, V}) -> ?assertMatch({K, V}, sst_get(Pid, K)) @@ -3335,9 +3355,44 @@ simple_persisted_tester(SSTNewFun) -> FetchedListB4 = lists:foldl(FoldFun, [], FetchListB4), ?assertMatch([{Eight000Key, _v800}], FetchedListB4), + B1 = check_binary_references(Pid), + ok = sst_close(Pid), + + io:format(user, "Reopen SST file~n", []), + OptsSST = #sst_options{press_method=native, + log_options=leveled_log:get_opts()}, + {ok, OpenP, {FirstKey, LastKey}, _Bloom} = + sst_open(RP, Filename ++ ".sst", OptsSST, Level), + + B2 = check_binary_references(OpenP), + + lists:foreach(fun({K, V}) -> + ?assertMatch({K, V}, sst_get(OpenP, K)), + ?assertMatch({K, V}, sst_get(OpenP, K)) + end, + KVList1), + + garbage_collect(OpenP), + B3 = check_binary_references(OpenP), + ?assertMatch(0, B2), % Opens with an empty cache + ?assertMatch(true, B3 > B2), % Now has headers in cache + ?assertMatch(false, B3 > B0 * 2), + % Not significantly bigger than when created new + ?assertMatch(false, B3 > B1 * 2), + % Not significantly bigger than when created new + + ok = sst_close(OpenP), ok = file:delete(filename:join(RP, Filename ++ ".sst")). +check_binary_references(Pid) -> + garbage_collect(Pid), + {binary, BinList} = process_info(Pid, binary), + TotalBinMem = + lists:foldl(fun({_R, BM, _RC}, Acc) -> Acc + BM end, 0, BinList), + io:format(user, "Total binary memory ~w~n", [TotalBinMem]), + TotalBinMem. + key_dominates_test() -> KV1 = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, 0, []}}, KV2 = {{o, "Bucket", "Key3", null}, {6, {active, infinity}, 0, []}}, diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl index 7d0d70f..5e82ce7 100644 --- a/test/end_to_end/riak_SUITE.erl +++ b/test/end_to_end/riak_SUITE.erl @@ -1214,8 +1214,8 @@ dollar_bucket_index(_Config) -> bigobject_memorycheck(_Config) -> RootPath = testutil:reset_filestructure(), {ok, Bookie} = leveled_bookie:book_start(RootPath, - 100, - 100000000, + 200, + 1000000000, testutil:sync_strategy()), Bucket = <<"B">>, IndexGen = fun() -> [] end, @@ -1227,7 +1227,7 @@ bigobject_memorycheck(_Config) -> {Obj, Spc} = testutil:set_object(Bucket, Key, Value, IndexGen, []), testutil:book_riakput(Bookie, Obj, Spc) end, - lists:foreach(ObjPutFun, lists:seq(1, 600)), + lists:foreach(ObjPutFun, lists:seq(1, 700)), {ok, _Ink, Pcl} = leveled_bookie:book_returnactors(Bookie), {binary, BL} = process_info(Pcl, binary), {memory, M0} = process_info(Pcl, memory), @@ -1235,5 +1235,54 @@ bigobject_memorycheck(_Config) -> io:format("Pcl binary memory ~w ~w memory ~w~n", [B0, length(BL), M0]), true = B0 < 500 * 4000, true = M0 < 500 * 4000, + % All processes + {_TotalCDBBinMem, _TotalCDBProcesses} = cdb_memory_check(), ok = leveled_bookie:book_close(Bookie), - testutil:reset_filestructure(). 
\ No newline at end of file + {ok, BookieR} = leveled_bookie:book_start(RootPath, + 2000, + 1000000000, + testutil:sync_strategy()), + {RS_TotalCDBBinMem, _RS_TotalCDBProcesses} = cdb_memory_check(), + true = RS_TotalCDBBinMem < 1024 * 1024, + % No binary object references exist after startup + ok = leveled_bookie:book_close(BookieR), + testutil:reset_filestructure(). + + +cdb_memory_check() -> + TotalCDBProcesses = + lists:filter(fun(P) -> + {dictionary, PD} = + process_info(P, dictionary), + case lists:keyfind('$initial_call', 1, PD) of + {'$initial_call',{leveled_cdb,init,1}} -> + true; + _ -> + false + end + end, + processes()), + TotalCDBBinMem = + lists:foldl(fun(P, Acc) -> + BinMem = calc_total_binary_memory(P), + io:format("Memory for pid ~w is ~w~n", [P, BinMem]), + BinMem + Acc + end, + 0, + TotalCDBProcesses), + io:format("Total binary memory ~w in ~w CDB processes~n", + [TotalCDBBinMem, length(TotalCDBProcesses)]), + {TotalCDBBinMem, TotalCDBProcesses}. + +calc_total_binary_memory(Pid) -> + {binary, BL} = process_info(Pid, binary), + TBM = lists:foldl(fun({_R, Sz, _C}, Acc) -> Acc + Sz end, 0, BL), + case TBM > 1000000 of + true -> + FilteredBL = + lists:filter(fun(BMD) -> element(2, BMD) > 1024 end, BL), + io:format("Big-ref details for ~w ~w~n", [Pid, FilteredBL]); + false -> + ok + end, + TBM. \ No newline at end of file
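
The explicit garbage_collect/0 calls added in leveled_cdb (after cdb_scan and batched direct_fetch) and in leveled_iclerk, and the binary checks this suite performs, all rest on the same VM behaviour: reference-counted binaries stay on a process's binary list until that process garbage collects. A standalone sketch; exact numbers vary with heap sizes and GC timing, so treat the output as indicative only:

    -module(gcrefs_sketch).
    -export([demo/0]).

    binary_memory(Pid) ->
        {binary, Refs} = process_info(Pid, binary),
        lists:foldl(fun({_Ref, Sz, _Cnt}, Acc) -> Acc + Sz end, 0, Refs).

    demo() ->
        %% Create and immediately drop many off-heap (>64 byte) binaries;
        %% they linger on this process's binary list until a GC runs.
        lists:foreach(fun(_) -> crypto:strong_rand_bytes(65536) end,
                      lists:seq(1, 200)),
        Before = binary_memory(self()),
        garbage_collect(),
        After = binary_memory(self()),
        io:format("binary refs: ~w bytes before GC, ~w after~n",
                  [Before, After]),
        ok.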
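The '$initial_call' scan in cdb_memory_check/0 works because proc_lib records the starting MFA in the process dictionary of every process it spawns, which includes all gen_fsm and gen_server processes. A generalised sketch; unlike the test helper it also guards against processes that exit mid-scan:

    -module(procfind_sketch).
    -export([procs_started_as/3]).

    %% Return all live processes whose proc_lib initial call is {M, F, A}.
    procs_started_as(M, F, A) ->
        lists:filter(
            fun(P) ->
                case process_info(P, dictionary) of
                    {dictionary, PD} ->
                        lists:keyfind('$initial_call', 1, PD)
                            =:= {'$initial_call', {M, F, A}};
                    undefined ->
                        false   % process exited during the scan
                end
            end,
            processes()).

For example, procfind_sketch:procs_started_as(leveled_cdb, init, 1) returns the live CDB processes, as the suite's filter does.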