Merge pull request #312 from martinsumner/mas-i311-mergeselector
Mas i311 mergeselector
commit 9412e7c9b0

6 changed files with 393 additions and 82 deletions

@@ -230,6 +230,8 @@
     {"PC023",
         {info, "At level=~w file_count=~w avg_mem=~w "
                 ++ "file with most memory fn=~s p=~w mem=~w"}},
+    {"PC024",
+        {info, "Grooming compaction picked file with tomb_count=~w"}},
     {"PM002",
         {info, "Completed dump of L0 cache to list of l0cache_size=~w"}},

@@ -49,6 +49,7 @@
 -define(MAX_TIMEOUT, 2000).
 -define(MIN_TIMEOUT, 200).
+-define(GROOMING_PERC, 50).
 
 -record(state, {owner :: pid() | undefined,
                 root_path :: string() | undefined,

@@ -56,6 +57,8 @@
                 sst_options :: #sst_options{}
                 }).
 
+-type manifest_entry() :: #manifest_entry{}.
+
 %%%============================================================================
 %%% API
 %%%============================================================================
 
@@ -183,7 +186,15 @@ merge(SrcLevel, Manifest, RootPath, OptsSST) ->
             leveled_log:log("PC023",
                             [SrcLevel + 1, FCnt, AvgMem, MaxFN, MaxP, MaxMem])
     end,
-    Src = leveled_pmanifest:mergefile_selector(Manifest, SrcLevel),
+    SelectMethod =
+        case leveled_rand:uniform(100) of
+            R when R =< ?GROOMING_PERC ->
+                {grooming, fun grooming_scorer/1};
+            _ ->
+                random
+        end,
+    Src =
+        leveled_pmanifest:mergefile_selector(Manifest, SrcLevel, SelectMethod),
     NewSQN = leveled_pmanifest:get_manifest_sqn(Manifest) + 1,
     SinkList = leveled_pmanifest:merge_lookup(Manifest,
                                               SrcLevel + 1,

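To illustrate the new SelectMethod term: with ?GROOMING_PERC at 50, roughly half of merges groom on tombstone count and the rest keep the original random selection. A minimal standalone sketch of that split (hypothetical module; stdlib rand stands in for leveled_rand so it runs outside the application):

    -module(select_method_demo).
    -export([sample/1]).

    -define(GROOMING_PERC, 50).

    %% Count how often each strategy would be chosen over N trials.
    %% rand:uniform(100) returns 1..100, so R =< 50 fires ~half the time.
    sample(N) ->
        Pick =
            fun() ->
                case rand:uniform(100) of
                    R when R =< ?GROOMING_PERC -> grooming;
                    _ -> random
                end
            end,
        Choices = [Pick() || _ <- lists:seq(1, N)],
        {length([C || C <- Choices, C == grooming]),
            length([C || C <- Choices, C == random])}.

In a shell, select_method_demo:sample(10000) should return two counts close to 5000 each.
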
@@ -260,7 +271,7 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
                                     length(Additions)),
     leveled_log:log("PC012", [NewSQN, FileName, SinkB]),
     TS1 = os:timestamp(),
-    case leveled_sst:sst_new(RP, FileName,
+    case leveled_sst:sst_newmerge(RP, FileName,
                                 KL1, KL2, SinkB, SinkLevel, MaxSQN,
                                 OptsSST) of
         empty ->

@@ -285,6 +296,23 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
                  Additions ++ [Entry])
     end.
 
+-spec grooming_scorer(list(manifest_entry())) -> manifest_entry().
+grooming_scorer([ME | MEs]) ->
+    InitTombCount = leveled_sst:sst_gettombcount(ME#manifest_entry.owner),
+    {HighestTC, BestME} = grooming_scorer(InitTombCount, ME, MEs),
+    leveled_log:log("PC024", [HighestTC]),
+    BestME.
+
+grooming_scorer(HighestTC, BestME, []) ->
+    {HighestTC, BestME};
+grooming_scorer(HighestTC, BestME, [ME | MEs]) ->
+    TombCount = leveled_sst:sst_gettombcount(ME#manifest_entry.owner),
+    case TombCount > HighestTC of
+        true ->
+            grooming_scorer(TombCount, ME, MEs);
+        false ->
+            grooming_scorer(HighestTC, BestME, MEs)
+    end.
+
 return_deletions(ManifestSQN, PendingDeletionD) ->
     % The returning of deletions had been seperated out as a failure to fetch

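grooming_scorer/1 is an argmax over sst_gettombcount/1 results. One subtlety the tests below rely on: Erlang term order places every integer below every atom, so a legacy file answering not_counted compares greater than any counted file and is groomed first. A standalone sketch with {Name, TombCount} pairs standing in for manifest entries (hypothetical module):

    -module(grooming_demo).
    -export([best/1]).

    %% Return the entry with the highest tomb count, first-seen winning ties.
    %% Note: 12 < not_counted in Erlang term order (number < atom).
    best([E | Rest]) ->
        lists:foldl(
            fun({_, C} = Cand, {_, BestC} = Best) ->
                case C > BestC of
                    true -> Cand;
                    false -> Best
                end
            end,
            E, Rest).

For example, grooming_demo:best([{a, 5}, {b, not_counted}, {c, 12}]) returns {b, not_counted}.
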
@@ -325,6 +353,82 @@ generate_randomkeys(Count, Acc, BucketLow, BRange) ->
     generate_randomkeys(Count - 1, [RandKey|Acc], BucketLow, BRange).
 
 
+grooming_score_test() ->
+    ok = filelib:ensure_dir("test/test_area/ledger_files/"),
+    KL1_L3 = lists:sort(generate_randomkeys(2000, 0, 100)),
+    KL2_L3 = lists:sort(generate_randomkeys(2000, 101, 250)),
+    KL3_L3 = lists:sort(generate_randomkeys(2000, 251, 300)),
+    KL4_L3 = lists:sort(generate_randomkeys(2000, 301, 400)),
+    [{HeadK, HeadV}|RestKL2] = KL2_L3,
+
+    {ok, PidL3_1, _, _} =
+        leveled_sst:sst_newmerge("test/test_area/ledger_files/",
+                                 "1_L3.sst",
+                                 KL1_L3,
+                                 [{HeadK, setelement(2, HeadV, tomb)}
+                                     |RestKL2],
+                                 false,
+                                 3,
+                                 999999,
+                                 #sst_options{},
+                                 true,
+                                 true),
+    {ok, PidL3_1B, _, _} =
+        leveled_sst:sst_newmerge("test/test_area/ledger_files/",
+                                 "1B_L3.sst",
+                                 KL1_L3,
+                                 [{HeadK, setelement(2, HeadV, tomb)}
+                                     |RestKL2],
+                                 true,
+                                 3,
+                                 999999,
+                                 #sst_options{},
+                                 true,
+                                 true),
+
+    {ok, PidL3_2, _, _} =
+        leveled_sst:sst_newmerge("test/test_area/ledger_files/",
+                                 "2_L3.sst",
+                                 KL3_L3,
+                                 KL4_L3,
+                                 false,
+                                 3,
+                                 999999,
+                                 #sst_options{},
+                                 true,
+                                 true),
+    {ok, PidL3_2NC, _, _} =
+        leveled_sst:sst_newmerge("test/test_area/ledger_files/",
+                                 "2NC_L3.sst",
+                                 KL3_L3,
+                                 KL4_L3,
+                                 false,
+                                 3,
+                                 999999,
+                                 #sst_options{},
+                                 true,
+                                 false),
+
+    ME1 = #manifest_entry{owner=PidL3_1},
+    ME1B = #manifest_entry{owner=PidL3_1B},
+    ME2 = #manifest_entry{owner=PidL3_2},
+    ME2NC = #manifest_entry{owner=PidL3_2NC},
+    ?assertMatch(ME1, grooming_scorer([ME1, ME2])),
+    ?assertMatch(ME1, grooming_scorer([ME2, ME1])),
+        % prefer the file with the tombstone
+    ?assertMatch(ME2NC, grooming_scorer([ME1, ME2NC])),
+    ?assertMatch(ME2NC, grooming_scorer([ME2NC, ME1])),
+        % not_counted > 1 - we will merge files in unexpected (i.e. legacy)
+        % format first
+    ?assertMatch(ME1B, grooming_scorer([ME1B, ME2])),
+    ?assertMatch(ME2, grooming_scorer([ME2, ME1B])),
+        % If the file with the tombstone is in the basement, it will have
+        % no tombstone so the first file will be chosen
+
+    lists:foreach(fun(P) -> leveled_sst:sst_clear(P) end,
+                    [PidL3_1, PidL3_1B, PidL3_2, PidL3_2NC]).
+
+
 merge_file_test() ->
     ok = filelib:ensure_dir("test/test_area/ledger_files/"),
     KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)),

@@ -401,7 +505,10 @@ merge_file_test() ->
                                     "test/test_area/ledger_files/",
                                     3, #sst_options{}),
 
-    ?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)).
+    ?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)),
+
+    lists:foreach(fun(P) -> leveled_sst:sst_clear(P) end,
+                    [PidL1_1, PidL2_1, PidL2_2, PidL2_3, PidL2_4]).
 
 coverage_cheat_test() ->
     {ok, _State1} =

@@ -34,7 +34,7 @@
          remove_manifest_entry/4,
          replace_manifest_entry/5,
          switch_manifest_entry/4,
-         mergefile_selector/2,
+         mergefile_selector/3,
          add_snapshot/3,
          release_snapshot/2,
          merge_snapshot/2,

@@ -60,6 +60,7 @@
 -define(TREE_WIDTH, 8).
 -define(PHANTOM_PID, r2d_fail).
 -define(MANIFESTS_TO_RETAIN, 5).
+-define(GROOM_SAMPLE, 16).
 
 -record(manifest, {levels,
                         % an array of lists or trees representing the manifest

@@ -82,6 +83,8 @@
 -type manifest() :: #manifest{}.
 -type manifest_entry() :: #manifest_entry{}.
 -type manifest_owner() :: pid()|list().
+-type selector_strategy() ::
+        random|{grooming, fun((list(manifest_entry())) -> manifest_entry())}.
 
 -export_type([manifest/0, manifest_entry/0, manifest_owner/0]).
 
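A selector_strategy() is either the atom random or a pair carrying the scoring fun. A minimal sketch of dispatching on such a term, mirroring the shape of the mergefile_selector/3 clauses below (hypothetical module; Level is a plain list here):

    -module(strategy_demo).
    -export([pick/2]).

    %% Dispatch on a selector_strategy()-shaped term.
    pick(Level, random) ->
        lists:nth(rand:uniform(length(Level)), Level);
    pick(Level, {grooming, ScoringFun}) ->
        ScoringFun(Level).

For example, strategy_demo:pick([1, 9, 4], {grooming, fun lists:max/1}) returns 9.
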
@@ -429,7 +432,8 @@ merge_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
     range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun).
 
 
--spec mergefile_selector(manifest(), integer()) -> manifest_entry().
+-spec mergefile_selector(manifest(), integer(), selector_strategy())
+                                                        -> manifest_entry().
 %% @doc
 %% An algorithm for discovering which files to merge ....
 %% We can find the most optimal file:

@@ -441,14 +445,29 @@ merge_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
 %% genuinely better - eventually every file has to be compacted.
 %%
 %% Hence, the initial implementation is to select files to merge at random
-mergefile_selector(Manifest, LevelIdx) when LevelIdx =< 1 ->
+mergefile_selector(Manifest, LevelIdx, _Strategy) when LevelIdx =< 1 ->
     Level = array:get(LevelIdx, Manifest#manifest.levels),
     lists:nth(leveled_rand:uniform(length(Level)), Level);
-mergefile_selector(Manifest, LevelIdx) ->
+mergefile_selector(Manifest, LevelIdx, random) ->
     Level = leveled_tree:to_list(array:get(LevelIdx,
                                             Manifest#manifest.levels)),
     {_SK, ME} = lists:nth(leveled_rand:uniform(length(Level)), Level),
-    ME.
+    ME;
+mergefile_selector(Manifest, LevelIdx, {grooming, ScoringFun}) ->
+    Level = leveled_tree:to_list(array:get(LevelIdx,
+                                            Manifest#manifest.levels)),
+    SelectorFun =
+        fun(_I, Acc) ->
+            {_SK, ME} = lists:nth(leveled_rand:uniform(length(Level)), Level),
+            [ME|Acc]
+        end,
+    Sample =
+        lists:usort(lists:foldl(SelectorFun, [], lists:seq(1, ?GROOM_SAMPLE))),
+    % Note that Entries may be less than GROOM_SAMPLE, if same one picked
+    % multiple times.  Level cannot be empty, as otherwise a merge would not
+    % have been chosen at this level
+    ScoringFun(Sample).
 
 
 -spec merge_snapshot(manifest(), manifest()) -> manifest().
 %% @doc

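Grooming does not score the whole level: it draws ?GROOM_SAMPLE entries with replacement and dedupes with lists:usort/1, so the ScoringFun may receive fewer than 16 candidates. The sampling step in isolation (hypothetical sample_level/2; stdlib rand in place of leveled_rand):

    -module(groom_sample_demo).
    -export([sample_level/2]).

    %% Draw N entries from a non-empty Level with replacement, then dedupe.
    %% The result length is =< N and =< length(Level).
    sample_level(Level, N) when Level =/= [] ->
        Draws = [lists:nth(rand:uniform(length(Level)), Level)
                    || _ <- lists:seq(1, N)],
        lists:usort(Draws).

Sampling keeps the scoring cost bounded at ?GROOM_SAMPLE sst_gettombcount/1 calls per merge, rather than one per file in the level.
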
@@ -607,6 +626,7 @@ check_bloom(Manifest, FP, Hash) ->
 %%% Internal Functions
 %%%============================================================================
 
+
 -spec get_manifest_entry({tuple(), manifest_entry()}|manifest_entry())
                                                         -> manifest_entry().
 %% @doc

@@ -1057,10 +1077,10 @@ changeup_setup(Man6) ->
 random_select_test() ->
     ManTuple = initial_setup(),
     LastManifest = element(7, ManTuple),
-    L1File = mergefile_selector(LastManifest, 1),
+    L1File = mergefile_selector(LastManifest, 1, random),
     % This blows up if the function is not prepared for the different format
     % https://github.com/martinsumner/leveled/issues/43
-    _L2File = mergefile_selector(LastManifest, 2),
+    _L2File = mergefile_selector(LastManifest, 2, random),
     Level1 = array:get(1, LastManifest#manifest.levels),
     ?assertMatch(true, lists:member(L1File, Level1)).

@@ -92,6 +92,7 @@
 -define(FLIPPER32, 4294967295).
 -define(COMPRESS_AT_LEVEL, 1).
 -define(INDEX_MODDATE, true).
+-define(TOMB_COUNT, true).
 -define(USE_SET_FOR_SPEED, 64).
 -define(STARTUP_TIMEOUT, 10000).

@@ -111,7 +112,7 @@
          delete_pending/3]).
 
 -export([sst_new/6,
-         sst_new/8,
+         sst_newmerge/8,
          sst_newlevelzero/7,
          sst_open/4,
          sst_get/2,

@@ -123,8 +124,15 @@
          sst_checkready/1,
          sst_switchlevels/2,
          sst_deleteconfirmed/1,
+         sst_gettombcount/1,
          sst_close/1]).
 
+-ifdef(TEST).
+
+-export([sst_newmerge/10]).
+
+-endif.
+
 -export([tune_seglist/1, extract_hash/1, member_check/2]).
 
 -export([in_range/3]).

@@ -158,7 +166,7 @@
                     range_endpoint(),
                     range_endpoint()}.
 -type expandable_pointer()
-        :: slot_pointer()|sst_closed_pointer().
+        :: slot_pointer()|sst_pointer()|sst_closed_pointer().
 -type expanded_pointer()
         :: leveled_codec:ledger_kv()|expandable_pointer().
 -type binaryslot_element()

@@ -169,8 +177,12 @@
         {list, list(non_neg_integer())}.
 -type sst_options()
         :: #sst_options{}.
+-type binary_slot()
+        :: {binary(), binary(), list(integer()), leveled_codec:ledger_key()}.
+-type sst_summary()
+        :: #summary{}.
 
-%% yield_blockquery is used to detemrine if the work necessary to process a
+%% yield_blockquery is used to determine if the work necessary to process a
 %% range query beyond the fetching the slot should be managed from within
 %% this process, or should be handled by the calling process.
 %% Handling within the calling process may lead to extra binary heap garbage

@@ -193,7 +205,9 @@
                 fetch_cache = array:new([{size, ?CACHE_SIZE}]),
                 new_slots :: list()|undefined,
                 deferred_startup_tuple :: tuple()|undefined,
-                level :: non_neg_integer()|undefined}).
+                level :: non_neg_integer()|undefined,
+                tomb_count = not_counted
+                    :: non_neg_integer()|not_counted}).
 
 -record(sst_timings,
             {sample_count = 0 :: integer(),

@@ -265,7 +279,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
     {ok, Pid} = gen_fsm:start_link(?MODULE, [], []),
     PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method),
     OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
-    {[], [], SlotList, FK} =
+    {[], [], SlotList, FK, _CountOfTombs} =
         merge_lists(KVList, OptsSST0, IndexModDate),
     case gen_fsm:sync_send_event(Pid,
                                  {sst_new,

@@ -276,13 +290,14 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
                                   MaxSQN,
                                   OptsSST0,
                                   IndexModDate,
+                                  not_counted,
                                   self()},
                                  infinity) of
         {ok, {SK, EK}, Bloom} ->
             {ok, Pid, {SK, EK}, Bloom}
     end.
 
--spec sst_new(string(), string(),
+-spec sst_newmerge(string(), string(),
                 list(leveled_codec:ledger_kv()|sst_pointer()),
                 list(leveled_codec:ledger_kv()|sst_pointer()),
                 boolean(), integer(),

@@ -302,23 +317,23 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
 %% The remainder of the lists is returned along with the StartKey and EndKey
 %% so that the remainder cna be used in the next file in the merge.  It might
 %% be that the merge_lists returns nothing (for example when a basement file is
-%% all tombstones) - and the atome empty is returned in this case so that the
+%% all tombstones) - and the atom empty is returned in this case so that the
 %% file is not added to the manifest.
-sst_new(RootPath, Filename,
+sst_newmerge(RootPath, Filename,
         KVL1, KVL2, IsBasement, Level,
         MaxSQN, OptsSST) ->
-    sst_new(RootPath, Filename,
+    sst_newmerge(RootPath, Filename,
         KVL1, KVL2, IsBasement, Level,
-        MaxSQN, OptsSST, ?INDEX_MODDATE).
+        MaxSQN, OptsSST, ?INDEX_MODDATE, ?TOMB_COUNT).
 
-sst_new(RootPath, Filename,
+sst_newmerge(RootPath, Filename,
         KVL1, KVL2, IsBasement, Level,
-        MaxSQN, OptsSST, IndexModDate) ->
+        MaxSQN, OptsSST, IndexModDate, TombCount) ->
     PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method),
     OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
-    {Rem1, Rem2, SlotList, FK} =
-        merge_lists(KVL1, KVL2, {IsBasement, Level},
-                    OptsSST0, IndexModDate),
+    {Rem1, Rem2, SlotList, FK, CountOfTombs} =
+        merge_lists(KVL1, KVL2, {IsBasement, Level}, OptsSST0,
+                    IndexModDate, TombCount),
     case SlotList of
         [] ->
             empty;

@@ -333,6 +348,7 @@ sst_newmerge(RootPath, Filename,
                                   MaxSQN,
                                   OptsSST0,
                                   IndexModDate,
+                                  CountOfTombs,
                                   self()},
                                  infinity) of
         {ok, {SK, EK}, Bloom} ->

@@ -428,6 +444,15 @@ sst_expandpointer(Pointer, MorePointers, ScanWidth, SegmentList, LowLastMod) ->
 sst_setfordelete(Pid, Penciller) ->
     gen_fsm:sync_send_event(Pid, {set_for_delete, Penciller}, infinity).
 
+-spec sst_gettombcount(pid()) -> non_neg_integer()|not_counted.
+%% @doc
+%% Get the count of tomb stones in this SST file, returning not_counted if this
+%% file was created with a version which did not support tombstone counting, or
+%% could also be because the file is L0 (which aren't counted as being chosen
+%% for merge is inevitable)
+sst_gettombcount(Pid) ->
+    gen_fsm:sync_send_event(Pid, get_tomb_count, infinity).
+
 -spec sst_clear(pid()) -> ok.
 %% @doc
 %% For this file to be closed and deleted

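Callers of the new query must handle both return shapes. A hedged usage sketch (hypothetical helper module, assuming only sst_gettombcount/1 as specified above):

    -module(tombcount_query_demo).
    -export([total_tombs/1]).

    %% Sum tombstone counts across SST processes, tracking legacy
    %% not_counted files as unknown rather than treating them as zero.
    total_tombs(Pids) ->
        lists:foldl(
            fun(Pid, {Sum, Unknown}) ->
                case leveled_sst:sst_gettombcount(Pid) of
                    not_counted -> {Sum, Unknown + 1};
                    N when is_integer(N) -> {Sum + N, Unknown}
                end
            end,
            {0, 0}, Pids).
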
@@ -497,17 +522,18 @@ starting({sst_open, RootPath, Filename, OptsSST, Level}, _From, State) ->
 starting({sst_new,
             RootPath, Filename, Level,
             {SlotList, FirstKey}, MaxSQN,
-            OptsSST, IdxModDate, StartingPID}, _From, State) ->
+            OptsSST, IdxModDate, CountOfTombs, StartingPID}, _From, State) ->
     SW = os:timestamp(),
     leveled_log:save(OptsSST#sst_options.log_options),
     PressMethod = OptsSST#sst_options.press_method,
     {Length, SlotIndex, BlockIndex, SlotsBin, Bloom} =
         build_all_slots(SlotList),
     SummaryBin =
-        build_table_summary(SlotIndex, Level, FirstKey, Length, MaxSQN, Bloom),
+        build_table_summary(SlotIndex, Level, FirstKey, Length,
+                            MaxSQN, Bloom, CountOfTombs),
     ActualFilename =
         write_file(RootPath, Filename, SummaryBin, SlotsBin,
-                    PressMethod, IdxModDate),
+                    PressMethod, IdxModDate, CountOfTombs),
     YBQ = Level =< 2,
     {UpdState, Bloom} =
         read_file(ActualFilename,

@@ -530,7 +556,8 @@ starting({sst_newlevelzero, RootPath, Filename,
             Penciller, MaxSQN,
             OptsSST, IdxModDate}, _From, State) ->
     DeferredStartupTuple =
-        {RootPath, Filename, Penciller, MaxSQN, OptsSST, IdxModDate},
+        {RootPath, Filename, Penciller, MaxSQN, OptsSST,
+            IdxModDate},
     {reply, ok, starting,
         State#state{deferred_startup_tuple = DeferredStartupTuple, level = 0}};
 starting(close, _From, State) ->

@@ -551,7 +578,7 @@ starting(complete_l0startup, State) ->
     Time0 = timer:now_diff(os:timestamp(), SW0),
 
     SW1 = os:timestamp(),
-    {[], [], SlotList, FirstKey} =
+    {[], [], SlotList, FirstKey, _CountOfTombs} =
         merge_lists(KVList, OptsSST, IdxModDate),
     Time1 = timer:now_diff(os:timestamp(), SW1),
 

@@ -562,13 +589,14 @@ starting(complete_l0startup, State) ->
 
     SW3 = os:timestamp(),
     SummaryBin =
-        build_table_summary(SlotIndex, 0, FirstKey, SlotCount, MaxSQN, Bloom),
+        build_table_summary(SlotIndex, 0, FirstKey, SlotCount,
+                            MaxSQN, Bloom, not_counted),
     Time3 = timer:now_diff(os:timestamp(), SW3),
 
     SW4 = os:timestamp(),
     ActualFilename =
         write_file(RootPath, Filename, SummaryBin, SlotsBin,
-                    PressMethod, IdxModDate),
+                    PressMethod, IdxModDate, not_counted),
     {UpdState, Bloom} =
         read_file(ActualFilename,
                   State#state{root_path=RootPath,

@@ -716,6 +744,8 @@ reader(background_complete, _From, State) ->
                 Summary#summary.last_key},
         reader,
         State};
+reader(get_tomb_count, _From, State) ->
+    {reply, State#state.tomb_count, reader, State};
 reader(close, _From, State) ->
     ok = file:close(State#state.handle),
     {stop, normal, ok, State}.

@@ -1196,11 +1226,11 @@ compress_level(_Level, PressMethod) ->
     PressMethod.
 
 write_file(RootPath, Filename, SummaryBin, SlotsBin,
-            PressMethod, IdxModDate) ->
+            PressMethod, IdxModDate, CountOfTombs) ->
     SummaryLength = byte_size(SummaryBin),
     SlotsLength = byte_size(SlotsBin),
     {PendingName, FinalName} = generate_filenames(Filename),
-    FileVersion = gen_fileversion(PressMethod, IdxModDate),
+    FileVersion = gen_fileversion(PressMethod, IdxModDate, CountOfTombs),
     case filelib:is_file(filename:join(RootPath, FinalName)) of
         true ->
             AltName = filename:join(RootPath, filename:basename(FinalName))

@@ -1210,6 +1240,7 @@ write_file(RootPath, Filename, SummaryBin, SlotsBin,
         false ->
             ok
     end,
+
     ok = leveled_util:safe_rename(filename:join(RootPath, PendingName),
                                     filename:join(RootPath, FinalName),
                                     <<FileVersion:8/integer,

@@ -1225,7 +1256,8 @@ read_file(Filename, State, LoadPageCache) ->
         open_reader(filename:join(State#state.root_path, Filename),
                     LoadPageCache),
     UpdState0 = imp_fileversion(FileVersion, State),
-    {Summary, Bloom, SlotList} = read_table_summary(SummaryBin),
+    {Summary, Bloom, SlotList, TombCount} =
+        read_table_summary(SummaryBin, UpdState0#state.tomb_count),
     BlockIndexCache = array:new([{size, Summary#summary.size},
                                  {default, none}]),
     UpdState1 = UpdState0#state{blockindex_cache = BlockIndexCache},

@@ -1236,10 +1268,11 @@ read_file(Filename, State, LoadPageCache) ->
                         Summary#summary.max_sqn]),
     {UpdState1#state{summary = UpdSummary,
                         handle = Handle,
-                        filename = Filename},
+                        filename = Filename,
+                        tomb_count = TombCount},
         Bloom}.
 
-gen_fileversion(PressMethod, IdxModDate) ->
+gen_fileversion(PressMethod, IdxModDate, CountOfTombs) ->
     % Native or none can be treated the same once written, as reader
     % does not need to know as compression info will be in header of the
     % block

@@ -1256,7 +1289,14 @@ gen_fileversion(PressMethod, IdxModDate) ->
         false ->
             0
     end,
-    Bit1+ Bit2.
+    Bit3 =
+        case CountOfTombs of
+            not_counted ->
+                0;
+            _ ->
+                4
+        end,
+    Bit1 + Bit2 + Bit3.
 
 imp_fileversion(VersionInt, State) ->
     UpdState0 =

@@ -1273,7 +1313,12 @@ imp_fileversion(VersionInt, State) ->
         2 ->
             UpdState0#state{index_moddate = true}
     end,
-    UpdState1.
+    case VersionInt band 4 of
+        0 ->
+            UpdState1;
+        4 ->
+            UpdState1#state{tomb_count = 0}
+    end.
 
 open_reader(Filename, LoadPageCache) ->
     {ok, Handle} = file:open(Filename, [binary, raw, read]),

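The file version is a single byte used as a bitmask; this change claims bit value 4 to mean "the summary carries a 32-bit tombstone count". A standalone mirror of the encode/decode pair (hypothetical module; the flag names are illustrative, not the module's own):

    -module(fileversion_demo).
    -export([encode/3, decode/1]).

    %% Encode three independent boolean features into one version byte.
    encode(PressFlag, ModDateFlag, TombsFlag) ->
        B1 = case PressFlag of true -> 1; false -> 0 end,
        B2 = case ModDateFlag of true -> 2; false -> 0 end,
        B3 = case TombsFlag of true -> 4; false -> 0 end,
        B1 + B2 + B3.

    %% Decode with band, as imp_fileversion/2 does above.
    decode(VersionInt) ->
        {VersionInt band 1 == 1,
            VersionInt band 2 == 2,
            VersionInt band 4 == 4}.

Because absent bits decode to the old defaults, files written before this change read back with tomb_count left at its not_counted record default.
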
@@ -1290,25 +1335,50 @@ open_reader(Filename, LoadPageCache) ->
     {ok, SummaryBin} = file:pread(Handle, SlotsLength + 9, SummaryLength),
     {Handle, FileVersion, SummaryBin}.
 
-build_table_summary(SlotIndex, _Level, FirstKey, SlotCount, MaxSQN, Bloom) ->
+build_table_summary(SlotIndex, _Level, FirstKey,
+                    SlotCount, MaxSQN, Bloom, CountOfTombs) ->
     [{LastKey, _LastV}|_Rest] = SlotIndex,
     Summary = #summary{first_key = FirstKey,
                         last_key = LastKey,
                         size = SlotCount,
                         max_sqn = MaxSQN},
-    SummBin =
+    SummBin0 =
         term_to_binary({Summary, Bloom, lists:reverse(SlotIndex)},
                         ?BINARY_SETTINGS),
+
+    SummBin =
+        case CountOfTombs of
+            not_counted ->
+                SummBin0;
+            I ->
+                <<I:32/integer, SummBin0/binary>>
+        end,
+
     SummCRC = hmac(SummBin),
     <<SummCRC:32/integer, SummBin/binary>>.
 
-read_table_summary(BinWithCheck) ->
+-spec read_table_summary(binary(), not_counted|non_neg_integer()) ->
+                            {sst_summary(),
+                                leveled_ebloom:bloom(),
+                                list(tuple()),
+                                not_counted|non_neg_integer()}.
+%% @doc
+%% Read the table summary - format varies depending on file version (presence
+%% of tomb count)
+read_table_summary(BinWithCheck, TombCount) ->
     <<SummCRC:32/integer, SummBin/binary>> = BinWithCheck,
     CRCCheck = hmac(SummBin),
     if
         CRCCheck == SummCRC ->
             % If not might it might be possible to rebuild from all the slots
-            binary_to_term(SummBin)
+            case TombCount of
+                not_counted ->
+                    erlang:append_element(binary_to_term(SummBin),
+                                            not_counted);
+                _ ->
+                    <<I:32/integer, SummBin0/binary>> = SummBin,
+                    erlang:append_element(binary_to_term(SummBin0), I)
+            end
     end.

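The amended summary layout is: a 32-bit CRC, then (only in counted files) a 32-bit tombstone count, then the term_to_binary payload, with the count inside the checksummed region. A minimal round-trip sketch (hypothetical module; erlang:crc32/1 stands in for the module's hmac/1):

    -module(summary_codec_demo).
    -export([wrap/2, unwrap/2]).

    %% Prepend the count inside the checksummed body only when counted.
    wrap(Payload, not_counted) when is_binary(Payload) ->
        <<(erlang:crc32(Payload)):32/integer, Payload/binary>>;
    wrap(Payload, TombCount) when is_integer(TombCount) ->
        Body = <<TombCount:32/integer, Payload/binary>>,
        <<(erlang:crc32(Body)):32/integer, Body/binary>>.

    unwrap(<<CRC:32/integer, Body/binary>>, HasCount) ->
        CRC = erlang:crc32(Body),   % badmatch on corruption
        case HasCount of
            false -> {Body, not_counted};
            true ->
                <<I:32/integer, Payload/binary>> = Body,
                {Payload, I}
        end.

Note the reader cannot detect the count from the binary alone; it must be told (via the file version bit, threaded through the state's tomb_count field) whether the leading 32 bits are a count or part of the payload.
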
@@ -1535,10 +1605,7 @@ take_max_lastmoddate(LMD, LMDAcc) ->
                     press_method(),
                     boolean(),
                     build_timings()) ->
-                    {{binary(),
-                        binary(),
-                        list(integer()),
-                        leveled_codec:ledger_key()},
+                    {binary_slot(),
                         build_timings()}.
 %% @doc
 %% Generate the serialised slot to be used when storing this sublist of keys

@@ -2258,7 +2325,7 @@ revert_position(Pos) ->
 %% large numbers of index keys are present - as well as improving compression
 %% ratios in the Ledger.
 %%
-%% The outcome of merge_lists/3 and merge_lists/5 should be an list of slots.
+%% The outcome of merge_lists/3 and merge_lists/6 should be an list of slots.
 %% Each slot should be ordered by Key and be of the form {Flag, KVList}, where
 %% Flag can either be lookup or no-lookup. The list of slots should also be
 %% ordered by Key (i.e. the first key in the slot)

@@ -2275,17 +2342,19 @@ revert_position(Pos) ->
 %% any lower sequence numbers should be compacted out of existence
 
 -spec merge_lists(list(), sst_options(), boolean())
-                    -> {list(), list(), list(tuple()), tuple()|null}.
+                    -> {list(), list(), list(binary_slot()),
+                        tuple()|null, non_neg_integer()|not_counted}.
 %% @doc
 %%
-%% Merge from asingle list (i.e. at Level 0)
+%% Merge from a single list (i.e. at Level 0)
 merge_lists(KVList1, SSTOpts, IdxModDate) ->
     SlotCount = length(KVList1) div ?LOOK_SLOTSIZE,
     {[],
        [],
        split_lists(KVList1, [],
                    SlotCount, SSTOpts#sst_options.press_method, IdxModDate),
-       element(1, lists:nth(1, KVList1))}.
+       element(1, lists:nth(1, KVList1)),
+       not_counted}.
 
 
 split_lists([], SlotLists, 0, _PressMethod, _IdxModDate) ->

@@ -2301,35 +2370,57 @@ split_lists(KVList1, SlotLists, N, PressMethod, IdxModDate) ->
     split_lists(KVListRem, [SlotD|SlotLists], N - 1, PressMethod, IdxModDate).
 
 
--spec merge_lists(list(), list(), tuple(), sst_options(), boolean()) ->
-                    {list(), list(), list(tuple()), tuple()|null}.
+-spec merge_lists(list(), list(), tuple(), sst_options(),
+                    boolean(), boolean()) ->
+                    {list(), list(), list(binary_slot()), tuple()|null,
+                        non_neg_integer()}.
 %% @doc
 %% Merge lists when merging across more than one file.  KVLists that are
 %% provided may include pointers to fetch more Keys/Values from the source
 %% file
-merge_lists(KVList1, KVList2, LevelInfo, SSTOpts, IndexModDate) ->
+merge_lists(KVList1, KVList2, LevelInfo, SSTOpts,
+            IndexModDate, SaveTombCount) ->
+    InitTombCount =
+        case SaveTombCount of true -> 0; false -> not_counted end,
     merge_lists(KVList1, KVList2,
                 LevelInfo,
                 [], null, 0,
                 SSTOpts#sst_options.max_sstslots,
                 SSTOpts#sst_options.press_method,
                 IndexModDate,
+                InitTombCount,
                 #build_timings{}).
 
 
+-spec merge_lists(
+    list(expanded_pointer()),
+    list(expanded_pointer()),
+    {boolean(), non_neg_integer()},
+    list(binary_slot()),
+    leveled_codec:ledger_key()|null,
+    non_neg_integer(),
+    non_neg_integer(),
+    press_method(),
+    boolean(),
+    non_neg_integer()|not_counted,
+    build_timings()) ->
+        {list(expanded_pointer()), list(expanded_pointer()),
+            list(binary_slot()), leveled_codec:ledger_key()|null,
+            non_neg_integer()|not_counted}.
+
 merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, MaxSlots, MaxSlots,
-            _PressMethod, _IdxModDate, T0) ->
+            _PressMethod, _IdxModDate, CountOfTombs, T0) ->
     % This SST file is full, move to complete file, and return the
     % remainder
     log_buildtimings(T0, LI),
-    {KVL1, KVL2, lists:reverse(SlotList), FirstKey};
+    {KVL1, KVL2, lists:reverse(SlotList), FirstKey, CountOfTombs};
 merge_lists([], [], LI, SlotList, FirstKey, _SlotCount, _MaxSlots,
-            _PressMethod, _IdxModDate, T0) ->
+            _PressMethod, _IdxModDate, CountOfTombs, T0) ->
     % the source files are empty, complete the file
     log_buildtimings(T0, LI),
-    {[], [], lists:reverse(SlotList), FirstKey};
+    {[], [], lists:reverse(SlotList), FirstKey, CountOfTombs};
 merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, MaxSlots,
-            PressMethod, IdxModDate, T0) ->
+            PressMethod, IdxModDate, CountOfTombs, T0) ->
     % Form a slot by merging the two lists until the next 128 K/V pairs have
     % been determined
     SW = os:timestamp(),

@@ -2348,6 +2439,7 @@ merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, MaxSlots,
                         MaxSlots,
                         PressMethod,
                         IdxModDate,
+                        CountOfTombs,
                         T1);
         {Lookup, KVL} ->
             % Convert the list of KVs for the slot into a binary, and related

@@ -2363,9 +2455,42 @@ merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, MaxSlots,
                         MaxSlots,
                         PressMethod,
                         IdxModDate,
+                        count_tombs(KVL, CountOfTombs),
                         T2)
     end.
 
+
+-spec count_tombs(list(leveled_codec:ledger_kv()),
+                    non_neg_integer()|not_counted) ->
+                        non_neg_integer()|not_counted.
+%% @doc
+%% Count the tombstones in a list of KVs
+count_tombs(_KVL, not_counted) ->
+    not_counted;
+count_tombs(KVL, InitCount) ->
+    FoldFun =
+        fun(KV, Count) ->
+            case leveled_codec:strip_to_statusonly(KV) of
+                tomb ->
+                    Count + 1;
+                _ ->
+                    Count
+            end
+        end,
+    lists:foldl(FoldFun, InitCount, KVL).
+
 -spec form_slot(list(expanded_pointer()),
                 list(expanded_pointer()),
                 {boolean(), non_neg_integer()},
                 lookup|no_lookup,
                 non_neg_integer(),
                 list(leveled_codec:ledger_kv()),
                 leveled_codec:ledger_key()|null) ->
                 {list(expanded_pointer()), list(expanded_pointer()),
                 {lookup|no_lookup, list(leveled_codec:ledger_kv())},
                 leveled_codec:ledger_key()}.
 %% @doc
 %% Merge together Key Value lists to provide an ordered slot of KVs
 form_slot([], [], _LI, Type, _Size, Slot, FK) ->
     {[], [], {Type, lists:reverse(Slot)}, FK};
 form_slot(KVList1, KVList2, _LI, lookup, ?LOOK_SLOTSIZE, Slot, FK) ->

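The accumulator threads through every merge step: seeding it with not_counted (when SaveTombCount is false) makes count_tombs/2 a no-op for the whole build, while a 0 seed accumulates per slot. The counting in isolation (hypothetical module, with a simplified {Key, {SQN, Status, ...}} KV shape in place of leveled_codec:strip_to_statusonly/1):

    -module(count_tombs_demo).
    -export([count_tombs/2]).

    %% not_counted short-circuits; an integer seed accumulates tombstones.
    count_tombs(_KVL, not_counted) ->
        not_counted;
    count_tombs(KVL, InitCount) ->
        lists:foldl(
            fun({_K, V}, Count) ->
                case element(2, V) of
                    tomb -> Count + 1;
                    _ -> Count
                end
            end,
            InitCount, KVL).

For example, count_tombs_demo:count_tombs([{k1, {1, tomb, {}}}, {k2, {2, {active, infinity}, {}}}], 0) returns 1, while the same call with not_counted returns not_counted.
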
@@ -2644,8 +2769,8 @@ testsst_new(RootPath, Filename,
     OptsSST =
         #sst_options{press_method=PressMethod,
                         log_options=leveled_log:get_opts()},
-    sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN,
-            OptsSST, false).
+    sst_newmerge(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN,
+                 OptsSST, false, false).
 
 generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
     generate_randomkeys(Seqn,

|
@ -2695,11 +2820,61 @@ generate_indexkey(Term, Count) ->
|
|||
infinity).
|
||||
|
||||
|
||||
tombcount_test() ->
|
||||
N = 1600,
|
||||
KL1 = generate_randomkeys(N div 2 + 1, N, 1, 4),
|
||||
KL2 = generate_indexkeys(N div 2),
|
||||
FlipToTombFun =
|
||||
fun({K, V}) ->
|
||||
case leveled_rand:uniform(10) of
|
||||
X when X > 5 ->
|
||||
{K, setelement(2, V, tomb)};
|
||||
_ ->
|
||||
{K, V}
|
||||
end
|
||||
end,
|
||||
KVL1 = lists:map(FlipToTombFun, KL1),
|
||||
KVL2 = lists:map(FlipToTombFun, KL2),
|
||||
CountTombFun =
|
||||
fun({_K, V}, Acc) ->
|
||||
case element(2, V) of
|
||||
tomb ->
|
||||
Acc + 1;
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
end,
|
||||
ExpectedCount = lists:foldl(CountTombFun, 0, KVL1 ++ KVL2),
|
||||
|
||||
{RP, Filename} = {?TEST_AREA, "tombcount_test"},
|
||||
OptsSST =
|
||||
#sst_options{press_method=native,
|
||||
log_options=leveled_log:get_opts()},
|
||||
{ok, SST1, _KD, _BB} = sst_newmerge(RP, Filename,
|
||||
KVL1, KVL2, false, 2,
|
||||
N, OptsSST, false, false),
|
||||
?assertMatch(not_counted, sst_gettombcount(SST1)),
|
||||
ok = sst_close(SST1),
|
||||
ok = file:delete(filename:join(RP, Filename ++ ".sst")),
|
||||
|
||||
{ok, SST2, _KD, _BB} = sst_newmerge(RP, Filename,
|
||||
KVL1, KVL2, false, 2,
|
||||
N, OptsSST, false, true),
|
||||
|
||||
?assertMatch(ExpectedCount, sst_gettombcount(SST2)),
|
||||
ok = sst_close(SST2),
|
||||
ok = file:delete(filename:join(RP, Filename ++ ".sst")).
|
||||
|
||||
|
||||
|
||||
form_slot_test() ->
|
||||
% If a skip key happens, mustn't switch to loookup by accident as could be
|
||||
% over the expected size
|
||||
SkippingKV = {{o, "B1", "K9999", null}, {9999, tomb, 1234567, {}}},
|
||||
Slot = [{{o, "B1", "K5", null}, {5, active, 99234567, {}}}],
|
||||
SkippingKV =
|
||||
{{o, "B1", "K9999", null}, {9999, tomb, {1234568, 1234567}, {}}},
|
||||
Slot =
|
||||
[{{o, "B1", "K5", null},
|
||||
{5, {active, infinity}, {99234568, 99234567}, {}}}],
|
||||
R1 = form_slot([SkippingKV], [],
|
||||
{true, 99999999},
|
||||
no_lookup,
|
||||
|
@@ -2709,19 +2884,26 @@ form_slot_test() ->
     ?assertMatch({[], [], {no_lookup, Slot}, {o, "B1", "K5", null}}, R1).
 
 merge_tombstonelist_test() ->
-    % Merge lists with nothing but tombstones
-    SkippingKV1 = {{o, "B1", "K9995", null}, {9995, tomb, 1234567, {}}},
-    SkippingKV2 = {{o, "B1", "K9996", null}, {9996, tomb, 1234567, {}}},
-    SkippingKV3 = {{o, "B1", "K9997", null}, {9997, tomb, 1234567, {}}},
-    SkippingKV4 = {{o, "B1", "K9998", null}, {9998, tomb, 1234567, {}}},
-    SkippingKV5 = {{o, "B1", "K9999", null}, {9999, tomb, 1234567, {}}},
+    % Merge lists with nothing but tombstones, and file at basement level
+    SkippingKV1 =
+        {{o, "B1", "K9995", null}, {9995, tomb, {1234568, 1234567}, {}}},
+    SkippingKV2 =
+        {{o, "B1", "K9996", null}, {9996, tomb, {1234568, 1234567}, {}}},
+    SkippingKV3 =
+        {{o, "B1", "K9997", null}, {9997, tomb, {1234568, 1234567}, {}}},
+    SkippingKV4 =
+        {{o, "B1", "K9998", null}, {9998, tomb, {1234568, 1234567}, {}}},
+    SkippingKV5 =
+        {{o, "B1", "K9999", null}, {9999, tomb, {1234568, 1234567}, {}}},
     R = merge_lists([SkippingKV1, SkippingKV3, SkippingKV5],
                     [SkippingKV2, SkippingKV4],
                     {true, 9999999},
                     #sst_options{press_method = native,
                                     max_sstslots = 256},
-                    ?INDEX_MODDATE),
-    ?assertMatch({[], [], [], null}, R).
+                    ?INDEX_MODDATE,
+                    true),
+
+    ?assertMatch({[], [], [], null, 0}, R).
 
 indexed_list_test() ->
     io:format(user, "~nIndexed list timing test:~n", []),

@@ -196,7 +196,7 @@ bigsst_littlesst(_Config) ->
                        {compression_point, on_compact}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     ObjL1 =
-        testutil:generate_objects(60000, 1, [],
+        testutil:generate_objects(80000, 1, [],
                                   leveled_rand:rand_bytes(100),
                                   fun() -> [] end, <<"B">>),
     testutil:riakload(Bookie1, ObjL1),

@@ -26,7 +26,7 @@ all() -> [
 
 
 basic_riak(_Config) ->
-    basic_riak_tester(<<"B0">>, 120000),
+    basic_riak_tester(<<"B0">>, 640000),
     basic_riak_tester({<<"Type0">>, <<"B0">>}, 80000).