Add counting of tombstones to new SST files

.. and that old-style SST files can still be created, and opened, with a return of 'not_counted'
Martin Sumner 2020-03-27 10:20:10 +00:00
parent 4ef0f4006d
commit aca945a171
3 changed files with 249 additions and 73 deletions
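For orientation, the new surface can be exercised roughly as follows. This is an illustrative sketch only (not part of the diff), modelled on the tombcount_test/0 case added below; RootPath, Filename, KVL1, KVL2, MaxSQN and OptsSST are assumed to be bound by the caller.

%% Illustrative sketch only - mirrors tombcount_test/0, not part of the diff.
%% sst_newmerge/10 takes two trailing flags (IndexModDate, TombCount), and can
%% also return the atom empty when everything merges away.
%% sst_gettombcount/1 returns the persisted count, or not_counted for files
%% written without counting (older file versions, or TombCount = false).
{ok, SST, _KD, _Bloom} =
    leveled_sst:sst_newmerge(RootPath, Filename,
                             KVL1, KVL2, false, 2,
                             MaxSQN, OptsSST, true, true),
TombCount = leveled_sst:sst_gettombcount(SST),  % a non_neg_integer() here
ok = leveled_sst:sst_close(SST).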


@@ -260,9 +260,9 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
                     length(Additions)),
     leveled_log:log("PC012", [NewSQN, FileName, SinkB]),
     TS1 = os:timestamp(),
-    case leveled_sst:sst_new(RP, FileName,
+    case leveled_sst:sst_newmerge(RP, FileName,
                              KL1, KL2, SinkB, SinkLevel, MaxSQN,
                              OptsSST) of
         empty ->
             leveled_log:log("PC013", [FileName]),
             do_merge([], [],


@@ -92,6 +92,7 @@
 -define(FLIPPER32, 4294967295).
 -define(COMPRESS_AT_LEVEL, 1).
 -define(INDEX_MODDATE, true).
+-define(TOMB_COUNT, true).
 -define(USE_SET_FOR_SPEED, 64).
 -define(STARTUP_TIMEOUT, 10000).
@@ -111,7 +112,7 @@
          delete_pending/3]).
 
 -export([sst_new/6,
-         sst_new/8,
+         sst_newmerge/8,
          sst_newlevelzero/7,
          sst_open/4,
          sst_get/2,
@@ -123,6 +124,7 @@
          sst_checkready/1,
          sst_switchlevels/2,
          sst_deleteconfirmed/1,
+         sst_gettombcount/1,
          sst_close/1]).
 
 -export([tune_seglist/1, extract_hash/1, member_check/2]).
@@ -158,7 +160,7 @@
         range_endpoint(),
         range_endpoint()}.
 -type expandable_pointer()
-        :: slot_pointer()|sst_closed_pointer().
+        :: slot_pointer()|sst_pointer()|sst_closed_pointer().
 -type expanded_pointer()
         :: leveled_codec:ledger_kv()|expandable_pointer().
 -type binaryslot_element()
@@ -168,9 +170,13 @@
         {sets, sets:set(non_neg_integer())}|
         {list, list(non_neg_integer())}.
 -type sst_options()
         :: #sst_options{}.
+-type binary_slot()
+        :: {binary(), binary(), list(integer()), leveled_codec:ledger_key()}.
+-type sst_summary()
+        :: #summary{}.
 
-%% yield_blockquery is used to detemrine if the work necessary to process a
+%% yield_blockquery is used to determine if the work necessary to process a
 %% range query beyond the fetching the slot should be managed from within
 %% this process, or should be handled by the calling process.
 %% Handling within the calling process may lead to extra binary heap garbage
@@ -193,7 +199,9 @@
         fetch_cache = array:new([{size, ?CACHE_SIZE}]),
         new_slots :: list()|undefined,
         deferred_startup_tuple :: tuple()|undefined,
-        level :: non_neg_integer()|undefined}).
+        level :: non_neg_integer()|undefined,
+        tomb_count = not_counted
+            :: non_neg_integer()|not_counted}).
 
 -record(sst_timings,
         {sample_count = 0 :: integer(),
@@ -265,7 +273,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
     {ok, Pid} = gen_fsm:start_link(?MODULE, [], []),
     PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method),
     OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
-    {[], [], SlotList, FK} =
+    {[], [], SlotList, FK, _CountOfTombs} =
         merge_lists(KVList, OptsSST0, IndexModDate),
     case gen_fsm:sync_send_event(Pid,
                                  {sst_new,
@@ -276,17 +284,18 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
                                   MaxSQN,
                                   OptsSST0,
                                   IndexModDate,
+                                  not_counted,
                                   self()},
                                  infinity) of
         {ok, {SK, EK}, Bloom} ->
             {ok, Pid, {SK, EK}, Bloom}
     end.
 
--spec sst_new(string(), string(),
+-spec sst_newmerge(string(), string(),
                 list(leveled_codec:ledger_kv()|sst_pointer()),
                 list(leveled_codec:ledger_kv()|sst_pointer()),
                 boolean(), integer(),
                 integer(), sst_options())
             -> empty|{ok, pid(),
                         {{list(leveled_codec:ledger_kv()),
                             list(leveled_codec:ledger_kv())},
@@ -302,23 +311,23 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
 %% The remainder of the lists is returned along with the StartKey and EndKey
 %% so that the remainder cna be used in the next file in the merge. It might
 %% be that the merge_lists returns nothing (for example when a basement file is
-%% all tombstones) - and the atome empty is returned in this case so that the
+%% all tombstones) - and the atom empty is returned in this case so that the
 %% file is not added to the manifest.
-sst_new(RootPath, Filename,
+sst_newmerge(RootPath, Filename,
         KVL1, KVL2, IsBasement, Level,
         MaxSQN, OptsSST) ->
-    sst_new(RootPath, Filename,
+    sst_newmerge(RootPath, Filename,
         KVL1, KVL2, IsBasement, Level,
-        MaxSQN, OptsSST, ?INDEX_MODDATE).
+        MaxSQN, OptsSST, ?INDEX_MODDATE, ?TOMB_COUNT).
 
-sst_new(RootPath, Filename,
+sst_newmerge(RootPath, Filename,
         KVL1, KVL2, IsBasement, Level,
-        MaxSQN, OptsSST, IndexModDate) ->
+        MaxSQN, OptsSST, IndexModDate, TombCount) ->
     PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method),
     OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
-    {Rem1, Rem2, SlotList, FK} =
-        merge_lists(KVL1, KVL2, {IsBasement, Level},
-                    OptsSST0, IndexModDate),
+    {Rem1, Rem2, SlotList, FK, CountOfTombs} =
+        merge_lists(KVL1, KVL2, {IsBasement, Level}, OptsSST0,
+                    IndexModDate, TombCount),
     case SlotList of
         [] ->
             empty;
@@ -333,6 +342,7 @@ sst_new(RootPath, Filename,
                                   MaxSQN,
                                   OptsSST0,
                                   IndexModDate,
+                                  CountOfTombs,
                                   self()},
                                  infinity) of
         {ok, {SK, EK}, Bloom} ->
@@ -428,6 +438,15 @@ sst_expandpointer(Pointer, MorePointers, ScanWidth, SegmentList, LowLastMod) ->
 sst_setfordelete(Pid, Penciller) ->
     gen_fsm:sync_send_event(Pid, {set_for_delete, Penciller}, infinity).
 
+-spec sst_gettombcount(pid()) -> non_neg_integer()|not_counted.
+%% @doc
+%% Get the count of tomb stones in this SST file, returning not_counted if this
+%% file was created with a version which did not support tombstone counting, or
+%% could also be because the file is L0 (which aren't counted as being chosen
+%% for merge is inevitable)
+sst_gettombcount(Pid) ->
+    gen_fsm:sync_send_event(Pid, get_tomb_count, infinity).
+
 -spec sst_clear(pid()) -> ok.
 %% @doc
 %% For this file to be closed and deleted
@@ -497,17 +516,18 @@ starting({sst_open, RootPath, Filename, OptsSST, Level}, _From, State) ->
 starting({sst_new,
           RootPath, Filename, Level,
           {SlotList, FirstKey}, MaxSQN,
-          OptsSST, IdxModDate, StartingPID}, _From, State) ->
+          OptsSST, IdxModDate, CountOfTombs, StartingPID}, _From, State) ->
     SW = os:timestamp(),
     leveled_log:save(OptsSST#sst_options.log_options),
     PressMethod = OptsSST#sst_options.press_method,
     {Length, SlotIndex, BlockIndex, SlotsBin, Bloom} =
         build_all_slots(SlotList),
     SummaryBin =
-        build_table_summary(SlotIndex, Level, FirstKey, Length, MaxSQN, Bloom),
+        build_table_summary(SlotIndex, Level, FirstKey, Length,
+                            MaxSQN, Bloom, CountOfTombs),
     ActualFilename =
         write_file(RootPath, Filename, SummaryBin, SlotsBin,
-                   PressMethod, IdxModDate),
+                   PressMethod, IdxModDate, CountOfTombs),
     YBQ = Level =< 2,
     {UpdState, Bloom} =
         read_file(ActualFilename,
@@ -530,7 +550,8 @@ starting({sst_newlevelzero, RootPath, Filename,
           Penciller, MaxSQN,
           OptsSST, IdxModDate}, _From, State) ->
     DeferredStartupTuple =
-        {RootPath, Filename, Penciller, MaxSQN, OptsSST, IdxModDate},
+        {RootPath, Filename, Penciller, MaxSQN, OptsSST,
+            IdxModDate},
     {reply, ok, starting,
         State#state{deferred_startup_tuple = DeferredStartupTuple, level = 0}};
 starting(close, _From, State) ->
@@ -551,7 +572,7 @@ starting(complete_l0startup, State) ->
     Time0 = timer:now_diff(os:timestamp(), SW0),
 
     SW1 = os:timestamp(),
-    {[], [], SlotList, FirstKey} =
+    {[], [], SlotList, FirstKey, _CountOfTombs} =
         merge_lists(KVList, OptsSST, IdxModDate),
     Time1 = timer:now_diff(os:timestamp(), SW1),
@@ -562,13 +583,14 @@ starting(complete_l0startup, State) ->
     SW3 = os:timestamp(),
     SummaryBin =
-        build_table_summary(SlotIndex, 0, FirstKey, SlotCount, MaxSQN, Bloom),
+        build_table_summary(SlotIndex, 0, FirstKey, SlotCount,
+                            MaxSQN, Bloom, not_counted),
     Time3 = timer:now_diff(os:timestamp(), SW3),
 
     SW4 = os:timestamp(),
     ActualFilename =
         write_file(RootPath, Filename, SummaryBin, SlotsBin,
-                   PressMethod, IdxModDate),
+                   PressMethod, IdxModDate, not_counted),
     {UpdState, Bloom} =
         read_file(ActualFilename,
                   State#state{root_path=RootPath,
@@ -716,6 +738,8 @@ reader(background_complete, _From, State) ->
                     Summary#summary.last_key},
      reader,
      State};
+reader(get_tomb_count, _From, State) ->
+    {reply, State#state.tomb_count, reader, State};
 reader(close, _From, State) ->
     ok = file:close(State#state.handle),
     {stop, normal, ok, State}.
@@ -1196,11 +1220,11 @@ compress_level(_Level, PressMethod) ->
     PressMethod.
 
 write_file(RootPath, Filename, SummaryBin, SlotsBin,
-           PressMethod, IdxModDate) ->
+           PressMethod, IdxModDate, CountOfTombs) ->
     SummaryLength = byte_size(SummaryBin),
     SlotsLength = byte_size(SlotsBin),
     {PendingName, FinalName} = generate_filenames(Filename),
-    FileVersion = gen_fileversion(PressMethod, IdxModDate),
+    FileVersion = gen_fileversion(PressMethod, IdxModDate, CountOfTombs),
     case filelib:is_file(filename:join(RootPath, FinalName)) of
         true ->
             AltName = filename:join(RootPath, filename:basename(FinalName))
@@ -1210,6 +1234,7 @@ write_file(RootPath, Filename, SummaryBin, SlotsBin,
         false ->
             ok
     end,
     ok = leveled_util:safe_rename(filename:join(RootPath, PendingName),
                                   filename:join(RootPath, FinalName),
                                   <<FileVersion:8/integer,
@@ -1225,7 +1250,8 @@ read_file(Filename, State, LoadPageCache) ->
         open_reader(filename:join(State#state.root_path, Filename),
                     LoadPageCache),
     UpdState0 = imp_fileversion(FileVersion, State),
-    {Summary, Bloom, SlotList} = read_table_summary(SummaryBin),
+    {Summary, Bloom, SlotList, TombCount} =
+        read_table_summary(SummaryBin, UpdState0#state.tomb_count),
     BlockIndexCache = array:new([{size, Summary#summary.size},
                                  {default, none}]),
     UpdState1 = UpdState0#state{blockindex_cache = BlockIndexCache},
@@ -1235,11 +1261,12 @@ read_file(Filename, State, LoadPageCache) ->
                            Summary#summary.size,
                            Summary#summary.max_sqn]),
     {UpdState1#state{summary = UpdSummary,
                      handle = Handle,
-                     filename = Filename},
+                     filename = Filename,
+                     tomb_count = TombCount},
      Bloom}.
 
-gen_fileversion(PressMethod, IdxModDate) ->
+gen_fileversion(PressMethod, IdxModDate, CountOfTombs) ->
     % Native or none can be treated the same once written, as reader
     % does not need to know as compression info will be in header of the
     % block
@@ -1256,7 +1283,14 @@ gen_fileversion(PressMethod, IdxModDate) ->
             false ->
                 0
         end,
-    Bit1 + Bit2.
+    Bit3 =
+        case CountOfTombs of
+            not_counted ->
+                0;
+            _ ->
+                4
+        end,
+    Bit1 + Bit2 + Bit3.
 
 imp_fileversion(VersionInt, State) ->
     UpdState0 =
@@ -1273,7 +1307,12 @@ imp_fileversion(VersionInt, State) ->
             2 ->
                 UpdState0#state{index_moddate = true}
         end,
-    UpdState1.
+    case VersionInt band 4 of
+        0 ->
+            UpdState1;
+        4 ->
+            UpdState1#state{tomb_count = 0}
+    end.
 
 open_reader(Filename, LoadPageCache) ->
     {ok, Handle} = file:open(Filename, [binary, raw, read]),
@@ -1290,25 +1329,50 @@ open_reader(Filename, LoadPageCache) ->
     {ok, SummaryBin} = file:pread(Handle, SlotsLength + 9, SummaryLength),
     {Handle, FileVersion, SummaryBin}.
 
-build_table_summary(SlotIndex, _Level, FirstKey, SlotCount, MaxSQN, Bloom) ->
+build_table_summary(SlotIndex, _Level, FirstKey,
+                    SlotCount, MaxSQN, Bloom, CountOfTombs) ->
     [{LastKey, _LastV}|_Rest] = SlotIndex,
     Summary = #summary{first_key = FirstKey,
                        last_key = LastKey,
                        size = SlotCount,
                        max_sqn = MaxSQN},
-    SummBin =
+    SummBin0 =
         term_to_binary({Summary, Bloom, lists:reverse(SlotIndex)},
                        ?BINARY_SETTINGS),
+    SummBin =
+        case CountOfTombs of
+            not_counted ->
+                SummBin0;
+            I ->
+                <<I:32/integer, SummBin0/binary>>
+        end,
     SummCRC = hmac(SummBin),
     <<SummCRC:32/integer, SummBin/binary>>.
 
-read_table_summary(BinWithCheck) ->
+-spec read_table_summary(binary(), not_counted|non_neg_integer()) ->
+            {sst_summary(),
+                leveled_ebloom:bloom(),
+                list(tuple()),
+                not_counted|non_neg_integer()}.
+%% @doc
+%% Read the table summary - format varies depending on file version (presence
+%5 of tomb count)
+read_table_summary(BinWithCheck, TombCount) ->
     <<SummCRC:32/integer, SummBin/binary>> = BinWithCheck,
     CRCCheck = hmac(SummBin),
     if
         CRCCheck == SummCRC ->
             % If not might it might be possible to rebuild from all the slots
-            binary_to_term(SummBin)
+            case TombCount of
+                not_counted ->
+                    erlang:append_element(binary_to_term(SummBin),
+                                            not_counted);
+                _ ->
+                    <<I:32/integer, SummBin0/binary>> = SummBin,
+                    erlang:append_element(binary_to_term(SummBin0), I)
+            end
     end.
@@ -1535,10 +1599,7 @@ take_max_lastmoddate(LMD, LMDAcc) ->
                     press_method(),
                     boolean(),
                     build_timings()) ->
-                        {{binary(),
-                            binary(),
-                            list(integer()),
-                            leveled_codec:ledger_key()},
+                        {binary_slot(),
                             build_timings()}.
 %% @doc
 %% Generate the serialised slot to be used when storing this sublist of keys
@@ -2258,7 +2319,7 @@ revert_position(Pos) ->
 %% large numbers of index keys are present - as well as improving compression
 %% ratios in the Ledger.
 %%
-%% The outcome of merge_lists/3 and merge_lists/5 should be an list of slots.
+%% The outcome of merge_lists/3 and merge_lists/6 should be an list of slots.
 %% Each slot should be ordered by Key and be of the form {Flag, KVList}, where
 %% Flag can either be lookup or no-lookup. The list of slots should also be
 %% ordered by Key (i.e. the first key in the slot)
@@ -2275,17 +2336,19 @@ revert_position(Pos) ->
 %% any lower sequence numbers should be compacted out of existence
 -spec merge_lists(list(), sst_options(), boolean())
-                    -> {list(), list(), list(tuple()), tuple()|null}.
+                    -> {list(), list(), list(binary_slot()),
+                            tuple()|null, non_neg_integer()|not_counted}.
 %% @doc
 %%
-%% Merge from asingle list (i.e. at Level 0)
+%% Merge from a single list (i.e. at Level 0)
 merge_lists(KVList1, SSTOpts, IdxModDate) ->
     SlotCount = length(KVList1) div ?LOOK_SLOTSIZE,
     {[],
      [],
      split_lists(KVList1, [],
                  SlotCount, SSTOpts#sst_options.press_method, IdxModDate),
-     element(1, lists:nth(1, KVList1))}.
+     element(1, lists:nth(1, KVList1)),
+     not_counted}.
 
 split_lists([], SlotLists, 0, _PressMethod, _IdxModDate) ->
@@ -2301,35 +2364,57 @@ split_lists(KVList1, SlotLists, N, PressMethod, IdxModDate) ->
     split_lists(KVListRem, [SlotD|SlotLists], N - 1, PressMethod, IdxModDate).
 
--spec merge_lists(list(), list(), tuple(), sst_options(), boolean()) ->
-                    {list(), list(), list(tuple()), tuple()|null}.
+-spec merge_lists(list(), list(), tuple(), sst_options(),
+                    boolean(), boolean()) ->
+                    {list(), list(), list(binary_slot()), tuple()|null,
+                        non_neg_integer()}.
 %% @doc
-%% Merge lists when merging across more thna one file. KVLists that are
+%% Merge lists when merging across more than one file. KVLists that are
 %% provided may include pointers to fetch more Keys/Values from the source
 %% file
-merge_lists(KVList1, KVList2, LevelInfo, SSTOpts, IndexModDate) ->
+merge_lists(KVList1, KVList2, LevelInfo, SSTOpts,
+                IndexModDate, SaveTombCount) ->
+    InitTombCount =
+        case SaveTombCount of true -> 0; false -> not_counted end,
     merge_lists(KVList1, KVList2,
                 LevelInfo,
                 [], null, 0,
                 SSTOpts#sst_options.max_sstslots,
                 SSTOpts#sst_options.press_method,
                 IndexModDate,
+                InitTombCount,
                 #build_timings{}).
 
+-spec merge_lists(
+    list(expanded_pointer()),
+    list(expanded_pointer()),
+    {boolean(), non_neg_integer()},
+    list(binary_slot()),
+    leveled_codec:ledger_key()|null,
+    non_neg_integer(),
+    non_neg_integer(),
+    press_method(),
+    boolean(),
+    non_neg_integer()|not_counted,
+    build_timings()) ->
+        {list(expanded_pointer()), list(expanded_pointer()),
+            list(binary_slot()), leveled_codec:ledger_key()|null,
+            non_neg_integer()|not_counted}.
 merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, MaxSlots, MaxSlots,
-            _PressMethod, _IdxModDate, T0) ->
+            _PressMethod, _IdxModDate, CountOfTombs, T0) ->
     % This SST file is full, move to complete file, and return the
     % remainder
     log_buildtimings(T0, LI),
-    {KVL1, KVL2, lists:reverse(SlotList), FirstKey};
+    {KVL1, KVL2, lists:reverse(SlotList), FirstKey, CountOfTombs};
 merge_lists([], [], LI, SlotList, FirstKey, _SlotCount, _MaxSlots,
-            _PressMethod, _IdxModDate, T0) ->
+            _PressMethod, _IdxModDate, CountOfTombs, T0) ->
     % the source files are empty, complete the file
     log_buildtimings(T0, LI),
-    {[], [], lists:reverse(SlotList), FirstKey};
+    {[], [], lists:reverse(SlotList), FirstKey, CountOfTombs};
 merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, MaxSlots,
-            PressMethod, IdxModDate, T0) ->
+            PressMethod, IdxModDate, CountOfTombs, T0) ->
     % Form a slot by merging the two lists until the next 128 K/V pairs have
     % been determined
     SW = os:timestamp(),
@@ -2348,6 +2433,7 @@ merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, MaxSlots,
                         MaxSlots,
                         PressMethod,
                         IdxModDate,
+                        CountOfTombs,
                         T1);
         {Lookup, KVL} ->
             % Convert the list of KVs for the slot into a binary, and related
@@ -2363,9 +2449,42 @@ merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, MaxSlots,
                         MaxSlots,
                         PressMethod,
                         IdxModDate,
+                        count_tombs(KVL, CountOfTombs),
                         T2)
     end.
 
+-spec count_tombs(list(leveled_codec:ledger_kv()),
+                    non_neg_integer()|not_counted) ->
+                        non_neg_integer()|not_counted.
+%% @doc
+%% Count the tombstones in a list of KVs
+count_tombs(_KVL, not_counted) ->
+    not_counted;
+count_tombs(KVL, InitCount) ->
+    FoldFun =
+        fun(KV, Count) ->
+            case leveled_codec:strip_to_statusonly(KV) of
+                tomb ->
+                    Count + 1;
+                _ ->
+                    Count
+            end
+        end,
+    lists:foldl(FoldFun, InitCount, KVL).
+
+-spec form_slot(list(expanded_pointer()),
+                list(expanded_pointer()),
+                {boolean(), non_neg_integer()},
+                lookup|no_lookup,
+                non_neg_integer(),
+                list(leveled_codec:ledger_kv()),
+                leveled_codec:ledger_key()|null) ->
+                    {list(expanded_pointer()), list(expanded_pointer()),
+                        {lookup|no_lookup, list(leveled_codec:ledger_kv())},
+                        leveled_codec:ledger_key()}.
+%% @doc
+%% Merge together Key Value lists to provide an ordered slot of KVs
 form_slot([], [], _LI, Type, _Size, Slot, FK) ->
     {[], [], {Type, lists:reverse(Slot)}, FK};
 form_slot(KVList1, KVList2, _LI, lookup, ?LOOK_SLOTSIZE, Slot, FK) ->
@@ -2644,8 +2763,8 @@ testsst_new(RootPath, Filename,
     OptsSST =
         #sst_options{press_method=PressMethod,
                      log_options=leveled_log:get_opts()},
-    sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN,
-            OptsSST, false).
+    sst_newmerge(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN,
+                 OptsSST, false, false).
 
 generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
     generate_randomkeys(Seqn,
@@ -2695,11 +2814,61 @@ generate_indexkey(Term, Count) ->
                       infinity).
 
+tombcount_test() ->
+    N = 1600,
+    KL1 = generate_randomkeys(N div 2 + 1, N, 1, 4),
+    KL2 = generate_indexkeys(N div 2),
+    FlipToTombFun =
+        fun({K, V}) ->
+            case leveled_rand:uniform(10) of
+                X when X > 5 ->
+                    {K, setelement(2, V, tomb)};
+                _ ->
+                    {K, V}
+            end
+        end,
+    KVL1 = lists:map(FlipToTombFun, KL1),
+    KVL2 = lists:map(FlipToTombFun, KL2),
+    CountTombFun =
+        fun({_K, V}, Acc) ->
+            case element(2, V) of
+                tomb ->
+                    Acc + 1;
+                _ ->
+                    Acc
+            end
+        end,
+    ExpectedCount = lists:foldl(CountTombFun, 0, KVL1 ++ KVL2),
+
+    {RP, Filename} = {?TEST_AREA, "tombcount_test"},
+    OptsSST =
+        #sst_options{press_method=native,
+                     log_options=leveled_log:get_opts()},
+    {ok, SST1, _KD, _BB} = sst_newmerge(RP, Filename,
+                                        KVL1, KVL2, false, 2,
+                                        N, OptsSST, false, false),
+    ?assertMatch(not_counted, sst_gettombcount(SST1)),
+    ok = sst_close(SST1),
+    ok = file:delete(filename:join(RP, Filename ++ ".sst")),
+
+    {ok, SST2, _KD, _BB} = sst_newmerge(RP, Filename,
+                                        KVL1, KVL2, false, 2,
+                                        N, OptsSST, false, true),
+    ?assertMatch(ExpectedCount, sst_gettombcount(SST2)),
+    ok = sst_close(SST2),
+    ok = file:delete(filename:join(RP, Filename ++ ".sst")).
+
 form_slot_test() ->
     % If a skip key happens, mustn't switch to loookup by accident as could be
     % over the expected size
-    SkippingKV = {{o, "B1", "K9999", null}, {9999, tomb, 1234567, {}}},
-    Slot = [{{o, "B1", "K5", null}, {5, active, 99234567, {}}}],
+    SkippingKV =
+        {{o, "B1", "K9999", null}, {9999, tomb, {1234568, 1234567}, {}}},
+    Slot =
+        [{{o, "B1", "K5", null},
+            {5, {active, infinity}, {99234568, 99234567}, {}}}],
     R1 = form_slot([SkippingKV], [],
                    {true, 99999999},
                    no_lookup,
@@ -2709,19 +2878,26 @@ form_slot_test() ->
     ?assertMatch({[], [], {no_lookup, Slot}, {o, "B1", "K5", null}}, R1).
 
 merge_tombstonelist_test() ->
-    % Merge lists with nothing but tombstones
-    SkippingKV1 = {{o, "B1", "K9995", null}, {9995, tomb, 1234567, {}}},
-    SkippingKV2 = {{o, "B1", "K9996", null}, {9996, tomb, 1234567, {}}},
-    SkippingKV3 = {{o, "B1", "K9997", null}, {9997, tomb, 1234567, {}}},
-    SkippingKV4 = {{o, "B1", "K9998", null}, {9998, tomb, 1234567, {}}},
-    SkippingKV5 = {{o, "B1", "K9999", null}, {9999, tomb, 1234567, {}}},
+    % Merge lists with nothing but tombstones, and file at basement level
+    SkippingKV1 =
+        {{o, "B1", "K9995", null}, {9995, tomb, {1234568, 1234567}, {}}},
+    SkippingKV2 =
+        {{o, "B1", "K9996", null}, {9996, tomb, {1234568, 1234567}, {}}},
+    SkippingKV3 =
+        {{o, "B1", "K9997", null}, {9997, tomb, {1234568, 1234567}, {}}},
+    SkippingKV4 =
+        {{o, "B1", "K9998", null}, {9998, tomb, {1234568, 1234567}, {}}},
+    SkippingKV5 =
+        {{o, "B1", "K9999", null}, {9999, tomb, {1234568, 1234567}, {}}},
     R = merge_lists([SkippingKV1, SkippingKV3, SkippingKV5],
                     [SkippingKV2, SkippingKV4],
                     {true, 9999999},
                     #sst_options{press_method = native,
                                  max_sstslots = 256},
-                    ?INDEX_MODDATE),
-    ?assertMatch({[], [], [], null}, R).
+                    ?INDEX_MODDATE,
+                    true),
+    ?assertMatch({[], [], [], null, 0}, R).
 
 indexed_list_test() ->
     io:format(user, "~nIndexed list timing test:~n", []),

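The on-disk changes above reduce to a new flag bit plus an optional prefix: gen_fileversion/3 adds 4 to the single version byte when tombstones are being counted, build_table_summary/7 then prefixes the term_to_binary summary with a 32-bit count (so the stored summary becomes <<SummCRC:32, TombCount:32, SummaryTerm/binary>>), and imp_fileversion/2 together with read_table_summary/2 reverse this on open, leaving tomb_count as not_counted when the bit is clear - which is how files written by older versions, and L0 files, keep working. A rough decode of the version byte, as an illustrative sketch only (the function and key names below are not from the diff):

%% Illustrative sketch only - mirrors the band checks in imp_fileversion/2;
%% not part of the diff.
decode_fileversion(VersionInt) when is_integer(VersionInt) ->
    #{press_bit     => VersionInt band 1,         % existing press-method bit
      index_moddate => VersionInt band 2 =:= 2,   % existing LMD-index bit
      tomb_count    => VersionInt band 4 =:= 4}.  % new: a 32-bit tombstone
                                                  % count precedes the summary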

@@ -196,7 +196,7 @@ bigsst_littlesst(_Config) ->
                      {compression_point, on_compact}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     ObjL1 =
-        testutil:generate_objects(60000, 1, [],
+        testutil:generate_objects(80000, 1, [],
                                   leveled_rand:rand_bytes(100),
                                   fun() -> [] end, <<"B">>),
     testutil:riakload(Bookie1, ObjL1),