diff --git a/include/leveled.hrl b/include/leveled.hrl
index aacb7eb..348467f 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -49,6 +49,7 @@
                         reload_strategy = [] :: list(),
                         waste_retention_period :: integer() | undefined,
                         compression_method :: lz4|native,
+                        compress_on_receipt :: boolean(),
                         max_run_length}).
 
 -record(penciller_options,
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index af1dd2b..60ed4bf 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -82,6 +82,7 @@
 -define(LONG_RUNNING, 80000).
 -define(RECENT_AAE, false).
 -define(COMPRESSION_METHOD, lz4).
+-define(COMPRESSION_POINT, on_receipt).
 
 -record(ledger_cache, {mem :: ets:tab(),
                         loader = leveled_tree:empty(?CACHE_TYPE)
@@ -923,12 +924,23 @@ set_options(Opts) ->
                 % Must include lz4 library in rebar.config
                 lz4
         end,
+    CompressOnReceipt =
+        case get_opt(compression_point, Opts, ?COMPRESSION_POINT) of
+            on_receipt ->
+                % Note this will add measurable delay to PUT time
+                % https://github.com/martinsumner/leveled/issues/95
+                true;
+            on_compact ->
+                % If using lz4 this is not recommended
+                false
+        end,
 
     {#inker_options{root_path = JournalFP,
                     reload_strategy = ReloadStrategy,
                     max_run_length = get_opt(max_run_length, Opts),
                     waste_retention_period = WRP,
                     compression_method = CompressionMethod,
+                    compress_on_receipt = CompressOnReceipt,
                     cdb_options =
                         #cdb_options{max_size=MaxJournalSize,
                                         binary_mode=true,
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index 6232a02..401c709 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -46,7 +46,8 @@
         to_ledgerkey/3,
         to_ledgerkey/5,
         from_ledgerkey/1,
-        to_inkerkv/5,
+        to_inkerkv/3,
+        to_inkerkv/6,
         from_inkerkv/1,
         from_inkerkv/2,
         from_journalkey/1,
@@ -73,7 +74,6 @@
 -define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
 -define(NRT_IDX, "$aae.").
 -define(ALL_BUCKETS, <<"$all">>).
--define(COMPRESS_ON_RECEIPT, true).
 
 -type recent_aae() :: #recent_aae{}.
 -type riak_metadata() :: {binary()|delete, % Sibling Metadata
@@ -215,14 +215,13 @@ to_ledgerkey(Bucket, Key, Tag) ->
 %% Return the Key, Value and Hash Option for this object. The hash option
 %% indicates whether the key would ever be looked up directly, and so if it
 %% requires an entry in the hash table
-to_inkerkv(LedgerKey, SQN, to_fetch, null, _CompressionMethod) ->
-    {{SQN, ?INKT_STND, LedgerKey}, null, true};
-to_inkerkv(LedgerKey, SQN, Object, KeyChanges, CompressionMethod) ->
+to_inkerkv(LedgerKey, SQN, to_fetch) ->
+    {{SQN, ?INKT_STND, LedgerKey}, null, true}.
+
+to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) ->
     InkerType = check_forinkertype(LedgerKey, Object),
     Value =
-        create_value_for_journal({Object, KeyChanges},
-                                    ?COMPRESS_ON_RECEIPT,
-                                    CompressionMethod),
+        create_value_for_journal({Object, KeyChanges}, Compress, PressMethod),
     {{SQN, InkerType, LedgerKey}, Value}.
 
 %% Used when fetching objects, so only handles standard, hashable entries
diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl
index 81aa36b..9663328 100644
--- a/src/leveled_iclerk.erl
+++ b/src/leveled_iclerk.erl
@@ -757,7 +757,8 @@ test_ledgerkey(Key) ->
     {o, "Bucket", Key, null}.
 
 test_inkerkv(SQN, Key, V, IdxSpecs) ->
-    leveled_codec:to_inkerkv(test_ledgerkey(Key), SQN, V, IdxSpecs, native).
+    leveled_codec:to_inkerkv(test_ledgerkey(Key), SQN, V, IdxSpecs,
+                                native, false).
 
 fetch_testcdb(RP) ->
     FN1 = leveled_inker:filepath(RP, 1, new_journal),
@@ -940,7 +941,8 @@ compact_singlefile_totwosmallfiles_testto() ->
                         LK = test_ledgerkey("Key" ++ integer_to_list(X)),
                         Value = leveled_rand:rand_bytes(1024),
                         {IK, IV} =
-                            leveled_codec:to_inkerkv(LK, X, Value, [], native),
+                            leveled_codec:to_inkerkv(LK, X, Value, [],
+                                                        native, true),
                         ok = leveled_cdb:cdb_put(CDB1, IK, IV)
                         end,
                 lists:seq(1, 1000)),
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index 85da448..9859aa2 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -137,6 +137,7 @@
                 compaction_pending = false :: boolean(),
                 is_snapshot = false :: boolean(),
                 compression_method :: lz4|native,
+                compress_on_receipt :: boolean(),
                 source_inker :: pid() | undefined}).
 
 
@@ -510,12 +511,13 @@ start_from_file(InkOpts) ->
     ReloadStrategy = InkOpts#inker_options.reload_strategy,
     MRL = InkOpts#inker_options.max_run_length,
     WRP = InkOpts#inker_options.waste_retention_period,
-    Compression = InkOpts#inker_options.compression_method,
+    PressMethod = InkOpts#inker_options.compression_method,
+    PressOnReceipt = InkOpts#inker_options.compress_on_receipt,
     IClerkOpts = #iclerk_options{inker = self(),
                                     cdb_options=IClerkCDBOpts,
                                     waste_retention_period = WRP,
                                     reload_strategy = ReloadStrategy,
-                                    compression_method = Compression,
+                                    compression_method = PressMethod,
                                     max_run_length = MRL},
     {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts),
 
@@ -531,7 +533,8 @@
                         active_journaldb = ActiveJournal,
                         root_path = RootPath,
                         cdb_options = CDBopts#cdb_options{waste_path=WasteFP},
-                        compression_method = Compression,
+                        compression_method = PressMethod,
+                        compress_on_receipt = PressOnReceipt,
                         clerk = Clerk}}.
 
 
@@ -543,7 +546,8 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
                                             NewSQN,
                                             Object,
                                             KeyChanges,
-                                            State#state.compression_method),
+                                            State#state.compression_method,
+                                            State#state.compress_on_receipt),
     case leveled_cdb:cdb_put(ActiveJournal,
                                 JournalKey,
                                 JournalBin) of
@@ -586,22 +590,14 @@ get_object(LedgerKey, SQN, Manifest) ->
 get_object(LedgerKey, SQN, Manifest, ToIgnoreKeyChanges) ->
     JournalP = leveled_imanifest:find_entry(SQN, Manifest),
     {InkerKey, _V, true} =
-        leveled_codec:to_inkerkv(LedgerKey,
-                                    SQN,
-                                    to_fetch,
-                                    null,
-                                    not_applicable),
+        leveled_codec:to_inkerkv(LedgerKey, SQN, to_fetch),
     Obj = leveled_cdb:cdb_get(JournalP, InkerKey),
     leveled_codec:from_inkerkv(Obj, ToIgnoreKeyChanges).
 
 key_check(LedgerKey, SQN, Manifest) ->
     JournalP = leveled_imanifest:find_entry(SQN, Manifest),
     {InkerKey, _V, true} =
-        leveled_codec:to_inkerkv(LedgerKey,
-                                    SQN,
-                                    to_fetch,
-                                    null,
-                                    not_applicable),
+        leveled_codec:to_inkerkv(LedgerKey, SQN, to_fetch),
     leveled_cdb:cdb_keycheck(JournalP, InkerKey).
 
 build_manifest(ManifestFilenames,
@@ -944,7 +940,8 @@ simple_inker_test() ->
     CDBopts = #cdb_options{max_size=300000, binary_mode=true},
     {ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
                                             cdb_options=CDBopts,
-                                            compression_method=native}),
+                                            compression_method=native,
+                                            compress_on_receipt=true}),
     Obj1 = ink_get(Ink1, "Key1", 1),
     ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1),
     Obj3 = ink_get(Ink1, "Key1", 3),
@@ -967,7 +964,8 @@ simple_inker_completeactivejournal_test() ->
     ok = file:rename(F1, F1r),
     {ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
                                             cdb_options=CDBopts,
-                                            compression_method=native}),
+                                            compression_method=native,
+                                            compress_on_receipt=true}),
     Obj1 = ink_get(Ink1, "Key1", 1),
     ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1),
     Obj2 = ink_get(Ink1, "Key4", 4),
@@ -986,7 +984,8 @@ compact_journal_test() ->
     {ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
                                             cdb_options=CDBopts,
                                             reload_strategy=RStrategy,
-                                            compression_method=native}),
+                                            compression_method=native,
+                                            compress_on_receipt=false}),
     {ok, NewSQN1, _ObjSize} = ink_put(Ink1,
                                         test_ledgerkey("KeyAA"),
                                         "TestValueAA",
@@ -1053,7 +1052,8 @@ empty_manifest_test() ->
     CDBopts = #cdb_options{max_size=300000},
     {ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
                                             cdb_options=CDBopts,
-                                            compression_method=native}),
+                                            compression_method=native,
+                                            compress_on_receipt=true}),
     ?assertMatch(not_present, ink_fetch(Ink1, "Key1", 1)),
 
     CheckFun = fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
@@ -1074,7 +1074,8 @@ empty_manifest_test() ->
 
     {ok, Ink2} = ink_start(#inker_options{root_path=RootPath,
                                             cdb_options=CDBopts,
-                                            compression_method=native}),
+                                            compression_method=native,
+                                            compress_on_receipt=false}),
     ?assertMatch(not_present, ink_fetch(Ink2, "Key1", 1)),
     {ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", {[], infinity}),
     ?assertMatch(2, SQN),
diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl
index 4e82cc8..ec59e54 100644
--- a/test/end_to_end/basic_SUITE.erl
+++ b/test/end_to_end/basic_SUITE.erl
@@ -74,7 +74,8 @@ many_put_fetch_head(_Config) ->
     RootPath = testutil:reset_filestructure(),
     StartOpts1 = [{root_path, RootPath},
                     {max_pencillercachesize, 16000},
-                    {sync_strategy, riak_sync}],
+                    {sync_strategy, riak_sync},
+                    {compression_point, on_compact}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {TestObject, TestSpec} = testutil:generate_testobject(),
     ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
@@ -83,7 +84,8 @@ many_put_fetch_head(_Config) ->
     StartOpts2 = [{root_path, RootPath},
                     {max_journalsize, 500000000},
                     {max_pencillercachesize, 32000},
-                    {sync_strategy, testutil:sync_strategy()}],
+                    {sync_strategy, testutil:sync_strategy()},
+                    {compression_point, on_receipt}],
     {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
     testutil:check_forobject(Bookie2, TestObject),
     GenList = [2, 20002, 40002, 60002, 80002,
diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl
index 8df2e21..55e4c2c 100644
--- a/test/end_to_end/recovery_SUITE.erl
+++ b/test/end_to_end/recovery_SUITE.erl
@@ -307,17 +307,20 @@ aae_bustedjournal(_Config) ->
 
 journal_compaction_bustedjournal(_Config) ->
     % Different circumstances will be created in different runs
-    busted_journal_test(10000000),
-    busted_journal_test(7777777).
+    busted_journal_test(10000000, native, on_receipt),
+    busted_journal_test(7777777, native, on_compact),
+    busted_journal_test(8888888, lz4, on_receipt).
 
 
-busted_journal_test(MaxJournalSize) ->
+busted_journal_test(MaxJournalSize, PressMethod, PressPoint) ->
     % Simply confirms that none of this causes a crash
    RootPath = testutil:reset_filestructure(),
     StartOpts1 = [{root_path, RootPath},
                     {max_journalsize, MaxJournalSize},
                     {max_run_length, 10},
-                    {sync_strategy, testutil:sync_strategy()}],
+                    {sync_strategy, testutil:sync_strategy()},
+                    {compression_method, PressMethod},
+                    {compression_point, PressPoint}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {TestObject, TestSpec} = testutil:generate_testobject(),
     ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
@@ -360,6 +363,7 @@ busted_journal_test(MaxJournalSize) ->
     testutil:reset_filestructure(10000).
 
 
+
 rotating_object_check(BookOpts, B, NumberOfObjects) ->
     {ok, Book1} = leveled_bookie:book_start(BookOpts),
     {KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects),
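
For reference, a minimal usage sketch of the new option (not part of the patch). The option names and book_start/1 proplist form follow the SUITE changes above; the root path is a placeholder and book_close/1 is assumed to be the normal shutdown call.

%% Open a store that compresses Journal values on receipt (smaller journal
%% files at the cost of some PUT latency), rather than deferring compression
%% to journal compaction.
StartOpts = [{root_path, "/tmp/leveled_example"},   % placeholder path
             {compression_method, lz4},
             {compression_point, on_receipt}],
{ok, Bookie} = leveled_bookie:book_start(StartOpts),
%% ... normal PUT/GET traffic ...
ok = leveled_bookie:book_close(Bookie).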