Merge remote-tracking branch 'refs/remotes/origin/master' into mas-specs-i61a

This commit is contained in:
martinsumner 2017-05-22 09:58:41 +01:00
commit a81dd2839e
3 changed files with 201 additions and 105 deletions

View file

@ -208,7 +208,37 @@ cdb_mput(Pid, KVList) ->
%% atom all. To be used for sampling queries, for example to assess the
%% potential for compaction.
cdb_getpositions(Pid, SampleSize) ->
gen_fsm:sync_send_event(Pid, {get_positions, SampleSize}, infinity).
% Getting many positions from the index, especially getting all positions
% can take time (about 1s for all positions). Rather than queue all
% requests waiting for this to complete, loop over each of the 256 indexes
% outside of the FSM processing loop - to allow for other messages to be
% interleaved
case SampleSize of
all ->
FoldFun =
fun(Index, Acc) ->
cdb_getpositions_fromidx(Pid, all, Index, Acc)
end,
IdxList = lists:seq(0, 255),
lists:foldl(FoldFun, [], IdxList);
S0 ->
FoldFun =
fun({_R, Index}, Acc) ->
case length(Acc) of
S0 ->
Acc;
L when L < S0 ->
cdb_getpositions_fromidx(Pid, S0, Index, Acc)
end
end,
RandFun = fun(X) -> {random:uniform(), X} end,
SeededL = lists:map(RandFun, lists:seq(0, 255)),
SortedL = lists:keysort(1, SeededL),
lists:foldl(FoldFun, [], SortedL)
end.
cdb_getpositions_fromidx(Pid, SampleSize, Index, Acc) ->
gen_fsm:sync_send_event(Pid, {get_positions, SampleSize, Index, Acc}).
-spec cdb_directfetch(pid(), list(), key_only|key_size|key_value_check) ->
list().
@ -454,8 +484,8 @@ rolling({key_check, Key}, _From, State) ->
loose_presence),
rolling,
State};
rolling({get_positions, _SampleSize}, _From, State) ->
{reply, [], rolling, State};
rolling({get_positions, _SampleSize, _Index, SampleAcc}, _From, State) ->
{reply, SampleAcc, rolling, State};
rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) ->
SW = os:timestamp(),
Handle = State#state.handle,
@ -507,28 +537,14 @@ reader({key_check, Key}, _From, State) ->
State#state.binary_mode),
reader,
State};
reader({get_positions, SampleSize}, _From, State) ->
reader({get_positions, SampleSize, Index, Acc}, _From, State) ->
{Index, {Pos, Count}} = lists:keyfind(Index, 1, State#state.hash_index),
UpdAcc = scan_index_returnpositions(State#state.handle, Pos, Count, Acc),
case SampleSize of
all ->
{reply,
scan_index(State#state.handle,
State#state.hash_index,
{fun scan_index_returnpositions/4, []}),
reader,
State};
{reply, UpdAcc, reader, State};
_ ->
SeededL = lists:map(fun(X) -> {random:uniform(), X} end,
State#state.hash_index),
SortedL = lists:keysort(1, SeededL),
RandomisedHashIndex = lists:map(fun({_R, X}) -> X end, SortedL),
{reply,
scan_index_forsample(State#state.handle,
RandomisedHashIndex,
fun scan_index_returnpositions/4,
[],
SampleSize),
reader,
State}
{reply, lists:sublist(UpdAcc, SampleSize), reader, State}
end;
reader({direct_fetch, PositionList, Info}, _From, State) ->
H = State#state.handle,
@ -941,18 +957,21 @@ open_for_readonly(Filename, LastKeyKnown) ->
load_index(Handle) ->
Index = lists:seq(0, 255),
lists:map(fun(X) ->
file:position(Handle, {bof, ?DWORD_SIZE * X}),
{HashTablePos, Count} = read_next_2_integers(Handle),
{X, {HashTablePos, Count}} end,
Index).
LoadIndexFun =
fun(X) ->
file:position(Handle, {bof, ?DWORD_SIZE * X}),
{HashTablePos, Count} = read_next_2_integers(Handle),
{X, {HashTablePos, Count}}
end,
lists:map(LoadIndexFun, Index).
%% Function to find the LastKey in the file
find_lastkey(Handle, IndexCache) ->
{LastPosition, TotalKeys} = scan_index(Handle,
IndexCache,
{fun scan_index_findlast/4,
{0, 0}}),
ScanIndexFun =
fun({_X, {Pos, Count}}, {LastPos, KeyCount}) ->
scan_index_findlast(Handle, Pos, Count, {LastPos, KeyCount})
end,
{LastPosition, TotalKeys} = lists:foldl(ScanIndexFun, {0, 0}, IndexCache),
case TotalKeys of
0 ->
empty;
@ -963,45 +982,29 @@ find_lastkey(Handle, IndexCache) ->
end.
scan_index(Handle, IndexCache, {ScanFun, InitAcc}) ->
lists:foldl(fun({_X, {Pos, Count}}, Acc) ->
ScanFun(Handle, Pos, Count, Acc)
end,
InitAcc,
IndexCache).
scan_index_forsample(_Handle, [], _ScanFun, Acc, SampleSize) ->
lists:sublist(Acc, SampleSize);
scan_index_forsample(Handle, [CacheEntry|Tail], ScanFun, Acc, SampleSize) ->
case length(Acc) of
L when L >= SampleSize ->
lists:sublist(Acc, SampleSize);
_ ->
{_X, {Pos, Count}} = CacheEntry,
scan_index_forsample(Handle,
Tail,
ScanFun,
ScanFun(Handle, Pos, Count, Acc),
SampleSize)
end.
scan_index_findlast(Handle, Position, Count, {LastPosition, TotalKeys}) ->
{ok, _} = file:position(Handle, Position),
MaxPos = lists:foldl(fun({_Hash, HPos}, MaxPos) -> max(HPos, MaxPos) end,
MaxPosFun = fun({_Hash, HPos}, MaxPos) -> max(HPos, MaxPos) end,
MaxPos = lists:foldl(MaxPosFun,
LastPosition,
read_next_n_integerpairs(Handle, Count)),
{MaxPos, TotalKeys + Count}.
scan_index_returnpositions(Handle, Position, Count, PosList0) ->
{ok, _} = file:position(Handle, Position),
lists:foldl(fun({Hash, HPosition}, PosList) ->
case Hash of
0 -> PosList;
_ -> PosList ++ [HPosition]
end end,
AddPosFun =
fun({Hash, HPosition}, PosList) ->
case Hash of
0 ->
PosList;
_ ->
[HPosition|PosList]
end
end,
PosList = lists:foldl(AddPosFun,
PosList0,
read_next_n_integerpairs(Handle, Count)).
read_next_n_integerpairs(Handle, Count)),
lists:reverse(PosList).
%% Take an active file and write the hash details necessary to close that
@ -1981,6 +1984,7 @@ get_keys_byposition_manykeys_test() ->
{ok, P2} = cdb_open_reader(F2, #cdb_options{binary_mode=false}),
PositionList = cdb_getpositions(P2, all),
L1 = length(PositionList),
io:format("Length of all positions ~w~n", [L1]),
?assertMatch(KeyCount, L1),
SampleList1 = cdb_getpositions(P2, 10),

View file

@ -85,6 +85,8 @@
clerk_stop/1,
code_change/3]).
-export([schedule_compaction/3]).
-include_lib("eunit/include/eunit.hrl").
-define(JOURNAL_FILEX, "cdb").
@ -93,13 +95,14 @@
-define(BATCH_SIZE, 32).
-define(BATCHES_TO_CHECK, 8).
%% How many consecutive files to compact in one run
-define(MAX_COMPACTION_RUN, 4).
-define(MAX_COMPACTION_RUN, 6).
%% Sliding scale to allow preference of longer runs up to maximum
-define(SINGLEFILE_COMPACTION_TARGET, 60.0).
-define(MAXRUN_COMPACTION_TARGET, 80.0).
-define(SINGLEFILE_COMPACTION_TARGET, 40.0).
-define(MAXRUN_COMPACTION_TARGET, 60.0).
-define(CRC_SIZE, 4).
-define(DEFAULT_RELOAD_STRATEGY, leveled_codec:inker_reload_strategy([])).
-define(DEFAULT_WASTE_RETENTION_PERIOD, 86400).
-define(INTERVALS_PER_HOUR, 4).
-record(state, {inker :: pid(),
max_run_length :: integer(),
@ -230,6 +233,79 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%============================================================================
%%% External functions
%%%============================================================================
%% Schedule the next compaction event for this store. Chooses a random
%% interval, and then a random start time within the first third
%% of the interval.
%%
%% The number of Compaction runs per day can be set. This doesn't guaranteee
%% those runs, but uses the assumption there will be n runs when scheduling
%% the next one
%%
%% Compaction Hours should be the list of hours during the day (based on local
%% time when compcation can be scheduled to run)
%% e.g. [0, 1, 2, 3, 4, 21, 22, 23]
%% Runs per day is the number of compaction runs per day that should be
%% scheduled - expected to be a small integer, probably 1
%%
%% Current TS should be the outcome of os:timestamp()
%%
schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
% We chedule the next interval by acting as if we were scheduing all
% n intervals at random, but then only chose the next one. After each
% event is occurred the random process is repeated to determine the next
% event to schedule i.e. the unused schedule is discarded.
IntervalLength = 60 div ?INTERVALS_PER_HOUR,
TotalHours = length(CompactionHours),
LocalTime = calendar:now_to_local_time(CurrentTS),
{{NowY, NowMon, NowD},
{NowH, NowMin, _NowS}} = LocalTime,
CurrentInterval = {NowH, NowMin div IntervalLength + 1},
% Randomly select an hour and an interval for each of the runs expected
% today.
RandSelect =
fun(_X) ->
{lists:nth(random:uniform(TotalHours), CompactionHours),
random:uniform(?INTERVALS_PER_HOUR)}
end,
RandIntervals = lists:sort(lists:map(RandSelect,
lists:seq(1, RunsPerDay))),
% Pick the next interval from the list. The intervals before current time
% are considered as intervals tomorrow, so will only be next if there are
% no other today
CheckNotBefore = fun(A) -> A =< CurrentInterval end,
{TooEarly, MaybeOK} = lists:splitwith(CheckNotBefore, RandIntervals),
{NextDate, {NextH, NextI}} =
case MaybeOK of
[] ->
% Use first interval picked tomorrow if none of selected run times
% are today
Tmrw = calendar:date_to_gregorian_days(NowY, NowMon, NowD) + 1,
{calendar:gregorian_days_to_date(Tmrw),
lists:nth(1, TooEarly)};
_ ->
{{NowY, NowMon, NowD}, lists:nth(1, MaybeOK)}
end,
% Calculate the offset in seconds to this next interval
NextS0 = NextI * (IntervalLength * 60)
- random:uniform(IntervalLength * 60),
NextM = NextS0 div 60,
NextS = NextS0 rem 60,
TimeDiff = calendar:time_difference(LocalTime,
{NextDate, {NextH, NextM, NextS}}),
{Days, {Hours, Mins, Secs}} = TimeDiff,
Days * 86400 + Hours * 3600 + Mins * 60 + Secs.
%%%============================================================================
%%% Internal functions
%%%============================================================================
@ -536,7 +612,6 @@ clear_waste(State) ->
end,
ClearedJournals).
%%%============================================================================
%%% Test
@ -545,25 +620,42 @@ clear_waste(State) ->
-ifdef(TEST).
schedule_test() ->
schedule_test_bycount(1),
schedule_test_bycount(2),
schedule_test_bycount(4).
schedule_test_bycount(N) ->
CurrentTS = {1490,883918,94000}, % Actually 30th March 2017 15:27
SecondsToCompaction0 = schedule_compaction([16], N, CurrentTS),
io:format("Seconds to compaction ~w~n", [SecondsToCompaction0]),
?assertMatch(true, SecondsToCompaction0 > 1800),
?assertMatch(true, SecondsToCompaction0 < 5700),
SecondsToCompaction1 = schedule_compaction([14], N, CurrentTS), % tomorrow!
io:format("Seconds to compaction ~w~n", [SecondsToCompaction1]),
?assertMatch(true, SecondsToCompaction1 > 81000),
?assertMatch(true, SecondsToCompaction1 < 84300).
simple_score_test() ->
Run1 = [#candidate{compaction_perc = 75.0},
#candidate{compaction_perc = 75.0},
#candidate{compaction_perc = 76.0},
#candidate{compaction_perc = 70.0}],
?assertMatch(6.0, score_run(Run1, 4)),
?assertMatch(-14.0, score_run(Run1, 4)),
Run2 = [#candidate{compaction_perc = 75.0}],
?assertMatch(-15.0, score_run(Run2, 4)),
?assertMatch(-35.0, score_run(Run2, 4)),
?assertMatch(0.0, score_run([], 4)),
Run3 = [#candidate{compaction_perc = 100.0}],
?assertMatch(-40.0, score_run(Run3, 4)).
?assertMatch(-60.0, score_run(Run3, 4)).
score_compare_test() ->
Run1 = [#candidate{compaction_perc = 75.0},
#candidate{compaction_perc = 75.0},
#candidate{compaction_perc = 76.0},
#candidate{compaction_perc = 70.0}],
Run1 = [#candidate{compaction_perc = 55.0},
#candidate{compaction_perc = 55.0},
#candidate{compaction_perc = 56.0},
#candidate{compaction_perc = 50.0}],
?assertMatch(6.0, score_run(Run1, 4)),
Run2 = [#candidate{compaction_perc = 75.0}],
Run2 = [#candidate{compaction_perc = 55.0}],
?assertMatch(Run1, choose_best_assessment(Run1, Run2, 4)),
?assertMatch(Run2, choose_best_assessment(Run1 ++ Run2, Run2, 4)).
@ -585,27 +677,27 @@ file_gc_test() ->
find_bestrun_test() ->
%% Tests dependent on these defaults
%% -define(MAX_COMPACTION_RUN, 4).
%% -define(SINGLEFILE_COMPACTION_TARGET, 60.0).
%% -define(MAXRUN_COMPACTION_TARGET, 80.0).
%% -define(SINGLEFILE_COMPACTION_TARGET, 40.0).
%% -define(MAXRUN_COMPACTION_TARGET, 60.0).
%% Tested first with blocks significant as no back-tracking
Block1 = [#candidate{compaction_perc = 75.0},
#candidate{compaction_perc = 85.0},
#candidate{compaction_perc = 62.0},
#candidate{compaction_perc = 70.0}],
Block2 = [#candidate{compaction_perc = 58.0},
#candidate{compaction_perc = 95.0},
#candidate{compaction_perc = 95.0},
#candidate{compaction_perc = 65.0}],
Block3 = [#candidate{compaction_perc = 90.0},
Block1 = [#candidate{compaction_perc = 55.0},
#candidate{compaction_perc = 65.0},
#candidate{compaction_perc = 42.0},
#candidate{compaction_perc = 50.0}],
Block2 = [#candidate{compaction_perc = 38.0},
#candidate{compaction_perc = 75.0},
#candidate{compaction_perc = 75.0},
#candidate{compaction_perc = 45.0}],
Block3 = [#candidate{compaction_perc = 70.0},
#candidate{compaction_perc = 100.0},
#candidate{compaction_perc = 100.0},
#candidate{compaction_perc = 100.0}],
Block4 = [#candidate{compaction_perc = 75.0},
#candidate{compaction_perc = 76.0},
#candidate{compaction_perc = 76.0},
Block4 = [#candidate{compaction_perc = 55.0},
#candidate{compaction_perc = 56.0},
#candidate{compaction_perc = 56.0},
#candidate{compaction_perc = 40.0}],
Block5 = [#candidate{compaction_perc = 60.0},
#candidate{compaction_perc = 60.0}],
Block5 = [#candidate{compaction_perc = 80.0},
#candidate{compaction_perc = 80.0}],
CList0 = Block1 ++ Block2 ++ Block3 ++ Block4 ++ Block5,
?assertMatch(Block4, assess_candidates(CList0, 4, [], [])),
CList1 = CList0 ++ [#candidate{compaction_perc = 20.0}],
@ -614,26 +706,26 @@ find_bestrun_test() ->
CList2 = Block4 ++ Block3 ++ Block2 ++ Block1 ++ Block5,
?assertMatch(Block4, assess_candidates(CList2, 4, [], [])),
CList3 = Block5 ++ Block1 ++ Block2 ++ Block3 ++ Block4,
?assertMatch([#candidate{compaction_perc = 62.0},
#candidate{compaction_perc = 70.0},
#candidate{compaction_perc = 58.0}],
?assertMatch([#candidate{compaction_perc = 42.0},
#candidate{compaction_perc = 50.0},
#candidate{compaction_perc = 38.0}],
assess_candidates(CList3, 4, [], [])),
%% Now do some back-tracking to get a genuinely optimal solution without
%% needing to re-order
?assertMatch([#candidate{compaction_perc = 62.0},
#candidate{compaction_perc = 70.0},
#candidate{compaction_perc = 58.0}],
?assertMatch([#candidate{compaction_perc = 42.0},
#candidate{compaction_perc = 50.0},
#candidate{compaction_perc = 38.0}],
assess_candidates(CList0, 4)),
?assertMatch([#candidate{compaction_perc = 62.0},
#candidate{compaction_perc = 70.0},
#candidate{compaction_perc = 58.0}],
?assertMatch([#candidate{compaction_perc = 42.0},
#candidate{compaction_perc = 50.0},
#candidate{compaction_perc = 38.0}],
assess_candidates(CList0, 5)),
?assertMatch([#candidate{compaction_perc = 62.0},
#candidate{compaction_perc = 70.0},
#candidate{compaction_perc = 58.0},
#candidate{compaction_perc = 95.0},
#candidate{compaction_perc = 95.0},
#candidate{compaction_perc = 65.0}],
?assertMatch([#candidate{compaction_perc = 42.0},
#candidate{compaction_perc = 50.0},
#candidate{compaction_perc = 38.0},
#candidate{compaction_perc = 75.0},
#candidate{compaction_perc = 75.0},
#candidate{compaction_perc = 45.0}],
assess_candidates(CList0, 6)).
test_ledgerkey(Key) ->

View file

@ -900,7 +900,7 @@ compact_journal_test() ->
end end,
false,
CompactedManifest2),
?assertMatch(true, R),
?assertMatch(false, R),
?assertMatch(2, length(CompactedManifest2)),
ink_close(Ink1),
clean_testdir(RootPath).