diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 90c5a3a0..0f558772 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -887,7 +887,14 @@ stream_body(Data, IsFin, Req=#{has_sent_resp := headers}) %% @todo Do we need a timeout? stream_body(Msg, Req=#{pid := Pid}) -> cast(Msg, Req), - receive {data_ack, Pid} -> ok end. + MRef = monitor(process, Pid), + receive + {data_ack, Pid} -> + demonitor(MRef, [flush]), + ok; + {'DOWN', MRef, _, _, Reason} -> + {terminated, stream_body, Pid, Msg, Reason} + end. -spec stream_events(cow_sse:event() | [cow_sse:event()], fin | nofin, req()) -> ok. stream_events(Event, IsFin, Req) when is_map(Event) -> diff --git a/test/handlers/resp_h.erl b/test/handlers/resp_h.erl index 8031d0ed..ea41ed2f 100644 --- a/test/handlers/resp_h.erl +++ b/test/handlers/resp_h.erl @@ -285,6 +285,11 @@ do(<<"stream_body">>, Req0, Opts) -> error(timeout) end, {ok, Req, Opts}; + <<"let_test_stream_body">> -> + Req = cowboy_req:stream_reply(200, Req0), + the_test_case ! {here_is_the_req, self(), Req}, + receive test_case_done -> ok end, + {ok, Req, Opts}; _ -> %% Call stream_body without initiating streaming. cowboy_req:stream_body(<<0:800000>>, fin, Req0), diff --git a/test/req_SUITE.erl b/test/req_SUITE.erl index 352b2a04..5105da3d 100644 --- a/test/req_SUITE.erl +++ b/test/req_SUITE.erl @@ -1022,6 +1022,40 @@ stream_body_concurrent(Config) -> {ok, _} = gun:await_body(ConnPid, Ref2, infinity), gun:close(ConnPid). +stream_body_close(Config) -> + doc("Confirm server does not hang in cowboy_req:stream_body/3 if client closes the connection."), + Path = "/resp/stream_body/let_test_stream_body", + ConnPid = gun_open(Config), + %% After sending the first part, we close the connection. After the request + %% process has died, we send the second part. If the handler gets stuck + %% sending on the closed connection, the test will fail. + register(the_test_case, self()), + Ref = gun:get(ConnPid, Path, []), + spawn(fun() -> gun:await(ConnPid, Ref, infinity) end), + receive + {here_is_the_req, HandlerPid, Req} -> + cowboy_req:stream_body(<<"Hello ">>, nofin, Req), + + #{pid := ReqPid} = Req, + ReqMRef = monitor(process, ReqPid), + gun:close(ConnPid), + receive + {'DOWN', ReqMRef, _, _, _} -> ok + end, + + {_, StreamMRef} = spawn_monitor(fun () -> + exit(cowboy_req:stream_body(<<"world">>, nofin, Req)) + end), + receive + {'DOWN', StreamMRef, _, _, _} -> ok + after 5000 -> + ct:fail("Hanging in stream_body/3 on closed connection") + end, + + HandlerPid ! test_case_done + end, + ok. + %% @todo Crash when calling stream_body after the fin flag has been set. %% @todo Crash when calling stream_body after calling reply. %% @todo Crash when calling stream_body before calling stream_reply.