Check not pending before compacting

Also check for existence before deleting a CDB file
This commit is contained in:
Martin Sumner 2019-01-25 19:11:34 +00:00
parent 7801f16de9
commit f7022627e5
7 changed files with 68 additions and 88 deletions

BIN
.eqc-info

Binary file not shown.

Binary file not shown.

View file

@ -1006,7 +1006,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) ->
-spec book_compactjournal(pid(), integer()) -> ok|busy.
-spec book_eqccompactjournal(pid(), integer()) -> {ok, pid()}.
-spec book_eqccompactjournal(pid(), integer()) -> {ok|busy, pid()|undefined}.
-spec book_islastcompactionpending(pid()) -> boolean().
-spec book_trimjournal(pid()) -> ok.
@ -1016,8 +1016,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) ->
%% in Riak it will be triggered by a vnode callback.
book_eqccompactjournal(Pid, Timeout) ->
{_R, P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity),
{ok, P}.
gen_server:call(Pid, {compact_journal, Timeout}, infinity).
book_compactjournal(Pid, Timeout) ->
{R, _P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity),
@ -1129,7 +1128,7 @@ init([Opts]) ->
ConfiguredCacheSize =
max(proplists:get_value(cache_size, Opts), ?MIN_CACHE_SIZE),
CacheJitter =
ConfiguredCacheSize div (100 div ?CACHE_SIZE_JITTER),
max(1, ConfiguredCacheSize div (100 div ?CACHE_SIZE_JITTER)),
CacheSize =
ConfiguredCacheSize + erlang:phash2(self()) rem CacheJitter,
PCLMaxSize =
@ -1378,10 +1377,17 @@ handle_call({return_runner, QueryType}, _From, State) ->
fold_countdown = CountDown}};
handle_call({compact_journal, Timeout}, _From, State)
when State#state.head_only == false ->
R = leveled_inker:ink_compactjournal(State#state.inker,
self(),
Timeout),
{reply, R, State};
case leveled_inker:ink_compactionpending(State#state.inker) of
true ->
{reply, {busy, undefined}, State};
false ->
{ok, PclSnap, null} =
snapshot_store(State, ledger, undefined, true),
R = leveled_inker:ink_compactjournal(State#state.inker,
PclSnap,
Timeout),
{reply, R, State}
end;
handle_call(confirm_compact, _From, State)
when State#state.head_only == false ->
{reply, leveled_inker:ink_compactionpending(State#state.inker), State};

View file

@ -823,15 +823,21 @@ finished_rolling(CDB) ->
%% If delete is pending - thent he close behaviour needs to actuallly delete
%% the file
close_pendingdelete(Handle, Filename, WasteFP) ->
case WasteFP of
undefined ->
ok = file:close(Handle),
ok = file:delete(Filename);
WasteFP ->
file:close(Handle),
Components = filename:split(Filename),
NewName = WasteFP ++ lists:last(Components),
file:rename(Filename, NewName)
ok = file:close(Handle),
case filelib:is_file(Filename) of
true ->
case WasteFP of
undefined ->
ok = file:delete(Filename);
WasteFP ->
Components = filename:split(Filename),
NewName = WasteFP ++ lists:last(Components),
file:rename(Filename, NewName)
end;
false ->
% This may happen when there has been a destroy while files are
% still pending deletion
leveled_log:log("CDB21", [Filename])
end.
-spec set_writeops(sync|riak_sync|none) -> {list(), sync|riak_sync|none}.

View file

@ -276,7 +276,7 @@ ink_close(Pid) ->
%% Test function used to close a file, and return all file paths (potentially
%% to erase all persisted existence)
ink_doom(Pid) ->
gen_server:call(Pid, doom, 60000).
gen_server:call(Pid, doom, infinity).
-spec ink_fold(pid(), integer(), {fun(), fun(), fun()}, any()) -> fun().
%% @doc
@ -566,19 +566,14 @@ handle_call({compact,
FilterFun},
_From, State) ->
Clerk = State#state.clerk,
case State#state.compaction_pending of
true ->
{reply, {busy, Clerk}, State};
false ->
Manifest = leveled_imanifest:to_list(State#state.manifest),
leveled_iclerk:clerk_compact(State#state.clerk,
Checker,
InitiateFun,
CloseFun,
FilterFun,
Manifest),
{reply, {ok, Clerk}, State#state{compaction_pending=true}}
end;
Manifest = leveled_imanifest:to_list(State#state.manifest),
leveled_iclerk:clerk_compact(State#state.clerk,
Checker,
InitiateFun,
CloseFun,
FilterFun,
Manifest),
{reply, {ok, Clerk}, State#state{compaction_pending=true}};
handle_call(compaction_pending, _From, State) ->
{reply, State#state.compaction_pending, State};
handle_call({trim, PersistedSQN}, _From, State) ->
@ -1230,9 +1225,7 @@ filepath(CompactFilePath, NewSQN, compact_journal) ->
++ "." ++ ?PENDING_FILEX).
initiate_penciller_snapshot(Bookie) ->
{ok, LedgerSnap, _} =
leveled_bookie:book_snapshot(Bookie, ledger, undefined, true),
initiate_penciller_snapshot(LedgerSnap) ->
MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
{LedgerSnap, MaxSQN}.

View file

@ -392,8 +392,11 @@
++ "with totals of cycle_count=~w "
++ "fetch_time=~w index_time=~w"}},
{"CDB20",
{warn, "Error ~w caught when safe reading a file to length ~w"}}
]).
{warn, "Error ~w caught when safe reading a file to length ~w"}},
{"CDB21",
{warn, "File ~s to be deleted but already gone"}}
]).
%%%============================================================================

