Mas d34 leveled.i459 partialmerge (#460)
* Add test to replicate issue 459. Nothing actually crashes due to the issue, but the logs show the polarised stats associated with it: when merging into L3 you would normally expect to merge into 4 files, but we see FileCounter occasionally spiking.

* Add partial merge support. There is a `max_mergebelow` setting, which can be a positive integer or infinity; it defaults to 24. If a merge from Level N covers fewer than `max_mergebelow` files in Level N + 1, the merge will proceed as before. If it covers `max_mergebelow` files or more, the merge will be curtailed once `max_mergebelow div 2` files have been created at that level. The remainder for Level N will then be written, as will the remainder for Level N + 1 up to the next whole file that has not yet been touched by the merge. The backlog that prompted the merge will still exist, as the files in Level N have not been changed; however, the next file picked will likely not be the same one, and will in all probability have a lower number of files to merge (as the average is =< 8). This stops progress from being halted by long merge jobs, as they now exit in a safe way after partial completion. In the case where the majority of files covered do not require a merge, those files will be skipped the next time the remainder file is picked up for merge at Level N.
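As a rough sketch of the curtailment rule described above (this mirrors the merge_limit/3 function added to the clerk in this commit, with the pc026 log call elided; the example values are hypothetical):

    %% Merges from Level 0 or 1, and merges covering fewer sink files than
    %% max_mergebelow, remain unlimited; otherwise additions are capped at
    %% max_mergebelow div 2 (minimum 1).
    merge_limit(SrcLevel, SinkListLength, MMB)
            when SrcLevel =< 1; SinkListLength < MMB ->
        infinity;
    merge_limit(_SrcLevel, _SinkListLength, MMB) when is_integer(MMB) ->
        max(1, MMB div 2).

    %% With the default max_mergebelow of 24:
    %%   merge_limit(2, 30, 24) -> 12        % 30 >= 24, so curtail at 24 div 2
    %%   merge_limit(2, 8, 24)  -> infinity  % 8 < 24, merge as before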
parent c642575caa
commit 69e8b29d1f
8 changed files with 360 additions and 66 deletions
@@ -33,6 +33,7 @@
 %%% Non-configurable startup defaults
 %%%============================================================================
 -define(MAX_SSTSLOTS, 256).
+-define(MAX_MERGEBELOW, 24).
 -define(LOADING_PAUSE, 1000).
 -define(LOADING_BATCH, 1000).
 -define(CACHE_SIZE_JITTER, 25).

@@ -109,7 +110,8 @@
         press_level = ?COMPRESSION_LEVEL :: non_neg_integer(),
         log_options = leveled_log:get_opts()
             :: leveled_log:log_options(),
-        max_sstslots = ?MAX_SSTSLOTS :: pos_integer(),
+        max_sstslots = ?MAX_SSTSLOTS :: pos_integer()|infinity,
+        max_mergebelow = ?MAX_MERGEBELOW :: pos_integer()|infinity,
         pagecache_level = ?SST_PAGECACHELEVEL_NOLOOKUP
             :: pos_integer(),
         monitor = {no_monitor, 0}
@@ -13,11 +13,7 @@
 {eunit_opts, [verbose]}.
 
 {project_plugins, [
-    {eqwalizer_rebar3,
-        {git_subdir,
-            "https://github.com/whatsapp/eqwalizer.git",
-            {branch, "main"},
-            "eqwalizer_rebar3"}}
+    {eqwalizer_rebar3, {git_subdir, "https://github.com/OpenRiak/eqwalizer.git", {branch, "openriak-3.4"}, "eqwalizer_rebar3"}}
 ]}.
 
 {profiles,

@@ -37,7 +33,7 @@
 {deps, [
     {lz4, ".*", {git, "https://github.com/nhs-riak/erlang-lz4", {branch, "nhse-develop-3.4"}}},
     {zstd, ".*", {git, "https://github.com/nhs-riak/zstd-erlang", {branch, "nhse-develop"}}},
-    {eqwalizer_support, {git_subdir, "https://github.com/whatsapp/eqwalizer.git", {branch, "main"}, "eqwalizer_support"}}
+    {eqwalizer_support, {git_subdir, "https://github.com/OpenRiak/eqwalizer.git", {branch, "openriak-3.4"}, "eqwalizer_support"}}
 ]}.
 
 {ct_opts, [{dir, ["test/end_to_end"]}]}.
@@ -114,6 +114,7 @@
 {max_journalsize, 1000000000},
 {max_journalobjectcount, 200000},
 {max_sstslots, 256},
+{max_mergebelow, 24},
 {sync_strategy, ?DEFAULT_SYNC_STRATEGY},
 {head_only, false},
 {waste_retention_period, undefined},

@@ -201,6 +202,12 @@
     % The maximum number of slots in a SST file. All testing is done
     % at a size of 256 (except for Quickcheck tests), altering this
     % value is not recommended
+    {max_mergebelow, pos_integer()|infinity} |
+    % The maximum number of files for a single file to be merged into
+    % within the ledger. If less than this, the merge will continue
+    % without a maximum. If this or more overlapping below, only up
+    % to max_mergebelow div 2 additions should be created (the merge
+    % should be partial)
     {sync_strategy, sync_mode()} |
     % Should be sync if it is necessary to flush to disk after every
     % write, or none if not (allow the OS to schedule). This has a

@@ -293,7 +300,7 @@
     % To which level of the ledger should the ledger contents be
     % pre-loaded into the pagecache (using fadvise on creation and
     % startup)
     {compression_method, native|lz4|zstd|none} |
     % Compression method and point allow Leveled to be switched from
     % using bif based compression (zlib) to using nif based compression
     % (lz4 or zstd).
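As a usage sketch, the new option is passed at startup like any other bookie option (the root_path value below is a placeholder; every other option takes its default):

    {ok, Bookie} =
        leveled_bookie:book_start(
            [{root_path, "/tmp/leveled_data"}, {max_mergebelow, 24}]
        ),
    ok = leveled_bookie:book_close(Bookie).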
@@ -1871,6 +1878,7 @@ set_options(Opts, Monitor) ->
     CompressionLevel = proplists:get_value(compression_level, Opts),
 
     MaxSSTSlots = proplists:get_value(max_sstslots, Opts),
+    MaxMergeBelow = proplists:get_value(max_mergebelow, Opts),
 
     ScoreOneIn = proplists:get_value(journalcompaction_scoreonein, Opts),
 

@@ -1904,6 +1912,7 @@ set_options(Opts, Monitor) ->
             press_level = CompressionLevel,
             log_options = leveled_log:get_opts(),
             max_sstslots = MaxSSTSlots,
+            max_mergebelow = MaxMergeBelow,
             monitor = Monitor},
             monitor = Monitor}
     }.
@@ -165,7 +165,7 @@
     pc010 =>
         {info, <<"Merge to be commenced for FileToMerge=~s with MSN=~w">>},
     pc011 =>
-        {info, <<"Merge completed with MSN=~w to Level=~w and FileCounter=~w">>},
+        {info, <<"Merge completed with MSN=~w to Level=~w and FileCounter=~w merge_type=~w">>},
     pc012 =>
         {debug, <<"File to be created as part of MSN=~w Filename=~s IsBasement=~w">>},
     pc013 =>

@@ -190,6 +190,8 @@
         {info, <<"Grooming compaction picked file with tomb_count=~w">>},
     pc025 =>
         {info, <<"At level=~w file_count=~w average words for heap_block_size=~w heap_size=~w recent_size=~w bin_vheap_size=~w">>},
+    pc026 =>
+        {info, <<"Performing potential partial to level=~w merge as FileCounter=~w restricting to MaxAdditions=~w">>},
     pm002 =>
         {info, <<"Completed dump of L0 cache to list of l0cache_size=~w">>},
     sst03 =>
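For reference, the amended pc011 and new pc026 entries render along these lines (values illustrative only):

    Merge completed with MSN=5 to Level=3 and FileCounter=12 merge_type=partial
    Performing potential partial to level=3 merge as FileCounter=28 restricting to MaxAdditions=12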
@@ -269,7 +269,6 @@ notify_deletions([Head|Tail], Penciller) ->
 %% to be merged into multiple SSTs at a lower level.
 %%
 %% SrcLevel is the level of the src sst file, the sink should be srcLevel + 1
-
 perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN, OptsSST) ->
     leveled_log:log(pc010, [leveled_pmanifest:entry_filename(Src), NewSQN]),
     SrcList = [{next, Src, all}],
@@ -279,72 +278,188 @@ perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN, OptsSST) ->
     ),
     SinkLevel = SrcLevel + 1,
     SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel),
-    Additions =
+    MaxMergeBelow = OptsSST#sst_options.max_mergebelow,
+    MergeLimit = merge_limit(SrcLevel, length(SinkList), MaxMergeBelow),
+    {L2Additions, L1Additions, L2FileRemainder} =
         do_merge(
             SrcList, SinkList,
             SinkLevel, SinkBasement,
             RootPath, NewSQN, MaxSQN,
             OptsSST,
-            []
+            [],
+            MergeLimit
+        ),
+    RevertPointerFun = fun({next, ME, _SK}) -> ME end,
+    SinkManifestRemovals =
+        lists:subtract(
+            lists:map(RevertPointerFun, SinkList),
+            lists:map(RevertPointerFun, L2FileRemainder)
         ),
-    RevertPointerFun =
-        fun({next, ME, _SK}) ->
-            ME
-        end,
-    SinkManifestList = lists:map(RevertPointerFun, SinkList),
     Man0 =
         leveled_pmanifest:replace_manifest_entry(
             Manifest,
             NewSQN,
             SinkLevel,
-            SinkManifestList,
-            Additions
+            SinkManifestRemovals,
+            L2Additions
         ),
-    Man2 =
-        leveled_pmanifest:remove_manifest_entry(
-            Man0,
-            NewSQN,
-            SrcLevel,
-            Src
-        ),
-    {Man2, [Src|SinkManifestList]}.
+    Man1 =
+        case L1Additions of
+            [] ->
+                leveled_pmanifest:remove_manifest_entry(
+                    Man0,
+                    NewSQN,
+                    SrcLevel,
+                    Src
+                );
+            PartialFiles ->
+                leveled_pmanifest:replace_manifest_entry(
+                    Man0,
+                    NewSQN,
+                    SrcLevel,
+                    [Src],
+                    PartialFiles
+                )
+        end,
+    {Man1, [Src|SinkManifestRemovals]}.
 
-do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions) ->
-    leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions)]),
-    lists:reverse(Additions);
-do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
+-spec merge_limit(
+    non_neg_integer(), non_neg_integer(), pos_integer()|infinity)
+        -> pos_integer()|infinity.
+merge_limit(SrcLevel, SinkListLength, MMB) when SrcLevel =< 1; SinkListLength < MMB ->
+    infinity;
+merge_limit(SrcLevel, SinkListLength, MMB) when is_integer(MMB) ->
+    AdditionsLimit = max(1, MMB div 2),
+    leveled_log:log(pc026, [SrcLevel + 1, SinkListLength, AdditionsLimit]),
+    AdditionsLimit.
+
+-type merge_maybe_expanded_pointer() ::
+    leveled_codec:ledger_kv()|
+    leveled_sst:slot_pointer()|
+    leveled_sst:sst_pointer().
+    % Different to leveled_sst:maybe_expanded_pointer/0
+    % No sst_closed_pointer()
+
+-spec do_merge(
+    list(merge_maybe_expanded_pointer()),
+    list(merge_maybe_expanded_pointer()),
+    leveled_pmanifest:lsm_level(),
+    boolean(),
+    string(),
+    pos_integer(),
+    pos_integer(),
+    leveled_sst:sst_options(),
+    list(leveled_pmanifest:manifest_entry()),
+    pos_integer()|infinity) ->
+        {
+            list(leveled_pmanifest:manifest_entry()),
+            list(leveled_pmanifest:manifest_entry()),
+            list(leveled_sst:sst_pointer())
+        }.
+do_merge(
+        [], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions, _Max) ->
+    leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), full]),
+    {lists:reverse(Additions), [], []};
+do_merge(
+        KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max)
+        when length(Additions) >= Max ->
+    leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), partial]),
+    FNSrc =
+        leveled_penciller:sst_filename(
+            NewSQN, SinkLevel - 1, 1
+        ),
+    FNSnk =
+        leveled_penciller:sst_filename(
+            NewSQN, SinkLevel, length(Additions) + 1
+        ),
+    {ExpandedKL1, []} = split_unexpanded_files(KL1),
+    {ExpandedKL2, L2FilePointersRem} = split_unexpanded_files(KL2),
+    TS1 = os:timestamp(),
+    InfOpts = OptsSST#sst_options{max_sstslots = infinity},
+    % Need to be careful to make sure all the remainder goes in one file,
+    % could be situations whereby the max_sstslots has been changed between
+    % restarts - and so there is too much data for one file in the
+    % remainder ... but don't want to loop round and consider more complex
+    % scenarios here.
+    NewMergeKL1 =
+        leveled_sst:sst_newmerge(
+            RP, FNSrc, ExpandedKL1, [], false, SinkLevel - 1, MaxSQN, InfOpts
+        ),
+    TS2 = os:timestamp(),
+    NewMergeKL2 =
+        leveled_sst:sst_newmerge(
+            RP, FNSnk, [], ExpandedKL2, SinkB, SinkLevel, MaxSQN, InfOpts
+        ),
+    {KL1Additions, [], []} = add_entry(NewMergeKL1, FNSrc, TS1, []),
+    {KL2Additions, [], []} = add_entry(NewMergeKL2, FNSnk, TS2, Additions),
+    {lists:reverse(KL2Additions), KL1Additions, L2FilePointersRem};
+do_merge(
+        KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max) ->
     FileName =
         leveled_penciller:sst_filename(
             NewSQN, SinkLevel, length(Additions)
         ),
     leveled_log:log(pc012, [NewSQN, FileName, SinkB]),
     TS1 = os:timestamp(),
-    case leveled_sst:sst_newmerge(RP, FileName,
-            KL1, KL2, SinkB, SinkLevel, MaxSQN,
-            OptsSST) of
-        empty ->
-            leveled_log:log(pc013, [FileName]),
-            do_merge(
-                [], [],
-                SinkLevel, SinkB,
-                RP, NewSQN, MaxSQN,
-                OptsSST,
-                Additions
-            );
-        {ok, Pid, Reply, Bloom} ->
-            {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
-            Entry =
-                leveled_pmanifest:new_entry(
-                    SmallestKey, HighestKey, Pid, FileName, Bloom),
-            leveled_log:log_timer(pc015, [], TS1),
-            do_merge(
-                KL1Rem, KL2Rem,
-                SinkLevel, SinkB,
-                RP, NewSQN, MaxSQN,
-                OptsSST,
-                [Entry|Additions]
-            )
-    end.
+    NewMerge =
+        leveled_sst:sst_newmerge(
+            RP, FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN, OptsSST),
+    {UpdAdditions, KL1Rem, KL2Rem} =
+        add_entry(NewMerge, FileName, TS1, Additions),
+    do_merge(
+        KL1Rem,
+        KL2Rem,
+        SinkLevel,
+        SinkB,
+        RP,
+        NewSQN,
+        MaxSQN,
+        OptsSST,
+        UpdAdditions,
+        Max
+    ).
+
+add_entry(empty, FileName, _TS1, Additions) ->
+    leveled_log:log(pc013, [FileName]),
+    {Additions, [], []};
+add_entry({ok, Pid, Reply, Bloom}, FileName, TS1, Additions) ->
+    {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
+    Entry =
+        leveled_pmanifest:new_entry(
+            SmallestKey, HighestKey, Pid, FileName, Bloom),
+    leveled_log:log_timer(pc015, [], TS1),
+    {[Entry|Additions], KL1Rem, KL2Rem}.
+
+-spec split_unexpanded_files(
+    list(merge_maybe_expanded_pointer())) ->
+        {
+            list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
+            list(leveled_sst:sst_pointer())
+        }.
+split_unexpanded_files(Pointers) ->
+    split_unexpanded_files(Pointers, [], []).
+
+-spec split_unexpanded_files(
+    list(merge_maybe_expanded_pointer()),
+    list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
+    list(leveled_sst:sst_pointer())) ->
+        {
+            list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
+            list(leveled_sst:sst_pointer())
+        }.
+split_unexpanded_files([], MaybeExpanded, FilePointers) ->
+    {lists:reverse(MaybeExpanded), lists:reverse(FilePointers)};
+split_unexpanded_files([{next, P, SK}|Rest], MaybeExpanded, FilePointers) ->
+    split_unexpanded_files(Rest, MaybeExpanded, [{next, P, SK}|FilePointers]);
+split_unexpanded_files([{LK, LV}|Rest], MaybeExpanded, []) ->
+    % Should never see a KV pair once a FilePointer has been seen
+    split_unexpanded_files(Rest, [{LK, LV}|MaybeExpanded], []);
+split_unexpanded_files([{pointer, P, SIV, SK, EK}|Rest], MaybeExpanded, []) ->
+    % Should never see a slot pointer once a FilePointer has been seen
+    split_unexpanded_files(
+        Rest, [{pointer, P, SIV, SK, EK}|MaybeExpanded], []
+    ).
 
 -spec grooming_scorer(
     list(leveled_pmanifest:manifest_entry()))
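A hypothetical call showing the shape of the split_unexpanded_files/1 result - already-expanded slot pointers (and any KVs) are kept for the merge, while the trailing whole-file {next, ...} pointers become the untouched remainder left in place at Level N + 1 (all values below are placeholders):

    Mixed =
        [
            {pointer, SSTPid, 1, StartKey, EndKey},
            {next, EntryA, all},
            {next, EntryB, all}
        ],
    {[{pointer, SSTPid, 1, StartKey, EndKey}],
            [{next, EntryA, all}, {next, EntryB, all}]} =
        split_unexpanded_files(Mixed).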
@@ -248,7 +248,18 @@
 -type build_timings() :: no_timing|#build_timings{}.
 
--export_type([expandable_pointer/0, press_method/0, segment_check_fun/0]).
+-export_type(
+    [
+        expandable_pointer/0,
+        maybe_expanded_pointer/0,
+        sst_closed_pointer/0,
+        sst_pointer/0,
+        slot_pointer/0,
+        press_method/0,
+        segment_check_fun/0,
+        sst_options/0
+    ]
+).
 
 %%%============================================================================
 %%% API

@@ -312,8 +323,8 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
 end.
 
 -spec sst_newmerge(string(), string(),
-    list(leveled_codec:ledger_kv()|sst_pointer()),
-    list(leveled_codec:ledger_kv()|sst_pointer()),
+    list(maybe_expanded_pointer()),
+    list(maybe_expanded_pointer()),
     boolean(), leveled_pmanifest:lsm_level(),
     integer(), sst_options())
     -> empty|{ok, pid(),

@@ -1529,7 +1540,9 @@ compress_level(_Level, _LevelToCompress, PressMethod) ->
 PressMethod.
 
 -spec maxslots_level(
-    leveled_pmanifest:lsm_level(), pos_integer()) -> pos_integer().
+    leveled_pmanifest:lsm_level(), pos_integer()|infinity) -> pos_integer()|infinity.
+maxslots_level(_Level, infinity) ->
+    infinity;
 maxslots_level(Level, MaxSlotCount) when Level < ?DOUBLESIZE_LEVEL ->
     MaxSlotCount;
 maxslots_level(_Level, MaxSlotCount) ->

@@ -3020,7 +3033,7 @@ merge_lists(
     list(binary_slot()),
     leveled_codec:ledger_key()|null,
     non_neg_integer(),
-    non_neg_integer(),
+    pos_integer()|infinity,
     press_method(),
     boolean(),
     non_neg_integer(),
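The widened maxslots_level/2 spec simply lets an unlimited slot count pass straight through; assuming the level given is below ?DOUBLESIZE_LEVEL, the expected results are:

    maxslots_level(2, 96)       -> 96
    maxslots_level(2, infinity) -> infinity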
@@ -2,8 +2,9 @@
 
 -include("leveled.hrl").
 
--export([all/0, init_per_suite/1, end_per_suite/1]).
+-export([all/0, init_per_suite/1, end_per_suite/1, suite/0]).
 -export([
+    test_large_lsm_merge/1,
     basic_riak/1,
     fetchclocks_modifiedbetween/1,
     crossbucket_aae/1,

@@ -14,6 +15,8 @@
     summarisable_sstindex/1
 ]).
 
+suite() -> [{timetrap, {hours, 2}}].
+
 all() -> [
     basic_riak,
     fetchclocks_modifiedbetween,

@@ -22,7 +25,8 @@ all() -> [
     dollar_bucket_index,
     dollar_key_index,
     bigobject_memorycheck,
-    summarisable_sstindex
+    summarisable_sstindex,
+    test_large_lsm_merge
 ].
 
 -define(MAGIC, 53). % riak_kv -> riak_object
@@ -34,11 +38,160 @@ init_per_suite(Config) ->
 end_per_suite(Config) ->
     testutil:end_per_suite(Config).
 
+test_large_lsm_merge(_Config) ->
+    lsm_merge_tester(24).
+
+lsm_merge_tester(LoopsPerBucket) ->
+    RootPath = testutil:reset_filestructure("lsmMerge"),
+    PutsPerLoop = 32000,
+    SampleOneIn = 100,
+    StartOpts1 =
+        [
+            {root_path, RootPath},
+            {max_pencillercachesize, 16000},
+            {max_sstslots, 96},
+            % Make SST files smaller, to accelerate merges
+            {max_mergebelow, 24},
+            {sync_strategy, testutil:sync_strategy()},
+            {log_level, warn},
+            {compression_method, zstd},
+            {
+                forced_logs,
+                [
+                    b0015, b0016, b0017, b0018, p0032, sst12,
+                    pc008, pc010, pc011, pc026,
+                    p0018, p0024
+                ]
+            }
+        ],
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+
+    LoadBucketFun =
+        fun(Book, Bucket, Loops) ->
+            V = testutil:get_compressiblevalue(),
+            lists:foreach(
+                fun(_I) ->
+                    {_, V} =
+                        testutil:put_indexed_objects(
+                            Book,
+                            Bucket,
+                            PutsPerLoop,
+                            V
+                        )
+                end,
+                lists:seq(1, Loops)
+            ),
+            V
+        end,
+
+    V1 = LoadBucketFun(Bookie1, <<"B1">>, LoopsPerBucket),
+    io:format("Completed load of ~s~n", [<<"B1">>]),
+    V2 = LoadBucketFun(Bookie1, <<"B2">>, LoopsPerBucket),
+    io:format("Completed load of ~s~n", [<<"B2">>]),
+    ValueMap = #{<<"B1">> => V1, <<"B2">> => V2},
+
+    CheckBucketFun =
+        fun(Book) ->
+            BookHeadFoldFun =
+                fun(B, K, _Hd, {SampleKeys, CountAcc}) ->
+                    UpdCntAcc =
+                        maps:update_with(B, fun(C) -> C + 1 end, 1, CountAcc),
+                    case rand:uniform(SampleOneIn) of
+                        R when R == 1 ->
+                            {[{B, K}|SampleKeys], UpdCntAcc};
+                        _ ->
+                            {SampleKeys, UpdCntAcc}
+                    end
+                end,
+            {async, HeadFolder} =
+                leveled_bookie:book_headfold(
+                    Book,
+                    ?RIAK_TAG,
+                    {BookHeadFoldFun, {[], maps:new()}},
+                    true,
+                    false,
+                    false
+                ),
+            {Time, R} = timer:tc(HeadFolder),
+            io:format(
+                "CheckBucketFold returned counts ~w in ~w ms~n",
+                [element(2, R), Time div 1000]
+            ),
+            R
+        end,
+
+    {SampleKeysF1, CountMapF1} = CheckBucketFun(Bookie1),
+    true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B1">>, CountMapF1),
+    true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B2">>, CountMapF1),
+
+    TestSampleKeyFun =
+        fun(Book, Values) ->
+            fun({B, K}) ->
+                ExpectedV = maps:get(B, Values),
+                {ok, Obj} = testutil:book_riakget(Book, B, K),
+                true = ExpectedV == testutil:get_value(Obj)
+            end
+        end,
+
+    {GT1, ok} =
+        timer:tc(
+            fun() ->
+                lists:foreach(TestSampleKeyFun(Bookie1, ValueMap), SampleKeysF1)
+            end
+        ),
+    io:format(
+        "Returned ~w sample gets in ~w ms~n",
+        [length(SampleKeysF1), GT1 div 1000]
+    ),
+
+    ok = leveled_bookie:book_close(Bookie1),
+    {ok, Bookie2} =
+        leveled_bookie:book_start(
+            lists:ukeysort(1, [{max_sstslots, 64}|StartOpts1])
+        ),
+
+    {SampleKeysF2, CountMapF2} = CheckBucketFun(Bookie2),
+    true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B1">>, CountMapF2),
+    true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B2">>, CountMapF2),
+
+    {GT2, ok} =
+        timer:tc(
+            fun() ->
+                lists:foreach(TestSampleKeyFun(Bookie2, ValueMap), SampleKeysF2)
+            end
+        ),
+    io:format(
+        "Returned ~w sample gets in ~w ms~n",
+        [length(SampleKeysF2), GT2 div 1000]
+    ),
+
+    V3 = LoadBucketFun(Bookie2, <<"B3">>, LoopsPerBucket),
+    io:format("Completed load of ~s~n", [<<"B3">>]),
+    UpdValueMap = #{<<"B1">> => V1, <<"B2">> => V2, <<"B3">> => V3},
+
+    {SampleKeysF3, CountMapF3} = CheckBucketFun(Bookie2),
+    true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B1">>, CountMapF3),
+    true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B2">>, CountMapF3),
+    true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B3">>, CountMapF3),
+
+    {GT3, ok} =
+        timer:tc(
+            fun() ->
+                lists:foreach(TestSampleKeyFun(Bookie2, UpdValueMap), SampleKeysF3)
+            end
+        ),
+    io:format(
+        "Returned ~w sample gets in ~w ms~n",
+        [length(SampleKeysF3), GT3 div 1000]
+    ),
+
+    ok = leveled_bookie:book_destroy(Bookie2).
+
 basic_riak(_Config) ->
     basic_riak_tester(<<"B0">>, 640000),
     basic_riak_tester({<<"Type0">>, <<"B0">>}, 80000).
 
 basic_riak_tester(Bucket, KeyCount) ->
     % Key Count should be > 10K and divisible by 5
     io:format("Basic riak test with Bucket ~w KeyCount ~w~n",
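The new case can also be run on its own through common_test; a hedged example using rebar3 (flag spelling may vary with the rebar3 version):

    rebar3 ct --suite test/end_to_end/riak_SUITE --case test_large_lsm_merge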
@@ -883,7 +883,11 @@ put_indexed_objects(Book, Bucket, Count, V) ->
     KSpecL =
         lists:map(
            fun({_RN, Obj, Spc}) ->
-                book_riakput(Book, Obj, Spc),
+                R = book_riakput(Book, Obj, Spc),
+                case R of
+                    ok -> ok;
+                    pause -> timer:sleep(?SLOWOFFER_DELAY)
+                end,
                 {testutil:get_key(Obj), Spc}
             end,
             ObjL1),