From dab9652f6cd3ba400df16fa3910a3d2da99a0087 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 25 Jul 2019 09:45:23 +0100 Subject: [PATCH 1/2] Add ability to control journal size by object count This helps when there are files wiht large numbers of key deltas (and hence small values), where otherwise the object count may get out of control. --- include/leveled.hrl | 3 +- priv/leveled.schema | 17 ++++- priv/leveled_multi.schema | 21 +++++- src/leveled_bookie.erl | 13 +++- src/leveled_cdb.erl | 115 ++++++++++++++++++----------- test/end_to_end/recovery_SUITE.erl | 50 +++++++++---- 6 files changed, 157 insertions(+), 62 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index 2074e46..761b9a6 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -38,7 +38,8 @@ bloom :: binary() | none | undefined}). -record(cdb_options, - {max_size :: integer() | undefined, + {max_size :: pos_integer() | undefined, + max_count :: pos_integer() | undefined, file_path :: string() | undefined, waste_path :: string() | undefined, binary_mode = false :: boolean(), diff --git a/priv/leveled.schema b/priv/leveled.schema index 241ee33..3e565e6 100644 --- a/priv/leveled.schema +++ b/priv/leveled.schema @@ -63,12 +63,27 @@ ]}. %% @doc The approximate size (in bytes) when a Journal file should be rolled. -%% Normally keep this as around the size of o(100K) objects. Default is 500MB +%% Normally keep this as around the size of o(100K) objects. Default is 1GB. +%% Note that on startup an actual maximum size will be chosen which varies by +%% a random factor from this point - to avoid coordination of roll events +%% across vnodes. {mapping, "leveled.journal_size", "leveled.journal_size", [ {default, 1000000000}, {datatype, integer} ]}. +%% @doc The approximate size (in bytes) when a Journal file should be rolled. +%% This time measured in object count, a file will be rolled if either the +%% object count or the journal size limit is reached. Default 200K. +%% Note that on startup an actual maximum size will be chosen which varies by +%% a random factor from this point - to avoid coordination of roll events +%% across vnodes. +{mapping, "leveled.journal_objectcount", "leveled.journal_objectcount", [ + {default, 200000}, + {datatype, integer} +]}. + + %% @doc The number of journal compactions per vnode per day %% The higher the value, the more compaction runs, and the sooner space is %% recovered. But each run has a cost diff --git a/priv/leveled_multi.schema b/priv/leveled_multi.schema index b2b3afa..22857d7 100644 --- a/priv/leveled_multi.schema +++ b/priv/leveled_multi.schema @@ -57,11 +57,24 @@ %% @doc The approximate size (in bytes) when a Journal file should be rolled. -%% Normally keep this as around the size of o(100K) objects. Default is 500MB +%% Normally keep this as around the size of o(100K) objects. Default is 1GB. +%% Note that on startup an actual maximum size will be chosen which varies by +%% a random factor from this point - to avoid coordination of roll events +%% across vnodes. {mapping, "multi_backend.$name.leveled.journal_size", "riak_kv.multi_backend", [ - {default, 500000000}, - {datatype, integer}, - hidden + {default, 1000000000}, + {datatype, integer} +]}. + +%% @doc The approximate size (in bytes) when a Journal file should be rolled. +%% This time measured in object count, a file will be rolled if either the +%% object count or the journal size limit is reached. Default 200K. +%% Note that on startup an actual maximum size will be chosen which varies by +%% a random factor from this point - to avoid coordination of roll events +%% across vnodes. +{mapping, "multi_backend.$name.leveled.journal_objectcount", "riak_kv.multi_backend", [ + {default, 200000}, + {datatype, integer} ]}. %% @doc The number of journal compactions per vnode per day diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index c16f3e0..109dfaf 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -134,6 +134,7 @@ {snapshot_bookie, undefined}, {cache_size, ?CACHE_SIZE}, {max_journalsize, 1000000000}, + {max_journalobjectcount, 200000}, {max_sstslots, 256}, {sync_strategy, none}, {head_only, false}, @@ -230,8 +231,12 @@ % configured values % The minimum value is 100 - any lower value will be ignored {max_journalsize, pos_integer()} | - % The maximum size of a journal file in bytes. The abolute + % The maximum size of a journal file in bytes. The absolute % maximum must be 4GB due to 4 byte file pointers being used + {max_journalobjectcount, pos_integer()} | + % The maximum size of the journal by count of the objects. The + % journal must remian within the limit set by both this figures and + % the max_journalsize {max_sstslots, pos_integer()} | % The maximum number of slots in a SST file. All testing is done % at a size of 256 (except for Quickcheck tests}, altering this @@ -1644,6 +1649,11 @@ set_options(Opts) -> MaxJournalSize = min(?ABSOLUTEMAX_JOURNALSIZE, MaxJournalSize0 - erlang:phash2(self()) rem JournalSizeJitter), + MaxJournalCount0 = + proplists:get_value(max_journalobjectcount, Opts), + JournalCountJitter = MaxJournalCount0 div (100 div ?JOURNAL_SIZE_JITTER), + MaxJournalCount = + MaxJournalCount0 - erlang:phash2(self()) rem JournalCountJitter, SyncStrat = proplists:get_value(sync_strategy, Opts), WRP = proplists:get_value(waste_retention_period, Opts), @@ -1697,6 +1707,7 @@ set_options(Opts) -> compress_on_receipt = CompressOnReceipt, cdb_options = #cdb_options{max_size=MaxJournalSize, + max_count=MaxJournalCount, binary_mode=true, sync_strategy=SyncStrat, log_options=leveled_log:get_opts()}}, diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 5c8caba..a2ff728 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -138,10 +138,12 @@ -record(state, {hashtree, last_position :: integer() | undefined, last_key = empty, + current_count = 0 :: non_neg_integer(), hash_index = {} :: tuple(), filename :: string() | undefined, handle :: file:fd() | undefined, - max_size :: integer() | undefined, + max_size :: pos_integer() | undefined, + max_count :: pos_integer() | undefined, binary_mode = false :: boolean(), delete_point = 0 :: integer(), inker :: pid() | undefined, @@ -425,15 +427,24 @@ cdb_clerkcomplete(Pid) -> %%%============================================================================ init([Opts]) -> - MaxSize = case Opts#cdb_options.max_size of - undefined -> - ?MAX_FILE_SIZE; - M -> - M - end, + MaxSize = + case Opts#cdb_options.max_size of + undefined -> + ?MAX_FILE_SIZE; + MS -> + MS + end, + MaxCount = + case Opts#cdb_options.max_count of + undefined -> + ?MAX_FILE_SIZE div 1000; + MC -> + MC + end, {ok, starting, #state{max_size=MaxSize, + max_count=MaxCount, binary_mode=Opts#cdb_options.binary_mode, waste_path=Opts#cdb_options.waste_path, sync_strategy=Opts#cdb_options.sync_strategy, @@ -447,6 +458,7 @@ starting({open_writer, Filename}, _From, State) -> leveled_log:log("CDB13", [WriteOps]), {ok, Handle} = file:open(Filename, WriteOps), State0 = State#state{handle=Handle, + current_count = size_hashtree(HashTree), sync_strategy = UpdStrategy, last_position=LastPosition, last_key=LastKey, @@ -490,47 +502,63 @@ writer({key_check, Key}, _From, State) -> writer, State}; writer({put_kv, Key, Value}, _From, State) -> - Result = put(State#state.handle, - Key, - Value, - {State#state.last_position, State#state.hashtree}, - State#state.binary_mode, - State#state.max_size, - State#state.last_key == empty), - case Result of - roll -> - %% Key and value could not be written + NewCount = State#state.current_count + 1, + case NewCount >= State#state.max_count of + true -> {reply, roll, writer, State}; - {UpdHandle, NewPosition, HashTree} -> - ok = - case State#state.sync_strategy of - riak_sync -> - file:datasync(UpdHandle); - _ -> - ok - end, - {reply, ok, writer, State#state{handle=UpdHandle, - last_position=NewPosition, - last_key=Key, - hashtree=HashTree}} + false -> + Result = put(State#state.handle, + Key, + Value, + {State#state.last_position, State#state.hashtree}, + State#state.binary_mode, + State#state.max_size, + State#state.last_key == empty), + case Result of + roll -> + %% Key and value could not be written + {reply, roll, writer, State}; + {UpdHandle, NewPosition, HashTree} -> + ok = + case State#state.sync_strategy of + riak_sync -> + file:datasync(UpdHandle); + _ -> + ok + end, + {reply, ok, writer, State#state{handle=UpdHandle, + current_count=NewCount, + last_position=NewPosition, + last_key=Key, + hashtree=HashTree}} + end end; writer({mput_kv, []}, _From, State) -> {reply, ok, writer, State}; writer({mput_kv, KVList}, _From, State) -> - Result = mput(State#state.handle, - KVList, - {State#state.last_position, State#state.hashtree}, - State#state.binary_mode, - State#state.max_size), - case Result of - roll -> - %% Keys and values could not be written + NewCount = State#state.current_count + length(KVList), + TooMany = NewCount >= State#state.max_count, + NotEmpty = State#state.current_count > 0, + case (TooMany and NotEmpty) of + true -> {reply, roll, writer, State}; - {UpdHandle, NewPosition, HashTree, LastKey} -> - {reply, ok, writer, State#state{handle=UpdHandle, - last_position=NewPosition, - last_key=LastKey, - hashtree=HashTree}} + false -> + Result = mput(State#state.handle, + KVList, + {State#state.last_position, State#state.hashtree}, + State#state.binary_mode, + State#state.max_size), + case Result of + roll -> + %% Keys and values could not be written + {reply, roll, writer, State}; + {UpdHandle, NewPosition, HashTree, LastKey} -> + {reply, ok, writer, State#state{handle=UpdHandle, + current_count=NewCount, + last_position=NewPosition, + last_key=LastKey, + hashtree=HashTree}} + end end; writer(cdb_complete, _From, State) -> NewName = determine_new_filename(State#state.filename), @@ -1775,6 +1803,9 @@ add_position_tohashtree(HashTree, Index, Hash, Position) -> new_hashtree() -> ets:new(hashtree, [ordered_set]). +size_hashtree(HashTree) -> + ets:info(HashTree, size). + to_list(HashTree, Index) -> to_list(HashTree, Index, {0, -1}, []). diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index 4935721..f6621cd 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -603,11 +603,14 @@ allkeydelta_journal_multicompact(_Config) -> % Simply confirms that none of this causes a crash RootPath = testutil:reset_filestructure(), B = <<"test_bucket">>, - StartOpts1 = [{root_path, RootPath}, - {max_journalsize, 50000000}, - {max_run_length, 6}, - {sync_strategy, testutil:sync_strategy()}], - {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + StartOptsFun = + fun(JOC) -> + [{root_path, RootPath}, + {max_journalobjectcount, JOC}, + {max_run_length, 6}, + {sync_strategy, testutil:sync_strategy()}] + end, + {ok, Bookie1} = leveled_bookie:book_start(StartOptsFun(16000)), {KSpcL1, _V1} = testutil:put_indexed_objects(Bookie1, B, 40000), {KSpcL2, V2} = testutil:put_altered_indexed_objects(Bookie1, B, @@ -633,26 +636,47 @@ allkeydelta_journal_multicompact(_Config) -> ok = leveled_bookie:book_close(Bookie1), leveled_penciller:clean_testdir(RootPath ++ "/ledger"), - {ok, Bookie2} = leveled_bookie:book_start(StartOpts1), + io:format("Restart without ledger~n"), + {ok, Bookie2} = leveled_bookie:book_start(StartOptsFun(24000)), ok = testutil:check_indexed_objects(Bookie2, B, KSpcL1 ++ KSpcL2, V2), - {KSpcL3, V3} = testutil:put_altered_indexed_objects(Bookie2, + {KSpcL3, _V3} = testutil:put_altered_indexed_objects(Bookie2, B, KSpcL2, false), compact_and_wait(Bookie2, 0), - - ok = testutil:check_indexed_objects(Bookie2, - B, - KSpcL1 ++ KSpcL2 ++ KSpcL3, - V3), - + {ok, FileList3} = + file:list_dir( + filename:join(RootPath, "journal/journal_files/post_compact")), + io:format("Number of files after compaction ~w~n", [length(FileList3)]), ok = leveled_bookie:book_close(Bookie2), + + io:format("Restart with smaller journal object count~n"), + {ok, Bookie3} = leveled_bookie:book_start(StartOptsFun(8000)), + + {KSpcL4, V4} = testutil:put_altered_indexed_objects(Bookie3, + B, + KSpcL3, + false), + + compact_and_wait(Bookie3, 0), + + ok = testutil:check_indexed_objects(Bookie3, + B, + KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4, + V4), + {ok, FileList4} = + file:list_dir( + filename:join(RootPath, "journal/journal_files/post_compact")), + io:format("Number of files after compaction ~w~n", [length(FileList4)]), + true = length(FileList4) >= length(FileList3) + 4, + + ok = leveled_bookie:book_close(Bookie3), testutil:reset_filestructure(10000). From e7c8dd7a780e5711c38f1bcc7c3e80e403e9c2ad Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 25 Jul 2019 10:24:40 +0100 Subject: [PATCH 2/2] Typo round-up Also reduce log noise when persisting new Journal files --- src/leveled_bookie.erl | 24 ++++++++++++------------ src/leveled_log.erl | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 109dfaf..b14dfde 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -1,6 +1,6 @@ %% -------- Overview --------- %% -%% The eleveleddb is based on the LSM-tree similar to leveldb, except that: +%% Leveled is based on the LSM-tree similar to leveldb, except that: %% - Keys, Metadata and Values are not persisted together - the Keys and %% Metadata are kept in a tree-based ledger, whereas the values are stored %% only in a sequential Journal. @@ -11,7 +11,7 @@ %% and frequent use of iterators) %% - The Journal is an extended nursery log in leveldb terms. It is keyed %% on the sequence number of the write -%% - The ledger is a merge tree, where the key is the actaul object key, and +%% - The ledger is a merge tree, where the key is the actual object key, and %% the value is the metadata of the object including the sequence number %% %% @@ -228,14 +228,14 @@ % The size of the Bookie's memory, the cache of the recent % additions to the ledger. Defaults to ?CACHE_SIZE, plus some % randomised jitter (randomised jitter will still be added to - % configured values + % configured values) % The minimum value is 100 - any lower value will be ignored {max_journalsize, pos_integer()} | % The maximum size of a journal file in bytes. The absolute % maximum must be 4GB due to 4 byte file pointers being used {max_journalobjectcount, pos_integer()} | % The maximum size of the journal by count of the objects. The - % journal must remian within the limit set by both this figures and + % journal must remain within the limit set by both this figures and % the max_journalsize {max_sstslots, pos_integer()} | % The maximum number of slots in a SST file. All testing is done @@ -248,7 +248,7 @@ % partially in hardware (e.g through use of FBWC). % riak_sync is used for backwards compatability with OTP16 - and % will manually call sync() after each write (rather than use the - % O_SYNC option on startup + % O_SYNC option on startup) {head_only, false|with_lookup|no_lookup} | % When set to true, there are three fundamental changes as to how % leveled will work: @@ -455,7 +455,7 @@ book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) %% - A Primary Key and a Value %% - IndexSpecs - a set of secondary key changes associated with the %% transaction -%% - A tag indictaing the type of object. Behaviour for metadata extraction, +%% - A tag indicating the type of object. Behaviour for metadata extraction, %% and ledger compaction will vary by type. There are three currently %% implemented types i (Index), o (Standard), o_rkv (Riak). Keys added with %% Index tags are not fetchable (as they will not be hashed), but are @@ -466,7 +466,7 @@ book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) %% %% The inker will pass the PK/Value/IndexSpecs to the current (append only) %% CDB journal file to persist the change. The call should return either 'ok' -%% or 'roll'. -'roll' indicates that the CDB file has insufficient capacity for +%% or 'roll'. 'roll' indicates that the CDB file has insufficient capacity for %% this write, and a new journal file should be created (with appropriate %% manifest changes to be made). %% @@ -1753,7 +1753,7 @@ return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) -> -spec snaptype_by_presence(boolean()) -> store|ledger. %% @doc %% Folds that traverse over object heads, may also either require to return -%% the object,or at least confirm th eobject is present in the Ledger. This +%% the object, or at least confirm the object is present in the Ledger. This %% is achieved by enabling presence - and this will change the type of %% snapshot to one that covers the whole store (i.e. both ledger and journal), %% rather than just the ledger. @@ -1764,7 +1764,7 @@ snaptype_by_presence(false) -> -spec get_runner(book_state(), tuple()) -> {async, fun()}. %% @doc -%% Getan {async, Runner} for a given fold type. Fold types have different +%% Get an {async, Runner} for a given fold type. Fold types have different %% tuple inputs get_runner(State, {index_query, Constraint, FoldAccT, Range, TermHandling}) -> {IdxFld, StartT, EndT} = Range, @@ -1933,7 +1933,7 @@ get_deprecatedrunner(State, {tuple(), tuple(), tuple()|no_lookup}. %% @doc %% Convert a range of binary keys into a ledger key range, returning -%% {StartLK, EndLK, Query} where Query is to indicate whether the query +%% {StartLK, EndLK, Query} where Query is to indicate whether the query %% range is worth using to minimise the cost of the snapshot return_ledger_keyrange(Tag, Bucket, KeyRange) -> {StartKey, EndKey, Snap} = @@ -1977,7 +1977,7 @@ maybe_longrunning(SW, Aspect) -> -spec readycache_forsnapshot(ledger_cache(), tuple()|no_lookup|undefined) -> ledger_cache(). %% @doc -%% Strip the ledger cach back to only the relevant informaiton needed in +%% Strip the ledger cach back to only the relevant information needed in %% the query, and to make the cache a snapshot (and so not subject to changes %% such as additions to the ets table) readycache_forsnapshot(LedgerCache, {StartKey, EndKey}) -> @@ -2190,7 +2190,7 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache) -> loader) -> ledger_cache(). %% @doc -%% Add a set of changes associated witha single sequence number (journal +%% Add a set of changes associated with a single sequence number (journal %% update) to the ledger cache. This is used explicitly when loading the %% ledger from the Journal (i.e. at startup) - and in this case the ETS insert %% can be bypassed, as all changes will be flushed to the Penciller before the diff --git a/src/leveled_log.erl b/src/leveled_log.erl index fd59366..1a78d36 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -305,7 +305,7 @@ {"I0016", {info, "Writing new version of manifest for manifestSQN=~w"}}, {"I0017", - {info, "At SQN=~w journal has filename ~s"}}, + {debug, "At SQN=~w journal has filename ~s"}}, {"I0018", {warn, "We're doomed - intention recorded to destroy all files"}}, {"I0019",