View file

@ -94,8 +94,8 @@ init_backend_adapt(S, [Tag, Options, Name]) ->
%% @doc init_backend - The actual operation
%% Start the database and read data from disk
init_backend(_Tag, Options, Name) ->
% Options0 = proplists:delete(log_level, Options),
case leveled_bookie:book_start(Options) of
Options0 = proplists:delete(log_level, Options),
case leveled_bookie:book_start(Options0) of
{ok, Bookie} ->
unlink(Bookie),
erlang:register(Name, Bookie),
@ -570,33 +570,9 @@ kill_next(S, Value, [Pid]) ->
%% --- Operation: compactisalive ---
compactisalive_pre(S) ->
is_leveled_open(S) andalso maps:get(iclerk, S, undefined) /= undefined.
compactisalive_args(#{iclerk := IClerk}) ->
[IClerk].
compactisalive_pre(#{iclerk := Pid}, [IClerk]) ->
Pid == IClerk.
compactisalive(IClerk) ->
is_process_alive(IClerk).
compactisalive_post(_S, [_IClerk], Res) ->
Res.
%% --- Operation: compacthappened ---
compacthappened_pre(S) ->
is_leveled_open(S) andalso maps:get(iclerk, S, undefined) /= undefined.
%% Commenting out args disables the operation
%% compacthappened_args(#{dir := DataDir}) ->
%% [DataDir].
compacthappened(DataDir) ->
PostCompact = filename:join(DataDir, "journal/journal_files/post_compact"),
case filelib:is_dir(PostCompact) of
@ -627,10 +603,6 @@ journalwritten(DataDir) ->
[]
end.
compacthappened_post(_S, [_DataDir], Res) ->
eq(Res, []).
%% --- Operation: compact journal ---
@ -647,27 +619,27 @@ compact_adapt(#{leveled := Leveled}, [_Pid, TS]) ->
[ Leveled, TS ].
compact(Pid, TS) ->
{ok, IClerk} = leveled_bookie:book_eqccompactjournal(Pid, TS),
IClerk.
{R, _IClerk} = leveled_bookie:book_eqccompactjournal(Pid, TS),
R.
compact_next(S, IClerk, [_Pid, _TS]) ->
case maps:get(iclerk, S, undefined) of
undefined ->
S#{iclerk => IClerk};
compact_next(S, R, [_Pid, _TS]) ->
case {R, maps:get(previous_compact, S, undefined)} of
{ok, undefined} ->
S#{previous_compact => true};
_ ->
S
end.
compact_post(S, [_Pid, _TS], Res) ->
case maps:get(iclerk, S, undefined) of
undefined ->
is_pid(Res);
IClerk ->
IClerk == Res
case Res of
ok ->
true;
busy ->
true == maps:get(previous_compact, S, undefined)
end.
compact_features(S, [_Pid, _TS], _Res) ->
case maps:get(iclerk, S, undefined) of
case maps:get(previous_compact, S, undefined) of
undefined ->
[{compact, fresh}];
_ ->
@ -1054,12 +1026,16 @@ prop_db() ->
CompactionFiles = compacthappened(Dir),
LedgerFiles = ledgerpersisted(Dir),
JournalFiles = journalwritten(Dir),
io:format("File counts: Compacted ~w Journal ~w Ledger ~w~n", [length(CompactionFiles), length(LedgerFiles), length(JournalFiles)]),
% io:format("File counts: Compacted ~w Journal ~w Ledger ~w~n", [length(CompactionFiles), length(LedgerFiles), length(JournalFiles)]),
case whereis(maps:get(sut, initial_state())) of
undefined -> delete_level_data(Dir);
undefined ->
% io:format("Init state undefined - deleting~n"),
delete_level_data(Dir);
Pid when is_pid(Pid) ->
% io:format("Init state defined - destroying~n"),
leveled_bookie:book_destroy(Pid)
end,
@ -1236,18 +1212,14 @@ in_head_only_mode(S) ->
wait_for_procs(Known, Timeout) ->
case erlang:processes() -- Known of
[] -> [];
Running ->
_NonEmptyList ->
if
Timeout > 100 ->
timer:sleep(100),
wait_for_procs(Known, Timeout - 100);
Timeout >= 0 ->
lists:map(fun(P) -> io:format("********* Sending timeout to ~w *********~n", [P]), gen_fsm:send_event(P, timeout) end, Running),
Timeout > 0 ->
timer:sleep(100),
wait_for_procs(Known, Timeout - 100);
true ->
lists:foreach(fun(P) -> io:format("Process info : ~w~n", [process_info(P)]) end, Running),
Running
timer:sleep(10000),
erlang:processes() -- Known
end
end.