mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 12:20:24 +00:00
Graceful shutdown
Note: This commit makes cowboy depend on cowlib master. Graceful shutdown for HTTP/2: 1. A GOAWAY frame with the last stream id set to 2^31-1 is sent and a timer is started (goaway_initial_timeout, default 1000ms), to wait for any in-flight requests sent by the client, and the status is set to 'closing_initiated'. If the client responds with GOAWAY and closes the connection, we're done. 2. A second GOAWAY frame is sent with the actual last stream id and the status is set to 'closing'. If no streams exist, the connection terminates. Otherwise a second timer (goaway_complete_timeout, default 3000ms) is started, to wait for the streams to complete. New streams are not accepted when status is 'closing'. 3. If all streams haven't completed after the second timeout, the connection is forcefully terminated. Graceful shutdown for HTTP/1.x: 1. If a request is currently being handled, it is waited for and the response is sent back to the client with the header "Connection: close". Then, the connection is closed. 2. If the current request handler is not finished within the time configured in transport option 'shutdown' (default 5000ms), the connection process is killed by its supervisor (ranch). Implemented for HTTP/1.x and HTTP/2 in the following scenarios: * When receiving exit signal 'shutdown' from the supervisor (e.g. when cowboy:stop_listener/3 is called). * When a connection process is requested to terminate using sys:terminate/2,3.
This commit is contained in:
parent
fa9c8ad832
commit
6f0c5260af
9 changed files with 393 additions and 42 deletions
2
Makefile
2
Makefile
|
@ -15,7 +15,7 @@ CT_OPTS += -ct_hooks cowboy_ct_hook [] # -boot start_sasl
|
|||
LOCAL_DEPS = crypto
|
||||
|
||||
DEPS = cowlib ranch
|
||||
dep_cowlib = git https://github.com/ninenines/cowlib 2.9.1
|
||||
dep_cowlib = git https://github.com/Nordix/cowlib last-remote-streamid
|
||||
dep_ranch = git https://github.com/ninenines/ranch 1.7.1
|
||||
|
||||
DOC_DEPS = asciideck
|
||||
|
|
|
@ -22,6 +22,8 @@ opts() :: #{
|
|||
connection_window_margin_size => 0..16#7fffffff,
|
||||
connection_window_update_threshold => 0..16#7fffffff,
|
||||
enable_connect_protocol => boolean(),
|
||||
goaway_initial_timeout => timeout(),
|
||||
goaway_complete_timeout => timeout(),
|
||||
idle_timeout => timeout(),
|
||||
inactivity_timeout => timeout(),
|
||||
initial_connection_window_size => 65535..16#7fffffff,
|
||||
|
@ -92,6 +94,16 @@ Whether to enable the extended CONNECT method to allow
|
|||
protocols like Websocket to be used over an HTTP/2 stream.
|
||||
This option is experimental and disabled by default.
|
||||
|
||||
goaway_initial_timeout (1000)::
|
||||
|
||||
Time in ms to wait for any in-flight stream creations before stopping to accept
|
||||
new streams on an existing connection during a graceful shutdown.
|
||||
|
||||
goaway_complete_timeout (3000)::
|
||||
|
||||
Time in ms to wait for ongoing streams to complete before closing the connection
|
||||
during a graceful shutdown.
|
||||
|
||||
idle_timeout (60000)::
|
||||
|
||||
Time in ms with no data received before Cowboy closes the connection.
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
{deps, [
|
||||
{cowlib,".*",{git,"https://github.com/ninenines/cowlib","2.9.1"}},{ranch,".*",{git,"https://github.com/ninenines/ranch","1.7.1"}}
|
||||
{cowlib,".*",{git,"https://github.com/ninenines/cowlib","master"}},{ranch,".*",{git,"https://github.com/ninenines/ranch","1.7.1"}}
|
||||
]}.
|
||||
{erl_opts, [debug_info,warn_export_vars,warn_shadow_vars,warn_obsolete_guard,warn_missing_spec,warn_untyped_record]}.
|
||||
|
|
|
@ -245,6 +245,9 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
|
|||
{timeout, _, _} ->
|
||||
loop(State);
|
||||
%% System messages.
|
||||
{'EXIT', Parent, shutdown} ->
|
||||
Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'},
|
||||
loop(initiate_closing(State, Reason));
|
||||
{'EXIT', Parent, Reason} ->
|
||||
terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
|
||||
{system, From, Request} ->
|
||||
|
@ -1440,6 +1443,13 @@ early_error(StatusCode0, #state{socket=Socket, transport=Transport,
|
|||
end,
|
||||
ok.
|
||||
|
||||
initiate_closing(State=#state{streams=[]}, Reason) ->
|
||||
terminate(State, Reason);
|
||||
initiate_closing(State=#state{streams=[_Stream|Streams],
|
||||
out_streamid=OutStreamID}, Reason) ->
|
||||
terminate_all_streams(State, Streams, Reason),
|
||||
State#state{last_streamid = OutStreamID}.
|
||||
|
||||
-spec terminate(_, _) -> no_return().
|
||||
terminate(undefined, Reason) ->
|
||||
exit({shutdown, Reason});
|
||||
|
@ -1503,9 +1513,10 @@ terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
|
|||
system_continue(_, _, State) ->
|
||||
loop(State).
|
||||
|
||||
-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
|
||||
system_terminate(Reason, _, _, State) ->
|
||||
terminate(State, {stop, {exit, Reason}, 'sys:terminate/2,3 was called.'}).
|
||||
-spec system_terminate(any(), _, _, #state{}) -> no_return().
|
||||
system_terminate(Reason0, _, _, State) ->
|
||||
Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'},
|
||||
loop(initiate_closing(State, Reason)).
|
||||
|
||||
-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
|
||||
system_code_change(Misc, _, _, _) ->
|
||||
|
|
|
@ -31,6 +31,8 @@
|
|||
connection_window_update_threshold => 0..16#7fffffff,
|
||||
enable_connect_protocol => boolean(),
|
||||
env => cowboy_middleware:env(),
|
||||
goaway_initial_timeout => timeout(),
|
||||
goaway_complete_timeout => timeout(),
|
||||
idle_timeout => timeout(),
|
||||
inactivity_timeout => timeout(),
|
||||
initial_connection_window_size => 65535..16#7fffffff,
|
||||
|
@ -88,7 +90,7 @@
|
|||
proxy_header :: undefined | ranch_proxy_header:proxy_info(),
|
||||
opts = #{} :: opts(),
|
||||
|
||||
%% Timer for idle_timeout.
|
||||
%% Timer for idle_timeout; also used for goaway timers.
|
||||
timer = undefined :: undefined | reference(),
|
||||
|
||||
%% Remote address and port for the connection.
|
||||
|
@ -101,7 +103,7 @@
|
|||
cert :: undefined | binary(),
|
||||
|
||||
%% HTTP/2 state machine.
|
||||
http2_status :: sequence | settings | upgrade | connected | closing,
|
||||
http2_status :: sequence | settings | upgrade | connected | closing_initiated | closing,
|
||||
http2_machine :: cow_http2_machine:http2_machine(),
|
||||
|
||||
%% HTTP/2 frame rate flood protection.
|
||||
|
@ -160,7 +162,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
|
|||
binary() | undefined, binary()) -> ok.
|
||||
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) ->
|
||||
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts),
|
||||
State = set_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket,
|
||||
State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket,
|
||||
transport=Transport, proxy_header=ProxyHeader,
|
||||
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
||||
http2_status=sequence, http2_machine=HTTP2Machine})),
|
||||
|
@ -205,7 +207,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
|
|||
<<"connection">> => <<"Upgrade">>,
|
||||
<<"upgrade">> => <<"h2c">>
|
||||
}, ?MODULE, undefined}), %% @todo undefined or #{}?
|
||||
State = set_timeout(init_rate_limiting(State2#state{http2_status=sequence})),
|
||||
State = set_idle_timeout(init_rate_limiting(State2#state{http2_status=sequence})),
|
||||
Transport:send(Socket, Preface),
|
||||
setopts_active(State),
|
||||
case Buffer of
|
||||
|
@ -227,9 +229,13 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
|||
receive
|
||||
%% Socket messages.
|
||||
{OK, Socket, Data} when OK =:= element(1, Messages) ->
|
||||
parse(set_timeout(State), << Buffer/binary, Data/binary >>);
|
||||
parse(set_idle_timeout(State), << Buffer/binary, Data/binary >>);
|
||||
{Closed, Socket} when Closed =:= element(2, Messages) ->
|
||||
terminate(State, {socket_error, closed, 'The socket has been closed.'});
|
||||
Reason = case State#state.http2_status of
|
||||
closing -> {stop, closed, 'The client is going away.'};
|
||||
_ -> {socket_error, closed, 'The socket has been closed.'}
|
||||
end,
|
||||
terminate(State, Reason);
|
||||
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
|
||||
terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
|
||||
{Passive, Socket} when Passive =:= element(4, Messages);
|
||||
|
@ -238,8 +244,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
|||
setopts_active(State),
|
||||
loop(State, Buffer);
|
||||
%% System messages.
|
||||
{'EXIT', Parent, shutdown} ->
|
||||
Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'},
|
||||
loop(initiate_closing(State, Reason), Buffer);
|
||||
{'EXIT', Parent, Reason} ->
|
||||
%% @todo Graceful shutdown here as well?
|
||||
terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
|
||||
{system, From, Request} ->
|
||||
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
|
||||
|
@ -252,6 +260,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
|||
loop(State, Buffer);
|
||||
{timeout, TRef, {cow_http2_machine, Name}} ->
|
||||
loop(timeout(State, Name, TRef), Buffer);
|
||||
{timeout, TimerRef, {goaway_initial_timeout, Reason}} ->
|
||||
loop(closing(State, Reason), Buffer);
|
||||
{timeout, TimerRef, {goaway_complete_timeout, Reason}} ->
|
||||
terminate(State, {stop, stop_reason(Reason),
|
||||
'Graceful shutdown timed out.'});
|
||||
%% Messages pertaining to a stream.
|
||||
{{Pid, StreamID}, Msg} when Pid =:= self() ->
|
||||
loop(info(State, StreamID, Msg), Buffer);
|
||||
|
@ -269,14 +282,21 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
|||
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
|
||||
end.
|
||||
|
||||
set_timeout(State=#state{opts=Opts, timer=TimerRef0}) ->
|
||||
set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef})
|
||||
when Status =:= closing_initiated orelse Status =:= closing,
|
||||
TimerRef =/= undefined ->
|
||||
State;
|
||||
set_idle_timeout(State=#state{opts=Opts}) ->
|
||||
set_timeout(State, maps:get(idle_timeout, Opts, 60000), idle_timeout).
|
||||
|
||||
set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) ->
|
||||
ok = case TimerRef0 of
|
||||
undefined -> ok;
|
||||
_ -> erlang:cancel_timer(TimerRef0, [{async, true}, {info, false}])
|
||||
end,
|
||||
TimerRef = case maps:get(idle_timeout, Opts, 60000) of
|
||||
TimerRef = case Timeout of
|
||||
infinity -> undefined;
|
||||
Timeout -> erlang:start_timer(Timeout, self(), idle_timeout)
|
||||
Timeout -> erlang:start_timer(Timeout, self(), Message)
|
||||
end,
|
||||
State#state{timer=TimerRef}.
|
||||
|
||||
|
@ -567,18 +587,24 @@ timeout(State=#state{http2_machine=HTTP2Machine0}, Name, TRef) ->
|
|||
|
||||
%% Erlang messages.
|
||||
|
||||
down(State=#state{opts=Opts, children=Children0}, Pid, Msg) ->
|
||||
case cowboy_children:down(Children0, Pid) of
|
||||
down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
|
||||
State = case cowboy_children:down(Children0, Pid) of
|
||||
%% The stream was terminated already.
|
||||
{ok, undefined, Children} ->
|
||||
State#state{children=Children};
|
||||
State0#state{children=Children};
|
||||
%% The stream is still running.
|
||||
{ok, StreamID, Children} ->
|
||||
info(State#state{children=Children}, StreamID, Msg);
|
||||
info(State0#state{children=Children}, StreamID, Msg);
|
||||
%% The process was unknown.
|
||||
error ->
|
||||
cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
|
||||
[Msg, Pid], Opts),
|
||||
State0
|
||||
end,
|
||||
if
|
||||
State#state.http2_status =:= closing, State#state.streams =:= #{} ->
|
||||
terminate(State, {stop, normal, 'The connection is going away.'});
|
||||
true ->
|
||||
State
|
||||
end.
|
||||
|
||||
|
@ -909,19 +935,21 @@ stream_alarm(State, StreamID, Name, Value) ->
|
|||
%% We may have to cancel streams even if we receive multiple
|
||||
%% GOAWAY frames as the LastStreamID value may be lower than
|
||||
%% the one previously received.
|
||||
goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine,
|
||||
goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0,
|
||||
http2_status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _})
|
||||
when Status =:= connected; Status =:= closing ->
|
||||
when Status =:= connected; Status =:= closing_initiated; Status =:= closing ->
|
||||
Streams = goaway_streams(State0, maps:to_list(Streams0), LastStreamID,
|
||||
{stop, {goaway, Reason}, 'The connection is going away.'}, []),
|
||||
State = State0#state{streams=maps:from_list(Streams)},
|
||||
case Status of
|
||||
connected ->
|
||||
if
|
||||
Status =:= connected; Status =:= closing_initiated ->
|
||||
{OurLastStreamID, HTTP2Machine} =
|
||||
cow_http2_machine:set_last_streamid(HTTP2Machine0),
|
||||
Transport:send(Socket, cow_http2:goaway(
|
||||
cow_http2_machine:get_last_streamid(HTTP2Machine),
|
||||
no_error, <<>>)),
|
||||
State#state{http2_status=closing};
|
||||
_ ->
|
||||
OurLastStreamID, no_error, <<>>)),
|
||||
State#state{http2_status=closing,
|
||||
http2_machine=HTTP2Machine};
|
||||
true ->
|
||||
State
|
||||
end;
|
||||
%% We terminate the connection immediately if it hasn't fully been initialized.
|
||||
|
@ -938,21 +966,65 @@ goaway_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], LastStreamI
|
|||
goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) ->
|
||||
goaway_streams(State, Tail, LastStreamID, Reason, [Stream|Acc]).
|
||||
|
||||
%% A server that is attempting to gracefully shut down a connection SHOULD send
|
||||
%% an initial GOAWAY frame with the last stream identifier set to 2^31-1 and a
|
||||
%% NO_ERROR code. This signals to the client that a shutdown is imminent and
|
||||
%% that initiating further requests is prohibited. After allowing time for any
|
||||
%% in-flight stream creation (at least one round-trip time), the server can send
|
||||
%% another GOAWAY frame with an updated last stream identifier. This ensures
|
||||
%% that a connection can be cleanly shut down without losing requests.
|
||||
-spec initiate_closing(#state{}, _) -> #state{}.
|
||||
initiate_closing(State=#state{http2_status=connected, socket=Socket,
|
||||
transport=Transport, opts=Opts}, Reason) ->
|
||||
Transport:send(Socket, cow_http2:goaway(16#7fffffff, no_error, <<>>)),
|
||||
Timeout = maps:get(goaway_initial_timeout, Opts, 1000),
|
||||
Message = {goaway_initial_timeout, Reason},
|
||||
set_timeout(State#state{http2_status=closing_initiated}, Timeout, Message);
|
||||
initiate_closing(State=#state{http2_status=Status}, _Reason)
|
||||
when Status =:= closing_initiated; Status =:= closing ->
|
||||
%% This happens if sys:terminate/2,3 is called twice or if the supervisor
|
||||
%% tells us to shutdown after sys:terminate/2,3 is called or vice versa.
|
||||
State;
|
||||
initiate_closing(State, Reason) ->
|
||||
terminate(State, {stop, stop_reason(Reason), 'The connection is going away.'}).
|
||||
|
||||
%% Switch to 'closing' state and stop accepting new streams.
|
||||
-spec closing(#state{}, Reason :: term()) -> #state{}.
|
||||
closing(State=#state{streams=Streams}, Reason) when Streams =:= #{} ->
|
||||
terminate(State, Reason);
|
||||
closing(State=#state{http2_status=closing_initiated,
|
||||
http2_machine=HTTP2Machine0, socket=Socket, transport=Transport},
|
||||
Reason) ->
|
||||
%% Stop accepting new streams.
|
||||
{LastStreamID, HTTP2Machine} =
|
||||
cow_http2_machine:set_last_streamid(HTTP2Machine0),
|
||||
Transport:send(Socket, cow_http2:goaway(LastStreamID, no_error, <<>>)),
|
||||
closing(State#state{http2_status=closing, http2_machine=HTTP2Machine}, Reason);
|
||||
closing(State=#state{http2_status=closing, opts=Opts}, Reason) ->
|
||||
%% If client sent GOAWAY, we may already be in 'closing' but without the
|
||||
%% goaway complete timeout set.
|
||||
Timeout = maps:get(goaway_complete_timeout, Opts, 3000),
|
||||
Message = {goaway_complete_timeout, Reason},
|
||||
set_timeout(State, Timeout, Message).
|
||||
|
||||
stop_reason({stop, Reason, _}) -> Reason;
|
||||
stop_reason(Reason) -> Reason.
|
||||
|
||||
-spec terminate(#state{}, _) -> no_return().
|
||||
terminate(undefined, Reason) ->
|
||||
exit({shutdown, Reason});
|
||||
terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status,
|
||||
http2_machine=HTTP2Machine, streams=Streams, children=Children}, Reason)
|
||||
when Status =:= connected; Status =:= closing ->
|
||||
when Status =:= connected; Status =:= closing_initiated; Status =:= closing ->
|
||||
%% @todo We might want to optionally send the Reason value
|
||||
%% as debug data in the GOAWAY frame here. Perhaps more.
|
||||
case Status of
|
||||
connected ->
|
||||
if
|
||||
Status =:= connected; Status =:= closing_initiated ->
|
||||
Transport:send(Socket, cow_http2:goaway(
|
||||
cow_http2_machine:get_last_streamid(HTTP2Machine),
|
||||
terminate_reason(Reason), <<>>));
|
||||
%% We already sent the GOAWAY frame.
|
||||
closing ->
|
||||
Status =:= closing ->
|
||||
ok
|
||||
end,
|
||||
terminate_all_streams(State, maps:to_list(Streams), Reason),
|
||||
|
@ -1134,9 +1206,9 @@ system_continue(_, _, {State, Buffer}) ->
|
|||
loop(State, Buffer).
|
||||
|
||||
-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
|
||||
system_terminate(Reason, _, _, {State, _}) ->
|
||||
%% @todo Graceful shutdown here as well?
|
||||
terminate(State, {stop, {exit, Reason}, 'sys:terminate/2,3 was called.'}).
|
||||
system_terminate(Reason0, _, _, {State, Buffer}) ->
|
||||
Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'},
|
||||
loop(initiate_closing(State, Reason), Buffer).
|
||||
|
||||
-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
|
||||
system_code_change(Misc, _, _, _) ->
|
||||
|
|
|
@ -4,6 +4,14 @@
|
|||
|
||||
-export([init/2]).
|
||||
|
||||
init(Req, Delay) ->
|
||||
init(Req, Delay) when is_integer(Delay) ->
|
||||
init(Req, #{delay => Delay});
|
||||
init(Req, Opts=#{delay := Delay}) ->
|
||||
_ = case Opts of
|
||||
#{notify_received := Pid} ->
|
||||
Pid ! {request_received, maps:get(path, Req)};
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
timer:sleep(Delay),
|
||||
{ok, cowboy_req:reply(200, #{}, <<"Hello world!">>, Req), Delay}.
|
||||
|
|
|
@ -29,6 +29,8 @@ init_dispatch(_) ->
|
|||
cowboy_router:compile([{"localhost", [
|
||||
{"/", hello_h, []},
|
||||
{"/echo/:key", echo_h, []},
|
||||
{"/delay_hello", delay_hello_h, 500},
|
||||
{"/long_delay_hello", delay_hello_h, 5000},
|
||||
{"/resp_iolist_body", resp_iolist_body_h, []}
|
||||
]}]).
|
||||
|
||||
|
@ -284,3 +286,127 @@ settings_timeout_infinity(Config) ->
|
|||
after
|
||||
cowboy:stop_listener(?FUNCTION_NAME)
|
||||
end.
|
||||
|
||||
graceful_shutdown_connection(Config) ->
|
||||
doc("Check that ongoing requests are handled before gracefully shutting down a connection."),
|
||||
ProtoOpts = #{
|
||||
env => #{dispatch => init_dispatch(Config)}
|
||||
},
|
||||
{ok, _} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], ProtoOpts),
|
||||
Port = ranch:get_port(?FUNCTION_NAME),
|
||||
try
|
||||
ConnPid = gun_open([{type, tcp}, {protocol, http2}, {port, Port}|Config]),
|
||||
Ref = gun:get(ConnPid, "/delay_hello"),
|
||||
%% Make sure the request is received. The handler sleeps for 500ms.
|
||||
timer:sleep(100),
|
||||
%% Tell the connection to shutdown while the handler is working.
|
||||
[CowboyConnPid] = ranch:procs(?FUNCTION_NAME, connections),
|
||||
monitor(process, CowboyConnPid),
|
||||
ok = sys:terminate(CowboyConnPid, goaway),
|
||||
%% Check that the response is sent to the client before the
|
||||
%% connection goes down.
|
||||
{response, nofin, 200, _RespHeaders} = gun:await(ConnPid, Ref),
|
||||
{ok, RespBody} = gun:await_body(ConnPid, Ref),
|
||||
<<"Hello world!">> = iolist_to_binary(RespBody),
|
||||
%% Check that the connection is gone soon afterwards. (The exit
|
||||
%% reason is supposed to be 'goaway' as passed to
|
||||
%% sys:terminate/2, but it is {shutdown, closed}.)
|
||||
receive
|
||||
{'DOWN', _, process, CowboyConnPid, _Reason} ->
|
||||
ok
|
||||
end,
|
||||
[] = ranch:procs(?FUNCTION_NAME, connections),
|
||||
gun:close(ConnPid)
|
||||
after
|
||||
cowboy:stop_listener(?FUNCTION_NAME)
|
||||
end.
|
||||
|
||||
graceful_shutdown_timeout(Config) ->
|
||||
doc("Check that a connection is closed when gracefully shutting down times out."),
|
||||
ProtoOpts = #{
|
||||
env => #{dispatch => init_dispatch(Config)},
|
||||
goaway_initial_timeout => 200,
|
||||
goaway_complete_timeout => 500
|
||||
},
|
||||
{ok, _} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], ProtoOpts),
|
||||
Port = ranch:get_port(?FUNCTION_NAME),
|
||||
try
|
||||
ConnPid = gun_open([{type, tcp}, {protocol, http2}, {port, Port}|Config]),
|
||||
Ref = gun:get(ConnPid, "/long_delay_hello"),
|
||||
%% Make sure the request is received.
|
||||
timer:sleep(100),
|
||||
%% Tell the connection to shutdown while the handler is working.
|
||||
[CowboyConnPid] = ranch:procs(?FUNCTION_NAME, connections),
|
||||
monitor(process, CowboyConnPid),
|
||||
ok = sys:terminate(CowboyConnPid, goaway),
|
||||
%% Check that connection didn't wait for the slow handler.
|
||||
{error, {stream_error, closed}} = gun:await(ConnPid, Ref),
|
||||
%% Check that the connection is gone. (The exit reason is
|
||||
%% supposed to be 'goaway' as passed to sys:terminate/2, but it
|
||||
%% is {shutdown, {stop, {exit, goaway}, 'Graceful shutdown timed
|
||||
%% out.'}}.)
|
||||
receive
|
||||
{'DOWN', _, process, CowboyConnPid, _Reason} ->
|
||||
ok
|
||||
after 100 ->
|
||||
error(still_alive)
|
||||
end,
|
||||
[] = ranch:procs(?FUNCTION_NAME, connections),
|
||||
gun:close(ConnPid)
|
||||
after
|
||||
cowboy:stop_listener(?FUNCTION_NAME)
|
||||
end.
|
||||
|
||||
graceful_shutdown_listener(Config) ->
|
||||
doc("Check that connections are shut down gracefully when stopping a listener."),
|
||||
Dispatch = cowboy_router:compile([{"localhost", [
|
||||
{"/delay_hello", delay_hello_h,
|
||||
#{delay => 500, notify_received => self()}}
|
||||
]}]),
|
||||
ProtoOpts = #{
|
||||
env => #{dispatch => Dispatch}
|
||||
},
|
||||
{ok, Listener} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], ProtoOpts),
|
||||
Port = ranch:get_port(?FUNCTION_NAME),
|
||||
ConnPid = gun_open([{type, tcp}, {protocol, http2}, {port, Port}|Config]),
|
||||
Ref = gun:get(ConnPid, "/delay_hello"),
|
||||
%% Shutdown listener while the handlers are working.
|
||||
receive {request_received, <<"/delay_hello">>} -> ok end,
|
||||
ListenerMonitorRef = monitor(process, Listener),
|
||||
ok = cowboy:stop_listener(?FUNCTION_NAME),
|
||||
receive
|
||||
{'DOWN', ListenerMonitorRef, process, Listener, _Reason} ->
|
||||
ok
|
||||
end,
|
||||
%% Check that the request is handled before shutting down.
|
||||
{response, nofin, 200, _RespHeaders} = gun:await(ConnPid, Ref),
|
||||
{ok, RespBody} = gun:await_body(ConnPid, Ref),
|
||||
<<"Hello world!">> = iolist_to_binary(RespBody),
|
||||
gun:close(ConnPid).
|
||||
|
||||
graceful_shutdown_listener_timeout(Config) ->
|
||||
doc("Check that connections are shut down when gracefully stopping a listener times out."),
|
||||
Dispatch = cowboy_router:compile([{"localhost", [
|
||||
{"/long_delay_hello", delay_hello_h,
|
||||
#{delay => 10000, notify_received => self()}}
|
||||
]}]),
|
||||
ProtoOpts = #{
|
||||
env => #{dispatch => Dispatch},
|
||||
goaway_initial_timeout => 200,
|
||||
goaway_complete_timeout => 500
|
||||
},
|
||||
{ok, Listener} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], ProtoOpts),
|
||||
Port = ranch:get_port(?FUNCTION_NAME),
|
||||
ConnPid = gun_open([{type, tcp}, {protocol, http2}, {port, Port}|Config]),
|
||||
Ref = gun:get(ConnPid, "/long_delay_hello"),
|
||||
%% Shutdown listener while the handlers are working.
|
||||
receive {request_received, <<"/long_delay_hello">>} -> ok end,
|
||||
ListenerMonitorRef = monitor(process, Listener),
|
||||
ok = cowboy:stop_listener(?FUNCTION_NAME),
|
||||
receive
|
||||
{'DOWN', ListenerMonitorRef, process, Listener, _Reason} ->
|
||||
ok
|
||||
end,
|
||||
%% Check that the slow request is aborted.
|
||||
{error, {stream_error, closed}} = gun:await(ConnPid, Ref),
|
||||
gun:close(ConnPid).
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
-import(ct_helper, [doc/1]).
|
||||
-import(ct_helper, [get_remote_pid_tcp/1]).
|
||||
-import(cowboy_test, [gun_open/1]).
|
||||
-import(cowboy_test, [gun_down/1]).
|
||||
-import(cowboy_test, [raw_open/1]).
|
||||
-import(cowboy_test, [raw_send/2]).
|
||||
-import(cowboy_test, [raw_recv_head/1]).
|
||||
|
@ -41,6 +42,8 @@ end_per_group(Name, _) ->
|
|||
init_dispatch(_) ->
|
||||
cowboy_router:compile([{"localhost", [
|
||||
{"/", hello_h, []},
|
||||
{"/delay_hello", delay_hello_h, 500},
|
||||
{"/long_delay_hello", delay_hello_h, 10000},
|
||||
{"/echo/:key", echo_h, []},
|
||||
{"/resp/:key[/:arg]", resp_h, []},
|
||||
{"/set_options/:key", set_options_h, []}
|
||||
|
@ -443,3 +446,73 @@ switch_protocol_flush(Config) ->
|
|||
after
|
||||
cowboy:stop_listener(?FUNCTION_NAME)
|
||||
end.
|
||||
|
||||
graceful_shutdown_connection(Config) ->
|
||||
doc("Check that the current request is handled before gracefully "
|
||||
"shutting down a connection."),
|
||||
Dispatch = cowboy_router:compile([{"localhost", [
|
||||
{"/delay_hello", delay_hello_h,
|
||||
#{delay => 500, notify_received => self()}},
|
||||
{"/long_delay_hello", delay_hello_h,
|
||||
#{delay => 10000, notify_received => self()}}
|
||||
]}]),
|
||||
ProtoOpts = #{
|
||||
env => #{dispatch => Dispatch}
|
||||
},
|
||||
{ok, _} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], ProtoOpts),
|
||||
Port = ranch:get_port(?FUNCTION_NAME),
|
||||
try
|
||||
ConnPid = gun_open([{type, tcp}, {protocol, http}, {port, Port}|Config]),
|
||||
{ok, http} = gun:await_up(ConnPid),
|
||||
#{socket := Socket} = gun:info(ConnPid),
|
||||
CowboyConnPid = get_remote_pid_tcp(Socket),
|
||||
CowboyConnRef = erlang:monitor(process, CowboyConnPid),
|
||||
Ref1 = gun:get(ConnPid, "/delay_hello"),
|
||||
Ref2 = gun:get(ConnPid, "/delay_hello"),
|
||||
receive {request_received, <<"/delay_hello">>} -> ok end,
|
||||
receive {request_received, <<"/delay_hello">>} -> ok end,
|
||||
ok = sys:terminate(CowboyConnPid, system_is_going_down),
|
||||
{response, nofin, 200, RespHeaders} = gun:await(ConnPid, Ref1),
|
||||
<<"close">> = proplists:get_value(<<"connection">>, RespHeaders),
|
||||
{ok, RespBody} = gun:await_body(ConnPid, Ref1),
|
||||
<<"Hello world!">> = iolist_to_binary(RespBody),
|
||||
{error, {stream_error, _}} = gun:await(ConnPid, Ref2),
|
||||
ok = gun_down(ConnPid),
|
||||
receive
|
||||
{'DOWN', CowboyConnRef, process, CowboyConnPid, _Reason} ->
|
||||
ok
|
||||
end
|
||||
after
|
||||
cowboy:stop_listener(?FUNCTION_NAME)
|
||||
end.
|
||||
|
||||
graceful_shutdown_listener(Config) ->
|
||||
doc("Check that connections are shut down gracefully when stopping a listener."),
|
||||
Dispatch = cowboy_router:compile([{"localhost", [
|
||||
{"/delay_hello", delay_hello_h,
|
||||
#{delay => 500, notify_received => self()}},
|
||||
{"/long_delay_hello", delay_hello_h,
|
||||
#{delay => 10000, notify_received => self()}}
|
||||
]}]),
|
||||
ProtoOpts = #{
|
||||
env => #{dispatch => Dispatch}
|
||||
},
|
||||
{ok, _} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], ProtoOpts),
|
||||
Port = ranch:get_port(?FUNCTION_NAME),
|
||||
ConnPid1 = gun_open([{type, tcp}, {protocol, http}, {port, Port}|Config]),
|
||||
Ref1 = gun:get(ConnPid1, "/delay_hello"),
|
||||
ConnPid2 = gun_open([{type, tcp}, {protocol, http}, {port, Port}|Config]),
|
||||
Ref2 = gun:get(ConnPid2, "/long_delay_hello"),
|
||||
%% Shutdown listener while the handlers are working.
|
||||
receive {request_received, <<"/delay_hello">>} -> ok end,
|
||||
receive {request_received, <<"/long_delay_hello">>} -> ok end,
|
||||
ok = cowboy:stop_listener(?FUNCTION_NAME),
|
||||
%% Check that the 1st request is handled before shutting down.
|
||||
{response, nofin, 200, RespHeaders} = gun:await(ConnPid1, Ref1),
|
||||
<<"close">> = proplists:get_value(<<"connection">>, RespHeaders),
|
||||
{ok, RespBody} = gun:await_body(ConnPid1, Ref1),
|
||||
<<"Hello world!">> = iolist_to_binary(RespBody),
|
||||
gun:close(ConnPid1),
|
||||
%% Check that the 2nd (very slow) request is not handled.
|
||||
{error, {stream_error, closed}} = gun:await(ConnPid2, Ref2),
|
||||
gun:close(ConnPid2).
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
-import(ct_helper, [config/2]).
|
||||
-import(ct_helper, [doc/1]).
|
||||
-import(ct_helper, [get_remote_pid_tcp/1]).
|
||||
-import(cowboy_test, [gun_open/1]).
|
||||
-import(cowboy_test, [raw_open/1]).
|
||||
-import(cowboy_test, [raw_send/2]).
|
||||
|
@ -52,6 +53,7 @@ init_routes(_) -> [
|
|||
{"localhost", [
|
||||
{"/", hello_h, []},
|
||||
{"/echo/:key", echo_h, []},
|
||||
{"/delay_hello", delay_hello_h, 1200},
|
||||
{"/long_polling", long_polling_h, []},
|
||||
{"/loop_handler_abort", loop_handler_abort_h, []},
|
||||
{"/resp/:key[/:arg]", resp_h, []}
|
||||
|
@ -2955,11 +2957,6 @@ client_settings_disable_push(Config) ->
|
|||
%% (RFC7540 6.8) GOAWAY
|
||||
% @todo GOAWAY frames have a reserved bit in the payload that must be ignored.
|
||||
%
|
||||
%% @todo We should eventually implement the mechanism for gracefully
|
||||
%% shutting down of the connection. (Send the GOAWAY, finish processing
|
||||
%% the current set of streams, give up after a certain timeout.)
|
||||
%
|
||||
%% @todo If we graceful shutdown and receive a GOAWAY, we give up too.
|
||||
% A GOAWAY frame might not immediately precede closing of the
|
||||
% connection; a receiver of a GOAWAY that has no more use for the
|
||||
% connection SHOULD still send a GOAWAY frame before terminating the
|
||||
|
@ -2975,8 +2972,6 @@ client_settings_disable_push(Config) ->
|
|||
% GOAWAY frame with an updated last stream identifier. This ensures
|
||||
% that a connection can be cleanly shut down without losing requests.
|
||||
%
|
||||
%% @todo And of course even if we shutdown we need to be careful about
|
||||
%% the connection state.
|
||||
% After sending a GOAWAY frame, the sender can discard frames for
|
||||
% streams initiated by the receiver with identifiers higher than the
|
||||
% identified last stream. However, any frames that alter connection
|
||||
|
@ -2988,6 +2983,60 @@ client_settings_disable_push(Config) ->
|
|||
% cause flow control or header compression state to become
|
||||
% unsynchronized.
|
||||
%
|
||||
|
||||
graceful_shutdown_client_stays(Config) ->
|
||||
doc("Successful graceful shutdown where the client doesn't directly go away. (RFC7540 6.8)"),
|
||||
{ok, Socket} = do_handshake(Config),
|
||||
%% Server-side application logic decides to gracefully shutdown the
|
||||
%% connection (for whatever reason).
|
||||
ServerConnPid = get_remote_pid_tcp(Socket),
|
||||
ok = sys:terminate(ServerConnPid, whatever),
|
||||
%% Expect a GOAWAY with last stream id 0x7FFFFFF and NO_ERROR
|
||||
{ok, << _:24, 7:8, 0:8, 0:1, 0:31, %% Length, type, flags, R, stream id
|
||||
0:1, 16#7fffffff:31, 0:32 %% R, last stream id, error code
|
||||
>>} = gen_tcp:recv(Socket, 17, 500),
|
||||
%% The client doesn't respond. The server waits a bit before
|
||||
%% sending a 2nd GOAWAY frame with an actual last stream id = 0
|
||||
{ok, << _:24, 7:8, 0:8, 0:1, 0:31, %% Length, type, flags, R, stream id
|
||||
0:1, 0:31, 0:32 %% R, last stream id, error code
|
||||
>>} = gen_tcp:recv(Socket, 17, 1500),
|
||||
{error, closed} = gen_tcp:recv(Socket, 3, 1000),
|
||||
ok.
|
||||
|
||||
graceful_shutdown_race_condition(Config) ->
|
||||
doc("Graceful shutdown where the client sends requests while the server "
|
||||
"sends GOAWAY. (RFC7540 6.8)"),
|
||||
{ok, Socket} = do_handshake(Config),
|
||||
%% Server-side application logic decides to gracefully shutdown the
|
||||
%% connection (for whatever reason).
|
||||
ServerConnPid = get_remote_pid_tcp(Socket),
|
||||
ok = sys:terminate(ServerConnPid, whatever),
|
||||
%% Expect a GOAWAY with last stream id 0x7FFFFFF and NO_ERROR.
|
||||
{ok, << _:24, 7:8, 0:8, 0:1, 0:31, %% Length, type, flags, R, stream id
|
||||
0:1, 16#7fffffff:31, 0:32 %% R, last stream id, error code
|
||||
>>} = gen_tcp:recv(Socket, 17, 500),
|
||||
%% Here, we simulate an in-flight request, sent by the client before the
|
||||
%% goaway frame arrived to the client.
|
||||
{HeadersBlock, _} = cow_hpack:encode([
|
||||
{<<":method">>, <<"GET">>},
|
||||
{<<":scheme">>, <<"http">>},
|
||||
{<<":authority">>, <<"localhost">>}, %% @todo Correct port number.
|
||||
{<<":path">>, <<"/delay_hello">>}
|
||||
]),
|
||||
ok = gen_tcp:send(Socket, cow_http2:headers(1, fin, HeadersBlock)),
|
||||
%% The server sends the 2nd GOAWAY frame
|
||||
{ok, << _:24, 7:8, 0:8, 0:1, 0:31, %% Length, type, flags, R, stream id
|
||||
0:1, 1:31, 0:32 %% R, last stream id, error code
|
||||
>>} = gen_tcp:recv(Socket, 17, 2000),
|
||||
%% The client tries to send another request, ignoring the goaway.
|
||||
ok = gen_tcp:send(Socket, cow_http2:headers(3, fin, HeadersBlock)),
|
||||
%% The server responds to the first request (stream id 1) and closes.
|
||||
{ok, << RespHeadersPayloadLength:24, 1, 4, 0:1, 1:31 >>} = gen_tcp:recv(Socket, 9, 1000),
|
||||
{ok, _RespHeaders} = gen_tcp:recv(Socket, RespHeadersPayloadLength, 1000), % HEADERS
|
||||
{ok, << 12:24, 0, 1, 0:1, 1:31, "Hello world!" >>} = gen_tcp:recv(Socket, 21, 1000), % DATA
|
||||
{error, closed} = gen_tcp:recv(Socket, 3, 1000),
|
||||
ok.
|
||||
|
||||
% The GOAWAY frame applies to the connection, not a specific stream.
|
||||
% An endpoint MUST treat a GOAWAY frame with a stream identifier other
|
||||
% than 0x0 as a connection error (Section 5.4.1) of type
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue