diff --git a/.eqc-info b/.eqc-info new file mode 100644 index 0000000..663432b Binary files /dev/null and b/.eqc-info differ diff --git a/.gitignore b/.gitignore index 8031a86..ae9a11c 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,7 @@ .DS_Store rebar.lock test/test_area/* +cover +cover_* +.eqc-info +leveled_data/* diff --git a/current_counterexample.eqc b/current_counterexample.eqc new file mode 100644 index 0000000..aafab95 Binary files /dev/null and b/current_counterexample.eqc differ diff --git a/include/leveled.hrl b/include/leveled.hrl index a945f0c..9357bd5 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -50,7 +50,8 @@ {press_method = native :: leveled_sst:press_method(), log_options = leveled_log:get_opts() - :: leveled_log:log_options()}). + :: leveled_log:log_options(), + max_sstslots = 256 :: pos_integer()}). -record(inker_options, {cdb_max_size :: integer() | undefined, diff --git a/rebar.config b/rebar.config index 352ec3f..9b92d91 100644 --- a/rebar.config +++ b/rebar.config @@ -11,8 +11,8 @@ {profiles, [{eqc, [{deps, [meck, fqc]}, - {erl_opts, [debug_info, {parse_transform, lager_transform}, {parse_transform, eqc_cover}]}, - {plugins, [rebar_eqc]}]} + {erl_opts, [debug_info, {parse_transform, eqc_cover}]}, + {extra_src_dirs, ["test"]}]} ]}. {deps, [ diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 73c0c33..11a5312 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -124,6 +124,7 @@ {snapshot_bookie, undefined}, {cache_size, ?CACHE_SIZE}, {max_journalsize, 1000000000}, + {max_sstslots, 256}, {sync_strategy, none}, {head_only, false}, {waste_retention_period, undefined}, @@ -220,6 +221,10 @@ {max_journalsize, pos_integer()} | % The maximum size of a journal file in bytes. The abolute % maximum must be 4GB due to 4 byte file pointers being used + {max_sstslots, pos_integer()} | + % The maximum number of slots in a SST file. 
All testing is done + % at a size of 256 (except for Quickcheck tests}, altering this + % value is not recommended {sync_strategy, sync_mode()} | % Should be sync if it is necessary to flush to disk after every % write, or none if not (allow the OS to schecdule). This has a @@ -1004,7 +1009,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) -> gen_server:call(Pid, {snapshot, SnapType, Query, LongRunning}, infinity). --spec book_compactjournal(pid(), integer()) -> ok. +-spec book_compactjournal(pid(), integer()) -> ok|busy. -spec book_islastcompactionpending(pid()) -> boolean(). -spec book_trimjournal(pid()) -> ok. @@ -1014,7 +1019,8 @@ book_snapshot(Pid, SnapType, Query, LongRunning) -> %% in Riak it will be triggered by a vnode callback. book_compactjournal(Pid, Timeout) -> - gen_server:call(Pid, {compact_journal, Timeout}, infinity). + {R, _P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity), + R. %% @doc Check on progress of the last compaction @@ -1122,7 +1128,7 @@ init([Opts]) -> ConfiguredCacheSize = max(proplists:get_value(cache_size, Opts), ?MIN_CACHE_SIZE), CacheJitter = - ConfiguredCacheSize div (100 div ?CACHE_SIZE_JITTER), + max(1, ConfiguredCacheSize div (100 div ?CACHE_SIZE_JITTER)), CacheSize = ConfiguredCacheSize + erlang:phash2(self()) rem CacheJitter, PCLMaxSize = @@ -1371,10 +1377,17 @@ handle_call({return_runner, QueryType}, _From, State) -> fold_countdown = CountDown}}; handle_call({compact_journal, Timeout}, _From, State) when State#state.head_only == false -> - ok = leveled_inker:ink_compactjournal(State#state.inker, - self(), - Timeout), - {reply, ok, State}; + case leveled_inker:ink_compactionpending(State#state.inker) of + true -> + {reply, {busy, undefined}, State}; + false -> + {ok, PclSnap, null} = + snapshot_store(State, ledger, undefined, true), + R = leveled_inker:ink_compactjournal(State#state.inker, + PclSnap, + Timeout), + {reply, R, State} + end; handle_call(confirm_compact, _From, State) when State#state.head_only 
== false -> {reply, leveled_inker:ink_compactionpending(State#state.inker), State}; @@ -1626,6 +1639,8 @@ set_options(Opts) -> % If using lz4 this is not recommended false end, + + MaxSSTSlots = proplists:get_value(max_sstslots, Opts), {#inker_options{root_path = JournalFP, reload_strategy = ReloadStrategy, @@ -1647,8 +1662,9 @@ set_options(Opts) -> snaptimeout_short = SnapTimeoutShort, snaptimeout_long = SnapTimeoutLong, sst_options = - #sst_options{press_method = CompressionMethod, - log_options=leveled_log:get_opts()}} + #sst_options{press_method=CompressionMethod, + log_options=leveled_log:get_opts(), + max_sstslots=MaxSSTSlots}} }. diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 546945d..8d8dcb0 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -820,18 +820,24 @@ finished_rolling(CDB) -> -spec close_pendingdelete(file:io_device(), list(), list()|undefined) -> ok. %% @doc -%% If delete is pending - thent he close behaviour needs to actuallly delete +%% If delete is pending - then the close behaviour needs to actuallly delete %% the file close_pendingdelete(Handle, Filename, WasteFP) -> - case WasteFP of - undefined -> - ok = file:close(Handle), - ok = file:delete(Filename); - WasteFP -> - file:close(Handle), - Components = filename:split(Filename), - NewName = WasteFP ++ lists:last(Components), - file:rename(Filename, NewName) + ok = file:close(Handle), + case filelib:is_file(Filename) of + true -> + case WasteFP of + undefined -> + ok = file:delete(Filename); + WasteFP -> + Components = filename:split(Filename), + NewName = WasteFP ++ lists:last(Components), + file:rename(Filename, NewName) + end; + false -> + % This may happen when there has been a destroy while files are + % still pending deletion + leveled_log:log("CDB21", [Filename]) end. -spec set_writeops(sync|riak_sync|none) -> {list(), sync|riak_sync|none}. @@ -2600,6 +2606,24 @@ badly_written_test() -> ok = cdb_close(P2), file:delete(F1). 
+pendingdelete_test() -> + F1 = "test/test_area/deletfile_test.pnd", + file:delete(F1), + {ok, P1} = cdb_open_writer(F1, #cdb_options{binary_mode=false}), + KVList = generate_sequentialkeys(1000, []), + ok = cdb_mput(P1, KVList), + ?assertMatch(probably, cdb_keycheck(P1, "Key1")), + ?assertMatch({"Key1", "Value1"}, cdb_get(P1, "Key1")), + ?assertMatch({"Key100", "Value100"}, cdb_get(P1, "Key100")), + {ok, F2} = cdb_complete(P1), + {ok, P2} = cdb_open_reader(F2, #cdb_options{binary_mode=false}), + ?assertMatch({"Key1", "Value1"}, cdb_get(P2, "Key1")), + ?assertMatch({"Key100", "Value100"}, cdb_get(P2, "Key100")), + file:delete(F2), + ok = cdb_deletepending(P2), + % No issues destroying even though the file has already been removed + ok = cdb_destroy(P2). + nonsense_coverage_test() -> {ok, Pid} = gen_fsm:start_link(?MODULE, [#cdb_options{}], []), diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 5f6b09a..076d73b 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -82,9 +82,10 @@ code_change/3]). -export([clerk_new/1, - clerk_compact/7, + clerk_compact/6, clerk_hashtablecalc/3, clerk_trim/3, + clerk_promptdeletions/3, clerk_stop/1, clerk_loglevel/2, clerk_addlogs/2, @@ -114,15 +115,24 @@ reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(), singlefile_compactionperc = ?SINGLEFILE_COMPACTION_TARGET :: float(), maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(), - compression_method = native :: lz4|native}). + compression_method = native :: lz4|native, + scored_files = [] :: list(candidate()), + scoring_state :: scoring_state()|undefined}). -record(candidate, {low_sqn :: integer() | undefined, filename :: string() | undefined, journal :: pid() | undefined, compaction_perc :: float() | undefined}). +-record(scoring_state, {filter_fun :: fun(), + filter_server :: pid(), + max_sqn :: non_neg_integer(), + close_fun :: fun(), + start_time :: erlang:timestamp()}). + -type iclerk_options() :: #iclerk_options{}. 
-type candidate() :: #candidate{}. +-type scoring_state() :: #scoring_state{}. -type score_parameters() :: {integer(), float(), float()}. % Score parameters are a tuple % - of maximum run length; how long a run of consecutive files can be for @@ -148,25 +158,30 @@ clerk_new(InkerClerkOpts) -> -spec clerk_compact(pid(), pid(), fun(), fun(), fun(), - pid(), integer()) -> ok. + list()) -> ok. %% @doc %% Trigger a compaction for this clerk if the threshold of data recovery has %% been met -clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Inker, TimeO) -> +clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Manifest) -> gen_server:cast(Pid, {compact, Checker, InitiateFun, CloseFun, FilterFun, - Inker, - TimeO}). + Manifest}). --spec clerk_trim(pid(), pid(), integer()) -> ok. +-spec clerk_trim(pid(), integer(), list()) -> ok. %% @doc %% Trim the Inker back to the persisted SQN -clerk_trim(Pid, Inker, PersistedSQN) -> - gen_server:cast(Pid, {trim, Inker, PersistedSQN}). +clerk_trim(Pid, PersistedSQN, ManifestAsList) -> + gen_server:cast(Pid, {trim, PersistedSQN, ManifestAsList}). + +-spec clerk_promptdeletions(pid(), pos_integer(), list()) -> ok. +%% @doc +%% +clerk_promptdeletions(Pid, ManifestSQN, DeletedFiles) -> + gen_server:cast(Pid, {prompt_deletions, ManifestSQN, DeletedFiles}). -spec clerk_hashtablecalc(ets:tid(), integer(), pid()) -> ok. %% @doc @@ -182,7 +197,7 @@ clerk_hashtablecalc(HashTree, StartPos, CDBpid) -> %% @doc %% Stop the clerk clerk_stop(Pid) -> - gen_server:cast(Pid, stop). + gen_server:call(Pid, stop, infinity). -spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok. %% @doc @@ -202,6 +217,17 @@ clerk_addlogs(Pid, ForcedLogs) -> clerk_removelogs(Pid, ForcedLogs) -> gen_server:cast(Pid, {remove_logs, ForcedLogs}). + +-spec clerk_scorefilelist(pid(), list(candidate())) -> ok. 
+%% @doc +%% Score the file at the head of the list and then send the tail of the list to +%% be scored +clerk_scorefilelist(Pid, []) -> + gen_server:cast(Pid, scoring_complete); +clerk_scorefilelist(Pid, CandidateList) -> + gen_server:cast(Pid, {score_filelist, CandidateList}). + + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ @@ -247,10 +273,20 @@ init([LogOpts, IClerkOpts]) -> compression_method = IClerkOpts#iclerk_options.compression_method}}. -handle_call(_Msg, _From, State) -> - {reply, not_supported, State}. +handle_call(stop, _From, State) -> + case State#state.scoring_state of + undefined -> + ok; + ScoringState -> + % Closed when scoring files, and so need to shutdown FilterServer + % to close down neatly + CloseFun = ScoringState#scoring_state.close_fun, + FilterServer = ScoringState#scoring_state.filter_server, + CloseFun(FilterServer) + end, + {stop, normal, ok, State}. 
-handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO}, +handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0}, State) -> % Empty the waste folder clear_waste(State), @@ -260,12 +296,43 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO}, % Need to fetch manifest at start rather than have it be passed in % Don't want to process a queued call waiting on an old manifest - [_Active|Manifest] = leveled_inker:ink_getmanifest(Inker), - MaxRunLength = State#state.max_run_length, + [_Active|Manifest] = Manifest0, {FilterServer, MaxSQN} = InitiateFun(Checker), + ok = clerk_scorefilelist(self(), Manifest), + ScoringState = + #scoring_state{filter_fun = FilterFun, + filter_server = FilterServer, + max_sqn = MaxSQN, + close_fun = CloseFun, + start_time = SW}, + {noreply, State#state{scored_files = [], scoring_state = ScoringState}}; +handle_cast({score_filelist, [Entry|Tail]}, State) -> + Candidates = State#state.scored_files, + {LowSQN, FN, JournalP, _LK} = Entry, + ScoringState = State#state.scoring_state, + CpctPerc = check_single_file(JournalP, + ScoringState#scoring_state.filter_fun, + ScoringState#scoring_state.filter_server, + ScoringState#scoring_state.max_sqn, + ?SAMPLE_SIZE, + ?BATCH_SIZE), + Candidate = + #candidate{low_sqn = LowSQN, + filename = FN, + journal = JournalP, + compaction_perc = CpctPerc}, + ok = clerk_scorefilelist(self(), Tail), + {noreply, State#state{scored_files = [Candidate|Candidates]}}; +handle_cast(scoring_complete, State) -> + MaxRunLength = State#state.max_run_length, CDBopts = State#state.cdb_options, - - Candidates = scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN), + Candidates = lists:reverse(State#state.scored_files), + ScoringState = State#state.scoring_state, + FilterFun = ScoringState#scoring_state.filter_fun, + FilterServer = ScoringState#scoring_state.filter_server, + MaxSQN = ScoringState#scoring_state.max_sqn, + CloseFun = 
ScoringState#scoring_state.close_fun, + SW = ScoringState#scoring_state.start_time, ScoreParams = {MaxRunLength, State#state.maxrunlength_compactionperc, @@ -291,24 +358,29 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO}, end, BestRun1), leveled_log:log("IC002", [length(FilesToDelete)]), - case is_process_alive(Inker) of - true -> - update_inker(Inker, - ManifestSlice, - FilesToDelete), - ok = CloseFun(FilterServer), - {noreply, State} - end; - false -> - ok = leveled_inker:ink_compactioncomplete(Inker), ok = CloseFun(FilterServer), - {noreply, State} + ok = leveled_inker:ink_clerkcomplete(State#state.inker, + ManifestSlice, + FilesToDelete), + {noreply, State#state{scoring_state = undefined}}; + false -> + ok = CloseFun(FilterServer), + ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []), + {noreply, State#state{scoring_state = undefined}} end; -handle_cast({trim, Inker, PersistedSQN}, State) -> - ManifestAsList = leveled_inker:ink_getmanifest(Inker), +handle_cast({trim, PersistedSQN, ManifestAsList}, State) -> FilesToDelete = leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList), - ok = update_inker(Inker, [], FilesToDelete), + leveled_log:log("IC007", []), + ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], FilesToDelete), + {noreply, State}; +handle_cast({prompt_deletions, ManifestSQN, FilesToDelete}, State) -> + lists:foreach(fun({_SQN, _FN, J2D, _LK}) -> + leveled_cdb:cdb_deletepending(J2D, + ManifestSQN, + State#state.inker) + end, + FilesToDelete), {noreply, State}; handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) -> {IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos), @@ -328,9 +400,7 @@ handle_cast({remove_logs, ForcedLogs}, State) -> ok = leveled_log:remove_forcedlogs(ForcedLogs), CDBopts = State#state.cdb_options, CDBopts0 = CDBopts#cdb_options{log_options = leveled_log:get_opts()}, - {noreply, State#state{cdb_options = CDBopts0}}; 
-handle_cast(stop, State) -> - {stop, normal, State}. + {noreply, State#state{cdb_options = CDBopts0}}. handle_info(_Info, State) -> {noreply, State}. @@ -477,28 +547,6 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) -> 100 * ActiveSize / (ActiveSize + ReplacedSize) end. -scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN) -> - scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN, []). - -scan_all_files([], _FilterFun, _FilterServer, _MaxSQN, CandidateList) -> - CandidateList; -scan_all_files([Entry|Tail], FilterFun, FilterServer, MaxSQN, CandidateList) -> - {LowSQN, FN, JournalP, _LK} = Entry, - CpctPerc = check_single_file(JournalP, - FilterFun, - FilterServer, - MaxSQN, - ?SAMPLE_SIZE, - ?BATCH_SIZE), - scan_all_files(Tail, - FilterFun, - FilterServer, - MaxSQN, - CandidateList ++ - [#candidate{low_sqn = LowSQN, - filename = FN, - journal = JournalP, - compaction_perc = CpctPerc}]). fetch_inbatches([], _BatchSize, _CDB, CheckedList) -> CheckedList; @@ -613,20 +661,6 @@ sort_run(RunOfFiles) -> Cand1#candidate.low_sqn =< Cand2#candidate.low_sqn end, lists:sort(CompareFun, RunOfFiles). -update_inker(Inker, ManifestSlice, FilesToDelete) -> - {ok, ManSQN} = leveled_inker:ink_updatemanifest(Inker, - ManifestSlice, - FilesToDelete), - ok = leveled_inker:ink_compactioncomplete(Inker), - leveled_log:log("IC007", []), - lists:foreach(fun({_SQN, _FN, J2D, _LK}) -> - leveled_cdb:cdb_deletepending(J2D, - ManSQN, - Inker) - end, - FilesToDelete), - ok. 
- compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy, PressMethod) -> BatchesOfPositions = get_all_positions(BestRun, []), @@ -761,8 +795,7 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) -> SQN, compact_journal), leveled_log:log("IC009", [FN]), - leveled_cdb:cdb_open_writer(FN, - CDBopts); + leveled_cdb:cdb_open_writer(FN, CDBopts); _ -> {ok, Journal0} end, @@ -985,9 +1018,10 @@ compact_single_file_recovr_test() -> LedgerFun1, CompactFP, CDB} = compact_single_file_setup(), - [{LowSQN, FN, PidR, _LastKey}] = + CDBOpts = #cdb_options{binary_mode=true}, + [{LowSQN, FN, _PidOldR, LastKey}] = compact_files([Candidate], - #cdb_options{file_path=CompactFP, binary_mode=true}, + CDBOpts#cdb_options{file_path=CompactFP}, LedgerFun1, LedgerSrv1, 9, @@ -995,6 +1029,7 @@ compact_single_file_recovr_test() -> native), io:format("FN of ~s~n", [FN]), ?assertMatch(2, LowSQN), + {ok, PidR} = leveled_cdb:cdb_reopen_reader(FN, LastKey, CDBOpts), ?assertMatch(probably, leveled_cdb:cdb_keycheck(PidR, {8, @@ -1014,6 +1049,7 @@ compact_single_file_recovr_test() -> test_ledgerkey("Key2")}), ?assertMatch({{_, _}, {"Value2", {[], infinity}}}, leveled_codec:from_inkerkv(RKV1)), + ok = leveled_cdb:cdb_close(PidR), ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). 
@@ -1024,9 +1060,10 @@ compact_single_file_retain_test() -> LedgerFun1, CompactFP, CDB} = compact_single_file_setup(), - [{LowSQN, FN, PidR, _LK}] = + CDBOpts = #cdb_options{binary_mode=true}, + [{LowSQN, FN, _PidOldR, LastKey}] = compact_files([Candidate], - #cdb_options{file_path=CompactFP, binary_mode=true}, + CDBOpts#cdb_options{file_path=CompactFP}, LedgerFun1, LedgerSrv1, 9, @@ -1034,6 +1071,7 @@ compact_single_file_retain_test() -> native), io:format("FN of ~s~n", [FN]), ?assertMatch(1, LowSQN), + {ok, PidR} = leveled_cdb:cdb_reopen_reader(FN, LastKey, CDBOpts), ?assertMatch(probably, leveled_cdb:cdb_keycheck(PidR, {8, @@ -1048,11 +1086,12 @@ compact_single_file_retain_test() -> stnd, test_ledgerkey("Key1")})), RKV1 = leveled_cdb:cdb_get(PidR, - {2, - stnd, - test_ledgerkey("Key2")}), + {2, + stnd, + test_ledgerkey("Key2")}), ?assertMatch({{_, _}, {"Value2", {[], infinity}}}, leveled_codec:from_inkerkv(RKV1)), + ok = leveled_cdb:cdb_close(PidR), ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). @@ -1147,7 +1186,6 @@ size_score_test() -> coverage_cheat_test() -> {noreply, _State0} = handle_info(timeout, #state{}), {ok, _State1} = code_change(null, #state{}, null), - {reply, not_supported, _State2} = handle_call(null, null, #state{}), terminate(error, #state{}). -endif. diff --git a/src/leveled_imanifest.erl b/src/leveled_imanifest.erl index fb72e84..1f431fe 100644 --- a/src/leveled_imanifest.erl +++ b/src/leveled_imanifest.erl @@ -52,8 +52,12 @@ generate_entry(Journal) -> case leveled_cdb:cdb_firstkey(PidR) of {StartSQN, _Type, _PK} -> LastKey = leveled_cdb:cdb_lastkey(PidR), + % close the file here. This will then be re-opened by the inker + % and so will be correctly linked to the inker not to the iclerk + ok = leveled_cdb:cdb_close(PidR), [{StartSQN, NewFN, PidR, LastKey}]; empty -> + ok = leveled_cdb:cdb_close(PidR), leveled_log:log("IC013", [NewFN]), [] end. 
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 2faadf6..4044ff7 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -105,11 +105,10 @@ ink_registersnapshot/2, ink_confirmdelete/2, ink_compactjournal/3, - ink_compactioncomplete/1, + ink_clerkcomplete/3, ink_compactionpending/1, ink_trim/2, ink_getmanifest/1, - ink_updatemanifest/3, ink_printmanifest/1, ink_close/1, ink_doom/1, @@ -142,7 +141,7 @@ journal_sqn = 0 :: integer(), active_journaldb :: pid() | undefined, pending_removals = [] :: list(), - registered_snapshots = [] :: list(), + registered_snapshots = [] :: list(registered_snapshot()), root_path :: string() | undefined, cdb_options :: #cdb_options{} | undefined, clerk :: pid() | undefined, @@ -157,7 +156,7 @@ -type inker_options() :: #inker_options{}. -type ink_state() :: #state{}. - +-type registered_snapshot() :: {pid(), os:timestamp(), integer()}. %%%============================================================================ %%% API @@ -277,7 +276,7 @@ ink_close(Pid) -> %% Test function used to close a file, and return all file paths (potentially %% to erase all persisted existence) ink_doom(Pid) -> - gen_server:call(Pid, doom, 60000). + gen_server:call(Pid, doom, infinity). -spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> fun(). %% @doc @@ -348,7 +347,7 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> as_ink}, infinity). --spec ink_compactjournal(pid(), pid(), integer()) -> ok. +-spec ink_compactjournal(pid(), pid(), integer()) -> {ok|busy, pid()}. %% @doc %% Trigger a compaction event. the compaction event will use a sqn check %% against the Ledger to see if a value can be compacted - if the penciller @@ -359,7 +358,7 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> %% that any value that was written more recently than the last flush to disk %% of the Ledger will not be considered for compaction (as this may be %% required to reload the Ledger on startup). 
-ink_compactjournal(Pid, Bookie, Timeout) -> +ink_compactjournal(Pid, Bookie, _Timeout) -> CheckerInitiateFun = fun initiate_penciller_snapshot/1, CheckerCloseFun = fun leveled_penciller:pcl_close/1, CheckerFilterFun = @@ -369,28 +368,26 @@ ink_compactjournal(Pid, Bookie, Timeout) -> Bookie, CheckerInitiateFun, CheckerCloseFun, - CheckerFilterFun, - Timeout}, + CheckerFilterFun}, infinity). %% Allows the Checker to be overriden in test, use something other than a %% penciller -ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, Timeout) -> +ink_compactjournal(Pid, Checker, InitiateFun, CloseFun, FilterFun, _Timeout) -> gen_server:call(Pid, {compact, Checker, InitiateFun, CloseFun, - FilterFun, - Timeout}, + FilterFun}, infinity). --spec ink_compactioncomplete(pid()) -> ok. +-spec ink_clerkcomplete(pid(), list(), list()) -> ok. %% @doc %% Used by a clerk to state that a compaction process is over, only change %% is to unlock the Inker for further compactions. -ink_compactioncomplete(Pid) -> - gen_server:call(Pid, compaction_complete, infinity). +ink_clerkcomplete(Pid, ManifestSnippet, FilesToDelete) -> + gen_server:cast(Pid, {clerk_complete, ManifestSnippet, FilesToDelete}). -spec ink_compactionpending(pid()) -> boolean(). %% @doc @@ -425,21 +422,6 @@ ink_backup(Pid, BackupPath) -> ink_getmanifest(Pid) -> gen_server:call(Pid, get_manifest, infinity). --spec ink_updatemanifest(pid(), list(), list()) -> {ok, integer()}. -%% @doc -%% Add a section of new entries into the manifest, and drop a bunch of deleted -%% files out of the manifest. Used to update the manifest after a compaction -%% job. -%% -%% Returns {ok, ManSQN} with the ManSQN being the sequence number of the -%% updated manifest -ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) -> - gen_server:call(Pid, - {update_manifest, - ManifestSnippet, - DeletedFiles}, - infinity). - -spec ink_printmanifest(pid()) -> ok. 
%% @doc %% Used in tests to print out the manifest @@ -574,27 +556,6 @@ handle_call({confirm_delete, ManSQN}, _From, State) -> State#state{registered_snapshots = RegisteredSnapshots0}}; handle_call(get_manifest, _From, State) -> {reply, leveled_imanifest:to_list(State#state.manifest), State}; -handle_call({update_manifest, - ManifestSnippet, - DeletedFiles}, _From, State) -> - DropFun = - fun(E, Acc) -> - leveled_imanifest:remove_entry(Acc, E) - end, - Man0 = lists:foldl(DropFun, State#state.manifest, DeletedFiles), - AddFun = - fun(E, Acc) -> - leveled_imanifest:add_entry(Acc, E, false) - end, - Man1 = lists:foldl(AddFun, Man0, ManifestSnippet), - NewManifestSQN = State#state.manifest_sqn + 1, - leveled_imanifest:printer(Man1), - leveled_imanifest:writer(Man1, NewManifestSQN, State#state.root_path), - {reply, - {ok, NewManifestSQN}, - State#state{manifest=Man1, - manifest_sqn=NewManifestSQN, - pending_removals=DeletedFiles}}; handle_call(print_manifest, _From, State) -> leveled_imanifest:printer(State#state.manifest), {reply, ok, State}; @@ -602,23 +563,22 @@ handle_call({compact, Checker, InitiateFun, CloseFun, - FilterFun, - Timeout}, + FilterFun}, _From, State) -> + Clerk = State#state.clerk, + Manifest = leveled_imanifest:to_list(State#state.manifest), leveled_iclerk:clerk_compact(State#state.clerk, Checker, InitiateFun, CloseFun, FilterFun, - self(), - Timeout), - {reply, ok, State#state{compaction_pending=true}}; -handle_call(compaction_complete, _From, State) -> - {reply, ok, State#state{compaction_pending=false}}; + Manifest), + {reply, {ok, Clerk}, State#state{compaction_pending=true}}; handle_call(compaction_pending, _From, State) -> {reply, State#state.compaction_pending, State}; handle_call({trim, PersistedSQN}, _From, State) -> - ok = leveled_iclerk:clerk_trim(State#state.clerk, self(), PersistedSQN), + Manifest = leveled_imanifest:to_list(State#state.manifest), + ok = leveled_iclerk:clerk_trim(State#state.clerk, PersistedSQN, Manifest), {reply, ok, 
State}; handle_call(roll, _From, State) -> case leveled_cdb:cdb_lastkey(State#state.active_journaldb) of @@ -712,7 +672,7 @@ handle_call(close, _From, State) -> leveled_log:log("I0005", [close]), leveled_log:log("I0006", [State#state.journal_sqn, State#state.manifest_sqn]), - leveled_iclerk:clerk_stop(State#state.clerk), + ok = leveled_iclerk:clerk_stop(State#state.clerk), shutdown_snapshots(State#state.registered_snapshots), shutdown_manifest(State#state.manifest) end, @@ -727,12 +687,39 @@ handle_call(doom, _From, State) -> leveled_log:log("I0005", [doom]), leveled_log:log("I0006", [State#state.journal_sqn, State#state.manifest_sqn]), - leveled_iclerk:clerk_stop(State#state.clerk), + ok = leveled_iclerk:clerk_stop(State#state.clerk), shutdown_snapshots(State#state.registered_snapshots), shutdown_manifest(State#state.manifest), - {stop, normal, {ok, FPs}, State}. + +handle_cast({clerk_complete, ManifestSnippet, FilesToDelete}, State) -> + CDBOpts = State#state.cdb_options, + DropFun = + fun(E, Acc) -> + leveled_imanifest:remove_entry(Acc, E) + end, + Man0 = lists:foldl(DropFun, State#state.manifest, FilesToDelete), + AddFun = + fun(ManEntry, Acc) -> + {LowSQN, FN, _, LK_RO} = ManEntry, + % At this stage the FN has a .cdb extension, which will be + % stripped during add_entry - so need to add the .cdb here + {ok, Pid} = leveled_cdb:cdb_reopen_reader(FN, LK_RO, CDBOpts), + UpdEntry = {LowSQN, FN, Pid, LK_RO}, + leveled_imanifest:add_entry(Acc, UpdEntry, false) + end, + Man1 = lists:foldl(AddFun, Man0, ManifestSnippet), + NewManifestSQN = State#state.manifest_sqn + 1, + leveled_imanifest:printer(Man1), + leveled_imanifest:writer(Man1, NewManifestSQN, State#state.root_path), + ok = leveled_iclerk:clerk_promptdeletions(State#state.clerk, + NewManifestSQN, + FilesToDelete), + {noreply, State#state{manifest=Man1, + manifest_sqn=NewManifestSQN, + pending_removals=FilesToDelete, + compaction_pending=false}}; handle_cast({release_snapshot, Snapshot}, State) -> Rs = 
lists:keydelete(Snapshot, 1, State#state.registered_snapshots), leveled_log:log("I0003", [Snapshot]), @@ -843,11 +830,12 @@ start_from_file(InkOpts) -> clerk = Clerk}}. --spec shutdown_snapshots(list(tuple())) -> ok. +-spec shutdown_snapshots(list(registered_snapshot())) -> ok. %% @doc %% Shutdown any snapshots before closing the store shutdown_snapshots(Snapshots) -> - lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end, Snapshots). + lists:foreach(fun({Snap, _TS, _SQN}) -> ok = ink_close(Snap) end, + Snapshots). -spec shutdown_manifest(leveled_imanifest:manifest()) -> ok. %% @doc @@ -1243,9 +1231,7 @@ filepath(CompactFilePath, NewSQN, compact_journal) -> ++ "." ++ ?PENDING_FILEX). -initiate_penciller_snapshot(Bookie) -> - {ok, LedgerSnap, _} = - leveled_bookie:book_snapshot(Bookie, ledger, undefined, true), +initiate_penciller_snapshot(LedgerSnap) -> MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap), {LedgerSnap, MaxSQN}. @@ -1445,22 +1431,26 @@ compact_journal_testto(WRP, ExpectedFiles) -> ActualManifest = ink_getmanifest(Ink1), ok = ink_printmanifest(Ink1), ?assertMatch(3, length(ActualManifest)), - ok = ink_compactjournal(Ink1, - Checker, - fun(X) -> {X, 55} end, - fun(_F) -> ok end, - fun(L, K, SQN) -> lists:member({SQN, K}, L) end, - 5000), + {ok, _ICL1} = ink_compactjournal(Ink1, + Checker, + fun(X) -> {X, 55} end, + fun(_F) -> ok end, + fun(L, K, SQN) -> + lists:member({SQN, K}, L) + end, + 5000), timer:sleep(1000), CompactedManifest1 = ink_getmanifest(Ink1), ?assertMatch(2, length(CompactedManifest1)), Checker2 = lists:sublist(Checker, 16), - ok = ink_compactjournal(Ink1, - Checker2, - fun(X) -> {X, 55} end, - fun(_F) -> ok end, - fun(L, K, SQN) -> lists:member({SQN, K}, L) end, - 5000), + {ok, _ICL2} = ink_compactjournal(Ink1, + Checker2, + fun(X) -> {X, 55} end, + fun(_F) -> ok end, + fun(L, K, SQN) -> + lists:member({SQN, K}, L) + end, + 5000), timer:sleep(1000), CompactedManifest2 = ink_getmanifest(Ink1), {ok, PrefixTest} = 
re:compile(?COMPACT_FP), @@ -1489,12 +1479,12 @@ empty_manifest_test() -> CheckFun = fun(L, K, SQN) -> lists:member({SQN, key_converter(K)}, L) end, ?assertMatch(false, CheckFun([], "key", 1)), - ok = ink_compactjournal(Ink1, - [], - fun(X) -> {X, 55} end, - fun(_F) -> ok end, - CheckFun, - 5000), + {ok, _ICL1} = ink_compactjournal(Ink1, + [], + fun(X) -> {X, 55} end, + fun(_F) -> ok end, + CheckFun, + 5000), timer:sleep(1000), ?assertMatch(1, length(ink_getmanifest(Ink1))), ok = ink_close(Ink1), diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 954e3c0..8c0837a 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -392,8 +392,11 @@ ++ "with totals of cycle_count=~w " ++ "fetch_time=~w index_time=~w"}}, {"CDB20", - {warn, "Error ~w caught when safe reading a file to length ~w"}} - ]). + {warn, "Error ~w caught when safe reading a file to length ~w"}}, + {"CDB21", + {warn, "File ~s to be deleted but already gone"}} + + ]). %%%============================================================================ diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index d86f9c5..3e7b156 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -72,7 +72,6 @@ -include("include/leveled.hrl"). --define(MAX_SLOTS, 256). -define(LOOK_SLOTSIZE, 128). % Maximum of 128 -define(LOOK_BLOCKSIZE, {24, 32}). % 4x + y = ?LOOK_SLOTSIZE -define(NOLOOK_SLOTSIZE, 256). 
@@ -258,7 +257,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) -> PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method), OptsSST0 = OptsSST#sst_options{press_method = PressMethod0}, {[], [], SlotList, FK} = - merge_lists(KVList, PressMethod0, IndexModDate), + merge_lists(KVList, OptsSST0, IndexModDate), case gen_fsm:sync_send_event(Pid, {sst_new, RootPath, @@ -309,7 +308,7 @@ sst_new(RootPath, Filename, OptsSST0 = OptsSST#sst_options{press_method = PressMethod0}, {Rem1, Rem2, SlotList, FK} = merge_lists(KVL1, KVL2, {IsBasement, Level}, - PressMethod0, IndexModDate), + OptsSST0, IndexModDate), case SlotList of [] -> empty; @@ -499,7 +498,7 @@ starting({sst_newlevelzero, RootPath, Filename, SW1 = os:timestamp(), {[], [], SlotList, FirstKey} = - merge_lists(KVList, PressMethod, IdxModDate), + merge_lists(KVList, OptsSST, IdxModDate), Time1 = timer:now_diff(os:timestamp(), SW1), SW2 = os:timestamp(), @@ -2131,16 +2130,17 @@ revert_position(Pos) -> %% there are matching keys then the highest sequence number must be chosen and %% any lower sequence numbers should be compacted out of existence --spec merge_lists(list(), press_method(), boolean()) +-spec merge_lists(list(), sst_options(), boolean()) -> {list(), list(), list(tuple()), tuple()|null}. %% @doc %% %% Merge from asingle list (i.e. at Level 0) -merge_lists(KVList1, PressMethod, IdxModDate) -> +merge_lists(KVList1, SSTOpts, IdxModDate) -> SlotCount = length(KVList1) div ?LOOK_SLOTSIZE, {[], [], - split_lists(KVList1, [], SlotCount, PressMethod, IdxModDate), + split_lists(KVList1, [], + SlotCount, SSTOpts#sst_options.press_method, IdxModDate), element(1, lists:nth(1, KVList1))}. @@ -2157,33 +2157,34 @@ split_lists(KVList1, SlotLists, N, PressMethod, IdxModDate) -> split_lists(KVListRem, [SlotD|SlotLists], N - 1, PressMethod, IdxModDate). 
--spec merge_lists(list(), list(), tuple(), press_method(), boolean()) -> +-spec merge_lists(list(), list(), tuple(), sst_options(), boolean()) -> {list(), list(), list(tuple()), tuple()|null}. %% @doc %% Merge lists when merging across more thna one file. KVLists that are %% provided may include pointers to fetch more Keys/Values from the source %% file -merge_lists(KVList1, KVList2, LevelInfo, PressMethod, IndexModDate) -> +merge_lists(KVList1, KVList2, LevelInfo, SSTOpts, IndexModDate) -> merge_lists(KVList1, KVList2, LevelInfo, [], null, 0, - PressMethod, + SSTOpts#sst_options.max_sstslots, + SSTOpts#sst_options.press_method, IndexModDate, #build_timings{}). -merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, ?MAX_SLOTS, +merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, MaxSlots, MaxSlots, _PressMethod, _IdxModDate, T0) -> % This SST file is full, move to complete file, and return the % remainder log_buildtimings(T0, LI), {KVL1, KVL2, lists:reverse(SlotList), FirstKey}; -merge_lists([], [], LI, SlotList, FirstKey, _SlotCount, +merge_lists([], [], LI, SlotList, FirstKey, _SlotCount, _MaxSlots, _PressMethod, _IdxModDate, T0) -> % the source files are empty, complete the file log_buildtimings(T0, LI), {[], [], lists:reverse(SlotList), FirstKey}; -merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, +merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, MaxSlots, PressMethod, IdxModDate, T0) -> % Form a slot by merging the two lists until the next 128 K/V pairs have % been determined @@ -2200,6 +2201,7 @@ merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, SlotList, FK0, SlotCount, + MaxSlots, PressMethod, IdxModDate, T1); @@ -2214,6 +2216,7 @@ merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, [SlotD|SlotList], FK0, SlotCount + 1, + MaxSlots, PressMethod, IdxModDate, T2) @@ -2560,7 +2563,8 @@ merge_tombstonelist_test() -> R = merge_lists([SkippingKV1, SkippingKV3, SkippingKV5], [SkippingKV2, SkippingKV4], {true, 9999999}, - native, + 
#sst_options{press_method = native, + max_sstslots = 256}, ?INDEX_MODDATE), ?assertMatch({[], [], [], null}, R). diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 5aef2e8..a59da44 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -12,6 +12,7 @@ is_empty_test/1, many_put_fetch_switchcompression/1, bigjournal_littlejournal/1, + bigsst_littlesst/1, safereaderror_startup/1, remove_journal_test/1 ]). @@ -27,6 +28,7 @@ all() -> [ is_empty_test, many_put_fetch_switchcompression, bigjournal_littlejournal, + bigsst_littlesst, safereaderror_startup, remove_journal_test ]. @@ -164,6 +166,39 @@ bigjournal_littlejournal(_Config) -> ok = leveled_bookie:book_destroy(Bookie2). +bigsst_littlesst(_Config) -> + RootPath = testutil:reset_filestructure(), + StartOpts1 = [{root_path, RootPath}, + {max_journalsize, 50000000}, + {cache_size, 1000}, + {max_pencillercachesize, 16000}, + {max_sstslots, 256}, + {sync_strategy, testutil:sync_strategy()}, + {compression_point, on_compact}], + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + ObjL1 = + testutil:generate_objects(60000, 1, [], + leveled_rand:rand_bytes(100), + fun() -> [] end, <<"B">>), + testutil:riakload(Bookie1, ObjL1), + testutil:check_forlist(Bookie1, ObjL1), + JFP = RootPath ++ "/ledger/ledger_files/", + {ok, FNS1} = file:list_dir(JFP), + ok = leveled_bookie:book_destroy(Bookie1), + + + StartOpts2 = lists:ukeysort(1, [{max_sstslots, 24}|StartOpts1]), + {ok, Bookie2} = leveled_bookie:book_start(StartOpts2), + testutil:riakload(Bookie2, ObjL1), + testutil:check_forlist(Bookie2, ObjL1), + {ok, FNS2} = file:list_dir(JFP), + ok = leveled_bookie:book_destroy(Bookie2), + io:format("Big SST ~w files Little SST ~w files~n", + [length(FNS1), length(FNS2)]), + true = length(FNS2) > (2 * length(FNS1)). 
+ + + journal_compaction(_Config) -> journal_compaction_tester(false, 3600), journal_compaction_tester(false, undefined), @@ -300,6 +335,7 @@ journal_compaction_tester(Restart, WRP) -> {sync_strategy, testutil:sync_strategy()}], {ok, Bookie3} = leveled_bookie:book_start(StartOpts2), ok = leveled_bookie:book_compactjournal(Bookie3, 30000), + busy = leveled_bookie:book_compactjournal(Bookie3, 30000), testutil:wait_for_compaction(Bookie3), ok = leveled_bookie:book_close(Bookie3), diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index 304b1d7..fb4a9db 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -10,7 +10,8 @@ recovr_strategy/1, aae_missingjournal/1, aae_bustedjournal/1, - journal_compaction_bustedjournal/1 + journal_compaction_bustedjournal/1, + close_duringcompaction/1 ]). all() -> [ @@ -21,10 +22,29 @@ all() -> [ recovr_strategy, aae_missingjournal, aae_bustedjournal, - journal_compaction_bustedjournal + journal_compaction_bustedjournal, + close_duringcompaction ]. +close_duringcompaction(_Config) -> + % Prompt a compaction, and close immediately - confirm that the close + % happens without error. + % This should trigger the iclerk to receive a close during the file + % scoring stage + RootPath = testutil:reset_filestructure(), + BookOpts = [{root_path, RootPath}, + {cache_size, 2000}, + {max_journalsize, 2000000}, + {sync_strategy, testutil:sync_strategy()}], + {ok, Spcl1, LastV1} = rotating_object_check(BookOpts, "Bucket1", 6400), + {ok, Book1} = leveled_bookie:book_start(BookOpts), + ok = leveled_bookie:book_compactjournal(Book1, 30000), + ok = leveled_bookie:book_close(Book1), + {ok, Book2} = leveled_bookie:book_start(BookOpts), + ok = testutil:check_indexed_objects(Book2, "Bucket1", Spcl1, LastV1), + ok = leveled_bookie:book_close(Book2).
+ recovery_with_samekeyupdates(_Config) -> % Setup to prove https://github.com/martinsumner/leveled/issues/229 % run a test that involves many updates to the same key, and check that diff --git a/test/end_to_end/tictac_SUITE.erl b/test/end_to_end/tictac_SUITE.erl index e0f5f01..ba422b0 100644 --- a/test/end_to_end/tictac_SUITE.erl +++ b/test/end_to_end/tictac_SUITE.erl @@ -724,6 +724,9 @@ basic_headonly_test(ObjectCount, RemoveCount, HeadOnly) -> {ok, FinalFNs} = file:list_dir(JFP), + ok = leveled_bookie:book_trimjournal(Bookie1), + % Check a second trim is still OK + [{add, SegmentID0, Bucket0, Key0, Hash0}|_Rest] = ObjectSpecL, case HeadOnly of with_lookup ->