diff --git a/include/leveled.hrl b/include/leveled.hrl
index e3ed9c7..348467f 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -48,6 +48,8 @@
          source_inker :: pid() | undefined,
          reload_strategy = [] :: list(),
          waste_retention_period :: integer() | undefined,
+         compression_method :: lz4|native,
+         compress_on_receipt :: boolean(),
          max_run_length}).

 -record(penciller_options,
@@ -58,6 +60,7 @@
          bookies_mem :: tuple() | undefined,
          source_penciller :: pid() | undefined,
          snapshot_longrunning = true :: boolean(),
+         compression_method :: lz4|native,
          levelzero_cointoss = false :: boolean()}).

 -record(iclerk_options,
@@ -65,6 +68,7 @@
          max_run_length :: integer() | undefined,
          cdb_options = #cdb_options{} :: #cdb_options{},
          waste_retention_period :: integer() | undefined,
+         compression_method :: lz4|native,
          reload_strategy = [] :: list()}).

 -record(recent_aae, {filter :: whitelist|blacklist,
diff --git a/rebar.config b/rebar.config
index c849ed0..ea69915 100644
--- a/rebar.config
+++ b/rebar.config
@@ -9,3 +9,7 @@
                 {erl_opts, [debug_info, {parse_transform, lager_transform}, {parse_transform, eqc_cover}]},
                 {plugins, [rebar_eqc]}]}
 ]}.
+
+{deps, [
+        {lz4, ".*", {git, "https://github.com/martinsumner/erlang-lz4", {branch, "mas-leveled"}}}
+       ]}.
diff --git a/rebar.lock b/rebar.lock
index 57afcca..d315af2 100644
--- a/rebar.lock
+++ b/rebar.lock
@@ -1 +1,4 @@
-[].
+[{<<"lz4">>,
+  {git,"https://github.com/martinsumner/erlang-lz4",
+       {ref,"09d539685e616b614e851926384439384601ee5a"}},
+  0}].
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index a74c734..2f3a58e 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -81,6 +81,8 @@
 -define(JOURNAL_SIZE_JITTER, 20).
 -define(LONG_RUNNING, 80000).
 -define(RECENT_AAE, false).
+-define(COMPRESSION_METHOD, lz4).
+-define(COMPRESSION_POINT, on_receipt).

 -record(ledger_cache, {mem :: ets:tab(),
                         loader = leveled_tree:empty(?CACHE_TYPE)
@@ -143,19 +145,34 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
 %% @doc Start a Leveled Key/Value store - full options support.
 %%
 %% Allows an options proplists to be passed for setting options.  There are
-%% two primary additional options this allows over book_start/4:
+%% four primary additional options this allows over book_start/4:
 %% - retain_strategy
 %% - waste_retention_period
+%% - compression_method
+%% - compression_point
 %%
-%% Both of these relate to compaction in the Journal.  The retain_strategy
-%% determines if a skinny record of the object should be retained following
-%% compaction, and how thta should be used when recovering lost state in the
-%% Ledger.
+%% Both of the first two options relate to compaction in the Journal.  The
+%% retain_strategy determines if a skinny record of the object should be
+%% retained following compaction, and how that should be used when recovering
+%% lost state in the Ledger.
+%%
+%% This is relevant to when Riak uses Leveled, in that KeyChanges are
+%% presented by the vnode to the backend as deltas.  This means that if those
+%% key changes do not remain recorded in the journal once the value has been
+%% compacted, rebuilding the ledger from the Journal would lead to incorrect
+%% index entries being present.
 %%
 %% Currently compacted records no longer in use are not removed but moved to
 %% a journal_waste folder, and the waste_retention_period determines how long
 %% this history should be kept for (for example to allow for it to be backed
-%% up before deletion)
+%% up before deletion).
+%%
+%% Compression method and point allow Leveled to be switched from using bif
+%% based compression (zlib) to using nif based compression (lz4).  The
+%% compression point can be changed from on_receipt (all values are
+%% compressed as they are received) to on_compact, where values are
+%% originally stored uncompressed (speeding PUT times) and are only
+%% compressed when they are first subject to compaction.
 %%
 %% TODO:
 %% The reload_strategy is exposed as currently no firm decision has been made
@@ -387,7 +404,8 @@ init([Opts]) ->
                             unit_minutes = UnitMinutes}
         end,

-    {Inker, Penciller} = startup(InkerOpts, PencillerOpts, RecentAAE),
+    {Inker, Penciller} =
+        startup(InkerOpts, PencillerOpts, RecentAAE),

     NewETS = ets:new(mem, [ordered_set]),
     leveled_log:log("B0001", [Inker, Penciller]),
@@ -899,16 +917,41 @@ set_options(Opts) ->
     ok = filelib:ensure_dir(JournalFP),
     ok = filelib:ensure_dir(LedgerFP),

+    CompressionMethod =
+        case get_opt(compression_method, Opts, ?COMPRESSION_METHOD) of
+            native ->
+                % Note native compression will have reduced performance
+                % https://github.com/martinsumner/leveled/issues/95
+                native;
+            lz4 ->
+                % Must include lz4 library in rebar.config
+                lz4
+        end,
+    CompressOnReceipt =
+        case get_opt(compression_point, Opts, ?COMPRESSION_POINT) of
+            on_receipt ->
+                % Note this will add measurable delay to PUT time
+                % https://github.com/martinsumner/leveled/issues/95
+                true;
+            on_compact ->
+                % If using lz4 this is not recommended
+                false
+        end,
+
     {#inker_options{root_path = JournalFP,
                     reload_strategy = ReloadStrategy,
                     max_run_length = get_opt(max_run_length, Opts),
                     waste_retention_period = WRP,
-                    cdb_options = #cdb_options{max_size=MaxJournalSize,
-                                                binary_mode=true,
-                                                sync_strategy=SyncStrat}},
+                    compression_method = CompressionMethod,
+                    compress_on_receipt = CompressOnReceipt,
+                    cdb_options =
+                        #cdb_options{max_size=MaxJournalSize,
+                                        binary_mode=true,
+                                        sync_strategy=SyncStrat}},
     #penciller_options{root_path = LedgerFP,
                         max_inmemory_tablesize = PCLL0CacheSize,
-                        levelzero_cointoss = true}}.
+                        levelzero_cointoss = true,
+                        compression_method = CompressionMethod}}.

 startup(InkerOpts, PencillerOpts, RecentAAE) ->
     {ok, Inker} = leveled_inker:ink_start(InkerOpts),
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index af5a2a5..2c79445 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -46,15 +46,16 @@
         to_ledgerkey/3,
         to_ledgerkey/5,
         from_ledgerkey/1,
-        to_inkerkv/4,
+        to_inkerkv/3,
+        to_inkerkv/6,
         from_inkerkv/1,
         from_inkerkv/2,
         from_journalkey/1,
         compact_inkerkvc/2,
         split_inkvalue/1,
         check_forinkertype/2,
-        maybe_compress/1,
-        create_value_for_journal/2,
+        maybe_compress/2,
+        create_value_for_journal/3,
         build_metadata_object/2,
         generate_ledgerkv/5,
         get_size/2,
@@ -219,11 +220,13 @@ to_ledgerkey(Bucket, Key, Tag) ->
 %% Return the Key, Value and Hash Option for this object.  The hash option
 %% indicates whether the key would ever be looked up directly, and so if it
 %% requires an entry in the hash table
-to_inkerkv(LedgerKey, SQN, to_fetch, null) ->
-    {{SQN, ?INKT_STND, LedgerKey}, null, true};
-to_inkerkv(LedgerKey, SQN, Object, KeyChanges) ->
+to_inkerkv(LedgerKey, SQN, to_fetch) ->
+    {{SQN, ?INKT_STND, LedgerKey}, null, true}.
+
+to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) ->
     InkerType = check_forinkertype(LedgerKey, Object),
-    Value = create_value_for_journal({Object, KeyChanges}, false),
+    Value =
+        create_value_for_journal({Object, KeyChanges}, Compress, PressMethod),
     {{SQN, InkerType, LedgerKey}, Value}.
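For reference, the options described in the @doc above are all plain proplist
entries to book_start/1. A minimal sketch (the path and option values here are
illustrative only, not defaults):

```erlang
%% Start a store using the zlib ('native') codec, deferring compression
%% until values are first compacted in the Journal.
{ok, Bookie} = leveled_bookie:book_start([{root_path, "/tmp/leveled_demo"},
                                            {compression_method, native},
                                            {compression_point, on_compact}]),
ok = leveled_bookie:book_put(Bookie, <<"B">>, <<"K">>, <<"V">>, []),
{ok, <<"V">>} = leveled_bookie:book_get(Bookie, <<"B">>, <<"K">>),
ok = leveled_bookie:book_close(Bookie).
```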
%% Used when fetching objects, so only handles standard, hashable entries
@@ -240,45 +243,51 @@ from_inkerkv(Object, ToIgnoreKeyChanges) ->
             Object
     end.

-create_value_for_journal({Object, KeyChanges}, Compress)
+create_value_for_journal({Object, KeyChanges}, Compress, Method)
                                         when not is_binary(KeyChanges) ->
     KeyChangeBin = term_to_binary(KeyChanges, [compressed]),
-    create_value_for_journal({Object, KeyChangeBin}, Compress);
-create_value_for_journal({Object, KeyChangeBin}, Compress) ->
+    create_value_for_journal({Object, KeyChangeBin}, Compress, Method);
+create_value_for_journal({Object, KeyChangeBin}, Compress, Method) ->
     KeyChangeBinLen = byte_size(KeyChangeBin),
-    ObjectBin = serialise_object(Object, Compress),
-    TypeCode = encode_valuetype(is_binary(Object), Compress),
+    ObjectBin = serialise_object(Object, Compress, Method),
+    TypeCode = encode_valuetype(is_binary(Object), Compress, Method),
     <<ObjectBin/binary,
         KeyChangeBin/binary,
         KeyChangeBinLen:32/integer,
         TypeCode:8/integer>>.

-maybe_compress({null, KeyChanges}) ->
-    create_value_for_journal({null, KeyChanges}, false);
-maybe_compress(JournalBin) ->
+maybe_compress({null, KeyChanges}, _PressMethod) ->
+    create_value_for_journal({null, KeyChanges}, false, native);
+maybe_compress(JournalBin, PressMethod) ->
     Length0 = byte_size(JournalBin) - 5,
     <<JBin0:Length0/binary,
         KeyChangeLength:32/integer,
         Type:8/integer>> = JournalBin,
-    {IsBinary, IsCompressed} = decode_valuetype(Type),
+    {IsBinary, IsCompressed, IsLz4} = decode_valuetype(Type),
     case IsCompressed of
         true ->
             JournalBin;
         false ->
             Length1 = Length0 - KeyChangeLength,
             <<OBin2:Length1/binary, KCBin2:KeyChangeLength/binary>> = JBin0,
-            V0 = {deserialise_object(OBin2, IsBinary, IsCompressed),
+            V0 = {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
                     binary_to_term(KCBin2)},
-            create_value_for_journal(V0, true)
+            create_value_for_journal(V0, true, PressMethod)
     end.

-serialise_object(Object, false) when is_binary(Object) ->
+serialise_object(Object, false, _Method) when is_binary(Object) ->
     Object;
-serialise_object(Object, true) when is_binary(Object) ->
-    zlib:compress(Object);
-serialise_object(Object, false) ->
+serialise_object(Object, true, Method) when is_binary(Object) ->
+    case Method of
+        lz4 ->
+            {ok, Bin} = lz4:pack(Object),
+            Bin;
+        native ->
+            zlib:compress(Object)
+    end;
+serialise_object(Object, false, _Method) ->
     term_to_binary(Object);
-serialise_object(Object, true) ->
+serialise_object(Object, true, _Method) ->
     term_to_binary(Object, [compressed]).

 revert_value_from_journal(JournalBin) ->
@@ -289,26 +298,34 @@ revert_value_from_journal(JournalBin, ToIgnoreKeyChanges) ->
     <<JBin0:Length0/binary,
         KeyChangeLength:32/integer,
         Type:8/integer>> = JournalBin,
-    {IsBinary, IsCompressed} = decode_valuetype(Type),
+    {IsBinary, IsCompressed, IsLz4} = decode_valuetype(Type),
     Length1 = Length0 - KeyChangeLength,
     case ToIgnoreKeyChanges of
         true ->
             <<OBin2:Length1/binary, _KCBin2:KeyChangeLength/binary>> = JBin0,
-            {deserialise_object(OBin2, IsBinary, IsCompressed), []};
+            {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4), []};
         false ->
             <<OBin2:Length1/binary, KCBin2:KeyChangeLength/binary>> = JBin0,
-            {deserialise_object(OBin2, IsBinary, IsCompressed),
+            {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
                 binary_to_term(KCBin2)}
     end.

-deserialise_object(Binary, true, true) ->
+deserialise_object(Binary, true, true, true) ->
+    {ok, Deflated} = lz4:unpack(Binary),
+    Deflated;
+deserialise_object(Binary, true, true, false) ->
     zlib:uncompress(Binary);
-deserialise_object(Binary, true, false) ->
+deserialise_object(Binary, true, false, _IsLz4) ->
     Binary;
-deserialise_object(Binary, false, _) ->
+deserialise_object(Binary, false, _, _IsLz4) ->
     binary_to_term(Binary).
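The type byte that create_value_for_journal/3 appends is built by
encode_valuetype/3 in the hunk that follows, from three flags: 1 for
compressed, 2 for binary, and now 4 for lz4. A worked example of the
arithmetic, using nothing beyond standard band operations:

```erlang
%% A compressed binary object stored with lz4: 1 + 2 + 4 = 7.
TypeInt = 7,
IsCompressed = TypeInt band 1 == 1,   % true
IsBinary     = TypeInt band 2 == 2,   % true
IsLz4        = TypeInt band 4 == 4,   % true
%% decode_valuetype/1 returns exactly this triple
{IsBinary, IsCompressed, IsLz4}.      % {true, true, true}
```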
-encode_valuetype(IsBinary, IsCompressed) -> +encode_valuetype(IsBinary, IsCompressed, Method) -> + Bit3 = + case Method of + lz4 -> 4; + native -> 0 + end, Bit2 = case IsBinary of true -> 2; @@ -319,12 +336,13 @@ encode_valuetype(IsBinary, IsCompressed) -> true -> 1; false -> 0 end, - Bit1 + Bit2. + Bit1 + Bit2 + Bit3. decode_valuetype(TypeInt) -> IsCompressed = TypeInt band 1 == 1, IsBinary = TypeInt band 2 == 2, - {IsBinary, IsCompressed}. + IsLz4 = TypeInt band 4 == 4, + {IsBinary, IsCompressed, IsLz4}. from_journalkey({SQN, _Type, LedgerKey}) -> {SQN, LedgerKey}. diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 5c6b126..f60de64 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -109,7 +109,8 @@ cdb_options, waste_retention_period :: integer() | undefined, waste_path :: string() | undefined, - reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list()}). + reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(), + compression_method :: lz4|native}). -record(candidate, {low_sqn :: integer() | undefined, filename :: string() | undefined, @@ -167,7 +168,9 @@ init([IClerkOpts]) -> cdb_options = CDBopts, reload_strategy = ReloadStrategy, waste_path = WP, - waste_retention_period = WRP}}. + waste_retention_period = WRP, + compression_method = + IClerkOpts#iclerk_options.compression_method}}. handle_call(_Msg, _From, State) -> {reply, not_supported, State}. @@ -194,7 +197,8 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO}, FilterFun, FilterServer, MaxSQN, - State#state.reload_strategy), + State#state.reload_strategy, + State#state.compression_method), FilesToDelete = lists:map(fun(C) -> {C#candidate.low_sqn, C#candidate.filename, @@ -490,7 +494,8 @@ update_inker(Inker, ManifestSlice, FilesToDelete) -> FilesToDelete), ok. -compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy) -> +compact_files(BestRun, CDBopts, FilterFun, FilterServer, + MaxSQN, RStrategy, PressMethod) -> BatchesOfPositions = get_all_positions(BestRun, []), compact_files(BatchesOfPositions, CDBopts, @@ -499,19 +504,20 @@ compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy) -> FilterServer, MaxSQN, RStrategy, + PressMethod, []). compact_files([], _CDBopts, null, _FilterFun, _FilterServer, _MaxSQN, - _RStrategy, ManSlice0) -> + _RStrategy, _PressMethod, ManSlice0) -> ManSlice0; compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer, _MaxSQN, - _RStrategy, ManSlice0) -> + _RStrategy, _PressMethod, ManSlice0) -> ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(ActiveJournal0), ManSlice1; compact_files([Batch|T], CDBopts, ActiveJournal0, FilterFun, FilterServer, MaxSQN, - RStrategy, ManSlice0) -> + RStrategy, PressMethod, ManSlice0) -> {SrcJournal, PositionList} = Batch, KVCs0 = leveled_cdb:cdb_directfetch(SrcJournal, PositionList, @@ -524,9 +530,10 @@ compact_files([Batch|T], CDBopts, ActiveJournal0, {ActiveJournal1, ManSlice1} = write_values(KVCs1, CDBopts, ActiveJournal0, - ManSlice0), + ManSlice0, + PressMethod), compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, MaxSQN, - RStrategy, ManSlice1). + RStrategy, PressMethod, ManSlice1). get_all_positions([], PositionBatches) -> PositionBatches; @@ -577,12 +584,12 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> KVCs). 
-write_values([], _CDBopts, Journal0, ManSlice0) -> +write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) -> {Journal0, ManSlice0}; -write_values(KVCList, CDBopts, Journal0, ManSlice0) -> +write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) -> KVList = lists:map(fun({K, V, _C}) -> % Compress the value as part of compaction - {K, leveled_codec:maybe_compress(V)} + {K, leveled_codec:maybe_compress(V, PressMethod)} end, KVCList), {ok, Journal1} = case Journal0 of @@ -605,7 +612,7 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0) -> {Journal1, ManSlice0}; roll -> ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(Journal1), - write_values(KVCList, CDBopts, null, ManSlice1) + write_values(KVCList, CDBopts, null, ManSlice1, PressMethod) end. clear_waste(State) -> @@ -754,7 +761,8 @@ test_ledgerkey(Key) -> {o, "Bucket", Key, null}. test_inkerkv(SQN, Key, V, IdxSpecs) -> - leveled_codec:to_inkerkv(test_ledgerkey(Key), SQN, V, IdxSpecs). + leveled_codec:to_inkerkv(test_ledgerkey(Key), SQN, V, IdxSpecs, + native, false). fetch_testcdb(RP) -> FN1 = leveled_inker:filepath(RP, 1, new_journal), @@ -839,7 +847,8 @@ compact_single_file_recovr_test() -> LedgerFun1, LedgerSrv1, 9, - [{?STD_TAG, recovr}]), + [{?STD_TAG, recovr}], + native), io:format("FN of ~s~n", [FN]), ?assertMatch(2, LowSQN), ?assertMatch(probably, @@ -876,7 +885,8 @@ compact_single_file_retain_test() -> LedgerFun1, LedgerSrv1, 9, - [{?STD_TAG, retain}]), + [{?STD_TAG, retain}], + native), io:format("FN of ~s~n", [FN]), ?assertMatch(1, LowSQN), ?assertMatch(probably, @@ -936,13 +946,16 @@ compact_singlefile_totwosmallfiles_testto() -> lists:foreach(fun(X) -> LK = test_ledgerkey("Key" ++ integer_to_list(X)), Value = leveled_rand:rand_bytes(1024), - {IK, IV} = leveled_codec:to_inkerkv(LK, X, Value, []), + {IK, IV} = + leveled_codec:to_inkerkv(LK, X, Value, [], + native, true), ok = leveled_cdb:cdb_put(CDB1, IK, IV) end, lists:seq(1, 1000)), {ok, NewName} = leveled_cdb:cdb_complete(CDB1), {ok, CDBr} = leveled_cdb:cdb_open_reader(NewName), - CDBoptsSmall = #cdb_options{binary_mode=true, max_size=400000, file_path=CP}, + CDBoptsSmall = + #cdb_options{binary_mode=true, max_size=400000, file_path=CP}, BestRun1 = [#candidate{low_sqn=1, filename=leveled_cdb:cdb_filename(CDBr), journal=CDBr, @@ -954,7 +967,8 @@ compact_singlefile_totwosmallfiles_testto() -> FakeFilterFun, null, 900, - [{?STD_TAG, recovr}]), + [{?STD_TAG, recovr}], + native), ?assertMatch(2, length(ManifestSlice)), lists:foreach(fun({_SQN, _FN, CDB, _LK}) -> ok = leveled_cdb:cdb_deletepending(CDB), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 8385699..74433b9 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -136,6 +136,8 @@ clerk :: pid() | undefined, compaction_pending = false :: boolean(), is_snapshot = false :: boolean(), + compression_method :: lz4|native, + compress_on_receipt :: boolean(), source_inker :: pid() | undefined}). 
@@ -506,10 +508,13 @@ start_from_file(InkOpts) -> ReloadStrategy = InkOpts#inker_options.reload_strategy, MRL = InkOpts#inker_options.max_run_length, WRP = InkOpts#inker_options.waste_retention_period, + PressMethod = InkOpts#inker_options.compression_method, + PressOnReceipt = InkOpts#inker_options.compress_on_receipt, IClerkOpts = #iclerk_options{inker = self(), cdb_options=IClerkCDBOpts, waste_retention_period = WRP, reload_strategy = ReloadStrategy, + compression_method = PressMethod, max_run_length = MRL}, {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts), @@ -525,16 +530,21 @@ start_from_file(InkOpts) -> active_journaldb = ActiveJournal, root_path = RootPath, cdb_options = CDBopts#cdb_options{waste_path=WasteFP}, + compression_method = PressMethod, + compress_on_receipt = PressOnReceipt, clerk = Clerk}}. put_object(LedgerKey, Object, KeyChanges, State) -> NewSQN = State#state.journal_sqn + 1, ActiveJournal = State#state.active_journaldb, - {JournalKey, JournalBin} = leveled_codec:to_inkerkv(LedgerKey, - NewSQN, - Object, - KeyChanges), + {JournalKey, JournalBin} = + leveled_codec:to_inkerkv(LedgerKey, + NewSQN, + Object, + KeyChanges, + State#state.compression_method, + State#state.compress_on_receipt), case leveled_cdb:cdb_put(ActiveJournal, JournalKey, JournalBin) of @@ -576,19 +586,15 @@ get_object(LedgerKey, SQN, Manifest) -> get_object(LedgerKey, SQN, Manifest, ToIgnoreKeyChanges) -> JournalP = leveled_imanifest:find_entry(SQN, Manifest), - {InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey, - SQN, - to_fetch, - null), + {InkerKey, _V, true} = + leveled_codec:to_inkerkv(LedgerKey, SQN, to_fetch), Obj = leveled_cdb:cdb_get(JournalP, InkerKey), leveled_codec:from_inkerkv(Obj, ToIgnoreKeyChanges). key_check(LedgerKey, SQN, Manifest) -> JournalP = leveled_imanifest:find_entry(SQN, Manifest), - {InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey, - SQN, - to_fetch, - null), + {InkerKey, _V, true} = + leveled_codec:to_inkerkv(LedgerKey, SQN, to_fetch), leveled_cdb:cdb_keycheck(JournalP, InkerKey). build_manifest(ManifestFilenames, @@ -848,7 +854,7 @@ initiate_penciller_snapshot(Bookie) -> -ifdef(TEST). create_value_for_journal(Obj, Comp) -> - leveled_codec:create_value_for_journal(Obj, Comp). + leveled_codec:create_value_for_journal(Obj, Comp, native). 
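put_object/4 above is where compress_on_receipt takes effect - it is passed
straight through as the final (Compress) argument of to_inkerkv/6. A minimal
sketch of the two resulting paths (key, value and KeyChanges terms are
placeholders; the first call needs the erlang-lz4 NIF on the code path):

```erlang
LK = {o, <<"Bucket">>, <<"Key">>, null},
%% compress_on_receipt = true: the object is compressed now, at PUT time,
%% before being written to the active journal.
{_JK1, _ValNow} =
    leveled_codec:to_inkerkv(LK, 1, <<"value">>, {[], infinity}, lz4, true),
%% compress_on_receipt = false: stored uncompressed; maybe_compress/2 will
%% compress it later, when the clerk first compacts the journal file.
{_JK2, _ValLater} =
    leveled_codec:to_inkerkv(LK, 1, <<"value">>, {[], infinity}, lz4, false).
```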
build_dummy_journal() -> F = fun(X) -> X end, @@ -930,7 +936,9 @@ simple_inker_test() -> build_dummy_journal(), CDBopts = #cdb_options{max_size=300000, binary_mode=true}, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, - cdb_options=CDBopts}), + cdb_options=CDBopts, + compression_method=native, + compress_on_receipt=true}), Obj1 = ink_get(Ink1, "Key1", 1), ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1), Obj3 = ink_get(Ink1, "Key1", 3), @@ -952,7 +960,9 @@ simple_inker_completeactivejournal_test() -> F1r = filename:join(JournalFP, "nursery_1.pnd"), ok = file:rename(F1, F1r), {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, - cdb_options=CDBopts}), + cdb_options=CDBopts, + compression_method=native, + compress_on_receipt=true}), Obj1 = ink_get(Ink1, "Key1", 1), ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1), Obj2 = ink_get(Ink1, "Key4", 4), @@ -970,7 +980,9 @@ compact_journal_test() -> RStrategy = [{?STD_TAG, recovr}], {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, cdb_options=CDBopts, - reload_strategy=RStrategy}), + reload_strategy=RStrategy, + compression_method=native, + compress_on_receipt=false}), {ok, NewSQN1, _ObjSize} = ink_put(Ink1, test_ledgerkey("KeyAA"), "TestValueAA", @@ -1030,7 +1042,9 @@ empty_manifest_test() -> clean_testdir(RootPath), CDBopts = #cdb_options{max_size=300000}, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, - cdb_options=CDBopts}), + cdb_options=CDBopts, + compression_method=native, + compress_on_receipt=true}), ?assertMatch(not_present, ink_fetch(Ink1, "Key1", 1)), CheckFun = fun(L, K, SQN) -> lists:member({SQN, K}, L) end, @@ -1050,7 +1064,9 @@ empty_manifest_test() -> ok = file:write_file(FN, term_to_binary("Hello")), {ok, Ink2} = ink_start(#inker_options{root_path=RootPath, - cdb_options=CDBopts}), + cdb_options=CDBopts, + compression_method=native, + compress_on_receipt=false}), ?assertMatch(not_present, ink_fetch(Ink2, "Key1", 1)), {ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", {[], infinity}), ?assertMatch(2, SQN), diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index c412cf4..ec10982 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -35,7 +35,7 @@ ]). -export([ - clerk_new/2, + clerk_new/3, clerk_prompt/1, clerk_push/2, clerk_close/1, @@ -49,15 +49,19 @@ -record(state, {owner :: pid() | undefined, root_path :: string() | undefined, - pending_deletions = dict:new() % OTP 16 does not like type + pending_deletions = dict:new(), % OTP 16 does not like type + compression_method :: lz4|native }). %%%============================================================================ %%% API %%%============================================================================ -clerk_new(Owner, Manifest) -> - {ok, Pid} = gen_server:start(?MODULE, [], []), +clerk_new(Owner, Manifest, CompressionMethod) -> + {ok, Pid} = + gen_server:start(?MODULE, + [{compression_method, CompressionMethod}], + []), ok = gen_server:call(Pid, {load, Owner, Manifest}, infinity), leveled_log:log("PC001", [Pid, Owner]), {ok, Pid}. @@ -78,8 +82,8 @@ clerk_close(Pid) -> %%% gen_server callbacks %%%============================================================================ -init([]) -> - {ok, #state{}}. +init([{compression_method, CompressionMethod}]) -> + {ok, #state{compression_method = CompressionMethod}}. 
handle_call({load, Owner, RootPath}, _From, State) -> {reply, ok, State#state{owner=Owner, root_path=RootPath}, ?MIN_TIMEOUT}; @@ -120,7 +124,8 @@ request_work(State) -> handle_work({SrcLevel, Manifest}, State) -> {UpdManifest, EntriesToDelete} = merge(SrcLevel, Manifest, - State#state.root_path), + State#state.root_path, + State#state.compression_method), leveled_log:log("PC007", []), SWMC = os:timestamp(), ok = leveled_penciller:pcl_manifestchange(State#state.owner, @@ -132,7 +137,7 @@ handle_work({SrcLevel, Manifest}, State) -> leveled_log:log_timer("PC018", [], SWSM), {leveled_pmanifest:get_manifest_sqn(UpdManifest), EntriesToDelete}. -merge(SrcLevel, Manifest, RootPath) -> +merge(SrcLevel, Manifest, RootPath, CompressionMethod) -> Src = leveled_pmanifest:mergefile_selector(Manifest, SrcLevel), NewSQN = leveled_pmanifest:get_manifest_sqn(Manifest) + 1, SinkList = leveled_pmanifest:merge_lookup(Manifest, @@ -152,7 +157,9 @@ merge(SrcLevel, Manifest, RootPath) -> {Man0, []}; _ -> SST_RP = leveled_penciller:sst_rootpath(RootPath), - perform_merge(Manifest, Src, SinkList, SrcLevel, SST_RP, NewSQN) + perform_merge(Manifest, + Src, SinkList, SrcLevel, + SST_RP, NewSQN, CompressionMethod) end. notify_deletions([], _Penciller) -> @@ -167,16 +174,21 @@ notify_deletions([Head|Tail], Penciller) -> %% %% SrcLevel is the level of the src sst file, the sink should be srcLevel + 1 -perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) -> +perform_merge(Manifest, + Src, SinkList, SrcLevel, + RootPath, NewSQN, + CompressionMethod) -> leveled_log:log("PC010", [Src#manifest_entry.filename, NewSQN]), SrcList = [{next, Src, all}], MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner), SinkLevel = SrcLevel + 1, SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel), - Additions = do_merge(SrcList, SinkList, - SinkLevel, SinkBasement, - RootPath, NewSQN, MaxSQN, - []), + Additions = + do_merge(SrcList, SinkList, + SinkLevel, SinkBasement, + RootPath, NewSQN, MaxSQN, + CompressionMethod, + []), RevertPointerFun = fun({next, ME, _SK}) -> ME @@ -193,22 +205,23 @@ perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) -> Src), {Man2, [Src|SinkManifestList]}. -do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Additions) -> +do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _CM, Additions) -> leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]), Additions; -do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) -> +do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, CM, Additions) -> FileName = leveled_penciller:sst_filename(NewSQN, SinkLevel, length(Additions)), leveled_log:log("PC012", [NewSQN, FileName, SinkB]), TS1 = os:timestamp(), case leveled_sst:sst_new(RP, FileName, - KL1, KL2, SinkB, SinkLevel, MaxSQN) of + KL1, KL2, SinkB, SinkLevel, MaxSQN, CM) of empty -> leveled_log:log("PC013", [FileName]), do_merge([], [], SinkLevel, SinkB, RP, NewSQN, MaxSQN, + CM, Additions); {ok, Pid, Reply} -> {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, @@ -220,6 +233,7 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) -> do_merge(KL1Rem, KL2Rem, SinkLevel, SinkB, RP, NewSQN, MaxSQN, + CM, Additions ++ [Entry]) end. 
@@ -265,31 +279,36 @@ merge_file_test() -> "KL1_L1.sst", 1, KL1_L1, - 999999), + 999999, + native), KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)), {ok, PidL2_1, _} = leveled_sst:sst_new("../test/", "KL1_L2.sst", 2, KL1_L2, - 999999), + 999999, + native), KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)), {ok, PidL2_2, _} = leveled_sst:sst_new("../test/", "KL2_L2.sst", 2, KL2_L2, - 999999), + 999999, + lz4), KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)), {ok, PidL2_3, _} = leveled_sst:sst_new("../test/", "KL3_L2.sst", 2, KL3_L2, - 999999), + 999999, + lz4), KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)), {ok, PidL2_4, _} = leveled_sst:sst_new("../test/", "KL4_L2.sst", 2, KL4_L2, - 999999), + 999999, + lz4), E1 = #manifest_entry{owner = PidL1_1, filename = "./KL1_L1.sst", @@ -321,7 +340,8 @@ merge_file_test() -> PointerList = lists:map(fun(ME) -> {next, ME, all} end, [E2, E3, E4, E5]), - {Man6, _Dels} = perform_merge(Man5, E1, PointerList, 1, "../test", 3), + {Man6, _Dels} = + perform_merge(Man5, E1, PointerList, 1, "../test", 3, native), ?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)). diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index d42795c..1355b9c 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -244,7 +244,9 @@ work_ongoing = false :: boolean(), % i.e. compaction work work_backlog = false :: boolean(), % i.e. compaction work - head_timing :: tuple() | undefined}). + head_timing :: tuple() | undefined, + + compression_method :: lz4|native}). -type penciller_options() :: #penciller_options{}. -type bookies_memory() :: {tuple()|empty_cache, @@ -861,25 +863,28 @@ sst_filename(ManSQN, Level, Count) -> start_from_file(PCLopts) -> RootPath = PCLopts#penciller_options.root_path, - MaxTableSize = case PCLopts#penciller_options.max_inmemory_tablesize of - undefined -> - ?MAX_TABLESIZE; - M -> - M - end, + MaxTableSize = + case PCLopts#penciller_options.max_inmemory_tablesize of + undefined -> + ?MAX_TABLESIZE; + M -> + M + end, + PressMethod = PCLopts#penciller_options.compression_method, - {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath), + {ok, MergeClerk} = leveled_pclerk:clerk_new(self(), RootPath, PressMethod), CoinToss = PCLopts#penciller_options.levelzero_cointoss, % Used to randomly defer the writing of L0 file. Intended to help with % vnode syncronisation issues (e.g. 
stop them all by default merging to % level zero concurrently) - InitState = #state{clerk=MergeClerk, - root_path=RootPath, - levelzero_maxcachesize=MaxTableSize, - levelzero_cointoss=CoinToss, - levelzero_index=leveled_pmem:new_index()}, + InitState = #state{clerk = MergeClerk, + root_path = RootPath, + levelzero_maxcachesize = MaxTableSize, + levelzero_cointoss = CoinToss, + levelzero_index = leveled_pmem:new_index(), + compression_method = PressMethod}, %% Open manifest Manifest0 = leveled_pmanifest:open_manifest(RootPath), @@ -887,7 +892,8 @@ start_from_file(PCLopts) -> fun(FN) -> {ok, Pid, - {_FK, _LK}} = leveled_sst:sst_open(sst_rootpath(RootPath), FN), + {_FK, _LK}} = + leveled_sst:sst_open(sst_rootpath(RootPath), FN), Pid end, SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1, @@ -1033,7 +1039,8 @@ roll_memory(State, false) -> length(State#state.levelzero_cache), FetchFun, PCL, - State#state.ledger_sqn), + State#state.ledger_sqn, + State#state.compression_method), {ok, Constructor, _} = R, Constructor; roll_memory(State, true) -> @@ -1047,7 +1054,8 @@ roll_memory(State, true) -> FileName, 0, KVList, - State#state.ledger_sqn), + State#state.ledger_sqn, + State#state.compression_method), {ok, Constructor, _} = R, Constructor. @@ -1498,7 +1506,8 @@ simple_server_test() -> RootPath = "../test/ledger", clean_testdir(RootPath), {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath, - max_inmemory_tablesize=1000}), + max_inmemory_tablesize=1000, + compression_method=native}), Key1_Pre = {{o,"Bucket0001", "Key0001", null}, {1, {active, infinity}, null}}, Key1 = add_missing_hash(Key1_Pre), @@ -1538,7 +1547,8 @@ simple_server_test() -> ok = pcl_close(PCL), {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, - max_inmemory_tablesize=1000}), + max_inmemory_tablesize=1000, + compression_method=native}), ?assertMatch(2003, pcl_getstartupsequencenumber(PCLr)), % ok = maybe_pause_push(PCLr, [Key2] ++ KL2 ++ [Key3]), @@ -1790,7 +1800,8 @@ create_file_test() -> 1, FetchFun, undefined, - 10000), + 10000, + native), lists:foreach(fun(X) -> case checkready(SP) of timeout -> diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 388172f..3a14d12 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -91,9 +91,9 @@ delete_pending/2, delete_pending/3]). --export([sst_new/5, - sst_new/7, - sst_newlevelzero/6, +-export([sst_new/6, + sst_new/8, + sst_newlevelzero/7, sst_open/2, sst_get/2, sst_get/3, @@ -118,10 +118,12 @@ -record(summary, {first_key :: tuple(), last_key :: tuple(), - index :: tuple() | undefined, + index :: tuple() | undefined, size :: integer(), max_sqn :: integer()}). +-type press_methods() :: lz4|native. + %% yield_blockquery is used to detemrine if the work necessary to process a %% range query beyond the fetching the slot should be managed from within %% this process, or should be handled by the calling process. @@ -129,14 +131,16 @@ %% see Issue 52. Handling within the SST process may lead to contention and %% extra copying. Files at the top of the tree yield, those lower down don't. --record(state, {summary, - handle :: file:fd() | undefined, - sst_timings :: tuple() | undefined, - penciller :: pid() | undefined, - root_path, - filename, - yield_blockquery = false :: boolean(), - blockindex_cache}). 
+-record(state, + {summary, + handle :: file:fd() | undefined, + sst_timings :: tuple() | undefined, + penciller :: pid() | undefined, + root_path, + filename, + yield_blockquery = false :: boolean(), + blockindex_cache, + compression_method :: press_methods()}). -type sst_state() :: #state{}. @@ -162,29 +166,31 @@ sst_open(RootPath, Filename) -> {ok, Pid, {SK, EK}} end. --spec sst_new(string(), string(), integer(), list(), integer()) -> +-spec sst_new(string(), string(), integer(), + list(), integer(), press_methods()) -> {ok, pid(), {tuple(), tuple()}}. %% @doc %% Start a new SST file at the assigned level passing in a list of Key, Value %% pairs. This should not be used for basement levels or unexpanded Key/Value %% lists as merge_lists will not be called. -sst_new(RootPath, Filename, Level, KVList, MaxSQN) -> +sst_new(RootPath, Filename, Level, KVList, MaxSQN, PressMethod) -> {ok, Pid} = gen_fsm:start(?MODULE, [], []), - {[], [], SlotList, FK} = merge_lists(KVList), + {[], [], SlotList, FK} = merge_lists(KVList, PressMethod), case gen_fsm:sync_send_event(Pid, {sst_new, RootPath, Filename, Level, {SlotList, FK}, - MaxSQN}, + MaxSQN, + PressMethod}, infinity) of {ok, {SK, EK}} -> {ok, Pid, {SK, EK}} end. -spec sst_new(string(), string(), list(), list(), - boolean(), integer(), integer()) -> + boolean(), integer(), integer(), press_methods()) -> empty|{ok, pid(), {{list(), list()}, tuple(), tuple()}}. %% @doc %% Start a new SST file at the assigned level passing in a two lists of @@ -197,8 +203,11 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN) -> %% be that the merge_lists returns nothin (for example when a basement file is %% all tombstones) - and the atome empty is returned in this case so that the %% file is not added to the manifest. -sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN) -> - {Rem1, Rem2, SlotList, FK} = merge_lists(KVL1, KVL2, {IsBasement, Level}), +sst_new(RootPath, Filename, + KVL1, KVL2, IsBasement, Level, + MaxSQN, PressMethod) -> + {Rem1, Rem2, SlotList, FK} = + merge_lists(KVL1, KVL2, {IsBasement, Level}, PressMethod), case SlotList of [] -> empty; @@ -210,7 +219,8 @@ sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN) -> Filename, Level, {SlotList, FK}, - MaxSQN}, + MaxSQN, + PressMethod}, infinity) of {ok, {SK, EK}} -> {ok, Pid, {{Rem1, Rem2}, SK, EK}} @@ -218,13 +228,16 @@ sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN) -> end. -spec sst_newlevelzero(string(), string(), - integer(), fun(), pid()|undefined, integer()) -> + integer(), fun(), pid()|undefined, integer(), + press_methods()) -> {ok, pid(), noreply}. %% @doc %% Start a new file at level zero. At this level the file size is not fixed - %% it will be as big as the input. Also the KVList is not passed in, it is %% fetched slot by slot using the FetchFun -sst_newlevelzero(RootPath, Filename, Slots, FetchFun, Penciller, MaxSQN) -> +sst_newlevelzero(RootPath, Filename, + Slots, FetchFun, Penciller, + MaxSQN, PressMethod) -> {ok, Pid} = gen_fsm:start(?MODULE, [], []), gen_fsm:send_event(Pid, {sst_newlevelzero, @@ -233,7 +246,8 @@ sst_newlevelzero(RootPath, Filename, Slots, FetchFun, Penciller, MaxSQN) -> Slots, FetchFun, Penciller, - MaxSQN}), + MaxSQN, + PressMethod}), {ok, Pid, noreply}. -spec sst_get(pid(), tuple()) -> tuple()|not_present. 
@@ -279,8 +293,9 @@ sst_getfilteredrange(Pid, StartKey, EndKey, ScanWidth, SegList) -> StartKey, EndKey, ScanWidth, SegList0}, infinity) of - {yield, SlotsToFetchBinList, SlotsToPoint} -> - binaryslot_reader(SlotsToFetchBinList) ++ SlotsToPoint; + {yield, SlotsToFetchBinList, SlotsToPoint, PressMethod} -> + binaryslot_reader(SlotsToFetchBinList, PressMethod) + ++ SlotsToPoint; Reply -> Reply end. @@ -302,11 +317,10 @@ sst_getslots(Pid, SlotList) -> %% returned (not precisely - with false results returned in addition). Use %% false as a SegList to not filter sst_getfilteredslots(Pid, SlotList, SegList) -> - SegList0 = tune_seglist(SegList), - SlotBins = gen_fsm:sync_send_event(Pid, - {get_slots, SlotList, SegList0}, - infinity), - binaryslot_reader(SlotBins). + SegL0 = tune_seglist(SegList), + {SlotBins, PressMethod} = + gen_fsm:sync_send_event(Pid, {get_slots, SlotList, SegL0}, infinity), + binaryslot_reader(SlotBins, PressMethod). -spec sst_getmaxsequencenumber(pid()) -> integer(). %% @doc @@ -375,19 +389,22 @@ starting({sst_open, RootPath, Filename}, _From, State) -> {ok, {Summary#summary.first_key, Summary#summary.last_key}}, reader, UpdState}; -starting({sst_new, RootPath, Filename, Level, {SlotList, FirstKey}, MaxSQN}, - _From, State) -> +starting({sst_new, + RootPath, Filename, Level, + {SlotList, FirstKey}, MaxSQN, + PressMethod}, _From, State) -> SW = os:timestamp(), {Length, SlotIndex, BlockIndex, - SlotsBin} = build_all_slots(SlotList), + SlotsBin} = build_all_slots(SlotList, PressMethod), SummaryBin = build_table_summary(SlotIndex, Level, FirstKey, Length, MaxSQN), - ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin), + ActualFilename = + write_file(RootPath, Filename, SummaryBin, SlotsBin, PressMethod), YBQ = Level =< 2, UpdState = read_file(ActualFilename, State#state{root_path=RootPath, @@ -402,20 +419,22 @@ starting({sst_new, RootPath, Filename, Level, {SlotList, FirstKey}, MaxSQN}, UpdState#state{blockindex_cache = BlockIndex}}. starting({sst_newlevelzero, RootPath, Filename, - Slots, FetchFun, Penciller, MaxSQN}, State) -> + Slots, FetchFun, Penciller, MaxSQN, + PressMethod}, State) -> SW = os:timestamp(), KVList = leveled_pmem:to_list(Slots, FetchFun), - {[], [], SlotList, FirstKey} = merge_lists(KVList), + {[], [], SlotList, FirstKey} = merge_lists(KVList, PressMethod), {SlotCount, SlotIndex, BlockIndex, - SlotsBin} = build_all_slots(SlotList), + SlotsBin} = build_all_slots(SlotList, PressMethod), SummaryBin = build_table_summary(SlotIndex, 0, FirstKey, SlotCount, MaxSQN), - ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin), + ActualFilename = + write_file(RootPath, Filename, SummaryBin, SlotsBin, PressMethod), UpdState = read_file(ActualFilename, State#state{root_path = RootPath, yield_blockquery = true}), @@ -425,13 +444,17 @@ starting({sst_newlevelzero, RootPath, Filename, SW), case Penciller of undefined -> - {next_state, reader, UpdState#state{blockindex_cache = BlockIndex}}; + {next_state, + reader, + UpdState#state{blockindex_cache = BlockIndex}}; _ -> leveled_penciller:pcl_confirml0complete(Penciller, UpdState#state.filename, Summary#summary.first_key, Summary#summary.last_key), - {next_state, reader, UpdState#state{blockindex_cache = BlockIndex}} + {next_state, + reader, + UpdState#state{blockindex_cache = BlockIndex}} end. 
@@ -446,15 +469,20 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth, SlotList}, _From, State) ->
                                     ScanWidth,
                                     SlotList,
                                     State),
+    PressMethod = State#state.compression_method,
     case State#state.yield_blockquery of
         true ->
             {reply,
-                {yield, SlotsToFetchBinList, SlotsToPoint},
+                {yield,
+                    SlotsToFetchBinList,
+                    SlotsToPoint,
+                    PressMethod},
                 reader,
                 State};
         false ->
             {reply,
-                binaryslot_reader(SlotsToFetchBinList) ++ SlotsToPoint,
+                binaryslot_reader(SlotsToFetchBinList, PressMethod)
+                    ++ SlotsToPoint,
                 reader,
                 State}
     end;
@@ -462,8 +490,9 @@ reader({get_slots, SlotList, SegList}, _From, State) ->
     SlotBins =
         read_slots(State#state.handle,
                     SlotList,
-                    {SegList, State#state.blockindex_cache}),
-    {reply, SlotBins, reader, State};
+                    {SegList, State#state.blockindex_cache},
+                    State#state.compression_method),
+    {reply, {SlotBins, State#state.compression_method}, reader, State};
 reader(get_maxsequencenumber, _From, State) ->
     Summary = State#state.summary,
     {reply, Summary#summary.max_sqn, reader, State};
@@ -502,8 +531,9 @@ delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SlotList},
                                     SlotList,
                                     State),
     % Always yield as about to clear and de-reference
+    PressMethod = State#state.compression_method,
     {reply,
-        {yield, SlotsToFetchBinList, SlotsToPoint},
+        {yield, SlotsToFetchBinList, SlotsToPoint, PressMethod},
         delete_pending,
         State,
         ?DELETE_TIMEOUT};
@@ -511,8 +541,13 @@ delete_pending({get_slots, SlotList, SegList}, _From, State) ->
     SlotBins =
         read_slots(State#state.handle,
                     SlotList,
-                    {SegList, State#state.blockindex_cache}),
-    {reply, SlotBins, delete_pending, State, ?DELETE_TIMEOUT};
+                    {SegList, State#state.blockindex_cache},
+                    State#state.compression_method),
+    {reply,
+        {SlotBins, State#state.compression_method},
+        delete_pending,
+        State,
+        ?DELETE_TIMEOUT};
 delete_pending(close, _From, State) ->
     leveled_log:log("SST07", [State#state.filename]),
     ok = file:close(State#state.handle),
@@ -558,6 +593,7 @@ code_change(_OldVsn, StateName, State, _Extra) ->

 fetch(LedgerKey, Hash, State) ->
     Summary = State#state.summary,
+    PressMethod = State#state.compression_method,
     Slot = lookup_slot(LedgerKey, Summary#summary.index),
     SlotID = Slot#slot_index_value.slot_id,
     Bloom = Slot#slot_index_value.bloom,
@@ -570,9 +606,8 @@
             case CachedBlockIdx of
                 none ->
                     SlotBin = read_slot(State#state.handle, Slot),
-                    {Result,
-                        BlockLengths,
-                        BlockIdx} = binaryslot_get(SlotBin, LedgerKey, Hash),
+                    {Result, BlockLengths, BlockIdx} =
+                        binaryslot_get(SlotBin, LedgerKey, Hash, PressMethod),
                     BlockIndexCache = array:set(SlotID - 1,
                                                 <<BlockLengths/binary,
                                                     BlockIdx/binary>>,
@@ -597,6 +632,7 @@
                                         StartPos,
                                         BlockLengths,
                                         LedgerKey,
+                                        PressMethod,
                                         not_present),
                     {Result, slot_fetch, SlotID, State}
             end
@@ -667,16 +703,19 @@ fetch_range(StartKey, EndKey, ScanWidth, SegList, State) ->
     SlotsToFetchBinList =
         read_slots(Handle,
                     SlotsToFetch,
-                    {SegList, State#state.blockindex_cache}),
+                    {SegList, State#state.blockindex_cache},
+                    State#state.compression_method),
     {SlotsToFetchBinList, SlotsToPoint}.

-write_file(RootPath, Filename, SummaryBin, SlotsBin) ->
+write_file(RootPath, Filename, SummaryBin, SlotsBin, PressMethod) ->
     SummaryLength = byte_size(SummaryBin),
     SlotsLength = byte_size(SlotsBin),
     {PendingName, FinalName} = generate_filenames(Filename),
+    FileVersion = gen_fileversion(PressMethod),
     ok = file:write_file(filename:join(RootPath, PendingName),
-                            <<SlotsLength:32/integer,
-                                SummaryLength:32/integer,
+                            <<FileVersion:8/integer,
+                                SlotsLength:32/integer,
+                                SummaryLength:32/integer,
                                 SlotsBin/binary,
                                 SummaryBin/binary>>),
@@ -695,27 +734,48 @@
     FinalName.
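The FileVersion byte now written at the head of the file is how the choice of
compression method survives a restart - it is produced by gen_fileversion/1
and re-applied on open by imp_fileversion/2, both defined in the next hunk.
The round trip in miniature:

```erlang
%% lz4 -> 1 and native -> 0 on write; on open, bit 0 of the byte read
%% back from the 9-byte header selects the block codec.
FileVersion = 1,
Method =
    case FileVersion band 1 of
        0 -> native;
        1 -> lz4
    end,
lz4 = Method.
```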
read_file(Filename, State) ->
-    {Handle, SummaryBin} = open_reader(filename:join(State#state.root_path,
-                                                        Filename)),
+    {Handle, FileVersion, SummaryBin} =
+        open_reader(filename:join(State#state.root_path, Filename)),
+    UpdState0 = imp_fileversion(FileVersion, State),
     {Summary, SlotList} = read_table_summary(SummaryBin),
     BlockIndexCache = array:new([{size, Summary#summary.size},
                                     {default, none}]),
-    UpdState = State#state{blockindex_cache = BlockIndexCache},
+    UpdState1 = UpdState0#state{blockindex_cache = BlockIndexCache},
     SlotIndex = from_list(SlotList),
     UpdSummary = Summary#summary{index = SlotIndex},
     leveled_log:log("SST03", [Filename,
                                 Summary#summary.size,
                                 Summary#summary.max_sqn]),
-    UpdState#state{summary = UpdSummary,
+    UpdState1#state{summary = UpdSummary,
                     handle = Handle,
                     filename = Filename}.

+gen_fileversion(PressMethod) ->
+    Bit1 =
+        case PressMethod of
+            lz4 -> 1;
+            native -> 0
+        end,
+    Bit1.
+
+imp_fileversion(VersionInt, State) ->
+    UpdState =
+        case VersionInt band 1 of
+            0 ->
+                State#state{compression_method = native};
+            1 ->
+                State#state{compression_method = lz4}
+        end,
+    UpdState.
+
 open_reader(Filename) ->
     {ok, Handle} = file:open(Filename, [binary, raw, read]),
-    {ok, Lengths} = file:pread(Handle, 0, 8),
-    <<SlotsLength:32/integer, SummaryLength:32/integer>> = Lengths,
-    {ok, SummaryBin} = file:pread(Handle, SlotsLength + 8, SummaryLength),
-    {Handle, SummaryBin}.
+    {ok, Lengths} = file:pread(Handle, 0, 9),
+    <<FileVersion:8/integer,
+        SlotsLength:32/integer,
+        SummaryLength:32/integer>> = Lengths,
+    {ok, SummaryBin} = file:pread(Handle, SlotsLength + 9, SummaryLength),
+    {Handle, FileVersion, SummaryBin}.

 build_table_summary(SlotIndex, _Level, FirstKey, SlotCount, MaxSQN) ->
     [{LastKey, _LastV}|_Rest] = SlotIndex,
@@ -738,23 +798,26 @@ read_table_summary(BinWithCheck) ->
     end.

-build_all_slots(SlotList) ->
+build_all_slots(SlotList, PressMethod) ->
     SlotCount = length(SlotList),
     BuildResponse = build_all_slots(SlotList,
-                                    8,
+                                    9,
                                     1,
                                     [],
                                     array:new([{size, SlotCount},
                                                 {default, none}]),
-                                    <<>>),
+                                    <<>>,
+                                    PressMethod),
     {SlotIndex, BlockIndex, SlotsBin} = BuildResponse,
     {SlotCount, SlotIndex, BlockIndex, SlotsBin}.

 build_all_slots([], _Pos, _SlotID,
-                SlotIdxAcc, BlockIdxAcc, SlotBinAcc) ->
+                SlotIdxAcc, BlockIdxAcc, SlotBinAcc,
+                _PressMethod) ->
     {SlotIdxAcc, BlockIdxAcc, SlotBinAcc};
 build_all_slots([SlotD|Rest], Pos, SlotID,
-                SlotIdxAcc, BlockIdxAcc, SlotBinAcc) ->
+                SlotIdxAcc, BlockIdxAcc, SlotBinAcc,
+                PressMethod) ->
     {BlockIdx, SlotBin, HashList, LastKey} = SlotD,
     Length = byte_size(SlotBin),
     Bloom = leveled_tinybloom:create_bloom(HashList),
@@ -767,7 +830,8 @@
                     SlotID + 1,
                     [{LastKey, SlotIndexV}|SlotIdxAcc],
                     array:set(SlotID - 1, BlockIdx, BlockIdxAcc),
-                    <<SlotBinAcc/binary, SlotBin/binary>>).
+                    <<SlotBinAcc/binary, SlotBin/binary>>,
+                    PressMethod).

 generate_filenames(RootFilename) ->
@@ -785,6 +849,32 @@
     end.

+-spec serialise_block(any(), press_methods()) -> binary().
+%% @doc
+%% Convert term to binary
+%% Function split out to make it easier to experiment with different
+%% compression methods.  Also, perhaps standardise application of CRC
+%% checks
+serialise_block(Term, lz4) ->
+    {ok, Bin} = lz4:pack(term_to_binary(Term)),
+    Bin;
+serialise_block(Term, native) ->
+    term_to_binary(Term, ?BINARY_SETTINGS).
+
+
+-spec deserialise_block(binary(), press_methods()) -> any().
+%% @doc
+%% Convert binary to term
+%% Function split out to make it easier to experiment with different
+%% compression methods.  Also, perhaps standardise application of CRC
+%% checks
+deserialise_block(Bin, lz4) ->
+    {ok, Bin0} = lz4:unpack(Bin),
+    binary_to_term(Bin0);
+deserialise_block(Bin, native) ->
+    binary_to_term(Bin).
+
+
 %%%============================================================================
 %%% SlotIndex Implementation
 %%%============================================================================
@@ -846,7 +936,7 @@ lookup_slots(StartKey, EndKey, Tree) ->
 %% based on a 17-bit hash (so 0.0039 fpr).

-generate_binary_slot(Lookup, KVL) ->
+generate_binary_slot(Lookup, KVL, PressMethod) ->

     HashFoldFun =
         fun({K, V}, {PosBinAcc, NoHashCount, HashAcc}) ->
@@ -910,45 +1000,45 @@
     {B1, B2, B3, B4, B5} =
         case length(KVL) of
             L when L =< SideBlockSize ->
-                {term_to_binary(KVL, ?BINARY_SETTINGS),
+                {serialise_block(KVL, PressMethod),
                     <<0:0>>,
                     <<0:0>>,
                     <<0:0>>,
                     <<0:0>>};
             L when L =< 2 * SideBlockSize ->
                 {KVLA, KVLB} = lists:split(SideBlockSize, KVL),
-                {term_to_binary(KVLA, ?BINARY_SETTINGS),
-                    term_to_binary(KVLB, ?BINARY_SETTINGS),
+                {serialise_block(KVLA, PressMethod),
+                    serialise_block(KVLB, PressMethod),
                     <<0:0>>,
                     <<0:0>>,
                     <<0:0>>};
             L when L =< (2 * SideBlockSize + MidBlockSize) ->
                 {KVLA, KVLB_Rest} = lists:split(SideBlockSize, KVL),
                 {KVLB, KVLC} = lists:split(SideBlockSize, KVLB_Rest),
-                {term_to_binary(KVLA, ?BINARY_SETTINGS),
-                    term_to_binary(KVLB, ?BINARY_SETTINGS),
-                    term_to_binary(KVLC, ?BINARY_SETTINGS),
+                {serialise_block(KVLA, PressMethod),
+                    serialise_block(KVLB, PressMethod),
+                    serialise_block(KVLC, PressMethod),
                     <<0:0>>,
                     <<0:0>>};
             L when L =< (3 * SideBlockSize + MidBlockSize) ->
                 {KVLA, KVLB_Rest} = lists:split(SideBlockSize, KVL),
                 {KVLB, KVLC_Rest} = lists:split(SideBlockSize, KVLB_Rest),
                 {KVLC, KVLD} = lists:split(MidBlockSize, KVLC_Rest),
-                {term_to_binary(KVLA, ?BINARY_SETTINGS),
-                    term_to_binary(KVLB, ?BINARY_SETTINGS),
-                    term_to_binary(KVLC, ?BINARY_SETTINGS),
-                    term_to_binary(KVLD, ?BINARY_SETTINGS),
+                {serialise_block(KVLA, PressMethod),
+                    serialise_block(KVLB, PressMethod),
+                    serialise_block(KVLC, PressMethod),
+                    serialise_block(KVLD, PressMethod),
                     <<0:0>>};
             L when L =< (4 * SideBlockSize + MidBlockSize) ->
                 {KVLA, KVLB_Rest} = lists:split(SideBlockSize, KVL),
                 {KVLB, KVLC_Rest} = lists:split(SideBlockSize, KVLB_Rest),
                 {KVLC, KVLD_Rest} = lists:split(MidBlockSize, KVLC_Rest),
                 {KVLD, KVLE} = lists:split(SideBlockSize, KVLD_Rest),
-                {term_to_binary(KVLA, ?BINARY_SETTINGS),
-                    term_to_binary(KVLB, ?BINARY_SETTINGS),
-                    term_to_binary(KVLC, ?BINARY_SETTINGS),
-                    term_to_binary(KVLD, ?BINARY_SETTINGS),
-                    term_to_binary(KVLE, ?BINARY_SETTINGS)}
+                {serialise_block(KVLA, PressMethod),
+                    serialise_block(KVLB, PressMethod),
+                    serialise_block(KVLC, PressMethod),
+                    serialise_block(KVLD, PressMethod),
+                    serialise_block(KVLE, PressMethod)}
         end,

     B1P = byte_size(PosBinIndex),
@@ -977,17 +1067,18 @@

 % Acc should start as not_present if LedgerKey is a key, and a list if
 % LedgerKey is false
-check_blocks([], _Handle, _StartPos, _BlockLengths, _LedgerKeyToCheck, Acc) ->
+check_blocks([], _Handle, _StartPos, _BlockLengths,
+                _LedgerKeyToCheck, _PressMethod, Acc) ->
     Acc;
-check_blocks([Pos|Rest], Handle, StartPos, BlockLengths,
-                LedgerKeyToCheck, Acc) ->
+check_blocks([Pos|Rest], Handle, StartPos,
+                BlockLengths, LedgerKeyToCheck, PressMethod, Acc) ->
     {BlockNumber, BlockPos} = revert_position(Pos),
     BlockBin = read_block(Handle, StartPos, BlockLengths, BlockNumber),
-    BlockL = binary_to_term(BlockBin),
+    BlockL =
deserialise_block(BlockBin, PressMethod), {K, V} = lists:nth(BlockPos, BlockL), case K of LedgerKeyToCheck -> @@ -998,7 +1089,7 @@ check_blocks([Pos|Rest], Handle, StartPos, BlockLengths, Acc ++ [{K, V}]; _ -> check_blocks(Rest, Handle, StartPos, BlockLengths, - LedgerKeyToCheck, Acc) + LedgerKeyToCheck, PressMethod, Acc) end end. @@ -1048,7 +1139,8 @@ binarysplit_mapfun(MultiSlotBin, StartPos) -> end. --spec read_slots(file:io_device(), list(), {false|list(), any()}) -> list(). +-spec read_slots(file:io_device(), list(), + {false|list(), any()}, press_methods()) -> list(). %% @doc %% The reading of sots will return a list of either 2-tuples containing %% {K, V} pairs - or 3-tuples containing {Binary, SK, EK}. The 3 tuples @@ -1063,12 +1155,12 @@ binarysplit_mapfun(MultiSlotBin, StartPos) -> %% any key comparison between levels should allow for a non-matching key to %% be considered as superior to a matching key - as otherwise a matching key %% may be intermittently removed from the result set -read_slots(Handle, SlotList, {false, _BlockIndexCache}) -> +read_slots(Handle, SlotList, {false, _BlockIndexCache}, _PressMethod) -> % No list of segments passed LengthList = lists:map(fun pointer_mapfun/1, SlotList), {MultiSlotBin, StartPos} = read_length_list(Handle, LengthList), lists:map(binarysplit_mapfun(MultiSlotBin, StartPos), LengthList); -read_slots(Handle, SlotList, {SegList, BlockIndexCache}) -> +read_slots(Handle, SlotList, {SegList, BlockIndexCache}, PressMethod) -> % List of segments passed so only {K, V} pairs matching those segments % should be returned. This required the {K, V} pair to have been added % with the appropriate hash - if the pair were added with no_lookup as @@ -1097,27 +1189,31 @@ read_slots(Handle, SlotList, {SegList, BlockIndexCache}) -> Acc; PL -> Acc ++ check_blocks(PL, Handle, SP, BlockLengths, - false, []) + false, PressMethod, []) end end end, lists:foldl(BinMapFun, [], SlotList). --spec binaryslot_reader(list({tuple(), tuple()}|{binary(), tuple(), tuple()})) - -> list({tuple(), tuple()}). +-spec binaryslot_reader(list({tuple(), tuple()}|{binary(), tuple(), tuple()}), + native|lz4) -> list({tuple(), tuple()}). %% @doc %% Read the binary slots converting them to {K, V} pairs if they were not %% already {K, V} pairs -binaryslot_reader(SlotBinsToFetch) -> - binaryslot_reader(SlotBinsToFetch, []). +binaryslot_reader(SlotBinsToFetch, PressMethod) -> + binaryslot_reader(SlotBinsToFetch, PressMethod, []). -binaryslot_reader([], Acc) -> +binaryslot_reader([], _PressMethod, Acc) -> Acc; -binaryslot_reader([{SlotBin, SK, EK}|Tail], Acc) -> - binaryslot_reader(Tail, Acc ++ binaryslot_trimmedlist(SlotBin, SK, EK)); -binaryslot_reader([{K, V}|Tail], Acc) -> - binaryslot_reader(Tail, Acc ++ [{K, V}]). +binaryslot_reader([{SlotBin, SK, EK}|Tail], PressMethod, Acc) -> + binaryslot_reader(Tail, + PressMethod, + Acc ++ binaryslot_trimmedlist(SlotBin, + SK, EK, + PressMethod)); +binaryslot_reader([{K, V}|Tail], PressMethod, Acc) -> + binaryslot_reader(Tail, PressMethod, Acc ++ [{K, V}]). 
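A quick sanity sketch for the block codecs above (it assumes the erlang-lz4
dependency from rebar.config is built and on the code path; lz4:pack/1 and
lz4:unpack/1 are the calls this patch already relies on):

```erlang
%% Any term should survive the lz4 round trip - the invariant that
%% serialise_block/2 and deserialise_block/2 depend upon.
lz4_block_roundtrip_test() ->
    Term = [{X, <<X:32/integer>>} || X <- lists:seq(1, 64)],
    {ok, Packed} = lz4:pack(term_to_binary(Term)),
    {ok, Unpacked} = lz4:unpack(Packed),
    ?assertMatch(Term, binary_to_term(Unpacked)).
```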
read_length_list(Handle, LengthList) ->
@@ -1129,7 +1225,7 @@

-binaryslot_get(FullBin, Key, Hash) ->
+binaryslot_get(FullBin, Key, Hash, PressMethod) ->
     case crc_check_slot(FullBin) of
         {BlockLengths, Rest} ->
             <<B1P:32/integer, _R/binary>> = BlockLengths,
             <<PosBinIndex:B1P/binary, Blocks/binary>> = Rest,
             PosList = find_pos(PosBinIndex,
                                 extra_hash(Hash),
                                 [],
                                 0),
-            {fetch_value(PosList, BlockLengths, Blocks, Key),
+            {fetch_value(PosList, BlockLengths, Blocks, Key, PressMethod),
                 BlockLengths,
                 PosBinIndex};
         crc_wonky ->
@@ -1147,7 +1243,7 @@
             none}
     end.

-binaryslot_tolist(FullBin) ->
+binaryslot_tolist(FullBin, PressMethod) ->
     BlockFetchFun =
         fun(Length, {Acc, Bin}) ->
             case Length of
                 0 ->
                     {Acc, Bin};
                 _ ->
                     <<Block:Length/binary, Rest/binary>> = Bin,
-                    {Acc ++ binary_to_term(Block), Rest}
+                    {Acc ++ deserialise_block(Block, PressMethod), Rest}
             end
         end,

@@ -1178,9 +1274,9 @@
     Out.

-binaryslot_trimmedlist(FullBin, all, all) ->
-    binaryslot_tolist(FullBin);
-binaryslot_trimmedlist(FullBin, StartKey, EndKey) ->
+binaryslot_trimmedlist(FullBin, all, all, PressMethod) ->
+    binaryslot_tolist(FullBin, PressMethod);
+binaryslot_trimmedlist(FullBin, StartKey, EndKey, PressMethod) ->
     LTrimFun = fun({K, _V}) -> K < StartKey end,
     RTrimFun = fun({K, _V}) -> not leveled_codec:endkey_passed(EndKey, K) end,
@@ -1209,7 +1305,8 @@
                 0 ->
                     [Block1, Block2];
                 _ ->
-                    MidBlockList = binary_to_term(MidBlock),
+                    MidBlockList =
+                        deserialise_block(MidBlock, PressMethod),
                     {MidFirst, _} = lists:nth(1, MidBlockList),
                     {MidLast, _} = lists:last(MidBlockList),
                     Split = {StartKey > MidLast,
@@ -1247,7 +1344,7 @@
     BlockList =
         case is_binary(Block) of
             true ->
-                binary_to_term(Block);
+                deserialise_block(Block, PressMethod);
             false ->
                 Block
         end,
@@ -1334,21 +1431,21 @@ tune_seglist(SegList) ->
             SegList
     end.

-fetch_value([], _BlockLengths, _Blocks, _Key) ->
+fetch_value([], _BlockLengths, _Blocks, _Key, _PressMethod) ->
     not_present;
-fetch_value([Pos|Rest], BlockLengths, Blocks, Key) ->
+fetch_value([Pos|Rest], BlockLengths, Blocks, Key, PressMethod) ->
     {BlockNumber, BlockPos} = revert_position(Pos),
     {_BlockPos,
         Offset,
         Length} = block_offsetandlength(BlockLengths, BlockNumber),
     <<_Pre:Offset/binary, Block:Length/binary, _Rest/binary>> = Blocks,
-    BlockL = binary_to_term(Block),
+    BlockL = deserialise_block(Block, PressMethod),
     {K, V} = lists:nth(BlockPos, BlockL),
     case K of
         Key ->
             {K, V};
         _ ->
-            fetch_value(Rest, BlockLengths, Blocks, Key)
+            fetch_value(Rest, BlockLengths, Blocks, Key, PressMethod)
     end.

@@ -1420,31 +1517,33 @@ find_pos(<<0:1/integer, NHC:7/integer, T/binary>>, Hash, PosList, Count) ->
 %% there are matching keys then the highest sequence number must be chosen and
 %% any lower sequence numbers should be compacted out of existence

-merge_lists(KVList1) ->
+merge_lists(KVList1, PressMethod) ->
     SlotCount = length(KVList1) div ?LOOK_SLOTSIZE,
     {[],
         [],
-        split_lists(KVList1, [], SlotCount),
+        split_lists(KVList1, [], SlotCount, PressMethod),
         element(1, lists:nth(1, KVList1))}.
-split_lists([], SlotLists, 0) -> +split_lists([], SlotLists, 0, _PressMethod) -> lists:reverse(SlotLists); -split_lists(LastPuff, SlotLists, 0) -> - SlotD = generate_binary_slot(lookup, LastPuff), +split_lists(LastPuff, SlotLists, 0, PressMethod) -> + SlotD = generate_binary_slot(lookup, LastPuff, PressMethod), lists:reverse([SlotD|SlotLists]); -split_lists(KVList1, SlotLists, N) -> +split_lists(KVList1, SlotLists, N, PressMethod) -> {Slot, KVListRem} = lists:split(?LOOK_SLOTSIZE, KVList1), - SlotD = generate_binary_slot(lookup, Slot), - split_lists(KVListRem, [SlotD|SlotLists], N - 1). + SlotD = generate_binary_slot(lookup, Slot, PressMethod), + split_lists(KVListRem, [SlotD|SlotLists], N - 1, PressMethod). -merge_lists(KVList1, KVList2, LevelInfo) -> - merge_lists(KVList1, KVList2, LevelInfo, [], null, 0). +merge_lists(KVList1, KVList2, LevelInfo, PressMethod) -> + merge_lists(KVList1, KVList2, LevelInfo, [], null, 0, PressMethod). -merge_lists(KVList1, KVList2, _LI, SlotList, FirstKey, ?MAX_SLOTS) -> +merge_lists(KVList1, KVList2, _LI, SlotList, FirstKey, ?MAX_SLOTS, + _PressMethod) -> {KVList1, KVList2, lists:reverse(SlotList), FirstKey}; -merge_lists([], [], _LI, SlotList, FirstKey, _SlotCount) -> +merge_lists([], [], _LI, SlotList, FirstKey, _SlotCount, _PressMethod) -> {[], [], lists:reverse(SlotList), FirstKey}; -merge_lists(KVList1, KVList2, LI, SlotList, FirstKey, SlotCount) -> +merge_lists(KVList1, KVList2, LI, SlotList, FirstKey, SlotCount, + PressMethod) -> {KVRem1, KVRem2, Slot, FK0} = form_slot(KVList1, KVList2, LI, no_lookup, 0, [], FirstKey), case Slot of @@ -1454,15 +1553,17 @@ merge_lists(KVList1, KVList2, LI, SlotList, FirstKey, SlotCount) -> LI, SlotList, FK0, - SlotCount); + SlotCount, + PressMethod); {Lookup, KVL} -> - SlotD = generate_binary_slot(Lookup, KVL), + SlotD = generate_binary_slot(Lookup, KVL, PressMethod), merge_lists(KVRem1, KVRem2, LI, [SlotD|SlotList], FK0, - SlotCount + 1) + SlotCount + 1, + PressMethod) end. form_slot([], [], _LI, Type, _Size, Slot, FK) -> @@ -1690,7 +1791,8 @@ merge_tombstonelist_test() -> SkippingKV5 = {{o, "B1", "K9999", null}, {9999, tomb, 1234567, {}}}, R = merge_lists([SkippingKV1, SkippingKV3, SkippingKV5], [SkippingKV2, SkippingKV4], - {true, 9999999}), + {true, 9999999}, + native), ?assertMatch({[], [], [], null}, R). 
@@ -1701,7 +1803,8 @@ indexed_list_test() ->
 
     SW0 = os:timestamp(),
 
-    {_PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, KVL1),
+    {_PosBinIndex1, FullBin, _HL, _LK} =
+        generate_binary_slot(lookup, KVL1, native),
 
     io:format(user,
                 "Indexed list created slot in ~w microseconds of size ~w~n",
                 [timer:now_diff(os:timestamp(), SW0), byte_size(FullBin)]),
@@ -1729,7 +1832,8 @@ indexed_list_mixedkeys_test() ->
     KVL1 = lists:sublist(KVL0, 33),
     Keys = lists:ukeysort(1, generate_indexkeys(60) ++ KVL1),
 
-    {_PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys),
+    {_PosBinIndex1, FullBin, _HL, _LK} =
+        generate_binary_slot(lookup, Keys, native),
 
     {TestK1, TestV1} = lists:nth(4, KVL1),
     MH1 = leveled_codec:segment_hash(TestK1),
@@ -1755,7 +1859,8 @@ indexed_list_mixedkeys2_test() ->
     IdxKeys2 = lists:ukeysort(1, generate_indexkeys(30)),
     % this isn't actually ordered correctly
     Keys = IdxKeys1 ++ KVL1 ++ IdxKeys2,
-    {_PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys),
+    {_PosBinIndex1, FullBin, _HL, _LK} =
+        generate_binary_slot(lookup, Keys, native),
     lists:foreach(fun({K, V}) ->
                         MH = leveled_codec:segment_hash(K),
                         test_binary_slot(FullBin, K, MH, {K, V})
@@ -1765,34 +1870,37 @@ indexed_list_mixedkeys2_test() ->
 indexed_list_allindexkeys_test() ->
     Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)),
                             ?LOOK_SLOTSIZE),
-    {PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys),
+    {PosBinIndex1, FullBin, _HL, _LK} =
+        generate_binary_slot(lookup, Keys, native),
     EmptySlotSize = ?LOOK_SLOTSIZE - 1,
     ?assertMatch(<<_BL:24/binary, EmptySlotSize:8/integer>>, PosBinIndex1),
     % SW = os:timestamp(),
-    BinToList = binaryslot_tolist(FullBin),
+    BinToList = binaryslot_tolist(FullBin, native),
     % io:format(user,
     %             "Indexed list flattened in ~w microseconds ~n",
     %             [timer:now_diff(os:timestamp(), SW)]),
     ?assertMatch(Keys, BinToList),
-    ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, all, all)).
+    ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, all, all, native)).
 
 indexed_list_allindexkeys_nolookup_test() ->
     Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(1000)),
                             ?NOLOOK_SLOTSIZE),
-    {PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(no_lookup, Keys),
+    {PosBinIndex1, FullBin, _HL, _LK} =
+        generate_binary_slot(no_lookup, Keys, native),
     ?assertMatch(<<_BL:24/binary, 127:8/integer>>, PosBinIndex1),
     % SW = os:timestamp(),
-    BinToList = binaryslot_tolist(FullBin),
+    BinToList = binaryslot_tolist(FullBin, native),
     % io:format(user,
     %             "Indexed list flattened in ~w microseconds ~n",
     %             [timer:now_diff(os:timestamp(), SW)]),
     ?assertMatch(Keys, BinToList),
-    ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, all, all)).
+    ?assertMatch(Keys, binaryslot_trimmedlist(FullBin, all, all, native)).
 indexed_list_allindexkeys_trimmed_test() ->
     Keys = lists:sublist(lists:ukeysort(1, generate_indexkeys(150)),
                             ?LOOK_SLOTSIZE),
-    {PosBinIndex1, FullBin, _HL, _LK} = generate_binary_slot(lookup, Keys),
+    {PosBinIndex1, FullBin, _HL, _LK} =
+        generate_binary_slot(lookup, Keys, native),
     EmptySlotSize = ?LOOK_SLOTSIZE - 1,
     ?assertMatch(<<_BL:24/binary, EmptySlotSize:8/integer>>, PosBinIndex1),
     ?assertMatch(Keys, binaryslot_trimmedlist(FullBin,
@@ -1803,26 +1911,27 @@ indexed_list_allindexkeys_trimmed_test() ->
                                                 {i,
                                                     "Bucket",
                                                     {"t1_int", 99999},
-                                                    null})),
+                                                    null},
+                                                native)),
 
     {SK1, _} = lists:nth(10, Keys),
     {EK1, _} = lists:nth(100, Keys),
     R1 = lists:sublist(Keys, 10, 91),
-    O1 = binaryslot_trimmedlist(FullBin, SK1, EK1),
+    O1 = binaryslot_trimmedlist(FullBin, SK1, EK1, native),
     ?assertMatch(91, length(O1)),
     ?assertMatch(R1, O1),
 
     {SK2, _} = lists:nth(10, Keys),
     {EK2, _} = lists:nth(20, Keys),
     R2 = lists:sublist(Keys, 10, 11),
-    O2 = binaryslot_trimmedlist(FullBin, SK2, EK2),
+    O2 = binaryslot_trimmedlist(FullBin, SK2, EK2, native),
     ?assertMatch(11, length(O2)),
     ?assertMatch(R2, O2),
 
     {SK3, _} = lists:nth(?LOOK_SLOTSIZE - 1, Keys),
     {EK3, _} = lists:nth(?LOOK_SLOTSIZE, Keys),
     R3 = lists:sublist(Keys, ?LOOK_SLOTSIZE - 1, 2),
-    O3 = binaryslot_trimmedlist(FullBin, SK3, EK3),
+    O3 = binaryslot_trimmedlist(FullBin, SK3, EK3, native),
     ?assertMatch(2, length(O3)),
     ?assertMatch(R3, O3).
 
@@ -1831,7 +1940,8 @@ indexed_list_mixedkeys_bitflip_test() ->
     KVL0 = lists:ukeysort(1, generate_randomkeys(1, 50, 1, 4)),
     KVL1 = lists:sublist(KVL0, 33),
     Keys = lists:ukeysort(1, generate_indexkeys(60) ++ KVL1),
-    {_PosBinIndex1, FullBin, _HL, LK} = generate_binary_slot(lookup, Keys),
+    {_PosBinIndex1, FullBin, _HL, LK} =
+        generate_binary_slot(lookup, Keys, native),
     ?assertMatch(LK, element(1, lists:last(Keys))),
     FullBin0 = flip_byte(FullBin),
 
@@ -1840,12 +1950,12 @@ indexed_list_mixedkeys_bitflip_test() ->
     MH1 = leveled_codec:segment_hash(TestK1),
 
     test_binary_slot(FullBin0, TestK1, MH1, not_present),
-    ToList = binaryslot_tolist(FullBin0),
+    ToList = binaryslot_tolist(FullBin0, native),
     ?assertMatch([], ToList),
 
     {SK1, _} = lists:nth(10, Keys),
     {EK1, _} = lists:nth(50, Keys),
-    O1 = binaryslot_trimmedlist(FullBin0, SK1, EK1),
+    O1 = binaryslot_trimmedlist(FullBin0, SK1, EK1, native),
     ?assertMatch(0, length(O1)),
     ?assertMatch([], O1).
 
@@ -1864,7 +1974,7 @@ flip_byte(Binary) ->
 
 test_binary_slot(FullBin, Key, Hash, ExpectedValue) ->
     % SW = os:timestamp(),
-    {ReturnedValue, _BLs, _Idx} = binaryslot_get(FullBin, Key, Hash),
+    {ReturnedValue, _BLs, _Idx} = binaryslot_get(FullBin, Key, Hash, native),
     ?assertMatch(ExpectedValue, ReturnedValue).
     % io:format(user, "Fetch success in ~w microseconds ~n",
     %             [timer:now_diff(os:timestamp(), SW)]).
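The integration-style tests in the next hunks call sst_new with the
compression method appended as a final argument. Read purely from those
call-sites, the widened signatures are roughly as follows (a sketch with a
hypothetical press_method() type alias; the -spec lines in leveled_sst.erl
itself, not shown in this excerpt, are authoritative):

    -type press_method() :: lz4 | native.

    %% Build a new SST file directly from a sorted key/value list
    -spec sst_new(string(), string(), integer(), list(), integer(),
                    press_method()) ->
                        {ok, pid(), {tuple(), tuple()}}.

    %% Merge two key or manifest lists into a new file, returning remainders
    -spec sst_new(string(), string(), list(), list(), boolean(), integer(),
                    integer(), press_method()) ->
                        {ok, pid(), {{list(), list()}, tuple(), tuple()}}.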
@@ -1877,8 +1987,10 @@ merge_test() ->
     KVL2 = lists:ukeysort(1, generate_randomkeys(1, N, 1, 20)),
     KVL3 = lists:ukeymerge(1, KVL1, KVL2),
     SW0 = os:timestamp(),
-    {ok, P1, {FK1, LK1}} = sst_new("../test/", "level1_src", 1, KVL1, 6000),
-    {ok, P2, {FK2, LK2}} = sst_new("../test/", "level2_src", 2, KVL2, 3000),
+    {ok, P1, {FK1, LK1}} =
+        sst_new("../test/", "level1_src", 1, KVL1, 6000, native),
+    {ok, P2, {FK2, LK2}} =
+        sst_new("../test/", "level2_src", 2, KVL2, 3000, native),
     ExpFK1 = element(1, lists:nth(1, KVL1)),
     ExpLK1 = element(1, lists:last(KVL1)),
     ExpFK2 = element(1, lists:nth(1, KVL2)),
@@ -1889,7 +2001,8 @@ merge_test() ->
     ?assertMatch(ExpLK2, LK2),
     ML1 = [{next, #manifest_entry{owner = P1}, FK1}],
     ML2 = [{next, #manifest_entry{owner = P2}, FK2}],
-    NewR = sst_new("../test/", "level2_merge", ML1, ML2, false, 2, N * 2),
+    NewR =
+        sst_new("../test/", "level2_merge", ML1, ML2, false, 2, N * 2, native),
     {ok, P3, {{Rem1, Rem2}, FK3, LK3}} = NewR,
     ?assertMatch([], Rem1),
     ?assertMatch([], Rem2),
@@ -1923,11 +2036,8 @@ simple_persisted_range_test() ->
     KVList1 = lists:ukeysort(1, KVList0),
     [{FirstKey, _FV}|_Rest] = KVList1,
     {LastKey, _LV} = lists:last(KVList1),
-    {ok, Pid, {FirstKey, LastKey}} = sst_new(RP,
-                                                Filename,
-                                                1,
-                                                KVList1,
-                                                length(KVList1)),
+    {ok, Pid, {FirstKey, LastKey}} =
+        sst_new(RP, Filename, 1, KVList1, length(KVList1), native),
 
     {o, B, K, null} = LastKey,
     SK1 = {o, B, K, 0},
@@ -1976,11 +2086,8 @@ additional_range_test() ->
                         [],
                         lists:seq(?NOLOOK_SLOTSIZE + Gap + 1,
                                     2 * ?NOLOOK_SLOTSIZE + Gap)),
-    {ok,
-        P1,
-        {{Rem1, Rem2},
-            SK,
-            EK}} = sst_new("../test/", "range1_src", IK1, IK2, false, 1, 9999),
+    {ok, P1, {{Rem1, Rem2}, SK, EK}} =
+        sst_new("../test/", "range1_src", IK1, IK2, false, 1, 9999, native),
     ?assertMatch([], Rem1),
     ?assertMatch([], Rem2),
     ?assertMatch(SK, element(1, lists:nth(1, IK1))),
@@ -2037,11 +2144,8 @@ simple_persisted_slotsize_test() ->
                             ?LOOK_SLOTSIZE),
     [{FirstKey, _FV}|_Rest] = KVList1,
     {LastKey, _LV} = lists:last(KVList1),
-    {ok, Pid, {FirstKey, LastKey}} = sst_new(RP,
-                                                Filename,
-                                                1,
-                                                KVList1,
-                                                length(KVList1)),
+    {ok, Pid, {FirstKey, LastKey}} =
+        sst_new(RP, Filename, 1, KVList1, length(KVList1), native),
     lists:foreach(fun({K, V}) ->
                         ?assertMatch({K, V}, sst_get(Pid, K))
                     end,
@@ -2055,11 +2159,8 @@ simple_persisted_test() ->
     KVList1 = lists:ukeysort(1, KVList0),
     [{FirstKey, _FV}|_Rest] = KVList1,
     {LastKey, _LV} = lists:last(KVList1),
-    {ok, Pid, {FirstKey, LastKey}} = sst_new(RP,
-                                                Filename,
-                                                1,
-                                                KVList1,
-                                                length(KVList1)),
+    {ok, Pid, {FirstKey, LastKey}} =
+        sst_new(RP, Filename, 1, KVList1, length(KVList1), native),
     SW0 = os:timestamp(),
     lists:foreach(fun({K, V}) ->
                         ?assertMatch({K, V}, sst_get(Pid, K))
@@ -2250,7 +2351,7 @@ hashmatching_bytreesize_test() ->
         end,
     KVL = lists:map(GenKeyFun, lists:seq(1, 128)),
     {PosBinIndex1, _FullBin, _HL, _LK} =
-        generate_binary_slot(lookup, KVL),
+        generate_binary_slot(lookup, KVL, native),
     check_segment_match(PosBinIndex1, KVL, small),
     check_segment_match(PosBinIndex1, KVL, medium).
 
diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl
index 3c2e550..ec59e54 100644
--- a/test/end_to_end/basic_SUITE.erl
+++ b/test/end_to_end/basic_SUITE.erl
@@ -9,7 +9,8 @@
             load_and_count/1,
             load_and_count_withdelete/1,
             space_clear_ondelete/1,
-            is_empty_test/1
+            is_empty_test/1,
+            many_put_fetch_switchcompression/1
             ]).
 
 all() -> [
@@ -20,7 +21,8 @@ all() -> [
             load_and_count,
             load_and_count_withdelete,
             space_clear_ondelete,
-            is_empty_test
+            is_empty_test,
+            many_put_fetch_switchcompression
             ].
@@ -72,7 +74,8 @@ many_put_fetch_head(_Config) ->
     RootPath = testutil:reset_filestructure(),
     StartOpts1 = [{root_path, RootPath},
                     {max_pencillercachesize, 16000},
-                    {sync_strategy, riak_sync}],
+                    {sync_strategy, riak_sync},
+                    {compression_point, on_compact}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {TestObject, TestSpec} = testutil:generate_testobject(),
     ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
@@ -81,7 +84,8 @@ many_put_fetch_head(_Config) ->
     StartOpts2 = [{root_path, RootPath},
                     {max_journalsize, 500000000},
                     {max_pencillercachesize, 32000},
-                    {sync_strategy, testutil:sync_strategy()}],
+                    {sync_strategy, testutil:sync_strategy()},
+                    {compression_point, on_receipt}],
     {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
     testutil:check_forobject(Bookie2, TestObject),
     GenList = [2, 20002, 40002, 60002, 80002,
@@ -683,3 +687,49 @@ is_empty_test(_Config) ->
     true = sets:size(BLpd3()) == 0,
 
     ok = leveled_bookie:book_close(Bookie1).
+
+
+many_put_fetch_switchcompression(_Config) ->
+    RootPath = testutil:reset_filestructure(),
+    StartOpts1 = [{root_path, RootPath},
+                    {max_pencillercachesize, 16000},
+                    {sync_strategy, riak_sync},
+                    {compression_method, native}],
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+    {TestObject, TestSpec} = testutil:generate_testobject(),
+    ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
+    testutil:check_forobject(Bookie1, TestObject),
+    ok = leveled_bookie:book_close(Bookie1),
+    StartOpts2 = [{root_path, RootPath},
+                    {max_journalsize, 500000000},
+                    {max_pencillercachesize, 32000},
+                    {sync_strategy, testutil:sync_strategy()},
+                    {compression_method, lz4}],
+
+    %% Change compression method
+    {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
+    testutil:check_forobject(Bookie2, TestObject),
+    GenList = [2, 40002, 80002, 120002],
+    CLs = testutil:load_objects(40000, GenList, Bookie2, TestObject,
+                                fun testutil:generate_smallobjects/2),
+    CL1A = lists:nth(1, CLs),
+    ChkListFixed = lists:nth(length(CLs), CLs),
+    testutil:check_forlist(Bookie2, CL1A),
+    ObjList2A = testutil:generate_objects(5000, 2),
+    testutil:riakload(Bookie2, ObjList2A),
+    ChkList2A = lists:sublist(lists:sort(ObjList2A), 1000),
+    testutil:check_forlist(Bookie2, ChkList2A),
+    testutil:check_forlist(Bookie2, ChkListFixed),
+    testutil:check_forobject(Bookie2, TestObject),
+    testutil:check_forlist(Bookie2, ChkList2A),
+    testutil:check_forlist(Bookie2, ChkListFixed),
+    testutil:check_forobject(Bookie2, TestObject),
+    ok = leveled_bookie:book_close(Bookie2),
+
+    %% Change method back again
+    {ok, Bookie3} = leveled_bookie:book_start(StartOpts1),
+    testutil:check_forlist(Bookie3, ChkList2A),
+    testutil:check_forlist(Bookie3, ChkListFixed),
+    testutil:check_forobject(Bookie3, TestObject),
+    testutil:check_formissingobject(Bookie3, "Bookie1", "MissingKey0123"),
+    ok = leveled_bookie:book_destroy(Bookie3).
\ No newline at end of file
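many_put_fetch_switchcompression restarts the same store with a different
compression_method and expects everything written under the old method to
remain readable. This can only hold if each stored value (and each SST slot)
is self-describing about how it was compressed, rather than trusting the
currently configured method. A schematic of that general technique,
illustrative only and not leveled's actual on-disk encoding:

    %% A leading tag byte records the method used at write time, so the
    %% read path never consults the current configuration.
    encode(Term, lz4) ->
        {ok, Bin} = lz4:pack(term_to_binary(Term)),
        <<1:8/integer, Bin/binary>>;
    encode(Term, native) ->
        <<0:8/integer, (term_to_binary(Term))/binary>>.

    decode(<<1:8/integer, Bin/binary>>) ->
        {ok, Bin0} = lz4:unpack(Bin),
        binary_to_term(Bin0);
    decode(<<0:8/integer, Bin/binary>>) ->
        binary_to_term(Bin).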
diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl
index 842ec7f..ca624f1 100644
--- a/test/end_to_end/recovery_SUITE.erl
+++ b/test/end_to_end/recovery_SUITE.erl
@@ -305,17 +305,21 @@ aae_bustedjournal(_Config) ->
 
 journal_compaction_bustedjournal(_Config) ->
     % Different circumstances will be created in different runs
-    busted_journal_test(10000000),
-    busted_journal_test(7777777).
+    busted_journal_test(10000000, native, on_receipt, true),
+    busted_journal_test(7777777, lz4, on_compact, true),
+    busted_journal_test(8888888, lz4, on_receipt, true),
+    busted_journal_test(7777777, lz4, on_compact, false).
 
-busted_journal_test(MaxJournalSize) ->
+busted_journal_test(MaxJournalSize, PressMethod, PressPoint, Bust) ->
     % Simply confirms that none of this causes a crash
     RootPath = testutil:reset_filestructure(),
     StartOpts1 = [{root_path, RootPath},
                     {max_journalsize, MaxJournalSize},
                     {max_run_length, 10},
-                    {sync_strategy, testutil:sync_strategy()}],
+                    {sync_strategy, testutil:sync_strategy()},
+                    {compression_method, PressMethod},
+                    {compression_point, PressPoint}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {TestObject, TestSpec} = testutil:generate_testobject(),
     ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
@@ -331,11 +335,18 @@ busted_journal_test(MaxJournalSize) ->
                     ObjList2),
     ok = leveled_bookie:book_close(Bookie1),
 
-    CDBFiles = testutil:find_journals(RootPath),
-    lists:foreach(fun(FN) ->
-                    testutil:corrupt_journal(RootPath, FN, 100, 2048, 1000)
-                    end,
-                    CDBFiles),
+    case Bust of
+        true ->
+            CDBFiles = testutil:find_journals(RootPath),
+            lists:foreach(fun(FN) ->
+                                testutil:corrupt_journal(RootPath,
+                                                            FN,
+                                                            100, 2048, 1000)
+                            end,
+                            CDBFiles);
+        false ->
+            ok
+    end,
 
     {ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
 
@@ -358,6 +369,7 @@ busted_journal_test(MaxJournalSize) ->
     testutil:reset_filestructure(10000).
 
 
+
 rotating_object_check(BookOpts, B, NumberOfObjects) ->
     {ok, Book1} = leveled_bookie:book_start(BookOpts),
     {KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects),
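With the extra arguments, a single helper now covers a small matrix of
compression settings, with and without journal corruption. The four runs in
the journal_compaction_bustedjournal hunk above could equally be expressed
table-driven (an equivalent restatement, not part of the patch):

    journal_compaction_bustedjournal(_Config) ->
        % {MaxJournalSize, compression_method, compression_point, Bust}
        Cases = [{10000000, native, on_receipt, true},
                    {7777777, lz4, on_compact, true},
                    {8888888, lz4, on_receipt, true},
                    {7777777, lz4, on_compact, false}],
        lists:foreach(fun({Size, Method, Point, Bust}) ->
                            busted_journal_test(Size, Method, Point, Bust)
                        end,
                        Cases).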