diff --git a/include/leveled.hrl b/include/leveled.hrl index 43b1064..8106c02 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -13,24 +13,17 @@ %% Inker key type used for 'normal' objects -define(INKT_STND, stnd). - %% Inker key type used for 'batch' objects -define(INKT_MPUT, mput). - %% Inker key type used for objects which contain no value, only key changes %% This is used currently for objects formed under a 'retain' strategy on Inker %% compaction -define(INKT_KEYD, keyd). - %% Inker key type used for tombstones -define(INKT_TOMB, tomb). -define(CACHE_TYPE, skpl). --record(sft_options, - {wait = true :: boolean(), - expire_tombstones = false :: boolean(), - penciller :: pid()}). -record(level, {level :: integer(), @@ -39,25 +32,31 @@ -record(manifest_entry, {start_key :: tuple() | undefined, - end_key :: tuple() | undefined, - owner :: pid()|list(), - filename :: string() | undefined, - bloom :: binary() | none | undefined}). + end_key :: tuple() | undefined, + owner :: pid()|list(), + filename :: string() | undefined, + bloom :: binary() | none | undefined}). -record(cdb_options, {max_size :: integer() | undefined, - file_path :: string() | undefined, - waste_path :: string() | undefined, - binary_mode = false :: boolean(), - sync_strategy = sync}). + file_path :: string() | undefined, + waste_path :: string() | undefined, + binary_mode = false :: boolean(), + sync_strategy = sync, + log_options = leveled_log:get_opts() + :: leveled_log:log_options()}). + +-record(sst_options, + {press_method = native + :: leveled_sst:press_method(), + log_options = leveled_log:get_opts() + :: leveled_log:log_options()}). -record(inker_options, {cdb_max_size :: integer() | undefined, root_path :: string() | undefined, - cdb_options :: #cdb_options{} | undefined, + cdb_options = #cdb_options{} :: #cdb_options{}, start_snapshot = false :: boolean(), - %% so a snapshot can monitor the bookie and - %% terminate when it does bookies_pid :: pid() | undefined, source_inker :: pid() | undefined, reload_strategy = [] :: list(), @@ -70,11 +69,10 @@ -record(penciller_options, {root_path :: string() | undefined, + sst_options = #sst_options{} :: #sst_options{}, max_inmemory_tablesize :: integer() | undefined, start_snapshot = false :: boolean(), snapshot_query, - %% so a snapshot can monitor the bookie and - %% terminate when it does bookies_pid :: pid() | undefined, bookies_mem :: tuple() | undefined, source_penciller :: pid() | undefined, @@ -91,43 +89,3 @@ singlefile_compactionperc :: float()|undefined, maxrunlength_compactionperc :: float()|undefined, reload_strategy = [] :: list()}). - --record(recent_aae, {filter :: whitelist|blacklist, - % the buckets list should either be a - % - whitelist - specific buckets are included, and - % entries are indexed by bucket name - % - blacklist - specific buckets are excluded, and - % all other entries are indexes using the special - % $all bucket - - buckets :: list(), - % whitelist or blacklist of buckets to support recent - % AAE - - limit_minutes :: integer(), - % how long to retain entries the temporary index for - % It will actually be retained for limit + unit minutes - % 60 minutes seems sensible - - unit_minutes :: integer(), - % What the minimum unit size will be for a query - % e.g. the minimum time duration to be used in range - % queries of the aae index - % 5 minutes seems sensible - - tree_size = small :: atom() - % Just defaulted to small for now - }). - --record(r_content, { - metadata, - value :: term() - }). - --record(r_object, { - bucket, - key, - contents :: [#r_content{}], - vclock, - updatemetadata=dict:store(clean, true, dict:new()), - updatevalue :: term()}). diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 51b783d..9cfba69 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -67,7 +67,11 @@ book_hotbackup/1, book_close/1, book_destroy/1, - book_isempty/2]). + book_isempty/2, + book_logsettings/1, + book_loglevel/2, + book_addlogs/2, + book_removelogs/2]). %% folding API -export([ @@ -1047,6 +1051,31 @@ book_isempty(Pid, Tag) -> {async, Runner} = book_bucketlist(Pid, Tag, FoldAccT, first), Runner(). +-spec book_logsettings(pid()) -> {leveled_log:log_level(), list(string())}. +%% @doc +%% Retrieve the current log settings +book_logsettings(Pid) -> + gen_server:call(Pid, log_settings, infinity). + +-spec book_loglevel(pid(), leveled_log:log_level()) -> ok. +%% @doc +%% Change the log level of the store +book_loglevel(Pid, LogLevel) -> + gen_server:cast(Pid, {log_level, LogLevel}). + +-spec book_addlogs(pid(), list(string())) -> ok. +%% @doc +%% Add to the list of forced logs, a list of more forced logs +book_addlogs(Pid, ForcedLogs) -> + gen_server:cast(Pid, {add_logs, ForcedLogs}). + +-spec book_removelogs(pid(), list(string())) -> ok. +%% @doc +%% Remove from the list of forced logs, a list of forced logs +book_removelogs(Pid, ForcedLogs) -> + gen_server:cast(Pid, {remove_logs, ForcedLogs}). + + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ @@ -1060,12 +1089,15 @@ init([Opts]) -> {stop, no_root_path}; {undefined, _RP} -> % Start from file not snapshot - {InkerOpts, PencillerOpts} = set_options(Opts), + % Must set log level first - as log level will be fetched within + % set_options/1. Also logs can now be added to set_options/1 LogLevel = proplists:get_value(log_level, Opts), - ok = application:set_env(leveled, log_level, LogLevel), + leveled_log:set_loglevel(LogLevel), ForcedLogs = proplists:get_value(forced_logs, Opts), - ok = application:set_env(leveled, forced_logs, ForcedLogs), + leveled_log:add_forcedlogs(ForcedLogs), + + {InkerOpts, PencillerOpts} = set_options(Opts), OverrideFunctions = proplists:get_value(override_functions, Opts), SetFun = @@ -1299,6 +1331,8 @@ handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) -> % e.g. many minutes) Reply = snapshot_store(State, SnapType, Query, LongRunning), {reply, Reply, State}; +handle_call(log_settings, _From, State) -> + {reply, leveled_log:return_settings(), State}; handle_call({return_runner, QueryType}, _From, State) -> SW = os:timestamp(), Runner = get_runner(State, QueryType), @@ -1349,9 +1383,30 @@ handle_call(destroy, _From, State=#state{is_snapshot=Snp}) when Snp == false -> handle_call(Msg, _From, State) -> {reply, {unsupported_message, element(1, Msg)}, State}. -handle_cast(_Msg, State) -> + +handle_cast({log_level, LogLevel}, State) -> + PCL = State#state.penciller, + INK = State#state.inker, + ok = leveled_penciller:pcl_loglevel(PCL, LogLevel), + ok = leveled_inker:ink_loglevel(INK, LogLevel), + ok = leveled_log:set_loglevel(LogLevel), + {noreply, State}; +handle_cast({add_logs, ForcedLogs}, State) -> + PCL = State#state.penciller, + INK = State#state.inker, + ok = leveled_penciller:pcl_addlogs(PCL, ForcedLogs), + ok = leveled_inker:ink_addlogs(INK, ForcedLogs), + ok = leveled_log:add_forcedlogs(ForcedLogs), + {noreply, State}; +handle_cast({remove_logs, ForcedLogs}, State) -> + PCL = State#state.penciller, + INK = State#state.inker, + ok = leveled_penciller:pcl_removelogs(PCL, ForcedLogs), + ok = leveled_inker:ink_removelogs(INK, ForcedLogs), + ok = leveled_log:remove_forcedlogs(ForcedLogs), {noreply, State}. + handle_info(_Info, State) -> {noreply, State}. @@ -1552,12 +1607,16 @@ set_options(Opts) -> compress_on_receipt = CompressOnReceipt, cdb_options = #cdb_options{max_size=MaxJournalSize, - binary_mode=true, - sync_strategy=SyncStrat}}, + binary_mode=true, + sync_strategy=SyncStrat, + log_options=leveled_log:get_opts()}}, #penciller_options{root_path = LedgerFP, max_inmemory_tablesize = PCLL0CacheSize, levelzero_cointoss = true, - compression_method = CompressionMethod}}. + sst_options = + #sst_options{press_method = CompressionMethod, + log_options=leveled_log:get_opts()}} + }. -spec return_snapfun(book_state(), store|ledger, @@ -2872,8 +2931,7 @@ longrunning_test() -> coverage_cheat_test() -> {noreply, _State0} = handle_info(timeout, #state{}), - {ok, _State1} = code_change(null, #state{}, null), - {noreply, _State2} = handle_cast(null, #state{}). + {ok, _State1} = code_change(null, #state{}, null). erase_journal_test() -> RootPath = reset_filestructure(), diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 935d2aa..80f706f 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -140,7 +140,9 @@ waste_path :: string() | undefined, sync_strategy = none, timings = no_timing :: cdb_timings(), - timings_countdown = 0 :: integer()}). + timings_countdown = 0 :: integer(), + log_options = leveled_log:get_opts() + :: leveled_log:log_options()}). -record(cdb_timings, {sample_count = 0 :: integer(), sample_cyclecount = 0 :: integer(), @@ -414,9 +416,11 @@ init([Opts]) -> #state{max_size=MaxSize, binary_mode=Opts#cdb_options.binary_mode, waste_path=Opts#cdb_options.waste_path, - sync_strategy=Opts#cdb_options.sync_strategy}}. + sync_strategy=Opts#cdb_options.sync_strategy, + log_options=Opts#cdb_options.log_options}}. starting({open_writer, Filename}, _From, State) -> + leveled_log:save(State#state.log_options), leveled_log:log("CDB01", [Filename]), {LastPosition, HashTree, LastKey} = open_active_file(Filename), {WriteOps, UpdStrategy} = set_writeops(State#state.sync_strategy), @@ -429,6 +433,7 @@ starting({open_writer, Filename}, _From, State) -> filename=Filename, hashtree=HashTree}}; starting({open_reader, Filename}, _From, State) -> + leveled_log:save(State#state.log_options), leveled_log:log("CDB02", [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, false), {reply, ok, reader, State#state{handle=Handle, @@ -436,6 +441,7 @@ starting({open_reader, Filename}, _From, State) -> filename=Filename, hash_index=Index}}; starting({open_reader, Filename, LastKey}, _From, State) -> + leveled_log:save(State#state.log_options), leveled_log:log("CDB02", [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, LastKey), {reply, ok, reader, State#state{handle=Handle, diff --git a/src/leveled_ebloom.erl b/src/leveled_ebloom.erl index 91d8879..9612f72 100644 --- a/src/leveled_ebloom.erl +++ b/src/leveled_ebloom.erl @@ -498,10 +498,11 @@ generate_orderedkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) -> Acc; generate_orderedkeys(Seqn, Count, Acc, BucketLow, BucketHigh) -> BNumber = Seqn div (BucketHigh - BucketLow), - BucketExt = string:right(integer_to_list(BucketLow + BNumber), 4, $0), - KNumber = Seqn * 100 + leveled_rand:uniform(100), - KeyExt = - string:right(integer_to_list(KNumber), 8, $0), + BucketExt = + io_lib:format("K~4..0B", [BucketLow + BNumber]), + KeyExt = + io_lib:format("K~8..0B", [Seqn * 100 + leveled_rand:uniform(100)]), + LK = leveled_codec:to_ledgerkey("Bucket" ++ BucketExt, "Key" ++ KeyExt, o), Chunk = leveled_rand:rand_bytes(16), {_B, _K, MV, _H, _LMs} = diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 957f3cb..167c63d 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -79,12 +79,16 @@ handle_cast/2, handle_info/2, terminate/2, - clerk_new/1, + code_change/3]). + +-export([clerk_new/1, clerk_compact/7, clerk_hashtablecalc/3, clerk_trim/3, clerk_stop/1, - code_change/3]). + clerk_loglevel/2, + clerk_addlogs/2, + clerk_removelogs/2]). -export([schedule_compaction/3]). @@ -104,7 +108,7 @@ -record(state, {inker :: pid() | undefined, max_run_length :: integer() | undefined, - cdb_options, + cdb_options = #cdb_options{} :: #cdb_options{}, waste_retention_period :: integer() | undefined, waste_path :: string() | undefined, reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(), @@ -140,7 +144,7 @@ %% @doc %% Generate a new clerk clerk_new(InkerClerkOpts) -> - gen_server:start_link(?MODULE, [InkerClerkOpts], []). + gen_server:start_link(?MODULE, [leveled_log:get_opts(), InkerClerkOpts], []). -spec clerk_compact(pid(), pid(), fun(), fun(), fun(), @@ -170,7 +174,8 @@ clerk_trim(Pid, Inker, PersistedSQN) -> %% of the hastable in the CDB file - so that the file is not blocked during %% this calculation clerk_hashtablecalc(HashTree, StartPos, CDBpid) -> - {ok, Clerk} = gen_server:start_link(?MODULE, [#iclerk_options{}], []), + {ok, Clerk} = gen_server:start_link(?MODULE, [leveled_log:get_opts(), + #iclerk_options{}], []), gen_server:cast(Clerk, {hashtable_calc, HashTree, StartPos, CDBpid}). -spec clerk_stop(pid()) -> ok. @@ -179,11 +184,30 @@ clerk_hashtablecalc(HashTree, StartPos, CDBpid) -> clerk_stop(Pid) -> gen_server:cast(Pid, stop). +-spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok. +%% @doc +%% Change the log level of the Journal +clerk_loglevel(Pid, LogLevel) -> + gen_server:cast(Pid, {log_level, LogLevel}). + +-spec clerk_addlogs(pid(), list(string())) -> ok. +%% @doc +%% Add to the list of forced logs, a list of more forced logs +clerk_addlogs(Pid, ForcedLogs) -> + gen_server:cast(Pid, {add_logs, ForcedLogs}). + +-spec clerk_removelogs(pid(), list(string())) -> ok. +%% @doc +%% Remove from the list of forced logs, a list of forced logs +clerk_removelogs(Pid, ForcedLogs) -> + gen_server:cast(Pid, {remove_logs, ForcedLogs}). + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ -init([IClerkOpts]) -> +init([LogOpts, IClerkOpts]) -> + leveled_log:save(LogOpts), ReloadStrategy = IClerkOpts#iclerk_options.reload_strategy, CDBopts = IClerkOpts#iclerk_options.cdb_options, WP = CDBopts#cdb_options.waste_path, @@ -290,6 +314,21 @@ handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) -> {IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos), ok = leveled_cdb:cdb_returnhashtable(CDBpid, IndexList, HashTreeBin), {stop, normal, State}; +handle_cast({log_level, LogLevel}, State) -> + ok = leveled_log:set_loglevel(LogLevel), + CDBopts = State#state.cdb_options, + CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()}, + {noreply, State#state{cdb_options = CDBopts0}}; +handle_cast({add_logs, ForcedLogs}, State) -> + ok = leveled_log:add_forcedlogs(ForcedLogs), + CDBopts = State#state.cdb_options, + CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()}, + {noreply, State#state{cdb_options = CDBopts0}}; +handle_cast({remove_logs, ForcedLogs}, State) -> + ok = leveled_log:remove_forcedlogs(ForcedLogs), + CDBopts = State#state.cdb_options, + CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()}, + {noreply, State#state{cdb_options = CDBopts0}}; handle_cast(stop, State) -> {stop, normal, State}. diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 0a758cf..01571f6 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -116,7 +116,11 @@ ink_roll/1, ink_backup/2, ink_checksqn/2, - build_dummy_journal/0, + ink_loglevel/2, + ink_addlogs/2, + ink_removelogs/2]). + +-export([build_dummy_journal/0, clean_testdir/1, filepath/2, filepath/3]). @@ -174,13 +178,13 @@ %% The inker will need to know what the reload strategy is, to inform the %% clerk about the rules to enforce during compaction. ink_start(InkerOpts) -> - gen_server:start_link(?MODULE, [InkerOpts], []). + gen_server:start_link(?MODULE, [leveled_log:get_opts(), InkerOpts], []). -spec ink_snapstart(inker_options()) -> {ok, pid()}. %% @doc %% Don't link on startup as snapshot ink_snapstart(InkerOpts) -> - gen_server:start(?MODULE, [InkerOpts], []). + gen_server:start(?MODULE, [leveled_log:get_opts(), InkerOpts], []). -spec ink_put(pid(), leveled_codec:ledger_key(), @@ -447,11 +451,30 @@ ink_printmanifest(Pid) -> ink_checksqn(Pid, LedgerSQN) -> gen_server:call(Pid, {check_sqn, LedgerSQN}). +-spec ink_loglevel(pid(), leveled_log:log_level()) -> ok. +%% @doc +%% Change the log level of the Journal +ink_loglevel(Pid, LogLevel) -> + gen_server:cast(Pid, {log_level, LogLevel}). + +-spec ink_addlogs(pid(), list(string())) -> ok. +%% @doc +%% Add to the list of forced logs, a list of more forced logs +ink_addlogs(Pid, ForcedLogs) -> + gen_server:cast(Pid, {add_logs, ForcedLogs}). + +-spec ink_removelogs(pid(), list(string())) -> ok. +%% @doc +%% Remove from the list of forced logs, a list of forced logs +ink_removelogs(Pid, ForcedLogs) -> + gen_server:cast(Pid, {remove_logs, ForcedLogs}). + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ -init([InkerOpts]) -> +init([LogOpts, InkerOpts]) -> + leveled_log:save(LogOpts), leveled_rand:seed(), case {InkerOpts#inker_options.root_path, InkerOpts#inker_options.start_snapshot} of @@ -697,7 +720,28 @@ handle_cast({release_snapshot, Snapshot}, State) -> Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots), leveled_log:log("I0003", [Snapshot]), leveled_log:log("I0004", [length(Rs)]), - {noreply, State#state{registered_snapshots=Rs}}. + {noreply, State#state{registered_snapshots=Rs}}; +handle_cast({log_level, LogLevel}, State) -> + INC = State#state.clerk, + ok = leveled_iclerk:clerk_loglevel(INC, LogLevel), + ok = leveled_log:set_loglevel(LogLevel), + CDBopts = State#state.cdb_options, + CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()}, + {noreply, State#state{cdb_options = CDBopts0}}; +handle_cast({add_logs, ForcedLogs}, State) -> + INC = State#state.clerk, + ok = leveled_iclerk:clerk_addlogs(INC, ForcedLogs), + ok = leveled_log:add_forcedlogs(ForcedLogs), + CDBopts = State#state.cdb_options, + CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()}, + {noreply, State#state{cdb_options = CDBopts0}}; +handle_cast({remove_logs, ForcedLogs}, State) -> + INC = State#state.clerk, + ok = leveled_iclerk:clerk_removelogs(INC, ForcedLogs), + ok = leveled_log:remove_forcedlogs(ForcedLogs), + CDBopts = State#state.cdb_options, + CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()}, + {noreply, State#state{cdb_options = CDBopts0}}. %% handle the bookie stopping and stop this snapshot handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info}, @@ -1400,8 +1444,9 @@ compact_journal_testto(WRP, ExpectedFiles) -> 5000), timer:sleep(1000), CompactedManifest2 = ink_getmanifest(Ink1), + {ok, PrefixTest} = re:compile(?COMPACT_FP), lists:foreach(fun({_SQN, FN, _P, _LK}) -> - ?assertMatch(0, string:str(FN, ?COMPACT_FP)) + nomatch = re:run(FN, PrefixTest) end, CompactedManifest2), ?assertMatch(2, length(CompactedManifest2)), diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 4d60d5b..3bf6593 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -9,10 +9,25 @@ -export([log/2, log_timer/3, - log_randomtimer/4]). + log_randomtimer/4]). + +-export([set_loglevel/1, + add_forcedlogs/1, + remove_forcedlogs/1, + get_opts/0, + save/1, + return_settings/0]). + + +-record(log_options, {log_level = info :: log_level(), + forced_logs = [] :: [string()]}). + +-type log_level() :: debug | info | warn | error | critical. +-type log_options() :: #log_options{}. + +-export_type([log_options/0, log_level/0]). -define(LOG_LEVELS, [debug, info, warn, error, critical]). - -define(DEFAULT_LOG_LEVEL, error). -define(LOGBASE, [ @@ -376,6 +391,70 @@ ]). +%%%============================================================================ +%%% Manage Log Options +%%%============================================================================ + +-spec set_loglevel(log_level()) -> ok. +%% @doc +%% Set the log level for this PID +set_loglevel(LogLevel) when is_atom(LogLevel) -> + LO = get_opts(), + UpdLO = LO#log_options{log_level = LogLevel}, + save(UpdLO). + +-spec add_forcedlogs(list(string())) -> ok. +%% @doc +%% Add a forced log to the list of forced logs. this will cause the log of this +%% logReference to be logged even if the log_level of the process would not +%% otherwise require it to be logged - e.g. to fire an 'INFO' log when running +%% at an 'ERROR' log level. +add_forcedlogs(LogRefs) -> + LO = get_opts(), + ForcedLogs = LO#log_options.forced_logs, + UpdLO = LO#log_options{forced_logs = lists:usort(LogRefs ++ ForcedLogs)}, + save(UpdLO). + +-spec remove_forcedlogs(list(string())) -> ok. +%% @doc +%% Remove a forced log from the list of forced logs +remove_forcedlogs(LogRefs) -> + LO = get_opts(), + ForcedLogs = LO#log_options.forced_logs, + UpdLO = LO#log_options{forced_logs = lists:subtract(ForcedLogs, LogRefs)}, + save(UpdLO). + +-spec save(log_options()) -> ok. +%% @doc +%% Save the log options to the process dictionary +save(#log_options{} = LO) -> + put('$leveled_log_options', LO), + ok. + +-spec get_opts() -> log_options(). +%% @doc +%% Retrieve the log options from the process dictionary if present. +get_opts() -> + case get('$leveled_log_options') of + #log_options{} = LO -> + LO; + _ -> + #log_options{log_level = ?DEFAULT_LOG_LEVEL, + forced_logs = []} + end. + +-spec return_settings() -> {log_level(), list(string())}. +%% @doc +%% Return the settings outside of the record +return_settings() -> + LO = get_opts(), + {LO#log_options.log_level, LO#log_options.forced_logs}. + +%%%============================================================================ +%%% Prompt Logs +%%%============================================================================ + + log(LogReference, Subs) -> log(LogReference, Subs, ?LOG_LEVELS). @@ -396,23 +475,21 @@ log(LogRef, Subs, SupportedLogLevels) -> end. should_i_log(LogLevel, Levels, LogRef) -> - ForcedLogs = application:get_env(leveled, forced_logs, []), + #log_options{log_level = CurLevel, forced_logs = ForcedLogs} = get_opts(), case lists:member(LogRef, ForcedLogs) of true -> true; false -> - case application:get_env(leveled, log_level, ?DEFAULT_LOG_LEVEL) of - LogLevel -> + if CurLevel == LogLevel -> true; - CurLevel -> + true -> is_active_level(Levels, CurLevel, LogLevel) end end. is_active_level([L|_], L, _) -> true; is_active_level([L|_], _, L) -> false; -is_active_level([_|T], C, L) -> is_active_level(T, C, L); -is_active_level([] , _, _) -> false. +is_active_level([_|T], C, L) -> is_active_level(T, C, L). log_timer(LogReference, Subs, StartTime) -> log_timer(LogReference, Subs, StartTime, ?LOG_LEVELS). @@ -482,21 +559,21 @@ log_warn_test() -> ok = log_timer("G8888", [], os:timestamp(), [info, warn, error]). shouldilog_test() -> - % What if an unsupported option is set for the log level - % don't log - ok = application:set_env(leveled, log_level, unsupported), - ?assertMatch(false, should_i_log(info, ?LOG_LEVELS, "G0001")), - ?assertMatch(false, should_i_log(inform, ?LOG_LEVELS, "G0001")), - ok = application:set_env(leveled, log_level, debug), + ok = set_loglevel(debug), ?assertMatch(true, should_i_log(info, ?LOG_LEVELS, "G0001")), - ok = application:set_env(leveled, log_level, info), + ok = set_loglevel(info), ?assertMatch(true, should_i_log(info, ?LOG_LEVELS, "G0001")), - ok = application:set_env(leveled, forced_logs, ["G0001"]), - ok = application:set_env(leveled, log_level, error), + ok = add_forcedlogs(["G0001"]), + ok = set_loglevel(error), ?assertMatch(true, should_i_log(info, ?LOG_LEVELS, "G0001")), ?assertMatch(false, should_i_log(info, ?LOG_LEVELS, "G0002")), - ok = application:set_env(leveled, forced_logs, []), - ok = application:set_env(leveled, log_level, info), + ok = remove_forcedlogs(["G0001"]), + ok = set_loglevel(info), ?assertMatch(false, should_i_log(debug, ?LOG_LEVELS, "D0001")). +badloglevel_test() -> + % Set a bad log level - and everything logs + ?assertMatch(true, is_active_level(?LOG_LEVELS, debug, unsupported)), + ?assertMatch(true, is_active_level(?LOG_LEVELS, critical, unsupported)). + -endif. diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 8f9e3b2..5bbbae4 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -39,7 +39,10 @@ clerk_prompt/1, clerk_push/2, clerk_close/1, - clerk_promptdeletions/2 + clerk_promptdeletions/2, + clerk_loglevel/2, + clerk_addlogs/2, + clerk_removelogs/2 ]). -include_lib("eunit/include/eunit.hrl"). @@ -50,17 +53,18 @@ -record(state, {owner :: pid() | undefined, root_path :: string() | undefined, pending_deletions = dict:new(), % OTP 16 does not like type - compression_method = native :: lz4|native + sst_options :: #sst_options{} }). %%%============================================================================ %%% API %%%============================================================================ -clerk_new(Owner, Manifest, CompressionMethod) -> +clerk_new(Owner, Manifest, OptsSST) -> {ok, Pid} = gen_server:start_link(?MODULE, - [{compression_method, CompressionMethod}], + [leveled_log:get_opts(), + {sst_options, OptsSST}], []), ok = gen_server:call(Pid, {load, Owner, Manifest}, infinity), leveled_log:log("PC001", [Pid, Owner]), @@ -75,6 +79,24 @@ clerk_promptdeletions(Pid, ManifestSQN) -> clerk_push(Pid, Work) -> gen_server:cast(Pid, {push_work, Work}). +-spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok. +%% @doc +%% Change the log level of the Journal +clerk_loglevel(Pid, LogLevel) -> + gen_server:cast(Pid, {log_level, LogLevel}). + +-spec clerk_addlogs(pid(), list(string())) -> ok. +%% @doc +%% Add to the list of forced logs, a list of more forced logs +clerk_addlogs(Pid, ForcedLogs) -> + gen_server:cast(Pid, {add_logs, ForcedLogs}). + +-spec clerk_removelogs(pid(), list(string())) -> ok. +%% @doc +%% Remove from the list of forced logs, a list of forced logs +clerk_removelogs(Pid, ForcedLogs) -> + gen_server:cast(Pid, {remove_logs, ForcedLogs}). + clerk_close(Pid) -> gen_server:call(Pid, close, 20000). @@ -82,8 +104,9 @@ clerk_close(Pid) -> %%% gen_server callbacks %%%============================================================================ -init([{compression_method, CompressionMethod}]) -> - {ok, #state{compression_method = CompressionMethod}}. +init([LogOpts, {sst_options, OptsSST}]) -> + leveled_log:save(LogOpts), + {ok, #state{sst_options = OptsSST}}. handle_call({load, Owner, RootPath}, _From, State) -> {reply, ok, State#state{owner=Owner, root_path=RootPath}, ?MIN_TIMEOUT}; @@ -101,7 +124,22 @@ handle_cast({prompt_deletions, ManifestSQN}, State) -> {Deletions, UpdD} = return_deletions(ManifestSQN, State#state.pending_deletions), ok = notify_deletions(Deletions, State#state.owner), - {noreply, State#state{pending_deletions = UpdD}, ?MIN_TIMEOUT}. + {noreply, State#state{pending_deletions = UpdD}, ?MIN_TIMEOUT}; +handle_cast({log_level, LogLevel}, State) -> + ok = leveled_log:set_loglevel(LogLevel), + SSTopts = State#state.sst_options, + SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()}, + {noreply, State#state{sst_options = SSTopts0}}; +handle_cast({add_logs, ForcedLogs}, State) -> + ok = leveled_log:add_forcedlogs(ForcedLogs), + SSTopts = State#state.sst_options, + SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()}, + {noreply, State#state{sst_options = SSTopts0}}; +handle_cast({remove_logs, ForcedLogs}, State) -> + ok = leveled_log:remove_forcedlogs(ForcedLogs), + SSTopts = State#state.sst_options, + SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()}, + {noreply, State#state{sst_options = SSTopts0}}. handle_info(timeout, State) -> request_work(State), @@ -125,7 +163,7 @@ handle_work({SrcLevel, Manifest}, State) -> {UpdManifest, EntriesToDelete} = merge(SrcLevel, Manifest, State#state.root_path, - State#state.compression_method), + State#state.sst_options), leveled_log:log("PC007", []), SWMC = os:timestamp(), ok = leveled_penciller:pcl_manifestchange(State#state.owner, @@ -137,7 +175,7 @@ handle_work({SrcLevel, Manifest}, State) -> leveled_log:log_timer("PC018", [], SWSM), {leveled_pmanifest:get_manifest_sqn(UpdManifest), EntriesToDelete}. -merge(SrcLevel, Manifest, RootPath, CompressionMethod) -> +merge(SrcLevel, Manifest, RootPath, OptsSST) -> Src = leveled_pmanifest:mergefile_selector(Manifest, SrcLevel), NewSQN = leveled_pmanifest:get_manifest_sqn(Manifest) + 1, SinkList = leveled_pmanifest:merge_lookup(Manifest, @@ -159,7 +197,7 @@ merge(SrcLevel, Manifest, RootPath, CompressionMethod) -> SST_RP = leveled_penciller:sst_rootpath(RootPath), perform_merge(Manifest, Src, SinkList, SrcLevel, - SST_RP, NewSQN, CompressionMethod) + SST_RP, NewSQN, OptsSST) end. notify_deletions([], _Penciller) -> @@ -177,7 +215,7 @@ notify_deletions([Head|Tail], Penciller) -> perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN, - CompressionMethod) -> + OptsSST) -> leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]), SrcList = [{next, Src, all}], MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner), @@ -187,7 +225,7 @@ perform_merge(Manifest, do_merge(SrcList, SinkList, SinkLevel, SinkBasement, RootPath, NewSQN, MaxSQN, - CompressionMethod, + OptsSST, []), RevertPointerFun = fun({next, ME, _SK}) -> @@ -205,23 +243,24 @@ perform_merge(Manifest, Src), {Man2, [Src|SinkManifestList]}. -do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _CM, Additions) -> +do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions) -> leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]), Additions; -do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, CM, Additions) -> +do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) -> FileName = leveled_penciller:sst_filename(NewSQN, SinkLevel, length(Additions)), leveled_log:log("PC012", [NewSQN, FileName, SinkB]), TS1 = os:timestamp(), case leveled_sst:sst_new(RP, FileName, - KL1, KL2, SinkB, SinkLevel, MaxSQN, CM) of + KL1, KL2, SinkB, SinkLevel, MaxSQN, + OptsSST) of empty -> leveled_log:log("PC013", [FileName]), do_merge([], [], SinkLevel, SinkB, RP, NewSQN, MaxSQN, - CM, + OptsSST, Additions); {ok, Pid, Reply, Bloom} -> {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, @@ -234,7 +273,7 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, CM, Additions) -> do_merge(KL1Rem, KL2Rem, SinkLevel, SinkB, RP, NewSQN, MaxSQN, - CM, + OptsSST, Additions ++ [Entry]) end. @@ -263,9 +302,13 @@ generate_randomkeys(Count, BucketRangeLow, BucketRangeHigh) -> generate_randomkeys(0, Acc, _BucketLow, _BucketHigh) -> Acc; generate_randomkeys(Count, Acc, BucketLow, BRange) -> - BNumber = string:right(integer_to_list(BucketLow + leveled_rand:uniform(BRange)), - 4, $0), - KNumber = string:right(integer_to_list(leveled_rand:uniform(1000)), 4, $0), + BNumber = + lists:flatten( + io_lib:format("~4..0B", + [BucketLow + leveled_rand:uniform(BRange)])), + KNumber = + lists:flatten( + io_lib:format("~4..0B", [leveled_rand:uniform(1000)])), K = {o, "Bucket" ++ BNumber, "Key" ++ KNumber, null}, RandKey = {K, {Count + 1, {active, infinity}, @@ -282,7 +325,7 @@ merge_file_test() -> 1, KL1_L1, 999999, - native), + #sst_options{}), KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)), {ok, PidL2_1, _, _} = leveled_sst:sst_new("../test/", @@ -290,7 +333,7 @@ merge_file_test() -> 2, KL1_L2, 999999, - native), + #sst_options{}), KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)), {ok, PidL2_2, _, _} = leveled_sst:sst_new("../test/", @@ -298,7 +341,7 @@ merge_file_test() -> 2, KL2_L2, 999999, - lz4), + #sst_options{press_method = lz4}), KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)), {ok, PidL2_3, _, _} = leveled_sst:sst_new("../test/", @@ -306,7 +349,7 @@ merge_file_test() -> 2, KL3_L2, 999999, - lz4), + #sst_options{press_method = lz4}), KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)), {ok, PidL2_4, _, _} = leveled_sst:sst_new("../test/", @@ -314,7 +357,7 @@ merge_file_test() -> 2, KL4_L2, 999999, - lz4), + #sst_options{press_method = lz4}), E1 = #manifest_entry{owner = PidL1_1, filename = "./KL1_L1.sst", @@ -347,11 +390,12 @@ merge_file_test() -> PointerList = lists:map(fun(ME) -> {next, ME, all} end, [E2, E3, E4, E5]), {Man6, _Dels} = - perform_merge(Man5, E1, PointerList, 1, "../test", 3, native), + perform_merge(Man5, E1, PointerList, 1, "../test", 3, #sst_options{}), ?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)). coverage_cheat_test() -> - {ok, _State1} = code_change(null, #state{}, null). + {ok, _State1} = + code_change(null, #state{sst_options=#sst_options{}}, null). -endif. diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 076b198..c1e9fdc 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -191,7 +191,10 @@ pcl_getstartupsequencenumber/1, pcl_checkbloomtest/2, pcl_checkforwork/1, - pcl_persistedsqn/1]). + pcl_persistedsqn/1, + pcl_loglevel/2, + pcl_addlogs/2, + pcl_removelogs/2]). -export([ sst_rootpath/1, @@ -258,7 +261,7 @@ is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), source_penciller :: pid() | undefined, - bookie_monref :: reference() | undefined, + bookie_monref :: reference() | undefined, levelzero_astree :: list() | undefined, work_ongoing = false :: boolean(), % i.e. compaction work @@ -267,7 +270,7 @@ timings = no_timing :: pcl_timings(), timings_countdown = 0 :: integer(), - compression_method = native :: lz4|native}). + sst_options = #sst_options{} :: #sst_options{}}). -record(pcl_timings, {sample_count = 0 :: integer(), @@ -318,13 +321,13 @@ %% query is run against the level zero space and just the query results are %5 copied into the clone. pcl_start(PCLopts) -> - gen_server:start_link(?MODULE, [PCLopts], []). + gen_server:start_link(?MODULE, [leveled_log:get_opts(), PCLopts], []). -spec pcl_snapstart(penciller_options()) -> {ok, pid()}. %% @doc %% Don't link to the bookie - this is a snpashot pcl_snapstart(PCLopts) -> - gen_server:start(?MODULE, [PCLopts], []). + gen_server:start(?MODULE, [leveled_log:get_opts(), PCLopts], []). -spec pcl_pushmem(pid(), bookies_memory()) -> ok|returned. %% @doc @@ -578,7 +581,6 @@ pcl_close(Pid) -> pcl_doom(Pid) -> gen_server:call(Pid, doom, 60000). - -spec pcl_checkbloomtest(pid(), tuple()) -> boolean(). %% @doc %% Function specifically added to help testing. In particular to make sure @@ -597,11 +599,30 @@ pcl_checkbloomtest(Pid, Key) -> pcl_checkforwork(Pid) -> gen_server:call(Pid, check_for_work, 2000). +-spec pcl_loglevel(pid(), leveled_log:log_level()) -> ok. +%% @doc +%% Change the log level of the Journal +pcl_loglevel(Pid, LogLevel) -> + gen_server:cast(Pid, {log_level, LogLevel}). + +-spec pcl_addlogs(pid(), list(string())) -> ok. +%% @doc +%% Add to the list of forced logs, a list of more forced logs +pcl_addlogs(Pid, ForcedLogs) -> + gen_server:cast(Pid, {add_logs, ForcedLogs}). + +-spec pcl_removelogs(pid(), list(string())) -> ok. +%% @doc +%% Remove from the list of forced logs, a list of forced logs +pcl_removelogs(Pid, ForcedLogs) -> + gen_server:cast(Pid, {remove_logs, ForcedLogs}). + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ -init([PCLopts]) -> +init([LogOpts, PCLopts]) -> + leveled_log:save(LogOpts), leveled_rand:seed(), case {PCLopts#penciller_options.root_path, PCLopts#penciller_options.start_snapshot, @@ -1001,7 +1022,28 @@ handle_cast(work_for_clerk, State) -> end; _ -> {noreply, State} - end. + end; +handle_cast({log_level, LogLevel}, State) -> + PC = State#state.clerk, + ok = leveled_pclerk:clerk_loglevel(PC, LogLevel), + ok = leveled_log:set_loglevel(LogLevel), + SSTopts = State#state.sst_options, + SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()}, + {noreply, State#state{sst_options = SSTopts0}}; +handle_cast({add_logs, ForcedLogs}, State) -> + PC = State#state.clerk, + ok = leveled_pclerk:clerk_addlogs(PC, ForcedLogs), + ok = leveled_log:add_forcedlogs(ForcedLogs), + SSTopts = State#state.sst_options, + SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()}, + {noreply, State#state{sst_options = SSTopts0}}; +handle_cast({remove_logs, ForcedLogs}, State) -> + PC = State#state.clerk, + ok = leveled_pclerk:clerk_removelogs(PC, ForcedLogs), + ok = leveled_log:remove_forcedlogs(ForcedLogs), + SSTopts = State#state.sst_options, + SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()}, + {noreply, State#state{sst_options = SSTopts0}}. %% handle the bookie stopping and stop this snapshot @@ -1047,9 +1089,9 @@ sst_filename(ManSQN, Level, Count) -> start_from_file(PCLopts) -> RootPath = PCLopts#penciller_options.root_path, MaxTableSize = PCLopts#penciller_options.max_inmemory_tablesize, - PressMethod = PCLopts#penciller_options.compression_method, + OptsSST = PCLopts#penciller_options.sst_options, - {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath, PressMethod), + {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath, OptsSST), CoinToss = PCLopts#penciller_options.levelzero_cointoss, % Used to randomly defer the writing of L0 file. Intended to help with @@ -1061,14 +1103,14 @@ start_from_file(PCLopts) -> levelzero_maxcachesize = MaxTableSize, levelzero_cointoss = CoinToss, levelzero_index = leveled_pmem:new_index(), - compression_method = PressMethod}, + sst_options = OptsSST}, %% Open manifest Manifest0 = leveled_pmanifest:open_manifest(RootPath), OpenFun = fun(FN) -> {ok, Pid, {_FK, _LK}, Bloom} = - leveled_sst:sst_open(sst_rootpath(RootPath), FN), + leveled_sst:sst_open(sst_rootpath(RootPath), FN, OptsSST), {Pid, Bloom} end, SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1, @@ -1083,7 +1125,9 @@ start_from_file(PCLopts) -> case filelib:is_file(filename:join(sst_rootpath(RootPath), L0FN)) of true -> leveled_log:log("P0015", [L0FN]), - L0Open = leveled_sst:sst_open(sst_rootpath(RootPath), L0FN), + L0Open = leveled_sst:sst_open(sst_rootpath(RootPath), + L0FN, + OptsSST), {ok, L0Pid, {L0StartKey, L0EndKey}, Bloom} = L0Open, L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid), L0Entry = #manifest_entry{start_key = L0StartKey, @@ -1091,10 +1135,11 @@ start_from_file(PCLopts) -> filename = L0FN, owner = L0Pid, bloom = Bloom}, - Manifest2 = leveled_pmanifest:insert_manifest_entry(Manifest1, - ManSQN + 1, - 0, - L0Entry), + Manifest2 = + leveled_pmanifest:insert_manifest_entry(Manifest1, + ManSQN + 1, + 0, + L0Entry), leveled_log:log("P0016", [L0SQN]), LedgerSQN = max(MaxSQN, L0SQN), {InitState#state{manifest = Manifest2, @@ -1255,7 +1300,7 @@ roll_memory(State, false) -> FetchFun, PCL, State#state.ledger_sqn, - State#state.compression_method), + State#state.sst_options), {ok, Constructor, _} = R, {Constructor, none}; roll_memory(State, true) -> @@ -1270,7 +1315,7 @@ roll_memory(State, true) -> 0, KVList, State#state.ledger_sqn, - State#state.compression_method), + State#state.sst_options), {ok, Constructor, _, Bloom} = R, {Constructor, Bloom}. @@ -1904,9 +1949,10 @@ shutdown_when_compact(Pid) -> simple_server_test() -> RootPath = "../test/ledger", clean_testdir(RootPath), - {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath, - max_inmemory_tablesize=1000, - compression_method=native}), + {ok, PCL} = + pcl_start(#penciller_options{root_path=RootPath, + max_inmemory_tablesize=1000, + sst_options=#sst_options{}}), Key1_Pre = {{o,"Bucket0001", "Key0001", null}, {1, {active, infinity}, null}}, Key1 = add_missing_hash(Key1_Pre), @@ -1947,9 +1993,10 @@ simple_server_test() -> ok = shutdown_when_compact(PCL), - {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, - max_inmemory_tablesize=1000, - compression_method=native}), + {ok, PCLr} = + pcl_start(#penciller_options{root_path=RootPath, + max_inmemory_tablesize=1000, + sst_options=#sst_options{}}), ?assertMatch(2003, pcl_getstartupsequencenumber(PCLr)), % ok = maybe_pause_push(PCLr, [Key2] ++ KL2 ++ [Key3]), true = pcl_checkbloomtest(PCLr, {o,"Bucket0001", "Key0001", null}), @@ -2213,15 +2260,14 @@ create_file_test() -> KVL = lists:usort(generate_randomkeys({50000, 0})), Tree = leveled_tree:from_orderedlist(KVL, ?CACHE_TYPE), FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end, - {ok, - SP, - noreply} = leveled_sst:sst_newlevelzero(RP, - Filename, - 1, - FetchFun, - undefined, - 50000, - native), + {ok, SP, noreply} = + leveled_sst:sst_newlevelzero(RP, + Filename, + 1, + FetchFun, + undefined, + 50000, + #sst_options{press_method = native}), lists:foreach(fun(X) -> case checkready(SP) of timeout -> @@ -2272,9 +2318,10 @@ coverage_cheat_test() -> handle_down_test() -> RootPath = "../test/ledger", clean_testdir(RootPath), - {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, - max_inmemory_tablesize=1000, - compression_method=native}), + {ok, PCLr} = + pcl_start(#penciller_options{root_path=RootPath, + max_inmemory_tablesize=1000, + sst_options=#sst_options{}}), FakeBookie = spawn(fun loop/0), Mon = erlang:monitor(process, FakeBookie), diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl index 3bb8cd8..f19103a 100644 --- a/src/leveled_pmem.erl +++ b/src/leveled_pmem.erl @@ -262,9 +262,12 @@ generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) -> generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) -> Acc; generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) -> - BNumber = string:right(integer_to_list(BucketLow + leveled_rand:uniform(BRange)), - 4, $0), - KNumber = string:right(integer_to_list(leveled_rand:uniform(1000)), 4, $0), + BNumber = + lists:flatten( + io_lib:format("~4..0B", + [BucketLow + leveled_rand:uniform(BRange)])), + KNumber = + lists:flatten(io_lib:format("~4..0B", [leveled_rand:uniform(1000)])), {K, V} = {{o, "Bucket" ++ BNumber, "Key" ++ KNumber, null}, {Seqn, {active, infinity}, null}}, generate_randomkeys(Seqn + 1, diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index d3e3afe..8c15254 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -111,7 +111,7 @@ -export([sst_new/6, sst_new/8, sst_newlevelzero/7, - sst_open/2, + sst_open/3, sst_get/2, sst_get/3, sst_expandpointer/5, @@ -164,6 +164,8 @@ :: false| {sets, sets:set(non_neg_integer())}| {list, list(non_neg_integer())}. +-type sst_options() + :: #sst_options{}. %% yield_blockquery is used to detemrine if the work necessary to process a %% range query beyond the fetching the slot should be managed from within @@ -210,13 +212,13 @@ -type sst_timings() :: no_timing|#sst_timings{}. -type build_timings() :: no_timing|#build_timings{}. --export_type([expandable_pointer/0]). +-export_type([expandable_pointer/0, press_method/0]). %%%============================================================================ %%% API %%%============================================================================ --spec sst_open(string(), string()) +-spec sst_open(string(), string(), sst_options()) -> {ok, pid(), {leveled_codec:ledger_key(), leveled_codec:ledger_key()}, binary()}. @@ -228,10 +230,10 @@ %% term order. %% %% The filename should include the file extension. -sst_open(RootPath, Filename) -> +sst_open(RootPath, Filename, OptsSST) -> {ok, Pid} = gen_fsm:start_link(?MODULE, [], []), case gen_fsm:sync_send_event(Pid, - {sst_open, RootPath, Filename}, + {sst_open, RootPath, Filename, OptsSST}, infinity) of {ok, {SK, EK}, Bloom} -> {ok, Pid, {SK, EK}, Bloom} @@ -239,7 +241,7 @@ sst_open(RootPath, Filename) -> -spec sst_new(string(), string(), integer(), list(leveled_codec:ledger_kv()), - integer(), press_method()) + integer(), sst_options()) -> {ok, pid(), {leveled_codec:ledger_key(), leveled_codec:ledger_key()}, binary()}. @@ -247,14 +249,14 @@ sst_open(RootPath, Filename) -> %% Start a new SST file at the assigned level passing in a list of Key, Value %% pairs. This should not be used for basement levels or unexpanded Key/Value %% lists as merge_lists will not be called. -sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod) -> - sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod, - ?INDEX_MODDATE). +sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST) -> + sst_new(RootPath, Filename, Level, + KVList, MaxSQN, OptsSST, ?INDEX_MODDATE). -sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod, - IndexModDate) -> +sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) -> {ok, Pid} = gen_fsm:start_link(?MODULE, [], []), - PressMethod0 = compress_level(Level, PressMethod), + PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method), + OptsSST0 = OptsSST#sst_options{press_method = PressMethod0}, {[], [], SlotList, FK} = merge_lists(KVList, PressMethod0, IndexModDate), case gen_fsm:sync_send_event(Pid, @@ -264,7 +266,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod, Level, {SlotList, FK}, MaxSQN, - PressMethod0, + OptsSST0, IndexModDate}, infinity) of {ok, {SK, EK}, Bloom} -> @@ -275,7 +277,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod, list(leveled_codec:ledger_kv()|sst_pointer()), list(leveled_codec:ledger_kv()|sst_pointer()), boolean(), integer(), - integer(), press_method()) + integer(), sst_options()) -> empty|{ok, pid(), {{list(leveled_codec:ledger_kv()), list(leveled_codec:ledger_kv())}, @@ -295,15 +297,16 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod, %% file is not added to the manifest. sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, - MaxSQN, PressMethod) -> + MaxSQN, OptsSST) -> sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, - MaxSQN, PressMethod, ?INDEX_MODDATE). + MaxSQN, OptsSST, ?INDEX_MODDATE). sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, - MaxSQN, PressMethod, IndexModDate) -> - PressMethod0 = compress_level(Level, PressMethod), + MaxSQN, OptsSST, IndexModDate) -> + PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method), + OptsSST0 = OptsSST#sst_options{press_method = PressMethod0}, {Rem1, Rem2, SlotList, FK} = merge_lists(KVL1, KVL2, {IsBasement, Level}, PressMethod0, IndexModDate), @@ -319,7 +322,7 @@ sst_new(RootPath, Filename, Level, {SlotList, FK}, MaxSQN, - PressMethod0, + OptsSST0, IndexModDate}, infinity) of {ok, {SK, EK}, Bloom} -> @@ -329,7 +332,7 @@ sst_new(RootPath, Filename, -spec sst_newlevelzero(string(), string(), integer(), fun(), pid()|undefined, integer(), - press_method()) -> + sst_options()) -> {ok, pid(), noreply}. %% @doc %% Start a new file at level zero. At this level the file size is not fixed - @@ -337,8 +340,9 @@ sst_new(RootPath, Filename, %% fetched slot by slot using the FetchFun sst_newlevelzero(RootPath, Filename, Slots, FetchFun, Penciller, - MaxSQN, PressMethod) -> - PressMethod0 = compress_level(0, PressMethod), + MaxSQN, OptsSST) -> + PressMethod0 = compress_level(0, OptsSST#sst_options.press_method), + OptsSST0 = OptsSST#sst_options{press_method = PressMethod0}, {ok, Pid} = gen_fsm:start_link(?MODULE, [], []), gen_fsm:send_event(Pid, {sst_newlevelzero, @@ -348,7 +352,7 @@ sst_newlevelzero(RootPath, Filename, FetchFun, Penciller, MaxSQN, - PressMethod0, + OptsSST0, ?INDEX_MODDATE}), {ok, Pid, noreply}. @@ -448,7 +452,8 @@ sst_printtimings(Pid) -> init([]) -> {ok, starting, #state{}}. -starting({sst_open, RootPath, Filename}, _From, State) -> +starting({sst_open, RootPath, Filename, OptsSST}, _From, State) -> + leveled_log:save(OptsSST#sst_options.log_options), {UpdState, Bloom} = read_file(Filename, State#state{root_path=RootPath}), Summary = UpdState#state.summary, @@ -459,8 +464,10 @@ starting({sst_open, RootPath, Filename}, _From, State) -> starting({sst_new, RootPath, Filename, Level, {SlotList, FirstKey}, MaxSQN, - PressMethod, IdxModDate}, _From, State) -> + OptsSST, IdxModDate}, _From, State) -> SW = os:timestamp(), + leveled_log:save(OptsSST#sst_options.log_options), + PressMethod = OptsSST#sst_options.press_method, {Length, SlotIndex, BlockIndex, SlotsBin, Bloom} = build_all_slots(SlotList), SummaryBin = @@ -483,8 +490,10 @@ starting({sst_new, starting({sst_newlevelzero, RootPath, Filename, Slots, FetchFun, Penciller, MaxSQN, - PressMethod, IdxModDate}, State) -> + OptsSST, IdxModDate}, State) -> SW0 = os:timestamp(), + leveled_log:save(OptsSST#sst_options.log_options), + PressMethod = OptsSST#sst_options.press_method, KVList = leveled_pmem:to_list(Slots, FetchFun), Time0 = timer:now_diff(os:timestamp(), SW0), @@ -2472,12 +2481,18 @@ update_timings(SW, Timings, Stage, Continue) -> -ifdef(TEST). testsst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod) -> - sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod, false). + OptsSST = + #sst_options{press_method=PressMethod, + log_options=leveled_log:get_opts()}, + sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, false). testsst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN, PressMethod) -> + OptsSST = + #sst_options{press_method=PressMethod, + log_options=leveled_log:get_opts()}, sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN, - PressMethod, false). + OptsSST, false). generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) -> generate_randomkeys(Seqn, @@ -2490,8 +2505,10 @@ generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) -> Acc; generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) -> BRand = leveled_rand:uniform(BRange), - BNumber = string:right(integer_to_list(BucketLow + BRand), 4, $0), - KNumber = string:right(integer_to_list(leveled_rand:uniform(1000)), 6, $0), + BNumber = + lists:flatten(io_lib:format("K~4..0B", [BucketLow + BRand])), + KNumber = + lists:flatten(io_lib:format("K~6..0B", [leveled_rand:uniform(1000)])), LK = leveled_codec:to_ledgerkey("Bucket" ++ BNumber, "Key" ++ KNumber, o), Chunk = leveled_rand:rand_bytes(64), {_B, _K, MV, _H, _LMs} = @@ -2814,8 +2831,7 @@ test_binary_slot(FullBin, Key, Hash, ExpectedValue) -> merge_test() -> - merge_tester(fun testsst_new/6, fun testsst_new/8), - merge_tester(fun sst_new/6, fun sst_new/8). + merge_tester(fun testsst_new/6, fun testsst_new/8). merge_tester(NewFunS, NewFunM) -> @@ -2868,8 +2884,7 @@ merge_tester(NewFunS, NewFunM) -> simple_persisted_range_test() -> - simple_persisted_range_tester(fun testsst_new/6), - simple_persisted_range_tester(fun sst_new/6). + simple_persisted_range_tester(fun testsst_new/6). simple_persisted_range_tester(SSTNewFun) -> {RP, Filename} = {"../test/", "simple_test"}, @@ -2911,8 +2926,7 @@ simple_persisted_range_tester(SSTNewFun) -> simple_persisted_rangesegfilter_test() -> - simple_persisted_rangesegfilter_tester(fun testsst_new/6), - simple_persisted_rangesegfilter_tester(fun sst_new/6). + simple_persisted_rangesegfilter_tester(fun testsst_new/6). simple_persisted_rangesegfilter_tester(SSTNewFun) -> {RP, Filename} = {"../test/", "range_segfilter_test"}, @@ -3007,7 +3021,7 @@ additional_range_test() -> lists:seq(?NOLOOK_SLOTSIZE + Gap + 1, 2 * ?NOLOOK_SLOTSIZE + Gap)), {ok, P1, {{Rem1, Rem2}, SK, EK}, _Bloom1} = - sst_new("../test/", "range1_src", IK1, IK2, false, 1, 9999, native), + testsst_new("../test/", "range1_src", IK1, IK2, false, 1, 9999, native), ?assertMatch([], Rem1), ?assertMatch([], Rem2), ?assertMatch(SK, element(1, lists:nth(1, IK1))), @@ -3058,8 +3072,7 @@ additional_range_test() -> simple_persisted_slotsize_test() -> - simple_persisted_slotsize_tester(fun testsst_new/6), - simple_persisted_slotsize_tester(fun sst_new/6). + simple_persisted_slotsize_tester(fun testsst_new/6). simple_persisted_slotsize_tester(SSTNewFun) -> @@ -3082,8 +3095,7 @@ simple_persisted_test_() -> {timeout, 60, fun simple_persisted_test_bothformats/0}. simple_persisted_test_bothformats() -> - simple_persisted_tester(fun testsst_new/6), - simple_persisted_tester(fun sst_new/6). + simple_persisted_tester(fun testsst_new/6). simple_persisted_tester(SSTNewFun) -> {RP, Filename} = {"../test/", "simple_test"}, diff --git a/src/leveled_tree.erl b/src/leveled_tree.erl index f033bb4..d38aaa6 100644 --- a/src/leveled_tree.erl +++ b/src/leveled_tree.erl @@ -581,8 +581,12 @@ generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) -> Acc; generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) -> BRand = leveled_rand:uniform(BRange), - BNumber = string:right(integer_to_list(BucketLow + BRand), 4, $0), - KNumber = string:right(integer_to_list(leveled_rand:uniform(1000)), 4, $0), + BNumber = + lists:flatten( + io_lib:format("K~4..0B", [BucketLow + BRand])), + KNumber = + lists:flatten( + io_lib:format("K~8..0B", [leveled_rand:uniform(1000)])), {K, V} = {{o, "Bucket" ++ BNumber, "Key" ++ KNumber, null}, {Seqn, {active, infinity}, null}}, generate_randomkeys(Seqn + 1, diff --git a/src/leveled_util.erl b/src/leveled_util.erl index 0539a22..03c2083 100644 --- a/src/leveled_util.erl +++ b/src/leveled_util.erl @@ -65,8 +65,6 @@ hash1(H, <>) -> hash1(H2, Rest). - - %%%============================================================================ %%% Test %%%============================================================================ @@ -88,4 +86,4 @@ magichashperf_test() -> {TimeMH2, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]), io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH2]). --endif. \ No newline at end of file +-endif. diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index fdde0c4..b610440 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -41,6 +41,7 @@ simple_put_fetch_head_delete(_Config) -> simple_test_withlog(error, ["B0015", "B0016", "B0017", "B0018", "P0032", "SST12", "CDB19", "SST13", "I0019"]). + simple_test_withlog(LogLevel, ForcedLogs) -> RootPath = testutil:reset_filestructure(), StartOpts1 = [{root_path, RootPath}, @@ -59,6 +60,7 @@ simple_test_withlog(LogLevel, ForcedLogs) -> {log_level, LogLevel}, {forced_logs, ForcedLogs}], {ok, Bookie2} = leveled_bookie:book_start(StartOpts2), + testutil:check_forobject(Bookie2, TestObject), ObjList1 = testutil:generate_objects(5000, 2), testutil:riakload(Bookie2, ObjList1), @@ -106,14 +108,20 @@ many_put_fetch_head(_Config) -> {sync_strategy, testutil:sync_strategy()}, {compression_point, on_receipt}], {ok, Bookie2} = leveled_bookie:book_start(StartOpts2), + ok = leveled_bookie:book_loglevel(Bookie2, error), + ok = leveled_bookie:book_addlogs(Bookie2, ["B0015"]), testutil:check_forobject(Bookie2, TestObject), GenList = [2, 20002, 40002, 60002, 80002, 100002, 120002, 140002, 160002, 180002], CLs = testutil:load_objects(20000, GenList, Bookie2, TestObject, fun testutil:generate_smallobjects/2), + {error, ["B0015"]} = leveled_bookie:book_logsettings(Bookie2), + ok = leveled_bookie:book_removelogs(Bookie2, ["B0015"]), CL1A = lists:nth(1, CLs), ChkListFixed = lists:nth(length(CLs), CLs), testutil:check_forlist(Bookie2, CL1A), + {error, []} = leveled_bookie:book_logsettings(Bookie2), + ok = leveled_bookie:book_loglevel(Bookie2, info), ObjList2A = testutil:generate_objects(5000, 2), testutil:riakload(Bookie2, ObjList2A), ChkList2A = lists:sublist(lists:sort(ObjList2A), 1000), @@ -197,8 +205,8 @@ journal_compaction_tester(Restart, WRP) -> ObjListD = testutil:generate_objects(10000, 2), lists:foreach(fun({_R, O, _S}) -> testutil:book_riakdelete(Bookie0, - O#r_object.bucket, - O#r_object.key, + testutil:get_bucket(O), + testutil:get_key(O), []) end, ObjListD), @@ -577,8 +585,8 @@ load_and_count_withdelete(_Config) -> 0, lists:seq(1, 20)), testutil:check_forobject(Bookie1, TestObject), - {BucketD, KeyD} = {TestObject#r_object.bucket, - TestObject#r_object.key}, + {BucketD, KeyD} = + {testutil:get_bucket(TestObject), testutil:get_key(TestObject)}, {_, 1} = testutil:check_bucket_stats(Bookie1, BucketD), ok = testutil:book_riakdelete(Bookie1, BucketD, KeyD, []), not_found = testutil:book_riakget(Bookie1, BucketD, KeyD), diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index eb8f3f3..b4c570b 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -445,10 +445,12 @@ small_load_with2i(_Config) -> %% Delete the objects from the ChkList removing the indexes lists:foreach(fun({_RN, Obj, Spc}) -> - DSpc = lists:map(fun({add, F, T}) -> {remove, F, T} - end, + DSpc = lists:map(fun({add, F, T}) -> + {remove, F, T} + end, Spc), - {B, K} = {Obj#r_object.bucket, Obj#r_object.key}, + {B, K} = + {testutil:get_bucket(Obj), testutil:get_key(Obj)}, testutil:book_riakdelete(Bookie1, B, K, DSpc) end, ChkList1), diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index c9306fd..304b1d7 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -256,8 +256,8 @@ recovr_strategy(_Config) -> {TestObject, TestSpec} = testutil:generate_testobject(), ok = testutil:book_riakput(Book1, TestObject, TestSpec), ok = testutil:book_riakdelete(Book1, - TestObject#r_object.bucket, - TestObject#r_object.key, + testutil:get_bucket(TestObject), + testutil:get_key(TestObject), []), lists:foreach(fun({K, _SpcL}) -> diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl index 0f5e482..aadf13f 100644 --- a/test/end_to_end/riak_SUITE.erl +++ b/test/end_to_end/riak_SUITE.erl @@ -639,14 +639,14 @@ test_segfilter_query(Bookie, CLs) -> SegMapFun = fun({_RN, RiakObject, _Spc}) -> - B = RiakObject#r_object.bucket, - K = RiakObject#r_object.key, + B = testutil:get_bucket(RiakObject), + K = testutil:get_key(RiakObject), leveled_tictac:keyto_segment32(<>) end, BKMapFun = fun({_RN, RiakObject, _Spc}) -> - B = RiakObject#r_object.bucket, - K = RiakObject#r_object.key, + B = testutil:get_bucket(RiakObject), + K = testutil:get_key(RiakObject), {B, K} end, diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index 98ea9af..e2918dc 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -71,6 +71,18 @@ -define(EMPTY_VTAG_BIN, <<"e">>). -define(ROOT_PATH, "test"). +-record(r_content, { + metadata, + value :: term() + }). + +-record(r_object, { + bucket, + key, + contents :: [#r_content{}], + vclock, + updatemetadata=dict:store(clean, true, dict:new()), + updatevalue :: term()}). riak_object(Bucket, Key, Value, MetaData) -> Content = #r_content{metadata=dict:from_list(MetaData), value=Value},