Merge pull request #252 from martinsumner/mas-i249-iclerkandconfig

Mas i249 iclerkandconfig
This commit is contained in:
Martin Sumner 2019-01-28 08:56:36 +00:00 committed by GitHub
commit b12b6e4c91
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 348 additions and 205 deletions

BIN
.eqc-info Normal file

Binary file not shown.

4
.gitignore vendored
View file

@ -5,3 +5,7 @@
.DS_Store
rebar.lock
test/test_area/*
cover
cover_*
.eqc-info
leveled_data/*

BIN
current_counterexample.eqc Normal file

Binary file not shown.

View file

@ -50,7 +50,8 @@
{press_method = native
:: leveled_sst:press_method(),
log_options = leveled_log:get_opts()
:: leveled_log:log_options()}).
:: leveled_log:log_options(),
max_sstslots = 256 :: pos_integer()}).
-record(inker_options,
{cdb_max_size :: integer() | undefined,

View file

@ -11,8 +11,8 @@
{profiles,
[{eqc, [{deps, [meck, fqc]},
{erl_opts, [debug_info, {parse_transform, lager_transform}, {parse_transform, eqc_cover}]},
{plugins, [rebar_eqc]}]}
{erl_opts, [debug_info, {parse_transform, eqc_cover}]},
{extra_src_dirs, ["test"]}]}
]}.
{deps, [

View file

@ -124,6 +124,7 @@
{snapshot_bookie, undefined},
{cache_size, ?CACHE_SIZE},
{max_journalsize, 1000000000},
{max_sstslots, 256},
{sync_strategy, none},
{head_only, false},
{waste_retention_period, undefined},
@ -220,6 +221,10 @@
{max_journalsize, pos_integer()} |
% The maximum size of a journal file in bytes. The abolute
% maximum must be 4GB due to 4 byte file pointers being used
{max_sstslots, pos_integer()} |
% The maximum number of slots in a SST file. All testing is done
% at a size of 256 (except for Quickcheck tests}, altering this
% value is not recommended
{sync_strategy, sync_mode()} |
% Should be sync if it is necessary to flush to disk after every
% write, or none if not (allow the OS to schecdule). This has a
@ -1004,7 +1009,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) ->
gen_server:call(Pid, {snapshot, SnapType, Query, LongRunning}, infinity).
-spec book_compactjournal(pid(), integer()) -> ok.
-spec book_compactjournal(pid(), integer()) -> ok|busy.
-spec book_islastcompactionpending(pid()) -> boolean().
-spec book_trimjournal(pid()) -> ok.
@ -1014,7 +1019,8 @@ book_snapshot(Pid, SnapType, Query, LongRunning) ->
%% in Riak it will be triggered by a vnode callback.
book_compactjournal(Pid, Timeout) ->
gen_server:call(Pid, {compact_journal, Timeout}, infinity).
{R, _P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity),
R.
%% @doc Check on progress of the last compaction
@ -1122,7 +1128,7 @@ init([Opts]) ->
ConfiguredCacheSize =
max(proplists:get_value(cache_size, Opts), ?MIN_CACHE_SIZE),
CacheJitter =
ConfiguredCacheSize div (100 div ?CACHE_SIZE_JITTER),
max(1, ConfiguredCacheSize div (100 div ?CACHE_SIZE_JITTER)),
CacheSize =
ConfiguredCacheSize + erlang:phash2(self()) rem CacheJitter,
PCLMaxSize =
@ -1371,10 +1377,17 @@ handle_call({return_runner, QueryType}, _From, State) ->
fold_countdown = CountDown}};
handle_call({compact_journal, Timeout}, _From, State)
when State#state.head_only == false ->
ok = leveled_inker:ink_compactjournal(State#state.inker,
self(),
Timeout),
{reply, ok, State};
case leveled_inker:ink_compactionpending(State#state.inker) of
true ->
{reply, {busy, undefined}, State};
false ->
{ok, PclSnap, null} =
snapshot_store(State, ledger, undefined, true),
R = leveled_inker:ink_compactjournal(State#state.inker,
PclSnap,
Timeout),
{reply, R, State}
end;
handle_call(confirm_compact, _From, State)
when State#state.head_only == false ->
{reply, leveled_inker:ink_compactionpending(State#state.inker), State};
@ -1627,6 +1640,8 @@ set_options(Opts) ->
false
end,
MaxSSTSlots = proplists:get_value(max_sstslots, Opts),
{#inker_options{root_path = JournalFP,
reload_strategy = ReloadStrategy,
max_run_length = proplists:get_value(max_run_length, Opts),
@ -1647,8 +1662,9 @@ set_options(Opts) ->
snaptimeout_short = SnapTimeoutShort,
snaptimeout_long = SnapTimeoutLong,
sst_options =
#sst_options{press_method = CompressionMethod,
log_options=leveled_log:get_opts()}}
#sst_options{press_method=CompressionMethod,
log_options=leveled_log:get_opts(),
max_sstslots=MaxSSTSlots}}
}.

View file

@ -820,18 +820,24 @@ finished_rolling(CDB) ->
-spec close_pendingdelete(file:io_device(), list(), list()|undefined) -> ok.
%% @doc
%% If delete is pending - thent he close behaviour needs to actuallly delete
%% If delete is pending - then the close behaviour needs to actuallly delete
%% the file
close_pendingdelete(Handle, Filename, WasteFP) ->
case WasteFP of
undefined ->
ok = file:close(Handle),
ok = file:delete(Filename);
WasteFP ->
file:close(Handle),
Components = filename:split(Filename),
NewName = WasteFP ++ lists:last(Components),
file:rename(Filename, NewName)
ok = file:close(Handle),
case filelib:is_file(Filename) of
true ->
case WasteFP of
undefined ->
ok = file:delete(Filename);
WasteFP ->
Components = filename:split(Filename),
NewName = WasteFP ++ lists:last(Components),
file:rename(Filename, NewName)
end;
false ->
% This may happen when there has been a destroy while files are
% still pending deletion
leveled_log:log("CDB21", [Filename])
end.
-spec set_writeops(sync|riak_sync|none) -> {list(), sync|riak_sync|none}.
@ -2600,6 +2606,24 @@ badly_written_test() ->
ok = cdb_close(P2),
file:delete(F1).
pendingdelete_test() ->
F1 = "test/test_area/deletfile_test.pnd",
file:delete(F1),
{ok, P1} = cdb_open_writer(F1, #cdb_options{binary_mode=false}),
KVList = generate_sequentialkeys(1000, []),
ok = cdb_mput(P1, KVList),
?assertMatch(probably, cdb_keycheck(P1, "Key1")),
?assertMatch({"Key1", "Value1"}, cdb_get(P1, "Key1")),
?assertMatch({"Key100", "Value100"}, cdb_get(P1, "Key100")),
{ok, F2} = cdb_complete(P1),
{ok, P2} = cdb_open_reader(F2, #cdb_options{binary_mode=false}),
?assertMatch({"Key1", "Value1"}, cdb_get(P2, "Key1")),
?assertMatch({"Key100", "Value100"}, cdb_get(P2, "Key100")),
file:delete(F2),
ok = cdb_deletepending(P2),
% No issues destroying even though the file has already been removed
ok = cdb_destroy(P2).
nonsense_coverage_test() ->
{ok, Pid} = gen_fsm:start_link(?MODULE, [#cdb_options{}], []),

View file

@ -82,9 +82,10 @@
code_change/3]).
-export([clerk_new/1,
clerk_compact/7,
clerk_compact/6,
clerk_hashtablecalc/3,
clerk_trim/3,
clerk_promptdeletions/3,
clerk_stop/1,
clerk_loglevel/2,
clerk_addlogs/2,
@ -114,15 +115,24 @@
reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(),
singlefile_compactionperc = ?SINGLEFILE_COMPACTION_TARGET :: float(),
maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(),
compression_method = native :: lz4|native}).
compression_method = native :: lz4|native,
scored_files = [] :: list(candidate()),
scoring_state :: scoring_state()|undefined}).
-record(candidate, {low_sqn :: integer() | undefined,
filename :: string() | undefined,
journal :: pid() | undefined,
compaction_perc :: float() | undefined}).
-record(scoring_state, {filter_fun :: fun(),
filter_server :: pid(),
max_sqn :: non_neg_integer(),
close_fun :: fun(),
start_time :: erlang:timestamp()}).
-type iclerk_options() :: #iclerk_options{}.
-type candidate() :: #candidate{}.
-type scoring_state() :: #scoring_state{}.
-type score_parameters() :: {integer(), float(), float()}.
% Score parameters are a tuple
% - of maximum run length; how long a run of consecutive files can be for
@ -148,25 +158,30 @@ clerk_new(InkerClerkOpts) ->
-spec clerk_compact(pid(), pid(),
fun(), fun(), fun(),
pid(), integer()) -> ok.
list()) -> ok.
%% @doc
%% Trigger a compaction for this clerk if the threshold of data recovery has
%% been met
clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Inker, TimeO) ->
clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Manifest) ->
gen_server:cast(Pid,
{compact,
Checker,
InitiateFun,
CloseFun,
FilterFun,
Inker,
TimeO}).
Manifest}).
-spec clerk_trim(pid(), pid(), integer()) -> ok.
-spec clerk_trim(pid(), integer(), list()) -> ok.
%% @doc
%% Trim the Inker back to the persisted SQN
clerk_trim(Pid, Inker, PersistedSQN) ->
gen_server:cast(Pid, {trim, Inker, PersistedSQN}).
clerk_trim(Pid, PersistedSQN, ManifestAsList) ->
gen_server:cast(Pid, {trim, PersistedSQN, ManifestAsList}).
-spec clerk_promptdeletions(pid(), pos_integer(), list()) -> ok.
%% @doc
%%
clerk_promptdeletions(Pid, ManifestSQN, DeletedFiles) ->
gen_server:cast(Pid, {prompt_deletions, ManifestSQN, DeletedFiles}).
-spec clerk_hashtablecalc(ets:tid(), integer(), pid()) -> ok.
%% @doc
@ -182,7 +197,7 @@ clerk_hashtablecalc(HashTree, StartPos, CDBpid) ->
%% @doc
%% Stop the clerk
clerk_stop(Pid) ->
gen_server:cast(Pid, stop).
gen_server:call(Pid, stop, infinity).
-spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok.
%% @doc
@ -202,6 +217,17 @@ clerk_addlogs(Pid, ForcedLogs) ->
clerk_removelogs(Pid, ForcedLogs) ->
gen_server:cast(Pid, {remove_logs, ForcedLogs}).
-spec clerk_scorefilelist(pid(), list(candidate())) -> ok.
%% @doc
%% Score the file at the head of the list and then send the tail of the list to
%% be scored
clerk_scorefilelist(Pid, []) ->
gen_server:cast(Pid, scoring_complete);
clerk_scorefilelist(Pid, CandidateList) ->
gen_server:cast(Pid, {score_filelist, CandidateList}).
%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
@ -247,10 +273,20 @@ init([LogOpts, IClerkOpts]) ->
compression_method =
IClerkOpts#iclerk_options.compression_method}}.
handle_call(_Msg, _From, State) ->
{reply, not_supported, State}.
handle_call(stop, _From, State) ->
case State#state.scoring_state of
undefined ->
ok;
ScoringState ->
% Closed when scoring files, and so need to shutdown FilterServer
% to close down neatly
CloseFun = ScoringState#scoring_state.close_fun,
FilterServer = ScoringState#scoring_state.filter_server,
CloseFun(FilterServer)
end,
{stop, normal, ok, State}.
handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
State) ->
% Empty the waste folder
clear_waste(State),
@ -260,12 +296,43 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
% Need to fetch manifest at start rather than have it be passed in
% Don't want to process a queued call waiting on an old manifest
[_Active|Manifest] = leveled_inker:ink_getmanifest(Inker),
MaxRunLength = State#state.max_run_length,
[_Active|Manifest] = Manifest0,
{FilterServer, MaxSQN} = InitiateFun(Checker),
ok = clerk_scorefilelist(self(), Manifest),
ScoringState =
#scoring_state{filter_fun = FilterFun,
filter_server = FilterServer,
max_sqn = MaxSQN,
close_fun = CloseFun,
start_time = SW},
{noreply, State#state{scored_files = [], scoring_state = ScoringState}};
handle_cast({score_filelist, [Entry|Tail]}, State) ->
Candidates = State#state.scored_files,
{LowSQN, FN, JournalP, _LK} = Entry,
ScoringState = State#state.scoring_state,
CpctPerc = check_single_file(JournalP,
ScoringState#scoring_state.filter_fun,
ScoringState#scoring_state.filter_server,
ScoringState#scoring_state.max_sqn,
?SAMPLE_SIZE,
?BATCH_SIZE),
Candidate =
#candidate{low_sqn = LowSQN,
filename = FN,
journal = JournalP,
compaction_perc = CpctPerc},
ok = clerk_scorefilelist(self(), Tail),
{noreply, State#state{scored_files = [Candidate|Candidates]}};
handle_cast(scoring_complete, State) ->
MaxRunLength = State#state.max_run_length,
CDBopts = State#state.cdb_options,
Candidates = scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN),
Candidates = lists:reverse(State#state.scored_files),
ScoringState = State#state.scoring_state,
FilterFun = ScoringState#scoring_state.filter_fun,
FilterServer = ScoringState#scoring_state.filter_server,
MaxSQN = ScoringState#scoring_state.max_sqn,
CloseFun = ScoringState#scoring_state.close_fun,
SW = ScoringState#scoring_state.start_time,
ScoreParams =
{MaxRunLength,
State#state.maxrunlength_compactionperc,
@ -291,24 +358,29 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
end,
BestRun1),
leveled_log:log("IC002", [length(FilesToDelete)]),
case is_process_alive(Inker) of
true ->
update_inker(Inker,
ManifestSlice,
FilesToDelete),
ok = CloseFun(FilterServer),
{noreply, State}
end;
false ->
ok = leveled_inker:ink_compactioncomplete(Inker),
ok = CloseFun(FilterServer),
{noreply, State}
ok = leveled_inker:ink_clerkcomplete(State#state.inker,
ManifestSlice,
FilesToDelete),
{noreply, State#state{scoring_state = undefined}};
false ->
ok = CloseFun(FilterServer),
ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []),
{noreply, State#state{scoring_state = undefined}}
end;
handle_cast({trim, Inker, PersistedSQN}, State) ->
ManifestAsList = leveled_inker:ink_getmanifest(Inker),
handle_cast({trim, PersistedSQN, ManifestAsList}, State) ->
FilesToDelete =
leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList),
ok = update_inker(Inker, [], FilesToDelete),
leveled_log:log("IC007", []),
ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], FilesToDelete),
{noreply, State};
handle_cast({prompt_deletions, ManifestSQN, FilesToDelete}, State) ->
lists:foreach(fun({_SQN, _FN, J2D, _LK}) ->
leveled_cdb:cdb_deletepending(J2D,
ManifestSQN,
State#state.inker)
end,
FilesToDelete),
{noreply, State};
handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) ->
{IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos),
@ -328,9 +400,7 @@ handle_cast({remove_logs, ForcedLogs}, State) ->
ok = leveled_log:remove_forcedlogs(ForcedLogs),
CDBopts = State#state.cdb_options,
CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()},
{noreply, State#state{cdb_options = CDBopts0}};
handle_cast(stop, State) ->
{stop, normal, State}.
{noreply, State#state{cdb_options = CDBopts0}}.
handle_info(_Info, State) ->
{noreply, State}.
@ -477,28 +547,6 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) ->
100 * ActiveSize / (ActiveSize + ReplacedSize)
end.
scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN) ->
scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN, []).
scan_all_files([], _FilterFun, _FilterServer, _MaxSQN, CandidateList) ->
CandidateList;
scan_all_files([Entry|Tail], FilterFun, FilterServer, MaxSQN, CandidateList) ->
{LowSQN, FN, JournalP, _LK} = Entry,
CpctPerc = check_single_file(JournalP,
FilterFun,
FilterServer,
MaxSQN,
?SAMPLE_SIZE,
?BATCH_SIZE),
scan_all_files(Tail,
FilterFun,
FilterServer,
MaxSQN,
CandidateList ++
[#candidate{low_sqn = LowSQN,
filename = FN,
journal = JournalP,
compaction_perc = CpctPerc}]).
fetch_inbatches([], _BatchSize, _CDB, CheckedList) ->
CheckedList;
@ -613,20 +661,6 @@ sort_run(RunOfFiles) ->
Cand1#candidate.low_sqn =< Cand2#candidate.low_sqn end,
lists:sort(CompareFun, RunOfFiles).
update_inker(Inker, ManifestSlice, FilesToDelete) ->
{ok, ManSQN} = leveled_inker:ink_updatemanifest(Inker,
ManifestSlice,
FilesToDelete),
ok = leveled_inker:ink_compactioncomplete(Inker),
leveled_log:log("IC007", []),
lists:foreach(fun({_SQN, _FN, J2D, _LK}) ->
leveled_cdb:cdb_deletepending(J2D,
ManSQN,
Inker)
end,
FilesToDelete),
ok.
compact_files(BestRun, CDBopts, FilterFun, FilterServer,
MaxSQN, RStrategy, PressMethod) ->
BatchesOfPositions = get_all_positions(BestRun, []),
@ -761,8 +795,7 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) ->
SQN,
compact_journal),
leveled_log:log("IC009", [FN]),
leveled_cdb:cdb_open_writer(FN,
CDBopts);
leveled_cdb:cdb_open_writer(FN, CDBopts);
_ ->
{ok, Journal0}
end,
@ -985,9 +1018,10 @@ compact_single_file_recovr_test() ->
LedgerFun1,
CompactFP,
CDB} = compact_single_file_setup(),
[{LowSQN, FN, PidR, _LastKey}] =
CDBOpts = #cdb_options{binary_mode=true},
[{LowSQN, FN, _PidOldR, LastKey}] =
compact_files([Candidate],
#cdb_options{file_path=CompactFP, binary_mode=true},
CDBOpts#cdb_options{file_path=CompactFP},
LedgerFun1,
LedgerSrv1,
9,
@ -995,6 +1029,7 @@ compact_single_file_recovr_test() ->
native),
io:format("FN of ~s~n", [FN]),
?assertMatch(2, LowSQN),
{ok, PidR} = leveled_cdb:cdb_reopen_reader(FN, LastKey, CDBOpts),
?assertMatch(probably,
leveled_cdb:cdb_keycheck(PidR,
{8,
@ -1014,6 +1049,7 @@ compact_single_file_recovr_test() ->
test_ledgerkey("Key2")}),
?assertMatch({{_, _}, {"Value2", {[], infinity}}},
leveled_codec:from_inkerkv(RKV1)),
ok = leveled_cdb:cdb_close(PidR),
ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB).
@ -1024,9 +1060,10 @@ compact_single_file_retain_test() ->
LedgerFun1,
CompactFP,
CDB} = compact_single_file_setup(),
[{LowSQN, FN, PidR, _LK}] =
CDBOpts = #cdb_options{binary_mode=true},
[{LowSQN, FN, _PidOldR, LastKey}] =
compact_files([Candidate],
#cdb_options{file_path=CompactFP, binary_mode=true},
CDBOpts#cdb_options{file_path=CompactFP},
LedgerFun1,
LedgerSrv1,
9,
@ -1034,6 +1071,7 @@ compact_single_file_retain_test() ->
native),
io:format("FN of ~s~n", [FN]),
?assertMatch(1, LowSQN),
{ok, PidR} = leveled_cdb:cdb_reopen_reader(FN, LastKey, CDBOpts),
?assertMatch(probably,
leveled_cdb:cdb_keycheck(PidR,
{8,
@ -1048,11 +1086,12 @@ compact_single_file_retain_test() ->
stnd,
test_ledgerkey("Key1")})),
RKV1 = leveled_cdb:cdb_get(PidR,
{2,
stnd,
test_ledgerkey("Key2")}),
{2,
stnd,
test_ledgerkey("Key2")}),
?assertMatch({{_, _}, {"Value2", {[], infinity}}},
leveled_codec:from_inkerkv(RKV1)),
ok = leveled_cdb:cdb_close(PidR),
ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB).
@ -1147,7 +1186,6 @@ size_score_test() ->
coverage_cheat_test() ->
{noreply, _State0} = handle_info(timeout, #state{}),
{ok, _State1} = code_change(null, #state{}, null),
{reply, not_supported, _State2} = handle_call(null, null, #state{}),
terminate(error, #state{}).
-endif.

View file

@ -52,8 +52,12 @@ generate_entry(Journal) ->
case leveled_cdb:cdb_firstkey(PidR) of
{StartSQN, _Type, _PK} ->
LastKey = leveled_cdb:cdb_lastkey(PidR),
% close the file here. This will then be re-opened by the inker
% and so will be correctly linked to the inker not to the iclerk
ok = leveled_cdb:cdb_close(PidR),
[{StartSQN, NewFN, PidR, LastKey}];
empty ->
ok = leveled_cdb:cdb_close(PidR),
leveled_log:log("IC013", [NewFN]),
[]
end.

View file

@ -105,11 +105,10 @@
ink_registersnapshot/2,
ink_confirmdelete/2,
ink_compactjournal/3,
ink_compactioncomplete/1,
ink_clerkcomplete/3,
ink_compactionpending/1,
ink_trim/2,
ink_getmanifest/1,
ink_updatemanifest/3,
ink_printmanifest/1,
ink_close/1,
ink_doom/1,
@ -142,7 +141,7 @@
journal_sqn = 0 :: integer(),
active_journaldb :: pid() | undefined,
pending_removals = [] :: list(),
registered_snapshots = [] :: list(),
registered_snapshots = [] :: list(registered_snapshot()),
root_path :: string() | undefined,
cdb_options :: #cdb_options{} | undefined,
clerk :: pid() | undefined,
@ -157,7 +156,7 @@
-type inker_options() :: #inker_options{}.
-type ink_state() :: #state{}.
-type registered_snapshot() :: {pid(), os:timestamp(), integer()}.
%%%============================================================================
%%% API
@ -277,7 +276,7 @@ ink_close(Pid) ->
%% Test function used to close a file, and return all file paths (potentially
%% to erase all persisted existence)
ink_doom(Pid) ->
gen_server:call(Pid, doom, 60000).
gen_server:call(Pid, doom, infinity).
-spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> fun().
%% @doc
@ -348,7 +347,7 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
as_ink},
infinity).
-spec ink_compactjournal(pid(), pid(), integer()) -> ok.
-spec ink_compactjournal(pid(), pid(), integer()) -> {ok|busy, pid()}.
%% @doc
%% Trigger a compaction event. the compaction event will use a sqn check
%% against the Ledger to see if a value can be compacted - if the penciller
@ -359,7 +358,7 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
%% that any value that was written more recently than the last flush to disk
%% of the Ledger will not be considered for compaction (as this may be
%% required to reload the Ledger on startup).
ink_compactjournal(Pid, Bookie, Timeout) ->
ink_compactjournal(Pid, Bookie, _Timeout) ->
CheckerInitiateFun = fun initiate_penciller_snapshot/1,
CheckerCloseFun = fun leveled_penciller:pcl_close/1,
CheckerFilterFun =
@ -369,28 +368,26 @@ ink_compactjournal(Pid, Bookie, Timeout) ->
Bookie,
CheckerInitiateFun,
CheckerCloseFun,
CheckerFilterFun,
Timeout},
CheckerFilterFun},
infinity).
%% Allows the Checker to be overriden in test, use something other than a
%% penciller
ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, Timeout) ->
ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, _Timeout) ->
gen_server:call(Pid,
{compact,
Checker,
InitiateFun,
CloseFun,
FilterFun,
Timeout},
FilterFun},
infinity).
-spec ink_compactioncomplete(pid()) -> ok.
-spec ink_clerkcomplete(pid(), list(), list()) -> ok.
%% @doc
%% Used by a clerk to state that a compaction process is over, only change
%% is to unlock the Inker for further compactions.
ink_compactioncomplete(Pid) ->
gen_server:call(Pid, compaction_complete, infinity).
ink_clerkcomplete(Pid, ManifestSnippet, FilesToDelete) ->
gen_server:cast(Pid, {clerk_complete, ManifestSnippet, FilesToDelete}).
-spec ink_compactionpending(pid()) -> boolean().
%% @doc
@ -425,21 +422,6 @@ ink_backup(Pid, BackupPath) ->
ink_getmanifest(Pid) ->
gen_server:call(Pid, get_manifest, infinity).
-spec ink_updatemanifest(pid(), list(), list()) -> {ok, integer()}.
%% @doc
%% Add a section of new entries into the manifest, and drop a bunch of deleted
%% files out of the manifest. Used to update the manifest after a compaction
%% job.
%%
%% Returns {ok, ManSQN} with the ManSQN being the sequence number of the
%% updated manifest
ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) ->
gen_server:call(Pid,
{update_manifest,
ManifestSnippet,
DeletedFiles},
infinity).
-spec ink_printmanifest(pid()) -> ok.
%% @doc
%% Used in tests to print out the manifest
@ -574,27 +556,6 @@ handle_call({confirm_delete, ManSQN}, _From, State) ->
State#state{registered_snapshots = RegisteredSnapshots0}};
handle_call(get_manifest, _From, State) ->
{reply, leveled_imanifest:to_list(State#state.manifest), State};
handle_call({update_manifest,
ManifestSnippet,
DeletedFiles}, _From, State) ->
DropFun =
fun(E, Acc) ->
leveled_imanifest:remove_entry(Acc, E)
end,
Man0 = lists:foldl(DropFun, State#state.manifest, DeletedFiles),
AddFun =
fun(E, Acc) ->
leveled_imanifest:add_entry(Acc, E, false)
end,
Man1 = lists:foldl(AddFun, Man0, ManifestSnippet),
NewManifestSQN = State#state.manifest_sqn + 1,
leveled_imanifest:printer(Man1),
leveled_imanifest:writer(Man1, NewManifestSQN, State#state.root_path),
{reply,
{ok, NewManifestSQN},
State#state{manifest=Man1,
manifest_sqn=NewManifestSQN,
pending_removals=DeletedFiles}};
handle_call(print_manifest, _From, State) ->
leveled_imanifest:printer(State#state.manifest),
{reply, ok, State};
@ -602,23 +563,22 @@ handle_call({compact,
Checker,
InitiateFun,
CloseFun,
FilterFun,
Timeout},
FilterFun},
_From, State) ->
Clerk = State#state.clerk,
Manifest = leveled_imanifest:to_list(State#state.manifest),
leveled_iclerk:clerk_compact(State#state.clerk,
Checker,
InitiateFun,
CloseFun,
FilterFun,
self(),
Timeout),
{reply, ok, State#state{compaction_pending=true}};
handle_call(compaction_complete, _From, State) ->
{reply, ok, State#state{compaction_pending=false}};
Manifest),
{reply, {ok, Clerk}, State#state{compaction_pending=true}};
handle_call(compaction_pending, _From, State) ->
{reply, State#state.compaction_pending, State};
handle_call({trim, PersistedSQN}, _From, State) ->
ok = leveled_iclerk:clerk_trim(State#state.clerk, self(), PersistedSQN),
Manifest = leveled_imanifest:to_list(State#state.manifest),
ok = leveled_iclerk:clerk_trim(State#state.clerk, PersistedSQN, Manifest),
{reply, ok, State};
handle_call(roll, _From, State) ->
case leveled_cdb:cdb_lastkey(State#state.active_journaldb) of
@ -712,7 +672,7 @@ handle_call(close, _From, State) ->
leveled_log:log("I0005", [close]),
leveled_log:log("I0006", [State#state.journal_sqn,
State#state.manifest_sqn]),
leveled_iclerk:clerk_stop(State#state.clerk),
ok = leveled_iclerk:clerk_stop(State#state.clerk),
shutdown_snapshots(State#state.registered_snapshots),
shutdown_manifest(State#state.manifest)
end,
@ -727,12 +687,39 @@ handle_call(doom, _From, State) ->
leveled_log:log("I0005", [doom]),
leveled_log:log("I0006", [State#state.journal_sqn,
State#state.manifest_sqn]),
leveled_iclerk:clerk_stop(State#state.clerk),
ok = leveled_iclerk:clerk_stop(State#state.clerk),
shutdown_snapshots(State#state.registered_snapshots),
shutdown_manifest(State#state.manifest),
{stop, normal, {ok, FPs}, State}.
handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) ->
CDBOpts = State#state.cdb_options,
DropFun =
fun(E, Acc) ->
leveled_imanifest:remove_entry(Acc, E)
end,
Man0 = lists:foldl(DropFun, State#state.manifest, FilesToDelete),
AddFun =
fun(ManEntry, Acc) ->
{LowSQN, FN, _, LK_RO} = ManEntry,
% At this stage the FN has a .cdb extension, which will be
% stripped during add_entry - so need to add the .cdb here
{ok, Pid} = leveled_cdb:cdb_reopen_reader(FN, LK_RO, CDBOpts),
UpdEntry = {LowSQN, FN, Pid, LK_RO},
leveled_imanifest:add_entry(Acc, UpdEntry, false)
end,
Man1 = lists:foldl(AddFun, Man0, ManifestSnippet),
NewManifestSQN = State#state.manifest_sqn + 1,
leveled_imanifest:printer(Man1),
leveled_imanifest:writer(Man1, NewManifestSQN, State#state.root_path),
ok = leveled_iclerk:clerk_promptdeletions(State#state.clerk,
NewManifestSQN,
FilesToDelete),
{noreply, State#state{manifest=Man1,
manifest_sqn=NewManifestSQN,
pending_removals=FilesToDelete,
compaction_pending=false}};
handle_cast({release_snapshot, Snapshot}, State) ->
Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots),
leveled_log:log("I0003", [Snapshot]),
@ -843,11 +830,12 @@ start_from_file(InkOpts) ->
clerk = Clerk}}.
-spec shutdown_snapshots(list(tuple())) -> ok.
-spec shutdown_snapshots(list(registered_snapshot())) -> ok.
%% @doc
%% Shutdown any snapshots before closing the store
shutdown_snapshots(Snapshots) ->
lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end, Snapshots).
lists:foreach(fun({Snap, _TS, _SQN}) -> ok = ink_close(Snap) end,
Snapshots).
-spec shutdown_manifest(leveled_imanifest:manifest()) -> ok.
%% @doc
@ -1243,9 +1231,7 @@ filepath(CompactFilePath, NewSQN, compact_journal) ->
++ "." ++ ?PENDING_FILEX).
initiate_penciller_snapshot(Bookie) ->
{ok, LedgerSnap, _} =
leveled_bookie:book_snapshot(Bookie, ledger, undefined, true),
initiate_penciller_snapshot(LedgerSnap) ->
MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
{LedgerSnap, MaxSQN}.
@ -1445,22 +1431,26 @@ compact_journal_testto(WRP, ExpectedFiles) ->
ActualManifest = ink_getmanifest(Ink1),
ok = ink_printmanifest(Ink1),
?assertMatch(3, length(ActualManifest)),
ok = ink_compactjournal(Ink1,
Checker,
fun(X) -> {X, 55} end,
fun(_F) -> ok end,
fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
5000),
{ok, _ICL1} = ink_compactjournal(Ink1,
Checker,
fun(X) -> {X, 55} end,
fun(_F) -> ok end,
fun(L, K, SQN) ->
lists:member({SQN, K}, L)
end,
5000),
timer:sleep(1000),
CompactedManifest1 = ink_getmanifest(Ink1),
?assertMatch(2, length(CompactedManifest1)),
Checker2 = lists:sublist(Checker, 16),
ok = ink_compactjournal(Ink1,
Checker2,
fun(X) -> {X, 55} end,
fun(_F) -> ok end,
fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
5000),
{ok, _ICL2} = ink_compactjournal(Ink1,
Checker2,
fun(X) -> {X, 55} end,
fun(_F) -> ok end,
fun(L, K, SQN) ->
lists:member({SQN, K}, L)
end,
5000),
timer:sleep(1000),
CompactedManifest2 = ink_getmanifest(Ink1),
{ok, PrefixTest} = re:compile(?COMPACT_FP),
@ -1489,12 +1479,12 @@ empty_manifest_test() ->
CheckFun = fun(L, K, SQN) -> lists:member({SQN, key_converter(K)}, L) end,
?assertMatch(false, CheckFun([], "key", 1)),
ok = ink_compactjournal(Ink1,
[],
fun(X) -> {X, 55} end,
fun(_F) -> ok end,
CheckFun,
5000),
{ok, _ICL1} = ink_compactjournal(Ink1,
[],
fun(X) -> {X, 55} end,
fun(_F) -> ok end,
CheckFun,
5000),
timer:sleep(1000),
?assertMatch(1, length(ink_getmanifest(Ink1))),
ok = ink_close(Ink1),

View file

@ -392,8 +392,11 @@
++ "with totals of cycle_count=~w "
++ "fetch_time=~w index_time=~w"}},
{"CDB20",
{warn, "Error ~w caught when safe reading a file to length ~w"}}
]).
{warn, "Error ~w caught when safe reading a file to length ~w"}},
{"CDB21",
{warn, "File ~s to be deleted but already gone"}}
]).
%%%============================================================================

View file

@ -72,7 +72,6 @@
-include("include/leveled.hrl").
-define(MAX_SLOTS, 256).
-define(LOOK_SLOTSIZE, 128). % Maximum of 128
-define(LOOK_BLOCKSIZE, {24, 32}). % 4x + y = ?LOOK_SLOTSIZE
-define(NOLOOK_SLOTSIZE, 256).
@ -258,7 +257,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method),
OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
{[], [], SlotList, FK} =
merge_lists(KVList, PressMethod0, IndexModDate),
merge_lists(KVList, OptsSST0, IndexModDate),
case gen_fsm:sync_send_event(Pid,
{sst_new,
RootPath,
@ -309,7 +308,7 @@ sst_new(RootPath, Filename,
OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
{Rem1, Rem2, SlotList, FK} =
merge_lists(KVL1, KVL2, {IsBasement, Level},
PressMethod0, IndexModDate),
OptsSST0, IndexModDate),
case SlotList of
[] ->
empty;
@ -499,7 +498,7 @@ starting({sst_newlevelzero, RootPath, Filename,
SW1 = os:timestamp(),
{[], [], SlotList, FirstKey} =
merge_lists(KVList, PressMethod, IdxModDate),
merge_lists(KVList, OptsSST, IdxModDate),
Time1 = timer:now_diff(os:timestamp(), SW1),
SW2 = os:timestamp(),
@ -2131,16 +2130,17 @@ revert_position(Pos) ->
%% there are matching keys then the highest sequence number must be chosen and
%% any lower sequence numbers should be compacted out of existence
-spec merge_lists(list(), press_method(), boolean())
-spec merge_lists(list(), sst_options(), boolean())
-> {list(), list(), list(tuple()), tuple()|null}.
%% @doc
%%
%% Merge from asingle list (i.e. at Level 0)
merge_lists(KVList1, PressMethod, IdxModDate) ->
merge_lists(KVList1, SSTOpts, IdxModDate) ->
SlotCount = length(KVList1) div ?LOOK_SLOTSIZE,
{[],
[],
split_lists(KVList1, [], SlotCount, PressMethod, IdxModDate),
split_lists(KVList1, [],
SlotCount, SSTOpts#sst_options.press_method, IdxModDate),
element(1, lists:nth(1, KVList1))}.
@ -2157,33 +2157,34 @@ split_lists(KVList1, SlotLists, N, PressMethod, IdxModDate) ->
split_lists(KVListRem, [SlotD|SlotLists], N - 1, PressMethod, IdxModDate).
-spec merge_lists(list(), list(), tuple(), press_method(), boolean()) ->
-spec merge_lists(list(), list(), tuple(), sst_options(), boolean()) ->
{list(), list(), list(tuple()), tuple()|null}.
%% @doc
%% Merge lists when merging across more thna one file. KVLists that are
%% provided may include pointers to fetch more Keys/Values from the source
%% file
merge_lists(KVList1, KVList2, LevelInfo, PressMethod, IndexModDate) ->
merge_lists(KVList1, KVList2, LevelInfo, SSTOpts, IndexModDate) ->
merge_lists(KVList1, KVList2,
LevelInfo,
[], null, 0,
PressMethod,
SSTOpts#sst_options.max_sstslots,
SSTOpts#sst_options.press_method,
IndexModDate,
#build_timings{}).
merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, ?MAX_SLOTS,
merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, MaxSlots, MaxSlots,
_PressMethod, _IdxModDate, T0) ->
% This SST file is full, move to complete file, and return the
% remainder
log_buildtimings(T0, LI),
{KVL1, KVL2, lists:reverse(SlotList), FirstKey};
merge_lists([], [], LI, SlotList, FirstKey, _SlotCount,
merge_lists([], [], LI, SlotList, FirstKey, _SlotCount, _MaxSlots,
_PressMethod, _IdxModDate, T0) ->
% the source files are empty, complete the file
log_buildtimings(T0, LI),
{[], [], lists:reverse(SlotList), FirstKey};
merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount,
merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, MaxSlots,
PressMethod, IdxModDate, T0) ->
% Form a slot by merging the two lists until the next 128 K/V pairs have
% been determined
@ -2200,6 +2201,7 @@ merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount,
SlotList,
FK0,
SlotCount,
MaxSlots,
PressMethod,
IdxModDate,
T1);
@ -2214,6 +2216,7 @@ merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount,
[SlotD|SlotList],
FK0,
SlotCount + 1,
MaxSlots,
PressMethod,
IdxModDate,
T2)
@ -2560,7 +2563,8 @@ merge_tombstonelist_test() ->
R = merge_lists([SkippingKV1, SkippingKV3, SkippingKV5],
[SkippingKV2, SkippingKV4],
{true, 9999999},
native,
#sst_options{press_method = native,
max_sstslots = 256},
?INDEX_MODDATE),
?assertMatch({[], [], [], null}, R).

View file

@ -12,6 +12,7 @@
is_empty_test/1,
many_put_fetch_switchcompression/1,
bigjournal_littlejournal/1,
bigsst_littlesst/1,
safereaderror_startup/1,
remove_journal_test/1
]).
@ -27,6 +28,7 @@ all() -> [
is_empty_test,
many_put_fetch_switchcompression,
bigjournal_littlejournal,
bigsst_littlesst,
safereaderror_startup,
remove_journal_test
].
@ -164,6 +166,39 @@ bigjournal_littlejournal(_Config) ->
ok = leveled_bookie:book_destroy(Bookie2).
%% Test that reducing max_sstslots from the default (256) to a much
%% smaller value (24) causes the same object load to be spread across
%% many more (smaller) SST files in the ledger.
bigsst_littlesst(_Config) ->
    RootPath = testutil:reset_filestructure(),
    % First run with the default maximum of 256 slots per SST file
    StartOpts1 = [{root_path, RootPath},
                    {max_journalsize, 50000000},
                    {cache_size, 1000},
                    {max_pencillercachesize, 16000},
                    {max_sstslots, 256},
                    {sync_strategy, testutil:sync_strategy()},
                    {compression_point, on_compact}],
    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
    % Load 60K objects with 100-byte random values into bucket <<"B">>,
    % then confirm they can all be read back
    ObjL1 =
        testutil:generate_objects(60000, 1, [],
                                    leveled_rand:rand_bytes(100),
                                    fun() -> [] end, <<"B">>),
    testutil:riakload(Bookie1, ObjL1),
    testutil:check_forlist(Bookie1, ObjL1),
    % NOTE(review): variable is named JFP but this path is the ledger
    % (SST) file directory, not the journal - consider renaming
    JFP = RootPath ++ "/ledger/ledger_files/",
    {ok, FNS1} = file:list_dir(JFP),
    ok = leveled_bookie:book_destroy(Bookie1),
    % Second run with max_sstslots reduced to 24.  lists:ukeysort/2
    % keeps the first occurrence of each key, so the new setting
    % overrides the one already present in StartOpts1
    StartOpts2 = lists:ukeysort(1, [{max_sstslots, 24}|StartOpts1]),
    {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
    testutil:riakload(Bookie2, ObjL1),
    testutil:check_forlist(Bookie2, ObjL1),
    {ok, FNS2} = file:list_dir(JFP),
    ok = leveled_bookie:book_destroy(Bookie2),
    io:format("Big SST ~w files Little SST ~w files~n",
                [length(FNS1), length(FNS2)]),
    % With slots capped at 24 rather than 256, expect more than twice
    % as many ledger files for the same data
    true = length(FNS2) > (2 * length(FNS1)).
journal_compaction(_Config) ->
journal_compaction_tester(false, 3600),
journal_compaction_tester(false, undefined),
@ -300,6 +335,7 @@ journal_compaction_tester(Restart, WRP) ->
{sync_strategy, testutil:sync_strategy()}],
{ok, Bookie3} = leveled_bookie:book_start(StartOpts2),
ok = leveled_bookie:book_compactjournal(Bookie3, 30000),
busy = leveled_bookie:book_compactjournal(Bookie3, 30000),
testutil:wait_for_compaction(Bookie3),
ok = leveled_bookie:book_close(Bookie3),

View file

@ -10,7 +10,8 @@
recovr_strategy/1,
aae_missingjournal/1,
aae_bustedjournal/1,
journal_compaction_bustedjournal/1
journal_compaction_bustedjournal/1,
close_duringcompaction/1
]).
all() -> [
@ -21,10 +22,29 @@ all() -> [
recovr_strategy,
aae_missingjournal,
aae_bustedjournal,
journal_compaction_bustedjournal
journal_compaction_bustedjournal,
close_duringcompaction
].
close_duringcompaction(_Config) ->
    % Prompt a compaction, and close immediately - confirm that the close
    % happens without error.
    % This should trigger the iclerk to receive a close during the file
    % scoring stage
    RootPath = testutil:reset_filestructure(),
    BookOpts = [{root_path, RootPath},
                {cache_size, 2000},
                {max_journalsize, 2000000},
                {sync_strategy, testutil:sync_strategy()}],
    % Load objects with rotating values to create journal compaction
    % work; Spcl1/LastV1 capture the expected index/value state
    {ok, Spcl1, LastV1} = rotating_object_check(BookOpts, "Bucket1", 6400),
    {ok, Book1} = leveled_bookie:book_start(BookOpts),
    % Kick off a compaction and close straight away, without waiting
    % for the compaction to complete
    ok = leveled_bookie:book_compactjournal(Book1, 30000),
    ok = leveled_bookie:book_close(Book1),
    % Restart to confirm the store is still consistent after closing
    % mid-compaction
    {ok, Book2} = leveled_bookie:book_start(BookOpts),
    ok = testutil:check_indexed_objects(Book2, "Bucket1", Spcl1, LastV1),
    ok = leveled_bookie:book_close(Book2).
recovery_with_samekeyupdates(_Config) ->
% Setup to prove https://github.com/martinsumner/leveled/issues/229
% run a test that involves many updates to the same key, and check that

View file

@ -724,6 +724,9 @@ basic_headonly_test(ObjectCount, RemoveCount, HeadOnly) ->
{ok, FinalFNs} = file:list_dir(JFP),
ok = leveled_bookie:book_trimjournal(Bookie1),
% Check that a second trim is still OK
[{add, SegmentID0, Bucket0, Key0, Hash0}|_Rest] = ObjectSpecL,
case HeadOnly of
with_lookup ->