From f358bd7622a01253700ff1874cb8b9f0f8c82ced Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 6 Nov 2017 21:16:46 +0000 Subject: [PATCH] Switch to using passed in compression method for maybe_compress When the compaction discovers compression is required it will used the passed in method at startup - not the method which had been previously defined. --- src/leveled_codec.erl | 12 ++++------ src/leveled_iclerk.erl | 35 ++++++++++++++++++------------ test/end_to_end/recovery_SUITE.erl | 26 ++++++++++++++-------- 3 files changed, 42 insertions(+), 31 deletions(-) diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 401c709..be65bf8 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -54,7 +54,7 @@ compact_inkerkvc/2, split_inkvalue/1, check_forinkertype/2, - maybe_compress/1, + maybe_compress/2, create_value_for_journal/3, build_metadata_object/2, generate_ledgerkv/5, @@ -251,9 +251,9 @@ create_value_for_journal({Object, KeyChangeBin}, Compress, Method) -> KeyChangeBinLen:32/integer, TypeCode:8/integer>>. -maybe_compress({null, KeyChanges}) -> +maybe_compress({null, KeyChanges}, _PressMethod) -> create_value_for_journal({null, KeyChanges}, false, native); -maybe_compress(JournalBin) -> +maybe_compress(JournalBin, PressMethod) -> Length0 = byte_size(JournalBin) - 5, < <> = JBin0, V0 = {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4), binary_to_term(KCBin2)}, - PressMethod = case IsLz4 of - true -> lz4; - false -> native - end, create_value_for_journal(V0, true, PressMethod) end. @@ -340,7 +336,7 @@ encode_valuetype(IsBinary, IsCompressed, Method) -> decode_valuetype(TypeInt) -> IsCompressed = TypeInt band 1 == 1, IsBinary = TypeInt band 2 == 2, - IsLz4 = TypeInt band 4 ==4, + IsLz4 = TypeInt band 4 == 4, {IsBinary, IsCompressed, IsLz4}. from_journalkey({SQN, _Type, LedgerKey}) -> diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 9663328..f60de64 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -197,7 +197,8 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO}, FilterFun, FilterServer, MaxSQN, - State#state.reload_strategy), + State#state.reload_strategy, + State#state.compression_method), FilesToDelete = lists:map(fun(C) -> {C#candidate.low_sqn, C#candidate.filename, @@ -493,7 +494,8 @@ update_inker(Inker, ManifestSlice, FilesToDelete) -> FilesToDelete), ok. -compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy) -> +compact_files(BestRun, CDBopts, FilterFun, FilterServer, + MaxSQN, RStrategy, PressMethod) -> BatchesOfPositions = get_all_positions(BestRun, []), compact_files(BatchesOfPositions, CDBopts, @@ -502,19 +504,20 @@ compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy) -> FilterServer, MaxSQN, RStrategy, + PressMethod, []). compact_files([], _CDBopts, null, _FilterFun, _FilterServer, _MaxSQN, - _RStrategy, ManSlice0) -> + _RStrategy, _PressMethod, ManSlice0) -> ManSlice0; compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer, _MaxSQN, - _RStrategy, ManSlice0) -> + _RStrategy, _PressMethod, ManSlice0) -> ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(ActiveJournal0), ManSlice1; compact_files([Batch|T], CDBopts, ActiveJournal0, FilterFun, FilterServer, MaxSQN, - RStrategy, ManSlice0) -> + RStrategy, PressMethod, ManSlice0) -> {SrcJournal, PositionList} = Batch, KVCs0 = leveled_cdb:cdb_directfetch(SrcJournal, PositionList, @@ -527,9 +530,10 @@ compact_files([Batch|T], CDBopts, ActiveJournal0, {ActiveJournal1, ManSlice1} = write_values(KVCs1, CDBopts, ActiveJournal0, - ManSlice0), + ManSlice0, + PressMethod), compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, MaxSQN, - RStrategy, ManSlice1). + RStrategy, PressMethod, ManSlice1). get_all_positions([], PositionBatches) -> PositionBatches; @@ -580,12 +584,12 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> KVCs). -write_values([], _CDBopts, Journal0, ManSlice0) -> +write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) -> {Journal0, ManSlice0}; -write_values(KVCList, CDBopts, Journal0, ManSlice0) -> +write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) -> KVList = lists:map(fun({K, V, _C}) -> % Compress the value as part of compaction - {K, leveled_codec:maybe_compress(V)} + {K, leveled_codec:maybe_compress(V, PressMethod)} end, KVCList), {ok, Journal1} = case Journal0 of @@ -608,7 +612,7 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0) -> {Journal1, ManSlice0}; roll -> ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(Journal1), - write_values(KVCList, CDBopts, null, ManSlice1) + write_values(KVCList, CDBopts, null, ManSlice1, PressMethod) end. clear_waste(State) -> @@ -843,7 +847,8 @@ compact_single_file_recovr_test() -> LedgerFun1, LedgerSrv1, 9, - [{?STD_TAG, recovr}]), + [{?STD_TAG, recovr}], + native), io:format("FN of ~s~n", [FN]), ?assertMatch(2, LowSQN), ?assertMatch(probably, @@ -880,7 +885,8 @@ compact_single_file_retain_test() -> LedgerFun1, LedgerSrv1, 9, - [{?STD_TAG, retain}]), + [{?STD_TAG, retain}], + native), io:format("FN of ~s~n", [FN]), ?assertMatch(1, LowSQN), ?assertMatch(probably, @@ -961,7 +967,8 @@ compact_singlefile_totwosmallfiles_testto() -> FakeFilterFun, null, 900, - [{?STD_TAG, recovr}]), + [{?STD_TAG, recovr}], + native), ?assertMatch(2, length(ManifestSlice)), lists:foreach(fun({_SQN, _FN, CDB, _LK}) -> ok = leveled_cdb:cdb_deletepending(CDB), diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index 33dc977..dd92716 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -307,12 +307,13 @@ aae_bustedjournal(_Config) -> journal_compaction_bustedjournal(_Config) -> % Different circumstances will be created in different runs - busted_journal_test(10000000, native, on_receipt), - busted_journal_test(7777777, lz4, on_compact), - busted_journal_test(8888888, lz4, on_receipt). + busted_journal_test(10000000, native, on_receipt, true), + busted_journal_test(7777777, lz4, on_compact, true), + busted_journal_test(8888888, lz4, on_receipt, true), + busted_journal_test(7777777, lz4, on_compact, false). -busted_journal_test(MaxJournalSize, PressMethod, PressPoint) -> +busted_journal_test(MaxJournalSize, PressMethod, PressPoint, Bust) -> % Simply confirms that none of this causes a crash RootPath = testutil:reset_filestructure(), StartOpts1 = [{root_path, RootPath}, @@ -336,11 +337,18 @@ busted_journal_test(MaxJournalSize, PressMethod, PressPoint) -> ObjList2), ok = leveled_bookie:book_close(Bookie1), - CDBFiles = testutil:find_journals(RootPath), - lists:foreach(fun(FN) -> - testutil:corrupt_journal(RootPath, FN, 100, 2048, 1000) - end, - CDBFiles), + case Bust of + true -> + CDBFiles = testutil:find_journals(RootPath), + lists:foreach(fun(FN) -> + testutil:corrupt_journal(RootPath, + FN, + 100, 2048, 1000) + end, + CDBFiles); + false -> + ok + end, {ok, Bookie2} = leveled_bookie:book_start(StartOpts1),