Merge pull request #147 from martinsumner/mas-i146-startupoptions

Mas i146 startupoptions
This commit is contained in:
Martin Sumner 2018-06-21 10:45:17 +01:00 committed by GitHub
commit 65de71a2b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 266 additions and 105 deletions

View file

@ -2,8 +2,98 @@
%%%% leveled
%% @doc A path under which bitcask data files will be stored.
%% @doc A path under which leveled data files will be stored.
{mapping, "leveled.data_root", "leveled.data_root", [
{default, "$(platform_data_dir)/leveled"},
{datatype, directory}
]}.
%% @doc Strategy for flushing data to disk
%% Can be set to riak_sync, sync (if OTP > 16) or none. Use none, and the OS
%% will flush when most efficient. Use riak_sync or sync to flush after every
%% PUT (not recommended without some hardware support e.g. flash drives and/or
%% Flash-backed Write Caches)
{mapping, "leveled.sync_strategy", "leveled.sync_strategy", [
{default, none},
{datatype, atom}
]}.
%% @doc The size (in keys) of the Bookie's in-memory cache
{mapping, "leveled.cache_size", "leveled.cache_size", [
{default, 4000},
{datatype, integer},
hidden
]}.
%% @doc The size (in keys) of the Penciller's in-memory cache
{mapping, "leveled.penciller_cache_size", "leveled.penciller_cache_size", [
{default, 28000},
{datatype, integer},
hidden
]}.
%% @doc Compression method
%% Can be lz4 or native (which will use the Erlang native zlib compression)
%% within term_to_binary
{mapping, "leveled.compression_method", "leveled.compression_method", [
{default, lz4},
{datatype, atom}
]}.
%% @doc Compression point
%% The point at which compression is applied to the Journal (the Ledger is
%% always compressed). Use on_receipt or on_compact. on_compact is suitable
%% when values are unlikely to yield much benefit from compression
%% (compression is only attempted when compacting)
{mapping, "leveled.compression_point", "leveled.compression_point", [
{default, on_receipt},
{datatype, atom}
]}.
%% @doc The approximate size (in bytes) when a Journal file should be rolled.
%% Normally keep this at around the size of o(100K) objects. Default is 500MB
{mapping, "leveled.journal_size", "leveled.journal_size", [
{default, 500000000},
{datatype, integer}
]}.
%% @doc The number of journal compactions per vnode per day
%% The higher the value, the more compaction runs, and the sooner space is
%% recovered. But each run has a cost
{mapping, "leveled.compaction_runs_perday", "leveled.compaction_runs_perday", [
{default, 16},
{datatype, integer}
]}.
%% @doc Compaction Low Hour
%% The hour of the day in which journal compaction can start. Use Low hour
%% of 0 and High hour of 23 to have no compaction window (i.e. always compact
%% regardless of time of day)
{mapping, "leveled.compaction_low_hour", "leveled.compaction_low_hour", [
{default, 0},
{datatype, integer}
]}.
%% @doc Compaction Top Hour
%% The hour of the day, after which journal compaction should stop.
%% If low hour > top hour, then compaction will work overnight between low
%% hour and top hour (inclusive). Timings rely on server's view of local time
{mapping, "leveled.compaction_top_hour", "leveled.compaction_top_hour", [
{default, 23},
{datatype, integer}
]}.
%% @doc Max Journal Files Per Compaction Run
%% The maximum number of consecutive files which may be compacted in a single
%% compaction run.
{mapping, "leveled.max_run_length", "leveled.max_run_length", [
{default, 8},
{datatype, integer},
hidden
]}.
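%% Illustration only (not part of this change): under the mappings above, a
%% riak.conf-style file could carry settings such as the following. The values
%% here are assumptions chosen for the example; with low hour (22) greater than
%% top hour (6) the compaction window runs overnight, 22:00 to 06:00 local time.
%%   leveled.sync_strategy = none
%%   leveled.journal_size = 500000000
%%   leveled.compaction_runs_perday = 16
%%   leveled.compaction_low_hour = 22
%%   leveled.compaction_top_hour = 6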

View file

@ -4,6 +4,8 @@
{platform_define, "^R", old_rand},
{platform_define, "^R", no_sync}]}.
{xref_checks, [undefined_function_calls,undefined_functions]}.
{profiles,
[{eqc, [{deps, [meck, fqc]},
{erl_opts, [debug_info, {parse_transform, lager_transform}, {parse_transform, eqc_cover}]},

View file

@ -67,9 +67,7 @@
book_destroy/1,
book_isempty/2]).
-export([get_opt/2,
get_opt/3,
empty_ledgercache/0,
-export([empty_ledgercache/0,
loadqueue_ledgercache/1,
push_ledgercache/2,
snapshot_store/6,
@ -91,6 +89,20 @@
-define(TIMING_SAMPLESIZE, 100).
-define(TIMING_SAMPLECOUNTDOWN, 10000).
-define(DUMMY, dummy). % Dummy key used for mput operations
-define(OPTION_DEFAULTS,
[{root_path, undefined},
{snapshot_bookie, undefined},
{cache_size, ?CACHE_SIZE},
{max_journalsize, 1000000000},
{sync_strategy, none},
{recent_aae, ?RECENT_AAE},
{head_only, false},
{waste_retention_period, undefined},
{max_run_length, undefined},
{reload_strategy, []},
{max_pencillercachesize, undefined},
{compression_method, ?COMPRESSION_METHOD},
{compression_point, ?COMPRESSION_POINT}]).
-record(ledger_cache, {mem :: ets:tab(),
loader = leveled_tree:empty(?CACHE_TYPE)
@ -148,6 +160,113 @@
-type head_timings() :: no_timing|#head_timings{}.
-type timing_types() :: head|get|put|fold.
-type recent_aae() :: false|#recent_aae{}|undefined.
-type open_options() ::
%% For full description of options see ../docs/STARTUP_OPTIONS.md
[{root_path, string()|undefined} |
% Folder to be used as the root path for storing all the database
% information. May be undefined if snapshot_bookie is a pid()
% TODO: Some sort of split root path to allow for mixed classes of
% storage (e.g. like eleveldb tiered storage - only with
% separation between ledger and non-current journal)
{snapshot_bookie, undefined|pid()} |
% Is the bookie being started required to be a snapshot of an
% existing bookie, rather than a new bookie. The bookie to be
% snapped should have its pid passed as the startup option in this
% case
{cache_size, pos_integer()} |
% The size of the Bookie's memory, the cache of the recent
% additions to the ledger. Defaults to ?CACHE_SIZE, plus some
% randomised jitter (randomised jitter will still be added to
% configured values)
{max_journalsize, pos_integer()} |
% The maximum size of a journal file in bytes. The absolute
% maximum must be 4GB due to 4 byte file pointers being used
{sync_strategy, sync_mode()} |
% Should be sync if it is necessary to flush to disk after every
% write, or none if not (allow the OS to schedule). This has a
% significant impact on performance, which can be mitigated
% partially in hardware (e.g. through use of FBWC).
% riak_sync is used for backwards compatibility with OTP16 - and
% will manually call sync() after each write (rather than use the
% O_SYNC option on startup)
{recent_aae, false|{atom(), list(), integer(), integer()}} |
% DEPRECATED
% Before working on kv_index_tictactree, the possibility of
% maintaining AAE just for recent changes was explored. Given the
% efficiency of the kv_index_tictactree approach this is unnecessary.
% Should be set to false
{head_only, false|with_lookup|no_lookup} |
% When enabled, there are three fundamental changes as to how
% leveled will work:
% - Compaction of the journal will be managed by simply removing any
% journal file whose highest sequence number has been persisted to the
% ledger;
% - GETs are not supported, only head requests;
% - PUTs should arrive batched object specs using the book_mput/2
% function.
% head_only mode is disabled with false (default). There are two
% different modes in which head_only can run - with_lookup or
% no_lookup - and head_only mode is enabled by passing one of these
% atoms:
% - with_lookup assumes that individual objects may need to be
% fetched;
% - no_lookup prevents individual objects from being fetched, so
% that the store can only be used for folds (without segment list
% acceleration)
{waste_retention_period, undefined|pos_integer()} |
% If a value is not required in the journal (i.e. it has been
% replaced and is now to be removed by compaction), for how long
% should it be retained? For example, should it be kept for a
% period until the operator can be sure a backup has been
% completed?
% If undefined, waste will not be retained; otherwise the period is the
% number of seconds to wait
{max_run_length, undefined|pos_integer()} |
% The maximum number of consecutive files that can be compacted in
% one compaction operation.
% Defaults to leveled_iclerk:?MAX_COMPACTION_RUN (if undefined)
{reload_strategy, list()} |
% The reload_strategy is exposed as an option as currently no firm
% decision has been made about how recovery from failure should
% work. For instance if we were to trust everything as permanent
% in the Ledger once it is persisted, then there would be no need
% to retain a skinny history of key changes in the Journal after
% compaction. If, as an alternative, we assume the Ledger is never
% permanent, and retain the skinny history - then backups need only
% be made against the Journal. The skinny history of key changes
% is primarily related to the issue of supporting secondary indexes
% in Riak.
%
% These two strategies are referred to as recovr (assume we can
% recover any deltas from a lost ledger and a lost history through
% resilience outside of the store), or retain (retain a history of
% key changes, even when the object value has been compacted).
%
% There is a third, theoretical and untested strategy, which is
% recalc - which would require, when reloading the Ledger from the
% Journal, recalculation of the index changes based on the current
% state of the Ledger and the object metadata.
%
% reload_strategy options are a list - to map from a tag to the
% strategy (recovr|retain|recalc). Default strategies are:
% [{?RIAK_TAG, retain}, {?STD_TAG, retain}]
{max_pencillercachesize, pos_integer()|undefined} |
% How many ledger keys should the penciller retain in memory
% between flushing new level zero files.
% Defaults to leveled_penciller:?MAX_TABLESIZE when undefined
{compression_method, native|lz4} |
% Compression method and point allow Leveled to be switched from
% using bif based compression (zlib) to using nif based compression
% (lz4).
% Defaults to ?COMPRESSION_METHOD
{compression_point, on_compact|on_receipt}
% The compression point can be changed between on_receipt (all
% values are compressed as they are received) and on_compact, where
% values are originally stored uncompressed (speeding PUT times),
% and are only compressed when they are first subject to compaction
% Defaults to ?COMPRESSION_POINT
].
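%% A minimal usage sketch (illustration only, not part of this change) of
%% opening a store with a few of the options described above. The root path
%% and values here are assumptions for the example; any option not supplied
%% is filled in from ?OPTION_DEFAULTS by set_defaults/1.
%%
%%   {ok, Bookie} = leveled_bookie:book_start([{root_path, "/tmp/leveled_ex"},
%%                                             {max_journalsize, 500000000},
%%                                             {sync_strategy, none},
%%                                             {compression_method, lz4}]),
%%   ok = leveled_bookie:book_close(Bookie).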
%%%============================================================================
%%% API
@ -183,69 +302,20 @@
%% directly back into the Ledger.
book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
book_start([{root_path, RootPath},
{cache_size, LedgerCacheSize},
{max_journalsize, JournalSize},
{sync_strategy, SyncStrategy}]).
book_start(set_defaults([{root_path, RootPath},
{cache_size, LedgerCacheSize},
{max_journalsize, JournalSize},
{sync_strategy, SyncStrategy}])).
-spec book_start(list(tuple())) -> {ok, pid()}.
%% @doc Start a Leveled Key/Value store - full options support.
%%
%% Allows an options proplist to be passed for setting options. There are
%% four primary additional options this allows over book_start/4:
%% - retain_strategy
%% - waste_retention_period
%% - compression_method
%% - compression_point
%%
%% For full description of options see ../docs/STARTUP_OPTIONS.md
%%
%% Both of the first two options relate to compaction in the Journal. The
%% retain_strategy determines if a skinny record of the object should be
%% retained following compaction, and how that should be used when recovering
%% lost state in the Ledger.
%%
%% This is relevant to when Riak uses Leveled in that KeyChanges are presented
%% by the vnode to the backend as deltas. This means that if those key
%% changes do not remain recorded in the journal once the value has been
%% compacted - rebuilding the ledger from the Journal would lead to incorrect
%% index entries being present.
%%
%% Currently compacted records no longer in use are not removed but moved to
%% a journal_waste folder, and the waste_retention_period determines how long
%% this history should be kept for (for example to allow for it to be backed
%% up before deletion). If the waste_retention_period (in seconds) is
%% undefined, then there will be no holding of this waste - unused files will
%% be immediately deleted.
%%
%% Compression method and point allow Leveled to be switched from using bif
%% based compression (zlib) to using nif based compression (lz4). The
%% compression point can be changed between on_receipt (all values are
%% compressed as they are received), to on_compact where values are originally
%% stored uncompressed (speeding PUT times), and are only compressed when
%% they are first subject to compaction
%%
%% TODO:
%% The reload_strategy is exposed as currently no firm decision has been made
%% about how recovery should work. For instance if we were to trust everything
%% as permanent in the Ledger once it is persisted, then there would be no
%% need to retain a skinny history of key changes in the Journal after
%% compaction. If, as an alternative we assume the Ledger is never permanent,
%% and retain the skinny history - then backups need only be made against the
%% Journal. The skinny history of key changes is primarily related to the
%% issue of supporting secondary indexes in Riak.
%%
%% These two strategies are referred to as recovr (assume we can recover any
%% deltas from a lost ledger and a lost history through resilience outside of
%% the store), or retain (retain a history of key changes, even when the object
%% value has been compacted). There is a third, unimplemented strategy, which
%% is recalc - which would require when reloading the Ledger from the Journal,
%% to recalculate the index changes based on the current state of the Ledger
%% and the object metadata.
%% For full description of options see ../docs/STARTUP_OPTIONS.md and also
%% comments on the open_options() type
book_start(Opts) ->
gen_server:start(?MODULE, [Opts], []).
gen_server:start(?MODULE, [set_defaults(Opts)], []).
-spec book_tempput(pid(), any(), any(), any(),
@ -515,18 +585,22 @@ book_isempty(Pid, Tag) ->
%%% gen_server callbacks
%%%============================================================================
-spec init([open_options()]) -> {ok, book_state()}.
init([Opts]) ->
leveled_rand:seed(),
case get_opt(snapshot_bookie, Opts) of
case proplists:get_value(snapshot_bookie, Opts) of
undefined ->
% Start from file not snapshot
{InkerOpts, PencillerOpts} = set_options(Opts),
CacheJitter = ?CACHE_SIZE div (100 div ?CACHE_SIZE_JITTER),
CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE)
+ erlang:phash2(self()) rem CacheJitter,
CacheJitter =
proplists:get_value(cache_size, Opts)
div (100 div ?CACHE_SIZE_JITTER),
CacheSize =
proplists:get_value(cache_size, Opts)
+ erlang:phash2(self()) rem CacheJitter,
RecentAAE =
case get_opt(recent_aae, Opts, ?RECENT_AAE) of
case proplists:get_value(recent_aae, Opts) of
false ->
false;
{FilterType, BucketList, LimitMinutes, UnitMinutes} ->
@ -537,7 +611,7 @@ init([Opts]) ->
end,
{HeadOnly, HeadLookup} =
case get_opt(head_only, Opts, false) of
case proplists:get_value(head_only, Opts) of
false ->
{false, true};
with_lookup ->
@ -592,9 +666,9 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
{Timings, CountDown} =
update_statetimings(put, Timings2, State#state.put_countdown),
% If the previous push to memory was returned then punish this PUT with a
% delay. If the back-pressure in the Penciller continues, these delays
% will become more frequent
% If the previous push to memory was returned then punish this PUT with
% a delay. If the back-pressure in the Penciller continues, these
% delays will become more frequent
case State#state.slow_offer of
true ->
gen_server:reply(From, pause);
@ -723,9 +797,9 @@ handle_call({head, Bucket, Key, Tag}, _From, State)
end
end;
handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) ->
% Snapshot the store, specifying if the snapshot should be long running
% (i.e. will the snapshot be queued or be required for an extended period
% e.g. many minutes)
% Snapshot the store, specifying if the snapshot should be long running
% (i.e. will the snapshot be queued or be required for an extended period
% e.g. many minutes)
Reply = snapshot_store(State, SnapType, Query, LongRunning),
{reply, Reply, State};
handle_call({return_runner, QueryType}, _From, State) ->
@ -895,45 +969,45 @@ startup(InkerOpts, PencillerOpts, State) ->
{Inker, Penciller}.
-spec set_options(list()) -> {#inker_options{}, #penciller_options{}}.
-spec set_defaults(list()) -> open_options().
%% @doc
%% Set any pre-defined defaults for options if the option is not present in
%% the passed in options
set_defaults(Opts) ->
lists:ukeymerge(1,
lists:ukeysort(1, Opts),
lists:ukeysort(1, ?OPTION_DEFAULTS)).
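%% Illustration only (not part of this change): as lists:ukeymerge/3 keeps the
%% element from its first list when both lists contain the same key, a
%% caller-supplied option overrides the default for that key, while any key
%% not supplied falls back to ?OPTION_DEFAULTS. For example (hypothetical
%% values):
%%   Opts = set_defaults([{cache_size, 5000}]),
%%   5000 = proplists:get_value(cache_size, Opts),
%%   none = proplists:get_value(sync_strategy, Opts).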
-spec set_options(open_options()) -> {#inker_options{}, #penciller_options{}}.
%% @doc
%% Take the passed-in property list of options and extract out any relevant
%% options to the Inker or the Penciller
set_options(Opts) ->
MaxJournalSize0 =
min(?ABSOLUTEMAX_JOURNALSIZE,
get_opt(max_journalsize, Opts, 1000000000)),
proplists:get_value(max_journalsize, Opts)),
JournalSizeJitter = MaxJournalSize0 div (100 div ?JOURNAL_SIZE_JITTER),
MaxJournalSize =
min(?ABSOLUTEMAX_JOURNALSIZE,
MaxJournalSize0 - erlang:phash2(self()) rem JournalSizeJitter),
SyncStrat = get_opt(sync_strategy, Opts, sync),
WRP = get_opt(waste_retention_period, Opts),
SyncStrat = proplists:get_value(sync_strategy, Opts),
WRP = proplists:get_value(waste_retention_period, Opts),
AltStrategy = get_opt(reload_strategy, Opts, []),
AltStrategy = proplists:get_value(reload_strategy, Opts),
ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy),
PCLL0CacheSize = get_opt(max_pencillercachesize, Opts),
RootPath = get_opt(root_path, Opts),
PCLL0CacheSize = proplists:get_value(max_pencillercachesize, Opts),
RootPath = proplists:get_value(root_path, Opts),
JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP,
LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP,
ok = filelib:ensure_dir(JournalFP),
ok = filelib:ensure_dir(LedgerFP),
CompressionMethod =
case get_opt(compression_method, Opts, ?COMPRESSION_METHOD) of
native ->
% Note native compression will have reduced performance
% https://github.com/martinsumner/leveled/issues/95
native;
lz4 ->
% Must include lz4 library in rebar.config
lz4
end,
CompressionMethod = proplists:get_value(compression_method, Opts),
CompressOnReceipt =
case get_opt(compression_point, Opts, ?COMPRESSION_POINT) of
case proplists:get_value(compression_point, Opts) of
on_receipt ->
% Note this will add measurable delay to PUT time
% https://github.com/martinsumner/leveled/issues/95
@ -945,7 +1019,7 @@ set_options(Opts) ->
{#inker_options{root_path = JournalFP,
reload_strategy = ReloadStrategy,
max_run_length = get_opt(max_run_length, Opts),
max_run_length = proplists:get_value(max_run_length, Opts),
waste_retention_period = WRP,
compression_method = CompressionMethod,
compress_on_receipt = CompressOnReceipt,
@ -1486,17 +1560,6 @@ get_loadfun(State) ->
LoadFun.
get_opt(Key, Opts) ->
get_opt(Key, Opts, undefined).
get_opt(Key, Opts, Default) ->
case proplists:get_value(Key, Opts) of
undefined ->
Default;
Value ->
Value
end.
delete_path(DirPath) ->
ok = filelib:ensure_dir(DirPath),
{ok, Files} = file:list_dir(DirPath),
@ -1908,10 +1971,16 @@ foldobjects_vs_foldheads_bybucket_test_() ->
{timeout, 60, fun foldobjects_vs_foldheads_bybucket_testto/0}.
foldobjects_vs_foldheads_bybucket_testto() ->
folder_cache_test(10),
folder_cache_test(100),
folder_cache_test(300),
folder_cache_test(1000).
folder_cache_test(CacheSize) ->
RootPath = reset_filestructure(),
{ok, Bookie1} = book_start([{root_path, RootPath},
{max_journalsize, 1000000},
{cache_size, 500}]),
{cache_size, CacheSize}]),
ObjL1 = generate_multiple_objects(400, 1),
ObjL2 = generate_multiple_objects(400, 1),
% Put in all the objects with a TTL in the future
@ -2042,7 +2111,7 @@ foldobjects_vs_foldheads_bybucket_testto() ->
?assertMatch(true, lists:usort(KeyHashList2B) == CompareL)
end,
lists:foreach(CheckSplitQueryFun, [1, 4, 8, 300, 100, 400]),
lists:foreach(CheckSplitQueryFun, [1, 4, 8, 300, 100, 400, 200, 600]),
ok = book_close(Bookie1),
reset_filestructure().

View file

@ -971,7 +971,7 @@ start_from_file(PCLopts) ->
case PCLopts#penciller_options.max_inmemory_tablesize of
undefined ->
?MAX_TABLESIZE;
M ->
M when is_integer(M) ->
M
end,
PressMethod = PCLopts#penciller_options.compression_method,