Mas d31 i416 (#418)

* Add compression controls (#417)

* Add compression controls

Add configuration options to allow for a compression algorithm of `none` to disable compression altogether.  Also an option to change the point in the LSM tree when compression is applied.

* Handle configurable defaults consistently

Move them into leveled.hrl.  This forces double-definitions to be resolved.

There are some other constants in leveled_bookie that are relevant outside of leveled_bookie.  These are all now in the non-configurable startup defaults section.

* Clarify referred-to default is OTP not leveled

* Update leveled_bookie.erl

Handle xref issue with eunit include
This commit is contained in:
Martin Sumner 2023-11-07 14:58:43 +00:00 committed by GitHub
parent b96518c32a
commit 9e804924a8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 338 additions and 163 deletions

View file

@ -1,34 +1,79 @@
% File paths
%%%============================================================================
%%% File paths
%%%============================================================================
-define(JOURNAL_FP, "journal").
-define(LEDGER_FP, "ledger").
%%%============================================================================
%%%============================================================================
%%% Configurable startup defaults
%%%============================================================================
-define(CACHE_SIZE, 2500).
-define(MAX_CACHE_MULTTIPLE, 2).
-define(MIN_CACHE_SIZE, 100).
-define(MIN_PCL_CACHE_SIZE, 400).
-define(MAX_PCL_CACHE_SIZE, 28000).
% This is less than actual max - but COIN_SIDECOUNT
-define(ABSOLUTEMAX_JOURNALSIZE, 4000000000).
-define(COMPRESSION_METHOD, lz4).
-define(COMPRESSION_POINT, on_receipt).
-define(COMPRESSION_LEVEL, 1).
-define(LOG_LEVEL, info).
-define(DEFAULT_DBID, 65536).
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
-define(SNAPTIMEOUT_SHORT, 900). % 15 minutes
-define(SNAPTIMEOUT_LONG, 43200). % 12 hours
-define(SST_PAGECACHELEVEL_NOLOOKUP, 1).
-define(SST_PAGECACHELEVEL_LOOKUP, 4).
-define(DEFAULT_STATS_PERC, 10).
-define(DEFAULT_SYNC_STRATEGY, none).
%%%============================================================================
%%%============================================================================
%%% Non-configurable startup defaults
%%%============================================================================
-define(MAX_SSTSLOTS, 256).
-define(LOADING_PAUSE, 1000).
-define(LOADING_BATCH, 1000).
%% Tag to be used on standard Riak KV objects
-define(RIAK_TAG, o_rkv).
%% Tag to be used on K/V objects for non-Riak purposes
-define(STD_TAG, o).
%% Tag used for secondary index keys
-define(IDX_TAG, i).
%% Tag used for head-only objects
-define(HEAD_TAG, h).
%% Inker key type used for 'normal' objects
-define(INKT_STND, stnd).
%% Inker key type used for 'batch' objects
-define(INKT_MPUT, mput).
%% Inker key type used for objects which contain no value, only key changes
%% This is used currently for objects formed under a 'retain' strategy on Inker
%% compaction
-define(INKT_KEYD, keyd).
%% Inker key type used for tombstones
-define(INKT_TOMB, tomb).
-define(CACHE_TYPE, skpl).
-define(CACHE_SIZE_JITTER, 25).
-define(JOURNAL_SIZE_JITTER, 20).
-define(LONG_RUNNING, 1000000).
% An individual task taking > 1s gets a specific log
-define(MAX_KEYCHECK_FREQUENCY, 100).
-define(MIN_KEYCHECK_FREQUENCY, 1).
-define(MAX_LEVELS, 8).
%% Should equal the length of the LEVEL_SCALEFACTOR
%% Should equal the length of the LEVEL_SCALEFACTOR
-define(CACHE_TYPE, skpl).
%%%============================================================================
%%%============================================================================
%%% Tags
%%%============================================================================
-define(RIAK_TAG, o_rkv).
%% Tag to be used on standard Riak KV objects
-define(STD_TAG, o).
%% Tag to be used on K/V objects for non-Riak purposes
-define(IDX_TAG, i).
%% Tag used for secondary index keys
-define(HEAD_TAG, h).
%% Tag used for head-only objects
-define(INKT_STND, stnd).
%% Inker key type used for 'normal' objects
-define(INKT_MPUT, mput).
%% Inker key type used for 'batch' objects
-define(INKT_KEYD, keyd).
%% Inker key type used for objects which contain no value, only key changes
%% This is used currently for objects formed under a 'retain' strategy
%% on Inker compaction
-define(INKT_TOMB, tomb).
%% Inker key type used for tombstones
%%%============================================================================
%%%============================================================================
%%% Shared records
%%%============================================================================
-record(level,
{level :: integer(),
is_basement = false :: boolean(),
@ -47,19 +92,26 @@
file_path :: string() | undefined,
waste_path :: string() | undefined,
binary_mode = false :: boolean(),
sync_strategy = sync,
% Default set by bookie to be `true`
% `false` set here due to legacy of unit tests
% using non-binary keys
sync_strategy = ?DEFAULT_SYNC_STRATEGY,
log_options = leveled_log:get_opts()
:: leveled_log:log_options(),
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
monitor = {no_monitor, 0}
:: leveled_monitor:monitor()}).
-record(sst_options,
{press_method = native
{press_method = ?COMPRESSION_METHOD
:: leveled_sst:press_method(),
press_level = ?COMPRESSION_LEVEL :: non_neg_integer(),
log_options = leveled_log:get_opts()
:: leveled_log:log_options(),
max_sstslots = 256 :: pos_integer(),
pagecache_level = 1 :: pos_integer(),
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
max_sstslots = ?MAX_SSTSLOTS :: pos_integer(),
pagecache_level = ?SST_PAGECACHELEVEL_NOLOOKUP
:: pos_integer(),
monitor = {no_monitor, 0}
:: leveled_monitor:monitor()}).
-record(inker_options,
{cdb_max_size :: integer() | undefined,
@ -70,14 +122,16 @@
source_inker :: pid() | undefined,
reload_strategy = [] :: list(),
waste_retention_period :: integer() | undefined,
compression_method = native :: lz4|native,
compression_method = ?COMPRESSION_METHOD
:: lz4|native|none,
compress_on_receipt = false :: boolean(),
max_run_length,
singlefile_compactionperc :: float()|undefined,
maxrunlength_compactionperc :: float()|undefined,
score_onein = 1 :: pos_integer(),
snaptimeout_long :: pos_integer() | undefined,
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
monitor = {no_monitor, 0}
:: leveled_monitor:monitor()}).
-record(penciller_options,
{root_path :: string() | undefined,
@ -89,19 +143,23 @@
bookies_mem :: tuple() | undefined,
source_penciller :: pid() | undefined,
snapshot_longrunning = true :: boolean(),
compression_method = native :: lz4|native,
compression_method = ?COMPRESSION_METHOD
:: lz4|native|none,
levelzero_cointoss = false :: boolean(),
snaptimeout_short :: pos_integer() | undefined,
snaptimeout_long :: pos_integer() | undefined,
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
monitor = {no_monitor, 0}
:: leveled_monitor:monitor()}).
-record(iclerk_options,
{inker :: pid() | undefined,
max_run_length :: integer() | undefined,
cdb_options = #cdb_options{} :: #cdb_options{},
waste_retention_period :: integer() | undefined,
compression_method = native :: lz4|native,
compression_method = ?COMPRESSION_METHOD
:: lz4|native|none,
singlefile_compactionperc :: float()|undefined,
maxrunlength_compactionperc :: float()|undefined,
score_onein = 1 :: pos_integer(),
reload_strategy = [] :: list()}).
%%%============================================================================

View file

@ -46,11 +46,10 @@
%% Can be lz4, native (which will use the Erlang native zlib compression
%% within term_to_binary) or none (to disable compression altogether)
{mapping, "leveled.compression_method", "leveled.compression_method", [
{default, native},
{datatype, atom}
{datatype, {enum, [native, lz4, none]}},
{default, native}
]}.
%% @doc Compression point
%% The point at which compression is applied to the Journal (the Ledger is
%% always compressed). Use on_receipt or on_compact. on_compact is suitable
@ -61,6 +60,16 @@
{datatype, atom}
]}.
%% @doc Compression Level (Ledger LSM)
%% Specify the level in the LSM tree from which compression should occur.
%% Defaults to L1, so only L0 writes are uncompressed.
{mapping, "leveled.compression_level", "leveled.compression_level", [
{default, 1},
{datatype, integer},
{validators, ["range:0-7"]},
hidden
]}.
%% @doc Log level
%% Can be debug, info, warn, error or critical
%% Set the minimum log level to be used within leveled. Leveled will log many

View file

@ -39,8 +39,8 @@
%% Can be lz4, native (which will use the Erlang native zlib compression
%% within term_to_binary) or none (to disable compression altogether)
{mapping, "multi_backend.$name.leveled.compression_method", "riak_kv.multi_backend", [
{default, lz4},
{datatype, atom},
{datatype, {enum, [native, lz4, none]}},
{default, native}
hidden
]}.
@ -55,6 +55,16 @@
hidden
]}.
%% @doc Compression Level (Ledger LSM)
%% Specify the level in the LSM tree from which compression should occur.
%% Defaults to L1, so only L0 writes are uncompressed.
{mapping, "multi_backend.$name.leveled.compression_level", "riak_kv.multi_backend", [
{default, 1},
{datatype, integer},
{validators, ["range:0-7"]},
hidden
]}.
%% @doc The approximate size (in bytes) when a Journal file should be rolled.
%% Normally keep this as around the size of o(100K) objects. Default is 1GB.

View file

@ -102,34 +102,8 @@
-export([book_returnactors/1]).
-endif.
-define(LOADING_PAUSE, 1000).
-define(CACHE_SIZE, 2500).
-define(MAX_CACHE_MULTTIPLE, 2).
-define(MIN_CACHE_SIZE, 100).
-define(MIN_PCL_CACHE_SIZE, 400).
-define(MAX_PCL_CACHE_SIZE, 28000).
% This is less than actual max - but COIN_SIDECOUNT
-define(CACHE_SIZE_JITTER, 25).
-define(JOURNAL_SIZE_JITTER, 20).
-define(ABSOLUTEMAX_JOURNALSIZE, 4000000000).
-define(LONG_RUNNING, 1000000).
% An individual task taking > 1s gets a specific log
-define(COMPRESSION_METHOD, lz4).
-define(COMPRESSION_POINT, on_receipt).
-define(LOG_LEVEL, info).
-define(TIMING_SAMPLESIZE, 100).
-define(DEFAULT_DBID, 65536).
-define(TIMING_SAMPLECOUNTDOWN, 50000).
-define(DUMMY, dummy). % Dummy key used for mput operations
-define(MAX_KEYCHECK_FREQUENCY, 100).
-define(MIN_KEYCHECK_FREQUENCY, 1).
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
-define(SNAPTIMEOUT_SHORT, 900). % 15 minutes
-define(SNAPTIMEOUT_LONG, 43200). % 12 hours
-define(SST_PAGECACHELEVEL_NOLOOKUP, 1).
-define(SST_PAGECACHELEVEL_LOOKUP, 4).
-define(CACHE_LOGPOINT, 50000).
-define(DEFAULT_STATS_PERC, 10).
-define(OPTION_DEFAULTS,
[{root_path, undefined},
{snapshot_bookie, undefined},
@ -138,7 +112,7 @@
{max_journalsize, 1000000000},
{max_journalobjectcount, 200000},
{max_sstslots, 256},
{sync_strategy, none},
{sync_strategy, ?DEFAULT_SYNC_STRATEGY},
{head_only, false},
{waste_retention_period, undefined},
{max_run_length, undefined},
@ -150,6 +124,7 @@
{ledger_preloadpagecache_level, ?SST_PAGECACHELEVEL_LOOKUP},
{compression_method, ?COMPRESSION_METHOD},
{compression_point, ?COMPRESSION_POINT},
{compression_level, ?COMPRESSION_LEVEL},
{log_level, ?LOG_LEVEL},
{forced_logs, []},
{database_id, ?DEFAULT_DBID},
@ -314,10 +289,12 @@
% To which level of the ledger should the ledger contents be
% pre-loaded into the pagecache (using fadvise on creation and
% startup)
{compression_method, native|lz4} |
{compression_method, native|lz4|none} |
% Compression method and point allow Leveled to be switched from
% using bif based compression (zlib) to using nif based compression
% (lz4).
% (lz4). To disable compression use none. This will disable in
% the ledger as well as the journal (both on_receipt and
% on_compact).
% Defaults to ?COMPRESSION_METHOD
{compression_point, on_compact|on_receipt} |
% The compression point can be changed between on_receipt (all
@ -325,6 +302,10 @@
% values are originally stored uncompressed (speeding PUT times),
% and are only compressed when they are first subject to compaction
% Defaults to ?COMPRESSION_POINT
{compression_level, 0..7} |
% At what level of the LSM tree in the ledger should compression be
% enabled.
% Defaults to ?COMPRESSION_LEVEL
{log_level, debug|info|warn|error|critical} |
% Set the log level. The default log_level of info is noisy - the
% current implementation was targetted at environments that have
@ -1805,6 +1786,7 @@ set_options(Opts, Monitor) ->
% If using lz4 this is not recommended
false
end,
CompressionLevel = proplists:get_value(compression_level, Opts),
MaxSSTSlots = proplists:get_value(max_sstslots, Opts),
@ -1837,6 +1819,7 @@ set_options(Opts, Monitor) ->
sst_options =
#sst_options{
press_method = CompressionMethod,
press_level = CompressionLevel,
log_options = leveled_log:get_opts(),
max_sstslots = MaxSSTSlots,
monitor = Monitor},

View file

@ -106,7 +106,7 @@
-type object_spec() ::
object_spec_v0()|object_spec_v1().
-type compression_method() ::
lz4|native.
lz4|native|none.
-type index_specs() ::
list({add|remove, any(), any()}).
-type journal_keychanges() ::
@ -508,7 +508,9 @@ serialise_object(Object, true, Method) when is_binary(Object) ->
{ok, Bin} = lz4:pack(Object),
Bin;
native ->
zlib:compress(Object)
zlib:compress(Object);
none ->
Object
end;
serialise_object(Object, false, _Method) ->
term_to_binary(Object);
@ -554,7 +556,8 @@ encode_valuetype(IsBinary, IsCompressed, Method) ->
Bit3 =
case Method of
lz4 -> 4;
native -> 0
native -> 0;
none -> 0
end,
Bit2 =
case IsBinary of
@ -562,7 +565,7 @@ encode_valuetype(IsBinary, IsCompressed, Method) ->
false -> 0
end,
Bit1 =
case IsCompressed of
case IsCompressed and (Method =/= none) of
true -> 1;
false -> 0
end,

View file

@ -113,7 +113,7 @@
reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(),
singlefile_compactionperc = ?SINGLEFILE_COMPACTION_TARGET :: float(),
maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(),
compression_method = native :: lz4|native,
compression_method = native :: lz4|native|none,
scored_files = [] :: list(candidate()),
scoring_state :: scoring_state()|undefined,
score_onein = 1 :: pos_integer()}).

View file

@ -151,7 +151,7 @@
compaction_pending = false :: boolean(),
bookie_monref :: reference() | undefined,
is_snapshot = false :: boolean(),
compression_method = native :: lz4|native,
compression_method = native :: lz4|native|none,
compress_on_receipt = false :: boolean(),
snap_timeout :: pos_integer() | undefined, % in seconds
source_inker :: pid() | undefined}).

View file

@ -221,7 +221,6 @@
-define(ITERATOR_SCANWIDTH, 4).
-define(TIMING_SAMPLECOUNTDOWN, 10000).
-define(TIMING_SAMPLESIZE, 100).
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
-define(SHUTDOWN_PAUSE, 10000).
% How long to wait for snapshots to be released on shutdown
% before forcing closure of snapshots

View file

@ -68,8 +68,12 @@
-define(LOOK_BLOCKSIZE, {24, 32}). % 4x + y = ?LOOK_SLOTSIZE
-define(NOLOOK_SLOTSIZE, 256).
-define(NOLOOK_BLOCKSIZE, {56, 32}). % 4x + y = ?NOLOOK_SLOTSIZE
-define(COMPRESSION_LEVEL, 1).
-define(BINARY_SETTINGS, [{compressed, ?COMPRESSION_LEVEL}]).
-define(COMPRESSION_FACTOR, 1).
% When using native compression - how hard should the compression code
% try to reduce the size of the compressed output. 1 implies minimal
% effort, 6 is default in OTP:
% https://www.erlang.org/doc/man/erlang.html#term_to_binary-2
-define(BINARY_SETTINGS, [{compressed, ?COMPRESSION_FACTOR}]).
-define(MERGE_SCANWIDTH, 16).
-define(DISCARD_EXT, ".discarded").
-define(DELETE_TIMEOUT, 10000).
@ -80,7 +84,6 @@
-define(BLOCK_LENGTHS_LENGTH, 20).
-define(LMD_LENGTH, 4).
-define(FLIPPER32, 4294967295).
-define(COMPRESS_AT_LEVEL, 1).
-define(DOUBLESIZE_LEVEL, 3).
-define(INDEX_MODDATE, true).
-define(TOMB_COUNT, true).
@ -270,12 +273,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST) ->
sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
{ok, Pid} = gen_statem:start_link(?MODULE, [], ?START_OPTS),
PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method),
MaxSlots0 = maxslots_level(Level, OptsSST#sst_options.max_sstslots),
OptsSST0 =
OptsSST#sst_options{
press_method = PressMethod0, max_sstslots = MaxSlots0},
OptsSST0 = update_options(OptsSST, Level),
{[], [], SlotList, FK, _CountOfTombs} =
merge_lists(KVList, OptsSST0, IndexModDate),
case gen_statem:call(Pid, {sst_new,
@ -325,14 +323,15 @@ sst_newmerge(RootPath, Filename,
sst_newmerge(RootPath, Filename,
KVL1, KVL2, IsBasement, Level,
MaxSQN, OptsSST, IndexModDate, TombCount) ->
PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method),
MaxSlots0 = maxslots_level(Level, OptsSST#sst_options.max_sstslots),
OptsSST0 =
OptsSST#sst_options{press_method = PressMethod0,
max_sstslots = MaxSlots0},
OptsSST0 = update_options(OptsSST, Level),
{Rem1, Rem2, SlotList, FK, CountOfTombs} =
merge_lists(KVL1, KVL2, {IsBasement, Level}, OptsSST0,
IndexModDate, TombCount),
merge_lists(
KVL1,
KVL2,
{IsBasement, Level},
OptsSST0,
IndexModDate,
TombCount),
case SlotList of
[] ->
empty;
@ -371,11 +370,7 @@ sst_newmerge(RootPath, Filename,
sst_newlevelzero(RootPath, Filename,
Slots, Fetcher, Penciller,
MaxSQN, OptsSST) ->
PressMethod0 = compress_level(0, OptsSST#sst_options.press_method),
MaxSlots0 = maxslots_level(0, OptsSST#sst_options.max_sstslots),
OptsSST0 =
OptsSST#sst_options{press_method = PressMethod0,
max_sstslots = MaxSlots0},
OptsSST0 = update_options(OptsSST, 0),
{ok, Pid} = gen_statem:start_link(?MODULE, [], ?START_OPTS),
%% Initiate the file into the "starting" state
ok = gen_statem:call(Pid, {sst_newlevelzero,
@ -1253,6 +1248,16 @@ tune_seglist(SegList) ->
%%% Internal Functions
%%%============================================================================
-spec update_options(sst_options(), non_neg_integer()) -> sst_options().
update_options(OptsSST, Level) ->
CompressLevel = OptsSST#sst_options.press_level,
PressMethod0 =
compress_level(Level, CompressLevel, OptsSST#sst_options.press_method),
MaxSlots0 =
maxslots_level(Level, OptsSST#sst_options.max_sstslots),
OptsSST#sst_options{press_method = PressMethod0, max_sstslots = MaxSlots0}.
-spec new_blockindex_cache(pos_integer()) -> blockindex_cache().
new_blockindex_cache(Size) ->
{0, array:new([{size, Size}, {default, none}]), 0}.
@ -1494,12 +1499,14 @@ fetch_range(StartKey, EndKey, ScanWidth, SegList, LowLastMod, State) ->
State#state.index_moddate),
{NeededBlockIdx, SlotsToFetchBinList, SlotsToPoint}.
-spec compress_level(integer(), press_method()) -> press_method().
-spec compress_level(
non_neg_integer(), non_neg_integer(), press_method()) -> press_method().
%% @doc
%% disable compression at higher levels for improved performance
compress_level(Level, _PressMethod) when Level < ?COMPRESS_AT_LEVEL ->
compress_level(
Level, LevelToCompress, _PressMethod) when Level < LevelToCompress ->
none;
compress_level(_Level, PressMethod) ->
compress_level(_Level, _LevelToCompress, PressMethod) ->
PressMethod.
-spec maxslots_level(level(), pos_integer()) -> pos_integer().

View file

@ -202,12 +202,17 @@ bigsst_littlesst(_Config) ->
ObjL1 =
lists:keysort(
1,
testutil:generate_objects(80000, 1, [],
leveled_rand:rand_bytes(100),
fun() -> [] end, <<"B">>)
testutil:generate_objects(
100000,
1,
[],
leveled_rand:rand_bytes(100),
fun() -> [] end,
<<"B">>)
),
testutil:riakload(Bookie1, ObjL1),
testutil:check_forlist(Bookie1, ObjL1),
timer:sleep(10000), % Wait for delete timeout
JFP = RootPath ++ "/ledger/ledger_files/",
{ok, FNS1} = file:list_dir(JFP),
ok = leveled_bookie:book_destroy(Bookie1),
@ -216,6 +221,7 @@ bigsst_littlesst(_Config) ->
{ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
testutil:riakload(Bookie2, ObjL1),
testutil:check_forlist(Bookie2, ObjL1),
timer:sleep(10000), % Wait for delete timeout
{ok, FNS2} = file:list_dir(JFP),
ok = leveled_bookie:book_destroy(Bookie2),
io:format("Big SST ~w files Little SST ~w files~n",
@ -1007,46 +1013,137 @@ many_put_fetch_switchcompression(_Config) ->
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath},
{max_pencillercachesize, 16000},
{sync_strategy, riak_sync},
{max_journalobjectcount, 30000},
{compression_level, 3},
{sync_strategy, testutil:sync_strategy()},
{compression_method, native}],
StartOpts2 = [{root_path, RootPath},
{max_pencillercachesize, 24000},
{max_journalobjectcount, 30000},
{sync_strategy, testutil:sync_strategy()},
{compression_method, lz4}],
StartOpts3 = [{root_path, RootPath},
{max_pencillercachesize, 16000},
{max_journalobjectcount, 30000},
{sync_strategy, testutil:sync_strategy()},
{compression_method, none}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = testutil:generate_testobject(),
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
testutil:check_forobject(Bookie1, TestObject),
ok = leveled_bookie:book_close(Bookie1),
StartOpts2 = [{root_path, RootPath},
{max_journalsize, 500000000},
{max_pencillercachesize, 32000},
{sync_strategy, testutil:sync_strategy()},
{compression_method, lz4}],
CL1s =
testutil:load_objects(
40000,
[2, 40002],
Bookie1,
TestObject,
fun testutil:generate_smallobjects/2),
%% Change compression method
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie1, CL) end, CL1s),
ok = leveled_bookie:book_close(Bookie1),
%% Change compression method -> lz4
{ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
testutil:check_forobject(Bookie2, TestObject),
GenList = [2, 40002, 80002, 120002],
CLs = testutil:load_objects(40000, GenList, Bookie2, TestObject,
fun testutil:generate_smallobjects/2),
CL1A = lists:nth(1, CLs),
ChkListFixed = lists:nth(length(CLs), CLs),
testutil:check_forlist(Bookie2, CL1A),
ObjList2A = testutil:generate_objects(5000, 2),
testutil:riakload(Bookie2, ObjList2A),
ChkList2A = lists:sublist(lists:sort(ObjList2A), 1000),
testutil:check_forlist(Bookie2, ChkList2A),
testutil:check_forlist(Bookie2, ChkListFixed),
testutil:check_forobject(Bookie2, TestObject),
testutil:check_forlist(Bookie2, ChkList2A),
testutil:check_forlist(Bookie2, ChkListFixed),
testutil:check_forobject(Bookie2, TestObject),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie2, CL) end, CL1s),
CL2s =
testutil:load_objects(
40000,
[80002, 120002],
Bookie2,
TestObject,
fun testutil:generate_smallobjects/2),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie2, CL) end, CL2s),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie2, CL) end, CL1s),
ok = leveled_bookie:book_close(Bookie2),
%% Change method back again
{ok, Bookie3} = leveled_bookie:book_start(StartOpts1),
testutil:check_forlist(Bookie3, ChkList2A),
testutil:check_forlist(Bookie3, ChkListFixed),
testutil:check_forobject(Bookie3, TestObject),
testutil:check_formissingobject(Bookie3, "Bookie1", "MissingKey0123"),
ok = leveled_bookie:book_destroy(Bookie3).
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie3, CL) end, CL2s),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie3, CL) end, CL1s),
CL3s =
testutil:load_objects(
40000,
[160002, 200002],
Bookie3,
TestObject,
fun testutil:generate_smallobjects/2,
30000
),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie3, CL) end, CL3s),
ok = leveled_bookie:book_close(Bookie3),
% Change method to no compression
{ok, Bookie4} = leveled_bookie:book_start(StartOpts3),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie4, CL) end, CL2s),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie4, CL) end, CL1s),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie4, CL) end, CL3s),
CL4s =
testutil:load_objects(
40000,
[240002, 280002],
Bookie4,
TestObject,
fun testutil:generate_smallobjects/2
),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie4, CL) end, CL3s),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie4, CL) end, CL4s),
testutil:delete_some_objects(Bookie4, lists:flatten(CL3s), 60000),
CL5s =
testutil:load_objects(
40000,
[320002, 360002],
Bookie4,
TestObject,
fun testutil:generate_smallobjects/2
),
ok = leveled_bookie:book_compactjournal(Bookie4, 30000),
testutil:wait_for_compaction(Bookie4),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie4, CL) end, CL4s),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie4, CL) end, CL5s),
ok = leveled_bookie:book_close(Bookie4),
%% Change compression method -> lz4
{ok, Bookie5} = leveled_bookie:book_start(StartOpts2),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie5, CL) end, CL1s),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie5, CL) end, CL4s),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie5, CL) end, CL5s),
ok = leveled_bookie:book_close(Bookie5),
%% Change compression method -> native
{ok, Bookie6} = leveled_bookie:book_start(StartOpts1),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie6, CL) end, CL1s),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie6, CL) end, CL4s),
lists:foreach(
fun(CL) -> ok = testutil:check_forlist(Bookie6, CL) end, CL5s),
ok = leveled_bookie:book_destroy(Bookie6).
safereaderror_startup(_Config) ->

