diff --git a/include/leveled.hrl b/include/leveled.hrl
index 6295b4c..5244e38 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -33,6 +33,7 @@
 %%% Non-configurable startup defaults
 %%%============================================================================
 -define(MAX_SSTSLOTS, 256).
+-define(MAX_MERGEBELOW, 24).
 -define(LOADING_PAUSE, 1000).
 -define(LOADING_BATCH, 1000).
 -define(CACHE_SIZE_JITTER, 25).
@@ -109,7 +110,8 @@
         press_level = ?COMPRESSION_LEVEL :: non_neg_integer(),
         log_options = leveled_log:get_opts()
             :: leveled_log:log_options(),
-        max_sstslots = ?MAX_SSTSLOTS :: pos_integer(),
+        max_sstslots = ?MAX_SSTSLOTS :: pos_integer()|infinity,
+        max_mergebelow = ?MAX_MERGEBELOW :: pos_integer()|infinity,
         pagecache_level = ?SST_PAGECACHELEVEL_NOLOOKUP :: pos_integer(),
         monitor = {no_monitor, 0}
diff --git a/rebar.config b/rebar.config
index 1953a96..454f40f 100644
--- a/rebar.config
+++ b/rebar.config
@@ -13,11 +13,7 @@
 {eunit_opts, [verbose]}.
 
 {project_plugins,
     [
-        {eqwalizer_rebar3,
-            {git_subdir,
-                "https://github.com/whatsapp/eqwalizer.git",
-                {branch, "main"},
-                "eqwalizer_rebar3"}}
+        {eqwalizer_rebar3, {git_subdir, "https://github.com/OpenRiak/eqwalizer.git", {branch, "openriak-3.4"}, "eqwalizer_rebar3"}}
     ]}.
 
 {profiles,
@@ -37,7 +33,7 @@
         {deps,
             [
                 {lz4, ".*", {git, "https://github.com/nhs-riak/erlang-lz4", {branch, "nhse-develop-3.4"}}},
                 {zstd, ".*", {git, "https://github.com/nhs-riak/zstd-erlang", {branch, "nhse-develop"}}},
-                {eqwalizer_support, {git_subdir, "https://github.com/whatsapp/eqwalizer.git", {branch, "main"}, "eqwalizer_support"}}
+                {eqwalizer_support, {git_subdir, "https://github.com/OpenRiak/eqwalizer.git", {branch, "openriak-3.4"}, "eqwalizer_support"}}
             ]}.
 
 {ct_opts, [{dir, ["test/end_to_end"]}]}.
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index e4292e1..459cc99 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -114,6 +114,7 @@
         {max_journalsize, 1000000000},
         {max_journalobjectcount, 200000},
         {max_sstslots, 256},
+        {max_mergebelow, 24},
         {sync_strategy, ?DEFAULT_SYNC_STRATEGY},
         {head_only, false},
         {waste_retention_period, undefined},
@@ -201,6 +202,12 @@
        % The maximum number of slots in a SST file.  All testing is done
        % at a size of 256 (except for Quickcheck tests}, altering this
        % value is not recommended
+    {max_mergebelow, pos_integer()|infinity} |
+       % The maximum number of files in the level below into which a
+       % single file may be merged.  If fewer files than this overlap,
+       % the merge will run to completion.  If this many or more files
+       % overlap below, only up to max_mergebelow div 2 additions should
+       % be created (i.e. the merge should be partial)
     {sync_strategy, sync_mode()} |
        % Should be sync if it is necessary to flush to disk after every
        % write, or none if not (allow the OS to schecdule).  This has a
@@ -293,7 +300,7 @@
        % To which level of the ledger should the ledger contents be
        % pre-loaded into the pagecache (using fadvise on creation and
        % startup)
-    {compression_method, native|lz4|zstd|none} |
+    {compression_method, native|lz4|zstd|none} |
        % Compression method and point allow Leveled to be switched from
        % using bif based compression (zlib) to using nif based compression
        % (lz4 or zstd).
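The max_mergebelow option documented above is passed at startup like any other bookie option. A minimal sketch of a store started with a non-default value (the root_path is a placeholder; every option not listed keeps the defaults shown above):

    %% With max_mergebelow set to 24, a merge into a level where 24 or more
    %% files overlap is capped at 24 div 2 = 12 newly written files.
    {ok, Bookie} =
        leveled_bookie:book_start(
            [{root_path, "/tmp/lsm_store"}, {max_mergebelow, 24}]),
    ok = leveled_bookie:book_close(Bookie).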
@@ -1871,6 +1878,7 @@ set_options(Opts, Monitor) ->
     CompressionLevel = proplists:get_value(compression_level, Opts),
     MaxSSTSlots = proplists:get_value(max_sstslots, Opts),
+    MaxMergeBelow = proplists:get_value(max_mergebelow, Opts),
     ScoreOneIn = proplists:get_value(journalcompaction_scoreonein, Opts),
@@ -1904,6 +1912,7 @@ set_options(Opts, Monitor) ->
             press_level = CompressionLevel,
             log_options = leveled_log:get_opts(),
             max_sstslots = MaxSSTSlots,
+            max_mergebelow = MaxMergeBelow,
             monitor = Monitor},
         monitor = Monitor}
     }.
diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index c46c8b7..af64b54 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -165,7 +165,7 @@
     pc010 =>
         {info, <<"Merge to be commenced for FileToMerge=~s with MSN=~w">>},
     pc011 =>
-        {info, <<"Merge completed with MSN=~w to Level=~w and FileCounter=~w">>},
+        {info, <<"Merge completed with MSN=~w to Level=~w and FileCounter=~w merge_type=~w">>},
     pc012 =>
         {debug, <<"File to be created as part of MSN=~w Filename=~s IsBasement=~w">>},
     pc013 =>
@@ -190,6 +190,8 @@
         {info, <<"Grooming compaction picked file with tomb_count=~w">>},
     pc025 =>
         {info, <<"At level=~w file_count=~w average words for heap_block_size=~w heap_size=~w recent_size=~w bin_vheap_size=~w">>},
+    pc026 =>
+        {info, <<"Performing potentially partial merge to level=~w as FileCounter=~w restricting to MaxAdditions=~w">>},
     pm002 =>
         {info, <<"Completed dump of L0 cache to list of l0cache_size=~w">>},
     sst03 =>
diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl
index 107bcf1..810c481 100644
--- a/src/leveled_pclerk.erl
+++ b/src/leveled_pclerk.erl
@@ -269,7 +269,6 @@ notify_deletions([Head|Tail], Penciller) ->
 %% to be merged into multiple SSTs at a lower level.
 %%
 %% SrcLevel is the level of the src sst file, the sink should be srcLevel + 1
-
 perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN, OptsSST) ->
     leveled_log:log(pc010, [leveled_pmanifest:entry_filename(Src), NewSQN]),
     SrcList = [{next, Src, all}],
@@ -279,72 +278,188 @@ perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN, OptsSST) ->
         ),
     SinkLevel = SrcLevel + 1,
     SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel),
-    Additions =
+    MaxMergeBelow = OptsSST#sst_options.max_mergebelow,
+    MergeLimit = merge_limit(SrcLevel, length(SinkList), MaxMergeBelow),
+    {L2Additions, L1Additions, L2FileRemainder} =
         do_merge(
             SrcList, SinkList,
             SinkLevel, SinkBasement,
             RootPath, NewSQN, MaxSQN,
             OptsSST,
-            []
+            [],
+            MergeLimit
         ),
+    RevertPointerFun = fun({next, ME, _SK}) -> ME end,
+    SinkManifestRemovals =
+        lists:subtract(
+            lists:map(RevertPointerFun, SinkList),
+            lists:map(RevertPointerFun, L2FileRemainder)
+        ),
-    RevertPointerFun =
-        fun({next, ME, _SK}) ->
-            ME
-        end,
-    SinkManifestList = lists:map(RevertPointerFun, SinkList),
     Man0 =
         leveled_pmanifest:replace_manifest_entry(
             Manifest,
             NewSQN,
             SinkLevel,
-            SinkManifestList,
-            Additions
+            SinkManifestRemovals,
+            L2Additions
         ),
-    Man2 =
-        leveled_pmanifest:remove_manifest_entry(
-            Man0,
-            NewSQN,
-            SrcLevel,
-            Src
-        ),
-    {Man2, [Src|SinkManifestList]}.
+    Man1 =
+        case L1Additions of
+            [] ->
+                leveled_pmanifest:remove_manifest_entry(
+                    Man0,
+                    NewSQN,
+                    SrcLevel,
+                    Src
+                );
+            PartialFiles ->
+                leveled_pmanifest:replace_manifest_entry(
+                    Man0,
+                    NewSQN,
+                    SrcLevel,
+                    [Src],
+                    PartialFiles
+                )
+        end,
+    {Man1, [Src|SinkManifestRemovals]}.
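perform_merge/7 above derives a MergeLimit from merge_limit/3, defined next. The rule reduces to a handful of cases; some illustrative evaluations, assuming the ?MAX_MERGEBELOW default of 24 (the helper is module-internal, so this is a sketch rather than a public API):

    infinity = merge_limit(1, 40, 24),        % merges out of levels 0 and 1 are never limited
    infinity = merge_limit(3, 10, 24),        % fewer than 24 files below -> full merge
    12 = merge_limit(3, 30, 24),              % 30 >= 24 -> cap additions at 24 div 2
    infinity = merge_limit(3, 30, infinity).  % limiting disabled altogether

Note that the first clause's SinkListLength < MMB guard also covers MMB = infinity, as any integer compares less than an atom in Erlang term order.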
-do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions) ->
-    leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions)]),
-    lists:reverse(Additions);
-do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
+-spec merge_limit(
+    non_neg_integer(), non_neg_integer(), pos_integer()|infinity)
+        -> pos_integer()|infinity.
+merge_limit(SrcLevel, SinkListLength, MMB)
+        when SrcLevel =< 1; SinkListLength < MMB ->
+    infinity;
+merge_limit(SrcLevel, SinkListLength, MMB) when is_integer(MMB) ->
+    AdditionsLimit = max(1, MMB div 2),
+    leveled_log:log(pc026, [SrcLevel + 1, SinkListLength, AdditionsLimit]),
+    AdditionsLimit.
+
+-type merge_maybe_expanded_pointer() ::
+    leveled_codec:ledger_kv()|
+    leveled_sst:slot_pointer()|
+    leveled_sst:sst_pointer().
+    % Different to leveled_sst:maybe_expanded_pointer/0
+    % No sst_closed_pointer()
+
+-spec do_merge(
+    list(merge_maybe_expanded_pointer()),
+    list(merge_maybe_expanded_pointer()),
+    leveled_pmanifest:lsm_level(),
+    boolean(),
+    string(),
+    pos_integer(),
+    pos_integer(),
+    leveled_sst:sst_options(),
+    list(leveled_pmanifest:manifest_entry()),
+    pos_integer()|infinity) ->
+        {
+            list(leveled_pmanifest:manifest_entry()),
+            list(leveled_pmanifest:manifest_entry()),
+            list(leveled_sst:sst_pointer())
+        }.
+do_merge(
+    [], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions, _Max) ->
+    leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), full]),
+    {lists:reverse(Additions), [], []};
+do_merge(
+    KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max)
+        when length(Additions) >= Max ->
+    leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), partial]),
+    FNSrc =
+        leveled_penciller:sst_filename(
+            NewSQN, SinkLevel - 1, 1
+        ),
+    FNSnk =
+        leveled_penciller:sst_filename(
+            NewSQN, SinkLevel, length(Additions) + 1
+        ),
+    {ExpandedKL1, []} = split_unexpanded_files(KL1),
+    {ExpandedKL2, L2FilePointersRem} = split_unexpanded_files(KL2),
+    TS1 = os:timestamp(),
+    InfOpts = OptsSST#sst_options{max_sstslots = infinity},
+    % Need to be careful to make sure all of the remainder goes in one
+    % file.  There could be situations whereby max_sstslots has been
+    % changed between restarts, so that there is too much data for one
+    % file in the remainder - but don't want to loop round and consider
+    % more complex scenarios here.
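+    % With max_sstslots forced to infinity, each of the two sst_newmerge
+    % calls below should build at most one file - the add_entry matches
+    % against empty remainders rely on this.
+    % For illustration, split_unexpanded_files/1 (defined below) expects
+    % all expanded content ahead of any whole-file pointers, e.g.
+    %   split_unexpanded_files(
+    %       [{LK, LV}, {pointer, P, SIV, SK, EK}, {next, PA, all}])
+    % returns {[{LK, LV}, {pointer, P, SIV, SK, EK}], [{next, PA, all}]} -
+    % so ExpandedKL2 holds loose keys and slot pointers, while
+    % L2FilePointersRem holds whole files this partial merge never reached.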
+    NewMergeKL1 =
+        leveled_sst:sst_newmerge(
+            RP, FNSrc, ExpandedKL1, [], false, SinkLevel - 1, MaxSQN, InfOpts
+        ),
+    TS2 = os:timestamp(),
+    NewMergeKL2 =
+        leveled_sst:sst_newmerge(
+            RP, FNSnk, [], ExpandedKL2, SinkB, SinkLevel, MaxSQN, InfOpts
+        ),
+    {KL1Additions, [], []} = add_entry(NewMergeKL1, FNSrc, TS1, []),
+    {KL2Additions, [], []} = add_entry(NewMergeKL2, FNSnk, TS2, Additions),
+    {lists:reverse(KL2Additions), KL1Additions, L2FilePointersRem};
+do_merge(
+    KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max) ->
     FileName =
         leveled_penciller:sst_filename(
             NewSQN, SinkLevel, length(Additions)
         ),
     leveled_log:log(pc012, [NewSQN, FileName, SinkB]),
     TS1 = os:timestamp(),
-    case leveled_sst:sst_newmerge(RP, FileName,
-                                    KL1, KL2, SinkB, SinkLevel, MaxSQN,
-                                    OptsSST) of
-        empty ->
-            leveled_log:log(pc013, [FileName]),
-            do_merge(
-                [], [],
-                SinkLevel, SinkB,
-                RP, NewSQN, MaxSQN,
-                OptsSST,
-                Additions
-            );
-        {ok, Pid, Reply, Bloom} ->
-            {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
-            Entry =
-                leveled_pmanifest:new_entry(
-                    SmallestKey, HighestKey, Pid, FileName, Bloom),
-            leveled_log:log_timer(pc015, [], TS1),
-            do_merge(
-                KL1Rem, KL2Rem,
-                SinkLevel, SinkB,
-                RP, NewSQN, MaxSQN,
-                OptsSST,
-                [Entry|Additions]
-            )
-    end.
+    NewMerge =
+        leveled_sst:sst_newmerge(
+            RP, FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN, OptsSST),
+    {UpdAdditions, KL1Rem, KL2Rem} =
+        add_entry(NewMerge, FileName, TS1, Additions),
+    do_merge(
+        KL1Rem,
+        KL2Rem,
+        SinkLevel,
+        SinkB,
+        RP,
+        NewSQN,
+        MaxSQN,
+        OptsSST,
+        UpdAdditions,
+        Max
+    ).
+
+add_entry(empty, FileName, _TS1, Additions) ->
+    leveled_log:log(pc013, [FileName]),
+    % Keep the additions so far, with no remainders left to process -
+    % matching the {Additions, KL1Rem, KL2Rem} shape of the clause below
+    {Additions, [], []};
+add_entry({ok, Pid, Reply, Bloom}, FileName, TS1, Additions) ->
+    {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
+    Entry =
+        leveled_pmanifest:new_entry(
+            SmallestKey, HighestKey, Pid, FileName, Bloom),
+    leveled_log:log_timer(pc015, [], TS1),
+    {[Entry|Additions], KL1Rem, KL2Rem}.
+
+
+-spec split_unexpanded_files(
+    list(merge_maybe_expanded_pointer())) ->
+        {
+            list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
+            list(leveled_sst:sst_pointer())
+        }.
+split_unexpanded_files(Pointers) ->
+    split_unexpanded_files(Pointers, [], []).
+
+-spec split_unexpanded_files(
+    list(merge_maybe_expanded_pointer()),
+    list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
+    list(leveled_sst:sst_pointer())) ->
+        {
+            list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
+            list(leveled_sst:sst_pointer())
+        }.
+split_unexpanded_files([], MaybeExpanded, FilePointers) ->
+    {lists:reverse(MaybeExpanded), lists:reverse(FilePointers)};
+split_unexpanded_files([{next, P, SK}|Rest], MaybeExpanded, FilePointers) ->
+    split_unexpanded_files(Rest, MaybeExpanded, [{next, P, SK}|FilePointers]);
+split_unexpanded_files([{LK, LV}|Rest], MaybeExpanded, []) ->
+    % A loose KV should never be seen once a file pointer has been seen
+    split_unexpanded_files(Rest, [{LK, LV}|MaybeExpanded], []);
+split_unexpanded_files([{pointer, P, SIV, SK, EK}|Rest], MaybeExpanded, []) ->
+    % A slot pointer should never be seen once a file pointer has been seen
+    split_unexpanded_files(
+        Rest, [{pointer, P, SIV, SK, EK}|MaybeExpanded], []
+    ).
 
 -spec grooming_scorer(
     list(leveled_pmanifest:manifest_entry()))
diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl
index 1a5b42d..ddf0d98 100644
--- a/src/leveled_sst.erl
+++ b/src/leveled_sst.erl
@@ -248,7 +248,18 @@
 
 -type build_timings() :: no_timing|#build_timings{}.
--export_type([expandable_pointer/0, press_method/0, segment_check_fun/0]). +-export_type( + [ + expandable_pointer/0, + maybe_expanded_pointer/0, + sst_closed_pointer/0, + sst_pointer/0, + slot_pointer/0, + press_method/0, + segment_check_fun/0, + sst_options/0 + ] +). %%%============================================================================ %%% API @@ -312,8 +323,8 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) -> end. -spec sst_newmerge(string(), string(), - list(leveled_codec:ledger_kv()|sst_pointer()), - list(leveled_codec:ledger_kv()|sst_pointer()), + list(maybe_expanded_pointer()), + list(maybe_expanded_pointer()), boolean(), leveled_pmanifest:lsm_level(), integer(), sst_options()) -> empty|{ok, pid(), @@ -1529,7 +1540,9 @@ compress_level(_Level, _LevelToCompress, PressMethod) -> PressMethod. -spec maxslots_level( - leveled_pmanifest:lsm_level(), pos_integer()) -> pos_integer(). + leveled_pmanifest:lsm_level(), pos_integer()|infinity) -> pos_integer()|infinity. +maxslots_level(_Level, infinity) -> + infinity; maxslots_level(Level, MaxSlotCount) when Level < ?DOUBLESIZE_LEVEL -> MaxSlotCount; maxslots_level(_Level, MaxSlotCount) -> @@ -3020,7 +3033,7 @@ merge_lists( list(binary_slot()), leveled_codec:ledger_key()|null, non_neg_integer(), - non_neg_integer(), + pos_integer()|infinity, press_method(), boolean(), non_neg_integer(), diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl index e5288c7..8d31dd7 100644 --- a/test/end_to_end/riak_SUITE.erl +++ b/test/end_to_end/riak_SUITE.erl @@ -2,8 +2,9 @@ -include("leveled.hrl"). --export([all/0, init_per_suite/1, end_per_suite/1]). +-export([all/0, init_per_suite/1, end_per_suite/1, suite/0]). -export([ + test_large_lsm_merge/1, basic_riak/1, fetchclocks_modifiedbetween/1, crossbucket_aae/1, @@ -14,6 +15,8 @@ summarisable_sstindex/1 ]). +suite() -> [{timetrap, {hours, 2}}]. + all() -> [ basic_riak, fetchclocks_modifiedbetween, @@ -22,7 +25,8 @@ all() -> [ dollar_bucket_index, dollar_key_index, bigobject_memorycheck, - summarisable_sstindex + summarisable_sstindex, + test_large_lsm_merge ]. -define(MAGIC, 53). % riak_kv -> riak_object @@ -34,11 +38,160 @@ init_per_suite(Config) -> end_per_suite(Config) -> testutil:end_per_suite(Config). + +test_large_lsm_merge(_Config) -> + lsm_merge_tester(24). 
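The tester below deliberately keeps files small ({max_sstslots, 96}) to force frequent merges, and later restarts the store with {max_sstslots, 64} - exercising the concern noted in leveled_pclerk about max_sstslots changing between restarts. Each bucket receives LoopsPerBucket x PutsPerLoop objects, i.e. 24 x 32000 = 768000 with the values used here, which is the figure the head-fold count assertions check against.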
+ +lsm_merge_tester(LoopsPerBucket) -> + RootPath = testutil:reset_filestructure("lsmMerge"), + PutsPerLoop = 32000, + SampleOneIn = 100, + StartOpts1 = + [ + {root_path, RootPath}, + {max_pencillercachesize, 16000}, + {max_sstslots, 96}, + % Make SST files smaller, to accelerate merges + {max_mergebelow, 24}, + {sync_strategy, testutil:sync_strategy()}, + {log_level, warn}, + {compression_method, zstd}, + { + forced_logs, + [ + b0015, b0016, b0017, b0018, p0032, sst12, + pc008, pc010, pc011, pc026, + p0018, p0024 + ] + } + ], + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + + LoadBucketFun = + fun(Book, Bucket, Loops) -> + V = testutil:get_compressiblevalue(), + lists:foreach( + fun(_I) -> + {_, V} = + testutil:put_indexed_objects( + Book, + Bucket, + PutsPerLoop, + V + ) + end, + lists:seq(1, Loops) + ), + V + end, + + V1 = LoadBucketFun(Bookie1, <<"B1">>, LoopsPerBucket), + io:format("Completed load of ~s~n", [<<"B1">>]), + V2 = LoadBucketFun(Bookie1, <<"B2">>, LoopsPerBucket), + io:format("Completed load of ~s~n", [<<"B2">>]), + ValueMap = #{<<"B1">> => V1, <<"B2">> => V2}, + + CheckBucketFun = + fun(Book) -> + BookHeadFoldFun = + fun(B, K, _Hd, {SampleKeys, CountAcc}) -> + UpdCntAcc = + maps:update_with(B, fun(C) -> C + 1 end, 1, CountAcc), + case rand:uniform(SampleOneIn) of + R when R == 1 -> + {[{B, K}|SampleKeys], UpdCntAcc}; + _ -> + {SampleKeys, UpdCntAcc} + end + end, + {async, HeadFolder} = + leveled_bookie:book_headfold( + Book, + ?RIAK_TAG, + {BookHeadFoldFun, {[], maps:new()}}, + true, + false, + false + ), + {Time, R} = timer:tc(HeadFolder), + io:format( + "CheckBucketFold returned counts ~w in ~w ms~n", + [element(2, R), Time div 1000] + ), + R + end, + + {SampleKeysF1, CountMapF1} = CheckBucketFun(Bookie1), + true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B1">>, CountMapF1), + true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B2">>, CountMapF1), + + TestSampleKeyFun = + fun(Book, Values) -> + fun({B, K}) -> + ExpectedV = maps:get(B, Values), + {ok, Obj} = testutil:book_riakget(Book, B, K), + true = ExpectedV == testutil:get_value(Obj) + end + end, + + {GT1, ok} = + timer:tc( + fun() -> + lists:foreach(TestSampleKeyFun(Bookie1, ValueMap), SampleKeysF1) + end + ), + io:format( + "Returned ~w sample gets in ~w ms~n", + [length(SampleKeysF1), GT1 div 1000] + ), + + ok = leveled_bookie:book_close(Bookie1), + {ok, Bookie2} = + leveled_bookie:book_start( + lists:ukeysort(1, [{max_sstslots, 64}|StartOpts1]) + ), + + {SampleKeysF2, CountMapF2} = CheckBucketFun(Bookie2), + true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B1">>, CountMapF2), + true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B2">>, CountMapF2), + + {GT2, ok} = + timer:tc( + fun() -> + lists:foreach(TestSampleKeyFun(Bookie2, ValueMap), SampleKeysF2) + end + ), + io:format( + "Returned ~w sample gets in ~w ms~n", + [length(SampleKeysF2), GT2 div 1000] + ), + + V3 = LoadBucketFun(Bookie2, <<"B3">>, LoopsPerBucket), + io:format("Completed load of ~s~n", [<<"B3">>]), + UpdValueMap = #{<<"B1">> => V1, <<"B2">> => V2, <<"B3">> => V3}, + + {SampleKeysF3, CountMapF3} = CheckBucketFun(Bookie2), + true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B1">>, CountMapF3), + true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B2">>, CountMapF3), + true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B3">>, CountMapF3), + + {GT3, ok} = + timer:tc( + fun() -> + lists:foreach(TestSampleKeyFun(Bookie2, UpdValueMap), SampleKeysF3) + end + ), + io:format( + "Returned ~w sample gets in ~w ms~n", + 
+        [length(SampleKeysF3), GT3 div 1000]
+    ),
+
+    ok = leveled_bookie:book_destroy(Bookie2).
+
 basic_riak(_Config) ->
     basic_riak_tester(<<"B0">>, 640000),
     basic_riak_tester({<<"Type0">>, <<"B0">>}, 80000).
 
-
 basic_riak_tester(Bucket, KeyCount) ->
     % Key Count should be > 10K and divisible by 5
     io:format("Basic riak test with Bucket ~w KeyCount ~w~n",
diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl
index 5816d11..e437b1b 100644
--- a/test/end_to_end/testutil.erl
+++ b/test/end_to_end/testutil.erl
@@ -883,7 +883,11 @@ put_indexed_objects(Book, Bucket, Count, V) ->
     KSpecL =
         lists:map(
             fun({_RN, Obj, Spc}) ->
-                book_riakput(Book, Obj, Spc),
+                R = book_riakput(Book, Obj, Spc),
+                case R of
+                    ok -> ok;
+                    pause -> timer:sleep(?SLOWOFFER_DELAY)
+                end,
                 {testutil:get_key(Obj), Spc}
             end,
             ObjL1),
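For context on the testutil change above: a leveled put returns pause rather than ok when the store wants writers to back off, and a tight load loop that ignores the signal can overwhelm the penciller. A sketch of the same pattern for a generic caller (the helper name and PauseMs are illustrative; book_put/6 is the underlying bookie API, and the suite uses its own ?SLOWOFFER_DELAY macro for the delay):

    %% Illustrative helper, not part of this patch.
    put_with_backoff(Book, Bucket, Key, Obj, IdxSpecs, Tag, PauseMs) ->
        case leveled_bookie:book_put(Book, Bucket, Key, Obj, IdxSpecs, Tag) of
            ok ->
                ok;
            pause ->
                timer:sleep(PauseMs)
        end.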