From dab9652f6cd3ba400df16fa3910a3d2da99a0087 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 25 Jul 2019 09:45:23 +0100 Subject: [PATCH] Add ability to control journal size by object count This helps when there are files wiht large numbers of key deltas (and hence small values), where otherwise the object count may get out of control. --- include/leveled.hrl | 3 +- priv/leveled.schema | 17 ++++- priv/leveled_multi.schema | 21 +++++- src/leveled_bookie.erl | 13 +++- src/leveled_cdb.erl | 115 ++++++++++++++++++----------- test/end_to_end/recovery_SUITE.erl | 50 +++++++++---- 6 files changed, 157 insertions(+), 62 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index 2074e46..761b9a6 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -38,7 +38,8 @@ bloom :: binary() | none | undefined}). -record(cdb_options, - {max_size :: integer() | undefined, + {max_size :: pos_integer() | undefined, + max_count :: pos_integer() | undefined, file_path :: string() | undefined, waste_path :: string() | undefined, binary_mode = false :: boolean(), diff --git a/priv/leveled.schema b/priv/leveled.schema index 241ee33..3e565e6 100644 --- a/priv/leveled.schema +++ b/priv/leveled.schema @@ -63,12 +63,27 @@ ]}. %% @doc The approximate size (in bytes) when a Journal file should be rolled. -%% Normally keep this as around the size of o(100K) objects. Default is 500MB +%% Normally keep this as around the size of o(100K) objects. Default is 1GB. +%% Note that on startup an actual maximum size will be chosen which varies by +%% a random factor from this point - to avoid coordination of roll events +%% across vnodes. {mapping, "leveled.journal_size", "leveled.journal_size", [ {default, 1000000000}, {datatype, integer} ]}. +%% @doc The approximate size (in bytes) when a Journal file should be rolled. +%% This time measured in object count, a file will be rolled if either the +%% object count or the journal size limit is reached. Default 200K. +%% Note that on startup an actual maximum size will be chosen which varies by +%% a random factor from this point - to avoid coordination of roll events +%% across vnodes. +{mapping, "leveled.journal_objectcount", "leveled.journal_objectcount", [ + {default, 200000}, + {datatype, integer} +]}. + + %% @doc The number of journal compactions per vnode per day %% The higher the value, the more compaction runs, and the sooner space is %% recovered. But each run has a cost diff --git a/priv/leveled_multi.schema b/priv/leveled_multi.schema index b2b3afa..22857d7 100644 --- a/priv/leveled_multi.schema +++ b/priv/leveled_multi.schema @@ -57,11 +57,24 @@ %% @doc The approximate size (in bytes) when a Journal file should be rolled. -%% Normally keep this as around the size of o(100K) objects. Default is 500MB +%% Normally keep this as around the size of o(100K) objects. Default is 1GB. +%% Note that on startup an actual maximum size will be chosen which varies by +%% a random factor from this point - to avoid coordination of roll events +%% across vnodes. {mapping, "multi_backend.$name.leveled.journal_size", "riak_kv.multi_backend", [ - {default, 500000000}, - {datatype, integer}, - hidden + {default, 1000000000}, + {datatype, integer} +]}. + +%% @doc The approximate size (in bytes) when a Journal file should be rolled. +%% This time measured in object count, a file will be rolled if either the +%% object count or the journal size limit is reached. Default 200K. +%% Note that on startup an actual maximum size will be chosen which varies by +%% a random factor from this point - to avoid coordination of roll events +%% across vnodes. +{mapping, "multi_backend.$name.leveled.journal_objectcount", "riak_kv.multi_backend", [ + {default, 200000}, + {datatype, integer} ]}. %% @doc The number of journal compactions per vnode per day diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index c16f3e0..109dfaf 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -134,6 +134,7 @@ {snapshot_bookie, undefined}, {cache_size, ?CACHE_SIZE}, {max_journalsize, 1000000000}, + {max_journalobjectcount, 200000}, {max_sstslots, 256}, {sync_strategy, none}, {head_only, false}, @@ -230,8 +231,12 @@ % configured values % The minimum value is 100 - any lower value will be ignored {max_journalsize, pos_integer()} | - % The maximum size of a journal file in bytes. The abolute + % The maximum size of a journal file in bytes. The absolute % maximum must be 4GB due to 4 byte file pointers being used + {max_journalobjectcount, pos_integer()} | + % The maximum size of the journal by count of the objects. The + % journal must remian within the limit set by both this figures and + % the max_journalsize {max_sstslots, pos_integer()} | % The maximum number of slots in a SST file. All testing is done % at a size of 256 (except for Quickcheck tests}, altering this @@ -1644,6 +1649,11 @@ set_options(Opts) -> MaxJournalSize = min(?ABSOLUTEMAX_JOURNALSIZE, MaxJournalSize0 - erlang:phash2(self()) rem JournalSizeJitter), + MaxJournalCount0 = + proplists:get_value(max_journalobjectcount, Opts), + JournalCountJitter = MaxJournalCount0 div (100 div ?JOURNAL_SIZE_JITTER), + MaxJournalCount = + MaxJournalCount0 - erlang:phash2(self()) rem JournalCountJitter, SyncStrat = proplists:get_value(sync_strategy, Opts), WRP = proplists:get_value(waste_retention_period, Opts), @@ -1697,6 +1707,7 @@ set_options(Opts) -> compress_on_receipt = CompressOnReceipt, cdb_options = #cdb_options{max_size=MaxJournalSize, + max_count=MaxJournalCount, binary_mode=true, sync_strategy=SyncStrat, log_options=leveled_log:get_opts()}}, diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 5c8caba..a2ff728 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -138,10 +138,12 @@ -record(state, {hashtree, last_position :: integer() | undefined, last_key = empty, + current_count = 0 :: non_neg_integer(), hash_index = {} :: tuple(), filename :: string() | undefined, handle :: file:fd() | undefined, - max_size :: integer() | undefined, + max_size :: pos_integer() | undefined, + max_count :: pos_integer() | undefined, binary_mode = false :: boolean(), delete_point = 0 :: integer(), inker :: pid() | undefined, @@ -425,15 +427,24 @@ cdb_clerkcomplete(Pid) -> %%%============================================================================ init([Opts]) -> - MaxSize = case Opts#cdb_options.max_size of - undefined -> - ?MAX_FILE_SIZE; - M -> - M - end, + MaxSize = + case Opts#cdb_options.max_size of + undefined -> + ?MAX_FILE_SIZE; + MS -> + MS + end, + MaxCount = + case Opts#cdb_options.max_count of + undefined -> + ?MAX_FILE_SIZE div 1000; + MC -> + MC + end, {ok, starting, #state{max_size=MaxSize, + max_count=MaxCount, binary_mode=Opts#cdb_options.binary_mode, waste_path=Opts#cdb_options.waste_path, sync_strategy=Opts#cdb_options.sync_strategy, @@ -447,6 +458,7 @@ starting({open_writer, Filename}, _From, State) -> leveled_log:log("CDB13", [WriteOps]), {ok, Handle} = file:open(Filename, WriteOps), State0 = State#state{handle=Handle, + current_count = size_hashtree(HashTree), sync_strategy = UpdStrategy, last_position=LastPosition, last_key=LastKey, @@ -490,47 +502,63 @@ writer({key_check, Key}, _From, State) -> writer, State}; writer({put_kv, Key, Value}, _From, State) -> - Result = put(State#state.handle, - Key, - Value, - {State#state.last_position, State#state.hashtree}, - State#state.binary_mode, - State#state.max_size, - State#state.last_key == empty), - case Result of - roll -> - %% Key and value could not be written + NewCount = State#state.current_count + 1, + case NewCount >= State#state.max_count of + true -> {reply, roll, writer, State}; - {UpdHandle, NewPosition, HashTree} -> - ok = - case State#state.sync_strategy of - riak_sync -> - file:datasync(UpdHandle); - _ -> - ok - end, - {reply, ok, writer, State#state{handle=UpdHandle, - last_position=NewPosition, - last_key=Key, - hashtree=HashTree}} + false -> + Result = put(State#state.handle, + Key, + Value, + {State#state.last_position, State#state.hashtree}, + State#state.binary_mode, + State#state.max_size, + State#state.last_key == empty), + case Result of + roll -> + %% Key and value could not be written + {reply, roll, writer, State}; + {UpdHandle, NewPosition, HashTree} -> + ok = + case State#state.sync_strategy of + riak_sync -> + file:datasync(UpdHandle); + _ -> + ok + end, + {reply, ok, writer, State#state{handle=UpdHandle, + current_count=NewCount, + last_position=NewPosition, + last_key=Key, + hashtree=HashTree}} + end end; writer({mput_kv, []}, _From, State) -> {reply, ok, writer, State}; writer({mput_kv, KVList}, _From, State) -> - Result = mput(State#state.handle, - KVList, - {State#state.last_position, State#state.hashtree}, - State#state.binary_mode, - State#state.max_size), - case Result of - roll -> - %% Keys and values could not be written + NewCount = State#state.current_count + length(KVList), + TooMany = NewCount >= State#state.max_count, + NotEmpty = State#state.current_count > 0, + case (TooMany and NotEmpty) of + true -> {reply, roll, writer, State}; - {UpdHandle, NewPosition, HashTree, LastKey} -> - {reply, ok, writer, State#state{handle=UpdHandle, - last_position=NewPosition, - last_key=LastKey, - hashtree=HashTree}} + false -> + Result = mput(State#state.handle, + KVList, + {State#state.last_position, State#state.hashtree}, + State#state.binary_mode, + State#state.max_size), + case Result of + roll -> + %% Keys and values could not be written + {reply, roll, writer, State}; + {UpdHandle, NewPosition, HashTree, LastKey} -> + {reply, ok, writer, State#state{handle=UpdHandle, + current_count=NewCount, + last_position=NewPosition, + last_key=LastKey, + hashtree=HashTree}} + end end; writer(cdb_complete, _From, State) -> NewName = determine_new_filename(State#state.filename), @@ -1775,6 +1803,9 @@ add_position_tohashtree(HashTree, Index, Hash, Position) -> new_hashtree() -> ets:new(hashtree, [ordered_set]). +size_hashtree(HashTree) -> + ets:info(HashTree, size). + to_list(HashTree, Index) -> to_list(HashTree, Index, {0, -1}, []). diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index 4935721..f6621cd 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -603,11 +603,14 @@ allkeydelta_journal_multicompact(_Config) -> % Simply confirms that none of this causes a crash RootPath = testutil:reset_filestructure(), B = <<"test_bucket">>, - StartOpts1 = [{root_path, RootPath}, - {max_journalsize, 50000000}, - {max_run_length, 6}, - {sync_strategy, testutil:sync_strategy()}], - {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + StartOptsFun = + fun(JOC) -> + [{root_path, RootPath}, + {max_journalobjectcount, JOC}, + {max_run_length, 6}, + {sync_strategy, testutil:sync_strategy()}] + end, + {ok, Bookie1} = leveled_bookie:book_start(StartOptsFun(16000)), {KSpcL1, _V1} = testutil:put_indexed_objects(Bookie1, B, 40000), {KSpcL2, V2} = testutil:put_altered_indexed_objects(Bookie1, B, @@ -633,26 +636,47 @@ allkeydelta_journal_multicompact(_Config) -> ok = leveled_bookie:book_close(Bookie1), leveled_penciller:clean_testdir(RootPath ++ "/ledger"), - {ok, Bookie2} = leveled_bookie:book_start(StartOpts1), + io:format("Restart without ledger~n"), + {ok, Bookie2} = leveled_bookie:book_start(StartOptsFun(24000)), ok = testutil:check_indexed_objects(Bookie2, B, KSpcL1 ++ KSpcL2, V2), - {KSpcL3, V3} = testutil:put_altered_indexed_objects(Bookie2, + {KSpcL3, _V3} = testutil:put_altered_indexed_objects(Bookie2, B, KSpcL2, false), compact_and_wait(Bookie2, 0), - - ok = testutil:check_indexed_objects(Bookie2, - B, - KSpcL1 ++ KSpcL2 ++ KSpcL3, - V3), - + {ok, FileList3} = + file:list_dir( + filename:join(RootPath, "journal/journal_files/post_compact")), + io:format("Number of files after compaction ~w~n", [length(FileList3)]), ok = leveled_bookie:book_close(Bookie2), + + io:format("Restart with smaller journal object count~n"), + {ok, Bookie3} = leveled_bookie:book_start(StartOptsFun(8000)), + + {KSpcL4, V4} = testutil:put_altered_indexed_objects(Bookie3, + B, + KSpcL3, + false), + + compact_and_wait(Bookie3, 0), + + ok = testutil:check_indexed_objects(Bookie3, + B, + KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4, + V4), + {ok, FileList4} = + file:list_dir( + filename:join(RootPath, "journal/journal_files/post_compact")), + io:format("Number of files after compaction ~w~n", [length(FileList4)]), + true = length(FileList4) >= length(FileList3) + 4, + + ok = leveled_bookie:book_close(Bookie3), testutil:reset_filestructure(10000).