diff --git a/priv/leveled.schema b/priv/leveled.schema index 3e565e6..7c259dc 100644 --- a/priv/leveled.schema +++ b/priv/leveled.schema @@ -72,7 +72,7 @@ {datatype, integer} ]}. -%% @doc The approximate size (in bytes) when a Journal file should be rolled. +%% @doc The approximate count of objects when a Journal file should be rolled. %% This time measured in object count, a file will be rolled if either the %% object count or the journal size limit is reached. Default 200K. %% Note that on startup an actual maximum size will be chosen which varies by @@ -83,6 +83,14 @@ {datatype, integer} ]}. +%% @doc The level of the ledger to be pre-loaded into the page cache. +%% Depending on how much memory is available for the page cache, and how much +%% disk I/O activity can be tolerated at startup, the level at which the +%% ledger is forced into the page cache can be controlled by configuration. +{mapping, "leveled.ledger_pagecachelevel", "leveled.ledger_pagecachelevel", [ + {default, 4}, + {datatype, integer} +]}. %% @doc The number of journal compactions per vnode per day %% The higher the value, the more compaction runs, and the sooner space is diff --git a/priv/leveled_multi.schema b/priv/leveled_multi.schema index 22857d7..d1cc0d3 100644 --- a/priv/leveled_multi.schema +++ b/priv/leveled_multi.schema @@ -66,7 +66,7 @@ {datatype, integer} ]}. -%% @doc The approximate size (in bytes) when a Journal file should be rolled. +%% @doc The approximate count of objects when a Journal file should be rolled. %% This time measured in object count, a file will be rolled if either the %% object count or the journal size limit is reached. Default 200K. %% Note that on startup an actual maximum size will be chosen which varies by @@ -77,6 +77,16 @@ {datatype, integer} ]}.
+%% @doc The level of the ledger to be pre-loaded into the page cache. +%% Depending on how much memory is available for the page cache, and how much +%% disk I/O activity can be tolerated at startup, the level at which the +%% ledger is forced into the page cache can be controlled by configuration. +{mapping, "multi_backend.$name.leveled.ledger_pagecachelevel", "riak_kv.multi_backend", [ + {default, 4}, + {datatype, integer} +]}. + + %% @doc The number of journal compactions per vnode per day %% The higher the value, the more compaction runs, and the sooner space is %% recovered. But each run has a cost diff --git a/src/leveled.app.src b/src/leveled.app.src index 86a8724..add537e 100644 --- a/src/leveled.app.src +++ b/src/leveled.app.src @@ -1,7 +1,7 @@ {application, leveled, [ {description, "Key Value store based on LSM-Tree and designed for larger values"}, - {vsn, "0.9.17"}, + {vsn, "0.9.18"}, {registered, []}, {applications, [ kernel, diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index b14dfde..3fbf027 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -144,6 +144,7 @@ {maxrunlength_compactionpercentage, 70.0}, {reload_strategy, []}, {max_pencillercachesize, ?MAX_PCL_CACHE_SIZE}, + {ledger_preloadpagecache_level, ?SST_PAGECACHELEVEL_LOOKUP}, {compression_method, ?COMPRESSION_METHOD}, {compression_point, ?COMPRESSION_POINT}, {log_level, ?LOG_LEVEL}, @@ -320,6 +321,10 @@ % The minimum size 400 - attempt to set this vlaue lower will be % ignored.
As a rule the value should be at least 4 x the Bookie's % cache size + {ledger_preloadpagecache_level, pos_integer()} | + % To which level of the ledger should the ledger contents be + % pre-loaded into the pagecache (using fadvise on creation and + % startup) {compression_method, native|lz4} | % Compression method and point allow Leveled to be switched from % using bif based compression (zlib) to using nif based compression @@ -1183,13 +1188,15 @@ init([Opts]) -> false -> ok end, - + + PageCacheLevel = proplists:get_value(ledger_preloadpagecache_level, Opts), + {HeadOnly, HeadLookup, SSTPageCacheLevel} = case proplists:get_value(head_only, Opts) of false -> - {false, true, ?SST_PAGECACHELEVEL_LOOKUP}; + {false, true, PageCacheLevel}; with_lookup -> - {true, true, ?SST_PAGECACHELEVEL_LOOKUP}; + {true, true, PageCacheLevel}; no_lookup -> {true, false, ?SST_PAGECACHELEVEL_NOLOOKUP} end, diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index baa6506..fb8a088 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -793,24 +793,24 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) -> {Journal0, ManSlice0}; write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) -> - KVList = lists:map(fun({K, V, _C}) -> + KVList = + lists:map(fun({K, V, _C}) -> % Compress the value as part of compaction - {K, leveled_codec:maybe_compress(V, PressMethod)} - end, - KVCList), - {ok, Journal1} = case Journal0 of - null -> - {TK, _TV} = lists:nth(1, KVList), - {SQN, _LK} = leveled_codec:from_journalkey(TK), - FP = CDBopts#cdb_options.file_path, - FN = leveled_inker:filepath(FP, - SQN, - compact_journal), - leveled_log:log("IC009", [FN]), - leveled_cdb:cdb_open_writer(FN, CDBopts); - _ -> - {ok, Journal0} - end, + {K, leveled_codec:maybe_compress(V, PressMethod)} + end, + KVCList), + {ok, Journal1} = + case Journal0 of + null -> + {TK, _TV} = lists:nth(1, KVList), + {SQN, 
_LK} = leveled_codec:from_journalkey(TK), + FP = CDBopts#cdb_options.file_path, + FN = leveled_inker:filepath(FP, SQN, compact_journal), + leveled_log:log("IC009", [FN]), + leveled_cdb:cdb_open_writer(FN, CDBopts); + _ -> + {ok, Journal0} + end, R = leveled_cdb:cdb_mput(Journal1, KVList), case R of ok -> diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index f6621cd..d832c7d 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -12,7 +12,8 @@ aae_bustedjournal/1, journal_compaction_bustedjournal/1, close_duringcompaction/1, - allkeydelta_journal_multicompact/1 + allkeydelta_journal_multicompact/1, + recompact_keydeltas/1 ]). all() -> [ @@ -25,7 +26,8 @@ all() -> [ aae_bustedjournal, journal_compaction_bustedjournal, close_duringcompaction, - allkeydelta_journal_multicompact + allkeydelta_journal_multicompact, + recompact_keydeltas ]. @@ -600,18 +602,19 @@ busted_journal_test(MaxJournalSize, PressMethod, PressPoint, Bust) -> allkeydelta_journal_multicompact(_Config) -> - % Simply confirms that none of this causes a crash RootPath = testutil:reset_filestructure(), B = <<"test_bucket">>, StartOptsFun = fun(JOC) -> [{root_path, RootPath}, {max_journalobjectcount, JOC}, - {max_run_length, 6}, + {max_run_length, 4}, + {singlefile_compactionpercentage, 70.0}, + {maxrunlength_compactionpercentage, 85.0}, {sync_strategy, testutil:sync_strategy()}] end, - {ok, Bookie1} = leveled_bookie:book_start(StartOptsFun(16000)), - {KSpcL1, _V1} = testutil:put_indexed_objects(Bookie1, B, 40000), + {ok, Bookie1} = leveled_bookie:book_start(StartOptsFun(14000)), + {KSpcL1, _V1} = testutil:put_indexed_objects(Bookie1, B, 24000), {KSpcL2, V2} = testutil:put_altered_indexed_objects(Bookie1, B, KSpcL1, @@ -637,7 +640,7 @@ allkeydelta_journal_multicompact(_Config) -> ok = leveled_bookie:book_close(Bookie1), leveled_penciller:clean_testdir(RootPath ++ "/ledger"), io:format("Restart without ledger~n"), - {ok, Bookie2} = 
leveled_bookie:book_start(StartOptsFun(24000)), + {ok, Bookie2} = leveled_bookie:book_start(StartOptsFun(13000)), ok = testutil:check_indexed_objects(Bookie2, B, @@ -657,7 +660,7 @@ allkeydelta_journal_multicompact(_Config) -> ok = leveled_bookie:book_close(Bookie2), io:format("Restart with smaller journal object count~n"), - {ok, Bookie3} = leveled_bookie:book_start(StartOptsFun(8000)), + {ok, Bookie3} = leveled_bookie:book_start(StartOptsFun(7000)), {KSpcL4, V4} = testutil:put_altered_indexed_objects(Bookie3, B, @@ -674,11 +677,45 @@ allkeydelta_journal_multicompact(_Config) -> file:list_dir( filename:join(RootPath, "journal/journal_files/post_compact")), io:format("Number of files after compaction ~w~n", [length(FileList4)]), - true = length(FileList4) >= length(FileList3) + 4, + true = length(FileList4) >= length(FileList3) + 3, ok = leveled_bookie:book_close(Bookie3), testutil:reset_filestructure(10000). +recompact_keydeltas(_Config) -> + RootPath = testutil:reset_filestructure(), + B = <<"test_bucket">>, + StartOptsFun = + fun(JOC) -> + [{root_path, RootPath}, + {max_journalobjectcount, JOC}, + {max_run_length, 4}, + {singlefile_compactionpercentage, 70.0}, + {maxrunlength_compactionpercentage, 85.0}, + {sync_strategy, testutil:sync_strategy()}] + end, + {ok, Bookie1} = leveled_bookie:book_start(StartOptsFun(45000)), + {KSpcL1, _V1} = testutil:put_indexed_objects(Bookie1, B, 24000), + {KSpcL2, _V2} = testutil:put_altered_indexed_objects(Bookie1, + B, + KSpcL1, + false), + ok = leveled_bookie:book_close(Bookie1), + {ok, Bookie2} = leveled_bookie:book_start(StartOptsFun(45000)), + compact_and_wait(Bookie2, 0), + {KSpcL3, V3} = testutil:put_altered_indexed_objects(Bookie2, + B, + KSpcL2, + false), + compact_and_wait(Bookie2, 0), + ok = testutil:check_indexed_objects(Bookie2, + B, + KSpcL1 ++ KSpcL2 ++ KSpcL3, + V3), + ok = leveled_bookie:book_close(Bookie2), + testutil:reset_filestructure(10000). 
+ + rotating_object_check(BookOpts, B, NumberOfObjects) -> {ok, Book1} = leveled_bookie:book_start(BookOpts),