Compaction of already compacted journals

Ensure that journals with a large volume of key deltas do not erroneously get repeatedly compacted.
This commit is contained in:
Martin Sumner 2019-07-24 18:03:22 +01:00
parent 21e0ce70e7
commit 22e732841c
3 changed files with 94 additions and 19 deletions

View file

@ -35,6 +35,7 @@
from_inkerkv/2,
from_journalkey/1,
revert_to_keydeltas/2,
is_compaction_candidate/1,
split_inkvalue/1,
check_forinkertype/2,
get_tagstrategy/2,
@ -559,7 +560,13 @@ check_forinkertype(_LedgerKey, head_only) ->
check_forinkertype(_LedgerKey, _Object) ->
?INKT_STND.
-spec is_compaction_candidate(journal_key()) -> boolean().
%% @doc
%% Only journal keys with standard objects should be scored for compaction
is_compaction_candidate({_SQN, ?INKT_STND, _LK}) ->
true;
is_compaction_candidate(_OtherJKType) ->
false.
%%%============================================================================

View file

@ -518,15 +518,22 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) ->
FoldFunForSizeCompare =
fun(KS, {ActSize, RplSize}) ->
case KS of
{{SQN, _Type, PK}, Size} ->
Check = FilterFun(FilterServer, PK, SQN),
case {Check, SQN > MaxSQN} of
{true, _} ->
{{SQN, Type, PK}, Size} ->
MayScore =
leveled_codec:is_compaction_candidate({SQN, Type, PK}),
case MayScore of
false ->
{ActSize + Size - ?CRC_SIZE, RplSize};
{false, true} ->
{ActSize + Size - ?CRC_SIZE, RplSize};
_ ->
{ActSize, RplSize + Size - ?CRC_SIZE}
true ->
Check = FilterFun(FilterServer, PK, SQN),
case {Check, SQN > MaxSQN} of
{true, _} ->
{ActSize + Size - ?CRC_SIZE, RplSize};
{false, true} ->
{ActSize + Size - ?CRC_SIZE, RplSize};
_ ->
{ActSize, RplSize + Size - ?CRC_SIZE}
end
end;
_ ->
% There is a key which is not in expected format
@ -1174,13 +1181,13 @@ compact_singlefile_totwosmallfiles_testto() ->
size_score_test() ->
KeySizeList =
[{{1, "INK", "Key1"}, 104},
{{2, "INK", "Key2"}, 124},
{{3, "INK", "Key3"}, 144},
{{4, "INK", "Key4"}, 154},
{{5, "INK", "Key5", "Subk1"}, 164},
{{6, "INK", "Key6"}, 174},
{{7, "INK", "Key7"}, 184}],
[{{1, ?INKT_STND, "Key1"}, 104},
{{2, ?INKT_STND, "Key2"}, 124},
{{3, ?INKT_STND, "Key3"}, 144},
{{4, ?INKT_STND, "Key4"}, 154},
{{5, ?INKT_STND, "Key5", "Subk1"}, 164},
{{6, ?INKT_STND, "Key6"}, 174},
{{7, ?INKT_STND, "Key7"}, 184}],
MaxSQN = 6,
CurrentList = ["Key1", "Key4", "Key5", "Key6"],
FilterFun = fun(L, K, _SQN) -> lists:member(K, L) end,

View file

@ -11,7 +11,8 @@
aae_missingjournal/1,
aae_bustedjournal/1,
journal_compaction_bustedjournal/1,
close_duringcompaction/1
close_duringcompaction/1,
allkeydelta_journal_multicompact/1
]).
all() -> [
@ -23,7 +24,8 @@ all() -> [
aae_missingjournal,
aae_bustedjournal,
journal_compaction_bustedjournal,
close_duringcompaction
close_duringcompaction,
allkeydelta_journal_multicompact
].
@ -597,6 +599,62 @@ busted_journal_test(MaxJournalSize, PressMethod, PressPoint, Bust) ->
testutil:reset_filestructure(10000).
allkeydelta_journal_multicompact(_Config) ->
% Simply confirms that none of this causes a crash
RootPath = testutil:reset_filestructure(),
B = <<"test_bucket">>,
StartOpts1 = [{root_path, RootPath},
{max_journalsize, 50000000},
{max_run_length, 6},
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{KSpcL1, _V1} = testutil:put_indexed_objects(Bookie1, B, 40000),
{KSpcL2, V2} = testutil:put_altered_indexed_objects(Bookie1,
B,
KSpcL1,
false),
compact_and_wait(Bookie1, 0),
compact_and_wait(Bookie1, 0),
{ok, FileList1} =
file:list_dir(
filename:join(RootPath, "journal/journal_files/post_compact")),
io:format("Number of files after compaction ~w~n", [length(FileList1)]),
compact_and_wait(Bookie1, 0),
{ok, FileList2} =
file:list_dir(
filename:join(RootPath, "journal/journal_files/post_compact")),
io:format("Number of files after compaction ~w~n", [length(FileList2)]),
true = FileList1 == FileList2,
ok = testutil:check_indexed_objects(Bookie1,
B,
KSpcL1 ++ KSpcL2,
V2),
ok = leveled_bookie:book_close(Bookie1),
leveled_penciller:clean_testdir(RootPath ++ "/ledger"),
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
ok = testutil:check_indexed_objects(Bookie2,
B,
KSpcL1 ++ KSpcL2,
V2),
{KSpcL3, V3} = testutil:put_altered_indexed_objects(Bookie2,
B,
KSpcL2,
false),
compact_and_wait(Bookie2, 0),
ok = testutil:check_indexed_objects(Bookie2,
B,
KSpcL1 ++ KSpcL2 ++ KSpcL3,
V3),
ok = leveled_bookie:book_close(Bookie2),
testutil:reset_filestructure(10000).
rotating_object_check(BookOpts, B, NumberOfObjects) ->
{ok, Book1} = leveled_bookie:book_start(BookOpts),
@ -645,6 +703,9 @@ rotating_object_check(BookOpts, B, NumberOfObjects) ->
{ok, KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4, V4}.
compact_and_wait(Book) ->
compact_and_wait(Book, 20000).
compact_and_wait(Book, WaitForDelete) ->
ok = leveled_bookie:book_compactjournal(Book, 30000),
F = fun leveled_bookie:book_islastcompactionpending/1,
lists:foldl(fun(X, Pending) ->
@ -660,7 +721,7 @@ compact_and_wait(Book) ->
true,
lists:seq(1, 15)),
io:format("Waiting for journal deletes~n"),
timer:sleep(20000).
timer:sleep(WaitForDelete).
restart_from_blankledger(BookOpts, B_SpcL) ->
leveled_penciller:clean_testdir(proplists:get_value(root_path, BookOpts) ++