diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index c9bceed8..3213dd57 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -29,6 +29,7 @@ env => cowboy_middleware:env(), http10_keepalive => boolean(), idle_timeout => timeout(), + reset_idle_on_send => boolean(), inactivity_timeout => timeout(), initial_stream_flow_size => non_neg_integer(), linger_timeout => timeout(), @@ -1083,7 +1084,7 @@ commands(State0=#state{socket=Socket, transport=Transport, commands(State, StreamID, Tail); %% Send a response body chunk. %% @todo We need to kill the stream if it tries to send data before headers. -commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState}, +commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState, opts=Opts}, StreamID, [{data, IsFin, Data}|Tail]) -> %% Do not send anything when the user asks to send an empty %% data frame, as that would break the protocol. @@ -1134,10 +1135,16 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out end, Stream0#stream{local_sent_size=SentSize} end, - State = case IsFin of + State1 = case IsFin of fin -> State0#state{out_state=done}; nofin -> State0 end, + State = case maps:get(reset_idle_on_send, Opts, false) of + true -> + set_timeout(State1, idle_timeout); + false -> + State1 + end, Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream), commands(State#state{streams=Streams}, StreamID, Tail); commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState}, diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 7440d910..b1fb354a 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -56,6 +56,7 @@ middlewares => [module()], preface_timeout => timeout(), proxy_header => boolean(), + reset_idle_on_send => boolean(), sendfile => boolean(), settings_timeout => timeout(), shutdown_timeout => timeout(), @@ -668,8 +669,14 @@ commands(State0, StreamID, [{headers, StatusCode, Headers}|Tail]) -> State = send_headers(State0, StreamID, nofin, StatusCode, Headers), commands(State, StreamID, Tail); %% Send a response body chunk. -commands(State0, StreamID, [{data, IsFin, Data}|Tail]) -> - State = maybe_send_data(State0, StreamID, IsFin, Data, []), +commands(State0=#state{opts=Opts}, StreamID, [{data, IsFin, Data}|Tail]) -> + State1 = maybe_send_data(State0, StreamID, IsFin, Data, []), + State = case maps:get(reset_idle_on_send, Opts, false) of + true -> + set_idle_timeout(State1); + false -> + State1 + end, commands(State, StreamID, Tail); %% Send trailers. commands(State0, StreamID, [{trailers, Trailers}|Tail]) -> diff --git a/test/handlers/streamed_result_h.erl b/test/handlers/streamed_result_h.erl new file mode 100644 index 00000000..ea6f492c --- /dev/null +++ b/test/handlers/streamed_result_h.erl @@ -0,0 +1,20 @@ +-module(streamed_result_h). + +-export([init/2]). + +init(Req, Opts) -> + N = list_to_integer(binary_to_list(cowboy_req:binding(n, Req))), + Interval = list_to_integer(binary_to_list(cowboy_req:binding(interval, Req))), + chunked(N, Interval, Req, Opts). + +chunked(N, Interval, Req0, Opts) -> + Req = cowboy_req:stream_reply(200, Req0), + {ok, loop(N, Interval, Req), Opts}. + +loop(0, _Interval, Req) -> + ok = cowboy_req:stream_body("Finished!\n", fin, Req), + Req; +loop(N, Interval, Req) -> + ok = cowboy_req:stream_body(iolist_to_binary([integer_to_list(N), <<"\n">>]), nofin, Req), + timer:sleep(Interval), + loop(N-1, Interval, Req). diff --git a/test/http2_SUITE.erl b/test/http2_SUITE.erl index fe6325d0..fdf629a4 100644 --- a/test/http2_SUITE.erl +++ b/test/http2_SUITE.erl @@ -29,7 +29,8 @@ init_dispatch(_) -> cowboy_router:compile([{"localhost", [ {"/", hello_h, []}, {"/echo/:key", echo_h, []}, - {"/resp_iolist_body", resp_iolist_body_h, []} + {"/resp_iolist_body", resp_iolist_body_h, []}, + {"/streamed_result/:n/:interval", streamed_result_h, []} ]}]). %% Do a prior knowledge handshake (function originally copied from rfc7540_SUITE). @@ -416,3 +417,68 @@ graceful_shutdown_listener_timeout(Config) -> %% Check that the slow request is aborted. {error, {stream_error, closed}} = gun:await(ConnPid, Ref), gun:close(ConnPid). + +idle_timeout_on_send(Config) -> + doc("The idle timeout is not reset by default by sending."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], #{ + env => #{dispatch => init_dispatch(Config)}, + idle_timeout => 1000 + }), + Port = ranch:get_port(?FUNCTION_NAME), + try + ConnPid = gun_open([{type, tcp}, {protocol, http2}, {port, Port}|Config]), + {ok, http2} = gun:await_up(ConnPid), + #{socket := Socket} = gun:info(ConnPid), + Pid = get_remote_pid_tcp(Socket), + StreamRef = gun:get(ConnPid, "/streamed_result/10/250"), + Ref = erlang:monitor(process, Pid), + receive + {gun_response, ConnPid, StreamRef, nofin, _Status, _Headers} -> + idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, false) + after 2000 -> + error(timeout) + end + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +idle_timeout_reset_on_send(Config) -> + doc("The idle timeout can be made to reset by sending."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], #{ + env => #{dispatch => init_dispatch(Config)}, + idle_timeout => 1000, + reset_idle_on_send => true + }), + Port = ranch:get_port(?FUNCTION_NAME), + try + ConnPid = gun_open([{type, tcp}, {protocol, http2}, {port, Port}|Config]), + {ok, http2} = gun:await_up(ConnPid), + #{socket := Socket} = gun:info(ConnPid), + Pid = get_remote_pid_tcp(Socket), + StreamRef = gun:get(ConnPid, "/streamed_result/10/250"), + Ref = erlang:monitor(process, Pid), + receive + {gun_response, ConnPid, StreamRef, nofin, _Status, _Headers} -> + idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, true) + after 2000 -> + error(timeout) + end + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, ExpectCompletion) -> + receive + {gun_data, ConnPid, StreamRef, nofin, _Data} -> + idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, ExpectCompletion); + {gun_data, ConnPid, StreamRef, fin, _Data} when ExpectCompletion -> + ok; + {gun_data, ConnPid, StreamRef, fin, _Data} -> + error(completed); + {'DOWN', Ref, process, Pid, _} when ExpectCompletion -> + error(exited); + {'DOWN', Ref, process, Pid, _} -> + ok + after 2000 -> + error(timeout) + end. diff --git a/test/http_SUITE.erl b/test/http_SUITE.erl index d0c92e45..d65e059b 100644 --- a/test/http_SUITE.erl +++ b/test/http_SUITE.erl @@ -44,7 +44,8 @@ init_dispatch(_) -> {"/", hello_h, []}, {"/echo/:key", echo_h, []}, {"/resp/:key[/:arg]", resp_h, []}, - {"/set_options/:key", set_options_h, []} + {"/set_options/:key", set_options_h, []}, + {"/streamed_result/:n/:interval", streamed_result_h, []} ]}]). chunked_false(Config) -> @@ -251,6 +252,71 @@ idle_timeout_infinity(Config) -> cowboy:stop_listener(?FUNCTION_NAME) end. +idle_timeout_on_send(Config) -> + doc("The idle timeout is not reset by default by sending."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], #{ + env => #{dispatch => init_dispatch(Config)}, + idle_timeout => 1000 + }), + Port = ranch:get_port(?FUNCTION_NAME), + try + ConnPid = gun_open([{type, tcp}, {protocol, http}, {port, Port}|Config]), + {ok, http} = gun:await_up(ConnPid), + #{socket := Socket} = gun:info(ConnPid), + Pid = get_remote_pid_tcp(Socket), + StreamRef = gun:get(ConnPid, "/streamed_result/10/250"), + Ref = erlang:monitor(process, Pid), + receive + {gun_response, ConnPid, StreamRef, nofin, _Status, _Headers} -> + idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, false) + after 2000 -> + error(timeout) + end + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +idle_timeout_reset_on_send(Config) -> + doc("The idle timeout can be made to reset by sending."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], #{ + env => #{dispatch => init_dispatch(Config)}, + idle_timeout => 1000, + reset_idle_on_send => true + }), + Port = ranch:get_port(?FUNCTION_NAME), + try + ConnPid = gun_open([{type, tcp}, {protocol, http}, {port, Port}|Config]), + {ok, http} = gun:await_up(ConnPid), + #{socket := Socket} = gun:info(ConnPid), + Pid = get_remote_pid_tcp(Socket), + StreamRef = gun:get(ConnPid, "/streamed_result/10/250"), + Ref = erlang:monitor(process, Pid), + receive + {gun_response, ConnPid, StreamRef, nofin, _Status, _Headers} -> + idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, true) + after 2000 -> + error(timeout) + end + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, ExpectCompletion) -> + receive + {gun_data, ConnPid, StreamRef, nofin, _Data} -> + idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, ExpectCompletion); + {gun_data, ConnPid, StreamRef, fin, _Data} when ExpectCompletion -> + ok; + {gun_data, ConnPid, StreamRef, fin, _Data} -> + error(completed); + {'DOWN', Ref, process, Pid, _} when ExpectCompletion -> + error(exited); + {'DOWN', Ref, process, Pid, _} -> + ok + after 2000 -> + error(timeout) + end. + persistent_term_router(Config) -> doc("The router can retrieve the routes from persistent_term storage."), case erlang:function_exported(persistent_term, get, 1) of