Merge pull request #232 from martinsumner/mas-pr231-review

Mas pr231 review
Martin Sumner 2018-12-14 10:01:16 +00:00 committed by GitHub
commit b344b7d827
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 547 additions and 233 deletions

View file

@@ -13,24 +13,17 @@
 %% Inker key type used for 'normal' objects
 -define(INKT_STND, stnd).
 %% Inker key type used for 'batch' objects
 -define(INKT_MPUT, mput).
 %% Inker key type used for objects which contain no value, only key changes
 %% This is used currently for objects formed under a 'retain' strategy on Inker
 %% compaction
 -define(INKT_KEYD, keyd).
 %% Inker key type used for tombstones
 -define(INKT_TOMB, tomb).
 
 -define(CACHE_TYPE, skpl).
 
--record(sft_options,
-        {wait = true :: boolean(),
-         expire_tombstones = false :: boolean(),
-         penciller :: pid()}).
-
 -record(level,
         {level :: integer(),
@@ -39,25 +32,31 @@
 -record(manifest_entry,
         {start_key :: tuple() | undefined,
         end_key :: tuple() | undefined,
         owner :: pid()|list(),
         filename :: string() | undefined,
         bloom :: binary() | none | undefined}).
 
 -record(cdb_options,
         {max_size :: integer() | undefined,
         file_path :: string() | undefined,
         waste_path :: string() | undefined,
         binary_mode = false :: boolean(),
-        sync_strategy = sync}).
+        sync_strategy = sync,
+        log_options = leveled_log:get_opts()
+            :: leveled_log:log_options()}).
+
+-record(sst_options,
+        {press_method = native
+            :: leveled_sst:press_method(),
+        log_options = leveled_log:get_opts()
+            :: leveled_log:log_options()}).
 
 -record(inker_options,
         {cdb_max_size :: integer() | undefined,
         root_path :: string() | undefined,
-        cdb_options :: #cdb_options{} | undefined,
+        cdb_options = #cdb_options{} :: #cdb_options{},
         start_snapshot = false :: boolean(),
-        %% so a snapshot can monitor the bookie and
-        %% terminate when it does
        bookies_pid :: pid() | undefined,
        source_inker :: pid() | undefined,
        reload_strategy = [] :: list(),
@@ -70,11 +69,10 @@
 -record(penciller_options,
        {root_path :: string() | undefined,
+       sst_options = #sst_options{} :: #sst_options{},
        max_inmemory_tablesize :: integer() | undefined,
        start_snapshot = false :: boolean(),
        snapshot_query,
-       %% so a snapshot can monitor the bookie and
-       %% terminate when it does
        bookies_pid :: pid() | undefined,
        bookies_mem :: tuple() | undefined,
        source_penciller :: pid() | undefined,
@@ -91,43 +89,3 @@
        singlefile_compactionperc :: float()|undefined,
        maxrunlength_compactionperc :: float()|undefined,
        reload_strategy = [] :: list()}).
-
--record(recent_aae, {filter :: whitelist|blacklist,
-                     % the buckets list should either be a
-                     % - whitelist - specific buckets are included, and
-                     %   entries are indexed by bucket name
-                     % - blacklist - specific buckets are excluded, and
-                     %   all other entries are indexes using the special
-                     %   $all bucket
-                     buckets :: list(),
-                     % whitelist or blacklist of buckets to support recent
-                     % AAE
-                     limit_minutes :: integer(),
-                     % how long to retain entries the temporary index for
-                     % It will actually be retained for limit + unit minutes
-                     % 60 minutes seems sensible
-                     unit_minutes :: integer(),
-                     % What the minimum unit size will be for a query
-                     % e.g. the minimum time duration to be used in range
-                     % queries of the aae index
-                     % 5 minutes seems sensible
-                     tree_size = small :: atom()
-                     % Just defaulted to small for now
-                     }).
-
--record(r_content, {
-          metadata,
-          value :: term()
-          }).
-
--record(r_object, {
-          bucket,
-          key,
-          contents :: [#r_content{}],
-          vclock,
-          updatemetadata=dict:store(clean, true, dict:new()),
-          updatevalue :: term()}).

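One subtlety in the two log_options fields added above: Erlang record defaults are expressions evaluated wherever the record is constructed, so leveled_log:get_opts() runs in whichever process builds the options record. A minimal sketch of the consequence, using the set_loglevel/1 export added to leveled_log in this commit:

    %% Defaults are evaluated by the constructing process, so an options
    %% record implicitly snapshots that process's log settings.
    ok = leveled_log:set_loglevel(debug),
    CDBopts = #cdb_options{binary_mode = true},
    %% CDBopts#cdb_options.log_options now carries log_level = debug, and a
    %% worker handed these options can adopt it via leveled_log:save/1.

This is what lets a bookie configure logging once and have every subsequently built cdb_options or sst_options record carry the same settings downstream.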
View file

@@ -67,7 +67,11 @@
          book_hotbackup/1,
          book_close/1,
          book_destroy/1,
-         book_isempty/2]).
+         book_isempty/2,
+         book_logsettings/1,
+         book_loglevel/2,
+         book_addlogs/2,
+         book_removelogs/2]).
 
 %% folding API
 -export([
@@ -1047,6 +1051,31 @@ book_isempty(Pid, Tag) ->
     {async, Runner} = book_bucketlist(Pid, Tag, FoldAccT, first),
     Runner().
 
+-spec book_logsettings(pid()) -> {leveled_log:log_level(), list(string())}.
+%% @doc
+%% Retrieve the current log settings
+book_logsettings(Pid) ->
+    gen_server:call(Pid, log_settings, infinity).
+
+-spec book_loglevel(pid(), leveled_log:log_level()) -> ok.
+%% @doc
+%% Change the log level of the store
+book_loglevel(Pid, LogLevel) ->
+    gen_server:cast(Pid, {log_level, LogLevel}).
+
+-spec book_addlogs(pid(), list(string())) -> ok.
+%% @doc
+%% Add to the list of forced logs, a list of more forced logs
+book_addlogs(Pid, ForcedLogs) ->
+    gen_server:cast(Pid, {add_logs, ForcedLogs}).
+
+-spec book_removelogs(pid(), list(string())) -> ok.
+%% @doc
+%% Remove from the list of forced logs, a list of forced logs
+book_removelogs(Pid, ForcedLogs) ->
+    gen_server:cast(Pid, {remove_logs, ForcedLogs}).
+
 %%%============================================================================
 %%% gen_server callbacks
 %%%============================================================================
@@ -1060,12 +1089,15 @@ init([Opts]) ->
             {stop, no_root_path};
         {undefined, _RP} ->
             % Start from file not snapshot
-            {InkerOpts, PencillerOpts} = set_options(Opts),
+            % Must set log level first - as log level will be fetched within
+            % set_options/1. Also logs can now be added to set_options/1
             LogLevel = proplists:get_value(log_level, Opts),
-            ok = application:set_env(leveled, log_level, LogLevel),
+            leveled_log:set_loglevel(LogLevel),
             ForcedLogs = proplists:get_value(forced_logs, Opts),
-            ok = application:set_env(leveled, forced_logs, ForcedLogs),
+            leveled_log:add_forcedlogs(ForcedLogs),
+
+            {InkerOpts, PencillerOpts} = set_options(Opts),
 
             OverrideFunctions = proplists:get_value(override_functions, Opts),
             SetFun =
@@ -1299,6 +1331,8 @@ handle_call({snapshot, SnapType, Query, LongRunning}, _From, State) ->
     % e.g. many minutes)
     Reply = snapshot_store(State, SnapType, Query, LongRunning),
     {reply, Reply, State};
+handle_call(log_settings, _From, State) ->
+    {reply, leveled_log:return_settings(), State};
 handle_call({return_runner, QueryType}, _From, State) ->
     SW = os:timestamp(),
     Runner = get_runner(State, QueryType),
@@ -1349,9 +1383,30 @@ handle_call(destroy, _From, State=#state{is_snapshot=Snp}) when Snp == false ->
 handle_call(Msg, _From, State) ->
     {reply, {unsupported_message, element(1, Msg)}, State}.
 
-handle_cast(_Msg, State) ->
+handle_cast({log_level, LogLevel}, State) ->
+    PCL = State#state.penciller,
+    INK = State#state.inker,
+    ok = leveled_penciller:pcl_loglevel(PCL, LogLevel),
+    ok = leveled_inker:ink_loglevel(INK, LogLevel),
+    ok = leveled_log:set_loglevel(LogLevel),
+    {noreply, State};
+handle_cast({add_logs, ForcedLogs}, State) ->
+    PCL = State#state.penciller,
+    INK = State#state.inker,
+    ok = leveled_penciller:pcl_addlogs(PCL, ForcedLogs),
+    ok = leveled_inker:ink_addlogs(INK, ForcedLogs),
+    ok = leveled_log:add_forcedlogs(ForcedLogs),
+    {noreply, State};
+handle_cast({remove_logs, ForcedLogs}, State) ->
+    PCL = State#state.penciller,
+    INK = State#state.inker,
+    ok = leveled_penciller:pcl_removelogs(PCL, ForcedLogs),
+    ok = leveled_inker:ink_removelogs(INK, ForcedLogs),
+    ok = leveled_log:remove_forcedlogs(ForcedLogs),
     {noreply, State}.
 
 handle_info(_Info, State) ->
     {noreply, State}.
@@ -1552,12 +1607,16 @@ set_options(Opts) ->
         compress_on_receipt = CompressOnReceipt,
         cdb_options =
             #cdb_options{max_size=MaxJournalSize,
                          binary_mode=true,
-                         sync_strategy=SyncStrat}},
+                         sync_strategy=SyncStrat,
+                         log_options=leveled_log:get_opts()}},
     #penciller_options{root_path = LedgerFP,
                        max_inmemory_tablesize = PCLL0CacheSize,
                        levelzero_cointoss = true,
-                       compression_method = CompressionMethod}}.
+                       sst_options =
+                           #sst_options{press_method = CompressionMethod,
+                                        log_options=leveled_log:get_opts()}}
+    }.
 
 -spec return_snapfun(book_state(), store|ledger,
@@ -2872,8 +2931,7 @@ longrunning_test() ->
 coverage_cheat_test() ->
     {noreply, _State0} = handle_info(timeout, #state{}),
-    {ok, _State1} = code_change(null, #state{}, null),
-    {noreply, _State2} = handle_cast(null, #state{}).
+    {ok, _State1} = code_change(null, #state{}, null).
 
 erase_journal_test() ->
     RootPath = reset_filestructure(),

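The four new functions give a runtime counterpart to the start-time log_level and forced_logs options. A hedged usage sketch (StartOpts and the "B0015"/"B0016" log references are illustrative placeholders, not values mandated by this commit):

    {ok, Bookie} = leveled_bookie:book_start(StartOpts),
    %% Quieten the store overall, but force two specific logs through
    ok = leveled_bookie:book_loglevel(Bookie, error),
    ok = leveled_bookie:book_addlogs(Bookie, ["B0015", "B0016"]),
    {error, ["B0015", "B0016"]} = leveled_bookie:book_logsettings(Bookie),
    ok = leveled_bookie:book_removelogs(Bookie, ["B0016"]).

The setters are casts while book_logsettings/1 is a call, but as the bookie processes its mailbox in order, the settings read back here reflect the preceding casts.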
View file

@@ -140,7 +140,9 @@
                 waste_path :: string() | undefined,
                 sync_strategy = none,
                 timings = no_timing :: cdb_timings(),
-                timings_countdown = 0 :: integer()}).
+                timings_countdown = 0 :: integer(),
+                log_options = leveled_log:get_opts()
+                    :: leveled_log:log_options()}).
 
 -record(cdb_timings, {sample_count = 0 :: integer(),
                       sample_cyclecount = 0 :: integer(),
@@ -414,9 +416,11 @@ init([Opts]) ->
     #state{max_size=MaxSize,
            binary_mode=Opts#cdb_options.binary_mode,
            waste_path=Opts#cdb_options.waste_path,
-           sync_strategy=Opts#cdb_options.sync_strategy}}.
+           sync_strategy=Opts#cdb_options.sync_strategy,
+           log_options=Opts#cdb_options.log_options}}.
 
 starting({open_writer, Filename}, _From, State) ->
+    leveled_log:save(State#state.log_options),
     leveled_log:log("CDB01", [Filename]),
     {LastPosition, HashTree, LastKey} = open_active_file(Filename),
     {WriteOps, UpdStrategy} = set_writeops(State#state.sync_strategy),
@@ -429,6 +433,7 @@ starting({open_writer, Filename}, _From, State) ->
                             filename=Filename,
                             hashtree=HashTree}};
 starting({open_reader, Filename}, _From, State) ->
+    leveled_log:save(State#state.log_options),
     leveled_log:log("CDB02", [Filename]),
     {Handle, Index, LastKey} = open_for_readonly(Filename, false),
     {reply, ok, reader, State#state{handle=Handle,
@@ -436,6 +441,7 @@ starting({open_reader, Filename}, _From, State) ->
                                     filename=Filename,
                                     hash_index=Index}};
 starting({open_reader, Filename, LastKey}, _From, State) ->
+    leveled_log:save(State#state.log_options),
     leveled_log:log("CDB02", [Filename]),
     {Handle, Index, LastKey} = open_for_readonly(Filename, LastKey),
     {reply, ok, reader, State#state{handle=Handle,

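The leveled_log:save/1 calls at each starting transition are the receiving half of a pattern this commit applies throughout: log options are captured by the spawning process and re-saved into the worker's process dictionary, which does not survive a spawn by itself. Distilled into a minimal sketch (the surrounding module is illustrative, not leveled API; only the leveled_log calls are real):

    %% Capture the creator's log options at spawn time...
    start_link(Opts) ->
        gen_server:start_link(?MODULE, [leveled_log:get_opts(), Opts], []).

    %% ...and adopt them in the new process before anything is logged.
    init([LogOpts, Opts]) ->
        ok = leveled_log:save(LogOpts),
        {ok, Opts}.

leveled_cdb differs only in that the options travel inside #cdb_options{} and are saved on the first state transition rather than in init/1.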
View file

@@ -498,10 +498,11 @@ generate_orderedkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) ->
     Acc;
 generate_orderedkeys(Seqn, Count, Acc, BucketLow, BucketHigh) ->
     BNumber = Seqn div (BucketHigh - BucketLow),
-    BucketExt = string:right(integer_to_list(BucketLow + BNumber), 4, $0),
-    KNumber = Seqn * 100 + leveled_rand:uniform(100),
+    BucketExt =
+        io_lib:format("K~4..0B", [BucketLow + BNumber]),
     KeyExt =
-        string:right(integer_to_list(KNumber), 8, $0),
+        io_lib:format("K~8..0B", [Seqn * 100 + leveled_rand:uniform(100)]),
     LK = leveled_codec:to_ledgerkey("Bucket" ++ BucketExt, "Key" ++ KeyExt, o),
     Chunk = leveled_rand:rand_bytes(16),
     {_B, _K, MV, _H, _LMs} =

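The replacement of string:right/3 with io_lib:format/2 here (and in the other key generators changed below) swaps the legacy string API for a zero-padded integer format: ~4..0B reads as "integer in base 10, field width 4, padded with 0". The two forms produce the same characters:

    1> lists:flatten(io_lib:format("~4..0B", [7])).
    "0007"
    2> string:right(integer_to_list(7), 4, $0).
    "0007"

Note that io_lib:format/2 returns a deep iolist rather than a flat string, which is why the equivalent changes in other modules wrap it in lists:flatten/1.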
View file

@@ -79,12 +79,16 @@
          handle_cast/2,
          handle_info/2,
          terminate/2,
-         clerk_new/1,
+         code_change/3]).
+
+-export([clerk_new/1,
          clerk_compact/7,
          clerk_hashtablecalc/3,
          clerk_trim/3,
          clerk_stop/1,
-         code_change/3]).
+         clerk_loglevel/2,
+         clerk_addlogs/2,
+         clerk_removelogs/2]).
 
 -export([schedule_compaction/3]).
 
@@ -104,7 +108,7 @@
 -record(state, {inker :: pid() | undefined,
                 max_run_length :: integer() | undefined,
-                cdb_options,
+                cdb_options = #cdb_options{} :: #cdb_options{},
                 waste_retention_period :: integer() | undefined,
                 waste_path :: string() | undefined,
                 reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(),
@@ -140,7 +144,7 @@
 %% @doc
 %% Generate a new clerk
 clerk_new(InkerClerkOpts) ->
-    gen_server:start_link(?MODULE, [InkerClerkOpts], []).
+    gen_server:start_link(?MODULE, [leveled_log:get_opts(), InkerClerkOpts], []).
 
 -spec clerk_compact(pid(), pid(),
                     fun(), fun(), fun(),
@@ -170,7 +174,8 @@ clerk_trim(Pid, Inker, PersistedSQN) ->
 %% of the hastable in the CDB file - so that the file is not blocked during
 %% this calculation
 clerk_hashtablecalc(HashTree, StartPos, CDBpid) ->
-    {ok, Clerk} = gen_server:start_link(?MODULE, [#iclerk_options{}], []),
+    {ok, Clerk} = gen_server:start_link(?MODULE, [leveled_log:get_opts(),
+                                                  #iclerk_options{}], []),
     gen_server:cast(Clerk, {hashtable_calc, HashTree, StartPos, CDBpid}).
 
 -spec clerk_stop(pid()) -> ok.
@@ -179,11 +184,30 @@ clerk_hashtablecalc(HashTree, StartPos, CDBpid) ->
 clerk_stop(Pid) ->
     gen_server:cast(Pid, stop).
 
+-spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok.
+%% @doc
+%% Change the log level of the Journal
+clerk_loglevel(Pid, LogLevel) ->
+    gen_server:cast(Pid, {log_level, LogLevel}).
+
+-spec clerk_addlogs(pid(), list(string())) -> ok.
+%% @doc
+%% Add to the list of forced logs, a list of more forced logs
+clerk_addlogs(Pid, ForcedLogs) ->
+    gen_server:cast(Pid, {add_logs, ForcedLogs}).
+
+-spec clerk_removelogs(pid(), list(string())) -> ok.
+%% @doc
+%% Remove from the list of forced logs, a list of forced logs
+clerk_removelogs(Pid, ForcedLogs) ->
+    gen_server:cast(Pid, {remove_logs, ForcedLogs}).
+
 %%%============================================================================
 %%% gen_server callbacks
 %%%============================================================================
 
-init([IClerkOpts]) ->
+init([LogOpts, IClerkOpts]) ->
+    leveled_log:save(LogOpts),
     ReloadStrategy = IClerkOpts#iclerk_options.reload_strategy,
     CDBopts = IClerkOpts#iclerk_options.cdb_options,
     WP = CDBopts#cdb_options.waste_path,
@@ -290,6 +314,21 @@ handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) ->
     {IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos),
     ok = leveled_cdb:cdb_returnhashtable(CDBpid, IndexList, HashTreeBin),
     {stop, normal, State};
+handle_cast({log_level, LogLevel}, State) ->
+    ok = leveled_log:set_loglevel(LogLevel),
+    CDBopts = State#state.cdb_options,
+    CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
+    {noreply, State#state{cdb_options = CDBopts0}};
+handle_cast({add_logs, ForcedLogs}, State) ->
+    ok = leveled_log:add_forcedlogs(ForcedLogs),
+    CDBopts = State#state.cdb_options,
+    CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
+    {noreply, State#state{cdb_options = CDBopts0}};
+handle_cast({remove_logs, ForcedLogs}, State) ->
+    ok = leveled_log:remove_forcedlogs(ForcedLogs),
+    CDBopts = State#state.cdb_options,
+    CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
+    {noreply, State#state{cdb_options = CDBopts0}};
 handle_cast(stop, State) ->
     {stop, normal, State}.

View file

@@ -116,7 +116,11 @@
          ink_roll/1,
          ink_backup/2,
          ink_checksqn/2,
-         build_dummy_journal/0,
+         ink_loglevel/2,
+         ink_addlogs/2,
+         ink_removelogs/2]).
+
+-export([build_dummy_journal/0,
          clean_testdir/1,
          filepath/2,
          filepath/3]).
@@ -174,13 +178,13 @@
 %% The inker will need to know what the reload strategy is, to inform the
 %% clerk about the rules to enforce during compaction.
 ink_start(InkerOpts) ->
-    gen_server:start_link(?MODULE, [InkerOpts], []).
+    gen_server:start_link(?MODULE, [leveled_log:get_opts(), InkerOpts], []).
 
 -spec ink_snapstart(inker_options()) -> {ok, pid()}.
 %% @doc
 %% Don't link on startup as snapshot
 ink_snapstart(InkerOpts) ->
-    gen_server:start(?MODULE, [InkerOpts], []).
+    gen_server:start(?MODULE, [leveled_log:get_opts(), InkerOpts], []).
 
 -spec ink_put(pid(),
               leveled_codec:ledger_key(),
@@ -447,11 +451,30 @@ ink_printmanifest(Pid) ->
 ink_checksqn(Pid, LedgerSQN) ->
     gen_server:call(Pid, {check_sqn, LedgerSQN}).
 
+-spec ink_loglevel(pid(), leveled_log:log_level()) -> ok.
+%% @doc
+%% Change the log level of the Journal
+ink_loglevel(Pid, LogLevel) ->
+    gen_server:cast(Pid, {log_level, LogLevel}).
+
+-spec ink_addlogs(pid(), list(string())) -> ok.
+%% @doc
+%% Add to the list of forced logs, a list of more forced logs
+ink_addlogs(Pid, ForcedLogs) ->
+    gen_server:cast(Pid, {add_logs, ForcedLogs}).
+
+-spec ink_removelogs(pid(), list(string())) -> ok.
+%% @doc
+%% Remove from the list of forced logs, a list of forced logs
+ink_removelogs(Pid, ForcedLogs) ->
+    gen_server:cast(Pid, {remove_logs, ForcedLogs}).
+
 %%%============================================================================
 %%% gen_server callbacks
 %%%============================================================================
 
-init([InkerOpts]) ->
+init([LogOpts, InkerOpts]) ->
+    leveled_log:save(LogOpts),
     leveled_rand:seed(),
     case {InkerOpts#inker_options.root_path,
             InkerOpts#inker_options.start_snapshot} of
@@ -697,7 +720,28 @@ handle_cast({release_snapshot, Snapshot}, State) ->
     Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots),
     leveled_log:log("I0003", [Snapshot]),
     leveled_log:log("I0004", [length(Rs)]),
-    {noreply, State#state{registered_snapshots=Rs}}.
+    {noreply, State#state{registered_snapshots=Rs}};
+handle_cast({log_level, LogLevel}, State) ->
+    INC = State#state.clerk,
+    ok = leveled_iclerk:clerk_loglevel(INC, LogLevel),
+    ok = leveled_log:set_loglevel(LogLevel),
+    CDBopts = State#state.cdb_options,
+    CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
+    {noreply, State#state{cdb_options = CDBopts0}};
+handle_cast({add_logs, ForcedLogs}, State) ->
+    INC = State#state.clerk,
+    ok = leveled_iclerk:clerk_addlogs(INC, ForcedLogs),
+    ok = leveled_log:add_forcedlogs(ForcedLogs),
+    CDBopts = State#state.cdb_options,
+    CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
+    {noreply, State#state{cdb_options = CDBopts0}};
+handle_cast({remove_logs, ForcedLogs}, State) ->
+    INC = State#state.clerk,
+    ok = leveled_iclerk:clerk_removelogs(INC, ForcedLogs),
+    ok = leveled_log:remove_forcedlogs(ForcedLogs),
+    CDBopts = State#state.cdb_options,
+    CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
+    {noreply, State#state{cdb_options = CDBopts0}}.
 
 %% handle the bookie stopping and stop this snapshot
 handle_info({'DOWN', BookieMonRef, process, _BookiePid, _Info},
@@ -1400,8 +1444,9 @@ compact_journal_testto(WRP, ExpectedFiles) ->
                                         5000),
     timer:sleep(1000),
     CompactedManifest2 = ink_getmanifest(Ink1),
+    {ok, PrefixTest} = re:compile(?COMPACT_FP),
     lists:foreach(fun({_SQN, FN, _P, _LK}) ->
-                        ?assertMatch(0, string:str(FN, ?COMPACT_FP))
+                        nomatch = re:run(FN, PrefixTest)
                     end,
                     CompactedManifest2),
     ?assertMatch(2, length(CompactedManifest2)),

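The test amendment above also retires string:str/2, which signals a missing substring by returning 0, in favour of a compiled regular expression asserting nomatch. The respective semantics, with an illustrative filename and pattern (?COMPACT_FP itself is a macro defined elsewhere in leveled's source):

    1> string:str("0001_journal.cdb", "compact").   % 0 means not found
    0
    2> {ok, RE} = re:compile("compact").
    3> re:run("0001_journal.cdb", RE).
    nomatch
    4> re:run("0001_compact.cdb", RE, [{capture, none}]).
    match

Compiling the pattern once outside lists:foreach/2 also avoids recompiling it for every manifest entry.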
View file

@@ -9,10 +9,25 @@
 -export([log/2,
          log_timer/3,
          log_randomtimer/4]).
+-export([set_loglevel/1,
+         add_forcedlogs/1,
+         remove_forcedlogs/1,
+         get_opts/0,
+         save/1,
+         return_settings/0]).
+
+-record(log_options, {log_level = info :: log_level(),
+                      forced_logs = [] :: [string()]}).
+
+-type log_level() :: debug | info | warn | error | critical.
+-type log_options() :: #log_options{}.
+
+-export_type([log_options/0, log_level/0]).
 
 -define(LOG_LEVELS, [debug, info, warn, error, critical]).
 -define(DEFAULT_LOG_LEVEL, error).
 
 -define(LOGBASE, [
@@ -376,6 +391,70 @@
     ]).
 
+%%%============================================================================
+%%% Manage Log Options
+%%%============================================================================
+
+-spec set_loglevel(log_level()) -> ok.
+%% @doc
+%% Set the log level for this PID
+set_loglevel(LogLevel) when is_atom(LogLevel) ->
+    LO = get_opts(),
+    UpdLO = LO#log_options{log_level = LogLevel},
+    save(UpdLO).
+
+-spec add_forcedlogs(list(string())) -> ok.
+%% @doc
+%% Add a forced log to the list of forced logs. this will cause the log of this
+%% logReference to be logged even if the log_level of the process would not
+%% otherwise require it to be logged - e.g. to fire an 'INFO' log when running
+%% at an 'ERROR' log level.
+add_forcedlogs(LogRefs) ->
+    LO = get_opts(),
+    ForcedLogs = LO#log_options.forced_logs,
+    UpdLO = LO#log_options{forced_logs = lists:usort(LogRefs ++ ForcedLogs)},
+    save(UpdLO).
+
+-spec remove_forcedlogs(list(string())) -> ok.
+%% @doc
+%% Remove a forced log from the list of forced logs
+remove_forcedlogs(LogRefs) ->
+    LO = get_opts(),
+    ForcedLogs = LO#log_options.forced_logs,
+    UpdLO = LO#log_options{forced_logs = lists:subtract(ForcedLogs, LogRefs)},
+    save(UpdLO).
+
+-spec save(log_options()) -> ok.
+%% @doc
+%% Save the log options to the process dictionary
+save(#log_options{} = LO) ->
+    put('$leveled_log_options', LO),
+    ok.
+
+-spec get_opts() -> log_options().
+%% @doc
+%% Retrieve the log options from the process dictionary if present.
+get_opts() ->
+    case get('$leveled_log_options') of
+        #log_options{} = LO ->
+            LO;
+        _ ->
+            #log_options{log_level = ?DEFAULT_LOG_LEVEL,
+                         forced_logs = []}
+    end.
+
+-spec return_settings() -> {log_level(), list(string())}.
+%% @doc
+%% Return the settings outside of the record
+return_settings() ->
+    LO = get_opts(),
+    {LO#log_options.log_level, LO#log_options.forced_logs}.
+
+%%%============================================================================
+%%% Prompt Logs
+%%%============================================================================
+
 log(LogReference, Subs) ->
     log(LogReference, Subs, ?LOG_LEVELS).
@@ -396,23 +475,21 @@ log(LogRef, Subs, SupportedLogLevels) ->
     end.
 
 should_i_log(LogLevel, Levels, LogRef) ->
-    ForcedLogs = application:get_env(leveled, forced_logs, []),
+    #log_options{log_level = CurLevel, forced_logs = ForcedLogs} = get_opts(),
     case lists:member(LogRef, ForcedLogs) of
         true ->
             true;
         false ->
-            case application:get_env(leveled, log_level, ?DEFAULT_LOG_LEVEL) of
-                LogLevel ->
+            if CurLevel == LogLevel ->
                     true;
-                CurLevel ->
+               true ->
                     is_active_level(Levels, CurLevel, LogLevel)
             end
     end.
 
 is_active_level([L|_], L, _) -> true;
 is_active_level([L|_], _, L) -> false;
-is_active_level([_|T], C, L) -> is_active_level(T, C, L);
-is_active_level([] , _, _) -> false.
+is_active_level([_|T], C, L) -> is_active_level(T, C, L).
 
 log_timer(LogReference, Subs, StartTime) ->
     log_timer(LogReference, Subs, StartTime, ?LOG_LEVELS).
@@ -482,21 +559,21 @@ log_warn_test() ->
     ok = log_timer("G8888", [], os:timestamp(), [info, warn, error]).
 
 shouldilog_test() ->
-    % What if an unsupported option is set for the log level
-    % don't log
-    ok = application:set_env(leveled, log_level, unsupported),
-    ?assertMatch(false, should_i_log(info, ?LOG_LEVELS, "G0001")),
-    ?assertMatch(false, should_i_log(inform, ?LOG_LEVELS, "G0001")),
-    ok = application:set_env(leveled, log_level, debug),
+    ok = set_loglevel(debug),
     ?assertMatch(true, should_i_log(info, ?LOG_LEVELS, "G0001")),
-    ok = application:set_env(leveled, log_level, info),
+    ok = set_loglevel(info),
     ?assertMatch(true, should_i_log(info, ?LOG_LEVELS, "G0001")),
-    ok = application:set_env(leveled, forced_logs, ["G0001"]),
-    ok = application:set_env(leveled, log_level, error),
+    ok = add_forcedlogs(["G0001"]),
+    ok = set_loglevel(error),
     ?assertMatch(true, should_i_log(info, ?LOG_LEVELS, "G0001")),
     ?assertMatch(false, should_i_log(info, ?LOG_LEVELS, "G0002")),
-    ok = application:set_env(leveled, forced_logs, []),
-    ok = application:set_env(leveled, log_level, info),
+    ok = remove_forcedlogs(["G0001"]),
+    ok = set_loglevel(info),
     ?assertMatch(false, should_i_log(debug, ?LOG_LEVELS, "D0001")).
 
+badloglevel_test() ->
+    % Set a bad log level - and everything logs
+    ?assertMatch(true, is_active_level(?LOG_LEVELS, debug, unsupported)),
+    ?assertMatch(true, is_active_level(?LOG_LEVELS, critical, unsupported)).
+
 -endif.

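Because the settings now live in the process dictionary rather than the application environment, each of the exported setters above affects the calling process only; the gen_server casts added elsewhere in this commit exist precisely to push changes into the other long-lived processes. A shell-level sketch of the new exports (the "B0015" reference is illustrative):

    ok = leveled_log:set_loglevel(warn),
    ok = leveled_log:add_forcedlogs(["B0015"]),
    {warn, ["B0015"]} = leveled_log:return_settings(),
    %% A freshly spawned process sees only the compiled defaults
    spawn(fun() -> {error, []} = leveled_log:return_settings() end),
    ok = leveled_log:remove_forcedlogs(["B0015"]),
    {warn, []} = leveled_log:return_settings()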
View file

@@ -39,7 +39,10 @@
          clerk_prompt/1,
          clerk_push/2,
          clerk_close/1,
-         clerk_promptdeletions/2
+         clerk_promptdeletions/2,
+         clerk_loglevel/2,
+         clerk_addlogs/2,
+         clerk_removelogs/2
          ]).
 
 -include_lib("eunit/include/eunit.hrl").
@@ -50,17 +53,18 @@
 -record(state, {owner :: pid() | undefined,
                 root_path :: string() | undefined,
                 pending_deletions = dict:new(), % OTP 16 does not like type
-                compression_method = native :: lz4|native
+                sst_options :: #sst_options{}
                 }).
 
 %%%============================================================================
 %%% API
 %%%============================================================================
 
-clerk_new(Owner, Manifest, CompressionMethod) ->
+clerk_new(Owner, Manifest, OptsSST) ->
     {ok, Pid} =
         gen_server:start_link(?MODULE,
-                              [{compression_method, CompressionMethod}],
+                              [leveled_log:get_opts(),
+                               {sst_options, OptsSST}],
                               []),
     ok = gen_server:call(Pid, {load, Owner, Manifest}, infinity),
     leveled_log:log("PC001", [Pid, Owner]),
@@ -75,6 +79,24 @@ clerk_promptdeletions(Pid, ManifestSQN) ->
 clerk_push(Pid, Work) ->
     gen_server:cast(Pid, {push_work, Work}).
 
+-spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok.
+%% @doc
+%% Change the log level of the Journal
+clerk_loglevel(Pid, LogLevel) ->
+    gen_server:cast(Pid, {log_level, LogLevel}).
+
+-spec clerk_addlogs(pid(), list(string())) -> ok.
+%% @doc
+%% Add to the list of forced logs, a list of more forced logs
+clerk_addlogs(Pid, ForcedLogs) ->
+    gen_server:cast(Pid, {add_logs, ForcedLogs}).
+
+-spec clerk_removelogs(pid(), list(string())) -> ok.
+%% @doc
+%% Remove from the list of forced logs, a list of forced logs
+clerk_removelogs(Pid, ForcedLogs) ->
+    gen_server:cast(Pid, {remove_logs, ForcedLogs}).
+
 clerk_close(Pid) ->
     gen_server:call(Pid, close, 20000).
 
@@ -82,8 +104,9 @@ clerk_close(Pid) ->
 %%% gen_server callbacks
 %%%============================================================================
 
-init([{compression_method, CompressionMethod}]) ->
-    {ok, #state{compression_method = CompressionMethod}}.
+init([LogOpts, {sst_options, OptsSST}]) ->
+    leveled_log:save(LogOpts),
+    {ok, #state{sst_options = OptsSST}}.
 
 handle_call({load, Owner, RootPath}, _From, State) ->
     {reply, ok, State#state{owner=Owner, root_path=RootPath}, ?MIN_TIMEOUT};
@@ -101,7 +124,22 @@ handle_cast({prompt_deletions, ManifestSQN}, State) ->
     {Deletions, UpdD} = return_deletions(ManifestSQN,
                                          State#state.pending_deletions),
     ok = notify_deletions(Deletions, State#state.owner),
-    {noreply, State#state{pending_deletions = UpdD}, ?MIN_TIMEOUT}.
+    {noreply, State#state{pending_deletions = UpdD}, ?MIN_TIMEOUT};
+handle_cast({log_level, LogLevel}, State) ->
+    ok = leveled_log:set_loglevel(LogLevel),
+    SSTopts = State#state.sst_options,
+    SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
+    {noreply, State#state{sst_options = SSTopts0}};
+handle_cast({add_logs, ForcedLogs}, State) ->
+    ok = leveled_log:add_forcedlogs(ForcedLogs),
+    SSTopts = State#state.sst_options,
+    SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
+    {noreply, State#state{sst_options = SSTopts0}};
+handle_cast({remove_logs, ForcedLogs}, State) ->
+    ok = leveled_log:remove_forcedlogs(ForcedLogs),
+    SSTopts = State#state.sst_options,
+    SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
+    {noreply, State#state{sst_options = SSTopts0}}.
 
 handle_info(timeout, State) ->
     request_work(State),
@@ -125,7 +163,7 @@ handle_work({SrcLevel, Manifest}, State) ->
     {UpdManifest, EntriesToDelete} = merge(SrcLevel,
                                            Manifest,
                                            State#state.root_path,
-                                           State#state.compression_method),
+                                           State#state.sst_options),
     leveled_log:log("PC007", []),
     SWMC = os:timestamp(),
     ok = leveled_penciller:pcl_manifestchange(State#state.owner,
@@ -137,7 +175,7 @@ handle_work({SrcLevel, Manifest}, State) ->
     leveled_log:log_timer("PC018", [], SWSM),
     {leveled_pmanifest:get_manifest_sqn(UpdManifest), EntriesToDelete}.
 
-merge(SrcLevel, Manifest, RootPath, CompressionMethod) ->
+merge(SrcLevel, Manifest, RootPath, OptsSST) ->
     Src = leveled_pmanifest:mergefile_selector(Manifest, SrcLevel),
     NewSQN = leveled_pmanifest:get_manifest_sqn(Manifest) + 1,
     SinkList = leveled_pmanifest:merge_lookup(Manifest,
@@ -159,7 +197,7 @@ merge(SrcLevel, Manifest, RootPath, CompressionMethod) ->
             SST_RP = leveled_penciller:sst_rootpath(RootPath),
             perform_merge(Manifest,
                           Src, SinkList, SrcLevel,
-                          SST_RP, NewSQN, CompressionMethod)
+                          SST_RP, NewSQN, OptsSST)
     end.
 
 notify_deletions([], _Penciller) ->
@@ -177,7 +215,7 @@ notify_deletions([Head|Tail], Penciller) ->
 perform_merge(Manifest,
               Src, SinkList, SrcLevel,
               RootPath, NewSQN,
-              CompressionMethod) ->
+              OptsSST) ->
     leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]),
     SrcList = [{next, Src, all}],
     MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner),
@@ -187,7 +225,7 @@ perform_merge(Manifest,
         do_merge(SrcList, SinkList,
                  SinkLevel, SinkBasement,
                  RootPath, NewSQN, MaxSQN,
-                 CompressionMethod,
+                 OptsSST,
                  []),
     RevertPointerFun =
         fun({next, ME, _SK}) ->
@@ -205,23 +243,24 @@ perform_merge(Manifest,
                     Src),
     {Man2, [Src|SinkManifestList]}.
 
-do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _CM, Additions) ->
+do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions) ->
     leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]),
     Additions;
-do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, CM, Additions) ->
+do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
     FileName = leveled_penciller:sst_filename(NewSQN,
                                               SinkLevel,
                                               length(Additions)),
     leveled_log:log("PC012", [NewSQN, FileName, SinkB]),
     TS1 = os:timestamp(),
     case leveled_sst:sst_new(RP, FileName,
-                             KL1, KL2, SinkB, SinkLevel, MaxSQN, CM) of
+                             KL1, KL2, SinkB, SinkLevel, MaxSQN,
+                             OptsSST) of
         empty ->
             leveled_log:log("PC013", [FileName]),
             do_merge([], [],
                      SinkLevel, SinkB,
                      RP, NewSQN, MaxSQN,
-                     CM,
+                     OptsSST,
                      Additions);
         {ok, Pid, Reply, Bloom} ->
             {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
@@ -234,7 +273,7 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, CM, Additions) ->
             do_merge(KL1Rem, KL2Rem,
                      SinkLevel, SinkB,
                      RP, NewSQN, MaxSQN,
-                     CM,
+                     OptsSST,
                      Additions ++ [Entry])
     end.
@@ -263,9 +302,13 @@ generate_randomkeys(Count, BucketRangeLow, BucketRangeHigh) ->
 generate_randomkeys(0, Acc, _BucketLow, _BucketHigh) ->
     Acc;
 generate_randomkeys(Count, Acc, BucketLow, BRange) ->
-    BNumber = string:right(integer_to_list(BucketLow + leveled_rand:uniform(BRange)),
-                           4, $0),
-    KNumber = string:right(integer_to_list(leveled_rand:uniform(1000)), 4, $0),
+    BNumber =
+        lists:flatten(
+            io_lib:format("~4..0B",
+                          [BucketLow + leveled_rand:uniform(BRange)])),
+    KNumber =
+        lists:flatten(
+            io_lib:format("~4..0B", [leveled_rand:uniform(1000)])),
     K = {o, "Bucket" ++ BNumber, "Key" ++ KNumber, null},
     RandKey = {K, {Count + 1,
                    {active, infinity},
@@ -282,7 +325,7 @@ merge_file_test() ->
                             1,
                             KL1_L1,
                             999999,
-                            native),
+                            #sst_options{}),
     KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)),
     {ok, PidL2_1, _, _} =
         leveled_sst:sst_new("../test/",
@@ -290,7 +333,7 @@ merge_file_test() ->
                             2,
                             KL1_L2,
                             999999,
-                            native),
+                            #sst_options{}),
     KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)),
     {ok, PidL2_2, _, _} =
         leveled_sst:sst_new("../test/",
@@ -298,7 +341,7 @@ merge_file_test() ->
                             2,
                             KL2_L2,
                             999999,
-                            lz4),
+                            #sst_options{press_method = lz4}),
     KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)),
     {ok, PidL2_3, _, _} =
         leveled_sst:sst_new("../test/",
@@ -306,7 +349,7 @@ merge_file_test() ->
                             2,
                             KL3_L2,
                             999999,
-                            lz4),
+                            #sst_options{press_method = lz4}),
     KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)),
     {ok, PidL2_4, _, _} =
         leveled_sst:sst_new("../test/",
@@ -314,7 +357,7 @@ merge_file_test() ->
                             2,
                             KL4_L2,
                             999999,
-                            lz4),
+                            #sst_options{press_method = lz4}),
 
     E1 = #manifest_entry{owner = PidL1_1,
                          filename = "./KL1_L1.sst",
@@ -347,11 +390,12 @@ merge_file_test() ->
     PointerList = lists:map(fun(ME) -> {next, ME, all} end,
                             [E2, E3, E4, E5]),
     {Man6, _Dels} =
-        perform_merge(Man5, E1, PointerList, 1, "../test", 3, native),
+        perform_merge(Man5, E1, PointerList, 1, "../test", 3, #sst_options{}),
 
     ?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)).
 
 coverage_cheat_test() ->
-    {ok, _State1} = code_change(null, #state{}, null).
+    {ok, _State1} =
+        code_change(null, #state{sst_options=#sst_options{}}, null).
 
 -endif.

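With the clerk carrying #sst_options{} end to end, callers now select compression per file through the record rather than a bare native/lz4 atom, as the amended merge_file_test/0 shows. In isolation (the path, filename and KVList are placeholders):

    OptsSST = #sst_options{press_method = lz4},  % press_method defaults to native
    {ok, Pid, {_FirstKey, _LastKey}, _Bloom} =
        leveled_sst:sst_new("../test/", "new_segment", 2, KVList, 999999, OptsSST),

The record also embeds log_options, pairing the compression choice with the log settings captured from the creating process.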
View file

@@ -191,7 +191,10 @@
          pcl_getstartupsequencenumber/1,
          pcl_checkbloomtest/2,
          pcl_checkforwork/1,
-         pcl_persistedsqn/1]).
+         pcl_persistedsqn/1,
+         pcl_loglevel/2,
+         pcl_addlogs/2,
+         pcl_removelogs/2]).
 
 -export([
          sst_rootpath/1,
@@ -258,7 +261,7 @@
                 is_snapshot = false :: boolean(),
                 snapshot_fully_loaded = false :: boolean(),
                 source_penciller :: pid() | undefined,
                 bookie_monref :: reference() | undefined,
                 levelzero_astree :: list() | undefined,
 
                 work_ongoing = false :: boolean(), % i.e. compaction work
@@ -267,7 +270,7 @@
                 timings = no_timing :: pcl_timings(),
                 timings_countdown = 0 :: integer(),
 
-                compression_method = native :: lz4|native}).
+                sst_options = #sst_options{} :: #sst_options{}}).
 
 -record(pcl_timings,
         {sample_count = 0 :: integer(),
@@ -318,13 +321,13 @@
 %% query is run against the level zero space and just the query results are
 %5 copied into the clone.
 pcl_start(PCLopts) ->
-    gen_server:start_link(?MODULE, [PCLopts], []).
+    gen_server:start_link(?MODULE, [leveled_log:get_opts(), PCLopts], []).
 
 -spec pcl_snapstart(penciller_options()) -> {ok, pid()}.
 %% @doc
 %% Don't link to the bookie - this is a snpashot
 pcl_snapstart(PCLopts) ->
-    gen_server:start(?MODULE, [PCLopts], []).
+    gen_server:start(?MODULE, [leveled_log:get_opts(), PCLopts], []).
 
 -spec pcl_pushmem(pid(), bookies_memory()) -> ok|returned.
 %% @doc
@@ -578,7 +581,6 @@ pcl_close(Pid) ->
 pcl_doom(Pid) ->
     gen_server:call(Pid, doom, 60000).
 
-
 -spec pcl_checkbloomtest(pid(), tuple()) -> boolean().
 %% @doc
 %% Function specifically added to help testing. In particular to make sure
pcl_checkforwork(Pid) -> pcl_checkforwork(Pid) ->
gen_server:call(Pid, check_for_work, 2000). gen_server:call(Pid, check_for_work, 2000).
-spec pcl_loglevel(pid(), leveled_log:log_level()) -> ok.
%% @doc
%% Change the log level of the Journal
pcl_loglevel(Pid, LogLevel) ->
gen_server:cast(Pid, {log_level, LogLevel}).
-spec pcl_addlogs(pid(), list(string())) -> ok.
%% @doc
%% Add to the list of forced logs, a list of more forced logs
pcl_addlogs(Pid, ForcedLogs) ->
gen_server:cast(Pid, {add_logs, ForcedLogs}).
-spec pcl_removelogs(pid(), list(string())) -> ok.
%% @doc
%% Remove from the list of forced logs, a list of forced logs
pcl_removelogs(Pid, ForcedLogs) ->
gen_server:cast(Pid, {remove_logs, ForcedLogs}).
%%%============================================================================ %%%============================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%============================================================================ %%%============================================================================
init([PCLopts]) -> init([LogOpts, PCLopts]) ->
leveled_log:save(LogOpts),
leveled_rand:seed(), leveled_rand:seed(),
case {PCLopts#penciller_options.root_path, case {PCLopts#penciller_options.root_path,
PCLopts#penciller_options.start_snapshot, PCLopts#penciller_options.start_snapshot,
@ -1001,7 +1022,28 @@ handle_cast(work_for_clerk, State) ->
end; end;
_ -> _ ->
{noreply, State} {noreply, State}
end. end;
handle_cast({log_level, LogLevel}, State) ->
PC = State#state.clerk,
ok = leveled_pclerk:clerk_loglevel(PC, LogLevel),
ok = leveled_log:set_loglevel(LogLevel),
SSTopts = State#state.sst_options,
SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
{noreply, State#state{sst_options = SSTopts0}};
handle_cast({add_logs, ForcedLogs}, State) ->
PC = State#state.clerk,
ok = leveled_pclerk:clerk_addlogs(PC, ForcedLogs),
ok = leveled_log:add_forcedlogs(ForcedLogs),
SSTopts = State#state.sst_options,
SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
{noreply, State#state{sst_options = SSTopts0}};
handle_cast({remove_logs, ForcedLogs}, State) ->
PC = State#state.clerk,
ok = leveled_pclerk:clerk_removelogs(PC, ForcedLogs),
ok = leveled_log:remove_forcedlogs(ForcedLogs),
SSTopts = State#state.sst_options,
SSTopts0 = SSTopts#sst_options{log_options = leveled_log:get_opts()},
{noreply, State#state{sst_options = SSTopts0}}.
%% handle the bookie stopping and stop this snapshot %% handle the bookie stopping and stop this snapshot
@@ -1047,9 +1089,9 @@ sst_filename(ManSQN, Level, Count) ->
 start_from_file(PCLopts) ->
     RootPath = PCLopts#penciller_options.root_path,
     MaxTableSize = PCLopts#penciller_options.max_inmemory_tablesize,
-    PressMethod = PCLopts#penciller_options.compression_method,
+    OptsSST = PCLopts#penciller_options.sst_options,
 
-    {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath, PressMethod),
+    {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath, OptsSST),
 
     CoinToss = PCLopts#penciller_options.levelzero_cointoss,
     % Used to randomly defer the writing of L0 file. Intended to help with
@@ -1061,14 +1103,14 @@ start_from_file(PCLopts) ->
                        levelzero_maxcachesize = MaxTableSize,
                        levelzero_cointoss = CoinToss,
                        levelzero_index = leveled_pmem:new_index(),
-                       compression_method = PressMethod},
+                       sst_options = OptsSST},
 
     %% Open manifest
     Manifest0 = leveled_pmanifest:open_manifest(RootPath),
     OpenFun =
         fun(FN) ->
             {ok, Pid, {_FK, _LK}, Bloom} =
-                leveled_sst:sst_open(sst_rootpath(RootPath), FN),
+                leveled_sst:sst_open(sst_rootpath(RootPath), FN, OptsSST),
             {Pid, Bloom}
         end,
     SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1,
@@ -1083,7 +1125,9 @@ start_from_file(PCLopts) ->
     case filelib:is_file(filename:join(sst_rootpath(RootPath), L0FN)) of
         true ->
             leveled_log:log("P0015", [L0FN]),
-            L0Open = leveled_sst:sst_open(sst_rootpath(RootPath), L0FN),
+            L0Open = leveled_sst:sst_open(sst_rootpath(RootPath),
+                                          L0FN,
+                                          OptsSST),
             {ok, L0Pid, {L0StartKey, L0EndKey}, Bloom} = L0Open,
             L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid),
             L0Entry = #manifest_entry{start_key = L0StartKey,
@@ -1091,10 +1135,11 @@ start_from_file(PCLopts) ->
                                       filename = L0FN,
                                       owner = L0Pid,
                                       bloom = Bloom},
-            Manifest2 = leveled_pmanifest:insert_manifest_entry(Manifest1,
-                                                                ManSQN + 1,
-                                                                0,
-                                                                L0Entry),
+            Manifest2 =
+                leveled_pmanifest:insert_manifest_entry(Manifest1,
+                                                        ManSQN + 1,
+                                                        0,
+                                                        L0Entry),
             leveled_log:log("P0016", [L0SQN]),
             LedgerSQN = max(MaxSQN, L0SQN),
             {InitState#state{manifest = Manifest2,
@@ -1255,7 +1300,7 @@ roll_memory(State, false) ->
                                  FetchFun,
                                  PCL,
                                  State#state.ledger_sqn,
-                                 State#state.compression_method),
+                                 State#state.sst_options),
     {ok, Constructor, _} = R,
     {Constructor, none};
 roll_memory(State, true) ->
@@ -1270,7 +1315,7 @@ roll_memory(State, true) ->
                             0,
                             KVList,
                             State#state.ledger_sqn,
-                            State#state.compression_method),
+                            State#state.sst_options),
     {ok, Constructor, _, Bloom} = R,
     {Constructor, Bloom}.
@@ -1904,9 +1949,10 @@ shutdown_when_compact(Pid) ->
 simple_server_test() ->
     RootPath = "../test/ledger",
     clean_testdir(RootPath),
-    {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath,
-                                             max_inmemory_tablesize=1000,
-                                             compression_method=native}),
+    {ok, PCL} =
+        pcl_start(#penciller_options{root_path=RootPath,
+                                     max_inmemory_tablesize=1000,
+                                     sst_options=#sst_options{}}),
     Key1_Pre = {{o,"Bucket0001", "Key0001", null},
                 {1, {active, infinity}, null}},
     Key1 = add_missing_hash(Key1_Pre),
@@ -1947,9 +1993,10 @@ simple_server_test() ->
     ok = shutdown_when_compact(PCL),
 
-    {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath,
-                                              max_inmemory_tablesize=1000,
-                                              compression_method=native}),
+    {ok, PCLr} =
+        pcl_start(#penciller_options{root_path=RootPath,
+                                     max_inmemory_tablesize=1000,
+                                     sst_options=#sst_options{}}),
     ?assertMatch(2003, pcl_getstartupsequencenumber(PCLr)),
     % ok = maybe_pause_push(PCLr, [Key2] ++ KL2 ++ [Key3]),
     true = pcl_checkbloomtest(PCLr, {o,"Bucket0001", "Key0001", null}),
@@ -2213,15 +2260,14 @@ create_file_test() ->
     KVL = lists:usort(generate_randomkeys({50000, 0})),
     Tree = leveled_tree:from_orderedlist(KVL, ?CACHE_TYPE),
     FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end,
-    {ok,
-        SP,
-        noreply} = leveled_sst:sst_newlevelzero(RP,
-                                                Filename,
-                                                1,
-                                                FetchFun,
-                                                undefined,
-                                                50000,
-                                                native),
+    {ok, SP, noreply} =
+        leveled_sst:sst_newlevelzero(RP,
                                     Filename,
                                     1,
                                     FetchFun,
                                     undefined,
                                     50000,
                                     #sst_options{press_method = native}),
     lists:foreach(fun(X) ->
                         case checkready(SP) of
                             timeout ->
@@ -2272,9 +2318,10 @@ coverage_cheat_test() ->
 handle_down_test() ->
     RootPath = "../test/ledger",
     clean_testdir(RootPath),
-    {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath,
-                                              max_inmemory_tablesize=1000,
-                                              compression_method=native}),
+    {ok, PCLr} =
+        pcl_start(#penciller_options{root_path=RootPath,
                                     max_inmemory_tablesize=1000,
                                     sst_options=#sst_options{}}),
     FakeBookie = spawn(fun loop/0),
 
     Mon = erlang:monitor(process, FakeBookie),

View file

@@ -262,9 +262,12 @@ generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
 generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) ->
     Acc;
 generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
-    BNumber = string:right(integer_to_list(BucketLow + leveled_rand:uniform(BRange)),
-                           4, $0),
-    KNumber = string:right(integer_to_list(leveled_rand:uniform(1000)), 4, $0),
+    BNumber =
+        lists:flatten(
+            io_lib:format("~4..0B",
+                          [BucketLow + leveled_rand:uniform(BRange)])),
+    KNumber =
+        lists:flatten(io_lib:format("~4..0B", [leveled_rand:uniform(1000)])),
     {K, V} = {{o, "Bucket" ++ BNumber, "Key" ++ KNumber, null},
               {Seqn, {active, infinity}, null}},
     generate_randomkeys(Seqn + 1,

View file

@@ -111,7 +111,7 @@
 -export([sst_new/6,
          sst_new/8,
          sst_newlevelzero/7,
-         sst_open/2,
+         sst_open/3,
          sst_get/2,
          sst_get/3,
          sst_expandpointer/5,
@@ -164,6 +164,8 @@
         :: false|
            {sets, sets:set(non_neg_integer())}|
            {list, list(non_neg_integer())}.
+-type sst_options()
+        :: #sst_options{}.
 
 %% yield_blockquery is used to detemrine if the work necessary to process a
 %% range query beyond the fetching the slot should be managed from within
@@ -210,13 +212,13 @@
 -type sst_timings() :: no_timing|#sst_timings{}.
 -type build_timings() :: no_timing|#build_timings{}.
 
--export_type([expandable_pointer/0]).
+-export_type([expandable_pointer/0, press_method/0]).
 
 %%%============================================================================
 %%% API
 %%%============================================================================
 
--spec sst_open(string(), string())
+-spec sst_open(string(), string(), sst_options())
             -> {ok, pid(),
                 {leveled_codec:ledger_key(), leveled_codec:ledger_key()},
                 binary()}.
@@ -228,10 +230,10 @@
 %% term order.
 %%
 %% The filename should include the file extension.
-sst_open(RootPath, Filename) ->
+sst_open(RootPath, Filename, OptsSST) ->
     {ok, Pid} = gen_fsm:start_link(?MODULE, [], []),
     case gen_fsm:sync_send_event(Pid,
-                                 {sst_open, RootPath, Filename},
+                                 {sst_open, RootPath, Filename, OptsSST},
                                  infinity) of
         {ok, {SK, EK}, Bloom} ->
             {ok, Pid, {SK, EK}, Bloom}
@ -239,7 +241,7 @@ sst_open(RootPath, Filename) ->
-spec sst_new(string(), string(), integer(), -spec sst_new(string(), string(), integer(),
list(leveled_codec:ledger_kv()), list(leveled_codec:ledger_kv()),
integer(), press_method()) integer(), sst_options())
-> {ok, pid(), -> {ok, pid(),
{leveled_codec:ledger_key(), leveled_codec:ledger_key()}, {leveled_codec:ledger_key(), leveled_codec:ledger_key()},
binary()}. binary()}.
@ -247,14 +249,14 @@ sst_open(RootPath, Filename) ->
 %% Start a new SST file at the assigned level passing in a list of Key, Value
 %% pairs. This should not be used for basement levels or unexpanded Key/Value
 %% lists as merge_lists will not be called.
-sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod) ->
-    sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod,
-            ?INDEX_MODDATE).
+sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST) ->
+    sst_new(RootPath, Filename, Level,
+            KVList, MaxSQN, OptsSST, ?INDEX_MODDATE).
 
-sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod,
-                                                        IndexModDate) ->
+sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
     {ok, Pid} = gen_fsm:start_link(?MODULE, [], []),
-    PressMethod0 = compress_level(Level, PressMethod),
+    PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method),
+    OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
     {[], [], SlotList, FK} =
         merge_lists(KVList, PressMethod0, IndexModDate),
     case gen_fsm:sync_send_event(Pid,
@ -264,7 +266,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod,
                                  Level,
                                  {SlotList, FK},
                                  MaxSQN,
-                                 PressMethod0,
+                                 OptsSST0,
                                  IndexModDate},
                                 infinity) of
         {ok, {SK, EK}, Bloom} ->
@ -275,7 +277,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod,
                     list(leveled_codec:ledger_kv()|sst_pointer()),
                     list(leveled_codec:ledger_kv()|sst_pointer()),
                     boolean(), integer(),
-                    integer(), press_method())
+                    integer(), sst_options())
             -> empty|{ok, pid(),
                         {{list(leveled_codec:ledger_kv()),
                           list(leveled_codec:ledger_kv())},
@ -295,15 +297,16 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod,
 %% file is not added to the manifest.
 sst_new(RootPath, Filename,
         KVL1, KVL2, IsBasement, Level,
-        MaxSQN, PressMethod) ->
+        MaxSQN, OptsSST) ->
     sst_new(RootPath, Filename,
             KVL1, KVL2, IsBasement, Level,
-            MaxSQN, PressMethod, ?INDEX_MODDATE).
+            MaxSQN, OptsSST, ?INDEX_MODDATE).
 
 sst_new(RootPath, Filename,
         KVL1, KVL2, IsBasement, Level,
-        MaxSQN, PressMethod, IndexModDate) ->
-    PressMethod0 = compress_level(Level, PressMethod),
+        MaxSQN, OptsSST, IndexModDate) ->
+    PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method),
+    OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
     {Rem1, Rem2, SlotList, FK} =
         merge_lists(KVL1, KVL2, {IsBasement, Level},
                     PressMethod0, IndexModDate),
@ -319,7 +322,7 @@ sst_new(RootPath, Filename,
                                  Level,
                                  {SlotList, FK},
                                  MaxSQN,
-                                 PressMethod0,
+                                 OptsSST0,
                                  IndexModDate},
                                 infinity) of
         {ok, {SK, EK}, Bloom} ->
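Both sst_new arities now take the options record in place of the bare press method. A minimal sketch of the six-argument form under the same assumptions (illustrative names, leveled.hrl included):

    %% Hypothetical caller, not from this commit.
    new_sst_example(KVList, MaxSQN) ->
        OptsSST = #sst_options{press_method = native},
        {ok, Pid, {FirstKey, LastKey}, _Bloom} =
            leveled_sst:sst_new("test/ledger_files", "new_file.sst",
                                1, KVList, MaxSQN, OptsSST),
        {Pid, FirstKey, LastKey}.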
@ -329,7 +332,7 @@ sst_new(RootPath, Filename,
 -spec sst_newlevelzero(string(), string(),
                        integer(), fun(), pid()|undefined, integer(),
-                       press_method()) ->
+                       sst_options()) ->
                                         {ok, pid(), noreply}.
 %% @doc
 %% Start a new file at level zero. At this level the file size is not fixed -
@ -337,8 +340,9 @@ sst_new(RootPath, Filename,
 %% fetched slot by slot using the FetchFun
 sst_newlevelzero(RootPath, Filename,
                  Slots, FetchFun, Penciller,
-                 MaxSQN, PressMethod) ->
-    PressMethod0 = compress_level(0, PressMethod),
+                 MaxSQN, OptsSST) ->
+    PressMethod0 = compress_level(0, OptsSST#sst_options.press_method),
+    OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
     {ok, Pid} = gen_fsm:start_link(?MODULE, [], []),
     gen_fsm:send_event(Pid,
                        {sst_newlevelzero,
@ -348,7 +352,7 @@ sst_newlevelzero(RootPath, Filename,
                         FetchFun,
                         Penciller,
                         MaxSQN,
-                        PressMethod0,
+                        OptsSST0,
                         ?INDEX_MODDATE}),
     {ok, Pid, noreply}.
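Unlike sst_open/3 and sst_new/6, the level-zero variant returns {ok, Pid, noreply} at once and builds the file asynchronously. A hedged sketch of a call; FetchFun's contract is whatever leveled_pmem:to_list/2 expects and is left abstract here:

    %% Hypothetical caller, not from this commit; completion is signalled
    %% to the supplied Penciller rather than to this caller.
    new_l0_example(Slots, FetchFun, Penciller, MaxSQN) ->
        OptsSST = #sst_options{},
        {ok, Pid, noreply} =
            leveled_sst:sst_newlevelzero("test/ledger_files", "new_L0.sst",
                                         Slots, FetchFun, Penciller,
                                         MaxSQN, OptsSST),
        Pid.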
@ -448,7 +452,8 @@ sst_printtimings(Pid) ->
 init([]) ->
     {ok, starting, #state{}}.
 
-starting({sst_open, RootPath, Filename}, _From, State) ->
+starting({sst_open, RootPath, Filename, OptsSST}, _From, State) ->
+    leveled_log:save(OptsSST#sst_options.log_options),
     {UpdState, Bloom} =
         read_file(Filename, State#state{root_path=RootPath}),
     Summary = UpdState#state.summary,
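The leveled_log:save/1 calls added to each starting clause copy the bookie's log options into the new SST process. The definition is outside this diff; a plausible sketch, assuming process-dictionary storage so later log calls in the same process can read the settings cheaply:

    %% Assumed sketch only - the real save/1 lives in leveled_log.erl.
    save(LogOptions) ->
        put(log_options, LogOptions),
        ok.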
@ -459,8 +464,10 @@ starting({sst_open, RootPath, Filename}, _From, State) ->
 starting({sst_new,
           RootPath, Filename, Level,
           {SlotList, FirstKey}, MaxSQN,
-          PressMethod, IdxModDate}, _From, State) ->
+          OptsSST, IdxModDate}, _From, State) ->
     SW = os:timestamp(),
+    leveled_log:save(OptsSST#sst_options.log_options),
+    PressMethod = OptsSST#sst_options.press_method,
     {Length, SlotIndex, BlockIndex, SlotsBin, Bloom} =
         build_all_slots(SlotList),
     SummaryBin =
@ -483,8 +490,10 @@ starting({sst_new,
 starting({sst_newlevelzero, RootPath, Filename,
           Slots, FetchFun, Penciller, MaxSQN,
-          PressMethod, IdxModDate}, State) ->
+          OptsSST, IdxModDate}, State) ->
     SW0 = os:timestamp(),
+    leveled_log:save(OptsSST#sst_options.log_options),
+    PressMethod = OptsSST#sst_options.press_method,
     KVList = leveled_pmem:to_list(Slots, FetchFun),
     Time0 = timer:now_diff(os:timestamp(), SW0),
@ -2472,12 +2481,18 @@ update_timings(SW, Timings, Stage, Continue) ->
 -ifdef(TEST).
 
 testsst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod) ->
-    sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod, false).
+    OptsSST =
+        #sst_options{press_method=PressMethod,
+                     log_options=leveled_log:get_opts()},
+    sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, false).
 
 testsst_new(RootPath, Filename,
             KVL1, KVL2, IsBasement, Level, MaxSQN, PressMethod) ->
+    OptsSST =
+        #sst_options{press_method=PressMethod,
+                     log_options=leveled_log:get_opts()},
     sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN,
-            PressMethod, false).
+            OptsSST, false).
 generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
     generate_randomkeys(Seqn,
@ -2490,8 +2505,10 @@ generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) ->
     Acc;
 generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
     BRand = leveled_rand:uniform(BRange),
-    BNumber = string:right(integer_to_list(BucketLow + BRand), 4, $0),
-    KNumber = string:right(integer_to_list(leveled_rand:uniform(1000)), 6, $0),
+    BNumber =
+        lists:flatten(io_lib:format("K~4..0B", [BucketLow + BRand])),
+    KNumber =
+        lists:flatten(io_lib:format("K~6..0B", [leveled_rand:uniform(1000)])),
     LK = leveled_codec:to_ledgerkey("Bucket" ++ BNumber, "Key" ++ KNumber, o),
     Chunk = leveled_rand:rand_bytes(64),
     {_B, _K, MV, _H, _LMs} =
@ -2814,8 +2831,7 @@ test_binary_slot(FullBin, Key, Hash, ExpectedValue) ->
 merge_test() ->
-    merge_tester(fun testsst_new/6, fun testsst_new/8),
-    merge_tester(fun sst_new/6, fun sst_new/8).
+    merge_tester(fun testsst_new/6, fun testsst_new/8).
 
 merge_tester(NewFunS, NewFunM) ->
@ -2868,8 +2884,7 @@ merge_tester(NewFunS, NewFunM) ->
 simple_persisted_range_test() ->
-    simple_persisted_range_tester(fun testsst_new/6),
-    simple_persisted_range_tester(fun sst_new/6).
+    simple_persisted_range_tester(fun testsst_new/6).
 
 simple_persisted_range_tester(SSTNewFun) ->
     {RP, Filename} = {"../test/", "simple_test"},
@ -2911,8 +2926,7 @@ simple_persisted_range_tester(SSTNewFun) ->
 simple_persisted_rangesegfilter_test() ->
-    simple_persisted_rangesegfilter_tester(fun testsst_new/6),
-    simple_persisted_rangesegfilter_tester(fun sst_new/6).
+    simple_persisted_rangesegfilter_tester(fun testsst_new/6).
 
 simple_persisted_rangesegfilter_tester(SSTNewFun) ->
     {RP, Filename} = {"../test/", "range_segfilter_test"},
@ -3007,7 +3021,7 @@ additional_range_test() ->
                       lists:seq(?NOLOOK_SLOTSIZE + Gap + 1,
                                 2 * ?NOLOOK_SLOTSIZE + Gap)),
     {ok, P1, {{Rem1, Rem2}, SK, EK}, _Bloom1} =
-        sst_new("../test/", "range1_src", IK1, IK2, false, 1, 9999, native),
+        testsst_new("../test/", "range1_src", IK1, IK2, false, 1, 9999, native),
     ?assertMatch([], Rem1),
     ?assertMatch([], Rem2),
     ?assertMatch(SK, element(1, lists:nth(1, IK1))),
@ -3058,8 +3072,7 @@ additional_range_test() ->
 simple_persisted_slotsize_test() ->
-    simple_persisted_slotsize_tester(fun testsst_new/6),
-    simple_persisted_slotsize_tester(fun sst_new/6).
+    simple_persisted_slotsize_tester(fun testsst_new/6).
 
 simple_persisted_slotsize_tester(SSTNewFun) ->
@ -3082,8 +3095,7 @@ simple_persisted_test_() ->
     {timeout, 60, fun simple_persisted_test_bothformats/0}.
 
 simple_persisted_test_bothformats() ->
-    simple_persisted_tester(fun testsst_new/6),
-    simple_persisted_tester(fun sst_new/6).
+    simple_persisted_tester(fun testsst_new/6).
 
 simple_persisted_tester(SSTNewFun) ->
     {RP, Filename} = {"../test/", "simple_test"},

View file

@ -581,8 +581,12 @@ generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) ->
     Acc;
 generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
     BRand = leveled_rand:uniform(BRange),
-    BNumber = string:right(integer_to_list(BucketLow + BRand), 4, $0),
-    KNumber = string:right(integer_to_list(leveled_rand:uniform(1000)), 4, $0),
+    BNumber =
+        lists:flatten(
+            io_lib:format("K~4..0B", [BucketLow + BRand])),
+    KNumber =
+        lists:flatten(
+            io_lib:format("K~8..0B", [leveled_rand:uniform(1000)])),
     {K, V} = {{o, "Bucket" ++ BNumber, "Key" ++ KNumber, null},
               {Seqn, {active, infinity}, null}},
     generate_randomkeys(Seqn + 1,

View file

@ -65,8 +65,6 @@ hash1(H, <<B:8/integer, Rest/bytes>>) ->
     hash1(H2, Rest).
 
 %%%============================================================================
 %%% Test
 %%%============================================================================
@ -88,4 +86,4 @@ magichashperf_test() ->
     {TimeMH2, _HL1} = timer:tc(lists, map, [fun(K) -> magic_hash(K) end, KL]),
     io:format(user, "1000 keys magic hashed in ~w microseconds~n", [TimeMH2]).
 
 -endif.

View file

@ -41,6 +41,7 @@ simple_put_fetch_head_delete(_Config) ->
simple_test_withlog(error, ["B0015", "B0016", "B0017", "B0018", simple_test_withlog(error, ["B0015", "B0016", "B0017", "B0018",
"P0032", "SST12", "CDB19", "SST13", "I0019"]). "P0032", "SST12", "CDB19", "SST13", "I0019"]).
simple_test_withlog(LogLevel, ForcedLogs) -> simple_test_withlog(LogLevel, ForcedLogs) ->
RootPath = testutil:reset_filestructure(), RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath}, StartOpts1 = [{root_path, RootPath},
@ -59,6 +60,7 @@ simple_test_withlog(LogLevel, ForcedLogs) ->
                   {log_level, LogLevel},
                   {forced_logs, ForcedLogs}],
     {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
     testutil:check_forobject(Bookie2, TestObject),
     ObjList1 = testutil:generate_objects(5000, 2),
     testutil:riakload(Bookie2, ObjList1),
@ -106,14 +108,20 @@ many_put_fetch_head(_Config) ->
                   {sync_strategy, testutil:sync_strategy()},
                   {compression_point, on_receipt}],
     {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
+    ok = leveled_bookie:book_loglevel(Bookie2, error),
+    ok = leveled_bookie:book_addlogs(Bookie2, ["B0015"]),
     testutil:check_forobject(Bookie2, TestObject),
     GenList = [2, 20002, 40002, 60002, 80002,
                100002, 120002, 140002, 160002, 180002],
     CLs = testutil:load_objects(20000, GenList, Bookie2, TestObject,
                                 fun testutil:generate_smallobjects/2),
+    {error, ["B0015"]} = leveled_bookie:book_logsettings(Bookie2),
+    ok = leveled_bookie:book_removelogs(Bookie2, ["B0015"]),
     CL1A = lists:nth(1, CLs),
     ChkListFixed = lists:nth(length(CLs), CLs),
     testutil:check_forlist(Bookie2, CL1A),
+    {error, []} = leveled_bookie:book_logsettings(Bookie2),
+    ok = leveled_bookie:book_loglevel(Bookie2, info),
     ObjList2A = testutil:generate_objects(5000, 2),
     testutil:riakload(Bookie2, ObjList2A),
     ChkList2A = lists:sublist(lists:sort(ObjList2A), 1000),
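Note the shape asserted here: book_logsettings/1 returns {LogLevel, ForcedLogs}, so {error, ["B0015"]} means the level is the atom error with log B0015 forced, not an error return; after book_removelogs/2 the forced list is empty again.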
@ -197,8 +205,8 @@ journal_compaction_tester(Restart, WRP) ->
     ObjListD = testutil:generate_objects(10000, 2),
     lists:foreach(fun({_R, O, _S}) ->
                       testutil:book_riakdelete(Bookie0,
-                                                O#r_object.bucket,
-                                                O#r_object.key,
+                                                testutil:get_bucket(O),
+                                                testutil:get_key(O),
                                                 [])
                       end,
                   ObjListD),
@ -577,8 +585,8 @@ load_and_count_withdelete(_Config) ->
                     0,
                     lists:seq(1, 20)),
     testutil:check_forobject(Bookie1, TestObject),
-    {BucketD, KeyD} = {TestObject#r_object.bucket,
-                        TestObject#r_object.key},
+    {BucketD, KeyD} =
+        {testutil:get_bucket(TestObject), testutil:get_key(TestObject)},
     {_, 1} = testutil:check_bucket_stats(Bookie1, BucketD),
     ok = testutil:book_riakdelete(Bookie1, BucketD, KeyD, []),
     not_found = testutil:book_riakget(Bookie1, BucketD, KeyD),

View file

@ -445,10 +445,12 @@ small_load_with2i(_Config) ->
     %% Delete the objects from the ChkList removing the indexes
     lists:foreach(fun({_RN, Obj, Spc}) ->
-                        DSpc = lists:map(fun({add, F, T}) -> {remove, F, T}
-                                            end,
+                        DSpc = lists:map(fun({add, F, T}) ->
+                                                {remove, F, T}
+                                            end,
                                          Spc),
-                        {B, K} = {Obj#r_object.bucket, Obj#r_object.key},
+                        {B, K} =
+                            {testutil:get_bucket(Obj), testutil:get_key(Obj)},
                         testutil:book_riakdelete(Bookie1, B, K, DSpc)
                         end,
                   ChkList1),

View file

@ -256,8 +256,8 @@ recovr_strategy(_Config) ->
     {TestObject, TestSpec} = testutil:generate_testobject(),
     ok = testutil:book_riakput(Book1, TestObject, TestSpec),
     ok = testutil:book_riakdelete(Book1,
-                                  TestObject#r_object.bucket,
-                                  TestObject#r_object.key,
+                                  testutil:get_bucket(TestObject),
+                                  testutil:get_key(TestObject),
                                   []),
     lists:foreach(fun({K, _SpcL}) ->

View file

@ -639,14 +639,14 @@ test_segfilter_query(Bookie, CLs) ->
     SegMapFun =
         fun({_RN, RiakObject, _Spc}) ->
-            B = RiakObject#r_object.bucket,
-            K = RiakObject#r_object.key,
+            B = testutil:get_bucket(RiakObject),
+            K = testutil:get_key(RiakObject),
             leveled_tictac:keyto_segment32(<<B/binary, K/binary>>)
         end,
     BKMapFun =
         fun({_RN, RiakObject, _Spc}) ->
-            B = RiakObject#r_object.bucket,
-            K = RiakObject#r_object.key,
+            B = testutil:get_bucket(RiakObject),
+            K = testutil:get_key(RiakObject),
             {B, K}
         end,

View file

@ -71,6 +71,18 @@
-define(EMPTY_VTAG_BIN, <<"e">>). -define(EMPTY_VTAG_BIN, <<"e">>).
-define(ROOT_PATH, "test"). -define(ROOT_PATH, "test").
-record(r_content, {
metadata,
value :: term()
}).
-record(r_object, {
bucket,
key,
contents :: [#r_content{}],
vclock,
updatemetadata=dict:store(clean, true, dict:new()),
updatevalue :: term()}).
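With #r_object{} now private to testutil, the suites above go through accessor helpers instead of record syntax. Their definitions sit elsewhere in testutil.erl; the likely shape, assuming plain field access:

    %% Assumed accessors, not shown in this diff.
    get_bucket(#r_object{bucket = B}) -> B.
    get_key(#r_object{key = K}) -> K.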
 riak_object(Bucket, Key, Value, MetaData) ->
     Content = #r_content{metadata=dict:from_list(MetaData), value=Value},