Switch to using passed in compression method for maybe_compress

When the compaction discovers compression is required it will used the passed in method at startup - not the method which had been previously defined.
This commit is contained in:
Martin Sumner 2017-11-06 21:16:46 +00:00
parent 9a0a4ced0d
commit f358bd7622
3 changed files with 42 additions and 31 deletions

View file

@ -54,7 +54,7 @@
compact_inkerkvc/2, compact_inkerkvc/2,
split_inkvalue/1, split_inkvalue/1,
check_forinkertype/2, check_forinkertype/2,
maybe_compress/1, maybe_compress/2,
create_value_for_journal/3, create_value_for_journal/3,
build_metadata_object/2, build_metadata_object/2,
generate_ledgerkv/5, generate_ledgerkv/5,
@ -251,9 +251,9 @@ create_value_for_journal({Object, KeyChangeBin}, Compress, Method) ->
KeyChangeBinLen:32/integer, KeyChangeBinLen:32/integer,
TypeCode:8/integer>>. TypeCode:8/integer>>.
maybe_compress({null, KeyChanges}) -> maybe_compress({null, KeyChanges}, _PressMethod) ->
create_value_for_journal({null, KeyChanges}, false, native); create_value_for_journal({null, KeyChanges}, false, native);
maybe_compress(JournalBin) -> maybe_compress(JournalBin, PressMethod) ->
Length0 = byte_size(JournalBin) - 5, Length0 = byte_size(JournalBin) - 5,
<<JBin0:Length0/binary, <<JBin0:Length0/binary,
KeyChangeLength:32/integer, KeyChangeLength:32/integer,
@ -267,10 +267,6 @@ maybe_compress(JournalBin) ->
<<OBin2:Length1/binary, KCBin2:KeyChangeLength/binary>> = JBin0, <<OBin2:Length1/binary, KCBin2:KeyChangeLength/binary>> = JBin0,
V0 = {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4), V0 = {deserialise_object(OBin2, IsBinary, IsCompressed, IsLz4),
binary_to_term(KCBin2)}, binary_to_term(KCBin2)},
PressMethod = case IsLz4 of
true -> lz4;
false -> native
end,
create_value_for_journal(V0, true, PressMethod) create_value_for_journal(V0, true, PressMethod)
end. end.

View file

