0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 12:20:24 +00:00

Optionally reset the idle timeout when sending a chunk

* The default behavior (sending data is considered being idle) is
  maintained.
* There is a new protocol option for http and http2 which makes
  the idle timeout be reset when data is sent to the client.
This commit is contained in:
Robert J. Macomber 2021-02-08 16:05:05 -08:00
parent 879a6b8bc5
commit 37dd628ead
No known key found for this signature in database
GPG key ID: 8636363F39888735
5 changed files with 172 additions and 6 deletions

View file

@ -29,6 +29,7 @@
env => cowboy_middleware:env(),
http10_keepalive => boolean(),
idle_timeout => timeout(),
reset_idle_on_send => boolean(),
inactivity_timeout => timeout(),
initial_stream_flow_size => non_neg_integer(),
linger_timeout => timeout(),
@ -1083,7 +1084,7 @@ commands(State0=#state{socket=Socket, transport=Transport,
commands(State, StreamID, Tail);
%% Send a response body chunk.
%% @todo We need to kill the stream if it tries to send data before headers.
commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState},
commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState, opts=Opts},
StreamID, [{data, IsFin, Data}|Tail]) ->
%% Do not send anything when the user asks to send an empty
%% data frame, as that would break the protocol.
@ -1134,10 +1135,16 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out
end,
Stream0#stream{local_sent_size=SentSize}
end,
State = case IsFin of
State1 = case IsFin of
fin -> State0#state{out_state=done};
nofin -> State0
end,
State = case maps:get(reset_idle_on_send, Opts, false) of
true ->
set_timeout(State1, idle_timeout);
false ->
State1
end,
Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream),
commands(State#state{streams=Streams}, StreamID, Tail);
commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState},

View file

@ -56,6 +56,7 @@
middlewares => [module()],
preface_timeout => timeout(),
proxy_header => boolean(),
reset_idle_on_send => boolean(),
sendfile => boolean(),
settings_timeout => timeout(),
shutdown_timeout => timeout(),
@ -668,8 +669,14 @@ commands(State0, StreamID, [{headers, StatusCode, Headers}|Tail]) ->
State = send_headers(State0, StreamID, nofin, StatusCode, Headers),
commands(State, StreamID, Tail);
%% Send a response body chunk.
commands(State0, StreamID, [{data, IsFin, Data}|Tail]) ->
State = maybe_send_data(State0, StreamID, IsFin, Data, []),
commands(State0=#state{opts=Opts}, StreamID, [{data, IsFin, Data}|Tail]) ->
State1 = maybe_send_data(State0, StreamID, IsFin, Data, []),
State = case maps:get(reset_idle_on_send, Opts, false) of
true ->
set_idle_timeout(State1);
false ->
State1
end,
commands(State, StreamID, Tail);
%% Send trailers.
commands(State0, StreamID, [{trailers, Trailers}|Tail]) ->

View file

@ -0,0 +1,20 @@
-module(streamed_result_h).
-export([init/2]).
init(Req, Opts) ->
N = list_to_integer(binary_to_list(cowboy_req:binding(n, Req))),
Interval = list_to_integer(binary_to_list(cowboy_req:binding(interval, Req))),
chunked(N, Interval, Req, Opts).
chunked(N, Interval, Req0, Opts) ->
Req = cowboy_req:stream_reply(200, Req0),
{ok, loop(N, Interval, Req), Opts}.
loop(0, _Interval, Req) ->
ok = cowboy_req:stream_body("Finished!\n", fin, Req),
Req;
loop(N, Interval, Req) ->
ok = cowboy_req:stream_body(iolist_to_binary([integer_to_list(N), <<"\n">>]), nofin, Req),
timer:sleep(Interval),
loop(N-1, Interval, Req).

View file

@ -29,7 +29,8 @@ init_dispatch(_) ->
cowboy_router:compile([{"localhost", [
{"/", hello_h, []},
{"/echo/:key", echo_h, []},
{"/resp_iolist_body", resp_iolist_body_h, []}
{"/resp_iolist_body", resp_iolist_body_h, []},
{"/streamed_result/:n/:interval", streamed_result_h, []}
]}]).
%% Do a prior knowledge handshake (function originally copied from rfc7540_SUITE).
@ -416,3 +417,68 @@ graceful_shutdown_listener_timeout(Config) ->
%% Check that the slow request is aborted.
{error, {stream_error, closed}} = gun:await(ConnPid, Ref),
gun:close(ConnPid).
idle_timeout_on_send(Config) ->
doc("The idle timeout is not reset by default by sending."),
{ok, _} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], #{
env => #{dispatch => init_dispatch(Config)},
idle_timeout => 1000
}),
Port = ranch:get_port(?FUNCTION_NAME),
try
ConnPid = gun_open([{type, tcp}, {protocol, http2}, {port, Port}|Config]),
{ok, http2} = gun:await_up(ConnPid),
#{socket := Socket} = gun:info(ConnPid),
Pid = get_remote_pid_tcp(Socket),
StreamRef = gun:get(ConnPid, "/streamed_result/10/250"),
Ref = erlang:monitor(process, Pid),
receive
{gun_response, ConnPid, StreamRef, nofin, _Status, _Headers} ->
idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, false)
after 2000 ->
error(timeout)
end
after
cowboy:stop_listener(?FUNCTION_NAME)
end.
idle_timeout_reset_on_send(Config) ->
doc("The idle timeout can be made to reset by sending."),
{ok, _} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], #{
env => #{dispatch => init_dispatch(Config)},
idle_timeout => 1000,
reset_idle_on_send => true
}),
Port = ranch:get_port(?FUNCTION_NAME),
try
ConnPid = gun_open([{type, tcp}, {protocol, http2}, {port, Port}|Config]),
{ok, http2} = gun:await_up(ConnPid),
#{socket := Socket} = gun:info(ConnPid),
Pid = get_remote_pid_tcp(Socket),
StreamRef = gun:get(ConnPid, "/streamed_result/10/250"),
Ref = erlang:monitor(process, Pid),
receive
{gun_response, ConnPid, StreamRef, nofin, _Status, _Headers} ->
idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, true)
after 2000 ->
error(timeout)
end
after
cowboy:stop_listener(?FUNCTION_NAME)
end.
idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, ExpectCompletion) ->
receive
{gun_data, ConnPid, StreamRef, nofin, _Data} ->
idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, ExpectCompletion);
{gun_data, ConnPid, StreamRef, fin, _Data} when ExpectCompletion ->
ok;
{gun_data, ConnPid, StreamRef, fin, _Data} ->
error(completed);
{'DOWN', Ref, process, Pid, _} when ExpectCompletion ->
error(exited);
{'DOWN', Ref, process, Pid, _} ->
ok
after 2000 ->
error(timeout)
end.

