Promote cache when scanning
When scanning over a leveled store with a helper (e.g. a segment filter and last-modified date range), applying the filter speeds up the query when the block index cache is available to get_slots. Previously, if the cache was not available, leveled_sst did not promote the cache after it had accessed the underlying blocks. Now it does; and once every entry has been added to the cache, it extracts the largest last-modified date, so that SST files older than the passed-in date can be dismissed immediately.
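As a sketch of the two mechanisms (condensed from the diff below; promote_cache/3 is a hypothetical wrapper - in the commit the equivalent send_all_state_event call is made inline from sst_getfilteredslots/4, and check_modified/3 is taken verbatim from the diff):

    %% Hypothetical wrapper: after a scan that had to read blocks without a
    %% cache, send the discovered block index entries back to the SST file
    %% process, so that the next scan can use the cache.
    promote_cache(Pid, true, BIC) ->
        gen_fsm:send_all_state_event(Pid, {update_blockindex_cache, BIC});
    promote_cache(_Pid, false, _BIC) ->
        ok.

    %% From the diff: once the cache is complete, the file's highest
    %% last-modified date is known, and a scan whose low last-modified date
    %% is above it can be dismissed without reading any slot.
    check_modified(HighLastModifiedInSST, LowModDate, true)
            when is_integer(HighLastModifiedInSST) ->
        LowModDate =< HighLastModifiedInSST;
    check_modified(_, _, _) ->
        true.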
This commit is contained in:
parent: 80e6920d6c
commit: a210aa6846
3 changed files with 262 additions and 86 deletions
@@ -146,7 +146,7 @@
 %% @doc Target Percentage for Single File
 %% What is the target score for a run of a single file, to qualify for
 %% compaction. If less than this percentage would be retained after compaction
-%% then it is a candidate (e.g. in default case if 50% of space would be
+%% then it is a candidate (e.g. in default case if 70% of space would be
 %% recovered)
 {mapping, "leveled.singlefile_compactionpercentage", "leveled.singlefile_compactionpercentage", [
   {default, 30.0},

@@ -181,6 +181,8 @@
         :: {binary(), binary(), list(integer()), leveled_codec:ledger_key()}.
 -type sst_summary()
         :: #summary{}.
+-type blockindex_cache()
+        :: any(). % An array but OTP 16 types
 
 %% yield_blockquery is used to determine if the work necessary to process a
 %% range query beyond the fetching the slot should be managed from within
@@ -196,7 +198,7 @@
                 root_path,
                 filename,
                 yield_blockquery = false :: boolean(),
-                blockindex_cache,
+                blockindex_cache :: blockindex_cache()|undefined,
                 compression_method = native :: press_method(),
                 index_moddate = ?INDEX_MODDATE :: boolean(),
                 timings = no_timing :: sst_timings(),
@@ -207,7 +209,8 @@
                 deferred_startup_tuple :: tuple()|undefined,
                 level :: non_neg_integer()|undefined,
                 tomb_count = not_counted
-                    :: non_neg_integer()|not_counted}).
+                    :: non_neg_integer()|not_counted,
+                high_modified_date :: non_neg_integer()|undefined}).
 
 -record(sst_timings,
         {sample_count = 0 :: integer(),
@@ -526,8 +529,14 @@ starting({sst_new,
     SW = os:timestamp(),
     leveled_log:save(OptsSST#sst_options.log_options),
     PressMethod = OptsSST#sst_options.press_method,
-    {Length, SlotIndex, BlockIndex, SlotsBin, Bloom} =
+    {Length, SlotIndex, BlockEntries, SlotsBin, Bloom} =
         build_all_slots(SlotList),
+    {BlockIndex, HighModDate} =
+        update_blockindex_cache(true,
+                                BlockEntries,
+                                new_blockindex_cache(Length),
+                                undefined,
+                                IdxModDate),
     SummaryBin =
         build_table_summary(SlotIndex, Level, FirstKey, Length,
                             MaxSQN, Bloom, CountOfTombs),
@@ -550,6 +559,7 @@ starting({sst_new,
      {ok, {Summary#summary.first_key, Summary#summary.last_key}, Bloom},
      reader,
      UpdState#state{blockindex_cache = BlockIndex,
+                    high_modified_date = HighModDate,
                     starting_pid = StartingPID,
                     level = Level}};
 starting({sst_newlevelzero, RootPath, Filename,
@@ -583,8 +593,14 @@ starting(complete_l0startup, State) ->
     Time1 = timer:now_diff(os:timestamp(), SW1),
 
     SW2 = os:timestamp(),
-    {SlotCount, SlotIndex, BlockIndex, SlotsBin,Bloom} =
+    {SlotCount, SlotIndex, BlockEntries, SlotsBin,Bloom} =
         build_all_slots(SlotList),
+    {BlockIndex, HighModDate} =
+        update_blockindex_cache(true,
+                                BlockEntries,
+                                new_blockindex_cache(SlotCount),
+                                undefined,
+                                IdxModDate),
     Time2 = timer:now_diff(os:timestamp(), SW2),
 
     SW3 = os:timestamp(),
@@ -616,19 +632,19 @@ starting(complete_l0startup, State) ->
 
     case Penciller of
         undefined ->
-            {next_state,
-             reader,
-             UpdState#state{blockindex_cache = BlockIndex}};
+            ok;
         _ ->
             leveled_penciller:pcl_confirml0complete(Penciller,
                                                     UpdState#state.filename,
                                                     Summary#summary.first_key,
                                                     Summary#summary.last_key,
                                                     Bloom),
+            ok
+    end,
     {next_state,
      reader,
-     UpdState#state{blockindex_cache = BlockIndex}}
-    end;
+     UpdState#state{blockindex_cache = BlockIndex,
+                    high_modified_date = HighModDate}};
 starting({sst_returnslot, FetchedSlot, FetchFun, SlotCount}, State) ->
     Self = self(),
     FetchedSlots =
@@ -673,13 +689,10 @@ reader({get_kv, LedgerKey, Hash}, _From, State) ->
                             timings_countdown = CountDown}};
 reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod},
                                                             _From, State) ->
-    {SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
-                                                        EndKey,
-                                                        ScanWidth,
-                                                        SegList,
-                                                        LowLastMod,
-                                                        State),
+    {NeedBlockIdx, SlotsToFetchBinList, SlotsToPoint} =
+        fetch_range(StartKey, EndKey, ScanWidth,
+                    SegList, LowLastMod,
+                    State),
 
     PressMethod = State#state.compression_method,
     IdxModDate = State#state.index_moddate,
 
@@ -694,34 +707,47 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod},
              reader,
              State};
         false ->
-            {L, BIC} =
+            {L, FoundBIC} =
                 binaryslot_reader(SlotsToFetchBinList,
-                                    PressMethod, IdxModDate, SegList),
-            FoldFun =
-                fun(CacheEntry, Cache) ->
-                    case CacheEntry of
-                        {_ID, none} ->
-                            Cache;
-                        {ID, Header} ->
-                            array:set(ID - 1, binary:copy(Header), Cache)
-                    end
-                end,
-            BlockIdxC0 = lists:foldl(FoldFun, State#state.blockindex_cache, BIC),
+                                    PressMethod,
+                                    IdxModDate,
+                                    SegList),
+            {BlockIdxC0, HighModDate} =
+                update_blockindex_cache(NeedBlockIdx,
+                                        FoundBIC,
+                                        State#state.blockindex_cache,
+                                        State#state.high_modified_date,
+                                        State#state.index_moddate),
             {reply,
                 L ++ SlotsToPoint,
                 reader,
-                State#state{blockindex_cache = BlockIdxC0}}
+                State#state{blockindex_cache = BlockIdxC0,
+                            high_modified_date = HighModDate}}
     end;
 reader({get_slots, SlotList, SegList, LowLastMod}, _From, State) ->
     PressMethod = State#state.compression_method,
     IdxModDate = State#state.index_moddate,
-    SlotBins =
+    ReadNeeded =
+        check_modified(State#state.high_modified_date,
+                        LowLastMod,
+                        State#state.index_moddate),
+    {NeedBlockIdx, SlotBins} =
+        case ReadNeeded of
+            true ->
                 read_slots(State#state.handle,
                             SlotList,
-                            {SegList, LowLastMod, State#state.blockindex_cache},
+                            {SegList,
+                                LowLastMod,
+                                State#state.blockindex_cache},
                             State#state.compression_method,
-                            State#state.index_moddate),
-    {reply, {SlotBins, PressMethod, IdxModDate}, reader, State};
+                            State#state.index_moddate);
+            false ->
+                {false, []}
+        end,
+    {reply,
+        {NeedBlockIdx, SlotBins, PressMethod, IdxModDate},
+        reader,
+        State};
 reader(get_maxsequencenumber, _From, State) ->
     Summary = State#state.summary,
     {reply, Summary#summary.max_sqn, reader, State};
@@ -759,12 +785,8 @@ delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
     {reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT};
 delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod},
                 _From, State) ->
-    {SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
-                                                        EndKey,
-                                                        ScanWidth,
-                                                        SegList,
-                                                        LowLastMod,
-                                                        State),
+    {_NeedBlockIdx, SlotsToFetchBinList, SlotsToPoint} =
+        fetch_range(StartKey, EndKey, ScanWidth, SegList, LowLastMod, State),
     % Always yield as about to clear and de-reference
     PressMethod = State#state.compression_method,
     IdxModDate = State#state.index_moddate,
@@ -776,14 +798,14 @@ delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod},
 delete_pending({get_slots, SlotList, SegList, LowLastMod}, _From, State) ->
     PressMethod = State#state.compression_method,
     IdxModDate = State#state.index_moddate,
-    SlotBins =
+    {_NeedBlockIdx, SlotBins} =
         read_slots(State#state.handle,
                     SlotList,
                     {SegList, LowLastMod, State#state.blockindex_cache},
                     PressMethod,
                     IdxModDate),
     {reply,
-        {SlotBins, PressMethod, IdxModDate},
+        {false, SlotBins, PressMethod, IdxModDate},
         delete_pending,
         State,
         ?DELETE_TIMEOUT};
@@ -815,8 +837,17 @@ delete_pending(close, State) ->
 handle_sync_event(_Msg, _From, StateName, State) ->
     {reply, undefined, StateName, State}.
 
-handle_event(_Msg, StateName, State) ->
-    {next_state, StateName, State}.
+handle_event({update_blockindex_cache, BIC}, StateName, State) ->
+    {BlockIndexCache, HighModDate} =
+        update_blockindex_cache(true,
+                                BIC,
+                                State#state.blockindex_cache,
+                                State#state.high_modified_date,
+                                State#state.index_moddate),
+    {next_state,
+        StateName,
+        State#state{blockindex_cache = BlockIndexCache,
+                    high_modified_date = HighModDate}}.
 
 handle_info(tidyup_after_startup, delete_pending, State) ->
     % No need to GC, this file is to be shutdown. This message may have
@@ -983,11 +1014,17 @@ sst_getslots(Pid, SlotList) ->
 %% of the object, if the object is to be covered by the query
 sst_getfilteredslots(Pid, SlotList, SegList, LowLastMod) ->
     SegL0 = tune_seglist(SegList),
-    {SlotBins, PressMethod, IdxModDate} =
+    {NeedBlockIdx, SlotBins, PressMethod, IdxModDate} =
         gen_fsm:sync_send_event(Pid,
                                 {get_slots, SlotList, SegL0, LowLastMod},
                                 infinity),
-    {L, _BIC} = binaryslot_reader(SlotBins, PressMethod, IdxModDate, SegL0),
+    {L, BIC} = binaryslot_reader(SlotBins, PressMethod, IdxModDate, SegL0),
+    case NeedBlockIdx of
+        true ->
+            gen_fsm:send_all_state_event(Pid, {update_blockindex_cache, BIC});
+        false ->
+            ok
+    end,
     L.
 
 
@@ -1065,6 +1102,62 @@ tune_seglist(SegList) ->
 %%% Internal Functions
 %%%============================================================================
 
+-spec new_blockindex_cache(pos_integer()) -> blockindex_cache().
+new_blockindex_cache(Size) ->
+    array:new([{size, Size}, {default, none}]).
+
+-spec update_blockindex_cache(boolean(),
+                                list({integer(), binary()}),
+                                blockindex_cache(),
+                                non_neg_integer()|undefined,
+                                boolean()) ->
+                                    {blockindex_cache(),
+                                        non_neg_integer()|undefined}.
+update_blockindex_cache(Needed, Entries, BIC, HighModDate, IdxModDate)
+        when Needed,
+             HighModDate == undefined ->
+    FoldFun =
+        fun(CacheEntry, Cache) ->
+            case CacheEntry of
+                {ID, Header} when is_binary(Header) ->
+                    array:set(ID - 1, binary:copy(Header), Cache);
+                _ ->
+                    Cache
+            end
+        end,
+    BlockIdxC0 = lists:foldl(FoldFun, BIC, Entries),
+    Size = array:size(BlockIdxC0),
+    BestModDates =
+        case IdxModDate of
+            true ->
+                ModDateFold =
+                    fun(_ID, Header, Acc) when is_binary(Header) ->
+                        [element(2, extract_header(Header, IdxModDate))|Acc]
+                    end,
+                array:sparse_foldl(ModDateFold, [], BlockIdxC0);
+            false ->
+                []
+        end,
+    BestModDate =
+        case length(BestModDates) of
+            Size ->
+                lists:max(BestModDates);
+            _ ->
+                undefined
+        end,
+    {BlockIdxC0, BestModDate};
+update_blockindex_cache(_Needed, _Entries, BIC, HighModDate, _IdxModDate) ->
+    {BIC, HighModDate}.
+
+-spec check_modified(non_neg_integer()|undefined,
+                        non_neg_integer(),
+                        boolean()) -> boolean().
+check_modified(HighLastModifiedInSST, LowModDate, true)
+        when is_integer(HighLastModifiedInSST) ->
+    LowModDate =< HighLastModifiedInSST;
+check_modified(_, _, _) ->
+    true.
+
 -spec fetch(tuple(),
             {integer(), integer()}|integer(),
             sst_state(), sst_timings())
@@ -1093,14 +1186,17 @@ fetch(LedgerKey, Hash, State, Timings0) ->
             SlotBin = read_slot(State#state.handle, Slot),
             {Result, Header} =
                 binaryslot_get(SlotBin, LedgerKey, Hash, PressMethod, IdxModDate),
-            BlockIndexCache =
-                array:set(SlotID - 1,
-                            binary:copy(Header),
-                            State#state.blockindex_cache),
+            {BlockIndexCache, HighModDate} =
+                update_blockindex_cache(true,
+                                        [{SlotID, Header}],
+                                        State#state.blockindex_cache,
+                                        State#state.high_modified_date,
+                                        State#state.index_moddate),
             {_SW3, Timings3} =
                 update_timings(SW2, Timings2, noncached_block, false),
             {Result,
-                State#state{blockindex_cache = BlockIndexCache},
+                State#state{blockindex_cache = BlockIndexCache,
+                            high_modified_date = HighModDate},
                 Timings3};
         {BlockLengths, _LMD, PosBin} ->
             PosList = find_pos(PosBin, extract_hash(Hash), [], 0),
@@ -1150,7 +1246,8 @@ fetch(LedgerKey, Hash, State, Timings0) ->
 
 -spec fetch_range(tuple(), tuple(), integer(),
                     leveled_codec:segment_list(), non_neg_integer(),
-                    sst_state()) -> {list(), list()}.
+                    sst_state()) ->
+                        {boolean(), list(), list()}.
 %% @doc
 %% Fetch the contents of the SST file for a given key range. This will
 %% pre-fetch some results, and append pointers for additional results.
@@ -1209,13 +1306,13 @@ fetch_range(StartKey, EndKey, ScanWidth, SegList, LowLastMod, State) ->
             lists:split(ScanWidth, ExpandedSlots)
         end,
 
-    SlotsToFetchBinList =
+    {NeededBlockIdx, SlotsToFetchBinList} =
         read_slots(Handle,
                     SlotsToFetch,
                     {SegList, LowLastMod, State#state.blockindex_cache},
                     State#state.compression_method,
                     State#state.index_moddate),
-    {SlotsToFetchBinList, SlotsToPoint}.
+    {NeededBlockIdx, SlotsToFetchBinList, SlotsToPoint}.
 
 -spec compress_level(integer(), press_method()) -> press_method().
 %% @doc
@@ -1258,8 +1355,7 @@ read_file(Filename, State, LoadPageCache) ->
     UpdState0 = imp_fileversion(FileVersion, State),
     {Summary, Bloom, SlotList, TombCount} =
         read_table_summary(SummaryBin, UpdState0#state.tomb_count),
-    BlockIndexCache = array:new([{size, Summary#summary.size},
-                                    {default, none}]),
+    BlockIndexCache = new_blockindex_cache(Summary#summary.size),
     UpdState1 = UpdState0#state{blockindex_cache = BlockIndexCache},
     SlotIndex = from_list(SlotList),
     UpdSummary = Summary#summary{index = SlotIndex},
@@ -1389,8 +1485,7 @@ build_all_slots(SlotList) ->
                         9,
                         1,
                         [],
-                        array:new([{size, SlotCount},
-                                    {default, none}]),
+                        [],
                         <<>>,
                         []),
     Bloom = leveled_ebloom:create_bloom(HashLists),
@@ -1410,7 +1505,7 @@ build_all_slots([SlotD|Rest], Pos, SlotID,
                     Pos + Length,
                     SlotID + 1,
                     [{LastKey, SlotIndexV}|SlotIdxAcc],
-                    array:set(SlotID - 1, BlockIdx, BlockIdxAcc),
+                    [{SlotID, BlockIdx}|BlockIdxAcc],
                     <<SlotBinAcc/binary, SlotBin/binary>>,
                     lists:append(HashLists, HashList)).
 
@@ -1842,7 +1937,8 @@ binarysplit_mapfun(MultiSlotBin, StartPos) ->
 
 -spec read_slots(file:io_device(), list(),
                     {false|list(), non_neg_integer(), binary()},
-                    press_method(), boolean()) -> list(binaryslot_element()).
+                    press_method(), boolean()) ->
+                        {boolean(), list(binaryslot_element())}.
 %% @doc
 %% The reading of sots will return a list of either 2-tuples containing
 %% {K, V} pairs - or 3-tuples containing {Binary, SK, EK}. The 3 tuples
@@ -1861,7 +1957,7 @@ read_slots(Handle, SlotList, {false, 0, _BlockIndexCache},
                 _PressMethod, _IdxModDate) ->
     % No list of segments passed or useful Low LastModified Date
     % Just read slots in SlotList
-    read_slotlist(SlotList, Handle);
+    {false, read_slotlist(SlotList, Handle)};
 read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache},
                 PressMethod, IdxModDate) ->
     % List of segments passed so only {K, V} pairs matching those segments
@@ -1869,7 +1965,7 @@ read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache},
     % with the appropriate hash - if the pair were added with no_lookup as
     % the hash value this will fail unexpectedly.
     BinMapFun =
-        fun(Pointer, Acc) ->
+        fun(Pointer, {NeededBlockIdx, Acc}) ->
            {SP, _L, ID, SK, EK} = pointer_mapfun(Pointer),
            CachedHeader = array:get(ID - 1, BlockIndexCache),
            case extract_header(CachedHeader, IdxModDate) of
@@ -1877,7 +1973,7 @@ read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache},
                     % If there is an attempt to use the seg list query and the
                     % index block cache isn't cached for any part this may be
                     % slower as each slot will be read in turn
-                    Acc ++ read_slotlist([Pointer], Handle);
+                    {true, Acc ++ read_slotlist([Pointer], Handle)};
                 {BlockLengths, LMD, BlockIdx} ->
                     % If there is a BlockIndex cached then we can use it to
                     % check to see if any of the expected segments are
@@ -1894,12 +1990,14 @@ read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache},
                             % LowLastMod date passed in the query - therefore
                             % there are no interesting modifications in this
                             % slot - it is all too old
-                            Acc;
+                            {NeededBlockIdx, Acc};
                         false ->
                             case SegList of
                                 false ->
                                     % Need all the slot now
-                                    Acc ++ read_slotlist([Pointer], Handle);
+                                    {NeededBlockIdx,
+                                        Acc ++
+                                            read_slotlist([Pointer], Handle)};
                                 _SL ->
                                     % Need to find just the right keys
                                     PositionList =
@@ -1920,12 +2018,13 @@ read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache},
                                     % to be filtered
                                     FilterFun =
                                         fun(KV) -> in_range(KV, SK, EK) end,
-                                    Acc ++ lists:filter(FilterFun, KVL)
+                                    {NeededBlockIdx,
+                                        Acc ++ lists:filter(FilterFun, KVL)}
                             end
                     end
            end
        end,
-    lists:foldl(BinMapFun, [], SlotList).
+    lists:foldl(BinMapFun, {false, []}, SlotList).
 
 
 -spec in_range(leveled_codec:ledger_kv(),
@@ -2015,7 +2114,7 @@ read_length_list(Handle, LengthList) ->
 
 
 -spec extract_header(binary()|none, boolean()) ->
-                        {binary(), integer(), binary()}|none.
+                        {binary(), non_neg_integer(), binary()}|none.
 %% @doc
 %% Helper for extracting the binaries from the header ignoring the missing LMD
 %% if LMD is not indexed
@@ -3657,8 +3756,6 @@ key_dominates_test() ->
         key_dominates([KV7|KL2], [KV2], {true, 1})).
 
 nonsense_coverage_test() ->
-    {ok, Pid} = gen_fsm:start_link(?MODULE, [], []),
-    ok = gen_fsm:send_all_state_event(Pid, nonsense),
     ?assertMatch({ok, reader, #state{}}, code_change(nonsense,
                                                         reader,
                                                         #state{},
@@ -3861,6 +3958,39 @@ corrupted_block_fetch_tester(PressMethod) ->
     ExpectedMisses = element(2, ?LOOK_BLOCKSIZE),
     ?assertMatch(ExpectedMisses, MissCount).
 
+block_index_cache_test() ->
+    {Mega, Sec, _} = os:timestamp(),
+    Now = Mega * 1000000 + Sec,
+    EntriesTS =
+        lists:map(fun(I) ->
+                        TS = Now - I + 1,
+                        {I, <<0:160/integer, TS:32/integer, 0:32/integer>>}
+                    end,
+                    lists:seq(1, 8)),
+    EntriesNoTS =
+        lists:map(fun(I) ->
+                        {I, <<0:160/integer, 0:32/integer>>}
+                    end,
+                    lists:seq(1, 8)),
+    HeaderTS = <<0:160/integer, Now:32/integer, 0:32/integer>>,
+    HeaderNoTS = <<0:192>>,
+    BIC = array:new([{size, 8}, {default, none}]),
+    {BIC0, undefined} =
+        update_blockindex_cache(false, EntriesNoTS, BIC, undefined, false),
+    {BIC1, undefined} =
+        update_blockindex_cache(false, EntriesTS, BIC, undefined, true),
+    {BIC2, undefined} =
+        update_blockindex_cache(true, EntriesNoTS, BIC, undefined, false),
+    {BIC3, LMD3} =
+        update_blockindex_cache(true, EntriesTS, BIC, undefined, true),
+
+    ?assertMatch(none, array:get(0, BIC0)),
+    ?assertMatch(none, array:get(0, BIC1)),
+    ?assertMatch(HeaderNoTS, array:get(0, BIC2)),
+    ?assertMatch(HeaderTS, array:get(0, BIC3)),
+    ?assertMatch(Now, LMD3).
+
+
 receive_fun() ->
     receive

@@ -258,6 +258,13 @@ fetchclocks_modifiedbetween(_Config) ->
     {ok, Bookie1A} = leveled_bookie:book_start(StartOpts1A),
     {ok, Bookie1B} = leveled_bookie:book_start(StartOpts1B),
 
+    ObjList0 =
+        testutil:generate_objects(100000,
+                                    {fixed_binary, 1}, [],
+                                    leveled_rand:rand_bytes(32),
+                                    fun() -> [] end,
+                                    <<"BaselineB">>),
+
     ObjL1StartTS = testutil:convert_to_seconds(os:timestamp()),
     ObjList1 =
         testutil:generate_objects(20000,
@@ -331,6 +338,7 @@ fetchclocks_modifiedbetween(_Config) ->
     testutil:riakload(Bookie1A, ObjList4),
     testutil:riakload(Bookie1A, ObjList6),
 
+    testutil:riakload(Bookie1B, ObjList0),
     testutil:riakload(Bookie1B, ObjList4),
     testutil:riakload(Bookie1B, ObjList5),
     testutil:riakload(Bookie1B, ObjList1),
@@ -412,7 +420,7 @@ fetchclocks_modifiedbetween(_Config) ->
         fun(_B, K, V, {LK, AccC}) ->
             % Value is proxy_object? Can we get the metadata and
             % read the last modified date? The do a non-accelerated
-            % fold to chekc that it is slower
+            % fold to check that it is slower
            {proxy_object, MDBin, _Size, _Fetcher} = binary_to_term(V),
            LMDTS = testutil:get_lastmodified(MDBin),
            LMD = testutil:convert_to_seconds(LMDTS),
@@ -458,13 +466,20 @@ fetchclocks_modifiedbetween(_Config) ->
     true = NoFilterTime > PlusFilterTime,
 
     SimpleCountFun =
-        fun(_B, _K, _V, AccC) -> AccC + 1 end,
+        fun(BucketList) ->
+            fun(B, _K, _V, AccC) ->
+                case lists:member(B, BucketList) of
+                    true -> AccC + 1;
+                    false -> AccC
+                end
+            end
+        end,
 
     {async, R4A_MultiBucketRunner} =
         leveled_bookie:book_headfold(Bookie1A,
                                         ?RIAK_TAG,
                                         {bucket_list, [<<"B0">>, <<"B2">>]},
-                                        {SimpleCountFun, 0},
+                                        {SimpleCountFun([<<"B0">>, <<"B2">>]), 0},
                                         false,
                                         true,
                                         false,
@@ -482,7 +497,7 @@ fetchclocks_modifiedbetween(_Config) ->
                                         {bucket_list, [<<"B2">>, <<"B0">>]},
                                             % Reverse the buckets in the bucket
                                             % list
-                                        {SimpleCountFun, 0},
+                                        {SimpleCountFun([<<"B0">>, <<"B2">>]), 0},
                                         false,
                                         true,
                                         false,
@@ -495,10 +510,10 @@ fetchclocks_modifiedbetween(_Config) ->
 
     {async, R5B_MultiBucketRunner} =
         leveled_bookie:book_headfold(Bookie1B,
-                                        % Same query - other bookie
                                         ?RIAK_TAG,
-                                        {bucket_list, [<<"B2">>, <<"B0">>]},
-                                        {SimpleCountFun, 0},
+                                        {bucket_list,
+                                            [<<"BaselineB">>, <<"B2">>, <<"B0">>]},
+                                        {SimpleCountFun([<<"B0">>, <<"B2">>]), 0},
                                         false,
                                         true,
                                         false,
@@ -506,7 +521,7 @@ fetchclocks_modifiedbetween(_Config) ->
                                         false),
     R5B_MultiBucket = R5B_MultiBucketRunner(),
     io:format("R5B_MultiBucket ~w ~n", [R5B_MultiBucket]),
-    true = R5A_MultiBucket == 37000,
+    true = R5B_MultiBucket == 37000,
 
     testutil:update_some_objects(Bookie1A, ObjList1, 1000),
     R6A_PlusFilter = lists:foldl(FoldRangesFun(Bookie1A,
@@ -523,7 +538,7 @@ fetchclocks_modifiedbetween(_Config) ->
         leveled_bookie:book_headfold(Bookie1A,
                                         ?RIAK_TAG,
                                         {bucket_list, [<<"B1">>, <<"B2">>]},
-                                        {SimpleCountFun, 0},
+                                        {SimpleCountFun([<<"B1">>, <<"B2">>]), 0},
                                         false,
                                         true,
                                         false,
@@ -537,7 +552,7 @@ fetchclocks_modifiedbetween(_Config) ->
         leveled_bookie:book_headfold(Bookie1A,
                                         ?RIAK_TAG,
                                         {bucket_list, [<<"B1">>, <<"B2">>]},
-                                        {SimpleCountFun, 0},
+                                        {SimpleCountFun([<<"B1">>, <<"B2">>]), 0},
                                         false,
                                         true,
                                         false,
@@ -547,8 +562,39 @@ fetchclocks_modifiedbetween(_Config) ->
     io:format("R8A_MultiBucket ~w ~n", [R8A_MultiBucket]),
     true = R8A_MultiBucket == {0, 5000},
 
+    ok = leveled_bookie:book_close(Bookie1B),
+    io:format("Double query to generate index cache and use~n"),
+    {ok, Bookie1BS} = leveled_bookie:book_start(StartOpts1B),
+    {async, R5B_MultiBucketRunner0} =
+        leveled_bookie:book_headfold(Bookie1BS,
+                                        ?RIAK_TAG,
+                                        all,
+                                        {SimpleCountFun([<<"B0">>, <<"B2">>]), 0},
+                                        false,
+                                        true,
+                                        false,
+                                        {ObjL4StartTS, ObjL6EndTS},
+                                        false),
+    R5B_MultiBucket0 = R5B_MultiBucketRunner0(),
+    io:format("R5B_MultiBucket ~w ~n", [R5B_MultiBucket0]),
+    true = R5B_MultiBucket0 == 37000,
+    {async, R5B_MultiBucketRunner1} =
+        leveled_bookie:book_headfold(Bookie1BS,
+                                        ?RIAK_TAG,
+                                        all,
+                                        {SimpleCountFun([<<"B0">>, <<"B2">>]), 0},
+                                        false,
+                                        true,
+                                        false,
+                                        {ObjL4StartTS, ObjL6EndTS},
+                                        false),
+    R5B_MultiBucket1 = R5B_MultiBucketRunner1(),
+    io:format("R5B_MultiBucket ~w ~n", [R5B_MultiBucket1]),
+    true = R5B_MultiBucket1 == 37000,
+
     ok = leveled_bookie:book_destroy(Bookie1A),
-    ok = leveled_bookie:book_destroy(Bookie1B).
+    ok = leveled_bookie:book_destroy(Bookie1BS).