@ -197,7 +197,8 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
FilterFun, FilterFun,
FilterServer, FilterServer,
MaxSQN, MaxSQN,
State#state.reload_strategy), State#state.reload_strategy,
State#state.compression_method),
FilesToDelete = lists:map(fun(C) -> FilesToDelete = lists:map(fun(C) ->
{C#candidate.low_sqn, {C#candidate.low_sqn,
C#candidate.filename, C#candidate.filename,
@ -493,7 +494,8 @@ update_inker(Inker, ManifestSlice, FilesToDelete) ->
FilesToDelete), FilesToDelete),
ok. ok.
compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy) -> compact_files(BestRun, CDBopts, FilterFun, FilterServer,
MaxSQN, RStrategy, PressMethod) ->
BatchesOfPositions = get_all_positions(BestRun, []), BatchesOfPositions = get_all_positions(BestRun, []),
compact_files(BatchesOfPositions, compact_files(BatchesOfPositions,
CDBopts, CDBopts,
@ -502,19 +504,20 @@ compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy) ->
FilterServer, FilterServer,
MaxSQN, MaxSQN,
RStrategy, RStrategy,
PressMethod,
[]). []).
compact_files([], _CDBopts, null, _FilterFun, _FilterServer, _MaxSQN, compact_files([], _CDBopts, null, _FilterFun, _FilterServer, _MaxSQN,
_RStrategy, ManSlice0) -> _RStrategy, _PressMethod, ManSlice0) ->
ManSlice0; ManSlice0;
compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer, _MaxSQN, compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer, _MaxSQN,
_RStrategy, ManSlice0) -> _RStrategy, _PressMethod, ManSlice0) ->
ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(ActiveJournal0), ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(ActiveJournal0),
ManSlice1; ManSlice1;
compact_files([Batch|T], CDBopts, ActiveJournal0, compact_files([Batch|T], CDBopts, ActiveJournal0,
FilterFun, FilterServer, MaxSQN, FilterFun, FilterServer, MaxSQN,
RStrategy, ManSlice0) -> RStrategy, PressMethod, ManSlice0) ->
{SrcJournal, PositionList} = Batch, {SrcJournal, PositionList} = Batch,
KVCs0 = leveled_cdb:cdb_directfetch(SrcJournal, KVCs0 = leveled_cdb:cdb_directfetch(SrcJournal,
PositionList, PositionList,
@ -527,9 +530,10 @@ compact_files([Batch|T], CDBopts, ActiveJournal0,
{ActiveJournal1, ManSlice1} = write_values(KVCs1, {ActiveJournal1, ManSlice1} = write_values(KVCs1,
CDBopts, CDBopts,
ActiveJournal0, ActiveJournal0,
ManSlice0), ManSlice0,
PressMethod),
compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, MaxSQN, compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, MaxSQN,
RStrategy, ManSlice1). RStrategy, PressMethod, ManSlice1).
get_all_positions([], PositionBatches) -> get_all_positions([], PositionBatches) ->
PositionBatches; PositionBatches;
@ -580,12 +584,12 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
KVCs). KVCs).
write_values([], _CDBopts, Journal0, ManSlice0) -> write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) ->
{Journal0, ManSlice0}; {Journal0, ManSlice0};
write_values(KVCList, CDBopts, Journal0, ManSlice0) -> write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) ->
KVList = lists:map(fun({K, V, _C}) -> KVList = lists:map(fun({K, V, _C}) ->
% Compress the value as part of compaction % Compress the value as part of compaction
{K, leveled_codec:maybe_compress(V)} {K, leveled_codec:maybe_compress(V, PressMethod)}
end, end,
KVCList), KVCList),
{ok, Journal1} = case Journal0 of {ok, Journal1} = case Journal0 of
@ -608,7 +612,7 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0) ->
{Journal1, ManSlice0}; {Journal1, ManSlice0};
roll -> roll ->
ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(Journal1), ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(Journal1),
write_values(KVCList, CDBopts, null, ManSlice1) write_values(KVCList, CDBopts, null, ManSlice1, PressMethod)
end. end.
clear_waste(State) -> clear_waste(State) ->
@ -843,7 +847,8 @@ compact_single_file_recovr_test() ->
LedgerFun1, LedgerFun1,
LedgerSrv1, LedgerSrv1,
9, 9,
[{?STD_TAG, recovr}]), [{?STD_TAG, recovr}],
native),
io:format("FN of ~s~n", [FN]), io:format("FN of ~s~n", [FN]),
?assertMatch(2, LowSQN), ?assertMatch(2, LowSQN),
?assertMatch(probably, ?assertMatch(probably,
@ -880,7 +885,8 @@ compact_single_file_retain_test() ->
LedgerFun1, LedgerFun1,
LedgerSrv1, LedgerSrv1,
9, 9,
[{?STD_TAG, retain}]), [{?STD_TAG, retain}],
native),
io:format("FN of ~s~n", [FN]), io:format("FN of ~s~n", [FN]),
?assertMatch(1, LowSQN), ?assertMatch(1, LowSQN),
?assertMatch(probably, ?assertMatch(probably,
@ -961,7 +967,8 @@ compact_singlefile_totwosmallfiles_testto() ->
FakeFilterFun, FakeFilterFun,
null, null,
900, 900,
[{?STD_TAG, recovr}]), [{?STD_TAG, recovr}],
native),
?assertMatch(2, length(ManifestSlice)), ?assertMatch(2, length(ManifestSlice)),
lists:foreach(fun({_SQN, _FN, CDB, _LK}) -> lists:foreach(fun({_SQN, _FN, CDB, _LK}) ->
ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_deletepending(CDB),

View file

@ -307,12 +307,13 @@ aae_bustedjournal(_Config) ->
journal_compaction_bustedjournal(_Config) -> journal_compaction_bustedjournal(_Config) ->
% Different circumstances will be created in different runs % Different circumstances will be created in different runs
busted_journal_test(10000000, native, on_receipt), busted_journal_test(10000000, native, on_receipt, true),
busted_journal_test(7777777, lz4, on_compact), busted_journal_test(7777777, lz4, on_compact, true),
busted_journal_test(8888888, lz4, on_receipt). busted_journal_test(8888888, lz4, on_receipt, true),
busted_journal_test(7777777, lz4, on_compact, false).
busted_journal_test(MaxJournalSize, PressMethod, PressPoint) -> busted_journal_test(MaxJournalSize, PressMethod, PressPoint, Bust) ->
% Simply confirms that none of this causes a crash % Simply confirms that none of this causes a crash
RootPath = testutil:reset_filestructure(), RootPath = testutil:reset_filestructure(),
StartOpts1 = [{root_path, RootPath}, StartOpts1 = [{root_path, RootPath},
@ -336,11 +337,18 @@ busted_journal_test(MaxJournalSize, PressMethod, PressPoint) ->
ObjList2), ObjList2),
ok = leveled_bookie:book_close(Bookie1), ok = leveled_bookie:book_close(Bookie1),
case Bust of
true ->
CDBFiles = testutil:find_journals(RootPath), CDBFiles = testutil:find_journals(RootPath),
lists:foreach(fun(FN) -> lists:foreach(fun(FN) ->
testutil:corrupt_journal(RootPath, FN, 100, 2048, 1000) testutil:corrupt_journal(RootPath,
FN,
100, 2048, 1000)
end,
CDBFiles);
false ->
ok
end, end,
CDBFiles),
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1), {ok, Bookie2} = leveled_bookie:book_start(StartOpts1),