View file

@ -44,7 +44,8 @@ init_dispatch(_) ->
{"/", hello_h, []},
{"/echo/:key", echo_h, []},
{"/resp/:key[/:arg]", resp_h, []},
{"/set_options/:key", set_options_h, []}
{"/set_options/:key", set_options_h, []},
{"/streamed_result/:n/:interval", streamed_result_h, []}
]}]).
chunked_false(Config) ->
@ -251,6 +252,71 @@ idle_timeout_infinity(Config) ->
cowboy:stop_listener(?FUNCTION_NAME)
end.
idle_timeout_on_send(Config) ->
doc("The idle timeout is not reset by default by sending."),
{ok, _} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], #{
env => #{dispatch => init_dispatch(Config)},
idle_timeout => 1000
}),
Port = ranch:get_port(?FUNCTION_NAME),
try
ConnPid = gun_open([{type, tcp}, {protocol, http}, {port, Port}|Config]),
{ok, http} = gun:await_up(ConnPid),
#{socket := Socket} = gun:info(ConnPid),
Pid = get_remote_pid_tcp(Socket),
StreamRef = gun:get(ConnPid, "/streamed_result/10/250"),
Ref = erlang:monitor(process, Pid),
receive
{gun_response, ConnPid, StreamRef, nofin, _Status, _Headers} ->
idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, false)
after 2000 ->
error(timeout)
end
after
cowboy:stop_listener(?FUNCTION_NAME)
end.
idle_timeout_reset_on_send(Config) ->
doc("The idle timeout can be made to reset by sending."),
{ok, _} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], #{
env => #{dispatch => init_dispatch(Config)},
idle_timeout => 1000,
reset_idle_on_send => true
}),
Port = ranch:get_port(?FUNCTION_NAME),
try
ConnPid = gun_open([{type, tcp}, {protocol, http}, {port, Port}|Config]),
{ok, http} = gun:await_up(ConnPid),
#{socket := Socket} = gun:info(ConnPid),
Pid = get_remote_pid_tcp(Socket),
StreamRef = gun:get(ConnPid, "/streamed_result/10/250"),
Ref = erlang:monitor(process, Pid),
receive
{gun_response, ConnPid, StreamRef, nofin, _Status, _Headers} ->
idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, true)
after 2000 ->
error(timeout)
end
after
cowboy:stop_listener(?FUNCTION_NAME)
end.
idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, ExpectCompletion) ->
receive
{gun_data, ConnPid, StreamRef, nofin, _Data} ->
idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, ExpectCompletion);
{gun_data, ConnPid, StreamRef, fin, _Data} when ExpectCompletion ->
ok;
{gun_data, ConnPid, StreamRef, fin, _Data} ->
error(completed);
{'DOWN', Ref, process, Pid, _} when ExpectCompletion ->
error(exited);
{'DOWN', Ref, process, Pid, _} ->
ok
after 2000 ->
error(timeout)
end.
persistent_term_router(Config) ->
doc("The router can retrieve the routes from persistent_term storage."),
case erlang:function_exported(persistent_term, get, 1) of