From 61b7be5039d0f225ff41dc1ed5e09a62d3d989e9 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 6 Nov 2017 15:54:58 +0000 Subject: [PATCH] Make compression algorithm an option Compression can be switched between LZ4 and zlib (native). The setting to determine if compression should happen on receipt is now a macro definition in leveled_codec. --- include/leveled.hrl | 3 + src/leveled_bookie.erl | 26 ++- src/leveled_codec.erl | 79 ++++--- src/leveled_iclerk.erl | 15 +- src/leveled_inker.erl | 51 ++-- src/leveled_pclerk.erl | 66 ++++-- src/leveled_penciller.erl | 49 ++-- src/leveled_sst.erl | 397 +++++++++++++++++++------------- test/end_to_end/basic_SUITE.erl | 52 ++++- 9 files changed, 474 insertions(+), 264 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index e3ed9c7..aacb7eb 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -48,6 +48,7 @@ source_inker :: pid() | undefined, reload_strategy = [] :: list(), waste_retention_period :: integer() | undefined, + compression_method :: lz4|native, max_run_length}). -record(penciller_options, @@ -58,6 +59,7 @@ bookies_mem :: tuple() | undefined, source_penciller :: pid() | undefined, snapshot_longrunning = true :: boolean(), + compression_method :: lz4|native, levelzero_cointoss = false :: boolean()}). -record(iclerk_options, @@ -65,6 +67,7 @@ max_run_length :: integer() | undefined, cdb_options = #cdb_options{} :: #cdb_options{}, waste_retention_period :: integer() | undefined, + compression_method :: lz4|native, reload_strategy = [] :: list()}). -record(recent_aae, {filter :: whitelist|blacklist, diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 42d5e0f..af1dd2b 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -81,6 +81,7 @@ -define(JOURNAL_SIZE_JITTER, 20). -define(LONG_RUNNING, 80000). -define(RECENT_AAE, false). +-define(COMPRESSION_METHOD, lz4). 
-record(ledger_cache, {mem :: ets:tab(), loader = leveled_tree:empty(?CACHE_TYPE) @@ -387,7 +388,8 @@ init([Opts]) -> unit_minutes = UnitMinutes} end, - {Inker, Penciller} = startup(InkerOpts, PencillerOpts, RecentAAE), + {Inker, Penciller} = + startup(InkerOpts, PencillerOpts, RecentAAE), NewETS = ets:new(mem, [ordered_set]), leveled_log:log("B0001", [Inker, Penciller]), @@ -911,16 +913,30 @@ set_options(Opts) -> ok = filelib:ensure_dir(JournalFP), ok = filelib:ensure_dir(LedgerFP), + CompressionMethod = + case get_opt(compression_method, Opts, ?COMPRESSION_METHOD) of + native -> + % Note native compression will have reduced performance + % https://github.com/martinsumner/leveled/issues/95 + native; + lz4 -> + % Must include lz4 library in rebar.config + lz4 + end, + {#inker_options{root_path = JournalFP, reload_strategy = ReloadStrategy, max_run_length = get_opt(max_run_length, Opts), waste_retention_period = WRP, - cdb_options = #cdb_options{max_size=MaxJournalSize, - binary_mode=true, - sync_strategy=SyncStrat}}, + compression_method = CompressionMethod, + cdb_options = + #cdb_options{max_size=MaxJournalSize, + binary_mode=true, + sync_strategy=SyncStrat}}, #penciller_options{root_path = LedgerFP, max_inmemory_tablesize = PCLL0CacheSize, - levelzero_cointoss = true}}. + levelzero_cointoss = true, + compression_method = CompressionMethod}}. 
startup(InkerOpts, PencillerOpts, RecentAAE) -> {ok, Inker} = leveled_inker:ink_start(InkerOpts), diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 64755d7..6232a02 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -46,7 +46,7 @@ to_ledgerkey/3, to_ledgerkey/5, from_ledgerkey/1, - to_inkerkv/4, + to_inkerkv/5, from_inkerkv/1, from_inkerkv/2, from_journalkey/1, @@ -54,7 +54,7 @@ split_inkvalue/1, check_forinkertype/2, maybe_compress/1, - create_value_for_journal/2, + create_value_for_journal/3, build_metadata_object/2, generate_ledgerkv/5, get_size/2, @@ -73,6 +73,7 @@ -define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w"). -define(NRT_IDX, "$aae."). -define(ALL_BUCKETS, <<"$all">>). +-define(COMPRESS_ON_RECEIPT, true). -type recent_aae() :: #recent_aae{}. -type riak_metadata() :: {binary()|delete, % Sibling Metadata @@ -214,11 +215,14 @@ to_ledgerkey(Bucket, Key, Tag) -> %% Return the Key, Value and Hash Option for this object. The hash option %% indicates whether the key would ever be looked up directly, and so if it %% requires an entry in the hash table -to_inkerkv(LedgerKey, SQN, to_fetch, null) -> +to_inkerkv(LedgerKey, SQN, to_fetch, null, _CompressionMethod) -> {{SQN, ?INKT_STND, LedgerKey}, null, true}; -to_inkerkv(LedgerKey, SQN, Object, KeyChanges) -> +to_inkerkv(LedgerKey, SQN, Object, KeyChanges, CompressionMethod) -> InkerType = check_forinkertype(LedgerKey, Object), - Value = create_value_for_journal({Object, KeyChanges}, true), + Value = + create_value_for_journal({Object, KeyChanges}, + ?COMPRESS_ON_RECEIPT, + CompressionMethod), {{SQN, InkerType, LedgerKey}, Value}. %% Used when fetching objects, so only handles standard, hashable entries @@ -235,46 +239,55 @@ from_inkerkv(Object, ToIgnoreKeyChanges) -> Object end. 
-create_value_for_journal({Object, KeyChanges}, Compress) +create_value_for_journal({Object, KeyChanges}, Compress, Method) when not is_binary(KeyChanges) -> KeyChangeBin = term_to_binary(KeyChanges, [compressed]), - create_value_for_journal({Object, KeyChangeBin}, Compress); -create_value_for_journal({Object, KeyChangeBin}, Compress) -> + create_value_for_journal({Object, KeyChangeBin}, Compress, Method); +create_value_for_journal({Object, KeyChangeBin}, Compress, Method) -> KeyChangeBinLen = byte_size(KeyChangeBin), - ObjectBin = serialise_object(Object, Compress), - TypeCode = encode_valuetype(is_binary(Object), Compress), + ObjectBin = serialise_object(Object, Compress, Method), + TypeCode = encode_valuetype(is_binary(Object), Compress, Method), <>. maybe_compress({null, KeyChanges}) -> - create_value_for_journal({null, KeyChanges}, false); + create_value_for_journal({null, KeyChanges}, false, native); maybe_compress(JournalBin) -> Length0 = byte_size(JournalBin) - 5, <> = JournalBin, - {IsBinary, IsCompressed} = decode_valuetype(Type), + {IsBinary, IsCompressed, IsLz4} = decode_valuetype(Type), case IsCompressed of true -> JournalBin; false -> Length1 = Length0 - KeyChangeLength, <> = JBin0, - V0 = {deserialise_object(OBin2, IsBinary, IsCompressed), + V0 = {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4), binary_to_term(KCBin2)}, - create_value_for_journal(V0, true) + PressMethod = case IsLz4 of + true -> lz4; + false -> native + end, + create_value_for_journal(V0, true, PressMethod) end. 
-serialise_object(Object, false) when is_binary(Object) -> +serialise_object(Object, false, _Method) when is_binary(Object) -> Object; -serialise_object(Object, true) when is_binary(Object) -> - {ok, Bin} = lz4:pack(Object), - Bin; -serialise_object(Object, false) -> +serialise_object(Object, true, Method) when is_binary(Object) -> + case Method of + lz4 -> + {ok, Bin} = lz4:pack(Object), + Bin; + native -> + zlib:compress(Object) + end; +serialise_object(Object, false, _Method) -> term_to_binary(Object); -serialise_object(Object, true) -> +serialise_object(Object, true, _Method) -> term_to_binary(Object, [compressed]). revert_value_from_journal(JournalBin) -> @@ -285,27 +298,34 @@ revert_value_from_journal(JournalBin, ToIgnoreKeyChanges) -> <> = JournalBin, - {IsBinary, IsCompressed} = decode_valuetype(Type), + {IsBinary, IsCompressed, IsLz4} = decode_valuetype(Type), Length1 = Length0 - KeyChangeLength, case ToIgnoreKeyChanges of true -> <> = JBin0, - {deserialise_object(OBin2, IsBinary, IsCompressed), []}; + {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4), []}; false -> <> = JBin0, - {deserialise_object(OBin2, IsBinary, IsCompressed), + {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4), binary_to_term(KCBin2)} end. -deserialise_object(Binary, true, true) -> +deserialise_object(Binary, true, true, true) -> {ok, Deflated} = lz4:unpack(Binary), Deflated; -deserialise_object(Binary, true, false) -> +deserialise_object(Binary, true, true, false) -> + zlib:uncompress(Binary); +deserialise_object(Binary, true, false, _IsLz4) -> Binary; -deserialise_object(Binary, false, _) -> +deserialise_object(Binary, false, _, _IsLz4) -> binary_to_term(Binary). -encode_valuetype(IsBinary, IsCompressed) -> +encode_valuetype(IsBinary, IsCompressed, Method) -> + Bit3 = + case Method of + lz4 -> 4; + native -> 0 + end, Bit2 = case IsBinary of true -> 2; @@ -316,12 +336,13 @@ encode_valuetype(IsBinary, IsCompressed) -> true -> 1; false -> 0 end, - Bit1 + Bit2. 
+ Bit1 + Bit2 + Bit3. decode_valuetype(TypeInt) -> IsCompressed = TypeInt band 1 == 1, IsBinary = TypeInt band 2 == 2, - {IsBinary, IsCompressed}. + IsLz4 = TypeInt band 4 ==4, + {IsBinary, IsCompressed, IsLz4}. from_journalkey({SQN, _Type, LedgerKey}) -> {SQN, LedgerKey}. diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 5c6b126..81aa36b 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -109,7 +109,8 @@ cdb_options, waste_retention_period :: integer() | undefined, waste_path :: string() | undefined, - reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list()}). + reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(), + compression_method :: lz4|native}). -record(candidate, {low_sqn :: integer() | undefined, filename :: string() | undefined, @@ -167,7 +168,9 @@ init([IClerkOpts]) -> cdb_options = CDBopts, reload_strategy = ReloadStrategy, waste_path = WP, - waste_retention_period = WRP}}. + waste_retention_period = WRP, + compression_method = + IClerkOpts#iclerk_options.compression_method}}. handle_call(_Msg, _From, State) -> {reply, not_supported, State}. @@ -754,7 +757,7 @@ test_ledgerkey(Key) -> {o, "Bucket", Key, null}. test_inkerkv(SQN, Key, V, IdxSpecs) -> - leveled_codec:to_inkerkv(test_ledgerkey(Key), SQN, V, IdxSpecs). + leveled_codec:to_inkerkv(test_ledgerkey(Key), SQN, V, IdxSpecs, native). 
fetch_testcdb(RP) -> FN1 = leveled_inker:filepath(RP, 1, new_journal), @@ -936,13 +939,15 @@ compact_singlefile_totwosmallfiles_testto() -> lists:foreach(fun(X) -> LK = test_ledgerkey("Key" ++ integer_to_list(X)), Value = leveled_rand:rand_bytes(1024), - {IK, IV} = leveled_codec:to_inkerkv(LK, X, Value, []), + {IK, IV} = + leveled_codec:to_inkerkv(LK, X, Value, [], native), ok = leveled_cdb:cdb_put(CDB1, IK, IV) end, lists:seq(1, 1000)), {ok, NewName} = leveled_cdb:cdb_complete(CDB1), {ok, CDBr} = leveled_cdb:cdb_open_reader(NewName), - CDBoptsSmall = #cdb_options{binary_mode=true, max_size=400000, file_path=CP}, + CDBoptsSmall = + #cdb_options{binary_mode=true, max_size=400000, file_path=CP}, BestRun1 = [#candidate{low_sqn=1, filename=leveled_cdb:cdb_filename(CDBr), journal=CDBr, diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 209abc4..85da448 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -136,6 +136,7 @@ clerk :: pid() | undefined, compaction_pending = false :: boolean(), is_snapshot = false :: boolean(), + compression_method :: lz4|native, source_inker :: pid() | undefined}). @@ -509,10 +510,12 @@ start_from_file(InkOpts) -> ReloadStrategy = InkOpts#inker_options.reload_strategy, MRL = InkOpts#inker_options.max_run_length, WRP = InkOpts#inker_options.waste_retention_period, + Compression = InkOpts#inker_options.compression_method, IClerkOpts = #iclerk_options{inker = self(), cdb_options=IClerkCDBOpts, waste_retention_period = WRP, reload_strategy = ReloadStrategy, + compression_method = Compression, max_run_length = MRL}, {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts), @@ -528,16 +531,19 @@ start_from_file(InkOpts) -> active_journaldb = ActiveJournal, root_path = RootPath, cdb_options = CDBopts#cdb_options{waste_path=WasteFP}, + compression_method = Compression, clerk = Clerk}}. 
put_object(LedgerKey, Object, KeyChanges, State) -> NewSQN = State#state.journal_sqn + 1, ActiveJournal = State#state.active_journaldb, - {JournalKey, JournalBin} = leveled_codec:to_inkerkv(LedgerKey, - NewSQN, - Object, - KeyChanges), + {JournalKey, JournalBin} = + leveled_codec:to_inkerkv(LedgerKey, + NewSQN, + Object, + KeyChanges, + State#state.compression_method), case leveled_cdb:cdb_put(ActiveJournal, JournalKey, JournalBin) of @@ -579,19 +585,23 @@ get_object(LedgerKey, SQN, Manifest) -> get_object(LedgerKey, SQN, Manifest, ToIgnoreKeyChanges) -> JournalP = leveled_imanifest:find_entry(SQN, Manifest), - {InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey, - SQN, - to_fetch, - null), + {InkerKey, _V, true} = + leveled_codec:to_inkerkv(LedgerKey, + SQN, + to_fetch, + null, + not_applicable), Obj = leveled_cdb:cdb_get(JournalP, InkerKey), leveled_codec:from_inkerkv(Obj, ToIgnoreKeyChanges). key_check(LedgerKey, SQN, Manifest) -> JournalP = leveled_imanifest:find_entry(SQN, Manifest), - {InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey, - SQN, - to_fetch, - null), + {InkerKey, _V, true} = + leveled_codec:to_inkerkv(LedgerKey, + SQN, + to_fetch, + null, + not_applicable), leveled_cdb:cdb_keycheck(JournalP, InkerKey). build_manifest(ManifestFilenames, @@ -851,7 +861,7 @@ initiate_penciller_snapshot(Bookie) -> -ifdef(TEST). create_value_for_journal(Obj, Comp) -> - leveled_codec:create_value_for_journal(Obj, Comp). + leveled_codec:create_value_for_journal(Obj, Comp, native). 
build_dummy_journal() -> F = fun(X) -> X end, @@ -933,7 +943,8 @@ simple_inker_test() -> build_dummy_journal(), CDBopts = #cdb_options{max_size=300000, binary_mode=true}, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, - cdb_options=CDBopts}), + cdb_options=CDBopts, + compression_method=native}), Obj1 = ink_get(Ink1, "Key1", 1), ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1), Obj3 = ink_get(Ink1, "Key1", 3), @@ -955,7 +966,8 @@ simple_inker_completeactivejournal_test() -> F1r = filename:join(JournalFP, "nursery_1.pnd"), ok = file:rename(F1, F1r), {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, - cdb_options=CDBopts}), + cdb_options=CDBopts, + compression_method=native}), Obj1 = ink_get(Ink1, "Key1", 1), ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1), Obj2 = ink_get(Ink1, "Key4", 4), @@ -973,7 +985,8 @@ compact_journal_test() -> RStrategy = [{?STD_TAG, recovr}], {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, cdb_options=CDBopts, - reload_strategy=RStrategy}), + reload_strategy=RStrategy, + compression_method=native}), {ok, NewSQN1, _ObjSize} = ink_put(Ink1, test_ledgerkey("KeyAA"), "TestValueAA", @@ -1039,7 +1052,8 @@ empty_manifest_test() -> clean_testdir(RootPath), CDBopts = #cdb_options{max_size=300000}, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, - cdb_options=CDBopts}), + cdb_options=CDBopts, + compression_method=native}), ?assertMatch(not_present, ink_fetch(Ink1, "Key1", 1)), CheckFun = fun(L, K, SQN) -> lists:member({SQN, K}, L) end, @@ -1059,7 +1073,8 @@ empty_manifest_test() -> ok = file:write_file(FN, term_to_binary("Hello")), {ok, Ink2} = ink_start(#inker_options{root_path=RootPath, - cdb_options=CDBopts}), + cdb_options=CDBopts, + compression_method=native}), ?assertMatch(not_present, ink_fetch(Ink2, "Key1", 1)), {ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", {[], infinity}), ?assertMatch(2, SQN), diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index c412cf4..ec10982 100644 --- 
a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -35,7 +35,7 @@ ]). -export([ - clerk_new/2, + clerk_new/3, clerk_prompt/1, clerk_push/2, clerk_close/1, @@ -49,15 +49,19 @@ -record(state, {owner :: pid() | undefined, root_path :: string() | undefined, - pending_deletions = dict:new() % OTP 16 does not like type + pending_deletions = dict:new(), % OTP 16 does not like type + compression_method :: lz4|native }). %%%============================================================================ %%% API %%%============================================================================ -clerk_new(Owner, Manifest) -> - {ok, Pid} = gen_server:start(?MODULE, [], []), +clerk_new(Owner, Manifest, CompressionMethod) -> + {ok, Pid} = + gen_server:start(?MODULE, + [{compression_method, CompressionMethod}], + []), ok = gen_server:call(Pid, {load, Owner, Manifest}, infinity), leveled_log:log("PC001", [Pid, Owner]), {ok, Pid}. @@ -78,8 +82,8 @@ clerk_close(Pid) -> %%% gen_server callbacks %%%============================================================================ -init([]) -> - {ok, #state{}}. +init([{compression_method, CompressionMethod}]) -> + {ok, #state{compression_method = CompressionMethod}}. handle_call({load, Owner, RootPath}, _From, State) -> {reply, ok, State#state{owner=Owner, root_path=RootPath}, ?MIN_TIMEOUT}; @@ -120,7 +124,8 @@ request_work(State) -> handle_work({SrcLevel, Manifest}, State) -> {UpdManifest, EntriesToDelete} = merge(SrcLevel, Manifest, - State#state.root_path), + State#state.root_path, + State#state.compression_method), leveled_log:log("PC007", []), SWMC = os:timestamp(), ok = leveled_penciller:pcl_manifestchange(State#state.owner, @@ -132,7 +137,7 @@ handle_work({SrcLevel, Manifest}, State) -> leveled_log:log_timer("PC018", [], SWSM), {leveled_pmanifest:get_manifest_sqn(UpdManifest), EntriesToDelete}. 
-merge(SrcLevel, Manifest, RootPath) -> +merge(SrcLevel, Manifest, RootPath, CompressionMethod) -> Src = leveled_pmanifest:mergefile_selector(Manifest, SrcLevel), NewSQN = leveled_pmanifest:get_manifest_sqn(Manifest) + 1, SinkList = leveled_pmanifest:merge_lookup(Manifest, @@ -152,7 +157,9 @@ merge(SrcLevel, Manifest, RootPath) -> {Man0, []}; _ -> SST_RP = leveled_penciller:sst_rootpath(RootPath), - perform_merge(Manifest, Src, SinkList, SrcLevel, SST_RP, NewSQN) + perform_merge(Manifest, + Src, SinkList, SrcLevel, + SST_RP, NewSQN, CompressionMethod) end. notify_deletions([], _Penciller) -> @@ -167,16 +174,21 @@ notify_deletions([Head|Tail], Penciller) -> %% %% SrcLevel is the level of the src sst file, the sink should be srcLevel + 1 -perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) -> +perform_merge(Manifest, + Src, SinkList, SrcLevel, + RootPath, NewSQN, + CompressionMethod) -> leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]), SrcList = [{next, Src, all}], MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner), SinkLevel = SrcLevel + 1, SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel), - Additions = do_merge(SrcList, SinkList, - SinkLevel, SinkBasement, - RootPath, NewSQN, MaxSQN, - []), + Additions = + do_merge(SrcList, SinkList, + SinkLevel, SinkBasement, + RootPath, NewSQN, MaxSQN, + CompressionMethod, + []), RevertPointerFun = fun({next, ME, _SK}) -> ME @@ -193,22 +205,23 @@ perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) -> Src), {Man2, [Src|SinkManifestList]}. 
-do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Additions) -> +do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _CM, Additions) -> leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]), Additions; -do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) -> +do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, CM, Additions) -> FileName = leveled_penciller:sst_filename(NewSQN, SinkLevel, length(Additions)), leveled_log:log("PC012", [NewSQN, FileName, SinkB]), TS1 = os:timestamp(), case leveled_sst:sst_new(RP, FileName, - KL1, KL2, SinkB, SinkLevel, MaxSQN) of + KL1, KL2, SinkB, SinkLevel, MaxSQN, CM) of empty -> leveled_log:log("PC013", [FileName]), do_merge([], [], SinkLevel, SinkB, RP, NewSQN, MaxSQN, + CM, Additions); {ok, Pid, Reply} -> {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, @@ -220,6 +233,7 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) -> do_merge(KL1Rem, KL2Rem, SinkLevel, SinkB, RP, NewSQN, MaxSQN, + CM, Additions ++ [Entry]) end. 
@@ -265,31 +279,36 @@ merge_file_test() -> "KL1_L1.sst", 1, KL1_L1, - 999999), + 999999, + native), KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)), {ok, PidL2_1, _} = leveled_sst:sst_new("../test/", "KL1_L2.sst", 2, KL1_L2, - 999999), + 999999, + native), KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)), {ok, PidL2_2, _} = leveled_sst:sst_new("../test/", "KL2_L2.sst", 2, KL2_L2, - 999999), + 999999, + lz4), KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)), {ok, PidL2_3, _} = leveled_sst:sst_new("../test/", "KL3_L2.sst", 2, KL3_L2, - 999999), + 999999, + lz4), KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)), {ok, PidL2_4, _} = leveled_sst:sst_new("../test/", "KL4_L2.sst", 2, KL4_L2, - 999999), + 999999, + lz4), E1 = #manifest_entry{owner = PidL1_1, filename = "./KL1_L1.sst", @@ -321,7 +340,8 @@ merge_file_test() -> PointerList = lists:map(fun(ME) -> {next, ME, all} end, [E2, E3, E4, E5]), - {Man6, _Dels} = perform_merge(Man5, E1, PointerList, 1, "../test", 3), + {Man6, _Dels} = + perform_merge(Man5, E1, PointerList, 1, "../test", 3, native), ?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)). diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 8abe56b..adf8252 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -243,7 +243,9 @@ work_ongoing = false :: boolean(), % i.e. compaction work work_backlog = false :: boolean(), % i.e. compaction work - head_timing :: tuple() | undefined}). + head_timing :: tuple() | undefined, + + compression_method :: lz4|native}). -type penciller_options() :: #penciller_options{}. 
-type bookies_memory() :: {tuple()|empty_cache, @@ -835,25 +837,28 @@ sst_filename(ManSQN, Level, Count) -> start_from_file(PCLopts) -> RootPath = PCLopts#penciller_options.root_path, - MaxTableSize = case PCLopts#penciller_options.max_inmemory_tablesize of - undefined -> - ?MAX_TABLESIZE; - M -> - M - end, + MaxTableSize = + case PCLopts#penciller_options.max_inmemory_tablesize of + undefined -> + ?MAX_TABLESIZE; + M -> + M + end, + PressMethod = PCLopts#penciller_options.compression_method, - {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath), + {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath, PressMethod), CoinToss = PCLopts#penciller_options.levelzero_cointoss, % Used to randomly defer the writing of L0 file. Intended to help with % vnode syncronisation issues (e.g. stop them all by default merging to % level zero concurrently) - InitState = #state{clerk=MergeClerk, - root_path=RootPath, - levelzero_maxcachesize=MaxTableSize, - levelzero_cointoss=CoinToss, - levelzero_index=leveled_pmem:new_index()}, + InitState = #state{clerk = MergeClerk, + root_path = RootPath, + levelzero_maxcachesize = MaxTableSize, + levelzero_cointoss = CoinToss, + levelzero_index = leveled_pmem:new_index(), + compression_method = PressMethod}, %% Open manifest Manifest0 = leveled_pmanifest:open_manifest(RootPath), @@ -861,7 +866,8 @@ start_from_file(PCLopts) -> fun(FN) -> {ok, Pid, - {_FK, _LK}} = leveled_sst:sst_open(sst_rootpath(RootPath), FN), + {_FK, _LK}} = + leveled_sst:sst_open(sst_rootpath(RootPath), FN), Pid end, SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1, @@ -1006,7 +1012,8 @@ roll_memory(State, false) -> length(State#state.levelzero_cache), FetchFun, PCL, - State#state.ledger_sqn), + State#state.ledger_sqn, + State#state.compression_method), {ok, Constructor, _} = R, Constructor; roll_memory(State, true) -> @@ -1020,7 +1027,8 @@ roll_memory(State, true) -> FileName, 0, KVList, - State#state.ledger_sqn), + State#state.ledger_sqn, + 
State#state.compression_method), {ok, Constructor, _} = R, Constructor. @@ -1401,7 +1409,8 @@ simple_server_test() -> RootPath = "../test/ledger", clean_testdir(RootPath), {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath, - max_inmemory_tablesize=1000}), + max_inmemory_tablesize=1000, + compression_method=native}), Key1_Pre = {{o,"Bucket0001", "Key0001", null}, {1, {active, infinity}, null}}, Key1 = add_missing_hash(Key1_Pre), @@ -1440,7 +1449,8 @@ simple_server_test() -> ok = pcl_close(PCL), {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, - max_inmemory_tablesize=1000}), + max_inmemory_tablesize=1000, + compression_method=native}), ?assertMatch(2003, pcl_getstartupsequencenumber(PCLr)), % ok = maybe_pause_push(PCLr, [Key2] ++ KL2 ++ [Key3]), @@ -1689,7 +1699,8 @@ create_file_test() -> 1, FetchFun, undefined, - 10000), + 10000, + native), lists:foreach(fun(X) -> case checkready(SP) of timeout -> diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index f78de93..79b0212 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -91,9 +91,9 @@ delete_pending/2, delete_pending/3]). --export([sst_new/5, - sst_new/7, - sst_newlevelzero/6, +-export([sst_new/6, + sst_new/8, + sst_newlevelzero/7, sst_open/2, sst_get/2, sst_get/3, @@ -127,14 +127,16 @@ %% see Issue 52. Handling within the SST process may lead to contention and %% extra copying. Files at the top of the tree yield, those lower down don't. --record(state, {summary, - handle :: file:fd() | undefined, - sst_timings :: tuple() | undefined, - penciller :: pid() | undefined, - root_path, - filename, - yield_blockquery = false :: boolean(), - blockindex_cache}). +-record(state, + {summary, + handle :: file:fd() | undefined, + sst_timings :: tuple() | undefined, + penciller :: pid() | undefined, + root_path, + filename, + yield_blockquery = false :: boolean(), + blockindex_cache, + compression_method :: lz4|native}). 
%%%============================================================================ @@ -159,29 +161,30 @@ sst_open(RootPath, Filename) -> {ok, Pid, {SK, EK}} end. --spec sst_new(string(), string(), integer(), list(), integer()) -> +-spec sst_new(string(), string(), integer(), list(), integer(), lz4|native) -> {ok, pid(), {tuple(), tuple()}}. %% @doc %% Start a new SST file at the assigned level passing in a list of Key, Value %% pairs. This should not be used for basement levels or unexpanded Key/Value %% lists as merge_lists will not be called. -sst_new(RootPath, Filename, Level, KVList, MaxSQN) -> +sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod) -> {ok, Pid} = gen_fsm:start(?MODULE, [], []), - {[], [], SlotList, FK} = merge_lists(KVList), + {[], [], SlotList, FK} = merge_lists(KVList, PressMethod), case gen_fsm:sync_send_event(Pid, {sst_new, RootPath, Filename, Level, {SlotList, FK}, - MaxSQN}, + MaxSQN, + PressMethod}, infinity) of {ok, {SK, EK}} -> {ok, Pid, {SK, EK}} end. -spec sst_new(string(), string(), list(), list(), - boolean(), integer(), integer()) -> + boolean(), integer(), integer(), lz4|native) -> empty|{ok, pid(), {{list(), list()}, tuple(), tuple()}}. %% @doc %% Start a new SST file at the assigned level passing in a two lists of @@ -194,8 +197,11 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN) -> %% be that the merge_lists returns nothin (for example when a basement file is %% all tombstones) - and the atome empty is returned in this case so that the %% file is not added to the manifest. 
-sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN) -> - {Rem1, Rem2, SlotList, FK} = merge_lists(KVL1, KVL2, {IsBasement, Level}), +sst_new(RootPath, Filename, + KVL1, KVL2, IsBasement, Level, + MaxSQN, PressMethod) -> + {Rem1, Rem2, SlotList, FK} = + merge_lists(KVL1, KVL2, {IsBasement, Level}, PressMethod), case SlotList of [] -> empty; @@ -207,7 +213,8 @@ sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN) -> Filename, Level, {SlotList, FK}, - MaxSQN}, + MaxSQN, + PressMethod}, infinity) of {ok, {SK, EK}} -> {ok, Pid, {{Rem1, Rem2}, SK, EK}} @@ -215,13 +222,16 @@ sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN) -> end. -spec sst_newlevelzero(string(), string(), - integer(), fun(), pid()|undefined, integer()) -> + integer(), fun(), pid()|undefined, integer(), + lz4|native) -> {ok, pid(), noreply}. %% @doc %% Start a new file at level zero. At this level the file size is not fixed - %% it will be as big as the input. Also the KVList is not passed in, it is %% fetched slot by slot using the FetchFun -sst_newlevelzero(RootPath, Filename, Slots, FetchFun, Penciller, MaxSQN) -> +sst_newlevelzero(RootPath, Filename, + Slots, FetchFun, Penciller, + MaxSQN, PressMethod) -> {ok, Pid} = gen_fsm:start(?MODULE, [], []), gen_fsm:send_event(Pid, {sst_newlevelzero, @@ -230,7 +240,8 @@ sst_newlevelzero(RootPath, Filename, Slots, FetchFun, Penciller, MaxSQN) -> Slots, FetchFun, Penciller, - MaxSQN}), + MaxSQN, + PressMethod}), {ok, Pid, noreply}. -spec sst_get(pid(), tuple()) -> tuple()|not_present. 
@@ -261,10 +272,10 @@ sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) -> case gen_fsm:sync_send_event(Pid, {get_kvrange, StartKey, EndKey, ScanWidth}, infinity) of - {yield, SlotsToFetchBinList, SlotsToPoint} -> + {yield, SlotsToFetchBinList, SlotsToPoint, PressMethod} -> FetchFun = fun({SlotBin, SK, EK}, Acc) -> - Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK) + Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK, PressMethod) end, lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint; Reply -> @@ -276,10 +287,11 @@ sst_getkvrange(Pid, StartKey, EndKey, ScanWidth) -> %% Get a list of slots by their ID. The slot will be converted from the binary %% to term form outside of the FSM loop sst_getslots(Pid, SlotList) -> - SlotBins = gen_fsm:sync_send_event(Pid, {get_slots, SlotList}, infinity), + {SlotBins, PressMethod} + = gen_fsm:sync_send_event(Pid, {get_slots, SlotList}, infinity), FetchFun = fun({SlotBin, SK, EK}, Acc) -> - Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK) + Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK, PressMethod) end, lists:foldl(FetchFun, [], SlotBins). 
@@ -350,19 +362,22 @@ starting({sst_open, RootPath, Filename}, _From, State) -> {ok, {Summary#summary.first_key, Summary#summary.last_key}}, reader, UpdState}; -starting({sst_new, RootPath, Filename, Level, {SlotList, FirstKey}, MaxSQN}, - _From, State) -> +starting({sst_new, + RootPath, Filename, Level, + {SlotList, FirstKey}, MaxSQN, + PressMethod}, _From, State) -> SW = os:timestamp(), {Length, SlotIndex, BlockIndex, - SlotsBin} = build_all_slots(SlotList), + SlotsBin} = build_all_slots(SlotList, PressMethod), SummaryBin = build_table_summary(SlotIndex, Level, FirstKey, Length, MaxSQN), - ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin), + ActualFilename = + write_file(RootPath, Filename, SummaryBin, SlotsBin, PressMethod), YBQ = Level =< 2, UpdState = read_file(ActualFilename, State#state{root_path=RootPath, @@ -377,20 +392,22 @@ starting({sst_new, RootPath, Filename, Level, {SlotList, FirstKey}, MaxSQN}, UpdState#state{blockindex_cache = BlockIndex}}. starting({sst_newlevelzero, RootPath, Filename, - Slots, FetchFun, Penciller, MaxSQN}, State) -> + Slots, FetchFun, Penciller, MaxSQN, + PressMethod}, State) -> SW = os:timestamp(), KVList = leveled_pmem:to_list(Slots, FetchFun), - {[], [], SlotList, FirstKey} = merge_lists(KVList), + {[], [], SlotList, FirstKey} = merge_lists(KVList, PressMethod), {SlotCount, SlotIndex, BlockIndex, - SlotsBin} = build_all_slots(SlotList), + SlotsBin} = build_all_slots(SlotList, PressMethod), SummaryBin = build_table_summary(SlotIndex, 0, FirstKey, SlotCount, MaxSQN), - ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin), + ActualFilename = + write_file(RootPath, Filename, SummaryBin, SlotsBin, PressMethod), UpdState = read_file(ActualFilename, State#state{root_path = RootPath, yield_blockquery = true}), @@ -400,13 +417,17 @@ starting({sst_newlevelzero, RootPath, Filename, SW), case Penciller of undefined -> - {next_state, reader, UpdState#state{blockindex_cache = BlockIndex}}; + 
{next_state, + reader, + UpdState#state{blockindex_cache = BlockIndex}}; _ -> leveled_penciller:pcl_confirml0complete(Penciller, UpdState#state.filename, Summary#summary.first_key, Summary#summary.last_key), - {next_state, reader, UpdState#state{blockindex_cache = BlockIndex}} + {next_state, + reader, + UpdState#state{blockindex_cache = BlockIndex}} end. @@ -420,16 +441,20 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> EndKey, ScanWidth, State), + PressMethod = State#state.compression_method, case State#state.yield_blockquery of true -> {reply, - {yield, SlotsToFetchBinList, SlotsToPoint}, + {yield, + SlotsToFetchBinList, + SlotsToPoint, + PressMethod}, reader, State}; false -> FetchFun = fun({SlotBin, SK, EK}, Acc) -> - Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK) + Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK, PressMethod) end, {reply, lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint, @@ -438,7 +463,7 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> end; reader({get_slots, SlotList}, _From, State) -> SlotBins = read_slots(State#state.handle, SlotList), - {reply, SlotBins, reader, State}; + {reply, {SlotBins, State#state.compression_method}, reader, State}; reader(get_maxsequencenumber, _From, State) -> Summary = State#state.summary, {reply, Summary#summary.max_sqn, reader, State}; @@ -475,14 +500,19 @@ delete_pending({get_kvrange, StartKey, EndKey, ScanWidth}, _From, State) -> ScanWidth, State), % Always yield as about to clear and de-reference + PressMethod = State#state.compression_method, {reply, - {yield, SlotsToFetchBinList, SlotsToPoint}, + {yield, SlotsToFetchBinList, SlotsToPoint, PressMethod}, delete_pending, State, ?DELETE_TIMEOUT}; delete_pending({get_slots, SlotList}, _From, State) -> SlotBins = read_slots(State#state.handle, SlotList), - {reply, SlotBins, delete_pending, State, ?DELETE_TIMEOUT}; + {reply, + {SlotBins, State#state.compression_method}, + delete_pending, + State, + 
?DELETE_TIMEOUT}; delete_pending(close, _From, State) -> leveled_log:log("SST07", [State#state.filename]), ok = file:close(State#state.handle), @@ -528,6 +558,7 @@ code_change(_OldVsn, StateName, State, _Extra) -> fetch(LedgerKey, Hash, State) -> Summary = State#state.summary, + PressMethod = State#state.compression_method, Slot = lookup_slot(LedgerKey, Summary#summary.index), SlotID = Slot#slot_index_value.slot_id, Bloom = Slot#slot_index_value.bloom, @@ -540,9 +571,8 @@ fetch(LedgerKey, Hash, State) -> case CachedBlockIdx of none -> SlotBin = read_slot(State#state.handle, Slot), - {Result, - BlockLengths, - BlockIdx} = binaryslot_get(SlotBin, LedgerKey, Hash), + {Result, BlockLengths, BlockIdx} = + binaryslot_get(SlotBin, LedgerKey, Hash, PressMethod), BlockIndexCache = array:set(SlotID - 1, <>, @@ -560,11 +590,13 @@ fetch(LedgerKey, Hash, State) -> [] -> {not_present, slot_bloom, SlotID, State}; _ -> - Result = check_blocks(PosList, - State#state.handle, - Slot, - BlockLengths, - LedgerKey), + Result = + check_blocks(PosList, + State#state.handle, + Slot, + BlockLengths, + LedgerKey, + PressMethod), {Result, slot_fetch, SlotID, State} end end @@ -626,12 +658,14 @@ fetch_range(StartKey, EndKey, ScanWidth, State) -> {SlotsToFetchBinList, SlotsToPoint}. -write_file(RootPath, Filename, SummaryBin, SlotsBin) -> +write_file(RootPath, Filename, SummaryBin, SlotsBin, PressMethod) -> SummaryLength = byte_size(SummaryBin), SlotsLength = byte_size(SlotsBin), {PendingName, FinalName} = generate_filenames(Filename), + FileVersion = gen_fileversion(PressMethod), ok = file:write_file(filename:join(RootPath, PendingName), - <>, @@ -650,27 +684,48 @@ write_file(RootPath, Filename, SummaryBin, SlotsBin) -> FinalName. 
 read_file(Filename, State) ->
-    {Handle, SummaryBin} = open_reader(filename:join(State#state.root_path,
-                                                        Filename)),
+    {Handle, FileVersion, SummaryBin} =
+        open_reader(filename:join(State#state.root_path, Filename)),
+    UpdState0 = imp_fileversion(FileVersion, State),
     {Summary, SlotList} = read_table_summary(SummaryBin),
     BlockIndexCache = array:new([{size, Summary#summary.size},
                                     {default, none}]),
-    UpdState = State#state{blockindex_cache = BlockIndexCache},
+    UpdState1 = UpdState0#state{blockindex_cache = BlockIndexCache},
     SlotIndex = from_list(SlotList),
     UpdSummary = Summary#summary{index = SlotIndex},
     leveled_log:log("SST03", [Filename,
                                 Summary#summary.size,
                                 Summary#summary.max_sqn]),
-    UpdState#state{summary = UpdSummary,
+    UpdState1#state{summary = UpdSummary,
                     handle = Handle,
                     filename = Filename}.
 
+gen_fileversion(PressMethod) ->
+    Bit1 =
+        case PressMethod of
+            lz4 -> 1;
+            native -> 0
+        end,
+    Bit1.
+
+imp_fileversion(VersionInt, State) ->
+    UpdState =
+        case VersionInt band 1 of
+            0 ->
+                State#state{compression_method = native};
+            1 ->
+                State#state{compression_method = lz4}
+        end,
+    UpdState.
+
 open_reader(Filename) ->
     {ok, Handle} = file:open(Filename, [binary, raw, read]),
-    {ok, Lengths} = file:pread(Handle, 0, 8),
-    <<SlotsLength:32/integer, SummaryLength:32/integer>> = Lengths,
-    {ok, SummaryBin} = file:pread(Handle, SlotsLength + 8, SummaryLength),
-    {Handle, SummaryBin}.
+    {ok, Lengths} = file:pread(Handle, 0, 9),
+    <<FileVersion:8/integer,
+        SlotsLength:32/integer,
+        SummaryLength:32/integer>> = Lengths,
+    {ok, SummaryBin} = file:pread(Handle, SlotsLength + 9, SummaryLength),
+    {Handle, FileVersion, SummaryBin}.
 
 build_table_summary(SlotIndex, _Level, FirstKey, SlotCount, MaxSQN) ->
     [{LastKey, _LastV}|_Rest] = SlotIndex,
@@ -693,23 +748,26 @@ read_table_summary(BinWithCheck) ->
     end.
-build_all_slots(SlotList) -> +build_all_slots(SlotList, PressMethod) -> SlotCount = length(SlotList), BuildResponse = build_all_slots(SlotList, - 8, + 9, 1, [], array:new([{size, SlotCount}, {default, none}]), - <<>>), + <<>>, + PressMethod), {SlotIndex, BlockIndex, SlotsBin} = BuildResponse, {SlotCount, SlotIndex, BlockIndex, SlotsBin}. build_all_slots([], _Pos, _SlotID, - SlotIdxAcc, BlockIdxAcc, SlotBinAcc) -> + SlotIdxAcc, BlockIdxAcc, SlotBinAcc, + _PressMethod) -> {SlotIdxAcc, BlockIdxAcc, SlotBinAcc}; build_all_slots([SlotD|Rest], Pos, SlotID, - SlotIdxAcc, BlockIdxAcc, SlotBinAcc) -> + SlotIdxAcc, BlockIdxAcc, SlotBinAcc, + PressMethod) -> {BlockIdx, SlotBin, HashList, LastKey} = SlotD, Length = byte_size(SlotBin), Bloom = leveled_tinybloom:create_bloom(HashList), @@ -722,7 +780,8 @@ build_all_slots([SlotD|Rest], Pos, SlotID, SlotID + 1, [{LastKey, SlotIndexV}|SlotIdxAcc], array:set(SlotID - 1, BlockIdx, BlockIdxAcc), - <>). + <>, + PressMethod). generate_filenames(RootFilename) -> @@ -740,26 +799,30 @@ generate_filenames(RootFilename) -> end. --spec serialise_block(any()) -> binary(). +-spec serialise_block(any(), lz4|native) -> binary(). %% @doc %% Convert term to binary %% Function split out to make it easier to experiment with different %% compression methods. Also, perhaps standardise applictaion of CRC %% checks -serialise_block(Term) -> +serialise_block(Term, lz4) -> {ok, Bin} = lz4:pack(term_to_binary(Term)), - Bin. + Bin; +serialise_block(Term, native) -> + term_to_binary(Term, ?BINARY_SETTINGS). --spec deserialise_block(binary()) -> any(). +-spec deserialise_block(binary(), lz4|native) -> any(). %% @doc %% Convert binary to term %% Function split out to make it easier to experiment with different %% compression methods. Also, perhaps standardise applictaion of CRC %% checks -deserialise_block(Bin) -> +deserialise_block(Bin, lz4) -> {ok, Bin0} = lz4:unpack(Bin), - binary_to_term(Bin0). 
+ binary_to_term(Bin0); +deserialise_block(Bin, native) -> + binary_to_term(Bin). %%%============================================================================ @@ -823,7 +886,7 @@ lookup_slots(StartKey, EndKey, Tree) -> %% based on a 17-bit hash (so 0.0039 fpr). -generate_binary_slot(Lookup, KVL) -> +generate_binary_slot(Lookup, KVL, PressMethod) -> HashFoldFun = fun({K, V}, {PosBinAcc, NoHashCount, HashAcc}) -> @@ -887,45 +950,45 @@ generate_binary_slot(Lookup, KVL) -> {B1, B2, B3, B4, B5} = case length(KVL) of L when L =< SideBlockSize -> - {serialise_block(KVL), + {serialise_block(KVL, PressMethod), <<0:0>>, <<0:0>>, <<0:0>>, <<0:0>>}; L when L =< 2 * SideBlockSize -> {KVLA, KVLB} = lists:split(SideBlockSize, KVL), - {serialise_block(KVLA), - serialise_block(KVLB), + {serialise_block(KVLA, PressMethod), + serialise_block(KVLB, PressMethod), <<0:0>>, <<0:0>>, <<0:0>>}; L when L =< (2 * SideBlockSize + MidBlockSize) -> {KVLA, KVLB_Rest} = lists:split(SideBlockSize, KVL), {KVLB, KVLC} = lists:split(SideBlockSize, KVLB_Rest), - {serialise_block(KVLA), - serialise_block(KVLB), - serialise_block(KVLC), + {serialise_block(KVLA, PressMethod), + serialise_block(KVLB, PressMethod), + serialise_block(KVLC, PressMethod), <<0:0>>, <<0:0>>}; L when L =< (3 * SideBlockSize + MidBlockSize) -> {KVLA, KVLB_Rest} = lists:split(SideBlockSize, KVL), {KVLB, KVLC_Rest} = lists:split(SideBlockSize, KVLB_Rest), {KVLC, KVLD} = lists:split(MidBlockSize, KVLC_Rest), - {serialise_block(KVLA), - serialise_block(KVLB), - serialise_block(KVLC), - serialise_block(KVLD), + {serialise_block(KVLA, PressMethod), + serialise_block(KVLB, PressMethod), + serialise_block(KVLC, PressMethod), + serialise_block(KVLD, PressMethod), <<0:0>>}; L when L =< (4 * SideBlockSize + MidBlockSize) -> {KVLA, KVLB_Rest} = lists:split(SideBlockSize, KVL), {KVLB, KVLC_Rest} = lists:split(SideBlockSize, KVLB_Rest), {KVLC, KVLD_Rest} = lists:split(MidBlockSize, KVLC_Rest), {KVLD, KVLE} = lists:split(SideBlockSize, 
KVLD_Rest), - {serialise_block(KVLA), - serialise_block(KVLB), - serialise_block(KVLC), - serialise_block(KVLD), - serialise_block(KVLE)} + {serialise_block(KVLA, PressMethod), + serialise_block(KVLB, PressMethod), + serialise_block(KVLC, PressMethod), + serialise_block(KVLD, PressMethod), + serialise_block(KVLE, PressMethod)} end, B1P = byte_size(PosBinIndex), @@ -951,18 +1014,21 @@ generate_binary_slot(Lookup, KVL) -> {<>, FullBin, HashL, LastKey}. -check_blocks([], _Handle, _Slot, _BlockLengths, _LedgerKey) -> +check_blocks([], _Handle, _Slot, _BlockLengths, _LedgerKey, _PressMethod) -> not_present; -check_blocks([Pos|Rest], Handle, Slot, BlockLengths, LedgerKey) -> +check_blocks([Pos|Rest], Handle, Slot, BlockLengths, LedgerKey, PressMethod) -> {BlockNumber, BlockPos} = revert_position(Pos), BlockBin = read_block(Handle, Slot, BlockLengths, BlockNumber), - BlockL = deserialise_block(BlockBin), + BlockL = deserialise_block(BlockBin, PressMethod), {K, V} = lists:nth(BlockPos, BlockL), case K of LedgerKey -> {K, V}; _ -> - check_blocks(Rest, Handle, Slot, BlockLengths, LedgerKey) + check_blocks(Rest, + Handle, Slot, BlockLengths, + LedgerKey, + PressMethod) end. @@ -1018,7 +1084,7 @@ read_slots(Handle, SlotList) -> lists:map(BinSplitMapFun, LengthList). -binaryslot_get(FullBin, Key, Hash) -> +binaryslot_get(FullBin, Key, Hash, PressMethod) -> case crc_check_slot(FullBin) of {BlockLengths, Rest} -> <> = BlockLengths, @@ -1027,7 +1093,7 @@ binaryslot_get(FullBin, Key, Hash) -> extra_hash(Hash), [], 0), - {fetch_value(PosList, BlockLengths, Blocks, Key), + {fetch_value(PosList, BlockLengths, Blocks, Key, PressMethod), BlockLengths, PosBinIndex}; crc_wonky -> @@ -1036,7 +1102,7 @@ binaryslot_get(FullBin, Key, Hash) -> none} end. 
-binaryslot_tolist(FullBin) -> +binaryslot_tolist(FullBin, PressMethod) -> BlockFetchFun = fun(Length, {Acc, Bin}) -> case Length of @@ -1044,7 +1110,7 @@ binaryslot_tolist(FullBin) -> {Acc, Bin}; _ -> <> = Bin, - {Acc ++ deserialise_block(Block), Rest} + {Acc ++ deserialise_block(Block, PressMethod), Rest} end end, @@ -1067,9 +1133,9 @@ binaryslot_tolist(FullBin) -> Out. -binaryslot_trimmedlist(FullBin, all, all) -> - binaryslot_tolist(FullBin); -binaryslot_trimmedlist(FullBin, StartKey, EndKey) -> +binaryslot_trimmedlist(FullBin, all, all, PressMethod) -> + binaryslot_tolist(FullBin, PressMethod); +binaryslot_trimmedlist(FullBin, StartKey, EndKey, PressMethod) -> LTrimFun = fun({K, _V}) -> K < StartKey end, RTrimFun = fun({K, _V}) -> not leveled_codec:endkey_passed(EndKey, K) end, @@ -1098,7 +1164,8 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey) -> 0 -> [Block1, Block2]; _ -> - MidBlockList = deserialise_block(MidBlock), + MidBlockList = + deserialise_block(MidBlock, PressMethod), {MidFirst, _} = lists:nth(1, MidBlockList), {MidLast, _} = lists:last(MidBlockList), Split = {StartKey > MidLast, @@ -1136,7 +1203,7 @@ binaryslot_trimmedlist(FullBin, StartKey, EndKey) -> BlockList = case is_binary(Block) of true -> - deserialise_block(Block); + deserialise_block(Block, PressMethod); false -> Block end, @@ -1212,21 +1279,21 @@ extra_hash({SegHash, _ExtraHash}) when is_integer(SegHash) -> extra_hash(NotHash) -> NotHash. 
-fetch_value([], _BlockLengths, _Blocks, _Key) -> +fetch_value([], _BlockLengths, _Blocks, _Key, _PressMethod) -> not_present; -fetch_value([Pos|Rest], BlockLengths, Blocks, Key) -> +fetch_value([Pos|Rest], BlockLengths, Blocks, Key, PressMethod) -> {BlockNumber, BlockPos} = revert_position(Pos), {_BlockPos, Offset, Length} = block_offsetandlength(BlockLengths, BlockNumber), <<_Pre:Offset/binary, Block:Length/binary, _Rest/binary>> = Blocks, - BlockL = deserialise_block(Block), + BlockL = deserialise_block(Block, PressMethod), {K, V} = lists:nth(BlockPos, BlockL), case K of Key -> {K, V}; _ -> - fetch_value(Rest, BlockLengths, Blocks, Key) + fetch_value(Rest, BlockLengths, Blocks, Key, PressMethod) end. @@ -1290,31 +1357,33 @@ find_pos(<<0:1/integer, NHC:7/integer, T/binary>>, Hash, PosList, Count) -> %% there are matching keys then the highest sequence number must be chosen and %% any lower sequence numbers should be compacted out of existence -merge_lists(KVList1) -> +merge_lists(KVList1, PressMethod) -> SlotCount = length(KVList1) div ?LOOK_SLOTSIZE, {[], [], - split_lists(KVList1, [], SlotCount), + split_lists(KVList1, [], SlotCount, PressMethod), element(1, lists:nth(1, KVList1))}. -split_lists([], SlotLists, 0) -> +split_lists([], SlotLists, 0, _PressMethod) -> lists:reverse(SlotLists); -split_lists(LastPuff, SlotLists, 0) -> - SlotD = generate_binary_slot(lookup, LastPuff), +split_lists(LastPuff, SlotLists, 0, PressMethod) -> + SlotD = generate_binary_slot(lookup, LastPuff, PressMethod), lists:reverse([SlotD|SlotLists]); -split_lists(KVList1, SlotLists, N) -> +split_lists(KVList1, SlotLists, N, PressMethod) -> {Slot, KVListRem} = lists:split(?LOOK_SLOTSIZE, KVList1), - SlotD = generate_binary_slot(lookup, Slot), - split_lists(KVListRem, [SlotD|SlotLists], N - 1). + SlotD = generate_binary_slot(lookup, Slot, PressMethod), + split_lists(KVListRem, [SlotD|SlotLists], N - 1, PressMethod). 
-merge_lists(KVList1, KVList2, LevelInfo) -> - merge_lists(KVList1, KVList2, LevelInfo, [], null, 0). +merge_lists(KVList1, KVList2, LevelInfo, PressMethod) -> + merge_lists(KVList1, KVList2, LevelInfo, [], null, 0, PressMethod). -merge_lists(KVList1, KVList2, _LI, SlotList, FirstKey, ?MAX_SLOTS) -> +merge_lists(KVList1, KVList2, _LI, SlotList, FirstKey, ?MAX_SLOTS, + _PressMethod) -> {KVList1, KVList2, lists:reverse(SlotList), FirstKey}; -merge_lists([], [], _LI, SlotList, FirstKey, _SlotCount) -> +merge_lists([], [], _LI, SlotList, FirstKey, _SlotCount, _PressMethod) -> {[], [], lists:reverse(SlotList), FirstKey}; -merge_lists(KVList1, KVList2, LI, SlotList, FirstKey, SlotCount) -> +merge_lists(KVList1, KVList2, LI, SlotList, FirstKey, SlotCount, + PressMethod) -> {KVRem1, KVRem2, Slot, FK0} = form_slot(KVList1, KVList2, LI, no_lookup, 0, [], FirstKey), case Slot of @@ -1324,15 +1393,17 @@ merge_lists(KVList1, KVList2, LI, SlotList, FirstKey, SlotCount) -> LI, SlotList, FK0, - SlotCount); + SlotCount, + PressMethod); {Lookup, KVL} -> - SlotD = generate_binary_slot(Lookup, KVL), + SlotD = generate_binary_slot(Lookup, KVL, PressMethod), merge_lists(KVRem1, KVRem2, LI, [SlotD|SlotList], FK0, - SlotCount + 1) + SlotCount + 1, + PressMethod) end. form_slot([], [], _LI, Type, _Size, Slot, FK) -> @@ -1545,7 +1616,7 @@ form_slot_test() -> ?assertMatch({[], [], {no_lookup, Slot}, {o, "B1", "K5", null}}, R1). 
merge_tombstonelist_test() -> - % Merge lists wiht nothing but tombstones + % Merge lists with nothing but tombstones SkippingKV1 = {{o, "B1", "K9995", null}, {9995, tomb, 1234567, {}}}, SkippingKV2 = {{o, "B1", "K9996", null}, {9996, tomb, 1234567, {}}}, SkippingKV3 = {{o, "B1", "K9997", null}, {9997, tomb, 1234567, {}}}, @@ -1553,7 +1624,8 @@ merge_tombstonelist_test() -> SkippingKV5 = {{o, "B1", "K9999", null}, {9999, tomb, 1234567, {}}}, R = merge_lists([SkippingKV1, SkippingKV3, SkippingKV5], [SkippingKV2, SkippingKV4], - {true, 9999999}), + {true, 9999999}, + native), ?assertMatch({[], [], [], null}, R). indexed_list_test() -> @@ -1564,7 +1636,8 @@ indexed_list_test() -> SW0 = os:timestamp(), - {_PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, KVL1), + {_PosBinIndex1, FullBin, _HL, _LK} = + generate_binary_slot(lookup, KVL1, native), io:format(user, "Indexed list created slot in ~w microseconds of size ~w~n", [timer:now_diff(os:timestamp(), SW0), byte_size(FullBin)]), @@ -1592,7 +1665,8 @@ indexed_list_mixedkeys_test() -> KVL1 = lists:sublist(KVL0, 33), Keys = lists:ukeysort(1, generate_indexkeys(60) ++ KVL1), - {_PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys), + {_PosBinIndex1, FullBin, _HL, _LK} = + generate_binary_slot(lookup, Keys, native), {TestK1, TestV1} = lists:nth(4, KVL1), MH1 = leveled_codec:segment_hash(TestK1), @@ -1618,7 +1692,8 @@ indexed_list_mixedkeys2_test() -> IdxKeys2 = lists:ukeysort(1, generate_indexkeys(30)), % this isn't actually ordered correctly Keys = IdxKeys1 ++ KVL1 ++ IdxKeys2, - {_PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys), + {_PosBinIndex1, FullBin, _HL, _LK} = + generate_binary_slot(lookup, Keys, native), lists:foreach(fun({K, V}) -> MH = leveled_codec:segment_hash(K), test_binary_slot(FullBin, K, MH, {K, V}) @@ -1628,34 +1703,37 @@ indexed_list_mixedkeys2_test() -> indexed_list_allindexkeys_test() -> Keys = lists:sublist(lists:ukeysort(1, 
generate_indexkeys(150)), ?LOOK_SLOTSIZE), - {PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys), + {PosBinIndex1, FullBin, _HL, _LK} = + generate_binary_slot(lookup, Keys, native), EmptySlotSize = ?LOOK_SLOTSIZE - 1, ?assertMatch(<<_BL:24/binary, EmptySlotSize:8/integer>>, PosBinIndex1), % SW = os:timestamp(), - BinToList = binaryslot_tolist(FullBin), + BinToList = binaryslot_tolist(FullBin, native), % io:format(user, % "Indexed list flattened in ~w microseconds ~n", % [timer:now_diff(os:timestamp(), SW)]), ?assertMatch(Keys, BinToList), - ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, all, all)). + ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, all, all, native)). indexed_list_allindexkeys_nolookup_test() -> Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(1000)), ?NOLOOK_SLOTSIZE), - {PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(no_lookup, Keys), + {PosBinIndex1, FullBin, _HL, _LK} = + generate_binary_slot(no_lookup, Keys, native), ?assertMatch(<<_BL:24/binary, 127:8/integer>>, PosBinIndex1), % SW = os:timestamp(), - BinToList = binaryslot_tolist(FullBin), + BinToList = binaryslot_tolist(FullBin, native), % io:format(user, % "Indexed list flattened in ~w microseconds ~n", % [timer:now_diff(os:timestamp(), SW)]), ?assertMatch(Keys, BinToList), - ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, all, all)). + ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, all, all, native)). 
indexed_list_allindexkeys_trimmed_test() -> Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)), ?LOOK_SLOTSIZE), - {PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys), + {PosBinIndex1, FullBin, _HL, _LK} = + generate_binary_slot(lookup, Keys, native), EmptySlotSize = ?LOOK_SLOTSIZE - 1, ?assertMatch(<<_BL:24/binary, EmptySlotSize:8/integer>>, PosBinIndex1), ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, @@ -1666,26 +1744,27 @@ indexed_list_allindexkeys_trimmed_test() -> {i, "Bucket", {"t1_int", 99999}, - null})), + null}, + native)), {SK1, _} = lists:nth(10, Keys), {EK1, _} = lists:nth(100, Keys), R1 = lists:sublist(Keys, 10, 91), - O1 = binaryslot_trimmedlist(FullBin, SK1, EK1), + O1 = binaryslot_trimmedlist(FullBin, SK1, EK1, native), ?assertMatch(91, length(O1)), ?assertMatch(R1, O1), {SK2, _} = lists:nth(10, Keys), {EK2, _} = lists:nth(20, Keys), R2 = lists:sublist(Keys, 10, 11), - O2 = binaryslot_trimmedlist(FullBin, SK2, EK2), + O2 = binaryslot_trimmedlist(FullBin, SK2, EK2, native), ?assertMatch(11, length(O2)), ?assertMatch(R2, O2), {SK3, _} = lists:nth(?LOOK_SLOTSIZE - 1, Keys), {EK3, _} = lists:nth(?LOOK_SLOTSIZE, Keys), R3 = lists:sublist(Keys, ?LOOK_SLOTSIZE - 1, 2), - O3 = binaryslot_trimmedlist(FullBin, SK3, EK3), + O3 = binaryslot_trimmedlist(FullBin, SK3, EK3, native), ?assertMatch(2, length(O3)), ?assertMatch(R3, O3). 
@@ -1694,7 +1773,8 @@ indexed_list_mixedkeys_bitflip_test() -> KVL0 = lists:ukeysort(1, generate_randomkeys(1, 50, 1, 4)), KVL1 = lists:sublist(KVL0, 33), Keys = lists:ukeysort(1, generate_indexkeys(60) ++ KVL1), - {_PosBinIndex1, FullBin, _HL, LK} = generate_binary_slot(lookup, Keys), + {_PosBinIndex1, FullBin, _HL, LK} = + generate_binary_slot(lookup, Keys, native), ?assertMatch(LK, element(1, lists:last(Keys))), L = byte_size(FullBin), Byte1 = leveled_rand:uniform(L), @@ -1711,12 +1791,12 @@ indexed_list_mixedkeys_bitflip_test() -> MH1 = leveled_codec:segment_hash(TestK1), test_binary_slot(FullBin0, TestK1, MH1, not_present), - ToList = binaryslot_tolist(FullBin0), + ToList = binaryslot_tolist(FullBin0, native), ?assertMatch([], ToList), {SK1, _} = lists:nth(10, Keys), {EK1, _} = lists:nth(50, Keys), - O1 = binaryslot_trimmedlist(FullBin0, SK1, EK1), + O1 = binaryslot_trimmedlist(FullBin0, SK1, EK1, native), ?assertMatch(0, length(O1)), ?assertMatch([], O1). @@ -1724,7 +1804,7 @@ indexed_list_mixedkeys_bitflip_test() -> test_binary_slot(FullBin, Key, Hash, ExpectedValue) -> % SW = os:timestamp(), - {ReturnedValue, _BLs, _Idx} = binaryslot_get(FullBin, Key, Hash), + {ReturnedValue, _BLs, _Idx} = binaryslot_get(FullBin, Key, Hash, native), ?assertMatch(ExpectedValue, ReturnedValue). % io:format(user, "Fetch success in ~w microseconds ~n", % [timer:now_diff(os:timestamp(), SW)]). 
@@ -1737,8 +1817,10 @@ merge_test() -> KVL2 = lists:ukeysort(1, generate_randomkeys(1, N, 1, 20)), KVL3 = lists:ukeymerge(1, KVL1, KVL2), SW0 = os:timestamp(), - {ok, P1, {FK1, LK1}} = sst_new("../test/", "level1_src", 1, KVL1, 6000), - {ok, P2, {FK2, LK2}} = sst_new("../test/", "level2_src", 2, KVL2, 3000), + {ok, P1, {FK1, LK1}} = + sst_new("../test/", "level1_src", 1, KVL1, 6000, native), + {ok, P2, {FK2, LK2}} = + sst_new("../test/", "level2_src", 2, KVL2, 3000, native), ExpFK1 = element(1, lists:nth(1, KVL1)), ExpLK1 = element(1, lists:last(KVL1)), ExpFK2 = element(1, lists:nth(1, KVL2)), @@ -1749,7 +1831,8 @@ merge_test() -> ?assertMatch(ExpLK2, LK2), ML1 = [{next, #manifest_entry{owner = P1}, FK1}], ML2 = [{next, #manifest_entry{owner = P2}, FK2}], - NewR = sst_new("../test/", "level2_merge", ML1, ML2, false, 2, N * 2), + NewR = + sst_new("../test/", "level2_merge", ML1, ML2, false, 2, N * 2, native), {ok, P3, {{Rem1, Rem2}, FK3, LK3}} = NewR, ?assertMatch([], Rem1), ?assertMatch([], Rem2), @@ -1783,11 +1866,8 @@ simple_persisted_range_test() -> KVList1 = lists:ukeysort(1, KVList0), [{FirstKey, _FV}|_Rest] = KVList1, {LastKey, _LV} = lists:last(KVList1), - {ok, Pid, {FirstKey, LastKey}} = sst_new(RP, - Filename, - 1, - KVList1, - length(KVList1)), + {ok, Pid, {FirstKey, LastKey}} = + sst_new(RP, Filename, 1, KVList1, length(KVList1), native), {o, B, K, null} = LastKey, SK1 = {o, B, K, 0}, @@ -1836,11 +1916,8 @@ additional_range_test() -> [], lists:seq(?NOLOOK_SLOTSIZE + Gap + 1, 2 * ?NOLOOK_SLOTSIZE + Gap)), - {ok, - P1, - {{Rem1, Rem2}, - SK, - EK}} = sst_new("../test/", "range1_src", IK1, IK2, false, 1, 9999), + {ok, P1, {{Rem1, Rem2}, SK, EK}} = + sst_new("../test/", "range1_src", IK1, IK2, false, 1, 9999, native), ?assertMatch([], Rem1), ?assertMatch([], Rem2), ?assertMatch(SK, element(1, lists:nth(1, IK1))), @@ -1897,11 +1974,8 @@ simple_persisted_slotsize_test() -> ?LOOK_SLOTSIZE), [{FirstKey, _FV}|_Rest] = KVList1, {LastKey, _LV} = 
lists:last(KVList1), - {ok, Pid, {FirstKey, LastKey}} = sst_new(RP, - Filename, - 1, - KVList1, - length(KVList1)), + {ok, Pid, {FirstKey, LastKey}} = + sst_new(RP, Filename, 1, KVList1, length(KVList1), native), lists:foreach(fun({K, V}) -> ?assertMatch({K, V}, sst_get(Pid, K)) end, @@ -1915,11 +1989,8 @@ simple_persisted_test() -> KVList1 = lists:ukeysort(1, KVList0), [{FirstKey, _FV}|_Rest] = KVList1, {LastKey, _LV} = lists:last(KVList1), - {ok, Pid, {FirstKey, LastKey}} = sst_new(RP, - Filename, - 1, - KVList1, - length(KVList1)), + {ok, Pid, {FirstKey, LastKey}} = + sst_new(RP, Filename, 1, KVList1, length(KVList1), native), SW0 = os:timestamp(), lists:foreach(fun({K, V}) -> ?assertMatch({K, V}, sst_get(Pid, K)) diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 3c2e550..4e82cc8 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -9,7 +9,8 @@ load_and_count/1, load_and_count_withdelete/1, space_clear_ondelete/1, - is_empty_test/1 + is_empty_test/1, + many_put_fetch_switchcompression/1 ]). all() -> [ @@ -20,7 +21,8 @@ all() -> [ load_and_count, load_and_count_withdelete, space_clear_ondelete, - is_empty_test + is_empty_test, + many_put_fetch_switchcompression ]. @@ -683,3 +685,49 @@ is_empty_test(_Config) -> true = sets:size(BLpd3()) == 0, ok = leveled_bookie:book_close(Bookie1). 
+ + +many_put_fetch_switchcompression(_Config) -> + RootPath = testutil:reset_filestructure(), + StartOpts1 = [{root_path, RootPath}, + {max_pencillercachesize, 16000}, + {sync_strategy, riak_sync}, + {compression_method, native}], + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + {TestObject, TestSpec} = testutil:generate_testobject(), + ok = testutil:book_riakput(Bookie1, TestObject, TestSpec), + testutil:check_forobject(Bookie1, TestObject), + ok = leveled_bookie:book_close(Bookie1), + StartOpts2 = [{root_path, RootPath}, + {max_journalsize, 500000000}, + {max_pencillercachesize, 32000}, + {sync_strategy, testutil:sync_strategy()}, + {compression_method, lz4}], + + %% Change compression method + {ok, Bookie2} = leveled_bookie:book_start(StartOpts2), + testutil:check_forobject(Bookie2, TestObject), + GenList = [2, 40002, 80002, 120002], + CLs = testutil:load_objects(40000, GenList, Bookie2, TestObject, + fun testutil:generate_smallobjects/2), + CL1A = lists:nth(1, CLs), + ChkListFixed = lists:nth(length(CLs), CLs), + testutil:check_forlist(Bookie2, CL1A), + ObjList2A = testutil:generate_objects(5000, 2), + testutil:riakload(Bookie2, ObjList2A), + ChkList2A = lists:sublist(lists:sort(ObjList2A), 1000), + testutil:check_forlist(Bookie2, ChkList2A), + testutil:check_forlist(Bookie2, ChkListFixed), + testutil:check_forobject(Bookie2, TestObject), + testutil:check_forlist(Bookie2, ChkList2A), + testutil:check_forlist(Bookie2, ChkListFixed), + testutil:check_forobject(Bookie2, TestObject), + ok = leveled_bookie:book_close(Bookie2), + + %% Change method back again + {ok, Bookie3} = leveled_bookie:book_start(StartOpts1), + testutil:check_forlist(Bookie3, ChkList2A), + testutil:check_forlist(Bookie3, ChkListFixed), + testutil:check_forobject(Bookie3, TestObject), + testutil:check_formissingobject(Bookie3, "Bookie1", "MissingKey0123"), + ok = leveled_bookie:book_destroy(Bookie3). \ No newline at end of file