Merge pull request #291 from martinsumner/mas-i289-config
Mas i289 config
commit 02c1c57edf
7 changed files with 169 additions and 74 deletions
@@ -38,7 +38,8 @@
bloom :: binary() | none | undefined}).

-record(cdb_options,
{max_size :: integer() | undefined,
{max_size :: pos_integer() | undefined,
max_count :: pos_integer() | undefined,
file_path :: string() | undefined,
waste_path :: string() | undefined,
binary_mode = false :: boolean(),
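A minimal sketch of how the new max_count limit might be supplied alongside max_size when a journal file is opened directly; the filename and values are illustrative, and this assumes the existing leveled_cdb:cdb_open_writer/2 entry point rather than anything added by this PR:

    %% Illustrative only: roll at ~1GB or at 200K objects, whichever is hit first
    CDBopts = #cdb_options{max_size = 1000000000,
                           max_count = 200000,
                           binary_mode = true},
    {ok, CDB} = leveled_cdb:cdb_open_writer("example_journal.pnd", CDBopts).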
@@ -63,12 +63,27 @@
]}.

%% @doc The approximate size (in bytes) when a Journal file should be rolled.
%% Normally keep this as around the size of o(100K) objects. Default is 500MB
%% Normally keep this as around the size of o(100K) objects. Default is 1GB.
%% Note that on startup an actual maximum size will be chosen which varies by
%% a random factor from this point - to avoid coordination of roll events
%% across vnodes.
{mapping, "leveled.journal_size", "leveled.journal_size", [
{default, 1000000000},
{datatype, integer}
]}.

%% @doc The approximate size (in bytes) when a Journal file should be rolled.
%% This time measured in object count, a file will be rolled if either the
%% object count or the journal size limit is reached. Default 200K.
%% Note that on startup an actual maximum size will be chosen which varies by
%% a random factor from this point - to avoid coordination of roll events
%% across vnodes.
{mapping, "leveled.journal_objectcount", "leveled.journal_objectcount", [
{default, 200000},
{datatype, integer}
]}.

%% @doc The number of journal compactions per vnode per day
%% The higher the value, the more compaction runs, and the sooner space is
%% recovered. But each run has a cost
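With these mappings in place, both limits can be set from riak.conf; for example, restating the defaults defined above:

    leveled.journal_size = 1000000000
    leveled.journal_objectcount = 200000

As the doc comments note, a journal file is rolled when either limit is reached, whichever comes first.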
@@ -57,11 +57,24 @@

%% @doc The approximate size (in bytes) when a Journal file should be rolled.
%% Normally keep this as around the size of o(100K) objects. Default is 500MB
%% Normally keep this as around the size of o(100K) objects. Default is 1GB.
%% Note that on startup an actual maximum size will be chosen which varies by
%% a random factor from this point - to avoid coordination of roll events
%% across vnodes.
{mapping, "multi_backend.$name.leveled.journal_size", "riak_kv.multi_backend", [
{default, 500000000},
{datatype, integer},
hidden
{default, 1000000000},
{datatype, integer}
]}.

%% @doc The approximate size (in bytes) when a Journal file should be rolled.
%% This time measured in object count, a file will be rolled if either the
%% object count or the journal size limit is reached. Default 200K.
%% Note that on startup an actual maximum size will be chosen which varies by
%% a random factor from this point - to avoid coordination of roll events
%% across vnodes.
{mapping, "multi_backend.$name.leveled.journal_objectcount", "riak_kv.multi_backend", [
{default, 200000},
{datatype, integer}
]}.

%% @doc The number of journal compactions per vnode per day
@@ -1,6 +1,6 @@
%% -------- Overview ---------
%%
%% The eleveleddb is based on the LSM-tree similar to leveldb, except that:
%% Leveled is based on the LSM-tree similar to leveldb, except that:
%% - Keys, Metadata and Values are not persisted together - the Keys and
%% Metadata are kept in a tree-based ledger, whereas the values are stored
%% only in a sequential Journal.
@@ -11,7 +11,7 @@
%% and frequent use of iterators)
%% - The Journal is an extended nursery log in leveldb terms. It is keyed
%% on the sequence number of the write
%% - The ledger is a merge tree, where the key is the actaul object key, and
%% - The ledger is a merge tree, where the key is the actual object key, and
%% the value is the metadata of the object including the sequence number
%%
%%
@@ -134,6 +134,7 @@
{snapshot_bookie, undefined},
{cache_size, ?CACHE_SIZE},
{max_journalsize, 1000000000},
{max_journalobjectcount, 200000},
{max_sstslots, 256},
{sync_strategy, none},
{head_only, false},
@@ -227,11 +228,15 @@
% The size of the Bookie's memory, the cache of the recent
% additions to the ledger. Defaults to ?CACHE_SIZE, plus some
% randomised jitter (randomised jitter will still be added to
% configured values
% configured values)
% The minimum value is 100 - any lower value will be ignored
{max_journalsize, pos_integer()} |
% The maximum size of a journal file in bytes. The abolute
% The maximum size of a journal file in bytes. The absolute
% maximum must be 4GB due to 4 byte file pointers being used
{max_journalobjectcount, pos_integer()} |
% The maximum size of the journal by count of the objects. The
% journal must remain within the limit set by both this figures and
% the max_journalsize
{max_sstslots, pos_integer()} |
% The maximum number of slots in a SST file. All testing is done
% at a size of 256 (except for Quickcheck tests}, altering this
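As a rough illustration of how the new option sits alongside the existing ones when a store is started directly, a hedged sketch (the path and values are illustrative; the test changes further down show the forms actually exercised):

    StartOpts = [{root_path, "/tmp/leveled_example"},
                 {max_journalsize, 1000000000},
                 {max_journalobjectcount, 200000},
                 {sync_strategy, none}],
    {ok, Bookie} = leveled_bookie:book_start(StartOpts).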
@@ -243,7 +248,7 @@
% partially in hardware (e.g through use of FBWC).
% riak_sync is used for backwards compatability with OTP16 - and
% will manually call sync() after each write (rather than use the
% O_SYNC option on startup
% O_SYNC option on startup)
{head_only, false|with_lookup|no_lookup} |
% When set to true, there are three fundamental changes as to how
% leveled will work:
@@ -450,7 +455,7 @@ book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL)
%% - A Primary Key and a Value
%% - IndexSpecs - a set of secondary key changes associated with the
%% transaction
%% - A tag indictaing the type of object. Behaviour for metadata extraction,
%% - A tag indicating the type of object. Behaviour for metadata extraction,
%% and ledger compaction will vary by type. There are three currently
%% implemented types i (Index), o (Standard), o_rkv (Riak). Keys added with
%% Index tags are not fetchable (as they will not be hashed), but are
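As a rough sketch of the API documented here, a put using the Standard (o) tag; the IndexSpecs form ({add, Field, Term}) and all literals are illustrative rather than taken from this diff:

    ok = leveled_bookie:book_put(Bookie,
                                 <<"Bucket">>, <<"Key">>, <<"Value">>,
                                 [{add, <<"idx1_bin">>, <<"someterm">>}],
                                 o).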
@@ -461,7 +466,7 @@ book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL)
%%
%% The inker will pass the PK/Value/IndexSpecs to the current (append only)
%% CDB journal file to persist the change. The call should return either 'ok'
%% or 'roll'. -'roll' indicates that the CDB file has insufficient capacity for
%% or 'roll'. 'roll' indicates that the CDB file has insufficient capacity for
%% this write, and a new journal file should be created (with appropriate
%% manifest changes to be made).
%%
@@ -1644,6 +1649,11 @@ set_options(Opts) ->
MaxJournalSize =
min(?ABSOLUTEMAX_JOURNALSIZE,
MaxJournalSize0 - erlang:phash2(self()) rem JournalSizeJitter),
MaxJournalCount0 =
proplists:get_value(max_journalobjectcount, Opts),
JournalCountJitter = MaxJournalCount0 div (100 div ?JOURNAL_SIZE_JITTER),
MaxJournalCount =
MaxJournalCount0 - erlang:phash2(self()) rem JournalCountJitter,

SyncStrat = proplists:get_value(sync_strategy, Opts),
WRP = proplists:get_value(waste_retention_period, Opts),
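To make the jitter arithmetic concrete, a hypothetical stand-alone version of the count calculation above; example_count_limit/0 is not part of the module, and the divisor of 20 assumed here for ?JOURNAL_SIZE_JITTER should be checked against the actual define:

    example_count_limit() ->
        MaxJournalCount0 = 200000,
        JournalCountJitter = MaxJournalCount0 div (100 div 20),  % = 40000
        %% erlang:phash2(self()) rem 40000 falls in [0, 39999], so each vnode
        %% settles on a limit somewhere in (160000, 200000]
        MaxJournalCount0 - erlang:phash2(self()) rem JournalCountJitter.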
@@ -1697,6 +1707,7 @@ set_options(Opts) ->
compress_on_receipt = CompressOnReceipt,
cdb_options =
#cdb_options{max_size=MaxJournalSize,
max_count=MaxJournalCount,
binary_mode=true,
sync_strategy=SyncStrat,
log_options=leveled_log:get_opts()}},
@@ -1742,7 +1753,7 @@ return_snapfun(State, SnapType, Query, LongRunning, SnapPreFold) ->
-spec snaptype_by_presence(boolean()) -> store|ledger.
%% @doc
%% Folds that traverse over object heads, may also either require to return
%% the object,or at least confirm th eobject is present in the Ledger. This
%% the object, or at least confirm the object is present in the Ledger. This
%% is achieved by enabling presence - and this will change the type of
%% snapshot to one that covers the whole store (i.e. both ledger and journal),
%% rather than just the ledger.
@@ -1753,7 +1764,7 @@ snaptype_by_presence(false) ->

-spec get_runner(book_state(), tuple()) -> {async, fun()}.
%% @doc
%% Getan {async, Runner} for a given fold type. Fold types have different
%% Get an {async, Runner} for a given fold type. Fold types have different
%% tuple inputs
get_runner(State, {index_query, Constraint, FoldAccT, Range, TermHandling}) ->
{IdxFld, StartT, EndT} = Range,
@@ -1922,7 +1933,7 @@ get_deprecatedrunner(State,
{tuple(), tuple(), tuple()|no_lookup}.
%% @doc
%% Convert a range of binary keys into a ledger key range, returning
%% {StartLK, EndLK, Query} where Query is to indicate whether the query
%% {StartLK, EndLK, Query} where Query is to indicate whether the query
%% range is worth using to minimise the cost of the snapshot
return_ledger_keyrange(Tag, Bucket, KeyRange) ->
{StartKey, EndKey, Snap} =
@@ -1966,7 +1977,7 @@ maybe_longrunning(SW, Aspect) ->
-spec readycache_forsnapshot(ledger_cache(), tuple()|no_lookup|undefined)
-> ledger_cache().
%% @doc
%% Strip the ledger cach back to only the relevant informaiton needed in
%% Strip the ledger cach back to only the relevant information needed in
%% the query, and to make the cache a snapshot (and so not subject to changes
%% such as additions to the ets table)
readycache_forsnapshot(LedgerCache, {StartKey, EndKey}) ->
@@ -2179,7 +2190,7 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache) ->
loader)
-> ledger_cache().
%% @doc
%% Add a set of changes associated witha single sequence number (journal
%% Add a set of changes associated with a single sequence number (journal
%% update) to the ledger cache. This is used explicitly when loading the
%% ledger from the Journal (i.e. at startup) - and in this case the ETS insert
%% can be bypassed, as all changes will be flushed to the Penciller before the
@@ -138,10 +138,12 @@
-record(state, {hashtree,
last_position :: integer() | undefined,
last_key = empty,
current_count = 0 :: non_neg_integer(),
hash_index = {} :: tuple(),
filename :: string() | undefined,
handle :: file:fd() | undefined,
max_size :: integer() | undefined,
max_size :: pos_integer() | undefined,
max_count :: pos_integer() | undefined,
binary_mode = false :: boolean(),
delete_point = 0 :: integer(),
inker :: pid() | undefined,
@@ -425,15 +427,24 @@ cdb_clerkcomplete(Pid) ->
%%%============================================================================

init([Opts]) ->
MaxSize = case Opts#cdb_options.max_size of
undefined ->
?MAX_FILE_SIZE;
M ->
M
end,
MaxSize =
case Opts#cdb_options.max_size of
undefined ->
?MAX_FILE_SIZE;
MS ->
MS
end,
MaxCount =
case Opts#cdb_options.max_count of
undefined ->
?MAX_FILE_SIZE div 1000;
MC ->
MC
end,
{ok,
starting,
#state{max_size=MaxSize,
max_count=MaxCount,
binary_mode=Opts#cdb_options.binary_mode,
waste_path=Opts#cdb_options.waste_path,
sync_strategy=Opts#cdb_options.sync_strategy,
@@ -447,6 +458,7 @@ starting({open_writer, Filename}, _From, State) ->
leveled_log:log("CDB13", [WriteOps]),
{ok, Handle} = file:open(Filename, WriteOps),
State0 = State#state{handle=Handle,
current_count = size_hashtree(HashTree),
sync_strategy = UpdStrategy,
last_position=LastPosition,
last_key=LastKey,
@@ -490,47 +502,63 @@ writer({key_check, Key}, _From, State) ->
writer,
State};
writer({put_kv, Key, Value}, _From, State) ->
Result = put(State#state.handle,
Key,
Value,
{State#state.last_position, State#state.hashtree},
State#state.binary_mode,
State#state.max_size,
State#state.last_key == empty),
case Result of
roll ->
%% Key and value could not be written
NewCount = State#state.current_count + 1,
case NewCount >= State#state.max_count of
true ->
{reply, roll, writer, State};
{UpdHandle, NewPosition, HashTree} ->
ok =
case State#state.sync_strategy of
riak_sync ->
file:datasync(UpdHandle);
_ ->
ok
end,
{reply, ok, writer, State#state{handle=UpdHandle,
last_position=NewPosition,
last_key=Key,
hashtree=HashTree}}
false ->
Result = put(State#state.handle,
Key,
Value,
{State#state.last_position, State#state.hashtree},
State#state.binary_mode,
State#state.max_size,
State#state.last_key == empty),
case Result of
roll ->
%% Key and value could not be written
{reply, roll, writer, State};
{UpdHandle, NewPosition, HashTree} ->
ok =
case State#state.sync_strategy of
riak_sync ->
file:datasync(UpdHandle);
_ ->
ok
end,
{reply, ok, writer, State#state{handle=UpdHandle,
current_count=NewCount,
last_position=NewPosition,
last_key=Key,
hashtree=HashTree}}
end
end;
writer({mput_kv, []}, _From, State) ->
{reply, ok, writer, State};
writer({mput_kv, KVList}, _From, State) ->
Result = mput(State#state.handle,
KVList,
{State#state.last_position, State#state.hashtree},
State#state.binary_mode,
State#state.max_size),
case Result of
roll ->
%% Keys and values could not be written
NewCount = State#state.current_count + length(KVList),
TooMany = NewCount >= State#state.max_count,
NotEmpty = State#state.current_count > 0,
case (TooMany and NotEmpty) of
true ->
{reply, roll, writer, State};
{UpdHandle, NewPosition, HashTree, LastKey} ->
{reply, ok, writer, State#state{handle=UpdHandle,
last_position=NewPosition,
last_key=LastKey,
hashtree=HashTree}}
false ->
Result = mput(State#state.handle,
KVList,
{State#state.last_position, State#state.hashtree},
State#state.binary_mode,
State#state.max_size),
case Result of
roll ->
%% Keys and values could not be written
{reply, roll, writer, State};
{UpdHandle, NewPosition, HashTree, LastKey} ->
{reply, ok, writer, State#state{handle=UpdHandle,
current_count=NewCount,
last_position=NewPosition,
last_key=LastKey,
hashtree=HashTree}}
end
end;
writer(cdb_complete, _From, State) ->
NewName = determine_new_filename(State#state.filename),
@@ -1775,6 +1803,9 @@ add_position_tohashtree(HashTree, Index, Hash, Position) ->
new_hashtree() ->
ets:new(hashtree, [ordered_set]).

size_hashtree(HashTree) ->
ets:info(HashTree, size).

to_list(HashTree, Index) ->
to_list(HashTree, Index, {0, -1}, []).
@@ -305,7 +305,7 @@
{"I0016",
{info, "Writing new version of manifest for manifestSQN=~w"}},
{"I0017",
{info, "At SQN=~w journal has filename ~s"}},
{debug, "At SQN=~w journal has filename ~s"}},
{"I0018",
{warn, "We're doomed - intention recorded to destroy all files"}},
{"I0019",
@@ -603,11 +603,14 @@ allkeydelta_journal_multicompact(_Config) ->
% Simply confirms that none of this causes a crash
RootPath = testutil:reset_filestructure(),
B = <<"test_bucket">>,
StartOpts1 = [{root_path, RootPath},
{max_journalsize, 50000000},
{max_run_length, 6},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
StartOptsFun =
fun(JOC) ->
[{root_path, RootPath},
{max_journalobjectcount, JOC},
{max_run_length, 6},
{sync_strategy, testutil:sync_strategy()}]
end,
{ok, Bookie1} = leveled_bookie:book_start(StartOptsFun(16000)),
{KSpcL1, _V1} = testutil:put_indexed_objects(Bookie1, B, 40000),
{KSpcL2, V2} = testutil:put_altered_indexed_objects(Bookie1,
B,
@@ -633,26 +636,47 @@ allkeydelta_journal_multicompact(_Config) ->

ok = leveled_bookie:book_close(Bookie1),
leveled_penciller:clean_testdir(RootPath ++ "/ledger"),
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
io:format("Restart without ledger~n"),
{ok, Bookie2} = leveled_bookie:book_start(StartOptsFun(24000)),

ok = testutil:check_indexed_objects(Bookie2,
B,
KSpcL1 ++ KSpcL2,
V2),

{KSpcL3, V3} = testutil:put_altered_indexed_objects(Bookie2,
{KSpcL3, _V3} = testutil:put_altered_indexed_objects(Bookie2,
B,
KSpcL2,
false),
compact_and_wait(Bookie2, 0),

ok = testutil:check_indexed_objects(Bookie2,
B,
KSpcL1 ++ KSpcL2 ++ KSpcL3,
V3),

{ok, FileList3} =
file:list_dir(
filename:join(RootPath, "journal/journal_files/post_compact")),
io:format("Number of files after compaction ~w~n", [length(FileList3)]),

ok = leveled_bookie:book_close(Bookie2),

io:format("Restart with smaller journal object count~n"),
{ok, Bookie3} = leveled_bookie:book_start(StartOptsFun(8000)),

{KSpcL4, V4} = testutil:put_altered_indexed_objects(Bookie3,
B,
KSpcL3,
false),

compact_and_wait(Bookie3, 0),

ok = testutil:check_indexed_objects(Bookie3,
B,
KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4,
V4),
{ok, FileList4} =
file:list_dir(
filename:join(RootPath, "journal/journal_files/post_compact")),
io:format("Number of files after compaction ~w~n", [length(FileList4)]),
true = length(FileList4) >= length(FileList3) + 4,

ok = leveled_bookie:book_close(Bookie3),
testutil:reset_filestructure(10000).