diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl
index 19d660e..4b7d68c 100644
--- a/src/leveled_cdb.erl
+++ b/src/leveled_cdb.erl
@@ -152,7 +152,37 @@ cdb_mput(Pid, KVList) ->
 %% SampleSize can be an integer or the atom all
 cdb_getpositions(Pid, SampleSize) ->
-    gen_fsm:sync_send_event(Pid, {get_positions, SampleSize}, infinity).
+    % Getting many positions from the index, especially getting all positions,
+    % can take time (about 1s for all positions). Rather than queue all
+    % requests waiting for this to complete, loop over each of the 256 indexes
+    % outside of the FSM processing loop - to allow for other messages to be
+    % interleaved.
+    case SampleSize of
+        all ->
+            FoldFun =
+                fun(Index, Acc) ->
+                    cdb_getpositions_fromidx(Pid, all, Index, Acc)
+                end,
+            IdxList = lists:seq(0, 255),
+            lists:foldl(FoldFun, [], IdxList);
+        S0 ->
+            FoldFun =
+                fun({_R, Index}, Acc) ->
+                    case length(Acc) of
+                        S0 ->
+                            Acc;
+                        L when L < S0 ->
+                            cdb_getpositions_fromidx(Pid, S0, Index, Acc)
+                    end
+                end,
+            RandFun = fun(X) -> {random:uniform(), X} end,
+            SeededL = lists:map(RandFun, lists:seq(0, 255)),
+            SortedL = lists:keysort(1, SeededL),
+            lists:foldl(FoldFun, [], SortedL)
+    end.
+
+cdb_getpositions_fromidx(Pid, SampleSize, Index, Acc) ->
+    gen_fsm:sync_send_event(Pid, {get_positions, SampleSize, Index, Acc}).
 
 %% Info can be key_only, key_size (size being the size of the value) or
 %% key_value_check (with the check part indicating if the CRC is correct for
@@ -352,8 +382,8 @@ rolling({key_check, Key}, _From, State) ->
                loose_presence),
      rolling,
      State};
-rolling({get_positions, _SampleSize}, _From, State) ->
-    {reply, [], rolling, State};
+rolling({get_positions, _SampleSize, _Index, SampleAcc}, _From, State) ->
+    {reply, SampleAcc, rolling, State};
 rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) ->
     SW = os:timestamp(),
     Handle = State#state.handle,
@@ -405,28 +435,14 @@ reader({key_check, Key}, _From, State) ->
                State#state.binary_mode),
      reader,
      State};
-reader({get_positions, SampleSize}, _From, State) ->
+reader({get_positions, SampleSize, Index, Acc}, _From, State) ->
+    {Index, {Pos, Count}} = lists:keyfind(Index, 1, State#state.hash_index),
+    UpdAcc = scan_index_returnpositions(State#state.handle, Pos, Count, Acc),
     case SampleSize of
         all ->
-            {reply,
-                scan_index(State#state.handle,
-                            State#state.hash_index,
-                            {fun scan_index_returnpositions/4, []}),
-                reader,
-                State};
+            {reply, UpdAcc, reader, State};
         _ ->
-            SeededL = lists:map(fun(X) -> {random:uniform(), X} end,
-                                State#state.hash_index),
-            SortedL = lists:keysort(1, SeededL),
-            RandomisedHashIndex = lists:map(fun({_R, X}) -> X end, SortedL),
-            {reply,
-                scan_index_forsample(State#state.handle,
-                                        RandomisedHashIndex,
-                                        fun scan_index_returnpositions/4,
-                                        [],
-                                        SampleSize),
-                reader,
-                State}
+            {reply, lists:sublist(UpdAcc, SampleSize), reader, State}
     end;
 reader({direct_fetch, PositionList, Info}, _From, State) ->
     H = State#state.handle,
@@ -839,18 +855,21 @@ open_for_readonly(Filename, LastKeyKnown) ->
 
 load_index(Handle) ->
     Index = lists:seq(0, 255),
-    lists:map(fun(X) ->
-                    file:position(Handle, {bof, ?DWORD_SIZE * X}),
-                    {HashTablePos, Count} = read_next_2_integers(Handle),
-                    {X, {HashTablePos, Count}} end,
-                Index).
+    LoadIndexFun =
+        fun(X) ->
+            file:position(Handle, {bof, ?DWORD_SIZE * X}),
+            {HashTablePos, Count} = read_next_2_integers(Handle),
+            {X, {HashTablePos, Count}}
+        end,
+    lists:map(LoadIndexFun, Index).
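The sampling branch above implements a seeded shuffle: pair each of the 256 index slots with a random key, sort on that key, then fold until enough positions have been gathered. A minimal standalone sketch of the pattern, where FetchFun is a hypothetical stand-in for cdb_getpositions_fromidx/4:

    sample_positions(FetchFun, SampleSize) ->
        %% Shuffle the index slots by sorting on a random key
        Seeded = [{random:uniform(), I} || I <- lists:seq(0, 255)],
        Shuffled = lists:keysort(1, Seeded),
        %% Fold over the shuffled slots, fetching positions only until
        %% the accumulator has reached the requested sample size
        CollectFun =
            fun({_R, Index}, Acc) when length(Acc) < SampleSize ->
                    FetchFun(Index, Acc);
                (_Slot, Acc) ->
                    Acc
            end,
        lists:foldl(CollectFun, [], Shuffled).

Because each slot is fetched with its own sync_send_event call, other messages can be served between slots, which is the point of moving this loop out of the FSM.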
 
 %% Function to find the LastKey in the file
 find_lastkey(Handle, IndexCache) ->
-    {LastPosition, TotalKeys} = scan_index(Handle,
-                                            IndexCache,
-                                            {fun scan_index_findlast/4,
-                                                {0, 0}}),
+    ScanIndexFun =
+        fun({_X, {Pos, Count}}, {LastPos, KeyCount}) ->
+            scan_index_findlast(Handle, Pos, Count, {LastPos, KeyCount})
+        end,
+    {LastPosition, TotalKeys} = lists:foldl(ScanIndexFun, {0, 0}, IndexCache),
     case TotalKeys of
         0 ->
             empty;
@@ -861,45 +880,29 @@ find_lastkey(Handle, IndexCache) ->
     end.
 
-scan_index(Handle, IndexCache, {ScanFun, InitAcc}) ->
-    lists:foldl(fun({_X, {Pos, Count}}, Acc) ->
-                        ScanFun(Handle, Pos, Count, Acc)
-                        end,
-                    InitAcc,
-                    IndexCache).
-
-scan_index_forsample(_Handle, [], _ScanFun, Acc, SampleSize) ->
-    lists:sublist(Acc, SampleSize);
-scan_index_forsample(Handle, [CacheEntry|Tail], ScanFun, Acc, SampleSize) ->
-    case length(Acc) of
-        L when L >= SampleSize ->
-            lists:sublist(Acc, SampleSize);
-        _ ->
-            {_X, {Pos, Count}} = CacheEntry,
-            scan_index_forsample(Handle,
-                                    Tail,
-                                    ScanFun,
-                                    ScanFun(Handle, Pos, Count, Acc),
-                                    SampleSize)
-    end.
-
-
 scan_index_findlast(Handle, Position, Count, {LastPosition, TotalKeys}) ->
     {ok, _} = file:position(Handle, Position),
-    MaxPos = lists:foldl(fun({_Hash, HPos}, MaxPos) -> max(HPos, MaxPos) end,
+    MaxPosFun = fun({_Hash, HPos}, MaxPos) -> max(HPos, MaxPos) end,
+    MaxPos = lists:foldl(MaxPosFun,
                             LastPosition,
                             read_next_n_integerpairs(Handle, Count)),
     {MaxPos, TotalKeys + Count}.
 
 scan_index_returnpositions(Handle, Position, Count, PosList0) ->
     {ok, _} = file:position(Handle, Position),
-    lists:foldl(fun({Hash, HPosition}, PosList) ->
-                        case Hash of
-                            0 -> PosList;
-                            _ -> PosList ++ [HPosition]
-                        end end,
+    AddPosFun =
+        fun({Hash, HPosition}, PosList) ->
+            case Hash of
+                0 ->
+                    PosList;
+                _ ->
+                    [HPosition|PosList]
+            end
+        end,
+    PosList = lists:foldl(AddPosFun,
                     PosList0,
-                    read_next_n_integerpairs(Handle, Count)).
+                    read_next_n_integerpairs(Handle, Count)),
+    lists:reverse(PosList).
 
 
 %% Take an active file and write the hash details necessary to close that
@@ -1879,6 +1882,7 @@ get_keys_byposition_manykeys_test() ->
     {ok, P2} = cdb_open_reader(F2, #cdb_options{binary_mode=false}),
     PositionList = cdb_getpositions(P2, all),
     L1 = length(PositionList),
+    io:format("Length of all positions ~w~n", [L1]),
     ?assertMatch(KeyCount, L1),
 
     SampleList1 = cdb_getpositions(P2, 10),
diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl
index 7b0dc08..0183240 100644
--- a/src/leveled_iclerk.erl
+++ b/src/leveled_iclerk.erl
@@ -85,6 +85,8 @@
          clerk_stop/1,
          code_change/3]).
 
+-export([schedule_compaction/3]).
+
 -include_lib("eunit/include/eunit.hrl").
 
 -define(JOURNAL_FILEX, "cdb").
@@ -93,13 +95,14 @@
 -define(BATCH_SIZE, 32).
 -define(BATCHES_TO_CHECK, 8).
 %% How many consecutive files to compact in one run
--define(MAX_COMPACTION_RUN, 4).
+-define(MAX_COMPACTION_RUN, 6).
 %% Sliding scale to allow preference of longer runs up to maximum
--define(SINGLEFILE_COMPACTION_TARGET, 60.0).
--define(MAXRUN_COMPACTION_TARGET, 80.0).
+-define(SINGLEFILE_COMPACTION_TARGET, 40.0).
+-define(MAXRUN_COMPACTION_TARGET, 60.0).
 -define(CRC_SIZE, 4).
 -define(DEFAULT_RELOAD_STRATEGY, leveled_codec:inker_reload_strategy([])).
 -define(DEFAULT_WASTE_RETENTION_PERIOD, 86400).
+-define(INTERVALS_PER_HOUR, 4).
 
 -record(state, {inker :: pid(),
                 max_run_length :: integer(),
@@ -230,6 +233,79 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
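One behavioural detail in the cdb changes above: scan_index_returnpositions previously grew its accumulator with PosList ++ [HPosition], which copies the accumulator on every append (quadratic over a large index), whereas the new version prepends with [HPosition|PosList] and reverses once at the end. The idiom in isolation:

    %% Prepend while folding (constant time per element), then reverse
    %% once, rather than appending (which copies the list on each pass).
    collect(Items) ->
        lists:reverse(lists:foldl(fun(X, Acc) -> [X|Acc] end, [], Items)).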
+%%%============================================================================
+%%% External functions
+%%%============================================================================
+
+%% Schedule the next compaction event for this store. Chooses a random
+%% interval, and then a random start time within the first third
+%% of the interval.
+%%
+%% The number of compaction runs per day can be set. This doesn't guarantee
+%% those runs, but uses the assumption that there will be n runs when
+%% scheduling the next one.
+%%
+%% Compaction Hours should be the list of hours during the day (based on
+%% local time) when compaction can be scheduled to run,
+%% e.g. [0, 1, 2, 3, 4, 21, 22, 23]
+%% Runs per day is the number of compaction runs per day that should be
+%% scheduled - expected to be a small integer, probably 1
+%%
+%% Current TS should be the outcome of os:timestamp()
+%%
+
+schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
+    % We schedule the next interval by acting as if we were scheduling all
+    % n intervals at random, but then only choose the next one. After each
+    % event has occurred the random process is repeated to determine the
+    % next event to schedule i.e. the unused schedule is discarded.
+
+    IntervalLength = 60 div ?INTERVALS_PER_HOUR,
+    TotalHours = length(CompactionHours),
+
+    LocalTime = calendar:now_to_local_time(CurrentTS),
+    {{NowY, NowMon, NowD},
+        {NowH, NowMin, _NowS}} = LocalTime,
+    CurrentInterval = {NowH, NowMin div IntervalLength + 1},
+
+    % Randomly select an hour and an interval for each of the runs expected
+    % today.
+    RandSelect =
+        fun(_X) ->
+            {lists:nth(random:uniform(TotalHours), CompactionHours),
+                random:uniform(?INTERVALS_PER_HOUR)}
+        end,
+    RandIntervals = lists:sort(lists:map(RandSelect,
+                                            lists:seq(1, RunsPerDay))),
+
+    % Pick the next interval from the list. The intervals before the current
+    % time are considered as intervals tomorrow, so will only be next if
+    % there are no others today
+    CheckNotBefore = fun(A) -> A =< CurrentInterval end,
+    {TooEarly, MaybeOK} = lists:splitwith(CheckNotBefore, RandIntervals),
+    {NextDate, {NextH, NextI}} =
+        case MaybeOK of
+            [] ->
+                % Use the first interval picked tomorrow if none of the
+                % selected run times are today
+                Tmrw = calendar:date_to_gregorian_days(NowY, NowMon, NowD) + 1,
+                {calendar:gregorian_days_to_date(Tmrw),
+                    lists:nth(1, TooEarly)};
+            _ ->
+                {{NowY, NowMon, NowD}, lists:nth(1, MaybeOK)}
+        end,
+
+    % Calculate the offset in seconds to this next interval
+    NextS0 = NextI * (IntervalLength * 60)
+                - random:uniform(IntervalLength * 60),
+    NextM = NextS0 div 60,
+    NextS = NextS0 rem 60,
+    TimeDiff = calendar:time_difference(LocalTime,
+                                        {NextDate, {NextH, NextM, NextS}}),
+    {Days, {Hours, Mins, Secs}} = TimeDiff,
+    Days * 86400 + Hours * 3600 + Mins * 60 + Secs.
+
+
 %%%============================================================================
 %%% Internal functions
 %%%============================================================================
@@ -536,7 +612,6 @@ clear_waste(State) ->
                 end,
                 ClearedJournals).
 
-
 %%%============================================================================
 %%% Test
@@ -545,25 +620,42 @@ clear_waste(State) ->
 
 -ifdef(TEST).
 
+schedule_test() ->
+    schedule_test_bycount(1),
+    schedule_test_bycount(2),
+    schedule_test_bycount(4).
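schedule_compaction/3 returns a delay in seconds, so a caller would normally turn it into a timer message. A hypothetical caller sketch (set_compaction_timer and the compact message are illustrative only, not part of this diff):

    set_compaction_timer(CompactionHours, RunsPerDay) ->
        %% Seconds until the next randomly-chosen compaction slot
        Delay = leveled_iclerk:schedule_compaction(CompactionHours,
                                                   RunsPerDay,
                                                   os:timestamp()),
        %% erlang:send_after/3 takes milliseconds
        erlang:send_after(Delay * 1000, self(), compact).

The schedule_test_bycount/1 cases that follow pin a known timestamp and assert that the returned delay lands in the expected window.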
+
+schedule_test_bycount(N) ->
+    CurrentTS = {1490,883918,94000}, % Actually 30th March 2017 15:27
+    SecondsToCompaction0 = schedule_compaction([16], N, CurrentTS),
+    io:format("Seconds to compaction ~w~n", [SecondsToCompaction0]),
+    ?assertMatch(true, SecondsToCompaction0 > 1800),
+    ?assertMatch(true, SecondsToCompaction0 < 5700),
+    SecondsToCompaction1 = schedule_compaction([14], N, CurrentTS), % tomorrow!
+    io:format("Seconds to compaction ~w~n", [SecondsToCompaction1]),
+    ?assertMatch(true, SecondsToCompaction1 > 81000),
+    ?assertMatch(true, SecondsToCompaction1 < 84300).
+
+
 simple_score_test() ->
     Run1 = [#candidate{compaction_perc = 75.0},
             #candidate{compaction_perc = 75.0},
             #candidate{compaction_perc = 76.0},
             #candidate{compaction_perc = 70.0}],
-    ?assertMatch(6.0, score_run(Run1, 4)),
+    ?assertMatch(-14.0, score_run(Run1, 4)),
     Run2 = [#candidate{compaction_perc = 75.0}],
-    ?assertMatch(-15.0, score_run(Run2, 4)),
+    ?assertMatch(-35.0, score_run(Run2, 4)),
     ?assertMatch(0.0, score_run([], 4)),
     Run3 = [#candidate{compaction_perc = 100.0}],
-    ?assertMatch(-40.0, score_run(Run3, 4)).
+    ?assertMatch(-60.0, score_run(Run3, 4)).
 
 score_compare_test() ->
-    Run1 = [#candidate{compaction_perc = 75.0},
-            #candidate{compaction_perc = 75.0},
-            #candidate{compaction_perc = 76.0},
-            #candidate{compaction_perc = 70.0}],
+    Run1 = [#candidate{compaction_perc = 55.0},
+            #candidate{compaction_perc = 55.0},
+            #candidate{compaction_perc = 56.0},
+            #candidate{compaction_perc = 50.0}],
     ?assertMatch(6.0, score_run(Run1, 4)),
-    Run2 = [#candidate{compaction_perc = 75.0}],
+    Run2 = [#candidate{compaction_perc = 55.0}],
     ?assertMatch(Run1, choose_best_assessment(Run1, Run2, 4)),
     ?assertMatch(Run2, choose_best_assessment(Run1 ++ Run2, Run2, 4)).
 
@@ -585,27 +677,27 @@ file_gc_test() ->
 find_bestrun_test() ->
 %% Tests dependent on these defaults
 %% -define(MAX_COMPACTION_RUN, 4).
-%% -define(SINGLEFILE_COMPACTION_TARGET, 60.0).
-%% -define(MAXRUN_COMPACTION_TARGET, 80.0).
+%% -define(SINGLEFILE_COMPACTION_TARGET, 40.0).
+%% -define(MAXRUN_COMPACTION_TARGET, 60.0).
 %% Tested first with blocks significant as no back-tracking
-    Block1 = [#candidate{compaction_perc = 75.0},
-                #candidate{compaction_perc = 85.0},
-                #candidate{compaction_perc = 62.0},
-                #candidate{compaction_perc = 70.0}],
-    Block2 = [#candidate{compaction_perc = 58.0},
-                #candidate{compaction_perc = 95.0},
-                #candidate{compaction_perc = 95.0},
-                #candidate{compaction_perc = 65.0}],
-    Block3 = [#candidate{compaction_perc = 90.0},
+    Block1 = [#candidate{compaction_perc = 55.0},
+                #candidate{compaction_perc = 65.0},
+                #candidate{compaction_perc = 42.0},
+                #candidate{compaction_perc = 50.0}],
+    Block2 = [#candidate{compaction_perc = 38.0},
+                #candidate{compaction_perc = 75.0},
+                #candidate{compaction_perc = 75.0},
+                #candidate{compaction_perc = 45.0}],
+    Block3 = [#candidate{compaction_perc = 70.0},
                 #candidate{compaction_perc = 100.0},
                 #candidate{compaction_perc = 100.0},
                 #candidate{compaction_perc = 100.0}],
-    Block4 = [#candidate{compaction_perc = 75.0},
-                #candidate{compaction_perc = 76.0},
-                #candidate{compaction_perc = 76.0},
+    Block4 = [#candidate{compaction_perc = 55.0},
+                #candidate{compaction_perc = 56.0},
+                #candidate{compaction_perc = 56.0},
+                #candidate{compaction_perc = 40.0}],
+    Block5 = [#candidate{compaction_perc = 60.0},
                 #candidate{compaction_perc = 60.0}],
-    Block5 = [#candidate{compaction_perc = 80.0},
-                #candidate{compaction_perc = 80.0}],
     CList0 = Block1 ++ Block2 ++ Block3 ++ Block4 ++ Block5,
     ?assertMatch(Block4, assess_candidates(CList0, 4, [], [])),
     CList1 = CList0 ++ [#candidate{compaction_perc = 20.0}],
@@ -614,26 +706,26 @@ find_bestrun_test() ->
     CList2 = Block4 ++ Block3 ++ Block2 ++ Block1 ++ Block5,
     ?assertMatch(Block4, assess_candidates(CList2, 4, [], [])),
     CList3 = Block5 ++ Block1 ++ Block2 ++ Block3 ++ Block4,
-    ?assertMatch([#candidate{compaction_perc = 62.0},
-                    #candidate{compaction_perc = 70.0},
-                    #candidate{compaction_perc = 58.0}],
+    ?assertMatch([#candidate{compaction_perc = 42.0},
+                    #candidate{compaction_perc = 50.0},
+                    #candidate{compaction_perc = 38.0}],
                     assess_candidates(CList3, 4, [], [])),
     %% Now do some back-tracking to get a genuinely optimal solution without
     %% needing to re-order
-    ?assertMatch([#candidate{compaction_perc = 62.0},
-                    #candidate{compaction_perc = 70.0},
-                    #candidate{compaction_perc = 58.0}],
+    ?assertMatch([#candidate{compaction_perc = 42.0},
+                    #candidate{compaction_perc = 50.0},
+                    #candidate{compaction_perc = 38.0}],
                     assess_candidates(CList0, 4)),
-    ?assertMatch([#candidate{compaction_perc = 62.0},
-                    #candidate{compaction_perc = 70.0},
-                    #candidate{compaction_perc = 58.0}],
+    ?assertMatch([#candidate{compaction_perc = 42.0},
+                    #candidate{compaction_perc = 50.0},
+                    #candidate{compaction_perc = 38.0}],
                     assess_candidates(CList0, 5)),
-    ?assertMatch([#candidate{compaction_perc = 62.0},
-                    #candidate{compaction_perc = 70.0},
-                    #candidate{compaction_perc = 58.0},
-                    #candidate{compaction_perc = 95.0},
-                    #candidate{compaction_perc = 95.0},
-                    #candidate{compaction_perc = 65.0}],
+    ?assertMatch([#candidate{compaction_perc = 42.0},
+                    #candidate{compaction_perc = 50.0},
+                    #candidate{compaction_perc = 38.0},
+                    #candidate{compaction_perc = 75.0},
+                    #candidate{compaction_perc = 75.0},
+                    #candidate{compaction_perc = 45.0}],
                     assess_candidates(CList0, 6)).
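The revised expectations are consistent with score_run/2 comparing a run's mean compaction_perc against a target interpolated between the single-file and max-run targets (inferred from the test values, as score_run itself is not shown in this diff). For example, in simple_score_test, Run1 is a maximum-length run of 4, so it is held to the 60.0 MAXRUN target: its mean is (75.0 + 75.0 + 76.0 + 70.0) / 4 = 74.0, giving 60.0 - 74.0 = -14.0; Run2 is a single file held to the 40.0 SINGLEFILE target, giving 40.0 - 75.0 = -35.0.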
 
 test_ledgerkey(Key) ->
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index acfed4d..2513a4a 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -900,7 +900,7 @@ compact_journal_test() ->
                         end
                 end,
                 false,
                 CompactedManifest2),
-    ?assertMatch(true, R),
+    ?assertMatch(false, R),
     ?assertMatch(2, length(CompactedManifest2)),
     ink_close(Ink1),
     clean_testdir(RootPath).