From 6677f2e5c6ef4c569e8f8a43223d9f4772a16bbf Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 11 Dec 2018 20:42:00 +0000 Subject: [PATCH] Push log update through to cdb/sst Using the cdb_options and sst_options records --- include/leveled.hrl | 78 +++++---------------- src/leveled_bookie.erl | 18 +++-- src/leveled_cdb.erl | 10 ++- src/leveled_log.erl | 109 ++++++++++++++++++----------- src/leveled_pclerk.erl | 46 ++++++------ src/leveled_penciller.erl | 67 ++++++++++-------- src/leveled_sst.erl | 88 ++++++++++++----------- test/end_to_end/basic_SUITE.erl | 8 +-- test/end_to_end/iterator_SUITE.erl | 8 ++- test/end_to_end/recovery_SUITE.erl | 4 +- test/end_to_end/riak_SUITE.erl | 8 +-- test/end_to_end/testutil.erl | 12 ++++ 12 files changed, 242 insertions(+), 214 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index 43b1064..8106c02 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -13,24 +13,17 @@ %% Inker key type used for 'normal' objects -define(INKT_STND, stnd). - %% Inker key type used for 'batch' objects -define(INKT_MPUT, mput). - %% Inker key type used for objects which contain no value, only key changes %% This is used currently for objects formed under a 'retain' strategy on Inker %% compaction -define(INKT_KEYD, keyd). - %% Inker key type used for tombstones -define(INKT_TOMB, tomb). -define(CACHE_TYPE, skpl). --record(sft_options, - {wait = true :: boolean(), - expire_tombstones = false :: boolean(), - penciller :: pid()}). -record(level, {level :: integer(), @@ -39,25 +32,31 @@ -record(manifest_entry, {start_key :: tuple() | undefined, - end_key :: tuple() | undefined, - owner :: pid()|list(), - filename :: string() | undefined, - bloom :: binary() | none | undefined}). + end_key :: tuple() | undefined, + owner :: pid()|list(), + filename :: string() | undefined, + bloom :: binary() | none | undefined}). -record(cdb_options, {max_size :: integer() | undefined, - file_path :: string() | undefined, - waste_path :: string() | undefined, - binary_mode = false :: boolean(), - sync_strategy = sync}). + file_path :: string() | undefined, + waste_path :: string() | undefined, + binary_mode = false :: boolean(), + sync_strategy = sync, + log_options = leveled_log:get_opts() + :: leveled_log:log_options()}). + +-record(sst_options, + {press_method = native + :: leveled_sst:press_method(), + log_options = leveled_log:get_opts() + :: leveled_log:log_options()}). -record(inker_options, {cdb_max_size :: integer() | undefined, root_path :: string() | undefined, - cdb_options :: #cdb_options{} | undefined, + cdb_options = #cdb_options{} :: #cdb_options{}, start_snapshot = false :: boolean(), - %% so a snapshot can monitor the bookie and - %% terminate when it does bookies_pid :: pid() | undefined, source_inker :: pid() | undefined, reload_strategy = [] :: list(), @@ -70,11 +69,10 @@ -record(penciller_options, {root_path :: string() | undefined, + sst_options = #sst_options{} :: #sst_options{}, max_inmemory_tablesize :: integer() | undefined, start_snapshot = false :: boolean(), snapshot_query, - %% so a snapshot can monitor the bookie and - %% terminate when it does bookies_pid :: pid() | undefined, bookies_mem :: tuple() | undefined, source_penciller :: pid() | undefined, @@ -91,43 +89,3 @@ singlefile_compactionperc :: float()|undefined, maxrunlength_compactionperc :: float()|undefined, reload_strategy = [] :: list()}). - --record(recent_aae, {filter :: whitelist|blacklist, - % the buckets list should either be a - % - whitelist - specific buckets are included, and - % entries are indexed by bucket name - % - blacklist - specific buckets are excluded, and - % all other entries are indexes using the special - % $all bucket - - buckets :: list(), - % whitelist or blacklist of buckets to support recent - % AAE - - limit_minutes :: integer(), - % how long to retain entries the temporary index for - % It will actually be retained for limit + unit minutes - % 60 minutes seems sensible - - unit_minutes :: integer(), - % What the minimum unit size will be for a query - % e.g. the minimum time duration to be used in range - % queries of the aae index - % 5 minutes seems sensible - - tree_size = small :: atom() - % Just defaulted to small for now - }). - --record(r_content, { - metadata, - value :: term() - }). - --record(r_object, { - bucket, - key, - contents :: [#r_content{}], - vclock, - updatemetadata=dict:store(clean, true, dict:new()), - updatevalue :: term()}). diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 8dd8736..89067ea 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -1060,11 +1060,15 @@ init([Opts]) -> {stop, no_root_path}; {undefined, _RP} -> % Start from file not snapshot - {InkerOpts, PencillerOpts} = set_options(Opts), + % Must set log level first - as log level will be fetched within + % set_options/1. Also logs can now be added to set_options/1 LogLevel = proplists:get_value(log_level, Opts), + leveled_log:set_loglevel(LogLevel), ForcedLogs = proplists:get_value(forced_logs, Opts), - leveled_log:save(LogLevel, ForcedLogs), + leveled_log:add_forcedlogs(ForcedLogs), + + {InkerOpts, PencillerOpts} = set_options(Opts), OverrideFunctions = proplists:get_value(override_functions, Opts), SetFun = @@ -1551,12 +1555,16 @@ set_options(Opts) -> compress_on_receipt = CompressOnReceipt, cdb_options = #cdb_options{max_size=MaxJournalSize, - binary_mode=true, - sync_strategy=SyncStrat}}, + binary_mode=true, + sync_strategy=SyncStrat, + log_options=leveled_log:get_opts()}}, #penciller_options{root_path = LedgerFP, max_inmemory_tablesize = PCLL0CacheSize, levelzero_cointoss = true, - compression_method = CompressionMethod}}. + sst_options = + #sst_options{press_method = CompressionMethod, + log_options=leveled_log:get_opts()}} + }. -spec return_snapfun(book_state(), store|ledger, diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 935d2aa..80f706f 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -140,7 +140,9 @@ waste_path :: string() | undefined, sync_strategy = none, timings = no_timing :: cdb_timings(), - timings_countdown = 0 :: integer()}). + timings_countdown = 0 :: integer(), + log_options = leveled_log:get_opts() + :: leveled_log:log_options()}). -record(cdb_timings, {sample_count = 0 :: integer(), sample_cyclecount = 0 :: integer(), @@ -414,9 +416,11 @@ init([Opts]) -> #state{max_size=MaxSize, binary_mode=Opts#cdb_options.binary_mode, waste_path=Opts#cdb_options.waste_path, - sync_strategy=Opts#cdb_options.sync_strategy}}. + sync_strategy=Opts#cdb_options.sync_strategy, + log_options=Opts#cdb_options.log_options}}. starting({open_writer, Filename}, _From, State) -> + leveled_log:save(State#state.log_options), leveled_log:log("CDB01", [Filename]), {LastPosition, HashTree, LastKey} = open_active_file(Filename), {WriteOps, UpdStrategy} = set_writeops(State#state.sync_strategy), @@ -429,6 +433,7 @@ starting({open_writer, Filename}, _From, State) -> filename=Filename, hashtree=HashTree}}; starting({open_reader, Filename}, _From, State) -> + leveled_log:save(State#state.log_options), leveled_log:log("CDB02", [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, false), {reply, ok, reader, State#state{handle=Handle, @@ -436,6 +441,7 @@ starting({open_reader, Filename}, _From, State) -> filename=Filename, hash_index=Index}}; starting({open_reader, Filename, LastKey}, _From, State) -> + leveled_log:save(State#state.log_options), leveled_log:log("CDB02", [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, LastKey), {reply, ok, reader, State#state{handle=Handle, diff --git a/src/leveled_log.erl b/src/leveled_log.erl index d438ea2..31213fa 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -11,13 +11,22 @@ log_timer/3, log_randomtimer/4]). --export([save/1, save/2, - get_opts/0]). +-export([set_loglevel/1, + add_forcedlogs/1, + remove_forcedlogs/1, + get_opts/0, + save/1]). + + +-record(log_options, {log_level = info :: log_level(), + forced_logs = [] :: [string()]}). -type log_level() :: debug | info | warn | error | critical. --type log_levels() :: [log_level()]. --define(LOG_LEVELS, [debug, info, warn, error, critical]). +-type log_options() :: #log_options{}. +-export_type([log_options/0]). + +-define(LOG_LEVELS, [debug, info, warn, error, critical]). -define(DEFAULT_LOG_LEVEL, error). -define(LOGBASE, [ @@ -380,29 +389,65 @@ {warn, "Error ~w caught when safe reading a file to length ~w"}} ]). --record(log_options, - {log_level = info :: leveled_log:log_levels(), - forced_logs = [] :: [string()]}). -save(LogLevel, ForcedLogs) when is_list(ForcedLogs), is_atom(LogLevel) -> - save(#log_options{log_level = LogLevel, - forced_logs = ForcedLogs}). +%%%============================================================================ +%%% Manage Log Options +%%%============================================================================ +-spec set_loglevel(log_level()) -> ok. +%% @doc +%% Set the log level for this PID +set_loglevel(LogLevel) when is_atom(LogLevel) -> + LO = get_opts(), + UpdLO = LO#log_options{log_level = LogLevel}, + save(UpdLO). + +-spec add_forcedlogs(list(string())) -> ok. +%% @doc +%% Add a forced log to the list of forced logs. this will cause the log of this +%% logReference to be logged even if the log_level of the process would not +%% otherwise require it to be logged - e.g. to fire an 'INFO' log when running +%% at an 'ERROR' log level. +add_forcedlogs(LogRefs) -> + LO = get_opts(), + ForcedLogs = LO#log_options.forced_logs, + UpdLO = LO#log_options{forced_logs = lists:usort(LogRefs ++ ForcedLogs)}, + save(UpdLO). + +-spec remove_forcedlogs(list(string())) -> ok. +%% @doc +%% Remove a forced log from the list of forced logs +remove_forcedlogs(LogRefs) -> + LO = get_opts(), + ForcedLogs = LO#log_options.forced_logs, + UpdLO = LO#log_options{forced_logs = lists:subtract(ForcedLogs, LogRefs)}, + save(UpdLO). + +-spec save(log_options()) -> ok. +%% @doc +%% Save the log options to the process dictionary save(#log_options{} = LO) -> put('$leveled_log_options', LO), ok. +-spec get_opts() -> log_options(). +%% @doc +%% Retrieve the log options from the process dictionary if present. get_opts() -> case get('$leveled_log_options') of - undefined -> - LogLevel = application:get_env(leveled, log_level, ?DEFAULT_LOG_LEVEL), - ForcedLogs = application:get_env(leveled, forced_logs, []), - #log_options{log_level = LogLevel, - forced_logs = ForcedLogs}; #log_options{} = LO -> - LO + LO; + _ -> + #log_options{log_level = ?DEFAULT_LOG_LEVEL, + forced_logs = []} end. + +%%%============================================================================ +%%% Prompt Logs +%%%============================================================================ + + log(LogReference, Subs) -> log(LogReference, Subs, ?LOG_LEVELS). @@ -430,7 +475,6 @@ should_i_log(LogLevel, Levels, LogRef) -> false -> if CurLevel == LogLevel -> true; - true; true -> is_active_level(Levels, CurLevel, LogLevel) end @@ -509,35 +553,16 @@ log_warn_test() -> ok = log_timer("G8888", [], os:timestamp(), [info, warn, error]). shouldilog_test() -> - % What if an unsupported option is set for the log level - % don't log - ok = application:set_env(leveled, log_level, unsupported), - ?assertMatch(false, should_i_log(info, ?LOG_LEVELS, "G0001")), - ?assertMatch(false, should_i_log(inform, ?LOG_LEVELS, "G0001")), - ok = application:set_env(leveled, log_level, debug), + ok = set_loglevel(debug), ?assertMatch(true, should_i_log(info, ?LOG_LEVELS, "G0001")), - ok = application:set_env(leveled, log_level, info), + ok = set_loglevel(info), ?assertMatch(true, should_i_log(info, ?LOG_LEVELS, "G0001")), - ok = application:set_env(leveled, forced_logs, ["G0001"]), - ok = application:set_env(leveled, log_level, error), + ok = add_forcedlogs(["G0001"]), + ok = set_loglevel(error), ?assertMatch(true, should_i_log(info, ?LOG_LEVELS, "G0001")), ?assertMatch(false, should_i_log(info, ?LOG_LEVELS, "G0002")), - ok = application:set_env(leveled, forced_logs, []), - ok = application:set_env(leveled, log_level, info), - ?assertMatch(false, should_i_log(debug, ?LOG_LEVELS, "D0001")). - -shouldilog2_test() -> - ok = save(unsupported, []), - ?assertMatch(false, should_i_log(info, ?LOG_LEVELS, "G0001")), - ?assertMatch(false, should_i_log(inform, ?LOG_LEVELS, "G0001")), - ok = save(debug, []), - ?assertMatch(true, should_i_log(info, ?LOG_LEVELS, "G0001")), - ok = save(info, []), - ?assertMatch(true, should_i_log(info, ?LOG_LEVELS, "G0001")), - ok = save(error, ["G0001"]), - ?assertMatch(true, should_i_log(info, ?LOG_LEVELS, "G0001")), - ?assertMatch(false, should_i_log(info, ?LOG_LEVELS, "G0002")), - ok = save(info, []), + ok = remove_forcedlogs(["G0001"]), + ok = set_loglevel(info), ?assertMatch(false, should_i_log(debug, ?LOG_LEVELS, "D0001")). -endif. diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 9500664..ec2482b 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -50,18 +50,18 @@ -record(state, {owner :: pid() | undefined, root_path :: string() | undefined, pending_deletions = dict:new(), % OTP 16 does not like type - compression_method = native :: lz4|native + sst_options :: #sst_options{} }). %%%============================================================================ %%% API %%%============================================================================ -clerk_new(Owner, Manifest, CompressionMethod) -> +clerk_new(Owner, Manifest, OptsSST) -> {ok, Pid} = gen_server:start_link(?MODULE, [leveled_log:get_opts(), - {compression_method, CompressionMethod}], + {sst_options, OptsSST}], []), ok = gen_server:call(Pid, {load, Owner, Manifest}, infinity), leveled_log:log("PC001", [Pid, Owner]), @@ -83,9 +83,9 @@ clerk_close(Pid) -> %%% gen_server callbacks %%%============================================================================ -init([LogOpts, {compression_method, CompressionMethod}]) -> +init([LogOpts, {sst_options, OptsSST}]) -> leveled_log:save(LogOpts), - {ok, #state{compression_method = CompressionMethod}}. + {ok, #state{sst_options = OptsSST}}. handle_call({load, Owner, RootPath}, _From, State) -> {reply, ok, State#state{owner=Owner, root_path=RootPath}, ?MIN_TIMEOUT}; @@ -127,7 +127,7 @@ handle_work({SrcLevel, Manifest}, State) -> {UpdManifest, EntriesToDelete} = merge(SrcLevel, Manifest, State#state.root_path, - State#state.compression_method), + State#state.sst_options), leveled_log:log("PC007", []), SWMC = os:timestamp(), ok = leveled_penciller:pcl_manifestchange(State#state.owner, @@ -139,7 +139,7 @@ handle_work({SrcLevel, Manifest}, State) -> leveled_log:log_timer("PC018", [], SWSM), {leveled_pmanifest:get_manifest_sqn(UpdManifest), EntriesToDelete}. -merge(SrcLevel, Manifest, RootPath, CompressionMethod) -> +merge(SrcLevel, Manifest, RootPath, OptsSST) -> Src = leveled_pmanifest:mergefile_selector(Manifest, SrcLevel), NewSQN = leveled_pmanifest:get_manifest_sqn(Manifest) + 1, SinkList = leveled_pmanifest:merge_lookup(Manifest, @@ -161,7 +161,7 @@ merge(SrcLevel, Manifest, RootPath, CompressionMethod) -> SST_RP = leveled_penciller:sst_rootpath(RootPath), perform_merge(Manifest, Src, SinkList, SrcLevel, - SST_RP, NewSQN, CompressionMethod) + SST_RP, NewSQN, OptsSST) end. notify_deletions([], _Penciller) -> @@ -179,7 +179,7 @@ notify_deletions([Head|Tail], Penciller) -> perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN, - CompressionMethod) -> + OptsSST) -> leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]), SrcList = [{next, Src, all}], MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner), @@ -189,7 +189,7 @@ perform_merge(Manifest, do_merge(SrcList, SinkList, SinkLevel, SinkBasement, RootPath, NewSQN, MaxSQN, - CompressionMethod, + OptsSST, []), RevertPointerFun = fun({next, ME, _SK}) -> @@ -207,23 +207,24 @@ perform_merge(Manifest, Src), {Man2, [Src|SinkManifestList]}. -do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _CM, Additions) -> +do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions) -> leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]), Additions; -do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, CM, Additions) -> +do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) -> FileName = leveled_penciller:sst_filename(NewSQN, SinkLevel, length(Additions)), leveled_log:log("PC012", [NewSQN, FileName, SinkB]), TS1 = os:timestamp(), case leveled_sst:sst_new(RP, FileName, - KL1, KL2, SinkB, SinkLevel, MaxSQN, CM) of + KL1, KL2, SinkB, SinkLevel, MaxSQN, + OptsSST) of empty -> leveled_log:log("PC013", [FileName]), do_merge([], [], SinkLevel, SinkB, RP, NewSQN, MaxSQN, - CM, + OptsSST, Additions); {ok, Pid, Reply, Bloom} -> {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, @@ -236,7 +237,7 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, CM, Additions) -> do_merge(KL1Rem, KL2Rem, SinkLevel, SinkB, RP, NewSQN, MaxSQN, - CM, + OptsSST, Additions ++ [Entry]) end. @@ -288,7 +289,7 @@ merge_file_test() -> 1, KL1_L1, 999999, - native), + #sst_options{}), KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)), {ok, PidL2_1, _, _} = leveled_sst:sst_new("../test/", @@ -296,7 +297,7 @@ merge_file_test() -> 2, KL1_L2, 999999, - native), + #sst_options{}), KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)), {ok, PidL2_2, _, _} = leveled_sst:sst_new("../test/", @@ -304,7 +305,7 @@ merge_file_test() -> 2, KL2_L2, 999999, - lz4), + #sst_options{press_method = lz4}), KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)), {ok, PidL2_3, _, _} = leveled_sst:sst_new("../test/", @@ -312,7 +313,7 @@ merge_file_test() -> 2, KL3_L2, 999999, - lz4), + #sst_options{press_method = lz4}), KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)), {ok, PidL2_4, _, _} = leveled_sst:sst_new("../test/", @@ -320,7 +321,7 @@ merge_file_test() -> 2, KL4_L2, 999999, - lz4), + #sst_options{press_method = lz4}), E1 = #manifest_entry{owner = PidL1_1, filename = "./KL1_L1.sst", @@ -353,11 +354,12 @@ merge_file_test() -> PointerList = lists:map(fun(ME) -> {next, ME, all} end, [E2, E3, E4, E5]), {Man6, _Dels} = - perform_merge(Man5, E1, PointerList, 1, "../test", 3, native), + perform_merge(Man5, E1, PointerList, 1, "../test", 3, #sst_options{}), ?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)). coverage_cheat_test() -> - {ok, _State1} = code_change(null, #state{}, null). + {ok, _State1} = + code_change(null, #state{sst_options=#sst_options{}}, null). -endif. diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 38867d2..607b5a1 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -258,7 +258,7 @@ is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), source_penciller :: pid() | undefined, - bookie_monref :: reference() | undefined, + bookie_monref :: reference() | undefined, levelzero_astree :: list() | undefined, work_ongoing = false :: boolean(), % i.e. compaction work @@ -267,7 +267,7 @@ timings = no_timing :: pcl_timings(), timings_countdown = 0 :: integer(), - compression_method = native :: lz4|native}). + sst_options = #sst_options{} :: #sst_options{}}). -record(pcl_timings, {sample_count = 0 :: integer(), @@ -1048,9 +1048,9 @@ sst_filename(ManSQN, Level, Count) -> start_from_file(PCLopts) -> RootPath = PCLopts#penciller_options.root_path, MaxTableSize = PCLopts#penciller_options.max_inmemory_tablesize, - PressMethod = PCLopts#penciller_options.compression_method, + OptsSST = PCLopts#penciller_options.sst_options, - {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath, PressMethod), + {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath, OptsSST), CoinToss = PCLopts#penciller_options.levelzero_cointoss, % Used to randomly defer the writing of L0 file. Intended to help with @@ -1062,14 +1062,14 @@ start_from_file(PCLopts) -> levelzero_maxcachesize = MaxTableSize, levelzero_cointoss = CoinToss, levelzero_index = leveled_pmem:new_index(), - compression_method = PressMethod}, + sst_options = OptsSST}, %% Open manifest Manifest0 = leveled_pmanifest:open_manifest(RootPath), OpenFun = fun(FN) -> {ok, Pid, {_FK, _LK}, Bloom} = - leveled_sst:sst_open(sst_rootpath(RootPath), FN), + leveled_sst:sst_open(sst_rootpath(RootPath), FN, OptsSST), {Pid, Bloom} end, SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1, @@ -1084,7 +1084,9 @@ start_from_file(PCLopts) -> case filelib:is_file(filename:join(sst_rootpath(RootPath), L0FN)) of true -> leveled_log:log("P0015", [L0FN]), - L0Open = leveled_sst:sst_open(sst_rootpath(RootPath), L0FN), + L0Open = leveled_sst:sst_open(sst_rootpath(RootPath), + L0FN, + OptsSST), {ok, L0Pid, {L0StartKey, L0EndKey}, Bloom} = L0Open, L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid), L0Entry = #manifest_entry{start_key = L0StartKey, @@ -1092,10 +1094,11 @@ start_from_file(PCLopts) -> filename = L0FN, owner = L0Pid, bloom = Bloom}, - Manifest2 = leveled_pmanifest:insert_manifest_entry(Manifest1, - ManSQN + 1, - 0, - L0Entry), + Manifest2 = + leveled_pmanifest:insert_manifest_entry(Manifest1, + ManSQN + 1, + 0, + L0Entry), leveled_log:log("P0016", [L0SQN]), LedgerSQN = max(MaxSQN, L0SQN), {InitState#state{manifest = Manifest2, @@ -1256,7 +1259,7 @@ roll_memory(State, false) -> FetchFun, PCL, State#state.ledger_sqn, - State#state.compression_method), + State#state.sst_options), {ok, Constructor, _} = R, {Constructor, none}; roll_memory(State, true) -> @@ -1271,7 +1274,7 @@ roll_memory(State, true) -> 0, KVList, State#state.ledger_sqn, - State#state.compression_method), + State#state.sst_options), {ok, Constructor, _, Bloom} = R, {Constructor, Bloom}. @@ -1905,9 +1908,10 @@ shutdown_when_compact(Pid) -> simple_server_test() -> RootPath = "../test/ledger", clean_testdir(RootPath), - {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath, - max_inmemory_tablesize=1000, - compression_method=native}), + {ok, PCL} = + pcl_start(#penciller_options{root_path=RootPath, + max_inmemory_tablesize=1000, + sst_options=#sst_options{}}), Key1_Pre = {{o,"Bucket0001", "Key0001", null}, {1, {active, infinity}, null}}, Key1 = add_missing_hash(Key1_Pre), @@ -1948,9 +1952,10 @@ simple_server_test() -> ok = shutdown_when_compact(PCL), - {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, - max_inmemory_tablesize=1000, - compression_method=native}), + {ok, PCLr} = + pcl_start(#penciller_options{root_path=RootPath, + max_inmemory_tablesize=1000, + sst_options=#sst_options{}}), ?assertMatch(2003, pcl_getstartupsequencenumber(PCLr)), % ok = maybe_pause_push(PCLr, [Key2] ++ KL2 ++ [Key3]), true = pcl_checkbloomtest(PCLr, {o,"Bucket0001", "Key0001", null}), @@ -2214,15 +2219,14 @@ create_file_test() -> KVL = lists:usort(generate_randomkeys({50000, 0})), Tree = leveled_tree:from_orderedlist(KVL, ?CACHE_TYPE), FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end, - {ok, - SP, - noreply} = leveled_sst:sst_newlevelzero(RP, - Filename, - 1, - FetchFun, - undefined, - 50000, - native), + {ok, SP, noreply} = + leveled_sst:sst_newlevelzero(RP, + Filename, + 1, + FetchFun, + undefined, + 50000, + #sst_options{press_method = native}), lists:foreach(fun(X) -> case checkready(SP) of timeout -> @@ -2273,9 +2277,10 @@ coverage_cheat_test() -> handle_down_test() -> RootPath = "../test/ledger", clean_testdir(RootPath), - {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, - max_inmemory_tablesize=1000, - compression_method=native}), + {ok, PCLr} = + pcl_start(#penciller_options{root_path=RootPath, + max_inmemory_tablesize=1000, + sst_options=#sst_options{}}), FakeBookie = spawn(fun loop/0), Mon = erlang:monitor(process, FakeBookie), diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index d7b4be1..8c15254 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -111,7 +111,7 @@ -export([sst_new/6, sst_new/8, sst_newlevelzero/7, - sst_open/2, + sst_open/3, sst_get/2, sst_get/3, sst_expandpointer/5, @@ -164,6 +164,8 @@ :: false| {sets, sets:set(non_neg_integer())}| {list, list(non_neg_integer())}. +-type sst_options() + :: #sst_options{}. %% yield_blockquery is used to detemrine if the work necessary to process a %% range query beyond the fetching the slot should be managed from within @@ -210,13 +212,13 @@ -type sst_timings() :: no_timing|#sst_timings{}. -type build_timings() :: no_timing|#build_timings{}. --export_type([expandable_pointer/0]). +-export_type([expandable_pointer/0, press_method/0]). %%%============================================================================ %%% API %%%============================================================================ --spec sst_open(string(), string()) +-spec sst_open(string(), string(), sst_options()) -> {ok, pid(), {leveled_codec:ledger_key(), leveled_codec:ledger_key()}, binary()}. @@ -228,10 +230,10 @@ %% term order. %% %% The filename should include the file extension. -sst_open(RootPath, Filename) -> +sst_open(RootPath, Filename, OptsSST) -> {ok, Pid} = gen_fsm:start_link(?MODULE, [], []), case gen_fsm:sync_send_event(Pid, - {sst_open, RootPath, Filename}, + {sst_open, RootPath, Filename, OptsSST}, infinity) of {ok, {SK, EK}, Bloom} -> {ok, Pid, {SK, EK}, Bloom} @@ -239,7 +241,7 @@ sst_open(RootPath, Filename) -> -spec sst_new(string(), string(), integer(), list(leveled_codec:ledger_kv()), - integer(), press_method()) + integer(), sst_options()) -> {ok, pid(), {leveled_codec:ledger_key(), leveled_codec:ledger_key()}, binary()}. @@ -247,14 +249,14 @@ sst_open(RootPath, Filename) -> %% Start a new SST file at the assigned level passing in a list of Key, Value %% pairs. This should not be used for basement levels or unexpanded Key/Value %% lists as merge_lists will not be called. -sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod) -> - sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod, - ?INDEX_MODDATE). +sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST) -> + sst_new(RootPath, Filename, Level, + KVList, MaxSQN, OptsSST, ?INDEX_MODDATE). -sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod, - IndexModDate) -> +sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) -> {ok, Pid} = gen_fsm:start_link(?MODULE, [], []), - PressMethod0 = compress_level(Level, PressMethod), + PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method), + OptsSST0 = OptsSST#sst_options{press_method = PressMethod0}, {[], [], SlotList, FK} = merge_lists(KVList, PressMethod0, IndexModDate), case gen_fsm:sync_send_event(Pid, @@ -264,7 +266,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod, Level, {SlotList, FK}, MaxSQN, - PressMethod0, + OptsSST0, IndexModDate}, infinity) of {ok, {SK, EK}, Bloom} -> @@ -275,7 +277,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod, list(leveled_codec:ledger_kv()|sst_pointer()), list(leveled_codec:ledger_kv()|sst_pointer()), boolean(), integer(), - integer(), press_method()) + integer(), sst_options()) -> empty|{ok, pid(), {{list(leveled_codec:ledger_kv()), list(leveled_codec:ledger_kv())}, @@ -295,15 +297,16 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod, %% file is not added to the manifest. sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, - MaxSQN, PressMethod) -> + MaxSQN, OptsSST) -> sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, - MaxSQN, PressMethod, ?INDEX_MODDATE). + MaxSQN, OptsSST, ?INDEX_MODDATE). sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, - MaxSQN, PressMethod, IndexModDate) -> - PressMethod0 = compress_level(Level, PressMethod), + MaxSQN, OptsSST, IndexModDate) -> + PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method), + OptsSST0 = OptsSST#sst_options{press_method = PressMethod0}, {Rem1, Rem2, SlotList, FK} = merge_lists(KVL1, KVL2, {IsBasement, Level}, PressMethod0, IndexModDate), @@ -319,7 +322,7 @@ sst_new(RootPath, Filename, Level, {SlotList, FK}, MaxSQN, - PressMethod0, + OptsSST0, IndexModDate}, infinity) of {ok, {SK, EK}, Bloom} -> @@ -329,7 +332,7 @@ sst_new(RootPath, Filename, -spec sst_newlevelzero(string(), string(), integer(), fun(), pid()|undefined, integer(), - press_method()) -> + sst_options()) -> {ok, pid(), noreply}. %% @doc %% Start a new file at level zero. At this level the file size is not fixed - @@ -337,8 +340,9 @@ sst_new(RootPath, Filename, %% fetched slot by slot using the FetchFun sst_newlevelzero(RootPath, Filename, Slots, FetchFun, Penciller, - MaxSQN, PressMethod) -> - PressMethod0 = compress_level(0, PressMethod), + MaxSQN, OptsSST) -> + PressMethod0 = compress_level(0, OptsSST#sst_options.press_method), + OptsSST0 = OptsSST#sst_options{press_method = PressMethod0}, {ok, Pid} = gen_fsm:start_link(?MODULE, [], []), gen_fsm:send_event(Pid, {sst_newlevelzero, @@ -348,7 +352,7 @@ sst_newlevelzero(RootPath, Filename, FetchFun, Penciller, MaxSQN, - PressMethod0, + OptsSST0, ?INDEX_MODDATE}), {ok, Pid, noreply}. @@ -448,7 +452,8 @@ sst_printtimings(Pid) -> init([]) -> {ok, starting, #state{}}. -starting({sst_open, RootPath, Filename}, _From, State) -> +starting({sst_open, RootPath, Filename, OptsSST}, _From, State) -> + leveled_log:save(OptsSST#sst_options.log_options), {UpdState, Bloom} = read_file(Filename, State#state{root_path=RootPath}), Summary = UpdState#state.summary, @@ -459,8 +464,10 @@ starting({sst_open, RootPath, Filename}, _From, State) -> starting({sst_new, RootPath, Filename, Level, {SlotList, FirstKey}, MaxSQN, - PressMethod, IdxModDate}, _From, State) -> + OptsSST, IdxModDate}, _From, State) -> SW = os:timestamp(), + leveled_log:save(OptsSST#sst_options.log_options), + PressMethod = OptsSST#sst_options.press_method, {Length, SlotIndex, BlockIndex, SlotsBin, Bloom} = build_all_slots(SlotList), SummaryBin = @@ -483,8 +490,10 @@ starting({sst_new, starting({sst_newlevelzero, RootPath, Filename, Slots, FetchFun, Penciller, MaxSQN, - PressMethod, IdxModDate}, State) -> + OptsSST, IdxModDate}, State) -> SW0 = os:timestamp(), + leveled_log:save(OptsSST#sst_options.log_options), + PressMethod = OptsSST#sst_options.press_method, KVList = leveled_pmem:to_list(Slots, FetchFun), Time0 = timer:now_diff(os:timestamp(), SW0), @@ -2472,12 +2481,18 @@ update_timings(SW, Timings, Stage, Continue) -> -ifdef(TEST). testsst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod) -> - sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod, false). + OptsSST = + #sst_options{press_method=PressMethod, + log_options=leveled_log:get_opts()}, + sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, false). testsst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN, PressMethod) -> + OptsSST = + #sst_options{press_method=PressMethod, + log_options=leveled_log:get_opts()}, sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN, - PressMethod, false). + OptsSST, false). generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) -> generate_randomkeys(Seqn, @@ -2816,8 +2831,7 @@ test_binary_slot(FullBin, Key, Hash, ExpectedValue) -> merge_test() -> - merge_tester(fun testsst_new/6, fun testsst_new/8), - merge_tester(fun sst_new/6, fun sst_new/8). + merge_tester(fun testsst_new/6, fun testsst_new/8). merge_tester(NewFunS, NewFunM) -> @@ -2870,8 +2884,7 @@ merge_tester(NewFunS, NewFunM) -> simple_persisted_range_test() -> - simple_persisted_range_tester(fun testsst_new/6), - simple_persisted_range_tester(fun sst_new/6). + simple_persisted_range_tester(fun testsst_new/6). simple_persisted_range_tester(SSTNewFun) -> {RP, Filename} = {"../test/", "simple_test"}, @@ -2913,8 +2926,7 @@ simple_persisted_range_tester(SSTNewFun) -> simple_persisted_rangesegfilter_test() -> - simple_persisted_rangesegfilter_tester(fun testsst_new/6), - simple_persisted_rangesegfilter_tester(fun sst_new/6). + simple_persisted_rangesegfilter_tester(fun testsst_new/6). simple_persisted_rangesegfilter_tester(SSTNewFun) -> {RP, Filename} = {"../test/", "range_segfilter_test"}, @@ -3009,7 +3021,7 @@ additional_range_test() -> lists:seq(?NOLOOK_SLOTSIZE + Gap + 1, 2 * ?NOLOOK_SLOTSIZE + Gap)), {ok, P1, {{Rem1, Rem2}, SK, EK}, _Bloom1} = - sst_new("../test/", "range1_src", IK1, IK2, false, 1, 9999, native), + testsst_new("../test/", "range1_src", IK1, IK2, false, 1, 9999, native), ?assertMatch([], Rem1), ?assertMatch([], Rem2), ?assertMatch(SK, element(1, lists:nth(1, IK1))), @@ -3060,8 +3072,7 @@ additional_range_test() -> simple_persisted_slotsize_test() -> - simple_persisted_slotsize_tester(fun testsst_new/6), - simple_persisted_slotsize_tester(fun sst_new/6). + simple_persisted_slotsize_tester(fun testsst_new/6). simple_persisted_slotsize_tester(SSTNewFun) -> @@ -3084,8 +3095,7 @@ simple_persisted_test_() -> {timeout, 60, fun simple_persisted_test_bothformats/0}. simple_persisted_test_bothformats() -> - simple_persisted_tester(fun testsst_new/6), - simple_persisted_tester(fun sst_new/6). + simple_persisted_tester(fun testsst_new/6). simple_persisted_tester(SSTNewFun) -> {RP, Filename} = {"../test/", "simple_test"}, diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index fdde0c4..0a3b2b1 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -197,8 +197,8 @@ journal_compaction_tester(Restart, WRP) -> ObjListD = testutil:generate_objects(10000, 2), lists:foreach(fun({_R, O, _S}) -> testutil:book_riakdelete(Bookie0, - O#r_object.bucket, - O#r_object.key, + testutil:get_bucket(O), + testutil:get_key(O), []) end, ObjListD), @@ -577,8 +577,8 @@ load_and_count_withdelete(_Config) -> 0, lists:seq(1, 20)), testutil:check_forobject(Bookie1, TestObject), - {BucketD, KeyD} = {TestObject#r_object.bucket, - TestObject#r_object.key}, + {BucketD, KeyD} = + {testutil:get_bucket(TestObject), testutil:get_key(TestObject)}, {_, 1} = testutil:check_bucket_stats(Bookie1, BucketD), ok = testutil:book_riakdelete(Bookie1, BucketD, KeyD, []), not_found = testutil:book_riakget(Bookie1, BucketD, KeyD), diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index eb8f3f3..b4c570b 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -445,10 +445,12 @@ small_load_with2i(_Config) -> %% Delete the objects from the ChkList removing the indexes lists:foreach(fun({_RN, Obj, Spc}) -> - DSpc = lists:map(fun({add, F, T}) -> {remove, F, T} - end, + DSpc = lists:map(fun({add, F, T}) -> + {remove, F, T} + end, Spc), - {B, K} = {Obj#r_object.bucket, Obj#r_object.key}, + {B, K} = + {testutil:get_bucket(Obj), testutil:get_key(Obj)}, testutil:book_riakdelete(Bookie1, B, K, DSpc) end, ChkList1), diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index c9306fd..304b1d7 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -256,8 +256,8 @@ recovr_strategy(_Config) -> {TestObject, TestSpec} = testutil:generate_testobject(), ok = testutil:book_riakput(Book1, TestObject, TestSpec), ok = testutil:book_riakdelete(Book1, - TestObject#r_object.bucket, - TestObject#r_object.key, + testutil:get_bucket(TestObject), + testutil:get_key(TestObject), []), lists:foreach(fun({K, _SpcL}) -> diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl index 0f5e482..aadf13f 100644 --- a/test/end_to_end/riak_SUITE.erl +++ b/test/end_to_end/riak_SUITE.erl @@ -639,14 +639,14 @@ test_segfilter_query(Bookie, CLs) -> SegMapFun = fun({_RN, RiakObject, _Spc}) -> - B = RiakObject#r_object.bucket, - K = RiakObject#r_object.key, + B = testutil:get_bucket(RiakObject), + K = testutil:get_key(RiakObject), leveled_tictac:keyto_segment32(<>) end, BKMapFun = fun({_RN, RiakObject, _Spc}) -> - B = RiakObject#r_object.bucket, - K = RiakObject#r_object.key, + B = testutil:get_bucket(RiakObject), + K = testutil:get_key(RiakObject), {B, K} end, diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index 98ea9af..e2918dc 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -71,6 +71,18 @@ -define(EMPTY_VTAG_BIN, <<"e">>). -define(ROOT_PATH, "test"). +-record(r_content, { + metadata, + value :: term() + }). + +-record(r_object, { + bucket, + key, + contents :: [#r_content{}], + vclock, + updatemetadata=dict:store(clean, true, dict:new()), + updatevalue :: term()}). riak_object(Bucket, Key, Value, MetaData) -> Content = #r_content{metadata=dict:from_list(MetaData), value=Value},