Inker Close nastiness
Try to stop some of the potential deadlocking around Inker close and prove that snapshots at higher Manifest SQNs can be ignored
This commit is contained in:
parent
75d6af75c6
commit
630f802780
7 changed files with 57 additions and 28 deletions
|
@ -502,8 +502,8 @@ terminate(Reason, StateName, State) ->
|
||||||
{undefined, _, _} ->
|
{undefined, _, _} ->
|
||||||
ok;
|
ok;
|
||||||
{Handle, delete_pending, undefined} ->
|
{Handle, delete_pending, undefined} ->
|
||||||
file:close(Handle),
|
ok = file:close(Handle),
|
||||||
file:delete(Handle);
|
ok = file:delete(State#state.filename);
|
||||||
{Handle, delete_pending, WasteFP} ->
|
{Handle, delete_pending, WasteFP} ->
|
||||||
file:close(Handle),
|
file:close(Handle),
|
||||||
Components = filename:split(State#state.filename),
|
Components = filename:split(State#state.filename),
|
||||||
|
|
|
@ -203,10 +203,7 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
|
||||||
update_inker(Inker,
|
update_inker(Inker,
|
||||||
ManifestSlice,
|
ManifestSlice,
|
||||||
FilesToDelete),
|
FilesToDelete),
|
||||||
{noreply, State};
|
{noreply, State}
|
||||||
false ->
|
|
||||||
leveled_log:log("IC001", []),
|
|
||||||
{stop, normal, State}
|
|
||||||
end;
|
end;
|
||||||
Score ->
|
Score ->
|
||||||
leveled_log:log("IC003", [Score]),
|
leveled_log:log("IC003", [Score]),
|
||||||
|
@ -223,8 +220,10 @@ handle_cast(stop, State) ->
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(normal, _State) ->
|
||||||
ok.
|
ok;
|
||||||
|
terminate(Reason, _State) ->
|
||||||
|
leveled_log:log("IC001", Reason).
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
|
@ -163,10 +163,11 @@ ink_registersnapshot(Pid, Requestor) ->
|
||||||
gen_server:call(Pid, {register_snapshot, Requestor}, infinity).
|
gen_server:call(Pid, {register_snapshot, Requestor}, infinity).
|
||||||
|
|
||||||
ink_releasesnapshot(Pid, Snapshot) ->
|
ink_releasesnapshot(Pid, Snapshot) ->
|
||||||
gen_server:call(Pid, {release_snapshot, Snapshot}, infinity).
|
gen_server:cast(Pid, {release_snapshot, Snapshot}).
|
||||||
|
|
||||||
ink_confirmdelete(Pid, ManSQN) ->
|
ink_confirmdelete(Pid, ManSQN) ->
|
||||||
gen_server:call(Pid, {confirm_delete, ManSQN}, 1000).
|
io:format("Confirm delete request received~n"),
|
||||||
|
gen_server:call(Pid, {confirm_delete, ManSQN}).
|
||||||
|
|
||||||
ink_close(Pid) ->
|
ink_close(Pid) ->
|
||||||
gen_server:call(Pid, close, infinity).
|
gen_server:call(Pid, close, infinity).
|
||||||
|
@ -267,12 +268,8 @@ handle_call({register_snapshot, Requestor}, _From , State) ->
|
||||||
{reply, {State#state.manifest,
|
{reply, {State#state.manifest,
|
||||||
State#state.active_journaldb},
|
State#state.active_journaldb},
|
||||||
State#state{registered_snapshots=Rs}};
|
State#state{registered_snapshots=Rs}};
|
||||||
handle_call({release_snapshot, Snapshot}, _From , State) ->
|
|
||||||
Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots),
|
|
||||||
leveled_log:log("I0003", [Snapshot]),
|
|
||||||
leveled_log:log("I0004", [length(Rs)]),
|
|
||||||
{reply, ok, State#state{registered_snapshots=Rs}};
|
|
||||||
handle_call({confirm_delete, ManSQN}, _From, State) ->
|
handle_call({confirm_delete, ManSQN}, _From, State) ->
|
||||||
|
io:format("Confirm delete request to be processed~n"),
|
||||||
Reply = lists:foldl(fun({_R, SnapSQN}, Bool) ->
|
Reply = lists:foldl(fun({_R, SnapSQN}, Bool) ->
|
||||||
case SnapSQN >= ManSQN of
|
case SnapSQN >= ManSQN of
|
||||||
true ->
|
true ->
|
||||||
|
@ -282,6 +279,7 @@ handle_call({confirm_delete, ManSQN}, _From, State) ->
|
||||||
end end,
|
end end,
|
||||||
true,
|
true,
|
||||||
State#state.registered_snapshots),
|
State#state.registered_snapshots),
|
||||||
|
io:format("Confirm delete request complete with reply ~w~n", [Reply]),
|
||||||
{reply, Reply, State};
|
{reply, Reply, State};
|
||||||
handle_call(get_manifest, _From, State) ->
|
handle_call(get_manifest, _From, State) ->
|
||||||
{reply, State#state.manifest, State};
|
{reply, State#state.manifest, State};
|
||||||
|
@ -328,8 +326,11 @@ handle_call(compaction_pending, _From, State) ->
|
||||||
handle_call(close, _From, State) ->
|
handle_call(close, _From, State) ->
|
||||||
{stop, normal, ok, State}.
|
{stop, normal, ok, State}.
|
||||||
|
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast({release_snapshot, Snapshot}, State) ->
|
||||||
{noreply, State}.
|
Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots),
|
||||||
|
leveled_log:log("I0003", [Snapshot]),
|
||||||
|
leveled_log:log("I0004", [length(Rs)]),
|
||||||
|
{noreply, State#state{registered_snapshots=Rs}}.
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
|
@ -163,8 +163,7 @@
|
||||||
{info, "At SQN=~w journal has filename ~s"}},
|
{info, "At SQN=~w journal has filename ~s"}},
|
||||||
|
|
||||||
{"IC001",
|
{"IC001",
|
||||||
{info, "Inker no longer alive so Clerk to abandon work "
|
{info, "Closed for reason ~w so maybe leaving garbage"}},
|
||||||
++ "leaving garbage"}},
|
|
||||||
{"IC002",
|
{"IC002",
|
||||||
{info, "Clerk updating Inker as compaction complete of ~w files"}},
|
{info, "Clerk updating Inker as compaction complete of ~w files"}},
|
||||||
{"IC003",
|
{"IC003",
|
||||||
|
|
|
@ -1580,8 +1580,9 @@ commit_manifest_test() ->
|
||||||
start_time=os:timestamp()},
|
start_time=os:timestamp()},
|
||||||
Resp_WI = #penciller_work{next_sqn=1,
|
Resp_WI = #penciller_work{next_sqn=1,
|
||||||
src_level=0},
|
src_level=0},
|
||||||
State = #state{ongoing_work=[Sent_WI],
|
State = #state{ongoing_work = [Sent_WI],
|
||||||
root_path = "test"},
|
root_path = "test",
|
||||||
|
manifest_sqn = 0},
|
||||||
ManifestFP = "test" ++ "/" ++ ?MANIFEST_FP ++ "/",
|
ManifestFP = "test" ++ "/" ++ ?MANIFEST_FP ++ "/",
|
||||||
ok = filelib:ensure_dir(ManifestFP),
|
ok = filelib:ensure_dir(ManifestFP),
|
||||||
ok = file:write_file(ManifestFP ++ "nonzero_1.pnd",
|
ok = file:write_file(ManifestFP ++ "nonzero_1.pnd",
|
||||||
|
@ -1591,7 +1592,6 @@ commit_manifest_test() ->
|
||||||
Resp_WI0 = Resp_WI#penciller_work{new_manifest=L1_0,
|
Resp_WI0 = Resp_WI#penciller_work{new_manifest=L1_0,
|
||||||
unreferenced_files=[]},
|
unreferenced_files=[]},
|
||||||
{ok, State0} = commit_manifest_change(Resp_WI0, State),
|
{ok, State0} = commit_manifest_change(Resp_WI0, State),
|
||||||
?assertMatch(0, State#state.manifest_sqn),
|
|
||||||
?assertMatch(1, State0#state.manifest_sqn),
|
?assertMatch(1, State0#state.manifest_sqn),
|
||||||
?assertMatch([], get_item(0, State0#state.manifest, [])),
|
?assertMatch([], get_item(0, State0#state.manifest, [])),
|
||||||
|
|
||||||
|
|
|
@ -146,21 +146,51 @@ journal_compaction(_Config) ->
|
||||||
testutil:riakload(Bookie1, ObjList2),
|
testutil:riakload(Bookie1, ObjList2),
|
||||||
|
|
||||||
ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
|
ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
|
||||||
|
|
||||||
testutil:wait_for_compaction(Bookie1),
|
testutil:wait_for_compaction(Bookie1),
|
||||||
|
% Start snapshot - should not stop deletions
|
||||||
|
{ok,
|
||||||
|
{PclClone, _LdgCache},
|
||||||
|
InkClone} = leveled_bookie:book_snapshotstore(Bookie1,
|
||||||
|
self(),
|
||||||
|
300000),
|
||||||
|
% Wait 2 seconds for files to be deleted
|
||||||
|
WasteFP = RootPath ++ "/journal/journal_files/waste",
|
||||||
|
lists:foldl(fun(X, Found) ->
|
||||||
|
case Found of
|
||||||
|
true ->
|
||||||
|
Found;
|
||||||
|
false ->
|
||||||
|
{ok, Files} = file:list_dir(WasteFP),
|
||||||
|
if
|
||||||
|
length(Files) > 0 ->
|
||||||
|
io:format("Deleted files found~n"),
|
||||||
|
true;
|
||||||
|
length(Files) == 0 ->
|
||||||
|
timer:sleep(X),
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
false,
|
||||||
|
[2000,2000,2000,2000,2000,2000]),
|
||||||
|
{ok, ClearedJournals} = file:list_dir(WasteFP),
|
||||||
|
io:format("~w ClearedJournals found~n", [length(ClearedJournals)]),
|
||||||
|
true = length(ClearedJournals) > 0,
|
||||||
|
|
||||||
ChkList3 = lists:sublist(lists:sort(ObjList2), 500),
|
ChkList3 = lists:sublist(lists:sort(ObjList2), 500),
|
||||||
testutil:check_forlist(Bookie1, ChkList3),
|
testutil:check_forlist(Bookie1, ChkList3),
|
||||||
|
|
||||||
|
ok = leveled_penciller:pcl_close(PclClone),
|
||||||
|
ok = leveled_inker:ink_close(InkClone),
|
||||||
|
|
||||||
ok = leveled_bookie:book_close(Bookie1),
|
ok = leveled_bookie:book_close(Bookie1),
|
||||||
% Restart
|
% Restart
|
||||||
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
|
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
|
||||||
testutil:check_forobject(Bookie2, TestObject),
|
testutil:check_forobject(Bookie2, TestObject),
|
||||||
testutil:check_forlist(Bookie2, ChkList3),
|
testutil:check_forlist(Bookie2, ChkList3),
|
||||||
ok = leveled_bookie:book_close(Bookie2),
|
|
||||||
|
|
||||||
WasteFP = RootPath ++ "/journal/journal_files/waste",
|
ok = leveled_bookie:book_close(Bookie2),
|
||||||
{ok, ClearedJournals} = file:list_dir(WasteFP),
|
|
||||||
io:format("~w ClearedJournals found~n", [length(ClearedJournals)]),
|
|
||||||
true = length(ClearedJournals) > 0,
|
|
||||||
|
|
||||||
StartOpts2 = [{root_path, RootPath},
|
StartOpts2 = [{root_path, RootPath},
|
||||||
{max_journalsize, 10000000},
|
{max_journalsize, 10000000},
|
||||||
|
|
|
@ -95,7 +95,7 @@ wait_for_compaction(Bookie) ->
|
||||||
true ->
|
true ->
|
||||||
io:format("Loop ~w waiting for journal "
|
io:format("Loop ~w waiting for journal "
|
||||||
++ "compaction to complete~n", [X]),
|
++ "compaction to complete~n", [X]),
|
||||||
timer:sleep(20000),
|
timer:sleep(5000),
|
||||||
F(Bookie)
|
F(Bookie)
|
||||||
end end,
|
end end,
|
||||||
true,
|
true,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue