diff --git a/include/leveled.hrl b/include/leveled.hrl
index 2074e46..761b9a6 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -38,7 +38,8 @@
                     bloom :: binary() | none | undefined}).
 
 -record(cdb_options,
-            {max_size :: integer() | undefined,
+            {max_size :: pos_integer() | undefined,
+             max_count :: pos_integer() | undefined,
              file_path :: string() | undefined,
              waste_path :: string() | undefined,
              binary_mode = false :: boolean(),
diff --git a/priv/leveled.schema b/priv/leveled.schema
index 241ee33..3e565e6 100644
--- a/priv/leveled.schema
+++ b/priv/leveled.schema
@@ -63,12 +63,27 @@
 ]}.
 
 %% @doc The approximate size (in bytes) when a Journal file should be rolled.
-%% Normally keep this as around the size of o(100K) objects. Default is 500MB
+%% Normally keep this as around the size of o(100K) objects. Default is 1GB.
+%% Note that on startup an actual maximum size will be chosen which varies by
+%% a random factor from this point - to avoid coordination of roll events
+%% across vnodes.
 {mapping, "leveled.journal_size", "leveled.journal_size", [
   {default, 1000000000},
   {datatype, integer}
 ]}.
 
+%% @doc The approximate count of objects at which a Journal file should be
+%% rolled. A file will be rolled if either this object count or the journal
+%% size limit is reached. Default 200K.
+%% Note that on startup an actual maximum count will be chosen which varies
+%% by a random factor from this point - to avoid coordination of roll events
+%% across vnodes.
+{mapping, "leveled.journal_objectcount", "leveled.journal_objectcount", [
+  {default, 200000},
+  {datatype, integer}
+]}.
+
+
 %% @doc The number of journal compactions per vnode per day
 %% The higher the value, the more compaction runs, and the sooner space is
 %% recovered. But each run has a cost
diff --git a/priv/leveled_multi.schema b/priv/leveled_multi.schema
index b2b3afa..22857d7 100644
--- a/priv/leveled_multi.schema
+++ b/priv/leveled_multi.schema
@@ -57,11 +57,24 @@
 
 %% @doc The approximate size (in bytes) when a Journal file should be rolled.
-%% Normally keep this as around the size of o(100K) objects. Default is 500MB
+%% Normally keep this as around the size of o(100K) objects. Default is 1GB.
+%% Note that on startup an actual maximum size will be chosen which varies by
+%% a random factor from this point - to avoid coordination of roll events
+%% across vnodes.
 {mapping, "multi_backend.$name.leveled.journal_size", "riak_kv.multi_backend", [
-  {default, 500000000},
-  {datatype, integer},
-  hidden
+  {default, 1000000000},
+  {datatype, integer}
+]}.
+
+%% @doc The approximate count of objects at which a Journal file should be
+%% rolled. A file will be rolled if either this object count or the journal
+%% size limit is reached. Default 200K.
+%% Note that on startup an actual maximum count will be chosen which varies
+%% by a random factor from this point - to avoid coordination of roll events
+%% across vnodes.
+{mapping, "multi_backend.$name.leveled.journal_objectcount", "riak_kv.multi_backend", [
+  {default, 200000},
+  {datatype, integer}
 ]}.
 
 %% @doc The number of journal compactions per vnode per day
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index c16f3e0..109dfaf 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -134,6 +134,7 @@
           {snapshot_bookie, undefined},
           {cache_size, ?CACHE_SIZE},
           {max_journalsize, 1000000000},
+          {max_journalobjectcount, 200000},
           {max_sstslots, 256},
           {sync_strategy, none},
           {head_only, false},
@@ -230,8 +231,12 @@
             % configured values
             % The minimum value is 100 - any lower value will be ignored
         {max_journalsize, pos_integer()} |
-            % The maximum size of a journal file in bytes. The abolute
+            % The maximum size of a journal file in bytes. The absolute
             % maximum must be 4GB due to 4 byte file pointers being used
+        {max_journalobjectcount, pos_integer()} |
+            % The maximum size of a journal file, as a count of objects. A
+            % journal file must remain within the limits set by both this
+            % figure and max_journalsize
         {max_sstslots, pos_integer()} |
             % The maximum number of slots in a SST file. All testing is done
             % at a size of 256 (except for Quickcheck tests}, altering this
@@ -1644,6 +1649,11 @@ set_options(Opts) ->
     MaxJournalSize =
         min(?ABSOLUTEMAX_JOURNALSIZE,
             MaxJournalSize0 - erlang:phash2(self()) rem JournalSizeJitter),
+    MaxJournalCount0 =
+        proplists:get_value(max_journalobjectcount, Opts),
+    JournalCountJitter = MaxJournalCount0 div (100 div ?JOURNAL_SIZE_JITTER),
+    MaxJournalCount =
+        MaxJournalCount0 - erlang:phash2(self()) rem JournalCountJitter,
     SyncStrat = proplists:get_value(sync_strategy, Opts),
     WRP = proplists:get_value(waste_retention_period, Opts),
 
@@ -1697,6 +1707,7 @@ set_options(Opts) ->
             compress_on_receipt = CompressOnReceipt,
             cdb_options =
                 #cdb_options{max_size=MaxJournalSize,
+                                max_count=MaxJournalCount,
                                 binary_mode=true,
                                 sync_strategy=SyncStrat,
                                 log_options=leveled_log:get_opts()}},
diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl
index 5c8caba..a2ff728 100644
--- a/src/leveled_cdb.erl
+++ b/src/leveled_cdb.erl
@@ -138,10 +138,12 @@
 -record(state, {hashtree,
                 last_position :: integer() | undefined,
                 last_key = empty,
+                current_count = 0 :: non_neg_integer(),
                 hash_index = {} :: tuple(),
                 filename :: string() | undefined,
                 handle :: file:fd() | undefined,
-                max_size :: integer() | undefined,
+                max_size :: pos_integer() | undefined,
+                max_count :: pos_integer() | undefined,
                 binary_mode = false :: boolean(),
                 delete_point = 0 :: integer(),
                 inker :: pid() | undefined,
@@ -425,15 +427,24 @@ cdb_clerkcomplete(Pid) ->
 %%%============================================================================
 
 init([Opts]) ->
-    MaxSize = case Opts#cdb_options.max_size of
-                        undefined ->
-                            ?MAX_FILE_SIZE;
-                        M ->
-                            M
-                    end,
+    MaxSize =
+        case Opts#cdb_options.max_size of
+            undefined ->
+                ?MAX_FILE_SIZE;
+            MS ->
+                MS
+        end,
+    MaxCount =
+        case Opts#cdb_options.max_count of
+            undefined ->
+                ?MAX_FILE_SIZE div 1000;
+            MC ->
+                MC
+        end,
     {ok,
         starting,
         #state{max_size=MaxSize,
+                max_count=MaxCount,
                 binary_mode=Opts#cdb_options.binary_mode,
                 waste_path=Opts#cdb_options.waste_path,
                 sync_strategy=Opts#cdb_options.sync_strategy,
@@ -447,6 +458,7 @@ starting({open_writer, Filename}, _From, State) ->
     leveled_log:log("CDB13", [WriteOps]),
     {ok, Handle} = file:open(Filename, WriteOps),
     State0 = State#state{handle=Handle,
+                            current_count = size_hashtree(HashTree),
                             sync_strategy = UpdStrategy,
                             last_position=LastPosition,
                             last_key=LastKey,
@@ -490,47 +502,63 @@ writer({key_check, Key}, _From, State) ->
         writer,
         State};
 writer({put_kv, Key, Value}, _From, State) ->
-    Result = put(State#state.handle,
-                    Key,
-                    Value,
-                    {State#state.last_position, State#state.hashtree},
-                    State#state.binary_mode,
-                    State#state.max_size,
-                    State#state.last_key == empty),
-    case Result of
-        roll ->
-            %% Key and value could not be written
+    NewCount = State#state.current_count + 1,
+    case NewCount >= State#state.max_count of
+        true ->
             {reply, roll, writer, State};
-        {UpdHandle, NewPosition, HashTree} ->
-            ok =
-                case State#state.sync_strategy of
-                    riak_sync ->
-                        file:datasync(UpdHandle);
-                    _ ->
-                        ok
-                end,
-            {reply, ok, writer, State#state{handle=UpdHandle,
-                                            last_position=NewPosition,
-                                            last_key=Key,
-                                            hashtree=HashTree}}
+        false ->
+            Result = put(State#state.handle,
+                            Key,
+                            Value,
+                            {State#state.last_position, State#state.hashtree},
+                            State#state.binary_mode,
+                            State#state.max_size,
+                            State#state.last_key == empty),
+            case Result of
+                roll ->
+                    %% Key and value could not be written
+                    {reply, roll, writer, State};
+                {UpdHandle, NewPosition, HashTree} ->
+                    ok =
+                        case State#state.sync_strategy of
+                            riak_sync ->
+                                file:datasync(UpdHandle);
+                            _ ->
+                                ok
+                        end,
+                    {reply, ok, writer, State#state{handle=UpdHandle,
+                                                    current_count=NewCount,
+                                                    last_position=NewPosition,
+                                                    last_key=Key,
+                                                    hashtree=HashTree}}
+            end
     end;
 writer({mput_kv, []}, _From, State) ->
     {reply, ok, writer, State};
 writer({mput_kv, KVList}, _From, State) ->
-    Result = mput(State#state.handle,
-                    KVList,
-                    {State#state.last_position, State#state.hashtree},
-                    State#state.binary_mode,
-                    State#state.max_size),
-    case Result of
-        roll ->
-            %% Keys and values could not be written
+    NewCount = State#state.current_count + length(KVList),
+    TooMany = NewCount >= State#state.max_count,
+    NotEmpty = State#state.current_count > 0,
+    case (TooMany and NotEmpty) of
+        true ->
             {reply, roll, writer, State};
-        {UpdHandle, NewPosition, HashTree, LastKey} ->
-            {reply, ok, writer, State#state{handle=UpdHandle,
-                                            last_position=NewPosition,
-                                            last_key=LastKey,
-                                            hashtree=HashTree}}
+        false ->
+            Result = mput(State#state.handle,
+                            KVList,
+                            {State#state.last_position, State#state.hashtree},
+                            State#state.binary_mode,
+                            State#state.max_size),
+            case Result of
+                roll ->
+                    %% Keys and values could not be written
+                    {reply, roll, writer, State};
+                {UpdHandle, NewPosition, HashTree, LastKey} ->
+                    {reply, ok, writer, State#state{handle=UpdHandle,
+                                                    current_count=NewCount,
+                                                    last_position=NewPosition,
+                                                    last_key=LastKey,
+                                                    hashtree=HashTree}}
+            end
     end;
 writer(cdb_complete, _From, State) ->
     NewName = determine_new_filename(State#state.filename),
@@ -1775,6 +1803,9 @@ add_position_tohashtree(HashTree, Index, Hash, Position) ->
 new_hashtree() ->
     ets:new(hashtree, [ordered_set]).
 
+size_hashtree(HashTree) ->
+    ets:info(HashTree, size).
+
 to_list(HashTree, Index) ->
     to_list(HashTree, Index, {0, -1}, []).
 
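[Editor's note - not part of the submitted diff.] With the changes above, a journal file rolls on whichever limit is breached first: the writer state checks the object count before attempting the write, while the byte-size check remains inside put/7 as before. Both configured maximums are jittered per vnode in leveled_bookie:set_options/1. The sketch below illustrates that arithmetic and the combined roll decision; the module and function names are illustrative only, and the JitterPercent of 20 used in the example is an assumed value for ?JOURNAL_SIZE_JITTER.

    -module(journal_roll_sketch).
    -export([jittered_limit/2, should_roll/4]).

    %% Reduce a configured limit by up to JitterPercent (e.g. 20), keyed off
    %% the pid hash, so that vnodes do not roll their journal files in
    %% lockstep - mirrors the arithmetic in leveled_bookie:set_options/1.
    jittered_limit(ConfiguredMax, JitterPercent) ->
        Jitter = ConfiguredMax div (100 div JitterPercent),
        ConfiguredMax - erlang:phash2(self()) rem Jitter.

    %% A journal file should roll when either the object count or the byte
    %% size limit would be breached by the next write.
    should_roll(Count, MaxCount, Size, MaxSize) ->
        (Count + 1 >= MaxCount) orelse (Size >= MaxSize).

For example, with max_journalobjectcount left at its default of 200000, jittered_limit(200000, 20) yields a per-vnode count limit somewhere between 160001 and 200000.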
diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl
index 4935721..f6621cd 100644
--- a/test/end_to_end/recovery_SUITE.erl
+++ b/test/end_to_end/recovery_SUITE.erl
@@ -603,11 +603,14 @@ allkeydelta_journal_multicompact(_Config) ->
     % Simply confirms that none of this causes a crash
     RootPath = testutil:reset_filestructure(),
     B = <<"test_bucket">>,
-    StartOpts1 = [{root_path, RootPath},
-                    {max_journalsize, 50000000},
-                    {max_run_length, 6},
-                    {sync_strategy, testutil:sync_strategy()}],
-    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+    StartOptsFun =
+        fun(JOC) ->
+            [{root_path, RootPath},
+                {max_journalobjectcount, JOC},
+                {max_run_length, 6},
+                {sync_strategy, testutil:sync_strategy()}]
+        end,
+    {ok, Bookie1} = leveled_bookie:book_start(StartOptsFun(16000)),
     {KSpcL1, _V1} = testutil:put_indexed_objects(Bookie1, B, 40000),
     {KSpcL2, V2} = testutil:put_altered_indexed_objects(Bookie1,
                                                         B,
@@ -633,26 +636,47 @@ allkeydelta_journal_multicompact(_Config) ->
     ok = leveled_bookie:book_close(Bookie1),
     leveled_penciller:clean_testdir(RootPath ++ "/ledger"),
-    {ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
+    io:format("Restart without ledger~n"),
+    {ok, Bookie2} = leveled_bookie:book_start(StartOptsFun(24000)),
 
     ok = testutil:check_indexed_objects(Bookie2,
                                         B,
                                         KSpcL1 ++ KSpcL2,
                                         V2),
 
-    {KSpcL3, V3} = testutil:put_altered_indexed_objects(Bookie2,
+    {KSpcL3, _V3} = testutil:put_altered_indexed_objects(Bookie2,
                                                         B,
                                                         KSpcL2,
                                                         false),
     compact_and_wait(Bookie2, 0),
-
-    ok = testutil:check_indexed_objects(Bookie2,
-                                        B,
-                                        KSpcL1 ++ KSpcL2 ++ KSpcL3,
-                                        V3),
-
+    {ok, FileList3} =
+        file:list_dir(
+            filename:join(RootPath, "journal/journal_files/post_compact")),
+    io:format("Number of files after compaction ~w~n", [length(FileList3)]),
     ok = leveled_bookie:book_close(Bookie2),
+
+    io:format("Restart with smaller journal object count~n"),
+    {ok, Bookie3} = leveled_bookie:book_start(StartOptsFun(8000)),
+
+    {KSpcL4, V4} = testutil:put_altered_indexed_objects(Bookie3,
+                                                        B,
+                                                        KSpcL3,
+                                                        false),
+
+    compact_and_wait(Bookie3, 0),
+
+    ok = testutil:check_indexed_objects(Bookie3,
+                                        B,
+                                        KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4,
+                                        V4),
+    {ok, FileList4} =
+        file:list_dir(
+            filename:join(RootPath, "journal/journal_files/post_compact")),
+    io:format("Number of files after compaction ~w~n", [length(FileList4)]),
+    true = length(FileList4) >= length(FileList3) + 4,
+
+    ok = leveled_bookie:book_close(Bookie3),
     testutil:reset_filestructure(10000).
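[Editor's note - not part of the submitted diff.] With the new option in place, a store can be told to roll journal files on an object count as well as on a byte size, exactly as the test above does via StartOptsFun. A minimal usage sketch follows; the root path and the figures chosen are illustrative only, and files will roll a little earlier than the configured values because of the per-vnode jitter described above.

    %% In an Erlang shell, with leveled on the code path:
    {ok, Bookie} =
        leveled_bookie:book_start([{root_path, "/tmp/leveled_sketch"},
                                    {max_journalsize, 500000000},
                                    {max_journalobjectcount, 100000},
                                    {sync_strategy, none}]).
    ok = leveled_bookie:book_put(Bookie, <<"Bucket">>, <<"Key">>, <<"Value">>, []).
    ok = leveled_bookie:book_close(Bookie).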