0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 12:20:24 +00:00

Don't hang forever if stream proc is down

If the stream process---the 'pid' in the cowboy request---has
terminated, don't hang forever waiting for an ack.

The stream process could presumably terminate if the remote end
closes the tcp connection.
This commit is contained in:
Simon Johansson 2020-11-03 13:32:41 +01:00
parent c47f1e5fb8
commit d535682fa7
3 changed files with 47 additions and 1 deletions

View file

@ -887,7 +887,14 @@ stream_body(Data, IsFin, Req=#{has_sent_resp := headers})
%% @todo Do we need a timeout? %% @todo Do we need a timeout?
stream_body(Msg, Req=#{pid := Pid}) -> stream_body(Msg, Req=#{pid := Pid}) ->
cast(Msg, Req), cast(Msg, Req),
receive {data_ack, Pid} -> ok end. MRef = monitor(process, Pid),
receive
{data_ack, Pid} ->
demonitor(MRef, [flush]),
ok;
{'DOWN', MRef, _, _, Reason} ->
{terminated, stream_body, Pid, Msg, Reason}
end.
-spec stream_events(cow_sse:event() | [cow_sse:event()], fin | nofin, req()) -> ok. -spec stream_events(cow_sse:event() | [cow_sse:event()], fin | nofin, req()) -> ok.
stream_events(Event, IsFin, Req) when is_map(Event) -> stream_events(Event, IsFin, Req) when is_map(Event) ->

View file

@ -285,6 +285,11 @@ do(<<"stream_body">>, Req0, Opts) ->
error(timeout) error(timeout)
end, end,
{ok, Req, Opts}; {ok, Req, Opts};
<<"let_test_stream_body">> ->
Req = cowboy_req:stream_reply(200, Req0),
the_test_case ! {here_is_the_req, self(), Req},
receive test_case_done -> ok end,
{ok, Req, Opts};
_ -> _ ->
%% Call stream_body without initiating streaming. %% Call stream_body without initiating streaming.
cowboy_req:stream_body(<<0:800000>>, fin, Req0), cowboy_req:stream_body(<<0:800000>>, fin, Req0),

View file

@ -1022,6 +1022,40 @@ stream_body_concurrent(Config) ->
{ok, _} = gun:await_body(ConnPid, Ref2, infinity), {ok, _} = gun:await_body(ConnPid, Ref2, infinity),
gun:close(ConnPid). gun:close(ConnPid).
stream_body_close(Config) ->
doc("Confirm server does not hang in cowboy_req:stream_body/3 if client closes the connection."),
Path = "/resp/stream_body/let_test_stream_body",
ConnPid = gun_open(Config),
%% After sending the first part, we close the connection. After the request
%% process has died, we send the second part. If the handler gets stuck
%% sending on the closed connection, the test will fail.
register(the_test_case, self()),
Ref = gun:get(ConnPid, Path, []),
spawn(fun() -> gun:await(ConnPid, Ref, infinity) end),
receive
{here_is_the_req, HandlerPid, Req} ->
cowboy_req:stream_body(<<"Hello ">>, nofin, Req),
#{pid := ReqPid} = Req,
ReqMRef = monitor(process, ReqPid),
gun:close(ConnPid),
receive
{'DOWN', ReqMRef, _, _, _} -> ok
end,
{_, StreamMRef} = spawn_monitor(fun () ->
exit(cowboy_req:stream_body(<<"world">>, nofin, Req))
end),
receive
{'DOWN', StreamMRef, _, _, _} -> ok
after 5000 ->
ct:fail("Hanging in stream_body/3 on closed connection")
end,
HandlerPid ! test_case_done
end,
ok.
%% @todo Crash when calling stream_body after the fin flag has been set. %% @todo Crash when calling stream_body after the fin flag has been set.
%% @todo Crash when calling stream_body after calling reply. %% @todo Crash when calling stream_body after calling reply.
%% @todo Crash when calling stream_body before calling stream_reply. %% @todo Crash when calling stream_body before calling stream_reply.