Improve test coverage
Make compress on receipt/compaction configurable
This commit is contained in:
parent
61b7be5039
commit
1d475235d1
7 changed files with 56 additions and 35 deletions
|
@ -49,6 +49,7 @@
|
|||
reload_strategy = [] :: list(),
|
||||
waste_retention_period :: integer() | undefined,
|
||||
compression_method :: lz4|native,
|
||||
compress_on_receipt :: boolean(),
|
||||
max_run_length}).
|
||||
|
||||
-record(penciller_options,
|
||||
|
|
|
@ -82,6 +82,7 @@
|
|||
-define(LONG_RUNNING, 80000).
|
||||
-define(RECENT_AAE, false).
|
||||
-define(COMPRESSION_METHOD, lz4).
|
||||
-define(COMPRESSION_POINT, on_receipt).
|
||||
|
||||
-record(ledger_cache, {mem :: ets:tab(),
|
||||
loader = leveled_tree:empty(?CACHE_TYPE)
|
||||
|
@ -923,12 +924,23 @@ set_options(Opts) ->
|
|||
% Must include lz4 library in rebar.config
|
||||
lz4
|
||||
end,
|
||||
CompressOnReceipt =
|
||||
case get_opt(compression_point, Opts, ?COMPRESSION_POINT) of
|
||||
on_receipt ->
|
||||
% Note this will add measurable delay to PUT time
|
||||
% https://github.com/martinsumner/leveled/issues/95
|
||||
true;
|
||||
on_compact ->
|
||||
% If using lz4 this is not recommended
|
||||
false
|
||||
end,
|
||||
|
||||
{#inker_options{root_path = JournalFP,
|
||||
reload_strategy = ReloadStrategy,
|
||||
max_run_length = get_opt(max_run_length, Opts),
|
||||
waste_retention_period = WRP,
|
||||
compression_method = CompressionMethod,
|
||||
compress_on_receipt = CompressOnReceipt,
|
||||
cdb_options =
|
||||
#cdb_options{max_size=MaxJournalSize,
|
||||
binary_mode=true,
|
||||
|
|
|
@ -46,7 +46,8 @@
|
|||
to_ledgerkey/3,
|
||||
to_ledgerkey/5,
|
||||
from_ledgerkey/1,
|
||||
to_inkerkv/5,
|
||||
to_inkerkv/3,
|
||||
to_inkerkv/6,
|
||||
from_inkerkv/1,
|
||||
from_inkerkv/2,
|
||||
from_journalkey/1,
|
||||
|
@ -73,7 +74,6 @@
|
|||
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
|
||||
-define(NRT_IDX, "$aae.").
|
||||
-define(ALL_BUCKETS, <<"$all">>).
|
||||
-define(COMPRESS_ON_RECEIPT, true).
|
||||
|
||||
-type recent_aae() :: #recent_aae{}.
|
||||
-type riak_metadata() :: {binary()|delete, % Sibling Metadata
|
||||
|
@ -215,14 +215,13 @@ to_ledgerkey(Bucket, Key, Tag) ->
|
|||
%% Return the Key, Value and Hash Option for this object. The hash option
|
||||
%% indicates whether the key would ever be looked up directly, and so if it
|
||||
%% requires an entry in the hash table
|
||||
to_inkerkv(LedgerKey, SQN, to_fetch, null, _CompressionMethod) ->
|
||||
{{SQN, ?INKT_STND, LedgerKey}, null, true};
|
||||
to_inkerkv(LedgerKey, SQN, Object, KeyChanges, CompressionMethod) ->
|
||||
to_inkerkv(LedgerKey, SQN, to_fetch) ->
|
||||
{{SQN, ?INKT_STND, LedgerKey}, null, true}.
|
||||
|
||||
to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) ->
|
||||
InkerType = check_forinkertype(LedgerKey, Object),
|
||||
Value =
|
||||
create_value_for_journal({Object, KeyChanges},
|
||||
?COMPRESS_ON_RECEIPT,
|
||||
CompressionMethod),
|
||||
create_value_for_journal({Object, KeyChanges}, Compress, PressMethod),
|
||||
{{SQN, InkerType, LedgerKey}, Value}.
|
||||
|
||||
%% Used when fetching objects, so only handles standard, hashable entries
|
||||
|
|
|
@ -757,7 +757,8 @@ test_ledgerkey(Key) ->
|
|||
{o, "Bucket", Key, null}.
|
||||
|
||||
test_inkerkv(SQN, Key, V, IdxSpecs) ->
|
||||
leveled_codec:to_inkerkv(test_ledgerkey(Key), SQN, V, IdxSpecs, native).
|
||||
leveled_codec:to_inkerkv(test_ledgerkey(Key), SQN, V, IdxSpecs,
|
||||
native, false).
|
||||
|
||||
fetch_testcdb(RP) ->
|
||||
FN1 = leveled_inker:filepath(RP, 1, new_journal),
|
||||
|
@ -940,7 +941,8 @@ compact_singlefile_totwosmallfiles_testto() ->
|
|||
LK = test_ledgerkey("Key" ++ integer_to_list(X)),
|
||||
Value = leveled_rand:rand_bytes(1024),
|
||||
{IK, IV} =
|
||||
leveled_codec:to_inkerkv(LK, X, Value, [], native),
|
||||
leveled_codec:to_inkerkv(LK, X, Value, [],
|
||||
native, true),
|
||||
ok = leveled_cdb:cdb_put(CDB1, IK, IV)
|
||||
end,
|
||||
lists:seq(1, 1000)),
|
||||
|
|
|
@ -137,6 +137,7 @@
|
|||
compaction_pending = false :: boolean(),
|
||||
is_snapshot = false :: boolean(),
|
||||
compression_method :: lz4|native,
|
||||
compress_on_receipt :: boolean(),
|
||||
source_inker :: pid() | undefined}).
|
||||
|
||||
|
||||
|
@ -510,12 +511,13 @@ start_from_file(InkOpts) ->
|
|||
ReloadStrategy = InkOpts#inker_options.reload_strategy,
|
||||
MRL = InkOpts#inker_options.max_run_length,
|
||||
WRP = InkOpts#inker_options.waste_retention_period,
|
||||
Compression = InkOpts#inker_options.compression_method,
|
||||
PressMethod = InkOpts#inker_options.compression_method,
|
||||
PressOnReceipt = InkOpts#inker_options.compress_on_receipt,
|
||||
IClerkOpts = #iclerk_options{inker = self(),
|
||||
cdb_options=IClerkCDBOpts,
|
||||
waste_retention_period = WRP,
|
||||
reload_strategy = ReloadStrategy,
|
||||
compression_method = Compression,
|
||||
compression_method = PressMethod,
|
||||
max_run_length = MRL},
|
||||
{ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts),
|
||||
|
||||
|
@ -531,7 +533,8 @@ start_from_file(InkOpts) ->
|
|||
active_journaldb = ActiveJournal,
|
||||
root_path = RootPath,
|
||||
cdb_options = CDBopts#cdb_options{waste_path=WasteFP},
|
||||
compression_method = Compression,
|
||||
compression_method = PressMethod,
|
||||
compress_on_receipt = PressOnReceipt,
|
||||
clerk = Clerk}}.
|
||||
|
||||
|
||||
|
@ -543,7 +546,8 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
|
|||
NewSQN,
|
||||
Object,
|
||||
KeyChanges,
|
||||
State#state.compression_method),
|
||||
State#state.compression_method,
|
||||
State#state.compress_on_receipt),
|
||||
case leveled_cdb:cdb_put(ActiveJournal,
|
||||
JournalKey,
|
||||
JournalBin) of
|
||||
|
@ -586,22 +590,14 @@ get_object(LedgerKey, SQN, Manifest) ->
|
|||
get_object(LedgerKey, SQN, Manifest, ToIgnoreKeyChanges) ->
|
||||
JournalP = leveled_imanifest:find_entry(SQN, Manifest),
|
||||
{InkerKey, _V, true} =
|
||||
leveled_codec:to_inkerkv(LedgerKey,
|
||||
SQN,
|
||||
to_fetch,
|
||||
null,
|
||||
not_applicable),
|
||||
leveled_codec:to_inkerkv(LedgerKey, SQN, to_fetch),
|
||||
Obj = leveled_cdb:cdb_get(JournalP, InkerKey),
|
||||
leveled_codec:from_inkerkv(Obj, ToIgnoreKeyChanges).
|
||||
|
||||
key_check(LedgerKey, SQN, Manifest) ->
|
||||
JournalP = leveled_imanifest:find_entry(SQN, Manifest),
|
||||
{InkerKey, _V, true} =
|
||||
leveled_codec:to_inkerkv(LedgerKey,
|
||||
SQN,
|
||||
to_fetch,
|
||||
null,
|
||||
not_applicable),
|
||||
leveled_codec:to_inkerkv(LedgerKey, SQN, to_fetch),
|
||||
leveled_cdb:cdb_keycheck(JournalP, InkerKey).
|
||||
|
||||
build_manifest(ManifestFilenames,
|
||||
|
@ -944,7 +940,8 @@ simple_inker_test() ->
|
|||
CDBopts = #cdb_options{max_size=300000, binary_mode=true},
|
||||
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
|
||||
cdb_options=CDBopts,
|
||||
compression_method=native}),
|
||||
compression_method=native,
|
||||
compress_on_receipt=true}),
|
||||
Obj1 = ink_get(Ink1, "Key1", 1),
|
||||
?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1),
|
||||
Obj3 = ink_get(Ink1, "Key1", 3),
|
||||
|
@ -967,7 +964,8 @@ simple_inker_completeactivejournal_test() ->
|
|||
ok = file:rename(F1, F1r),
|
||||
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
|
||||
cdb_options=CDBopts,
|
||||
compression_method=native}),
|
||||
compression_method=native,
|
||||
compress_on_receipt=true}),
|
||||
Obj1 = ink_get(Ink1, "Key1", 1),
|
||||
?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1),
|
||||
Obj2 = ink_get(Ink1, "Key4", 4),
|
||||
|
@ -986,7 +984,8 @@ compact_journal_test() ->
|
|||
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
|
||||
cdb_options=CDBopts,
|
||||
reload_strategy=RStrategy,
|
||||
compression_method=native}),
|
||||
compression_method=native,
|
||||
compress_on_receipt=false}),
|
||||
{ok, NewSQN1, _ObjSize} = ink_put(Ink1,
|
||||
test_ledgerkey("KeyAA"),
|
||||
"TestValueAA",
|
||||
|
@ -1053,7 +1052,8 @@ empty_manifest_test() ->
|
|||
CDBopts = #cdb_options{max_size=300000},
|
||||
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
|
||||
cdb_options=CDBopts,
|
||||
compression_method=native}),
|
||||
compression_method=native,
|
||||
compress_on_receipt=true}),
|
||||
?assertMatch(not_present, ink_fetch(Ink1, "Key1", 1)),
|
||||
|
||||
CheckFun = fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
|
||||
|
@ -1074,7 +1074,8 @@ empty_manifest_test() ->
|
|||
|
||||
{ok, Ink2} = ink_start(#inker_options{root_path=RootPath,
|
||||
cdb_options=CDBopts,
|
||||
compression_method=native}),
|
||||
compression_method=native,
|
||||
compress_on_receipt=false}),
|
||||
?assertMatch(not_present, ink_fetch(Ink2, "Key1", 1)),
|
||||
{ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", {[], infinity}),
|
||||
?assertMatch(2, SQN),
|
||||
|
|
|
@ -74,7 +74,8 @@ many_put_fetch_head(_Config) ->
|
|||
RootPath = testutil:reset_filestructure(),
|
||||
StartOpts1 = [{root_path, RootPath},
|
||||
{max_pencillercachesize, 16000},
|
||||
{sync_strategy, riak_sync}],
|
||||
{sync_strategy, riak_sync},
|
||||
{compression_point, on_compact}],
|
||||
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
|
||||
{TestObject, TestSpec} = testutil:generate_testobject(),
|
||||
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
|
||||
|
@ -83,7 +84,8 @@ many_put_fetch_head(_Config) ->
|
|||
StartOpts2 = [{root_path, RootPath},
|
||||
{max_journalsize, 500000000},
|
||||
{max_pencillercachesize, 32000},
|
||||
{sync_strategy, testutil:sync_strategy()}],
|
||||
{sync_strategy, testutil:sync_strategy()},
|
||||
{compression_point, on_receipt}],
|
||||
{ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
|
||||
testutil:check_forobject(Bookie2, TestObject),
|
||||
GenList = [2, 20002, 40002, 60002, 80002,
|
||||
|
|
|
@ -307,17 +307,20 @@ aae_bustedjournal(_Config) ->
|
|||
|
||||
journal_compaction_bustedjournal(_Config) ->
|
||||
% Different circumstances will be created in different runs
|
||||
busted_journal_test(10000000),
|
||||
busted_journal_test(7777777).
|
||||
busted_journal_test(10000000, native, on_receipt),
|
||||
busted_journal_test(7777777, native, on_compact),
|
||||
busted_journal_test(8888888, lz4, on_receipt).
|
||||
|
||||
|
||||
busted_journal_test(MaxJournalSize) ->
|
||||
busted_journal_test(MaxJournalSize, PressMethod, PressPoint) ->
|
||||
% Simply confirms that none of this causes a crash
|
||||
RootPath = testutil:reset_filestructure(),
|
||||
StartOpts1 = [{root_path, RootPath},
|
||||
{max_journalsize, MaxJournalSize},
|
||||
{max_run_length, 10},
|
||||
{sync_strategy, testutil:sync_strategy()}],
|
||||
{sync_strategy, testutil:sync_strategy()},
|
||||
{compression_method, PressMethod},
|
||||
{compression_point, PressPoint}],
|
||||
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
|
||||
{TestObject, TestSpec} = testutil:generate_testobject(),
|
||||
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
|
||||
|
@ -360,6 +363,7 @@ busted_journal_test(MaxJournalSize) ->
|
|||
testutil:reset_filestructure(10000).
|
||||
|
||||
|
||||
|
||||
rotating_object_check(BookOpts, B, NumberOfObjects) ->
|
||||
{ok, Book1} = leveled_bookie:book_start(BookOpts),
|
||||
{KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects),
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue