Merge pull request #320 from martinsumner/mas-i319-cachescores

Allow for caching of compaction scores

Commit 3b305e0adb: 11 changed files with 642 additions and 173 deletions
@@ -106,8 +106,9 @@ The `compaction_runs_perday` indicates for the leveled store how many times each

The `compaction_low_hour` and `compaction_high_hour` are the hours of the day which support the compaction window - set to 0 and 23 respectively if compaction is required to be a continuous process.

-The `max_run_length` controls how many files can be compacted in a single compaction run. The scoring of files and runs is controlled through `maxrunlength_compactionpercentage` and `singlefile_compactionpercentage`.
+The `max_run_length` controls how many files can be compacted in a single compaction run. The scoring of files and runs is controlled through `maxrunlength_compactionpercentage` and `singlefile_compactionpercentage`. The `singlefile_compactionpercentage` is an acceptable compaction score for a file to be eligible for compaction on its own, whereas the `maxrunlength_compactionpercentage` is the score required for a run of the `max_run_length` to be considered eligible. The higher the `maxrunlength_compactionpercentage` and the lower the `singlefile_compactionpercentage`, the more likely a longer run will be chosen over a shorter run.
+
+The `journalcompaction_scoreonein` option controls how frequently a file will be scored. If this is set to one, then each and every file will be scored on each and every compaction run. If this is set to an integer greater than one ('n'), then on average any given file will only be scored on one in 'n' runs. On other runs, a cached score for the file will be used. On startup all files will be scored on the first run. As journals get very large, and where frequent compaction is required due to mutating objects, this can save significant resource. In Riak, this option is controlled via `leveled.compaction_scores_perday`, with the number of `leveled.compaction_runs_perday` being divided by this to produce the `journalcompaction_scoreonein`. By default each file will only be scored once per day.

## Snapshot Timeouts
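For illustration only, a minimal sketch of how the options described above might be passed when starting leveled directly through `leveled_bookie:book_start/1`; the path and the numeric values here are assumptions chosen for the example, not taken from this changeset:

```erlang
%% Sketch: score each journal file on roughly one in four compaction runs,
%% reusing the cached score on the other runs. Values are illustrative.
{ok, Bookie} = leveled_bookie:book_start(
                 [{root_path, "/tmp/leveled_example"},
                  {max_run_length, 8},
                  {singlefile_compactionpercentage, 30.0},
                  {maxrunlength_compactionpercentage, 70.0},
                  {journalcompaction_scoreonein, 4}]),
ok = leveled_bookie:book_close(Bookie).
```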
@@ -69,6 +69,7 @@
max_run_length,
singlefile_compactionperc :: float()|undefined,
maxrunlength_compactionperc :: float()|undefined,
+score_onein = 1 :: pos_integer(),
snaptimeout_long :: pos_integer() | undefined}).

-record(penciller_options,
@@ -94,4 +95,5 @@
compression_method = native :: lz4|native,
singlefile_compactionperc :: float()|undefined,
maxrunlength_compactionperc :: float()|undefined,
+score_onein = 1 :: pos_integer(),
reload_strategy = [] :: list()}).
@@ -100,6 +100,12 @@
{datatype, integer}
]}.

+%% @doc The number of times per day to score an individual file for compaction
+{mapping, "leveled.compaction_scores_perday", "leveled.compaction_scores_perday", [
+{default, 1},
+{datatype, integer}
+]}.
+
%% @doc Compaction Low Hour
%% The hour of the day in which journal compaction can start. Use Low hour
%% of 0 and High hour of 23 to have no compaction window (i.e. always compact

@@ -140,10 +146,10 @@
%% @doc Target Percentage for Single File
%% What is the target score for a run of a single file, to qualify for
%% compaction. If less than this percentage would be retained after compaction
-%% then it is a candidate (e.g. in default case if 50% of space would be
+%% then it is a candidate (e.g. in default case if 70% of space would be
%% recovered)
{mapping, "leveled.singlefile_compactionpercentage", "leveled.singlefile_compactionpercentage", [
-{default, 50.0},
+{default, 30.0},
{datatype, float},
hidden
]}.
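As a rough sketch of the Riak-facing arithmetic described above (the helper below is an assumption for illustration, not code from this changeset): `compaction_runs_perday` divided by `compaction_scores_perday` gives the internal `journalcompaction_scoreonein` value.

```erlang
%% Illustrative helper: 24 runs per day with the default of one scoring pass
%% per day means any given file is freshly scored on roughly 1 run in 24.
%% The clamp to a minimum of 1 is an assumption for the example.
score_onein(RunsPerDay, ScoresPerDay) when ScoresPerDay > 0 ->
    max(1, RunsPerDay div ScoresPerDay).

%% score_onein(24, 1) -> 24.
```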
@@ -140,8 +140,9 @@
{head_only, false},
{waste_retention_period, undefined},
{max_run_length, undefined},
-{singlefile_compactionpercentage, 50.0},
+{singlefile_compactionpercentage, 30.0},
{maxrunlength_compactionpercentage, 70.0},
+{journalcompaction_scoreonein, 1},
{reload_strategy, []},
{max_pencillercachesize, ?MAX_PCL_CACHE_SIZE},
{ledger_preloadpagecache_level, ?SST_PAGECACHELEVEL_LOOKUP},

@@ -292,6 +293,11 @@
% a run of max_run_length, before that run can be a compaction
% candidate. For runs between 1 and max_run_length, a
% proportionate score is calculated
+{journalcompaction_scoreonein, pos_integer()} |
+% When scoring for compaction run a probability (1 in x) of whether
+% any file will be scored this run. If not scored a cached score
+% will be used, and the cached score is the average of the latest
+% score and the rolling average of previous scores
{reload_strategy, list()} |
% The reload_strategy is exposed as an option as currently no firm
% decision has been made about how recovery from failure should

@@ -1757,6 +1763,8 @@ set_options(Opts) ->
MaxSSTSlots = proplists:get_value(max_sstslots, Opts),

+ScoreOneIn = proplists:get_value(journalcompaction_scoreonein, Opts),
+
{#inker_options{root_path = JournalFP,
reload_strategy = ReloadStrategy,
max_run_length = proplists:get_value(max_run_length, Opts),

@@ -1766,6 +1774,7 @@ set_options(Opts) ->
snaptimeout_long = SnapTimeoutLong,
compression_method = CompressionMethod,
compress_on_receipt = CompressOnReceipt,
+score_onein = ScoreOneIn,
cdb_options =
#cdb_options{max_size=MaxJournalSize,
max_count=MaxJournalCount,
@@ -113,7 +113,9 @@
cdb_deletepending/1,
cdb_deletepending/3,
cdb_isrolling/1,
-cdb_clerkcomplete/1]).
+cdb_clerkcomplete/1,
+cdb_getcachedscore/2,
+cdb_putcachedscore/2]).

-export([finished_rolling/1,
hashtable_calc/2]).

@@ -133,6 +135,8 @@
-define(GETPOS_FACTOR, 8).
-define(MAX_OBJECT_SIZE, 1000000000).
% 1GB but really should be much smaller than this
+-define(MEGA, 1000000).
+-define(CACHE_LIFE, 86400).

-record(state, {hashtree,
last_position :: integer() | undefined,

@@ -152,7 +156,8 @@
timings = no_timing :: cdb_timings(),
timings_countdown = 0 :: integer(),
log_options = leveled_log:get_opts()
-:: leveled_log:log_options()}).
+:: leveled_log:log_options(),
+cached_score :: {float(), erlang:timestamp()}|undefined}).

-record(cdb_timings, {sample_count = 0 :: integer(),
sample_cyclecount = 0 :: integer(),

@@ -164,6 +169,9 @@
-type cdb_timings() :: no_timing|#cdb_timings{}.
-type hashtable_index() :: tuple().
-type file_location() :: integer()|eof.
+-type filter_fun() ::
+fun((any(), binary(), integer(), any(), fun((binary()) -> any())) ->
+{stop|loop, any()}).

@@ -369,7 +377,7 @@ cdb_deletepending(Pid) ->
cdb_deletepending(Pid, ManSQN, Inker) ->
gen_fsm:send_event(Pid, {delete_pending, ManSQN, Inker}).

--spec cdb_scan(pid(), fun(), any(), integer()|undefined) ->
+-spec cdb_scan(pid(), filter_fun(), any(), integer()|undefined) ->
{integer()|eof, any()}.
%% @doc
%% cdb_scan returns {LastPosition, Acc}. Use LastPosition as StartPosiiton to

@@ -424,6 +432,20 @@ cdb_isrolling(Pid) ->
cdb_clerkcomplete(Pid) ->
gen_fsm:send_all_state_event(Pid, clerk_complete).

+-spec cdb_getcachedscore(pid(), erlang:timestamp()) -> undefined|float().
+%% @doc
+%% Return the cached score for a CDB file
+cdb_getcachedscore(Pid, Now) ->
+gen_fsm:sync_send_all_state_event(Pid, {get_cachedscore, Now}, infinity).
+
+-spec cdb_putcachedscore(pid(), float()) -> ok.
+%% @doc
+%% Return the cached score for a CDB file
+cdb_putcachedscore(Pid, Score) ->
+gen_fsm:sync_send_all_state_event(Pid, {put_cachedscore, Score}, infinity).
+
%%%============================================================================
%%% gen_server callbacks

@@ -829,6 +851,24 @@ handle_sync_event(cdb_filename, _From, StateName, State) ->
{reply, State#state.filename, StateName, State};
handle_sync_event(cdb_isrolling, _From, StateName, State) ->
{reply, StateName == rolling, StateName, State};
+handle_sync_event({get_cachedscore, {NowMega, NowSecs, _}},
+_From, StateName, State) ->
+ScoreToReturn =
+case State#state.cached_score of
+undefined ->
+undefined;
+{Score, {CacheMega, CacheSecs, _}} ->
+case (NowMega * ?MEGA + NowSecs) >
+(CacheMega * ?MEGA + CacheSecs + ?CACHE_LIFE) of
+true ->
+undefined;
+false ->
+Score
+end
+end,
+{reply, ScoreToReturn, StateName, State};
+handle_sync_event({put_cachedscore, Score}, _From, StateName, State) ->
+{reply, ok, StateName, State#state{cached_score = {Score,os:timestamp()}}};
handle_sync_event(cdb_close, _From, delete_pending, State) ->
leveled_log:log("CDB05",
[State#state.filename, delete_pending, cdb_close]),

@@ -836,8 +876,7 @@ handle_sync_event(cdb_close, _From, delete_pending, State) ->
State#state.filename,
State#state.waste_path),
{stop, normal, ok, State};
-handle_sync_event(cdb_close, _From, StateName, State) ->
-leveled_log:log("CDB05", [State#state.filename, StateName, cdb_close]),
+handle_sync_event(cdb_close, _From, _StateName, State) ->
file:close(State#state.handle),
{stop, normal, ok, State}.

@@ -2396,6 +2435,16 @@ get_keys_byposition_manykeys_test_to() ->
SampleList3 = cdb_getpositions(P2, KeyCount + 1),
?assertMatch(KeyCount, length(SampleList3)),

+?assertMatch(undefined, cdb_getcachedscore(P2, os:timestamp())),
+ok = cdb_putcachedscore(P2, 80.0),
+?assertMatch(80.0, cdb_getcachedscore(P2, os:timestamp())),
+timer:sleep(1000),
+{NowMega, NowSecs, _} = Now = os:timestamp(),
+?assertMatch(80.0, cdb_getcachedscore(P2, Now)),
+FutureEpoch = NowMega * ?MEGA + NowSecs + ?CACHE_LIFE,
+Future = {FutureEpoch div ?MEGA, FutureEpoch rem ?MEGA, 0},
+?assertMatch(undefined, cdb_getcachedscore(P2, Future)),
+
ok = cdb_close(P2),
ok = file:delete(F2).
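The `get_cachedscore` handler above treats a cached score as valid for `?CACHE_LIFE` (86400 seconds, one day). A standalone sketch of the same timestamp arithmetic (an assumed helper for illustration, not part of the diff):

```erlang
%% An os:timestamp() tuple {MegaSecs, Secs, MicroSecs} is flattened to seconds;
%% the cached score is ignored once it is more than a day old.
is_score_stale({NowMega, NowSecs, _}, {CacheMega, CacheSecs, _}) ->
    CacheLife = 86400,
    (NowMega * 1000000 + NowSecs) >
        (CacheMega * 1000000 + CacheSecs + CacheLife).
```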
@@ -97,7 +97,7 @@

-define(JOURNAL_FILEX, "cdb").
-define(PENDING_FILEX, "pnd").
--define(SAMPLE_SIZE, 100).
+-define(SAMPLE_SIZE, 192).
-define(BATCH_SIZE, 32).
-define(BATCHES_TO_CHECK, 8).
-define(CRC_SIZE, 4).

@@ -117,17 +117,18 @@
maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(),
compression_method = native :: lz4|native,
scored_files = [] :: list(candidate()),
-scoring_state :: scoring_state()|undefined}).
+scoring_state :: scoring_state()|undefined,
+score_onein = 1 :: pos_integer()}).

-record(candidate, {low_sqn :: integer() | undefined,
filename :: string() | undefined,
journal :: pid() | undefined,
compaction_perc :: float() | undefined}).

--record(scoring_state, {filter_fun :: fun(),
-filter_server :: pid(),
+-record(scoring_state, {filter_fun :: leveled_inker:filterfun(),
+filter_server :: leveled_inker:filterserver(),
max_sqn :: non_neg_integer(),
-close_fun :: fun(),
+close_fun :: leveled_inker:filterclosefun(),
start_time :: erlang:timestamp()}).

-type iclerk_options() :: #iclerk_options{}.

@@ -165,8 +166,11 @@
clerk_new(InkerClerkOpts) ->
gen_server:start_link(?MODULE, [leveled_log:get_opts(), InkerClerkOpts], []).

--spec clerk_compact(pid(), pid(),
-fun(), fun(), fun(),
+-spec clerk_compact(pid(),
+pid(),
+leveled_inker:filterinitfun(),
+leveled_inker:filterclosefun(),
+leveled_inker:filterfun(),
list()) -> ok.
%% @doc
%% Trigger a compaction for this clerk if the threshold of data recovery has

@@ -280,7 +284,10 @@ init([LogOpts, IClerkOpts]) ->
singlefile_compactionperc = SFL_CompPerc,
maxrunlength_compactionperc = MRL_CompPerc,
compression_method =
-IClerkOpts#iclerk_options.compression_method}}.
+IClerkOpts#iclerk_options.compression_method,
+score_onein =
+IClerkOpts#iclerk_options.score_onein
+}}.

handle_call(stop, _From, State) ->
case State#state.scoring_state of

@@ -325,13 +332,38 @@ handle_cast({score_filelist, [Entry|Tail]}, State) ->
Candidates = State#state.scored_files,
{LowSQN, FN, JournalP, _LK} = Entry,
ScoringState = State#state.scoring_state,
-CpctPerc = check_single_file(JournalP,
+CpctPerc =
+case {leveled_cdb:cdb_getcachedscore(JournalP, os:timestamp()),
+leveled_rand:uniform(State#state.score_onein) == 1,
+State#state.score_onein} of
+{CachedScore, _UseNewScore, ScoreOneIn}
+when CachedScore == undefined; ScoreOneIn == 1 ->
+% If caches are not used, always use the current score
+check_single_file(JournalP,
+ScoringState#scoring_state.filter_fun,
+ScoringState#scoring_state.filter_server,
+ScoringState#scoring_state.max_sqn,
+?SAMPLE_SIZE,
+?BATCH_SIZE,
+State#state.reload_strategy);
+{CachedScore, true, _ScoreOneIn} ->
+% If caches are used roll the score towards the current score
+% Expectation is that this will reduce instances of individual
+% files being compacted when a run is missed due to cached
+% scores being used in surrounding journals
+NewScore =
+check_single_file(JournalP,
ScoringState#scoring_state.filter_fun,
ScoringState#scoring_state.filter_server,
ScoringState#scoring_state.max_sqn,
?SAMPLE_SIZE,
?BATCH_SIZE,
State#state.reload_strategy),
+(NewScore + CachedScore) / 2;
+{CachedScore, false, _ScoreOneIn} ->
+CachedScore
+end,
+ok = leveled_cdb:cdb_putcachedscore(JournalP, CpctPerc),
Candidate =
#candidate{low_sqn = LowSQN,
filename = FN,

@@ -509,7 +541,10 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
%%% Internal functions
%%%============================================================================

--spec check_single_file(pid(), fun(), any(), non_neg_integer(),
+-spec check_single_file(pid(),
+leveled_inker:filterfun(),
+leveled_inker:filterserver(),
+leveled_codec:sqn(),
non_neg_integer(), non_neg_integer(),
leveled_codec:compaction_strategy()) ->
float().

@@ -549,44 +584,31 @@ safely_log_filescore(PositionList, FN, Score, SW) ->
leveled_log:log_timer("IC004", [Score, AvgJump, FN], SW).

-spec size_comparison_score(list(key_size() | corrupted_test_key_size()),
-fun(),
-any(),
-non_neg_integer(),
+leveled_inker:filterfun(),
+leveled_inker:filterserver(),
+leveled_codec:sqn(),
leveled_codec:compaction_strategy()) ->
float().
size_comparison_score(KeySizeList,
FilterFun, FilterServer, MaxSQN,
-RS) ->
+ReloadStrategy) ->
FoldFunForSizeCompare =
fun(KS, {ActSize, RplSize}) ->
case KS of
{{SQN, Type, PK}, Size} ->
-IsJournalEntry =
-leveled_codec:is_full_journalentry({SQN, Type, PK}),
-case IsJournalEntry of
-false ->
-TS = leveled_codec:get_tagstrategy(PK, RS),
-% If the strategy is to retain key deltas, then
-% scoring must reflect that. Key deltas are
-% possible even if strategy does not allow as
-% there is support for changing strategy from
-% retain to recalc
-case TS of
-retain ->
-{ActSize + Size - ?CRC_SIZE, RplSize};
-_ ->
-{ActSize, RplSize + Size - ?CRC_SIZE}
-end;
+ToRetain =
+to_retain({SQN, Type, PK},
+FilterFun,
+FilterServer,
+MaxSQN,
+ReloadStrategy),
+case ToRetain of
true ->
-Check = FilterFun(FilterServer, PK, SQN),
-case {Check, SQN > MaxSQN} of
-{current, _} ->
-{ActSize + Size - ?CRC_SIZE, RplSize};
-{_, true} ->
-{ActSize + Size - ?CRC_SIZE, RplSize};
-_ ->
-{ActSize, RplSize + Size - ?CRC_SIZE}
-end
-end;
+{ActSize + Size - ?CRC_SIZE, RplSize};
+convert ->
+{ActSize, RplSize + Size - ?CRC_SIZE};
+false ->
+{ActSize, RplSize + Size - ?CRC_SIZE}
end;
_ ->
% There is a key which is not in expected format

@@ -810,53 +832,71 @@ split_positions_into_batches(Positions, Journal, Batches) ->
%% if it contains index entries. The hot_backup approach is also not safe with
%% a `recovr` strategy. The recovr strategy assumes faults in the ledger will
%% be resolved via application-level anti-entropy
-filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
+filter_output(KVCs, FilterFun, FilterServer, MaxSQN, Strategy) ->
FoldFun =
-filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy),
+filter_output_fun(FilterFun, FilterServer, MaxSQN, Strategy),
lists:reverse(lists:foldl(FoldFun, [], KVCs)).

-filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
+filter_output_fun(FilterFun, FilterServer, MaxSQN, Strategy) ->
fun(KVC0, Acc) ->
case KVC0 of
{_InkKey, crc_wonky, false} ->
% Bad entry, disregard, don't check
Acc;
{JK, JV, _Check} ->
-{SQN, LK} =
-leveled_codec:from_journalkey(JK),
-CompactStrategy =
-leveled_codec:get_tagstrategy(LK, ReloadStrategy),
-IsJournalEntry =
-leveled_codec:is_full_journalentry(JK),
-case {CompactStrategy, IsJournalEntry} of
-{retain, false} ->
+ToRetain =
+to_retain(JK, FilterFun, FilterServer, MaxSQN, Strategy),
+case ToRetain of
+true ->
[KVC0|Acc];
-_ ->
-KeyCurrent = FilterFun(FilterServer, LK, SQN),
-IsInMemory = SQN > MaxSQN,
-case {KeyCurrent, IsInMemory, CompactStrategy} of
-{KC, InMem, _} when KC == current; InMem ->
-% This entry may still be required
-% regardless of strategy
-[KVC0|Acc];
-{_, _, retain} ->
-% If we have a retain strategy, it can't be
-% discarded - but the value part is no
-% longer required as this version has been
-% replaced
-{JK0, JV0} =
-leveled_codec:revert_to_keydeltas(JK, JV),
-[{JK0, JV0, null}|Acc];
-{_, _, _} ->
-% This is out of date and not retained so
-% discard
-Acc
-end
+convert ->
+{JK0, JV0} =
+leveled_codec:revert_to_keydeltas(JK, JV),
+[{JK0, JV0, null}|Acc];
+false ->
+Acc
end
end
end.

+-spec to_retain(leveled_codec:journal_key(),
+leveled_inker:filterfun(),
+leveled_inker:fillter_server(),
+leveled_codec:sqn(),
+leveled_codec:compaction_strategy()) -> boolean()|convert.
+to_retain(JournalKey, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
+{SQN, LK} =
+leveled_codec:from_journalkey(JournalKey),
+CompactStrategy =
+leveled_codec:get_tagstrategy(LK, ReloadStrategy),
+IsJournalEntry =
+leveled_codec:is_full_journalentry(JournalKey),
+case {CompactStrategy, IsJournalEntry} of
+{retain, false} ->
+true;
+_ ->
+KeyCurrent = FilterFun(FilterServer, LK, SQN),
+IsInMemory = SQN > MaxSQN,
+case {KeyCurrent, IsInMemory, CompactStrategy} of
+{KC, InMem, _} when KC == current; InMem ->
+% This entry may still be required
+% regardless of strategy
+true;
+{_, _, retain} ->
+% If we have a retain strategy, it can't be
+% discarded - but the value part is no
+% longer required as this version has been
+% replaced
+convert;
+{_, _, _} ->
+% This is out of date and not retained so
+% discard
+false
+end
+end.

write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) ->
{Journal0, ManSlice0};
write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) ->
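The scoring decision above can be summarised as a small pure function; the sketch below is an assumption for illustration (using the stdlib `rand` module rather than `leveled_rand`) and mirrors the three branches of the case statement:

```erlang
%% Fresh score when there is no cache or caching is disabled (ScoreOneIn == 1);
%% otherwise a 1-in-ScoreOneIn chance of re-scoring, rolled towards the cached
%% value; otherwise reuse the cached score unchanged.
choose_score(CachedScore, FreshScoreFun, ScoreOneIn)
        when CachedScore == undefined; ScoreOneIn == 1 ->
    FreshScoreFun();
choose_score(CachedScore, FreshScoreFun, ScoreOneIn) ->
    case rand:uniform(ScoreOneIn) == 1 of
        true -> (FreshScoreFun() + CachedScore) / 2;
        false -> CachedScore
    end.
```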
@@ -157,6 +157,14 @@
-type inker_options() :: #inker_options{}.
-type ink_state() :: #state{}.
-type registered_snapshot() :: {pid(), os:timestamp(), integer()}.
+-type filterserver() :: pid()|list(tuple()).
+-type filterfun() ::
+fun((filterserver(), leveled_codec:ledger_key(), leveled_codec:sqn()) ->
+current|replaced|missing).
+-type filterclosefun() :: fun((filterserver()) -> ok).
+-type filterinitfun() :: fun((pid()) -> {filterserver(), leveled_codec:sqn()}).
+
+-export_type([filterserver/0, filterfun/0, filterclosefun/0, filterinitfun/0]).

%%%============================================================================
%%% API

@@ -806,6 +814,7 @@ start_from_file(InkOpts) ->
PressMethod = InkOpts#inker_options.compression_method,
PressOnReceipt = InkOpts#inker_options.compress_on_receipt,
SnapTimeout = InkOpts#inker_options.snaptimeout_long,
+ScoreOneIn = InkOpts#inker_options.score_onein,

IClerkOpts =
#iclerk_options{inker = self(),

@@ -815,7 +824,8 @@ start_from_file(InkOpts) ->
compression_method = PressMethod,
max_run_length = MRL,
singlefile_compactionperc = SFL_CompactPerc,
-maxrunlength_compactionperc = MRL_CompactPerc},
+maxrunlength_compactionperc = MRL_CompactPerc,
+score_onein = ScoreOneIn},

{ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts),
@@ -181,6 +181,8 @@
:: {binary(), binary(), list(integer()), leveled_codec:ledger_key()}.
-type sst_summary()
:: #summary{}.
+-type blockindex_cache()
+:: any(). % An array but OTP 16 types

%% yield_blockquery is used to determine if the work necessary to process a
%% range query beyond the fetching the slot should be managed from within

@@ -196,7 +198,7 @@
root_path,
filename,
yield_blockquery = false :: boolean(),
-blockindex_cache,
+blockindex_cache :: blockindex_cache()|undefined,
compression_method = native :: press_method(),
index_moddate = ?INDEX_MODDATE :: boolean(),
timings = no_timing :: sst_timings(),

@@ -207,7 +209,8 @@
deferred_startup_tuple :: tuple()|undefined,
level :: non_neg_integer()|undefined,
tomb_count = not_counted
-:: non_neg_integer()|not_counted}).
+:: non_neg_integer()|not_counted,
+high_modified_date :: non_neg_integer()|undefined}).

-record(sst_timings,
{sample_count = 0 :: integer(),

@@ -526,8 +529,14 @@ starting({sst_new,
SW = os:timestamp(),
leveled_log:save(OptsSST#sst_options.log_options),
PressMethod = OptsSST#sst_options.press_method,
-{Length, SlotIndex, BlockIndex, SlotsBin, Bloom} =
+{Length, SlotIndex, BlockEntries, SlotsBin, Bloom} =
build_all_slots(SlotList),
+{BlockIndex, HighModDate} =
+update_blockindex_cache(true,
+BlockEntries,
+new_blockindex_cache(Length),
+undefined,
+IdxModDate),
SummaryBin =
build_table_summary(SlotIndex, Level, FirstKey, Length,
MaxSQN, Bloom, CountOfTombs),

@@ -550,6 +559,7 @@ starting({sst_new,
{ok, {Summary#summary.first_key, Summary#summary.last_key}, Bloom},
reader,
UpdState#state{blockindex_cache = BlockIndex,
+high_modified_date = HighModDate,
starting_pid = StartingPID,
level = Level}};
starting({sst_newlevelzero, RootPath, Filename,
@@ -583,8 +593,14 @@ starting(complete_l0startup, State) ->
Time1 = timer:now_diff(os:timestamp(), SW1),

SW2 = os:timestamp(),
-{SlotCount, SlotIndex, BlockIndex, SlotsBin,Bloom} =
+{SlotCount, SlotIndex, BlockEntries, SlotsBin,Bloom} =
build_all_slots(SlotList),
+{BlockIndex, HighModDate} =
+update_blockindex_cache(true,
+BlockEntries,
+new_blockindex_cache(SlotCount),
+undefined,
+IdxModDate),
Time2 = timer:now_diff(os:timestamp(), SW2),

SW3 = os:timestamp(),

@@ -616,19 +632,19 @@ starting(complete_l0startup, State) ->

case Penciller of
undefined ->
-{next_state,
-reader,
-UpdState#state{blockindex_cache = BlockIndex}};
+ok;
_ ->
leveled_penciller:pcl_confirml0complete(Penciller,
UpdState#state.filename,
Summary#summary.first_key,
Summary#summary.last_key,
Bloom),
-{next_state,
-reader,
-UpdState#state{blockindex_cache = BlockIndex}}
-end;
+ok
+end,
+{next_state,
+reader,
+UpdState#state{blockindex_cache = BlockIndex,
+high_modified_date = HighModDate}};
starting({sst_returnslot, FetchedSlot, FetchFun, SlotCount}, State) ->
Self = self(),
FetchedSlots =
@@ -673,13 +689,19 @@ reader({get_kv, LedgerKey, Hash}, _From, State) ->
timings_countdown = CountDown}};
reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod},
_From, State) ->
-{SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
-EndKey,
-ScanWidth,
-SegList,
-LowLastMod,
-State),
+ReadNeeded =
+check_modified(State#state.high_modified_date,
+LowLastMod,
+State#state.index_moddate),
+{NeedBlockIdx, SlotsToFetchBinList, SlotsToPoint} =
+case ReadNeeded of
+true ->
+fetch_range(StartKey, EndKey, ScanWidth,
+SegList, LowLastMod,
+State);
+false ->
+{false, [], []}
+end,
PressMethod = State#state.compression_method,
IdxModDate = State#state.index_moddate,

@@ -694,34 +716,38 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod},
reader,
State};
false ->
-{L, BIC} =
+{L, FoundBIC} =
binaryslot_reader(SlotsToFetchBinList,
-PressMethod, IdxModDate, SegList),
-FoldFun =
-fun(CacheEntry, Cache) ->
-case CacheEntry of
-{_ID, none} ->
-Cache;
-{ID, Header} ->
-array:set(ID - 1, binary:copy(Header), Cache)
-end
-end,
-BlockIdxC0 = lists:foldl(FoldFun, State#state.blockindex_cache, BIC),
+PressMethod,
+IdxModDate,
+SegList),
+{BlockIdxC0, HighModDate} =
+update_blockindex_cache(NeedBlockIdx,
+FoundBIC,
+State#state.blockindex_cache,
+State#state.high_modified_date,
+State#state.index_moddate),
{reply,
L ++ SlotsToPoint,
reader,
-State#state{blockindex_cache = BlockIdxC0}}
+State#state{blockindex_cache = BlockIdxC0,
+high_modified_date = HighModDate}}
end;
reader({get_slots, SlotList, SegList, LowLastMod}, _From, State) ->
PressMethod = State#state.compression_method,
IdxModDate = State#state.index_moddate,
-SlotBins =
+{NeedBlockIdx, SlotBins} =
read_slots(State#state.handle,
SlotList,
-{SegList, LowLastMod, State#state.blockindex_cache},
-State#state.compression_method,
-State#state.index_moddate),
-{reply, {SlotBins, PressMethod, IdxModDate}, reader, State};
+{SegList,
+LowLastMod,
+State#state.blockindex_cache},
+State#state.compression_method,
+State#state.index_moddate),
+{reply,
+{NeedBlockIdx, SlotBins, PressMethod, IdxModDate},
+reader,
+State};
reader(get_maxsequencenumber, _From, State) ->
Summary = State#state.summary,
{reply, Summary#summary.max_sqn, reader, State};
@@ -759,12 +785,8 @@ delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
{reply, Result, delete_pending, UpdState, ?DELETE_TIMEOUT};
delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod},
_From, State) ->
-{SlotsToFetchBinList, SlotsToPoint} = fetch_range(StartKey,
-EndKey,
-ScanWidth,
-SegList,
-LowLastMod,
-State),
+{_NeedBlockIdx, SlotsToFetchBinList, SlotsToPoint} =
+fetch_range(StartKey, EndKey, ScanWidth, SegList, LowLastMod, State),
% Always yield as about to clear and de-reference
PressMethod = State#state.compression_method,
IdxModDate = State#state.index_moddate,

@@ -776,14 +798,14 @@ delete_pending({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod},
delete_pending({get_slots, SlotList, SegList, LowLastMod}, _From, State) ->
PressMethod = State#state.compression_method,
IdxModDate = State#state.index_moddate,
-SlotBins =
+{_NeedBlockIdx, SlotBins} =
read_slots(State#state.handle,
SlotList,
{SegList, LowLastMod, State#state.blockindex_cache},
PressMethod,
IdxModDate),
{reply,
-{SlotBins, PressMethod, IdxModDate},
+{false, SlotBins, PressMethod, IdxModDate},
delete_pending,
State,
?DELETE_TIMEOUT};

@@ -815,8 +837,17 @@ delete_pending(close, State) ->
handle_sync_event(_Msg, _From, StateName, State) ->
{reply, undefined, StateName, State}.

-handle_event(_Msg, StateName, State) ->
-{next_state, StateName, State}.
+handle_event({update_blockindex_cache, BIC}, StateName, State) ->
+{BlockIndexCache, HighModDate} =
+update_blockindex_cache(true,
+BIC,
+State#state.blockindex_cache,
+State#state.high_modified_date,
+State#state.index_moddate),
+{next_state,
+StateName,
+State#state{blockindex_cache = BlockIndexCache,
+high_modified_date = HighModDate}}.

handle_info(tidyup_after_startup, delete_pending, State) ->
% No need to GC, this file is to be shutdown. This message may have

@@ -850,7 +881,7 @@ code_change(_OldVsn, StateName, State, _Extra) ->
%% @doc
%% Expand a list of pointers, maybe ending up with a list of keys and values
%% with a tail of pointers
-%% By defauls will not have a segment filter, or a low last_modified_date, but
+%% By default will not have a segment filter, or a low last_modified_date, but
%% they can be used. Range checking a last modified date must still be made on
%% the output - at this stage the low last_modified_date has been used to bulk
%% skip those slots not containing any information over the low last modified

@@ -983,11 +1014,17 @@ sst_getslots(Pid, SlotList) ->
%% of the object, if the object is to be covered by the query
sst_getfilteredslots(Pid, SlotList, SegList, LowLastMod) ->
SegL0 = tune_seglist(SegList),
-{SlotBins, PressMethod, IdxModDate} =
+{NeedBlockIdx, SlotBins, PressMethod, IdxModDate} =
gen_fsm:sync_send_event(Pid,
{get_slots, SlotList, SegL0, LowLastMod},
infinity),
-{L, _BIC} = binaryslot_reader(SlotBins, PressMethod, IdxModDate, SegL0),
+{L, BIC} = binaryslot_reader(SlotBins, PressMethod, IdxModDate, SegL0),
+case NeedBlockIdx of
+true ->
+gen_fsm:send_all_state_event(Pid, {update_blockindex_cache, BIC});
+false ->
+ok
+end,
L.
@@ -1065,6 +1102,62 @@ tune_seglist(SegList) ->
%%% Internal Functions
%%%============================================================================

+-spec new_blockindex_cache(pos_integer()) -> blockindex_cache().
+new_blockindex_cache(Size) ->
+array:new([{size, Size}, {default, none}]).
+
+-spec update_blockindex_cache(boolean(),
+list({integer(), binary()}),
+blockindex_cache(),
+non_neg_integer()|undefined,
+boolean()) ->
+{blockindex_cache(),
+non_neg_integer()|undefined}.
+update_blockindex_cache(Needed, Entries, BIC, HighModDate, IdxModDate)
+when Needed,
+HighModDate == undefined ->
+FoldFun =
+fun(CacheEntry, Cache) ->
+case CacheEntry of
+{ID, Header} when is_binary(Header) ->
+array:set(ID - 1, binary:copy(Header), Cache);
+_ ->
+Cache
+end
+end,
+BlockIdxC0 = lists:foldl(FoldFun, BIC, Entries),
+Size = array:size(BlockIdxC0),
+BestModDates =
+case IdxModDate of
+true ->
+ModDateFold =
+fun(_ID, Header, Acc) when is_binary(Header) ->
+[element(2, extract_header(Header, IdxModDate))|Acc]
+end,
+array:sparse_foldl(ModDateFold, [], BlockIdxC0);
+false ->
+[]
+end,
+BestModDate =
+case length(BestModDates) of
+Size ->
+lists:max(BestModDates);
+_ ->
+undefined
+end,
+{BlockIdxC0, BestModDate};
+update_blockindex_cache(_Needed, _Entries, BIC, HighModDate, _IdxModDate) ->
+{BIC, HighModDate}.
+
+-spec check_modified(non_neg_integer()|undefined,
+non_neg_integer(),
+boolean()) -> boolean().
+check_modified(HighLastModifiedInSST, LowModDate, true)
+when is_integer(HighLastModifiedInSST) ->
+LowModDate =< HighLastModifiedInSST;
+check_modified(_, _, _) ->
+true.
+
-spec fetch(tuple(),
{integer(), integer()}|integer(),
sst_state(), sst_timings())
@@ -1093,14 +1186,17 @@ fetch(LedgerKey, Hash, State, Timings0) ->
SlotBin = read_slot(State#state.handle, Slot),
{Result, Header} =
binaryslot_get(SlotBin, LedgerKey, Hash, PressMethod, IdxModDate),
-BlockIndexCache =
-array:set(SlotID - 1,
-binary:copy(Header),
-State#state.blockindex_cache),
+{BlockIndexCache, HighModDate} =
+update_blockindex_cache(true,
+[{SlotID, Header}],
+State#state.blockindex_cache,
+State#state.high_modified_date,
+State#state.index_moddate),
{_SW3, Timings3} =
update_timings(SW2, Timings2, noncached_block, false),
{Result,
-State#state{blockindex_cache = BlockIndexCache},
+State#state{blockindex_cache = BlockIndexCache,
+high_modified_date = HighModDate},
Timings3};
{BlockLengths, _LMD, PosBin} ->
PosList = find_pos(PosBin, extract_hash(Hash), [], 0),

@@ -1150,7 +1246,8 @@ fetch(LedgerKey, Hash, State, Timings0) ->

-spec fetch_range(tuple(), tuple(), integer(),
leveled_codec:segment_list(), non_neg_integer(),
-sst_state()) -> {list(), list()}.
+sst_state()) ->
+{boolean(), list(), list()}.
%% @doc
%% Fetch the contents of the SST file for a given key range. This will
%% pre-fetch some results, and append pointers for additional results.

@@ -1209,13 +1306,13 @@ fetch_range(StartKey, EndKey, ScanWidth, SegList, LowLastMod, State) ->
lists:split(ScanWidth, ExpandedSlots)
end,

-SlotsToFetchBinList =
+{NeededBlockIdx, SlotsToFetchBinList} =
read_slots(Handle,
SlotsToFetch,
{SegList, LowLastMod, State#state.blockindex_cache},
State#state.compression_method,
State#state.index_moddate),
-{SlotsToFetchBinList, SlotsToPoint}.
+{NeededBlockIdx, SlotsToFetchBinList, SlotsToPoint}.

-spec compress_level(integer(), press_method()) -> press_method().
%% @doc

@@ -1258,8 +1355,7 @@ read_file(Filename, State, LoadPageCache) ->
UpdState0 = imp_fileversion(FileVersion, State),
{Summary, Bloom, SlotList, TombCount} =
read_table_summary(SummaryBin, UpdState0#state.tomb_count),
-BlockIndexCache = array:new([{size, Summary#summary.size},
-{default, none}]),
+BlockIndexCache = new_blockindex_cache(Summary#summary.size),
UpdState1 = UpdState0#state{blockindex_cache = BlockIndexCache},
SlotIndex = from_list(SlotList),
UpdSummary = Summary#summary{index = SlotIndex},

@@ -1389,8 +1485,7 @@ build_all_slots(SlotList) ->
9,
1,
[],
-array:new([{size, SlotCount},
-{default, none}]),
+[],
<<>>,
[]),
Bloom = leveled_ebloom:create_bloom(HashLists),

@@ -1410,7 +1505,7 @@ build_all_slots([SlotD|Rest], Pos, SlotID,
Pos + Length,
SlotID + 1,
[{LastKey, SlotIndexV}|SlotIdxAcc],
-array:set(SlotID - 1, BlockIdx, BlockIdxAcc),
+[{SlotID, BlockIdx}|BlockIdxAcc],
<<SlotBinAcc/binary, SlotBin/binary>>,
lists:append(HashLists, HashList)).
@@ -1842,7 +1937,8 @@ binarysplit_mapfun(MultiSlotBin, StartPos) ->

-spec read_slots(file:io_device(), list(),
{false|list(), non_neg_integer(), binary()},
-press_method(), boolean()) -> list(binaryslot_element()).
+press_method(), boolean()) ->
+{boolean(), list(binaryslot_element())}.
%% @doc
%% The reading of sots will return a list of either 2-tuples containing
%% {K, V} pairs - or 3-tuples containing {Binary, SK, EK}. The 3 tuples

@@ -1861,15 +1957,15 @@ read_slots(Handle, SlotList, {false, 0, _BlockIndexCache},
_PressMethod, _IdxModDate) ->
% No list of segments passed or useful Low LastModified Date
% Just read slots in SlotList
-read_slotlist(SlotList, Handle);
+{false, read_slotlist(SlotList, Handle)};
read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache},
PressMethod, IdxModDate) ->
% List of segments passed so only {K, V} pairs matching those segments
% should be returned. This required the {K, V} pair to have been added
% with the appropriate hash - if the pair were added with no_lookup as
-% the hash value this will fial unexpectedly.
+% the hash value this will fail unexpectedly.
BinMapFun =
-fun(Pointer, Acc) ->
+fun(Pointer, {NeededBlockIdx, Acc}) ->
{SP, _L, ID, SK, EK} = pointer_mapfun(Pointer),
CachedHeader = array:get(ID - 1, BlockIndexCache),
case extract_header(CachedHeader, IdxModDate) of

@@ -1877,7 +1973,7 @@ read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache},
% If there is an attempt to use the seg list query and the
% index block cache isn't cached for any part this may be
% slower as each slot will be read in turn
-Acc ++ read_slotlist([Pointer], Handle);
+{true, Acc ++ read_slotlist([Pointer], Handle)};
{BlockLengths, LMD, BlockIdx} ->
% If there is a BlockIndex cached then we can use it to
% check to see if any of the expected segments are

@@ -1894,12 +1990,14 @@ read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache},
% LowLastMod date passed in the query - therefore
% there are no interesting modifications in this
% slot - it is all too old
-Acc;
+{NeededBlockIdx, Acc};
false ->
case SegList of
false ->
% Need all the slot now
-Acc ++ read_slotlist([Pointer], Handle);
+{NeededBlockIdx,
+Acc ++
+read_slotlist([Pointer], Handle)};
_SL ->
% Need to find just the right keys
PositionList =

@@ -1920,12 +2018,13 @@ read_slots(Handle, SlotList, {SegList, LowLastMod, BlockIndexCache},
% to be filtered
FilterFun =
fun(KV) -> in_range(KV, SK, EK) end,
-Acc ++ lists:filter(FilterFun, KVL)
+{NeededBlockIdx,
+Acc ++ lists:filter(FilterFun, KVL)}
end
end
end
end,
-lists:foldl(BinMapFun, [], SlotList).
+lists:foldl(BinMapFun, {false, []}, SlotList).

-spec in_range(leveled_codec:ledger_kv(),

@@ -2015,7 +2114,7 @@ read_length_list(Handle, LengthList) ->

-spec extract_header(binary()|none, boolean()) ->
-{binary(), integer(), binary()}|none.
+{binary(), non_neg_integer(), binary()}|none.
%% @doc
%% Helper for extracting the binaries from the header ignoring the missing LMD
%% if LMD is not indexed
@ -3657,8 +3756,6 @@ key_dominates_test() ->
|
||||||
key_dominates([KV7|KL2], [KV2], {true, 1})).
|
key_dominates([KV7|KL2], [KV2], {true, 1})).
|
||||||
|
|
||||||
nonsense_coverage_test() ->
|
nonsense_coverage_test() ->
|
||||||
{ok, Pid} = gen_fsm:start_link(?MODULE, [], []),
|
|
||||||
ok = gen_fsm:send_all_state_event(Pid, nonsense),
|
|
||||||
?assertMatch({ok, reader, #state{}}, code_change(nonsense,
|
?assertMatch({ok, reader, #state{}}, code_change(nonsense,
|
||||||
reader,
|
reader,
|
||||||
#state{},
|
#state{},
|
||||||
|
@ -3861,6 +3958,39 @@ corrupted_block_fetch_tester(PressMethod) ->
|
||||||
ExpectedMisses = element(2, ?LOOK_BLOCKSIZE),
|
ExpectedMisses = element(2, ?LOOK_BLOCKSIZE),
|
||||||
?assertMatch(ExpectedMisses, MissCount).
|
?assertMatch(ExpectedMisses, MissCount).
|
||||||
|
|
||||||
|
block_index_cache_test() ->
|
||||||
|
{Mega, Sec, _} = os:timestamp(),
|
||||||
|
Now = Mega * 1000000 + Sec,
|
||||||
|
EntriesTS =
|
||||||
|
lists:map(fun(I) ->
|
||||||
|
TS = Now - I + 1,
|
||||||
|
{I, <<0:160/integer, TS:32/integer, 0:32/integer>>}
|
||||||
|
end,
|
||||||
|
lists:seq(1, 8)),
|
||||||
|
EntriesNoTS =
|
||||||
|
lists:map(fun(I) ->
|
||||||
|
{I, <<0:160/integer, 0:32/integer>>}
|
||||||
|
end,
|
||||||
|
lists:seq(1, 8)),
|
||||||
|
HeaderTS = <<0:160/integer, Now:32/integer, 0:32/integer>>,
|
||||||
|
HeaderNoTS = <<0:192>>,
|
||||||
|
BIC = array:new([{size, 8}, {default, none}]),
|
||||||
|
{BIC0, undefined} =
|
||||||
|
update_blockindex_cache(false, EntriesNoTS, BIC, undefined, false),
|
||||||
|
{BIC1, undefined} =
|
||||||
|
update_blockindex_cache(false, EntriesTS, BIC, undefined, true),
|
||||||
|
{BIC2, undefined} =
|
||||||
|
update_blockindex_cache(true, EntriesNoTS, BIC, undefined, false),
|
||||||
|
{BIC3, LMD3} =
|
||||||
|
update_blockindex_cache(true, EntriesTS, BIC, undefined, true),
|
||||||
|
|
||||||
|
?assertMatch(none, array:get(0, BIC0)),
|
||||||
|
?assertMatch(none, array:get(0, BIC1)),
|
||||||
|
?assertMatch(HeaderNoTS, array:get(0, BIC2)),
|
||||||
|
?assertMatch(HeaderTS, array:get(0, BIC3)),
|
||||||
|
?assertMatch(Now, LMD3).
|
||||||
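The headers asserted on above pack a last-modified date into a fixed 32-bit slot following 160 bits of other header data. A shell-runnable sketch of that packing and unpacking - field widths are taken from the test binaries above, but the field names here are only illustrative:

    {Mega, Sec, _} = os:timestamp(),
    Now = Mega * 1000000 + Sec,
    Header = <<0:160/integer, Now:32/integer, 0:32/integer>>,
    <<_Prefix:160/bitstring, LMD:32/integer, _Suffix:32/integer>> = Header,
    LMD = Now.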
|
|
||||||
|
|
||||||
|
|
||||||
receive_fun() ->
|
receive_fun() ->
|
||||||
receive
|
receive
|
||||||
|
|
|
@ -299,7 +299,7 @@ journal_compaction_tester(Restart, WRP) ->
|
||||||
end,
|
end,
|
||||||
ok = leveled_penciller:pcl_close(PclClone),
|
ok = leveled_penciller:pcl_close(PclClone),
|
||||||
ok = leveled_inker:ink_close(InkClone),
|
ok = leveled_inker:ink_close(InkClone),
|
||||||
% Snapshot released so deletes shoudl occur at next timeout
|
% Snapshot released so deletes should occur at next timeout
|
||||||
case WRP of
|
case WRP of
|
||||||
undefined ->
|
undefined ->
|
||||||
timer:sleep(10100); % wait for delete_pending timeout
|
timer:sleep(10100); % wait for delete_pending timeout
|
||||||
|
|
|
@ -16,7 +16,8 @@
|
||||||
journal_compaction_bustedjournal/1,
|
journal_compaction_bustedjournal/1,
|
||||||
close_duringcompaction/1,
|
close_duringcompaction/1,
|
||||||
allkeydelta_journal_multicompact/1,
|
allkeydelta_journal_multicompact/1,
|
||||||
recompact_keydeltas/1
|
recompact_keydeltas/1,
|
||||||
|
simple_cachescoring/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
all() -> [
|
all() -> [
|
||||||
|
@ -33,7 +34,8 @@ all() -> [
|
||||||
close_duringcompaction,
|
close_duringcompaction,
|
||||||
allkeydelta_journal_multicompact,
|
allkeydelta_journal_multicompact,
|
||||||
recompact_keydeltas,
|
recompact_keydeltas,
|
||||||
stdtag_recalc
|
stdtag_recalc,
|
||||||
|
simple_cachescoring
|
||||||
].
|
].
|
||||||
|
|
||||||
|
|
||||||
|
@ -555,6 +557,79 @@ aae_missingjournal(_Config) ->
|
||||||
ok = leveled_bookie:book_close(Bookie2),
|
ok = leveled_bookie:book_close(Bookie2),
|
||||||
testutil:reset_filestructure().
|
testutil:reset_filestructure().
|
||||||
|
|
||||||
|
simple_cachescoring(_Config) ->
|
||||||
|
RootPath = testutil:reset_filestructure(),
|
||||||
|
StartOpts = [{root_path, RootPath},
|
||||||
|
{max_journalobjectcount, 2000},
|
||||||
|
{sync_strategy, testutil:sync_strategy()}],
|
||||||
|
{ok, Bookie1} =
|
||||||
|
leveled_bookie:book_start(StartOpts ++
|
||||||
|
[{journalcompaction_scoreonein, 8}]),
|
||||||
|
{TestObject, TestSpec} = testutil:generate_testobject(),
|
||||||
|
ok = testutil:book_riakput(Bookie1, TestObject, TestSpec),
|
||||||
|
testutil:check_forobject(Bookie1, TestObject),
|
||||||
|
GenList = [2, 32002, 64002, 96002],
|
||||||
|
_CLs = testutil:load_objects(32000, GenList, Bookie1, TestObject,
|
||||||
|
fun testutil:generate_objects/2),
|
||||||
|
|
||||||
|
F = fun leveled_bookie:book_islastcompactionpending/1,
|
||||||
|
WaitForCompaction =
|
||||||
|
fun(B) ->
|
||||||
|
fun(X, Pending) ->
|
||||||
|
case X of
|
||||||
|
1 ->
|
||||||
|
leveled_bookie:book_compactjournal(B, 30000);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
case Pending of
|
||||||
|
false ->
|
||||||
|
false;
|
||||||
|
true ->
|
||||||
|
io:format("Loop ~w waiting for journal "
|
||||||
|
++ "compaction to complete~n", [X]),
|
||||||
|
timer:sleep(100),
|
||||||
|
F(B)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
io:format("Scoring for first time - every file should need scoring~n"),
|
||||||
|
Args1 = [WaitForCompaction(Bookie1), true, lists:seq(1, 300)],
|
||||||
|
{TC0, false} = timer:tc(lists, foldl, Args1),
|
||||||
|
io:format("Score four more times with cached scoring~n"),
|
||||||
|
{TC1, false} = timer:tc(lists, foldl, Args1),
|
||||||
|
{TC2, false} = timer:tc(lists, foldl, Args1),
|
||||||
|
{TC3, false} = timer:tc(lists, foldl, Args1),
|
||||||
|
{TC4, false} = timer:tc(lists, foldl, Args1),
|
||||||
|
|
||||||
|
ok = leveled_bookie:book_close(Bookie1),
|
||||||
|
{ok, Bookie2} =
|
||||||
|
leveled_bookie:book_start(StartOpts),
|
||||||
|
io:format("Re-opened bookie withour caching - re-compare compaction time~n"),
|
||||||
|
io:format("Scoring for first time - every file should need scoring~n"),
|
||||||
|
Args2 = [WaitForCompaction(Bookie2), true, lists:seq(1, 300)],
|
||||||
|
{TN0, false} = timer:tc(lists, foldl, Args2),
|
||||||
|
io:format("Score four more times with cached scoring~n"),
|
||||||
|
{TN1, false} = timer:tc(lists, foldl, Args2),
|
||||||
|
{TN2, false} = timer:tc(lists, foldl, Args2),
|
||||||
|
{TN3, false} = timer:tc(lists, foldl, Args2),
|
||||||
|
{TN4, false} = timer:tc(lists, foldl, Args2),
|
||||||
|
|
||||||
|
AvgSecondRunCache = (TC1 + TC2 + TC3 + TC4) div 4000,
|
||||||
|
AvgSecondRunNoCache = (TN1 + TN2 + TN3 + TN4) div 4000,
|
||||||
|
|
||||||
|
io:format("With caching ~w first run ~w average other runs~n",
|
||||||
|
[TC0 div 1000, AvgSecondRunCache]),
|
||||||
|
io:format("Without caching ~w first run ~w average other runs~n",
|
||||||
|
[TN0 div 1000, AvgSecondRunNoCache]),
|
||||||
|
true = (TC0 > AvgSecondRunCache),
|
||||||
|
true = (TC0/AvgSecondRunCache) > (TN0/AvgSecondRunNoCache),
|
||||||
|
ok = leveled_bookie:book_close(Bookie2),
|
||||||
|
|
||||||
|
io:format("Exit having proven simply that caching score is faster~n"),
|
||||||
|
testutil:reset_filestructure().
|
||||||
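A condensed sketch of what the test above exercises: starting a store with journalcompaction_scoreonein set so that, after the first compaction run, most journal files are scored from cache. The root_path and option values here are stand-ins, not recommendations.

    {ok, Bookie} =
        leveled_bookie:book_start([{root_path, "/tmp/leveled_scoring_demo"},
                                   {max_journalobjectcount, 2000},
                                   {journalcompaction_scoreonein, 8}]),
    _ = leveled_bookie:book_compactjournal(Bookie, 30000),
    ok = leveled_bookie:book_close(Bookie).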
|
|
||||||
|
|
||||||
aae_bustedjournal(_Config) ->
|
aae_bustedjournal(_Config) ->
|
||||||
RootPath = testutil:reset_filestructure(),
|
RootPath = testutil:reset_filestructure(),
|
||||||
StartOpts = [{root_path, RootPath},
|
StartOpts = [{root_path, RootPath},
|
||||||
|
|
|
@ -258,6 +258,13 @@ fetchclocks_modifiedbetween(_Config) ->
|
||||||
{ok, Bookie1A} = leveled_bookie:book_start(StartOpts1A),
|
{ok, Bookie1A} = leveled_bookie:book_start(StartOpts1A),
|
||||||
{ok, Bookie1B} = leveled_bookie:book_start(StartOpts1B),
|
{ok, Bookie1B} = leveled_bookie:book_start(StartOpts1B),
|
||||||
|
|
||||||
|
ObjList0 =
|
||||||
|
testutil:generate_objects(100000,
|
||||||
|
{fixed_binary, 1}, [],
|
||||||
|
leveled_rand:rand_bytes(32),
|
||||||
|
fun() -> [] end,
|
||||||
|
<<"BaselineB">>),
|
||||||
|
|
||||||
ObjL1StartTS = testutil:convert_to_seconds(os:timestamp()),
|
ObjL1StartTS = testutil:convert_to_seconds(os:timestamp()),
|
||||||
ObjList1 =
|
ObjList1 =
|
||||||
testutil:generate_objects(20000,
|
testutil:generate_objects(20000,
|
||||||
|
@ -313,7 +320,7 @@ fetchclocks_modifiedbetween(_Config) ->
|
||||||
_ObjL5EndTS = testutil:convert_to_seconds(os:timestamp()),
|
_ObjL5EndTS = testutil:convert_to_seconds(os:timestamp()),
|
||||||
timer:sleep(1000),
|
timer:sleep(1000),
|
||||||
|
|
||||||
_ObjL6StartTS = testutil:convert_to_seconds(os:timestamp()),
|
ObjL6StartTS = testutil:convert_to_seconds(os:timestamp()),
|
||||||
ObjList6 =
|
ObjList6 =
|
||||||
testutil:generate_objects(7000,
|
testutil:generate_objects(7000,
|
||||||
{fixed_binary, 1}, [],
|
{fixed_binary, 1}, [],
|
||||||
|
@ -331,6 +338,7 @@ fetchclocks_modifiedbetween(_Config) ->
|
||||||
testutil:riakload(Bookie1A, ObjList4),
|
testutil:riakload(Bookie1A, ObjList4),
|
||||||
testutil:riakload(Bookie1A, ObjList6),
|
testutil:riakload(Bookie1A, ObjList6),
|
||||||
|
|
||||||
|
testutil:riakload(Bookie1B, ObjList0),
|
||||||
testutil:riakload(Bookie1B, ObjList4),
|
testutil:riakload(Bookie1B, ObjList4),
|
||||||
testutil:riakload(Bookie1B, ObjList5),
|
testutil:riakload(Bookie1B, ObjList5),
|
||||||
testutil:riakload(Bookie1B, ObjList1),
|
testutil:riakload(Bookie1B, ObjList1),
|
||||||
|
@ -412,7 +420,7 @@ fetchclocks_modifiedbetween(_Config) ->
|
||||||
fun(_B, K, V, {LK, AccC}) ->
|
fun(_B, K, V, {LK, AccC}) ->
|
||||||
% Value is proxy_object? Can we get the metadata and
|
% Value is proxy_object? Can we get the metadata and
|
||||||
% read the last modified date? The do a non-accelerated
|
% read the last modified date? The do a non-accelerated
|
||||||
% fold to chekc that it is slower
|
% fold to check that it is slower
|
||||||
{proxy_object, MDBin, _Size, _Fetcher} = binary_to_term(V),
|
{proxy_object, MDBin, _Size, _Fetcher} = binary_to_term(V),
|
||||||
LMDTS = testutil:get_lastmodified(MDBin),
|
LMDTS = testutil:get_lastmodified(MDBin),
|
||||||
LMD = testutil:convert_to_seconds(LMDTS),
|
LMD = testutil:convert_to_seconds(LMDTS),
|
||||||
|
@ -458,13 +466,20 @@ fetchclocks_modifiedbetween(_Config) ->
|
||||||
true = NoFilterTime > PlusFilterTime,
|
true = NoFilterTime > PlusFilterTime,
|
||||||
|
|
||||||
SimpleCountFun =
|
SimpleCountFun =
|
||||||
fun(_B, _K, _V, AccC) -> AccC + 1 end,
|
fun(BucketList) ->
|
||||||
|
fun(B, _K, _V, AccC) ->
|
||||||
|
case lists:member(B, BucketList) of
|
||||||
|
true -> AccC + 1;
|
||||||
|
false -> AccC
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end,
|
||||||
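Since SimpleCountFun is now a fun that returns the fold fun, it has to be applied to a bucket list before being handed to book_headfold. A small shell-runnable sketch of the closure's behaviour, assuming the SimpleCountFun redefined above is in scope and using made-up bucket/key data:

    CountFun = SimpleCountFun([<<"B0">>, <<"B2">>]),
    3 = lists:foldl(fun({B, K, V}, Acc) -> CountFun(B, K, V, Acc) end,
                    0,
                    [{<<"B0">>, k1, v}, {<<"B2">>, k2, v},
                     {<<"B9">>, k3, v}, {<<"B0">>, k4, v}]).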
|
|
||||||
{async, R4A_MultiBucketRunner} =
|
{async, R4A_MultiBucketRunner} =
|
||||||
leveled_bookie:book_headfold(Bookie1A,
|
leveled_bookie:book_headfold(Bookie1A,
|
||||||
?RIAK_TAG,
|
?RIAK_TAG,
|
||||||
{bucket_list, [<<"B0">>, <<"B2">>]},
|
{bucket_list, [<<"B0">>, <<"B2">>]},
|
||||||
{SimpleCountFun, 0},
|
{SimpleCountFun([<<"B0">>, <<"B2">>]), 0},
|
||||||
false,
|
false,
|
||||||
true,
|
true,
|
||||||
false,
|
false,
|
||||||
|
@ -482,7 +497,7 @@ fetchclocks_modifiedbetween(_Config) ->
|
||||||
{bucket_list, [<<"B2">>, <<"B0">>]},
|
{bucket_list, [<<"B2">>, <<"B0">>]},
|
||||||
% Reverse the buckets in the bucket
|
% Reverse the buckets in the bucket
|
||||||
% list
|
% list
|
||||||
{SimpleCountFun, 0},
|
{SimpleCountFun([<<"B0">>, <<"B2">>]), 0},
|
||||||
false,
|
false,
|
||||||
true,
|
true,
|
||||||
false,
|
false,
|
||||||
|
@ -495,10 +510,10 @@ fetchclocks_modifiedbetween(_Config) ->
|
||||||
|
|
||||||
{async, R5B_MultiBucketRunner} =
|
{async, R5B_MultiBucketRunner} =
|
||||||
leveled_bookie:book_headfold(Bookie1B,
|
leveled_bookie:book_headfold(Bookie1B,
|
||||||
% Same query - other bookie
|
|
||||||
?RIAK_TAG,
|
?RIAK_TAG,
|
||||||
{bucket_list, [<<"B2">>, <<"B0">>]},
|
{bucket_list,
|
||||||
{SimpleCountFun, 0},
|
[<<"BaselineB">>, <<"B2">>, <<"B0">>]},
|
||||||
|
{SimpleCountFun([<<"B0">>, <<"B2">>]), 0},
|
||||||
false,
|
false,
|
||||||
true,
|
true,
|
||||||
false,
|
false,
|
||||||
|
@ -506,7 +521,7 @@ fetchclocks_modifiedbetween(_Config) ->
|
||||||
false),
|
false),
|
||||||
R5B_MultiBucket = R5B_MultiBucketRunner(),
|
R5B_MultiBucket = R5B_MultiBucketRunner(),
|
||||||
io:format("R5B_MultiBucket ~w ~n", [R5B_MultiBucket]),
|
io:format("R5B_MultiBucket ~w ~n", [R5B_MultiBucket]),
|
||||||
true = R5A_MultiBucket == 37000,
|
true = R5B_MultiBucket == 37000,
|
||||||
|
|
||||||
testutil:update_some_objects(Bookie1A, ObjList1, 1000),
|
testutil:update_some_objects(Bookie1A, ObjList1, 1000),
|
||||||
R6A_PlusFilter = lists:foldl(FoldRangesFun(Bookie1A,
|
R6A_PlusFilter = lists:foldl(FoldRangesFun(Bookie1A,
|
||||||
|
@ -523,7 +538,7 @@ fetchclocks_modifiedbetween(_Config) ->
|
||||||
leveled_bookie:book_headfold(Bookie1A,
|
leveled_bookie:book_headfold(Bookie1A,
|
||||||
?RIAK_TAG,
|
?RIAK_TAG,
|
||||||
{bucket_list, [<<"B1">>, <<"B2">>]},
|
{bucket_list, [<<"B1">>, <<"B2">>]},
|
||||||
{SimpleCountFun, 0},
|
{SimpleCountFun([<<"B1">>, <<"B2">>]), 0},
|
||||||
false,
|
false,
|
||||||
true,
|
true,
|
||||||
false,
|
false,
|
||||||
|
@ -537,7 +552,7 @@ fetchclocks_modifiedbetween(_Config) ->
|
||||||
leveled_bookie:book_headfold(Bookie1A,
|
leveled_bookie:book_headfold(Bookie1A,
|
||||||
?RIAK_TAG,
|
?RIAK_TAG,
|
||||||
{bucket_list, [<<"B1">>, <<"B2">>]},
|
{bucket_list, [<<"B1">>, <<"B2">>]},
|
||||||
{SimpleCountFun, 0},
|
{SimpleCountFun([<<"B1">>, <<"B2">>]), 0},
|
||||||
false,
|
false,
|
||||||
true,
|
true,
|
||||||
false,
|
false,
|
||||||
|
@ -547,9 +562,141 @@ fetchclocks_modifiedbetween(_Config) ->
|
||||||
io:format("R8A_MultiBucket ~w ~n", [R8A_MultiBucket]),
|
io:format("R8A_MultiBucket ~w ~n", [R8A_MultiBucket]),
|
||||||
true = R8A_MultiBucket == {0, 5000},
|
true = R8A_MultiBucket == {0, 5000},
|
||||||
|
|
||||||
ok = leveled_bookie:book_destroy(Bookie1A),
|
ok = leveled_bookie:book_close(Bookie1B),
|
||||||
ok = leveled_bookie:book_destroy(Bookie1B).
|
|
||||||
|
|
||||||
|
io:format("Double query to generate index cache and use~n"),
|
||||||
|
{ok, Bookie1BS} = leveled_bookie:book_start(StartOpts1B),
|
||||||
|
|
||||||
|
TooLate = testutil:convert_to_seconds(os:timestamp()),
|
||||||
|
|
||||||
|
lmdrange_tester(Bookie1BS, SimpleCountFun,
|
||||||
|
ObjL4StartTS, ObjL6StartTS, ObjL6EndTS, TooLate),
|
||||||
|
|
||||||
|
io:format("Push tested keys down levels with new objects~n"),
|
||||||
|
ObjList7 =
|
||||||
|
testutil:generate_objects(200000,
|
||||||
|
{fixed_binary, 1}, [],
|
||||||
|
leveled_rand:rand_bytes(32),
|
||||||
|
fun() -> [] end,
|
||||||
|
<<"B1.9">>),
|
||||||
|
testutil:riakload(Bookie1BS, ObjList7),
|
||||||
|
|
||||||
|
lmdrange_tester(Bookie1BS, SimpleCountFun,
|
||||||
|
ObjL4StartTS, ObjL6StartTS, ObjL6EndTS, TooLate),
|
||||||
|
|
||||||
|
ok = leveled_bookie:book_destroy(Bookie1A),
|
||||||
|
ok = leveled_bookie:book_destroy(Bookie1BS).
|
||||||
|
|
||||||
|
|
||||||
|
lmdrange_tester(Bookie1BS, SimpleCountFun,
|
||||||
|
ObjL4StartTS, ObjL6StartTS, ObjL6EndTS, TooLate) ->
|
||||||
|
{async, R5B_MultiBucketRunner0} =
|
||||||
|
leveled_bookie:book_headfold(Bookie1BS,
|
||||||
|
?RIAK_TAG,
|
||||||
|
all,
|
||||||
|
{SimpleCountFun([<<"B0">>, <<"B2">>]), 0},
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
{ObjL4StartTS, ObjL6EndTS},
|
||||||
|
false),
|
||||||
|
R5B_MultiBucket0 = R5B_MultiBucketRunner0(),
|
||||||
|
io:format("R5B_MultiBucket0 ~w ~n", [R5B_MultiBucket0]),
|
||||||
|
true = R5B_MultiBucket0 == 37000,
|
||||||
|
{async, R5B_MultiBucketRunner1} =
|
||||||
|
leveled_bookie:book_headfold(Bookie1BS,
|
||||||
|
?RIAK_TAG,
|
||||||
|
all,
|
||||||
|
{SimpleCountFun([<<"B0">>, <<"B2">>]), 0},
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
{ObjL4StartTS, ObjL6EndTS},
|
||||||
|
false),
|
||||||
|
R5B_MultiBucket1 = R5B_MultiBucketRunner1(),
|
||||||
|
io:format("R5B_MultiBucket1 ~w ~n", [R5B_MultiBucket1]),
|
||||||
|
true = R5B_MultiBucket1 == 37000,
|
||||||
|
SimpleMinMaxFun =
|
||||||
|
fun(B, K, _V, Acc) ->
|
||||||
|
case lists:keyfind(B, 1, Acc) of
|
||||||
|
{B, MinK, MaxK} ->
|
||||||
|
lists:ukeysort(1, [{B, min(K, MinK), max(K, MaxK)}|Acc]);
|
||||||
|
false ->
|
||||||
|
lists:ukeysort(1, [{B, K, K}|Acc])
|
||||||
|
end
|
||||||
|
end,
|
||||||
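A shell-runnable illustration (keys are made up) of how the min/max fold above tracks one {Bucket, MinKey, MaxKey} tuple per bucket, relying on ukeysort keeping the most recently prepended tuple for each bucket:

    MinMaxFun =
        fun(B, K, _V, Acc) ->
            case lists:keyfind(B, 1, Acc) of
                {B, MinK, MaxK} ->
                    lists:ukeysort(1, [{B, min(K, MinK), max(K, MaxK)}|Acc]);
                false ->
                    lists:ukeysort(1, [{B, K, K}|Acc])
            end
        end,
    [{<<"B0">>, <<"K01">>, <<"K99">>}] =
        lists:foldl(fun({B, K}, Acc) -> MinMaxFun(B, K, null, Acc) end,
                    [],
                    [{<<"B0">>, <<"K50">>}, {<<"B0">>, <<"K01">>},
                     {<<"B0">>, <<"K99">>}]).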
|
{async, R5B_MultiBucketRunner2} =
|
||||||
|
leveled_bookie:book_headfold(Bookie1BS,
|
||||||
|
?RIAK_TAG,
|
||||||
|
{bucket_list, [<<"B0">>, <<"B2">>]},
|
||||||
|
{SimpleMinMaxFun, []},
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
{ObjL4StartTS, ObjL6EndTS},
|
||||||
|
false),
|
||||||
|
[{<<"B0">>, MinB0K, MaxB0K}, {<<"B2">>, MinB2K, MaxB2K}] =
|
||||||
|
R5B_MultiBucketRunner2(),
|
||||||
|
io:format("Found Min and Max Keys~n"),
|
||||||
|
io:format("B ~s MinK ~s MaxK ~s~n", [<<"B0">>, MinB0K, MaxB0K]),
|
||||||
|
io:format("B ~s MinK ~s MaxK ~s~n", [<<"B2">>, MinB2K, MaxB2K]),
|
||||||
|
{async, R5B_MultiBucketRunner3a} =
|
||||||
|
leveled_bookie:book_headfold(Bookie1BS,
|
||||||
|
?RIAK_TAG,
|
||||||
|
{range, <<"B0">>, {MinB0K, MaxB0K}},
|
||||||
|
{SimpleCountFun([<<"B0">>]), 0},
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
{ObjL4StartTS, ObjL6EndTS},
|
||||||
|
false),
|
||||||
|
{async, R5B_MultiBucketRunner3b} =
|
||||||
|
leveled_bookie:book_headfold(Bookie1BS,
|
||||||
|
?RIAK_TAG,
|
||||||
|
{range, <<"B2">>, {MinB2K, MaxB2K}},
|
||||||
|
{SimpleCountFun([<<"B2">>]), 0},
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
{ObjL4StartTS, ObjL6EndTS},
|
||||||
|
false),
|
||||||
|
R5B_MultiBucket3a = R5B_MultiBucketRunner3a(),
|
||||||
|
io:format("R5B_MultiBucket3a ~w ~n", [R5B_MultiBucket3a]),
|
||||||
|
R5B_MultiBucket3b = R5B_MultiBucketRunner3b(),
|
||||||
|
io:format("R5B_MultiBucket3b ~w ~n", [R5B_MultiBucket3b]),
|
||||||
|
true = (R5B_MultiBucket3a + R5B_MultiBucket3b) == 37000,
|
||||||
|
|
||||||
|
io:format("Query outside of time range~n"),
|
||||||
|
{async, R5B_MultiBucketRunner4} =
|
||||||
|
leveled_bookie:book_headfold(Bookie1BS,
|
||||||
|
?RIAK_TAG,
|
||||||
|
all,
|
||||||
|
{SimpleCountFun([<<"B0">>, <<"B2">>]), 0},
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
{ObjL6EndTS,
|
||||||
|
TooLate},
|
||||||
|
false),
|
||||||
|
R5B_MultiBucket4 = R5B_MultiBucketRunner4(),
|
||||||
|
io:format("R5B_MultiBucket4 ~w ~n", [R5B_MultiBucket4]),
|
||||||
|
true = R5B_MultiBucket4 == 0,
|
||||||
|
|
||||||
|
io:format("Query with one foot inside of time range~n"),
|
||||||
|
{async, R5B_MultiBucketRunner5} =
|
||||||
|
leveled_bookie:book_headfold(Bookie1BS,
|
||||||
|
?RIAK_TAG,
|
||||||
|
all,
|
||||||
|
{SimpleCountFun([<<"B0">>, <<"B2">>]), 0},
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
{ObjL6StartTS,
|
||||||
|
TooLate},
|
||||||
|
false),
|
||||||
|
R5B_MultiBucket5 = R5B_MultiBucketRunner5(),
|
||||||
|
io:format("R5B_MultiBucket5 ~w ~n", [R5B_MultiBucket5]),
|
||||||
|
true = R5B_MultiBucket5 == 7000.
|
||||||
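The two queries above bracket the modification-range behaviour: a window entirely after the load returns nothing, while a window covering only the last load returns just those 7000 objects. Below is a sketch of the same book_headfold call shape with an explicit last-modified window; the bookie, fold fun, timestamps and the positional-argument comments are the editor's reading of the calls above, not a definitive API description.

    {async, Runner} =
        leveled_bookie:book_headfold(Bookie,
                                     ?RIAK_TAG,
                                     all,
                                     {SimpleCountFun([<<"B0">>]), 0},
                                     false,            % no journal check
                                     true,             % snapshot prefold
                                     false,            % no segment list
                                     {LowTS, HighTS},  % last-modified window
                                     false),           % no max object count
    Count = Runner().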
|
|
||||||
|
|
||||||
crossbucket_aae(_Config) ->
|
crossbucket_aae(_Config) ->
|
||||||
|
|