View file

@ -28,19 +28,21 @@ all() -> [
expiring_indexes(_Config) ->
% Add objects to ths tore with index entries, where the objects (and hence
% Add objects to the store with index entries, where the objects (and hence
% the indexes have an expiry time. Confirm that the indexes and the
% objects are no longer present after the expiry time (and are present
% before). Confirm that replacing an object has the expected outcome, if
% the IndexSpecs are updated as part of the request.
KeyCount = 50000,
Future = 120,
% 2 minutes - if running tests on a slow machine, may need to increase
Future = 60,
% 1 minute - if running tests on a slow machine, may need to increase
% this value
RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath},
{max_journalsize, 100000000},
{sync_strategy, testutil:sync_strategy()}],
StartOpts1 =
[{root_path, RootPath},
{max_pencillercachesize, 16000},
{max_journalobjectcount, 30000},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
SW1 = os:timestamp(),
@ -48,12 +50,11 @@ expiring_indexes(_Config) ->
LoadTime = timer:now_diff(os:timestamp(), SW1)/1000000,
io:format("Load of ~w std objects in ~w seconds~n", [KeyCount, LoadTime]),
FilterFun = fun({I, _B, _K}) -> lists:member(I, [5, 6, 7, 8]) end,
LoadedEntriesInRange = lists:sort(lists:filter(FilterFun, IBKL1)),
true = LoadTime < (Future - 30),
% need 30 seconds spare to run query
true = LoadTime < (Future - 20),
% need 20 seconds spare to run query
% and add replacements
{I0, B0, K0} = hd(IBKL1),
@ -62,12 +63,12 @@ expiring_indexes(_Config) ->
CountI0Fold =
fun() ->
leveled_bookie:book_indexfold(Bookie1,
B0,
{fun(_BF, _KT, Acc) -> Acc + 1 end,
0},
{<<"temp_int">>, I0, I0},
{true, undefined})
leveled_bookie:book_indexfold(
Bookie1,
B0,
{fun(_BF, _KT, Acc) -> Acc + 1 end, 0},
{<<"temp_int">>, I0, I0},
{true, undefined})
end,
{async, I0Counter1} = CountI0Fold(),
I0Count1 = I0Counter1(),
@ -76,29 +77,30 @@ expiring_indexes(_Config) ->
InitAcc = [],
IndexFold =
fun() ->
leveled_bookie:book_indexfold(Bookie1,
B0,
{FoldFun, InitAcc},
{<<"temp_int">>, 5, 8},
{true, undefined})
leveled_bookie:book_indexfold(
Bookie1,
B0,
{FoldFun, InitAcc},
{<<"temp_int">>, 5, 8},
{true, undefined})
end,
{async, Folder1} = IndexFold(),
QR1 = Folder1(),
true = lists:sort(QR1) == LoadedEntriesInRange,
% Replace object with one with an index value of 6
testutil:stdload_object(Bookie1, B0, K0, 6, <<"value">>,
leveled_util:integer_now() + 600),
testutil:stdload_object(
Bookie1, B0, K0, 6, <<"value">>, leveled_util:integer_now() + 600),
% Confirm that this has reduced the index entries in I0 by 1
{async, I0Counter2} = CountI0Fold(),
I0Count2 = I0Counter2(),
io:format("Count with index value ~w changed from ~w to ~w~n",
[I0, I0Count1, I0Count2]),
true = I0Count2 == (I0Count1 - 1),
% Now replace again, shortening the timeout to 15s,
% Now replace again, shortening the timeout to 10s,
% this time index value of 6
testutil:stdload_object(Bookie1, B0, K0, 5, <<"value">>,
leveled_util:integer_now() + 15),
testutil:stdload_object(
Bookie1, B0, K0, 5, <<"value">>, leveled_util:integer_now() + 10),
{async, Folder2} = IndexFold(),
leveled_bookie:book_indexfold(Bookie1,
B0,
@ -108,8 +110,8 @@ expiring_indexes(_Config) ->
QR2 = Folder2(),
io:format("Query with additional entry length ~w~n", [length(QR2)]),
true = lists:sort(QR2) == lists:sort([{5, B0, K0}|LoadedEntriesInRange]),
% Wait for a 15s timeout + lus a second to be sure
timer:sleep(15000 + 1000),
% Wait for a 10s timeout plus a second to be sure
timer:sleep(10000 + 1000),
{async, Folder3} = IndexFold(),
QR3 = Folder3(),
% Now the entry should be missing (and the 600s TTL entry should not have
@ -118,16 +120,23 @@ expiring_indexes(_Config) ->
true = lists:sort(QR3) == LoadedEntriesInRange,
FoldTime = timer:now_diff(os:timestamp(), SW1)/1000000 - LoadTime,
io:format("Query returned ~w entries in ~w seconds - 3 queries + 15s wait~n",
io:format("Query returned ~w entries in ~w seconds - 3 queries + 10s wait~n",
[length(QR1), FoldTime]),
true = (LoadTime + FoldTime) < Future,
SleepTime = round((Future - (LoadTime + FoldTime)) * 1000 + 1000), % add a second
SleepTime = round((Future - (LoadTime + FoldTime)) * 1000),
io:format("Sleeping ~w s for all to expire~n", [SleepTime/1000]),
timer:sleep(SleepTime),
timer:sleep(SleepTime + 1000), % add a second
% Index entries should now have expired
{async, Folder4} = IndexFold(),
QR4 = Folder4(),
io:format("Unexpired indexes of length ~w~n", [length(QR4)]),
lists:foreach(
fun(I) ->
io:format("Unexpired index ~p~n", [I])
end,
QR4
),
true = QR4 == [],
ok = leveled_bookie:book_close(Bookie1),