diff --git a/priv/leveled.schema b/priv/leveled.schema index 14dc2e7..c061898 100644 --- a/priv/leveled.schema +++ b/priv/leveled.schema @@ -2,8 +2,98 @@ %%%% leveled -%% @doc A path under which bitcask data files will be stored. +%% @doc A path under which leveled data files will be stored. {mapping, "leveled.data_root", "leveled.data_root", [ {default, "$(platform_data_dir)/leveled"}, {datatype, directory} ]}. + +%% @doc Strategy for flushing data to disk +%% Can be set to riak_sync, sync (if OTP > 16) or none. Use none, and the OS +%% will flush when most efficient. Use riak_sync or sync to flush after every +%% PUT (not recommended without some hardware support e.g. flash drives and/or +%% Flash-backed Write Caches) +{mapping, "leveled.sync_strategy", "leveled.sync_strategy", [ + {default, none}, + {datatype, atom} +]}. + + +%% @doc The key size of the Bookie's in-memory cache +{mapping, "leveled.cache_size", "leveled.cache_size", [ + {default, 4000}, + {datatype, integer}, + hidden +]}. + +%% @doc The key size of the Penciller's in-memory cache +{mapping, "leveled.penciller_cache_size", "leveled.penciller_cache_size", [ + {default, 28000}, + {datatype, integer}, + hidden +]}. + +%% @doc Compression method +%% Can be lz4 or native (which will use the Erlang native zlib compression) +%% within term_to_binary +{mapping, "leveled.compression_method", "leveled.compression_method", [ + {default, lz4}, + {datatype, atom} +]}. + +%% @doc Compression point +%% The point at which compression is applied to the Journal (the Ledger is +%% always compressed). Use on_receipt or on_compact. on_compact is suitable +%% when values are unlikely to yield much benefit from compression +%% (compression is only attempted when compacting) +{mapping, "leveled.compression_point", "leveled.compression_point", [ + {default, on_receipt}, + {datatype, atom} +]}. + + +%% @doc The approximate size (in bytes) when a Journal file should be rolled. 
+%% Normally keep this as around the size of o(100K) objects. Default is 500MB +{mapping, "leveled.journal_size", "leveled.journal_size", [ + {default, 500000000}, + {datatype, integer} +]}. + +%% @doc The number of journal compactions per vnode per day +%% The higher the value, the more compaction runs, and the sooner space is +%% recovered. But each run has a cost +{mapping, "leveled.compaction_runs_perday", "leveled.compaction_runs_perday", [ + {default, 16}, + {datatype, integer} +]}. + +%% @doc Compaction Low Hour +%% The hour of the day in which journal compaction can start. Use Low hour +%% of 0 and High hour of 23 to have no compaction window (i.e. always compact +%% regardless of time of day) +{mapping, "leveled.compaction_low_hour", "leveled.compaction_low_hour", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Compaction Top Hour +%% The hour of the day, after which journal compaction should stop. +%% If low hour > top hour then, compaction will work overnight between low +%% hour and top hour (inclusive). Timings rely on server's view of local time +{mapping, "leveled.compaction_top_hour", "leveled.compaction_top_hour", [ + {default, 23}, + {datatype, integer} +]}. + +%% @doc Max Journal Files Per Compaction Run +%% In a single compaction run, what is the maximum number of consecutive files +%% which may be compacted. +{mapping, "leveled.max_run_length", "leveled.max_run_length", [ + {default, 8}, + {datatype, integer}, + hidden +]}. + + + + diff --git a/rebar.config b/rebar.config index 7aa7bcc..04f914b 100644 --- a/rebar.config +++ b/rebar.config @@ -4,6 +4,8 @@ {platform_define, "^R", old_rand}, {platform_define, "^R", no_sync}]}. +{xref_checks, [undefined_function_calls,undefined_functions]}. 
+ {profiles, [{eqc, [{deps, [meck, fqc]}, {erl_opts, [debug_info, {parse_transform, lager_transform}, {parse_transform, eqc_cover}]}, diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 10a960f..b57238b 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -67,9 +67,7 @@ book_destroy/1, book_isempty/2]). --export([get_opt/2, - get_opt/3, - empty_ledgercache/0, +-export([empty_ledgercache/0, loadqueue_ledgercache/1, push_ledgercache/2, snapshot_store/6, @@ -91,6 +89,20 @@ -define(TIMING_SAMPLESIZE, 100). -define(TIMING_SAMPLECOUNTDOWN, 10000). -define(DUMMY, dummy). % Dummy key used for mput operations +-define(OPTION_DEFAULTS, + [{root_path, undefined}, + {snapshot_bookie, undefined}, + {cache_size, ?CACHE_SIZE}, + {max_journalsize, 1000000000}, + {sync_strategy, none}, + {recent_aae, ?RECENT_AAE}, + {head_only, false}, + {waste_retention_period, undefined}, + {max_run_length, undefined}, + {reload_strategy, []}, + {max_pencillercachesize, undefined}, + {compression_method, ?COMPRESSION_METHOD}, + {compression_point, ?COMPRESSION_POINT}]). -record(ledger_cache, {mem :: ets:tab(), loader = leveled_tree:empty(?CACHE_TYPE) @@ -148,6 +160,113 @@ -type head_timings() :: no_timing|#head_timings{}. -type timing_types() :: head|get|put|fold. -type recent_aae() :: false|#recent_aae{}|undefined. +-type open_options() :: + %% For full description of options see ../docs/STARTUP_OPTIONS.md + [{root_path, string()|undefined} | + % Folder to be used as the root path for storing all the database + % information. May be undefined if snapshot_bookie is a pid() + % TODO: Some sort of split root path to allow for mixed classes of + % storage (e.g. like eleveldb tiered storage - only with + % separation between ledger and non-current journal) + {snapshot_bookie, undefined|pid()} | + % Is the bookie being started required to be a snapshot of an + % existing bookie, rather than a new bookie. 
The bookie to be + % snapped should have its pid passed as the startup option in this + % case + {cache_size, pos_integer()} | + % The size of the Bookie's memory, the cache of the recent + % additions to the ledger. Defaults to ?CACHE_SIZE, plus some + % randomised jitter (randomised jitter will still be added to + % configured values + {max_journalsize, pos_integer()} | + % The maximum size of a journal file in bytes. The absolute + % maximum must be 4GB due to 4 byte file pointers being used + {sync_strategy, sync_mode()} | + % Should be sync if it is necessary to flush to disk after every + % write, or none if not (allow the OS to schedule). This has a + % significant impact on performance which can be mitigated + % partially in hardware (e.g through use of FBWC). + % riak_sync is used for backwards compatibility with OTP16 - and + % will manually call sync() after each write (rather than use the + % O_SYNC option on startup + {recent_aae, false|{atom(), list(), integer(), integer()}} | + % DEPRECATED + % Before working on kv_index_tictactree looked at the possibility + % of maintaining AAE just for recent changes. Given the efficiency + % of the kv_index_tictactree approach this is unnecessary. + % Should be set to false + {head_only, false|with_lookup|no_lookup} | + % When set to true, there are three fundamental changes as to how + % leveled will work: + % - Compaction of the journal will be managed by simply removing any + % journal file that has a highest sequence number persisted to the + % ledger; + % - GETs are not supported, only head requests; + % - PUTs should arrive batched object specs using the book_mput/2 + % function. + % head_only mode is disabled with false (default). 
There are two + % different modes in which head_only can run with_lookup or + % no_lookup and head_only mode is enabled by passing one of these + % atoms: + % - with_lookup assumes that individual objects may need to be + % fetched; + % - no_lookup prevents individual objects from being fetched, so + % that the store can only be used for folds (without segment list + % acceleration) + {waste_retention_period, undefined|pos_integer()} | + % If a value is not required in the journal (i.e. it has been + % replaced and is now to be removed for compaction) for how long + % should it be retained. For example should it be kept for a + % period until the operator can be sure a backup has been + % completed? + % If undefined, will not retain waste, otherwise the period is the + % number of seconds to wait + {max_run_length, undefined|pos_integer()} | + % The maximum number of consecutive files that can be compacted in + % one compaction operation. + % Defaults to leveled_iclerk:?MAX_COMPACTION_RUN (if undefined) + {reload_strategy, list()} | + % The reload_strategy is exposed as an option as currently no firm + % decision has been made about how recovery from failure should + % work. For instance if we were to trust everything as permanent + % in the Ledger once it is persisted, then there would be no need + % to retain a skinny history of key changes in the Journal after + % compaction. If, as an alternative we assume the Ledger is never + % permanent, and retain the skinny history - then backups need only + % be made against the Journal. The skinny history of key changes + % is primarily related to the issue of supporting secondary indexes + % in Riak. + % + % These two strategies are referred to as recovr (assume we can + % recover any deltas from a lost ledger and a lost history through + % resilience outside of the store), or retain (retain a history of + % key changes, even when the object value has been compacted). 
+ % + % There is a third, theoretical and untested strategy, which is + % recalc - which would require when reloading the Ledger from the + % Journal, to recalculate the index changes based on the current + % state of the Ledger and the object metadata. + % + % reload_strategy options are a list - to map from a tag to the + % strategy (recovr|retain|recalc). Default strategies are: + % [{?RIAK_TAG, retain}, {?STD_TAG, retain}] + {max_pencillercachesize, pos_integer()|undefined} | + % How many ledger keys should the penciller retain in memory + % between flushing new level zero files. + % Defaults to leveled_penciller:?MAX_TABLESIZE when undefined + {compression_method, native|lz4} | + % Compression method and point allow Leveled to be switched from + % using bif based compression (zlib) to using nif based compression + % (lz4). + % Defaults to ?COMPRESSION_METHOD + {compression_point, on_compact|on_receipt} + % The compression point can be changed between on_receipt (all + % values are compressed as they are received), to on_compact where + % values are originally stored uncompressed (speeding PUT times), + % and are only compressed when they are first subject to compaction + % Defaults to ?COMPRESSION_POINT + ]. + %%%============================================================================ %%% API %%%============================================================================ @@ -183,69 +302,20 @@ %% directly back into the Ledger. book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) -> - book_start([{root_path, RootPath}, - {cache_size, LedgerCacheSize}, - {max_journalsize, JournalSize}, - {sync_strategy, SyncStrategy}]). + book_start(set_defaults([{root_path, RootPath}, + {cache_size, LedgerCacheSize}, + {max_journalsize, JournalSize}, + {sync_strategy, SyncStrategy}])). -spec book_start(list(tuple())) -> {ok, pid()}. %% @doc Start a Leveled Key/Value store - full options support. %% -%% Allows an options proplists to be passed for setting options. 
There are -%% four primary additional options this allows over book_start/4: -%% - retain_strategy -%% - waste_retention_period -%% - compression_method -%% - compression_point -%% -%% For full description of options see ../docs/STARTUP_OPTIONS.md -%% -%% Both of the first two options relate to compaction in the Journal. The -%% retain_strategy determines if a skinny record of the object should be -%% retained following compaction, and how that should be used when recovering -%% lost state in the Ledger. -%% -%% This is relevant to when Riak uses Leveled in that KeyChanges are presented -%% by the vnode to the backend as deltas. This means that if those key -%% changes do not remain recorded in the journal once the value has been -%% compacted - rebuilding the ledger from the Journal would lead to incorrect -%% index entries being present. -%% -%% Currently compacted records no longer in use are not removed but moved to -%% a journal_waste folder, and the waste_retention_period determines how long -%% this history should be kept for (for example to allow for it to be backed -%% up before deletion). If the waste_retention_period (in seconds) is -%% undefined, then there will be no holding of this waste - unused files will -%% be immediately deleted. -%% -%% Compression method and point allow Leveled to be switched from using bif -%% based compression (zlib) to suing nif based compression (lz4). The -%% compression point can be changed between on_receipt (all values are -%% compressed as they are received), to on_compact where values are originally -%% stored uncompressed (speeding PUT times), and are only compressed when -%% they are first subject to compaction -%% -%% TODO: -%% The reload_strategy is exposed as currently no firm decision has been made -%% about how recovery should work. 
For instance if we were to trust everything -%% as permanent in the Ledger once it is persisted, then there would be no -%% need to retain a skinny history of key changes in the Journal after -%% compaction. If, as an alternative we assume the Ledger is never permanent, -%% and retain the skinny hisory - then backups need only be made against the -%% Journal. The skinny history of key changes is primarily related to the -%% issue of supporting secondary indexes in Riak. -%% -%% These two strategies are referred to as recovr (assume we can recover any -%% deltas from a lost ledger and a lost history through resilience outside of -%% the store), or retain (retain a history of key changes, even when the object -%% value has been compacted). There is a third, unimplemented strategy, which -%% is recalc - which would require when reloading the Ledger from the Journal, -%% to recalculate the index changes based on the current state of the Ledger -%% and the object metadata. +%% For full description of options see ../docs/STARTUP_OPTIONS.md and also +%% comments on the open_options() type book_start(Opts) -> - gen_server:start(?MODULE, [Opts], []). + gen_server:start(?MODULE, [set_defaults(Opts)], []). -spec book_tempput(pid(), any(), any(), any(), @@ -515,18 +585,22 @@ book_isempty(Pid, Tag) -> %%% gen_server callbacks %%%============================================================================ +-spec init([open_options()]) -> {ok, book_state()}. 
init([Opts]) -> leveled_rand:seed(), - case get_opt(snapshot_bookie, Opts) of + case proplists:get_value(snapshot_bookie, Opts) of undefined -> % Start from file not snapshot {InkerOpts, PencillerOpts} = set_options(Opts), - CacheJitter = ?CACHE_SIZE div (100 div ?CACHE_SIZE_JITTER), - CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE) - + erlang:phash2(self()) rem CacheJitter, + CacheJitter = + proplists:get_value(cache_size, Opts) + div (100 div ?CACHE_SIZE_JITTER), + CacheSize = + proplists:get_value(cache_size, Opts) + + erlang:phash2(self()) rem CacheJitter, RecentAAE = - case get_opt(recent_aae, Opts, ?RECENT_AAE) of + case proplists:get_value(recent_aae, Opts) of false -> false; {FilterType, BucketList, LimitMinutes, UnitMinutes} -> @@ -537,7 +611,7 @@ init([Opts]) -> end, {HeadOnly, HeadLookup} = - case get_opt(head_only, Opts, false) of + case proplists:get_value(head_only, Opts) of false -> {false, true}; with_lookup -> @@ -592,9 +666,9 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) {Timings, CountDown} = update_statetimings(put, Timings2, State#state.put_countdown), - % If the previous push to memory was returned then punish this PUT with a - % delay. If the back-pressure in the Penciller continues, these delays - % will beocme more frequent + % If the previous push to memory was returned then punish this PUT with + % a delay. If the back-pressure in the Penciller continues, these + % delays will become more frequent case State#state.slow_offer of true -> gen_server:reply(From, pause); @@ -723,9 +797,9 @@ handle_call({head, Bucket, Key, Tag}, _From, State) end end; handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) -> - % Snapshot the store, specifying if the snapshot should be long running - % (i.e. will the snapshot be queued or be required for an extended period - % e.g. many minutes) + % Snapshot the store, specifying if the snapshot should be long running + % (i.e. 
will the snapshot be queued or be required for an extended period + % e.g. many minutes) Reply = snapshot_store(State, SnapType, Query, LongRunning), {reply, Reply, State}; handle_call({return_runner, QueryType}, _From, State) -> @@ -895,45 +969,45 @@ startup(InkerOpts, PencillerOpts, State) -> {Inker, Penciller}. --spec set_options(list()) -> {#inker_options{}, #penciller_options{}}. +-spec set_defaults(list()) -> open_options(). +%% @doc +%% Set any pre-defined defaults for options if the option is not present in +%% the passed in options +set_defaults(Opts) -> + lists:ukeymerge(1, + lists:ukeysort(1, Opts), + lists:ukeysort(1, ?OPTION_DEFAULTS)). + +-spec set_options(open_options()) -> {#inker_options{}, #penciller_options{}}. %% @doc %% Take the passed in property list of operations and extract out any relevant %% options to the Inker or the Penciller set_options(Opts) -> MaxJournalSize0 = min(?ABSOLUTEMAX_JOURNALSIZE, - get_opt(max_journalsize, Opts, 1000000000)), + proplists:get_value(max_journalsize, Opts)), JournalSizeJitter = MaxJournalSize0 div (100 div ?JOURNAL_SIZE_JITTER), MaxJournalSize = min(?ABSOLUTEMAX_JOURNALSIZE, MaxJournalSize0 - erlang:phash2(self()) rem JournalSizeJitter), - SyncStrat = get_opt(sync_strategy, Opts, sync), - WRP = get_opt(waste_retention_period, Opts), + SyncStrat = proplists:get_value(sync_strategy, Opts), + WRP = proplists:get_value(waste_retention_period, Opts), - AltStrategy = get_opt(reload_strategy, Opts, []), + AltStrategy = proplists:get_value(reload_strategy, Opts), ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy), - PCLL0CacheSize = get_opt(max_pencillercachesize, Opts), - RootPath = get_opt(root_path, Opts), + PCLL0CacheSize = proplists:get_value(max_pencillercachesize, Opts), + RootPath = proplists:get_value(root_path, Opts), JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP, LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP, ok = filelib:ensure_dir(JournalFP), ok = filelib:ensure_dir(LedgerFP), - CompressionMethod 
= - case get_opt(compression_method, Opts, ?COMPRESSION_METHOD) of - native -> - % Note native compression will have reduced performance - % https://github.com/martinsumner/leveled/issues/95 - native; - lz4 -> - % Must include lz4 library in rebar.config - lz4 - end, + CompressionMethod = proplists:get_value(compression_method, Opts), CompressOnReceipt = - case get_opt(compression_point, Opts, ?COMPRESSION_POINT) of + case proplists:get_value(compression_point, Opts) of on_receipt -> % Note this will add measurable delay to PUT time % https://github.com/martinsumner/leveled/issues/95 @@ -945,7 +1019,7 @@ set_options(Opts) -> {#inker_options{root_path = JournalFP, reload_strategy = ReloadStrategy, - max_run_length = get_opt(max_run_length, Opts), + max_run_length = proplists:get_value(max_run_length, Opts), waste_retention_period = WRP, compression_method = CompressionMethod, compress_on_receipt = CompressOnReceipt, @@ -1486,17 +1560,6 @@ get_loadfun(State) -> LoadFun. -get_opt(Key, Opts) -> - get_opt(Key, Opts, undefined). - -get_opt(Key, Opts, Default) -> - case proplists:get_value(Key, Opts) of - undefined -> - Default; - Value -> - Value - end. - delete_path(DirPath) -> ok = filelib:ensure_dir(DirPath), {ok, Files} = file:list_dir(DirPath), @@ -1908,10 +1971,16 @@ foldobjects_vs_foldheads_bybucket_test_() -> {timeout, 60, fun foldobjects_vs_foldheads_bybucket_testto/0}. foldobjects_vs_foldheads_bybucket_testto() -> + folder_cache_test(10), + folder_cache_test(100), + folder_cache_test(300), + folder_cache_test(1000). 
+ +folder_cache_test(CacheSize) -> RootPath = reset_filestructure(), {ok, Bookie1} = book_start([{root_path, RootPath}, {max_journalsize, 1000000}, - {cache_size, 500}]), + {cache_size, CacheSize}]), ObjL1 = generate_multiple_objects(400, 1), ObjL2 = generate_multiple_objects(400, 1), % Put in all the objects with a TTL in the future @@ -2042,7 +2111,7 @@ foldobjects_vs_foldheads_bybucket_testto() -> ?assertMatch(true, lists:usort(KeyHashList2B) == CompareL) end, - lists:foreach(CheckSplitQueryFun, [1, 4, 8, 300, 100, 400]), + lists:foreach(CheckSplitQueryFun, [1, 4, 8, 300, 100, 400, 200, 600]), ok = book_close(Bookie1), reset_filestructure(). diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index a409d8e..d26bf62 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -971,7 +971,7 @@ start_from_file(PCLopts) -> case PCLopts#penciller_options.max_inmemory_tablesize of undefined -> ?MAX_TABLESIZE; - M -> + M when is_integer(M) -> M end, PressMethod = PCLopts#penciller_options.compression_method,