diff --git a/.eqc-info b/.eqc-info index 7dba85a..663432b 100644 Binary files a/.eqc-info and b/.eqc-info differ diff --git a/current_counterexample.eqc b/current_counterexample.eqc index c4c9f59..aafab95 100644 Binary files a/current_counterexample.eqc and b/current_counterexample.eqc differ diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index a0fd287..02b66e7 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -1006,7 +1006,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) -> -spec book_compactjournal(pid(), integer()) -> ok|busy. --spec book_eqccompactjournal(pid(), integer()) -> {ok, pid()}. +-spec book_eqccompactjournal(pid(), integer()) -> {ok|busy, pid()|undefined}. -spec book_islastcompactionpending(pid()) -> boolean(). -spec book_trimjournal(pid()) -> ok. @@ -1016,8 +1016,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) -> %% in Riak it will be triggered by a vnode callback. book_eqccompactjournal(Pid, Timeout) -> - {_R, P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity), - {ok, P}. + gen_server:call(Pid, {compact_journal, Timeout}, infinity). book_compactjournal(Pid, Timeout) -> {R, _P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity), @@ -1129,7 +1128,7 @@ init([Opts]) -> ConfiguredCacheSize = max(proplists:get_value(cache_size, Opts), ?MIN_CACHE_SIZE), CacheJitter = - ConfiguredCacheSize div (100 div ?CACHE_SIZE_JITTER), + max(1, ConfiguredCacheSize div (100 div ?CACHE_SIZE_JITTER)), CacheSize = ConfiguredCacheSize + erlang:phash2(self()) rem CacheJitter, PCLMaxSize = @@ -1378,10 +1377,17 @@ handle_call({return_runner, QueryType}, _From, State) -> fold_countdown = CountDown}}; handle_call({compact_journal, Timeout}, _From, State) when State#state.head_only == false -> - R = leveled_inker:ink_compactjournal(State#state.inker, - self(), - Timeout), - {reply, R, State}; + case leveled_inker:ink_compactionpending(State#state.inker) of + true -> + {reply, {busy, undefined}, State}; + false -> + {ok, PclSnap, null} = + snapshot_store(State, ledger, undefined, true), + R = leveled_inker:ink_compactjournal(State#state.inker, + PclSnap, + Timeout), + {reply, R, State} + end; handle_call(confirm_compact, _From, State) when State#state.head_only == false -> {reply, leveled_inker:ink_compactionpending(State#state.inker), State}; diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 546945d..1693ed3 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -823,15 +823,21 @@ finished_rolling(CDB) -> %% If delete is pending - thent he close behaviour needs to actuallly delete %% the file close_pendingdelete(Handle, Filename, WasteFP) -> - case WasteFP of - undefined -> - ok = file:close(Handle), - ok = file:delete(Filename); - WasteFP -> - file:close(Handle), - Components = filename:split(Filename), - NewName = WasteFP ++ lists:last(Components), - file:rename(Filename, NewName) + ok = file:close(Handle), + case filelib:is_file(Filename) of + true -> + case WasteFP of + undefined -> + ok = file:delete(Filename); + WasteFP -> + Components = filename:split(Filename), + NewName = WasteFP ++ lists:last(Components), + file:rename(Filename, NewName) + end; + false -> + % This may happen when there has been a destroy while files are + % still pending deletion + leveled_log:log("CDB21", [Filename]) end. -spec set_writeops(sync|riak_sync|none) -> {list(), sync|riak_sync|none}. diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 858a5e9..b27fbb7 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -276,7 +276,7 @@ ink_close(Pid) -> %% Test function used to close a file, and return all file paths (potentially %% to erase all persisted existence) ink_doom(Pid) -> - gen_server:call(Pid, doom, 60000). + gen_server:call(Pid, doom, infinity). -spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> fun(). %% @doc @@ -566,19 +566,14 @@ handle_call({compact, FilterFun}, _From, State) -> Clerk = State#state.clerk, - case State#state.compaction_pending of - true -> - {reply, {busy, Clerk}, State}; - false -> - Manifest = leveled_imanifest:to_list(State#state.manifest), - leveled_iclerk:clerk_compact(State#state.clerk, - Checker, - InitiateFun, - CloseFun, - FilterFun, - Manifest), - {reply, {ok, Clerk}, State#state{compaction_pending=true}} - end; + Manifest = leveled_imanifest:to_list(State#state.manifest), + leveled_iclerk:clerk_compact(State#state.clerk, + Checker, + InitiateFun, + CloseFun, + FilterFun, + Manifest), + {reply, {ok, Clerk}, State#state{compaction_pending=true}}; handle_call(compaction_pending, _From, State) -> {reply, State#state.compaction_pending, State}; handle_call({trim, PersistedSQN}, _From, State) -> @@ -1230,9 +1225,7 @@ filepath(CompactFilePath, NewSQN, compact_journal) -> ++ "." ++ ?PENDING_FILEX). -initiate_penciller_snapshot(Bookie) -> - {ok, LedgerSnap, _} = - leveled_bookie:book_snapshot(Bookie, ledger, undefined, true), +initiate_penciller_snapshot(LedgerSnap) -> MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap), {LedgerSnap, MaxSQN}. diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 954e3c0..8c0837a 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -392,8 +392,11 @@ ++ "with totals of cycle_count=~w " ++ "fetch_time=~w index_time=~w"}}, {"CDB20", - {warn, "Error ~w caught when safe reading a file to length ~w"}} - ]). + {warn, "Error ~w caught when safe reading a file to length ~w"}}, + {"CDB21", + {warn, "File ~s to be deleted but already gone"}} + + ]). %%%============================================================================ diff --git a/test/leveledjc_eqc.erl b/test/leveledjc_eqc.erl index 18c80c8..b188b2d 100644 --- a/test/leveledjc_eqc.erl +++ b/test/leveledjc_eqc.erl @@ -94,8 +94,8 @@ init_backend_adapt(S, [Tag, Options, Name]) -> %% @doc init_backend - The actual operation %% Start the database and read data from disk init_backend(_Tag, Options, Name) -> - % Options0 = proplists:delete(log_level, Options), - case leveled_bookie:book_start(Options) of + Options0 = proplists:delete(log_level, Options), + case leveled_bookie:book_start(Options0) of {ok, Bookie} -> unlink(Bookie), erlang:register(Name, Bookie), @@ -570,33 +570,9 @@ kill_next(S, Value, [Pid]) -> -%% --- Operation: compactisalive --- - -compactisalive_pre(S) -> - is_leveled_open(S) andalso maps:get(iclerk, S, undefined) /= undefined. - -compactisalive_args(#{iclerk := IClerk}) -> - [IClerk]. - -compactisalive_pre(#{iclerk := Pid}, [IClerk]) -> - Pid == IClerk. - -compactisalive(IClerk) -> - is_process_alive(IClerk). - -compactisalive_post(_S, [_IClerk], Res) -> - Res. - %% --- Operation: compacthappened --- -compacthappened_pre(S) -> - is_leveled_open(S) andalso maps:get(iclerk, S, undefined) /= undefined. - -%% Commenting out args disables the operation -%% compacthappened_args(#{dir := DataDir}) -> -%% [DataDir]. - compacthappened(DataDir) -> PostCompact = filename:join(DataDir, "journal/journal_files/post_compact"), case filelib:is_dir(PostCompact) of @@ -627,10 +603,6 @@ journalwritten(DataDir) -> [] end. -compacthappened_post(_S, [_DataDir], Res) -> - eq(Res, []). - - %% --- Operation: compact journal --- @@ -647,27 +619,27 @@ compact_adapt(#{leveled := Leveled}, [_Pid, TS]) -> [ Leveled, TS ]. compact(Pid, TS) -> - {ok, IClerk} = leveled_bookie:book_eqccompactjournal(Pid, TS), - IClerk. + {R, _IClerk} = leveled_bookie:book_eqccompactjournal(Pid, TS), + R. -compact_next(S, IClerk, [_Pid, _TS]) -> - case maps:get(iclerk, S, undefined) of - undefined -> - S#{iclerk => IClerk}; +compact_next(S, R, [_Pid, _TS]) -> + case {R, maps:get(previous_compact, S, undefined)} of + {ok, undefined} -> + S#{previous_compact => true}; _ -> S end. compact_post(S, [_Pid, _TS], Res) -> - case maps:get(iclerk, S, undefined) of - undefined -> - is_pid(Res); - IClerk -> - IClerk == Res + case Res of + ok -> + true; + busy -> + true == maps:get(previous_compact, S, undefined) end. compact_features(S, [_Pid, _TS], _Res) -> - case maps:get(iclerk, S, undefined) of + case maps:get(previous_compact, S, undefined) of undefined -> [{compact, fresh}]; _ -> @@ -1054,12 +1026,16 @@ prop_db() -> CompactionFiles = compacthappened(Dir), LedgerFiles = ledgerpersisted(Dir), JournalFiles = journalwritten(Dir), - io:format("File counts: Compacted ~w Journal ~w Ledger ~w~n", [length(CompactionFiles), length(LedgerFiles), length(JournalFiles)]), + % io:format("File counts: Compacted ~w Journal ~w Ledger ~w~n", [length(CompactionFiles), length(LedgerFiles), length(JournalFiles)]), + case whereis(maps:get(sut, initial_state())) of - undefined -> delete_level_data(Dir); + undefined -> + % io:format("Init state undefined - deleting~n"), + delete_level_data(Dir); Pid when is_pid(Pid) -> + % io:format("Init state defined - destroying~n"), leveled_bookie:book_destroy(Pid) end, @@ -1236,18 +1212,14 @@ in_head_only_mode(S) -> wait_for_procs(Known, Timeout) -> case erlang:processes() -- Known of [] -> []; - Running -> + _NonEmptyList -> if - Timeout > 100 -> - timer:sleep(100), - wait_for_procs(Known, Timeout - 100); - Timeout >= 0 -> - lists:map(fun(P) -> io:format("********* Sending timeout to ~w *********~n", [P]), gen_fsm:send_event(P, timeout) end, Running), + Timeout > 0 -> timer:sleep(100), wait_for_procs(Known, Timeout - 100); true -> - lists:foreach(fun(P) -> io:format("Process info : ~w~n", [process_info(P)]) end, Running), - Running + timer:sleep(10000), + erlang:processes() -- Known end end.