diff --git a/include/leveled.hrl b/include/leveled.hrl
index 2074e46..761b9a6 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -38,7 +38,8 @@
          bloom :: binary() | none | undefined}).
 
 -record(cdb_options,
-        {max_size :: integer() | undefined,
+        {max_size :: pos_integer() | undefined,
+         max_count :: pos_integer() | undefined,
          file_path :: string() | undefined,
          waste_path :: string() | undefined,
          binary_mode = false :: boolean(),
diff --git a/priv/leveled.schema b/priv/leveled.schema
index 241ee33..3e565e6 100644
--- a/priv/leveled.schema
+++ b/priv/leveled.schema
@@ -63,12 +63,27 @@
 ]}.
 
 %% @doc The approximate size (in bytes) when a Journal file should be rolled.
-%% Normally keep this as around the size of o(100K) objects. Default is 500MB
+%% Normally keep this as around the size of o(100K) objects. Default is 1GB.
+%% Note that on startup an actual maximum size will be chosen which varies by
+%% a random factor from this point - to avoid coordination of roll events
+%% across vnodes.
 {mapping, "leveled.journal_size", "leveled.journal_size", [
   {default, 1000000000},
   {datatype, integer}
 ]}.
 
+%% @doc The approximate count of objects that a Journal file may hold before
+%% it should be rolled. A file will be rolled if either this object count or
+%% the journal size limit is reached. Default 200K.
+%% Note that on startup an actual maximum count will be chosen which varies by
+%% a random factor from this point - to avoid coordination of roll events
+%% across vnodes.
+{mapping, "leveled.journal_objectcount", "leveled.journal_objectcount", [
+  {default, 200000},
+  {datatype, integer}
+]}.
+
+
 %% @doc The number of journal compactions per vnode per day
 %% The higher the value, the more compaction runs, and the sooner space is
 %% recovered. But each run has a cost
diff --git a/priv/leveled_multi.schema b/priv/leveled_multi.schema
index b2b3afa..22857d7 100644
--- a/priv/leveled_multi.schema
+++ b/priv/leveled_multi.schema
@@ -57,11 +57,24 @@
 
 %% @doc The approximate size (in bytes) when a Journal file should be rolled.
-%% Normally keep this as around the size of o(100K) objects. Default is 500MB
+%% Normally keep this as around the size of o(100K) objects. Default is 1GB.
+%% Note that on startup an actual maximum size will be chosen which varies by
+%% a random factor from this point - to avoid coordination of roll events
+%% across vnodes.
 {mapping, "multi_backend.$name.leveled.journal_size", "riak_kv.multi_backend", [
-  {default, 500000000},
-  {datatype, integer},
-  hidden
+  {default, 1000000000},
+  {datatype, integer}
 ]}.
 
+%% @doc The approximate count of objects that a Journal file may hold before
+%% it should be rolled. A file will be rolled if either this object count or
+%% the journal size limit is reached. Default 200K.
+%% Note that on startup an actual maximum count will be chosen which varies by
+%% a random factor from this point - to avoid coordination of roll events
+%% across vnodes.
+{mapping, "multi_backend.$name.leveled.journal_objectcount", "riak_kv.multi_backend", [
+  {default, 200000},
+  {datatype, integer}
+]}.
 
 %% @doc The number of journal compactions per vnode per day
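Note: the cuttlefish mappings above only cover Riak configuration; an application embedding leveled directly passes the equivalent limits as start options. A minimal sketch (the path and figures are placeholders, the option names are the ones used in this change):

    %% Illustrative only - start a bookie with both journal limits set.
    {ok, Bookie} =
        leveled_bookie:book_start([{root_path, "/tmp/leveled_data"},
                                    {max_journalsize, 1000000000},
                                    {max_journalobjectcount, 200000},
                                    {sync_strategy, none}]),
    ok = leveled_bookie:book_close(Bookie).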
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index c16f3e0..b14dfde 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -1,6 +1,6 @@
 %% -------- Overview ---------
 %%
-%% The eleveleddb is based on the LSM-tree similar to leveldb, except that:
+%% Leveled is based on the LSM-tree similar to leveldb, except that:
 %% - Keys, Metadata and Values are not persisted together - the Keys and
 %% Metadata are kept in a tree-based ledger, whereas the values are stored
 %% only in a sequential Journal.
@@ -11,7 +11,7 @@
 %% and frequent use of iterators)
 %% - The Journal is an extended nursery log in leveldb terms. It is keyed
 %% on the sequence number of the write
-%% - The ledger is a merge tree, where the key is the actaul object key, and
+%% - The ledger is a merge tree, where the key is the actual object key, and
 %% the value is the metadata of the object including the sequence number
 %%
 %%
@@ -134,6 +134,7 @@
          {snapshot_bookie, undefined},
          {cache_size, ?CACHE_SIZE},
          {max_journalsize, 1000000000},
+         {max_journalobjectcount, 200000},
          {max_sstslots, 256},
          {sync_strategy, none},
          {head_only, false},
@@ -227,11 +228,15 @@
            % The size of the Bookie's memory, the cache of the recent
            % additions to the ledger. Defaults to ?CACHE_SIZE, plus some
            % randomised jitter (randomised jitter will still be added to
-           % configured values
+           % configured values)
            % The minimum value is 100 - any lower value will be ignored
        {max_journalsize, pos_integer()} |
-           % The maximum size of a journal file in bytes. The abolute
+           % The maximum size of a journal file in bytes. The absolute
            % maximum must be 4GB due to 4 byte file pointers being used
+       {max_journalobjectcount, pos_integer()} |
+           % The maximum size of a journal file, as measured by the count of
+           % objects in the file. The journal file must remain within the
+           % limits set by both this figure and max_journalsize
        {max_sstslots, pos_integer()} |
            % The maximum number of slots in a SST file. All testing is done
            % at a size of 256 (except for Quickcheck tests}, altering this
@@ -243,7 +248,7 @@
            % partially in hardware (e.g through use of FBWC).
            % riak_sync is used for backwards compatability with OTP16 - and
            % will manually call sync() after each write (rather than use the
-           % O_SYNC option on startup
+           % O_SYNC option on startup)
        {head_only, false|with_lookup|no_lookup} |
            % When set to true, there are three fundamental changes as to how
            % leveled will work:
@@ -450,7 +455,7 @@ book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL)
 %% - A Primary Key and a Value
 %% - IndexSpecs - a set of secondary key changes associated with the
 %% transaction
-%% - A tag indictaing the type of object. Behaviour for metadata extraction,
+%% - A tag indicating the type of object. Behaviour for metadata extraction,
 %% and ledger compaction will vary by type. There are three currently
 %% implemented types i (Index), o (Standard), o_rkv (Riak). Keys added with
 %% Index tags are not fetchable (as they will not be hashed), but are
@@ -461,7 +466,7 @@ book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL)
 %%
 %% The inker will pass the PK/Value/IndexSpecs to the current (append only)
 %% CDB journal file to persist the change. The call should return either 'ok'
-%% or 'roll'. -'roll' indicates that the CDB file has insufficient capacity for
+%% or 'roll'. 'roll' indicates that the CDB file has insufficient capacity for
 %% this write, and a new journal file should be created (with appropriate
 %% manifest changes to be made).
 %%
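Note: the 'ok' | 'roll' contract described above is the point at which the new object count limit takes effect, alongside the existing byte-size check. A minimal sketch of a caller honouring that contract, assuming the leveled_cdb:cdb_put/3 wrapper around the put_kv event; put_with_roll/4 and RollFun are illustrative names, not the inker's actual code:

    %% Illustrative only - not the inker implementation.
    put_with_roll(CDBPid, Key, Value, RollFun) ->
        case leveled_cdb:cdb_put(CDBPid, Key, Value) of
            ok ->
                {ok, CDBPid};
            roll ->
                %% Active file is full - by byte size, or now also by object
                %% count - so open a new journal file (with the associated
                %% manifest change) and retry the put against it.
                NewCDBPid = RollFun(),
                ok = leveled_cdb:cdb_put(NewCDBPid, Key, Value),
                {ok, NewCDBPid}
        end.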
@@ -1644,6 +1649,11 @@ set_options(Opts) ->
     MaxJournalSize =
         min(?ABSOLUTEMAX_JOURNALSIZE,
             MaxJournalSize0 - erlang:phash2(self()) rem JournalSizeJitter),
+    MaxJournalCount0 =
+        proplists:get_value(max_journalobjectcount, Opts),
+    JournalCountJitter = MaxJournalCount0 div (100 div ?JOURNAL_SIZE_JITTER),
+    MaxJournalCount =
+        MaxJournalCount0 - erlang:phash2(self()) rem JournalCountJitter,
 
     SyncStrat = proplists:get_value(sync_strategy, Opts),
     WRP = proplists:get_value(waste_retention_period, Opts),
@@ -1697,6 +1707,7 @@ set_options(Opts) ->
                     compress_on_receipt = CompressOnReceipt,
                     cdb_options =
                         #cdb_options{max_size=MaxJournalSize,
+                                        max_count=MaxJournalCount,
                                         binary_mode=true,
                                         sync_strategy=SyncStrat,
                                         log_options=leveled_log:get_opts()}},
@@ -1742,7 +1753,7 @@ return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) ->
 -spec snaptype_by_presence(boolean()) -> store|ledger.
 %% @doc
 %% Folds that traverse over object heads, may also either require to return
-%% the object,or at least confirm th eobject is present in the Ledger. This
+%% the object, or at least confirm the object is present in the Ledger. This
 %% is achieved by enabling presence - and this will change the type of
 %% snapshot to one that covers the whole store (i.e. both ledger and journal),
 %% rather than just the ledger.
@@ -1753,7 +1764,7 @@ snaptype_by_presence(false) ->
 
 -spec get_runner(book_state(), tuple()) -> {async, fun()}.
 %% @doc
-%% Getan {async, Runner} for a given fold type. Fold types have different
+%% Get an {async, Runner} for a given fold type. Fold types have different
 %% tuple inputs
 get_runner(State, {index_query, Constraint, FoldAccT, Range, TermHandling}) ->
     {IdxFld, StartT, EndT} = Range,
@@ -1922,7 +1933,7 @@ get_deprecatedrunner(State, {
                                         {tuple(), tuple(), tuple()|no_lookup}.
 %% @doc
 %% Convert a range of binary keys into a ledger key range, returning
-%% {StartLK, EndLK, Query} where Query is to indicate whether the query 
+%% {StartLK, EndLK, Query} where Query is to indicate whether the query
 %% range is worth using to minimise the cost of the snapshot
 return_ledger_keyrange(Tag, Bucket, KeyRange) ->
     {StartKey, EndKey, Snap} =
@@ -1966,7 +1977,7 @@ maybe_longrunning(SW, Aspect) ->
 -spec readycache_forsnapshot(ledger_cache(), tuple()|no_lookup|undefined)
                                                         -> ledger_cache().
 %% @doc
-%% Strip the ledger cach back to only the relevant informaiton needed in
+%% Strip the ledger cache back to only the relevant information needed in
 %% the query, and to make the cache a snapshot (and so not subject to changes
 %% such as additions to the ets table)
 readycache_forsnapshot(LedgerCache, {StartKey, EndKey}) ->
@@ -2179,7 +2190,7 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache) ->
                             loader) -> ledger_cache().
 %% @doc
-%% Add a set of changes associated witha single sequence number (journal
+%% Add a set of changes associated with a single sequence number (journal
 %% update) to the ledger cache. This is used explicitly when loading the
 %% ledger from the Journal (i.e. at startup) - and in this case the ETS insert
 %% can be bypassed, as all changes will be flushed to the Penciller before the
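Note: the set_options/1 change above applies the same per-process jitter to the object count as is already applied to the byte size, so that vnodes sharing a node do not roll their journals in unison. A minimal sketch of the arithmetic, with the jitter percentage passed as a parameter because the value of ?JOURNAL_SIZE_JITTER is not shown in this diff:

    %% Illustrative only - mirrors the calculation in set_options/1.
    effective_journal_count(MaxJournalCount0, JitterPercent) ->
        JournalCountJitter = MaxJournalCount0 div (100 div JitterPercent),
        MaxJournalCount0 - erlang:phash2(self()) rem JournalCountJitter.

For example, effective_journal_count(200000, 20) returns a value between 160001 and 200000, and the result differs between calling processes because it is derived from the process's own pid.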
diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl
index 5c8caba..a2ff728 100644
--- a/src/leveled_cdb.erl
+++ b/src/leveled_cdb.erl
@@ -138,10 +138,12 @@
 -record(state, {hashtree,
                 last_position :: integer() | undefined,
                 last_key = empty,
+                current_count = 0 :: non_neg_integer(),
                 hash_index = {} :: tuple(),
                 filename :: string() | undefined,
                 handle :: file:fd() | undefined,
-                max_size :: integer() | undefined,
+                max_size :: pos_integer() | undefined,
+                max_count :: pos_integer() | undefined,
                 binary_mode = false :: boolean(),
                 delete_point = 0 :: integer(),
                 inker :: pid() | undefined,
@@ -425,15 +427,24 @@ cdb_clerkcomplete(Pid) ->
 %%%============================================================================
 
 init([Opts]) ->
-    MaxSize = case Opts#cdb_options.max_size of
-                    undefined ->
-                        ?MAX_FILE_SIZE;
-                    M ->
-                        M
-                end,
+    MaxSize =
+        case Opts#cdb_options.max_size of
+            undefined ->
+                ?MAX_FILE_SIZE;
+            MS ->
+                MS
+        end,
+    MaxCount =
+        case Opts#cdb_options.max_count of
+            undefined ->
+                ?MAX_FILE_SIZE div 1000;
+            MC ->
+                MC
+        end,
     {ok, starting, #state{max_size=MaxSize,
+                            max_count=MaxCount,
                             binary_mode=Opts#cdb_options.binary_mode,
                             waste_path=Opts#cdb_options.waste_path,
                             sync_strategy=Opts#cdb_options.sync_strategy,
@@ -447,6 +458,7 @@ starting({open_writer, Filename}, _From, State) ->
     leveled_log:log("CDB13", [WriteOps]),
     {ok, Handle} = file:open(Filename, WriteOps),
     State0 = State#state{handle=Handle,
+                            current_count = size_hashtree(HashTree),
                             sync_strategy = UpdStrategy,
                             last_position=LastPosition,
                             last_key=LastKey,
@@ -490,47 +502,63 @@ writer({key_check, Key}, _From, State) ->
         writer,
         State};
 writer({put_kv, Key, Value}, _From, State) ->
-    Result = put(State#state.handle,
-                    Key,
-                    Value,
-                    {State#state.last_position, State#state.hashtree},
-                    State#state.binary_mode,
-                    State#state.max_size,
-                    State#state.last_key == empty),
-    case Result of
-        roll ->
-            %% Key and value could not be written
+    NewCount = State#state.current_count + 1,
+    case NewCount >= State#state.max_count of
+        true ->
             {reply, roll, writer, State};
-        {UpdHandle, NewPosition, HashTree} ->
-            ok =
-                case State#state.sync_strategy of
-                    riak_sync ->
-                        file:datasync(UpdHandle);
-                    _ ->
-                        ok
-                end,
-            {reply, ok, writer, State#state{handle=UpdHandle,
-                                                last_position=NewPosition,
-                                                last_key=Key,
-                                                hashtree=HashTree}}
+        false ->
+            Result = put(State#state.handle,
+                            Key,
+                            Value,
+                            {State#state.last_position, State#state.hashtree},
+                            State#state.binary_mode,
+                            State#state.max_size,
+                            State#state.last_key == empty),
+            case Result of
+                roll ->
+                    %% Key and value could not be written
+                    {reply, roll, writer, State};
+                {UpdHandle, NewPosition, HashTree} ->
+                    ok =
+                        case State#state.sync_strategy of
+                            riak_sync ->
+                                file:datasync(UpdHandle);
+                            _ ->
+                                ok
+                        end,
+                    {reply, ok, writer, State#state{handle=UpdHandle,
+                                                    current_count=NewCount,
+                                                    last_position=NewPosition,
+                                                    last_key=Key,
+                                                    hashtree=HashTree}}
+            end
     end;
 writer({mput_kv, []}, _From, State) ->
     {reply, ok, writer, State};
 writer({mput_kv, KVList}, _From, State) ->
-    Result = mput(State#state.handle,
-                    KVList,
-                    {State#state.last_position, State#state.hashtree},
-                    State#state.binary_mode,
-                    State#state.max_size),
-    case Result of
-        roll ->
-            %% Keys and values could not be written
+    NewCount = State#state.current_count + length(KVList),
+    TooMany = NewCount >= State#state.max_count,
+    NotEmpty = State#state.current_count > 0,
+    case (TooMany and NotEmpty) of
+        true ->
             {reply, roll, writer, State};
-        {UpdHandle, NewPosition, HashTree, LastKey} ->
-            {reply, ok, writer, State#state{handle=UpdHandle,
-                                                last_position=NewPosition,
-                                                last_key=LastKey,
-                                                hashtree=HashTree}}
+        false ->
+            Result = mput(State#state.handle,
+                            KVList,
+                            {State#state.last_position, State#state.hashtree},
+                            State#state.binary_mode,
+                            State#state.max_size),
+            case Result of
+                roll ->
+                    %% Keys and values could not be written
+                    {reply, roll, writer, State};
+                {UpdHandle, NewPosition, HashTree, LastKey} ->
+                    {reply, ok, writer, State#state{handle=UpdHandle,
+                                                    current_count=NewCount,
+                                                    last_position=NewPosition,
+                                                    last_key=LastKey,
+                                                    hashtree=HashTree}}
+            end
     end;
 writer(cdb_complete, _From, State) ->
     NewName = determine_new_filename(State#state.filename),
@@ -1775,6 +1803,9 @@ add_position_tohashtree(HashTree, Index, Hash, Position) ->
 new_hashtree() ->
     ets:new(hashtree, [ordered_set]).
 
+size_hashtree(HashTree) ->
+    ets:info(HashTree, size).
+
 to_list(HashTree, Index) ->
     to_list(HashTree, Index, {0, -1}, []).
 
diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index fd59366..1a78d36 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -305,7 +305,7 @@
     {"I0016",
         {info, "Writing new version of manifest for manifestSQN=~w"}},
     {"I0017",
-        {info, "At SQN=~w journal has filename ~s"}},
+        {debug, "At SQN=~w journal has filename ~s"}},
     {"I0018",
         {warn, "We're doomed - intention recorded to destroy all files"}},
     {"I0019",
diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl
index 4935721..f6621cd 100644
--- a/test/end_to_end/recovery_SUITE.erl
+++ b/test/end_to_end/recovery_SUITE.erl
@@ -603,11 +603,14 @@ allkeydelta_journal_multicompact(_Config) ->
     % Simply confirms that none of this causes a crash
     RootPath = testutil:reset_filestructure(),
     B = <<"test_bucket">>,
-    StartOpts1 = [{root_path, RootPath},
-                    {max_journalsize, 50000000},
-                    {max_run_length, 6},
-                    {sync_strategy, testutil:sync_strategy()}],
-    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+    StartOptsFun =
+        fun(JOC) ->
+            [{root_path, RootPath},
+                {max_journalobjectcount, JOC},
+                {max_run_length, 6},
+                {sync_strategy, testutil:sync_strategy()}]
+        end,
+    {ok, Bookie1} = leveled_bookie:book_start(StartOptsFun(16000)),
     {KSpcL1, _V1} = testutil:put_indexed_objects(Bookie1, B, 40000),
     {KSpcL2, V2} = testutil:put_altered_indexed_objects(Bookie1,
                                                         B,
@@ -633,26 +636,47 @@ allkeydelta_journal_multicompact(_Config) ->
     ok = leveled_bookie:book_close(Bookie1),
     leveled_penciller:clean_testdir(RootPath ++ "/ledger"),
-    {ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
+    io:format("Restart without ledger~n"),
+    {ok, Bookie2} = leveled_bookie:book_start(StartOptsFun(24000)),
 
     ok = testutil:check_indexed_objects(Bookie2,
                                         B,
                                         KSpcL1 ++ KSpcL2,
                                         V2),
 
-    {KSpcL3, V3} = testutil:put_altered_indexed_objects(Bookie2,
+    {KSpcL3, _V3} = testutil:put_altered_indexed_objects(Bookie2,
                                                         B,
                                                         KSpcL2,
                                                         false),
     compact_and_wait(Bookie2, 0),
-
-    ok = testutil:check_indexed_objects(Bookie2,
-                                        B,
-                                        KSpcL1 ++ KSpcL2 ++ KSpcL3,
-                                        V3),
-
+    {ok, FileList3} =
+        file:list_dir(
+            filename:join(RootPath, "journal/journal_files/post_compact")),
+    io:format("Number of files after compaction ~w~n", [length(FileList3)]),
     ok = leveled_bookie:book_close(Bookie2),
+
+    io:format("Restart with smaller journal object count~n"),
+    {ok, Bookie3} = leveled_bookie:book_start(StartOptsFun(8000)),
+
+    {KSpcL4, V4} = testutil:put_altered_indexed_objects(Bookie3,
+                                                        B,
+                                                        KSpcL3,
+                                                        false),
+
+    compact_and_wait(Bookie3, 0),
+
+    ok = testutil:check_indexed_objects(Bookie3,
+                                        B,
+                                        KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4,
+                                        V4),
+    {ok, FileList4} =
+        file:list_dir(
+            filename:join(RootPath, "journal/journal_files/post_compact")),
+    io:format("Number of files after compaction ~w~n", [length(FileList4)]),
+    true = length(FileList4) >= length(FileList3) + 4,
+
+    ok = leveled_bookie:book_close(Bookie3),
 
     testutil:reset_filestructure(10000).
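Note: the reworked test drives the same keyspace through three different object count limits (16000, then 24000, then 8000) and asserts that the smallest limit leaves at least four more files behind after compaction. A rough, illustrative way to reason about that expectation, ignoring jitter, the byte-size limit and the effect of compaction itself:

    %% Illustrative only - approximate journal files produced by a number of
    %% object writes under a given max_journalobjectcount.
    approx_journal_files(ObjectWrites, MaxObjectsPerFile) ->
        (ObjectWrites + MaxObjectsPerFile - 1) div MaxObjectsPerFile.

For an arbitrary 80000 writes, approx_journal_files(80000, 24000) is 4 while approx_journal_files(80000, 8000) is 10, which is the sort of gap the FileList4 versus FileList3 assertion is checking for.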