mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 12:20:24 +00:00

A new option reset_idle_timeout_on_send has been added. When set to 'true', the idle timeout is reset not only when data is received, but also when data is sent. This allows sending large responses without having to worry about timeouts triggering. The default is currently unchanged but might change in a future release. LH: Greatly reworked the implementation so that the timeout gets reset on almost all socket writes. This essentially completely supersets the original work. Tests are mostly the same although I refactored a bit to avoid test code duplication. This commit also changes HTTP/2 behavior a little when data is received: Cowboy will not attempt to update the window before running stream handler commands to avoid sending WINDOW_UPDATE frames twice. Now it has some small heuristic to ensure they can only be sent once at most.
1346 lines
54 KiB
Erlang
1346 lines
54 KiB
Erlang
%% Copyright (c) 2015-2017, Loïc Hoguin <essen@ninenines.eu>
|
|
%%
|
|
%% Permission to use, copy, modify, and/or distribute this software for any
|
|
%% purpose with or without fee is hereby granted, provided that the above
|
|
%% copyright notice and this permission notice appear in all copies.
|
|
%%
|
|
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
|
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
|
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
|
|
-module(cowboy_http2).
|
|
|
|
-export([init/6]).
|
|
-export([init/10]).
|
|
-export([init/12]).
|
|
|
|
-export([system_continue/3]).
|
|
-export([system_terminate/4]).
|
|
-export([system_code_change/4]).
|
|
|
|
-type opts() :: #{
|
|
active_n => pos_integer(),
|
|
compress_buffering => boolean(),
|
|
compress_threshold => non_neg_integer(),
|
|
connection_type => worker | supervisor,
|
|
connection_window_margin_size => 0..16#7fffffff,
|
|
connection_window_update_threshold => 0..16#7fffffff,
|
|
enable_connect_protocol => boolean(),
|
|
env => cowboy_middleware:env(),
|
|
goaway_initial_timeout => timeout(),
|
|
goaway_complete_timeout => timeout(),
|
|
idle_timeout => timeout(),
|
|
inactivity_timeout => timeout(),
|
|
initial_connection_window_size => 65535..16#7fffffff,
|
|
initial_stream_window_size => 0..16#7fffffff,
|
|
linger_timeout => timeout(),
|
|
logger => module(),
|
|
max_concurrent_streams => non_neg_integer() | infinity,
|
|
max_connection_buffer_size => non_neg_integer(),
|
|
max_connection_window_size => 0..16#7fffffff,
|
|
max_decode_table_size => non_neg_integer(),
|
|
max_encode_table_size => non_neg_integer(),
|
|
max_frame_size_received => 16384..16777215,
|
|
max_frame_size_sent => 16384..16777215 | infinity,
|
|
max_received_frame_rate => {pos_integer(), timeout()},
|
|
max_reset_stream_rate => {pos_integer(), timeout()},
|
|
max_cancel_stream_rate => {pos_integer(), timeout()},
|
|
max_stream_buffer_size => non_neg_integer(),
|
|
max_stream_window_size => 0..16#7fffffff,
|
|
metrics_callback => cowboy_metrics_h:metrics_callback(),
|
|
metrics_req_filter => fun((cowboy_req:req()) -> map()),
|
|
metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()),
|
|
middlewares => [module()],
|
|
preface_timeout => timeout(),
|
|
proxy_header => boolean(),
|
|
reset_idle_timeout_on_send => boolean(),
|
|
sendfile => boolean(),
|
|
settings_timeout => timeout(),
|
|
shutdown_timeout => timeout(),
|
|
stream_handlers => [module()],
|
|
stream_window_data_threshold => 0..16#7fffffff,
|
|
stream_window_margin_size => 0..16#7fffffff,
|
|
stream_window_update_threshold => 0..16#7fffffff,
|
|
tracer_callback => cowboy_tracer_h:tracer_callback(),
|
|
tracer_flags => [atom()],
|
|
tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
|
|
%% Open ended because configured stream handlers might add options.
|
|
_ => _
|
|
}.
|
|
-export_type([opts/0]).
|
|
|
|
-record(stream, {
|
|
%% Whether the stream is currently stopping.
|
|
status = running :: running | stopping,
|
|
|
|
%% Flow requested for this stream.
|
|
flow = 0 :: non_neg_integer(),
|
|
|
|
%% Stream state.
|
|
state :: {module, any()}
|
|
}).
|
|
|
|
-record(state, {
|
|
parent = undefined :: pid(),
|
|
ref :: ranch:ref(),
|
|
socket = undefined :: inet:socket(),
|
|
transport :: module(),
|
|
proxy_header :: undefined | ranch_proxy_header:proxy_info(),
|
|
opts = #{} :: opts(),
|
|
|
|
%% Timer for idle_timeout; also used for goaway timers.
|
|
timer = undefined :: undefined | reference(),
|
|
|
|
%% Remote address and port for the connection.
|
|
peer = undefined :: {inet:ip_address(), inet:port_number()},
|
|
|
|
%% Local address and port for the connection.
|
|
sock = undefined :: {inet:ip_address(), inet:port_number()},
|
|
|
|
%% Client certificate (TLS only).
|
|
cert :: undefined | binary(),
|
|
|
|
%% HTTP/2 state machine.
|
|
http2_status :: sequence | settings | upgrade | connected | closing_initiated | closing,
|
|
http2_machine :: cow_http2_machine:http2_machine(),
|
|
|
|
%% HTTP/2 frame rate flood protection.
|
|
frame_rate_num :: undefined | pos_integer(),
|
|
frame_rate_time :: undefined | integer(),
|
|
|
|
%% HTTP/2 reset stream flood protection.
|
|
reset_rate_num :: undefined | pos_integer(),
|
|
reset_rate_time :: undefined | integer(),
|
|
|
|
%% HTTP/2 rapid reset attack protection.
|
|
cancel_rate_num :: undefined | pos_integer(),
|
|
cancel_rate_time :: undefined | integer(),
|
|
|
|
%% Flow requested for all streams.
|
|
flow = 0 :: non_neg_integer(),
|
|
|
|
%% Currently active HTTP/2 streams. Streams may be initiated either
|
|
%% by the client or by the server through PUSH_PROMISE frames.
|
|
streams = #{} :: #{cow_http2:streamid() => #stream{}},
|
|
|
|
%% Streams can spawn zero or more children which are then managed
|
|
%% by this module if operating as a supervisor.
|
|
children = cowboy_children:init() :: cowboy_children:children()
|
|
}).
|
|
|
|
-spec init(pid(), ranch:ref(), inet:socket(), module(),
|
|
ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> ok.
|
|
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
|
|
{ok, Peer} = maybe_socket_error(undefined, Transport:peername(Socket),
|
|
'A socket error occurred when retrieving the peer name.'),
|
|
{ok, Sock} = maybe_socket_error(undefined, Transport:sockname(Socket),
|
|
'A socket error occurred when retrieving the sock name.'),
|
|
CertResult = case Transport:name() of
|
|
ssl ->
|
|
case ssl:peercert(Socket) of
|
|
{error, no_peercert} ->
|
|
{ok, undefined};
|
|
Cert0 ->
|
|
Cert0
|
|
end;
|
|
_ ->
|
|
{ok, undefined}
|
|
end,
|
|
{ok, Cert} = maybe_socket_error(undefined, CertResult,
|
|
'A socket error occurred when retrieving the client TLS certificate.'),
|
|
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, <<>>).
|
|
|
|
-spec init(pid(), ranch:ref(), inet:socket(), module(),
|
|
ranch_proxy_header:proxy_info() | undefined, cowboy:opts(),
|
|
{inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
|
|
binary() | undefined, binary()) -> ok.
|
|
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) ->
|
|
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts),
|
|
%% Send the preface before doing all the init in case we get a socket error.
|
|
ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)),
|
|
State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket,
|
|
transport=Transport, proxy_header=ProxyHeader,
|
|
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
|
http2_status=sequence, http2_machine=HTTP2Machine})),
|
|
safe_setopts_active(State),
|
|
case Buffer of
|
|
<<>> -> loop(State, Buffer);
|
|
_ -> parse(State, Buffer)
|
|
end.
|
|
|
|
init_rate_limiting(State0) ->
|
|
CurrentTime = erlang:monotonic_time(millisecond),
|
|
State1 = init_frame_rate_limiting(State0, CurrentTime),
|
|
State2 = init_reset_rate_limiting(State1, CurrentTime),
|
|
init_cancel_rate_limiting(State2, CurrentTime).
|
|
|
|
init_frame_rate_limiting(State=#state{opts=Opts}, CurrentTime) ->
|
|
{FrameRateNum, FrameRatePeriod} = maps:get(max_received_frame_rate, Opts, {10000, 10000}),
|
|
State#state{
|
|
frame_rate_num=FrameRateNum, frame_rate_time=add_period(CurrentTime, FrameRatePeriod)
|
|
}.
|
|
|
|
init_reset_rate_limiting(State=#state{opts=Opts}, CurrentTime) ->
|
|
{ResetRateNum, ResetRatePeriod} = maps:get(max_reset_stream_rate, Opts, {10, 10000}),
|
|
State#state{
|
|
reset_rate_num=ResetRateNum, reset_rate_time=add_period(CurrentTime, ResetRatePeriod)
|
|
}.
|
|
|
|
init_cancel_rate_limiting(State=#state{opts=Opts}, CurrentTime) ->
|
|
{CancelRateNum, CancelRatePeriod} = maps:get(max_cancel_stream_rate, Opts, {500, 10000}),
|
|
State#state{
|
|
cancel_rate_num=CancelRateNum, cancel_rate_time=add_period(CurrentTime, CancelRatePeriod)
|
|
}.
|
|
|
|
add_period(_, infinity) -> infinity;
|
|
add_period(Time, Period) -> Time + Period.
|
|
|
|
%% @todo Add an argument for the request body.
|
|
-spec init(pid(), ranch:ref(), inet:socket(), module(),
|
|
ranch_proxy_header:proxy_info() | undefined, cowboy:opts(),
|
|
{inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
|
|
binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok.
|
|
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer,
|
|
_Settings, Req=#{method := Method}) ->
|
|
{ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts),
|
|
{ok, StreamID, HTTP2Machine}
|
|
= cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0),
|
|
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
|
|
transport=Transport, proxy_header=ProxyHeader,
|
|
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
|
http2_status=upgrade, http2_machine=HTTP2Machine},
|
|
State1 = headers_frame(State0#state{
|
|
http2_machine=HTTP2Machine}, StreamID, Req),
|
|
%% We assume that the upgrade will be applied. A stream handler
|
|
%% must not prevent the normal operations of the server.
|
|
State2 = info(State1, 1, {switch_protocol, #{
|
|
<<"connection">> => <<"Upgrade">>,
|
|
<<"upgrade">> => <<"h2c">>
|
|
}, ?MODULE, undefined}), %% @todo undefined or #{}?
|
|
State = set_idle_timeout(init_rate_limiting(State2#state{http2_status=sequence})),
|
|
%% In the case of HTTP/1.1 Upgrade we cannot send the Preface
|
|
%% until we send the 101 response.
|
|
ok = maybe_socket_error(State, Transport:send(Socket, Preface)),
|
|
safe_setopts_active(State),
|
|
case Buffer of
|
|
<<>> -> loop(State, Buffer);
|
|
_ -> parse(State, Buffer)
|
|
end.
|
|
|
|
%% Because HTTP/2 has flow control and Cowboy has other rate limiting
|
|
%% mechanisms implemented, a very large active_n value should be fine,
|
|
%% as long as the stream handlers do their work in a timely manner.
|
|
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
|
|
N = maps:get(active_n, Opts, 100),
|
|
Transport:setopts(Socket, [{active, N}]).
|
|
|
|
safe_setopts_active(State) ->
|
|
ok = maybe_socket_error(State, setopts_active(State)).
|
|
|
|
loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
|
opts=Opts, timer=TimerRef, children=Children}, Buffer) ->
|
|
Messages = Transport:messages(),
|
|
InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
|
|
receive
|
|
%% Socket messages.
|
|
{OK, Socket, Data} when OK =:= element(1, Messages) ->
|
|
parse(set_idle_timeout(State), << Buffer/binary, Data/binary >>);
|
|
{Closed, Socket} when Closed =:= element(2, Messages) ->
|
|
Reason = case State#state.http2_status of
|
|
closing -> {stop, closed, 'The client is going away.'};
|
|
_ -> {socket_error, closed, 'The socket has been closed.'}
|
|
end,
|
|
terminate(State, Reason);
|
|
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
|
|
terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
|
|
{Passive, Socket} when Passive =:= element(4, Messages);
|
|
%% Hardcoded for compatibility with Ranch 1.x.
|
|
Passive =:= tcp_passive; Passive =:= ssl_passive ->
|
|
safe_setopts_active(State),
|
|
loop(State, Buffer);
|
|
%% System messages.
|
|
{'EXIT', Parent, shutdown} ->
|
|
Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'},
|
|
loop(initiate_closing(State, Reason), Buffer);
|
|
{'EXIT', Parent, Reason} ->
|
|
terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
|
|
{system, From, Request} ->
|
|
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
|
|
%% Timeouts.
|
|
{timeout, TimerRef, idle_timeout} ->
|
|
terminate(State, {stop, timeout,
|
|
'Connection idle longer than configuration allows.'});
|
|
{timeout, Ref, {shutdown, Pid}} ->
|
|
cowboy_children:shutdown_timeout(Children, Ref, Pid),
|
|
loop(State, Buffer);
|
|
{timeout, TRef, {cow_http2_machine, Name}} ->
|
|
loop(timeout(State, Name, TRef), Buffer);
|
|
{timeout, TimerRef, {goaway_initial_timeout, Reason}} ->
|
|
loop(closing(State, Reason), Buffer);
|
|
{timeout, TimerRef, {goaway_complete_timeout, Reason}} ->
|
|
terminate(State, {stop, stop_reason(Reason),
|
|
'Graceful shutdown timed out.'});
|
|
%% Messages pertaining to a stream.
|
|
{{Pid, StreamID}, Msg} when Pid =:= self() ->
|
|
loop(info(State, StreamID, Msg), Buffer);
|
|
%% Exit signal from children.
|
|
Msg = {'EXIT', Pid, _} ->
|
|
loop(down(State, Pid, Msg), Buffer);
|
|
%% Calls from supervisor module.
|
|
{'$gen_call', From, Call} ->
|
|
cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
|
|
loop(State, Buffer);
|
|
Msg ->
|
|
cowboy:log(warning, "Received stray message ~p.", [Msg], Opts),
|
|
loop(State, Buffer)
|
|
after InactivityTimeout ->
|
|
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
|
|
end.
|
|
|
|
set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef})
|
|
when Status =:= closing_initiated orelse Status =:= closing,
|
|
TimerRef =/= undefined ->
|
|
State;
|
|
set_idle_timeout(State=#state{opts=Opts}) ->
|
|
set_timeout(State, maps:get(idle_timeout, Opts, 60000), idle_timeout).
|
|
|
|
set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) ->
|
|
ok = case TimerRef0 of
|
|
undefined -> ok;
|
|
_ -> erlang:cancel_timer(TimerRef0, [{async, true}, {info, false}])
|
|
end,
|
|
TimerRef = case Timeout of
|
|
infinity -> undefined;
|
|
Timeout -> erlang:start_timer(Timeout, self(), Message)
|
|
end,
|
|
State#state{timer=TimerRef}.
|
|
|
|
maybe_reset_idle_timeout(State=#state{opts=Opts}) ->
|
|
case maps:get(reset_idle_timeout_on_send, Opts, false) of
|
|
true ->
|
|
set_idle_timeout(State);
|
|
false ->
|
|
State
|
|
end.
|
|
|
|
%% HTTP/2 protocol parsing.
|
|
|
|
parse(State=#state{http2_status=sequence}, Data) ->
|
|
case cow_http2:parse_sequence(Data) of
|
|
{ok, Rest} ->
|
|
parse(State#state{http2_status=settings}, Rest);
|
|
more ->
|
|
loop(State, Data);
|
|
Error = {connection_error, _, _} ->
|
|
terminate(State, Error)
|
|
end;
|
|
parse(State=#state{http2_status=Status, http2_machine=HTTP2Machine, streams=Streams}, Data) ->
|
|
MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine),
|
|
case cow_http2:parse(Data, MaxFrameSize) of
|
|
{ok, Frame, Rest} ->
|
|
parse(frame_rate(State, Frame), Rest);
|
|
{ignore, Rest} ->
|
|
parse(frame_rate(State, ignore), Rest);
|
|
{stream_error, StreamID, Reason, Human, Rest} ->
|
|
parse(reset_stream(State, StreamID, {stream_error, Reason, Human}), Rest);
|
|
Error = {connection_error, _, _} ->
|
|
terminate(State, Error);
|
|
%% Terminate the connection if we are closing and all streams have completed.
|
|
more when Status =:= closing, Streams =:= #{} ->
|
|
terminate(State, {stop, normal, 'The connection is going away.'});
|
|
more ->
|
|
loop(State, Data)
|
|
end.
|
|
|
|
%% Frame rate flood protection.
|
|
|
|
frame_rate(State0=#state{frame_rate_num=Num0, frame_rate_time=Time}, Frame) ->
|
|
{Result, State} = case Num0 - 1 of
|
|
0 ->
|
|
CurrentTime = erlang:monotonic_time(millisecond),
|
|
if
|
|
CurrentTime < Time ->
|
|
{error, State0};
|
|
true ->
|
|
%% When the option has a period of infinity we cannot reach this clause.
|
|
{ok, init_frame_rate_limiting(State0, CurrentTime)}
|
|
end;
|
|
Num ->
|
|
{ok, State0#state{frame_rate_num=Num}}
|
|
end,
|
|
case {Result, Frame} of
|
|
{ok, ignore} -> ignored_frame(State);
|
|
{ok, _} -> frame(State, Frame);
|
|
{error, _} -> terminate(State, {connection_error, enhance_your_calm,
|
|
'Frame rate larger than configuration allows. Flood? (CVE-2019-9512, CVE-2019-9515, CVE-2019-9518)'})
|
|
end.
|
|
|
|
%% Frames received.
|
|
|
|
%% We do nothing when receiving a lingering DATA frame.
|
|
%% We already removed the stream flow from the connection
|
|
%% flow and are therefore already accounting for the window
|
|
%% being reduced by these frames.
|
|
frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
|
|
case cow_http2_machine:frame(Frame, HTTP2Machine0) of
|
|
{ok, HTTP2Machine} ->
|
|
maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame);
|
|
{ok, {data, StreamID, IsFin, Data}, HTTP2Machine} ->
|
|
data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data);
|
|
{ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} ->
|
|
headers_frame(State#state{http2_machine=HTTP2Machine},
|
|
StreamID, IsFin, Headers, PseudoHeaders, BodyLen);
|
|
{ok, {trailers, _StreamID, _Trailers}, HTTP2Machine} ->
|
|
%% @todo Propagate trailers.
|
|
State#state{http2_machine=HTTP2Machine};
|
|
{ok, {rst_stream, StreamID, Reason}, HTTP2Machine} ->
|
|
rst_stream_frame(State#state{http2_machine=HTTP2Machine}, StreamID, Reason);
|
|
{ok, GoAway={goaway, _, _, _}, HTTP2Machine} ->
|
|
goaway(State#state{http2_machine=HTTP2Machine}, GoAway);
|
|
{send, SendData, HTTP2Machine} ->
|
|
%% We may need to send an alarm for each of the streams sending data.
|
|
State1 = lists:foldl(
|
|
fun({StreamID, _, _}, S) -> maybe_send_data_alarm(S, HTTP2Machine0, StreamID) end,
|
|
send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData, []),
|
|
SendData),
|
|
maybe_reset_idle_timeout(State1);
|
|
{error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} ->
|
|
reset_stream(State#state{http2_machine=HTTP2Machine},
|
|
StreamID, {stream_error, Reason, Human});
|
|
{error, Error={connection_error, _, _}, HTTP2Machine} ->
|
|
terminate(State#state{http2_machine=HTTP2Machine}, Error)
|
|
end.
|
|
|
|
%% We use this opportunity to mark the HTTP/2 status as connected
|
|
%% if we were still waiting for a SETTINGS frame.
|
|
maybe_ack(State=#state{http2_status=settings}, Frame) ->
|
|
maybe_ack(State#state{http2_status=connected}, Frame);
|
|
%% We do not reset the idle timeout on send here because we are
|
|
%% sending data as a consequence of receiving data, which means
|
|
%% we already resetted the idle timeout.
|
|
maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) ->
|
|
case Frame of
|
|
{settings, _} ->
|
|
ok = maybe_socket_error(State, Transport:send(Socket, cow_http2:settings_ack()));
|
|
{ping, Opaque} ->
|
|
ok = maybe_socket_error(State, Transport:send(Socket, cow_http2:ping_ack(Opaque)));
|
|
_ -> ok
|
|
end,
|
|
State.
|
|
|
|
data_frame(State0=#state{opts=Opts, flow=Flow0, streams=Streams}, StreamID, IsFin, Data) ->
|
|
case Streams of
|
|
#{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} ->
|
|
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
|
|
{Commands, StreamState} ->
|
|
%% Remove the amount of data received from the flow.
|
|
%% We may receive more data than we requested. We ensure
|
|
%% that the flow value doesn't go lower than 0.
|
|
Size = byte_size(Data),
|
|
Flow = max(0, Flow0 - Size),
|
|
%% We would normally update the window when changing the flow
|
|
%% value. But because we are running commands, which themselves
|
|
%% may update the window, and we want to avoid updating the
|
|
%% window twice in a row, we first run the commands and then
|
|
%% only update the window a flow command was executed. We know
|
|
%% that it was because the flow value changed in the state.
|
|
State1 = State0#state{flow=Flow,
|
|
streams=Streams#{StreamID => Stream#stream{
|
|
flow=max(0, StreamFlow - Size), state=StreamState}}},
|
|
State = commands(State1, StreamID, Commands),
|
|
case State of
|
|
%% No flow command was executed. We must update the window
|
|
%% because we changed the flow value earlier.
|
|
#state{flow=Flow} ->
|
|
update_window(State, StreamID);
|
|
%% Otherwise the window was updated already.
|
|
_ ->
|
|
State
|
|
end
|
|
catch Class:Exception:Stacktrace ->
|
|
cowboy:log(cowboy_stream:make_error_log(data,
|
|
[StreamID, IsFin, Data, StreamState0],
|
|
Class, Exception, Stacktrace), Opts),
|
|
reset_stream(State0, StreamID, {internal_error, {Class, Exception},
|
|
'Unhandled exception in cowboy_stream:data/4.'})
|
|
end;
|
|
%% We ignore DATA frames for streams that are stopping.
|
|
#{} ->
|
|
State0
|
|
end.
|
|
|
|
headers_frame(State, StreamID, IsFin, Headers,
|
|
PseudoHeaders=#{method := <<"CONNECT">>}, _)
|
|
when map_size(PseudoHeaders) =:= 2 ->
|
|
early_error(State, StreamID, IsFin, Headers, PseudoHeaders, 501,
|
|
'The CONNECT method is currently not implemented. (RFC7231 4.3.6)');
|
|
headers_frame(State, StreamID, IsFin, Headers,
|
|
PseudoHeaders=#{method := <<"TRACE">>}, _) ->
|
|
early_error(State, StreamID, IsFin, Headers, PseudoHeaders, 501,
|
|
'The TRACE method is currently not implemented. (RFC7231 4.3.8)');
|
|
headers_frame(State, StreamID, IsFin, Headers, PseudoHeaders=#{authority := Authority}, BodyLen) ->
|
|
headers_frame_parse_host(State, StreamID, IsFin, Headers, PseudoHeaders, BodyLen, Authority);
|
|
headers_frame(State, StreamID, IsFin, Headers, PseudoHeaders, BodyLen) ->
|
|
case lists:keyfind(<<"host">>, 1, Headers) of
|
|
{_, Authority} ->
|
|
headers_frame_parse_host(State, StreamID, IsFin, Headers, PseudoHeaders, BodyLen, Authority);
|
|
_ ->
|
|
reset_stream(State, StreamID, {stream_error, protocol_error,
|
|
'Requests translated from HTTP/1.1 must include a host header. (RFC7540 8.1.2.3, RFC7230 5.4)'})
|
|
end.
|
|
|
|
headers_frame_parse_host(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert, proxy_header=ProxyHeader},
|
|
StreamID, IsFin, Headers, PseudoHeaders=#{method := Method, scheme := Scheme, path := PathWithQs},
|
|
BodyLen, Authority) ->
|
|
try cow_http_hd:parse_host(Authority) of
|
|
{Host, Port0} ->
|
|
Port = ensure_port(Scheme, Port0),
|
|
try cow_http:parse_fullpath(PathWithQs) of
|
|
{<<>>, _} ->
|
|
reset_stream(State, StreamID, {stream_error, protocol_error,
|
|
'The path component must not be empty. (RFC7540 8.1.2.3)'});
|
|
{Path, Qs} ->
|
|
Req0 = #{
|
|
ref => Ref,
|
|
pid => self(),
|
|
streamid => StreamID,
|
|
peer => Peer,
|
|
sock => Sock,
|
|
cert => Cert,
|
|
method => Method,
|
|
scheme => Scheme,
|
|
host => Host,
|
|
port => Port,
|
|
path => Path,
|
|
qs => Qs,
|
|
version => 'HTTP/2',
|
|
headers => headers_to_map(Headers, #{}),
|
|
has_body => IsFin =:= nofin,
|
|
body_length => BodyLen
|
|
},
|
|
%% We add the PROXY header information if any.
|
|
Req1 = case ProxyHeader of
|
|
undefined -> Req0;
|
|
_ -> Req0#{proxy_header => ProxyHeader}
|
|
end,
|
|
%% We add the protocol information for extended CONNECTs.
|
|
Req = case PseudoHeaders of
|
|
#{protocol := Protocol} -> Req1#{protocol => Protocol};
|
|
_ -> Req1
|
|
end,
|
|
headers_frame(State, StreamID, Req)
|
|
catch _:_ ->
|
|
reset_stream(State, StreamID, {stream_error, protocol_error,
|
|
'The :path pseudo-header is invalid. (RFC7540 8.1.2.3)'})
|
|
end
|
|
catch _:_ ->
|
|
reset_stream(State, StreamID, {stream_error, protocol_error,
|
|
'The :authority pseudo-header is invalid. (RFC7540 8.1.2.3)'})
|
|
end.
|
|
|
|
ensure_port(<<"http">>, undefined) -> 80;
|
|
ensure_port(<<"https">>, undefined) -> 443;
|
|
ensure_port(_, Port) -> Port.
|
|
|
|
%% This function is necessary to properly handle duplicate headers
|
|
%% and the special-case cookie header.
|
|
headers_to_map([], Acc) ->
|
|
Acc;
|
|
headers_to_map([{Name, Value}|Tail], Acc0) ->
|
|
Acc = case Acc0 of
|
|
%% The cookie header does not use proper HTTP header lists.
|
|
#{Name := Value0} when Name =:= <<"cookie">> ->
|
|
Acc0#{Name => << Value0/binary, "; ", Value/binary >>};
|
|
#{Name := Value0} ->
|
|
Acc0#{Name => << Value0/binary, ", ", Value/binary >>};
|
|
_ ->
|
|
Acc0#{Name => Value}
|
|
end,
|
|
headers_to_map(Tail, Acc).
|
|
|
|
headers_frame(State=#state{opts=Opts, streams=Streams}, StreamID, Req) ->
|
|
try cowboy_stream:init(StreamID, Req, Opts) of
|
|
{Commands, StreamState} ->
|
|
commands(State#state{
|
|
streams=Streams#{StreamID => #stream{state=StreamState}}},
|
|
StreamID, Commands)
|
|
catch Class:Exception:Stacktrace ->
|
|
cowboy:log(cowboy_stream:make_error_log(init,
|
|
[StreamID, Req, Opts],
|
|
Class, Exception, Stacktrace), Opts),
|
|
reset_stream(State, StreamID, {internal_error, {Class, Exception},
|
|
'Unhandled exception in cowboy_stream:init/3.'})
|
|
end.
|
|
|
|
early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer},
|
|
StreamID, _IsFin, Headers, #{method := Method},
|
|
StatusCode0, HumanReadable) ->
|
|
%% We automatically terminate the stream but it is not an error
|
|
%% per se (at least not in the first implementation).
|
|
Reason = {stream_error, no_error, HumanReadable},
|
|
%% The partial Req is minimal for now. We only have one case
|
|
%% where it can be called (when a method is completely disabled).
|
|
%% @todo Fill in the other elements.
|
|
PartialReq = #{
|
|
ref => Ref,
|
|
peer => Peer,
|
|
method => Method,
|
|
headers => headers_to_map(Headers, #{})
|
|
},
|
|
Resp = {response, StatusCode0, RespHeaders0=#{<<"content-length">> => <<"0">>}, <<>>},
|
|
try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
|
|
{response, StatusCode, RespHeaders, RespBody} ->
|
|
send_response(State0, StreamID, StatusCode, RespHeaders, RespBody)
|
|
catch Class:Exception:Stacktrace ->
|
|
cowboy:log(cowboy_stream:make_error_log(early_error,
|
|
[StreamID, Reason, PartialReq, Resp, Opts],
|
|
Class, Exception, Stacktrace), Opts),
|
|
%% We still need to send an error response, so send what we initially
|
|
%% wanted to send. It's better than nothing.
|
|
send_headers(State0, StreamID, fin, StatusCode0, RespHeaders0)
|
|
end.
|
|
|
|
rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) ->
|
|
case maps:take(StreamID, Streams0) of
|
|
{#stream{state=StreamState}, Streams} ->
|
|
terminate_stream_handler(State, StreamID, Reason, StreamState),
|
|
Children = cowboy_children:shutdown(Children0, StreamID),
|
|
cancel_rate_limit(State#state{streams=Streams, children=Children});
|
|
error ->
|
|
State
|
|
end.
|
|
|
|
cancel_rate_limit(State0=#state{cancel_rate_num=Num0, cancel_rate_time=Time}) ->
|
|
case Num0 - 1 of
|
|
0 ->
|
|
CurrentTime = erlang:monotonic_time(millisecond),
|
|
if
|
|
CurrentTime < Time ->
|
|
terminate(State0, {connection_error, enhance_your_calm,
|
|
'Stream cancel rate larger than configuration allows. Flood? (CVE-2023-44487)'});
|
|
true ->
|
|
%% When the option has a period of infinity we cannot reach this clause.
|
|
init_cancel_rate_limiting(State0, CurrentTime)
|
|
end;
|
|
Num ->
|
|
State0#state{cancel_rate_num=Num}
|
|
end.
|
|
|
|
ignored_frame(State=#state{http2_machine=HTTP2Machine0}) ->
|
|
case cow_http2_machine:ignored_frame(HTTP2Machine0) of
|
|
{ok, HTTP2Machine} ->
|
|
State#state{http2_machine=HTTP2Machine};
|
|
{error, Error={connection_error, _, _}, HTTP2Machine} ->
|
|
terminate(State#state{http2_machine=HTTP2Machine}, Error)
|
|
end.
|
|
|
|
%% HTTP/2 timeouts.
|
|
|
|
timeout(State=#state{http2_machine=HTTP2Machine0}, Name, TRef) ->
|
|
case cow_http2_machine:timeout(Name, TRef, HTTP2Machine0) of
|
|
{ok, HTTP2Machine} ->
|
|
State#state{http2_machine=HTTP2Machine};
|
|
{error, Error={connection_error, _, _}, HTTP2Machine} ->
|
|
terminate(State#state{http2_machine=HTTP2Machine}, Error)
|
|
end.
|
|
|
|
%% Erlang messages.
|
|
|
|
down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
|
|
State = case cowboy_children:down(Children0, Pid) of
|
|
%% The stream was terminated already.
|
|
{ok, undefined, Children} ->
|
|
State0#state{children=Children};
|
|
%% The stream is still running.
|
|
{ok, StreamID, Children} ->
|
|
info(State0#state{children=Children}, StreamID, Msg);
|
|
%% The process was unknown.
|
|
error ->
|
|
cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
|
|
[Msg, Pid], Opts),
|
|
State0
|
|
end,
|
|
if
|
|
State#state.http2_status =:= closing, State#state.streams =:= #{} ->
|
|
terminate(State, {stop, normal, 'The connection is going away.'});
|
|
true ->
|
|
State
|
|
end.
|
|
|
|
info(State=#state{opts=Opts, http2_machine=HTTP2Machine, streams=Streams}, StreamID, Msg) ->
|
|
case Streams of
|
|
#{StreamID := Stream=#stream{state=StreamState0}} ->
|
|
try cowboy_stream:info(StreamID, Msg, StreamState0) of
|
|
{Commands, StreamState} ->
|
|
commands(State#state{streams=Streams#{StreamID => Stream#stream{state=StreamState}}},
|
|
StreamID, Commands)
|
|
catch Class:Exception:Stacktrace ->
|
|
cowboy:log(cowboy_stream:make_error_log(info,
|
|
[StreamID, Msg, StreamState0],
|
|
Class, Exception, Stacktrace), Opts),
|
|
reset_stream(State, StreamID, {internal_error, {Class, Exception},
|
|
'Unhandled exception in cowboy_stream:info/3.'})
|
|
end;
|
|
_ ->
|
|
case cow_http2_machine:is_lingering_stream(StreamID, HTTP2Machine) of
|
|
true ->
|
|
ok;
|
|
false ->
|
|
cowboy:log(warning, "Received message ~p for unknown stream ~p.",
|
|
[Msg, StreamID], Opts)
|
|
end,
|
|
State
|
|
end.
|
|
|
|
%% Stream handler commands.
|
|
%%
|
|
%% @todo Kill the stream if it tries to send a response, headers,
|
|
%% data or push promise when the stream is closed or half-closed.
|
|
|
|
commands(State, _, []) ->
|
|
State;
|
|
%% Error responses are sent only if a response wasn't sent already.
|
|
commands(State=#state{http2_machine=HTTP2Machine}, StreamID,
|
|
[{error_response, StatusCode, Headers, Body}|Tail]) ->
|
|
case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of
|
|
{ok, idle, _} ->
|
|
commands(State, StreamID, [{response, StatusCode, Headers, Body}|Tail]);
|
|
_ ->
|
|
commands(State, StreamID, Tail)
|
|
end;
|
|
%% Send an informational response.
|
|
commands(State0, StreamID, [{inform, StatusCode, Headers}|Tail]) ->
|
|
State1 = send_headers(State0, StreamID, idle, StatusCode, Headers),
|
|
State = maybe_reset_idle_timeout(State1),
|
|
commands(State, StreamID, Tail);
|
|
%% Send response headers.
|
|
commands(State0, StreamID, [{response, StatusCode, Headers, Body}|Tail]) ->
|
|
State1 = send_response(State0, StreamID, StatusCode, Headers, Body),
|
|
State = maybe_reset_idle_timeout(State1),
|
|
commands(State, StreamID, Tail);
|
|
%% Send response headers.
|
|
commands(State0, StreamID, [{headers, StatusCode, Headers}|Tail]) ->
|
|
State1 = send_headers(State0, StreamID, nofin, StatusCode, Headers),
|
|
State = maybe_reset_idle_timeout(State1),
|
|
commands(State, StreamID, Tail);
|
|
%% Send a response body chunk.
|
|
commands(State0, StreamID, [{data, IsFin, Data}|Tail]) ->
|
|
State = case maybe_send_data(State0, StreamID, IsFin, Data, []) of
|
|
{data_sent, State1} ->
|
|
maybe_reset_idle_timeout(State1);
|
|
{no_data_sent, State1} ->
|
|
State1
|
|
end,
|
|
commands(State, StreamID, Tail);
|
|
%% Send trailers.
|
|
commands(State0, StreamID, [{trailers, Trailers}|Tail]) ->
|
|
State = case maybe_send_data(State0, StreamID, fin,
|
|
{trailers, maps:to_list(Trailers)}, []) of
|
|
{data_sent, State1} ->
|
|
maybe_reset_idle_timeout(State1);
|
|
{no_data_sent, State1} ->
|
|
State1
|
|
end,
|
|
commands(State, StreamID, Tail);
|
|
%% Send a push promise.
|
|
%%
|
|
%% @todo Responses sent as a result of a push_promise request
|
|
%% must not send push_promise frames themselves.
|
|
%%
|
|
%% @todo We should not send push_promise frames when we are
|
|
%% in the closing http2_status.
|
|
commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
|
|
StreamID, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) ->
|
|
Authority = case {Scheme, Port} of
|
|
{<<"http">>, 80} -> Host;
|
|
{<<"https">>, 443} -> Host;
|
|
_ -> iolist_to_binary([Host, $:, integer_to_binary(Port)])
|
|
end,
|
|
PathWithQs = iolist_to_binary(case Qs of
|
|
<<>> -> Path;
|
|
_ -> [Path, $?, Qs]
|
|
end),
|
|
PseudoHeaders = #{
|
|
method => Method,
|
|
scheme => Scheme,
|
|
authority => Authority,
|
|
path => PathWithQs
|
|
},
|
|
%% We need to make sure the header value is binary before we can
|
|
%% create the Req object, as it expects them to be flat.
|
|
Headers = maps:to_list(maps:map(fun(_, V) -> iolist_to_binary(V) end, Headers0)),
|
|
State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP2Machine0,
|
|
PseudoHeaders, Headers) of
|
|
{ok, PromisedStreamID, HeaderBlock, HTTP2Machine} ->
|
|
State1 = State0#state{http2_machine=HTTP2Machine},
|
|
ok = maybe_socket_error(State1, Transport:send(Socket,
|
|
cow_http2:push_promise(StreamID, PromisedStreamID, HeaderBlock))),
|
|
State2 = maybe_reset_idle_timeout(State1),
|
|
headers_frame(State2, PromisedStreamID, fin, Headers, PseudoHeaders, 0);
|
|
{error, no_push} ->
|
|
State0
|
|
end,
|
|
commands(State, StreamID, Tail);
|
|
%% Read the request body.
|
|
commands(State0=#state{flow=Flow, streams=Streams}, StreamID, [{flow, Size}|Tail]) ->
|
|
#{StreamID := Stream=#stream{flow=StreamFlow}} = Streams,
|
|
State = update_window(State0#state{flow=Flow + Size,
|
|
streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}},
|
|
StreamID),
|
|
commands(State, StreamID, Tail);
|
|
%% Supervise a child process.
|
|
commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
|
|
commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
|
|
StreamID, Tail);
|
|
%% Error handling.
|
|
commands(State, StreamID, [Error = {internal_error, _, _}|_Tail]) ->
|
|
%% @todo Do we want to run the commands after an internal_error?
|
|
%% @todo Do we even allow commands after?
|
|
%% @todo Only reset when the stream still exists.
|
|
reset_stream(State, StreamID, Error);
|
|
%% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself.
|
|
%%
|
|
%% We do not need to reset the idle timeout on send because it
|
|
%% hasn't been set yet. This is called from init/12.
|
|
commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade},
|
|
StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) ->
|
|
%% @todo This 101 response needs to be passed through stream handlers.
|
|
ok = maybe_socket_error(State, Transport:send(Socket,
|
|
cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers)))),
|
|
commands(State, StreamID, Tail);
|
|
%% Use a different protocol within the stream (CONNECT :protocol).
|
|
%% @todo Make sure we error out when the feature is disabled.
|
|
commands(State0, StreamID, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
|
|
State = info(State0, StreamID, {headers, 200, Headers}),
|
|
commands(State, StreamID, Tail);
|
|
%% Set options dynamically.
|
|
commands(State, StreamID, [{set_options, _Opts}|Tail]) ->
|
|
commands(State, StreamID, Tail);
|
|
commands(State, StreamID, [stop|_Tail]) ->
|
|
%% @todo Do we want to run the commands after a stop?
|
|
%% @todo Do we even allow commands after?
|
|
stop_stream(State, StreamID);
|
|
%% Log event.
|
|
commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
|
|
cowboy:log(Log, Opts),
|
|
commands(State, StreamID, Tail).
|
|
|
|
%% Tentatively update the window after the flow was updated.
|
|
|
|
update_window(State0=#state{socket=Socket, transport=Transport,
|
|
http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) ->
|
|
#{StreamID := #stream{flow=StreamFlow}} = Streams,
|
|
{Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of
|
|
ok -> {<<>>, HTTP2Machine0};
|
|
{ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1}
|
|
end,
|
|
{Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of
|
|
ok -> {<<>>, HTTP2Machine2};
|
|
{ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3}
|
|
end,
|
|
State = State0#state{http2_machine=HTTP2Machine},
|
|
case {Data1, Data2} of
|
|
{<<>>, <<>>} ->
|
|
State;
|
|
_ ->
|
|
ok = maybe_socket_error(State, Transport:send(Socket, [Data1, Data2])),
|
|
maybe_reset_idle_timeout(State)
|
|
end.
|
|
|
|
%% Send the response, trailers or data.
|
|
|
|
send_response(State0=#state{http2_machine=HTTP2Machine0}, StreamID, StatusCode, Headers, Body) ->
|
|
Size = case Body of
|
|
{sendfile, _, Bytes, _} -> Bytes;
|
|
_ -> iolist_size(Body)
|
|
end,
|
|
case Size of
|
|
0 ->
|
|
State = send_headers(State0, StreamID, fin, StatusCode, Headers),
|
|
maybe_terminate_stream(State, StreamID, fin);
|
|
_ ->
|
|
%% @todo Add a test for HEAD to make sure we don't send the body when
|
|
%% returning {response...} from a stream handler (or {headers...} then {data...}).
|
|
{ok, _IsFin, HeaderBlock, HTTP2Machine}
|
|
= cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, nofin,
|
|
#{status => cow_http:status_to_integer(StatusCode)},
|
|
headers_to_list(Headers)),
|
|
{_, State} = maybe_send_data(State0#state{http2_machine=HTTP2Machine},
|
|
StreamID, fin, Body, [cow_http2:headers(StreamID, nofin, HeaderBlock)]),
|
|
State
|
|
end.
|
|
|
|
send_headers(State0=#state{socket=Socket, transport=Transport,
|
|
http2_machine=HTTP2Machine0}, StreamID, IsFin0, StatusCode, Headers) ->
|
|
{ok, IsFin, HeaderBlock, HTTP2Machine}
|
|
= cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, IsFin0,
|
|
#{status => cow_http:status_to_integer(StatusCode)},
|
|
headers_to_list(Headers)),
|
|
State = State0#state{http2_machine=HTTP2Machine},
|
|
ok = maybe_socket_error(State, Transport:send(Socket,
|
|
cow_http2:headers(StreamID, IsFin, HeaderBlock))),
|
|
State.
|
|
|
|
%% The set-cookie header is special; we can only send one cookie per header.
|
|
headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
|
|
Headers = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)),
|
|
Headers ++ [{<<"set-cookie">>, Value} || Value <- SetCookies];
|
|
headers_to_list(Headers) ->
|
|
maps:to_list(Headers).
|
|
|
|
maybe_send_data(State0=#state{socket=Socket, transport=Transport,
|
|
http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0, Prefix) ->
|
|
Data = case is_tuple(Data0) of
|
|
false -> {data, Data0};
|
|
true -> Data0
|
|
end,
|
|
case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of
|
|
{ok, HTTP2Machine} ->
|
|
State1 = State0#state{http2_machine=HTTP2Machine},
|
|
%% If we have prefix data (like a HEADERS frame) we need to send it
|
|
%% even if we do not send any DATA frames.
|
|
WasDataSent = case Prefix of
|
|
[] ->
|
|
no_data_sent;
|
|
_ ->
|
|
ok = maybe_socket_error(State1, Transport:send(Socket, Prefix)),
|
|
data_sent
|
|
end,
|
|
State = maybe_send_data_alarm(State1, HTTP2Machine0, StreamID),
|
|
{WasDataSent, State};
|
|
{send, SendData, HTTP2Machine} ->
|
|
State = #state{http2_status=Status, streams=Streams}
|
|
= send_data(State0#state{http2_machine=HTTP2Machine}, SendData, Prefix),
|
|
%% Terminate the connection if we are closing and all streams have completed.
|
|
if
|
|
Status =:= closing, Streams =:= #{} ->
|
|
terminate(State, {stop, normal, 'The connection is going away.'});
|
|
true ->
|
|
{data_sent, maybe_send_data_alarm(State, HTTP2Machine0, StreamID)}
|
|
end
|
|
end.
|
|
|
|
send_data(State0=#state{socket=Socket, transport=Transport, opts=Opts}, SendData, Prefix) ->
|
|
{Acc, State} = prepare_data(State0, SendData, [], Prefix),
|
|
_ = [case Data of
|
|
{sendfile, Offset, Bytes, Path} ->
|
|
%% When sendfile is disabled we explicitly use the fallback.
|
|
{ok, _} = maybe_socket_error(State,
|
|
case maps:get(sendfile, Opts, true) of
|
|
true -> Transport:sendfile(Socket, Path, Offset, Bytes);
|
|
false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
|
|
end
|
|
),
|
|
ok;
|
|
_ ->
|
|
ok = maybe_socket_error(State, Transport:send(Socket, Data))
|
|
end || Data <- Acc],
|
|
send_data_terminate(State, SendData).
|
|
|
|
send_data_terminate(State, []) ->
|
|
State;
|
|
send_data_terminate(State0, [{StreamID, IsFin, _}|Tail]) ->
|
|
State = maybe_terminate_stream(State0, StreamID, IsFin),
|
|
send_data_terminate(State, Tail).
|
|
|
|
prepare_data(State, [], Acc, []) ->
|
|
{lists:reverse(Acc), State};
|
|
prepare_data(State, [], Acc, Buffer) ->
|
|
{lists:reverse([lists:reverse(Buffer)|Acc]), State};
|
|
prepare_data(State0, [{StreamID, IsFin, SendData}|Tail], Acc0, Buffer0) ->
|
|
{Acc, Buffer, State} = prepare_data(State0, StreamID, IsFin, SendData, Acc0, Buffer0),
|
|
prepare_data(State, Tail, Acc, Buffer).
|
|
|
|
prepare_data(State, _, _, [], Acc, Buffer) ->
|
|
{Acc, Buffer, State};
|
|
prepare_data(State0, StreamID, IsFin, [FrameData|Tail], Acc, Buffer) ->
|
|
FrameIsFin = case Tail of
|
|
[] -> IsFin;
|
|
_ -> nofin
|
|
end,
|
|
case prepare_data_frame(State0, StreamID, FrameIsFin, FrameData) of
|
|
{{MoreData, Sendfile}, State} when is_tuple(Sendfile) ->
|
|
case Buffer of
|
|
[] ->
|
|
prepare_data(State, StreamID, IsFin, Tail,
|
|
[Sendfile, MoreData|Acc], []);
|
|
_ ->
|
|
prepare_data(State, StreamID, IsFin, Tail,
|
|
[Sendfile, lists:reverse([MoreData|Buffer])|Acc], [])
|
|
end;
|
|
{MoreData, State} ->
|
|
prepare_data(State, StreamID, IsFin, Tail,
|
|
Acc, [MoreData|Buffer])
|
|
end.
|
|
|
|
prepare_data_frame(State, StreamID, IsFin, {data, Data}) ->
|
|
{cow_http2:data(StreamID, IsFin, Data),
|
|
State};
|
|
prepare_data_frame(State, StreamID, IsFin, Sendfile={sendfile, _, Bytes, _}) ->
|
|
{{cow_http2:data_header(StreamID, IsFin, Bytes), Sendfile},
|
|
State};
|
|
%% The stream is terminated in cow_http2_machine:prepare_trailers.
|
|
prepare_data_frame(State=#state{http2_machine=HTTP2Machine0},
|
|
StreamID, nofin, {trailers, Trailers}) ->
|
|
{ok, HeaderBlock, HTTP2Machine}
|
|
= cow_http2_machine:prepare_trailers(StreamID, HTTP2Machine0, Trailers),
|
|
{cow_http2:headers(StreamID, fin, HeaderBlock),
|
|
State#state{http2_machine=HTTP2Machine}}.
|
|
|
|
%% After we have sent or queued data we may need to set or clear an alarm.
|
|
%% We do this by comparing the HTTP2Machine buffer state before/after for
|
|
%% the relevant streams.
|
|
maybe_send_data_alarm(State=#state{opts=Opts, http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID) ->
|
|
ConnBufferSizeBefore = cow_http2_machine:get_connection_local_buffer_size(HTTP2Machine0),
|
|
ConnBufferSizeAfter = cow_http2_machine:get_connection_local_buffer_size(HTTP2Machine),
|
|
{ok, StreamBufferSizeBefore} = cow_http2_machine:get_stream_local_buffer_size(StreamID, HTTP2Machine0),
|
|
%% When the stream ends up closed after it finished sending data,
|
|
%% we do not want to trigger an alarm. We act as if the buffer
|
|
%% size did not change.
|
|
StreamBufferSizeAfter = case cow_http2_machine:get_stream_local_buffer_size(StreamID, HTTP2Machine) of
|
|
{ok, BSA} -> BSA;
|
|
{error, closed} -> StreamBufferSizeBefore
|
|
end,
|
|
MaxConnBufferSize = maps:get(max_connection_buffer_size, Opts, 16000000),
|
|
MaxStreamBufferSize = maps:get(max_stream_buffer_size, Opts, 8000000),
|
|
%% I do not want to document these internal events yet. I am not yet
|
|
%% convinced it should be {alarm, Name, on|off} and not {internal_event, E}
|
|
%% or something else entirely. Though alarms are probably right.
|
|
if
|
|
ConnBufferSizeBefore >= MaxConnBufferSize, ConnBufferSizeAfter < MaxConnBufferSize ->
|
|
connection_alarm(State, connection_buffer_full, off);
|
|
ConnBufferSizeBefore < MaxConnBufferSize, ConnBufferSizeAfter >= MaxConnBufferSize ->
|
|
connection_alarm(State, connection_buffer_full, on);
|
|
StreamBufferSizeBefore >= MaxStreamBufferSize, StreamBufferSizeAfter < MaxStreamBufferSize ->
|
|
stream_alarm(State, StreamID, stream_buffer_full, off);
|
|
StreamBufferSizeBefore < MaxStreamBufferSize, StreamBufferSizeAfter >= MaxStreamBufferSize ->
|
|
stream_alarm(State, StreamID, stream_buffer_full, on);
|
|
true ->
|
|
State
|
|
end.
|
|
|
|
connection_alarm(State0=#state{streams=Streams}, Name, Value) ->
|
|
lists:foldl(fun(StreamID, State) ->
|
|
stream_alarm(State, StreamID, Name, Value)
|
|
end, State0, maps:keys(Streams)).
|
|
|
|
stream_alarm(State, StreamID, Name, Value) ->
|
|
info(State, StreamID, {alarm, Name, Value}).
|
|
|
|
%% Terminate a stream or the connection.
|
|
|
|
%% We may have to cancel streams even if we receive multiple
|
|
%% GOAWAY frames as the LastStreamID value may be lower than
|
|
%% the one previously received.
|
|
%%
|
|
%% We do not reset the idle timeout on send here. We already
|
|
%% disabled it if we initiated shutdown; and we already reset
|
|
%% it if the client sent a GOAWAY frame.
|
|
goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0,
|
|
http2_status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _})
|
|
when Status =:= connected; Status =:= closing_initiated; Status =:= closing ->
|
|
Streams = goaway_streams(State0, maps:to_list(Streams0), LastStreamID,
|
|
{stop, {goaway, Reason}, 'The connection is going away.'}, []),
|
|
State1 = State0#state{streams=maps:from_list(Streams)},
|
|
if
|
|
Status =:= connected; Status =:= closing_initiated ->
|
|
{OurLastStreamID, HTTP2Machine} =
|
|
cow_http2_machine:set_last_streamid(HTTP2Machine0),
|
|
State = State1#state{http2_status=closing, http2_machine=HTTP2Machine},
|
|
ok = maybe_socket_error(State, Transport:send(Socket,
|
|
cow_http2:goaway(OurLastStreamID, no_error, <<>>))),
|
|
State;
|
|
true ->
|
|
State1
|
|
end;
|
|
%% We terminate the connection immediately if it hasn't fully been initialized.
|
|
goaway(State, {goaway, _, Reason, _}) ->
|
|
terminate(State, {stop, {goaway, Reason}, 'The connection is going away.'}).
|
|
|
|
%% Cancel client-initiated streams that are above LastStreamID.
|
|
goaway_streams(_, [], _, _, Acc) ->
|
|
Acc;
|
|
goaway_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], LastStreamID, Reason, Acc)
|
|
when StreamID > LastStreamID, (StreamID rem 2) =:= 0 ->
|
|
terminate_stream_handler(State, StreamID, Reason, StreamState),
|
|
goaway_streams(State, Tail, LastStreamID, Reason, Acc);
|
|
goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) ->
|
|
goaway_streams(State, Tail, LastStreamID, Reason, [Stream|Acc]).
|
|
|
|
%% A server that is attempting to gracefully shut down a connection SHOULD send
|
|
%% an initial GOAWAY frame with the last stream identifier set to 2^31-1 and a
|
|
%% NO_ERROR code. This signals to the client that a shutdown is imminent and
|
|
%% that initiating further requests is prohibited. After allowing time for any
|
|
%% in-flight stream creation (at least one round-trip time), the server can send
|
|
%% another GOAWAY frame with an updated last stream identifier. This ensures
|
|
%% that a connection can be cleanly shut down without losing requests.
|
|
-spec initiate_closing(#state{}, _) -> #state{}.
|
|
initiate_closing(State=#state{http2_status=connected, socket=Socket,
|
|
transport=Transport, opts=Opts}, Reason) ->
|
|
ok = maybe_socket_error(State, Transport:send(Socket,
|
|
cow_http2:goaway(16#7fffffff, no_error, <<>>))),
|
|
Timeout = maps:get(goaway_initial_timeout, Opts, 1000),
|
|
Message = {goaway_initial_timeout, Reason},
|
|
set_timeout(State#state{http2_status=closing_initiated}, Timeout, Message);
|
|
initiate_closing(State=#state{http2_status=Status}, _Reason)
|
|
when Status =:= closing_initiated; Status =:= closing ->
|
|
%% This happens if sys:terminate/2,3 is called twice or if the supervisor
|
|
%% tells us to shutdown after sys:terminate/2,3 is called or vice versa.
|
|
State;
|
|
initiate_closing(State, Reason) ->
|
|
terminate(State, {stop, stop_reason(Reason), 'The connection is going away.'}).
|
|
|
|
%% Switch to 'closing' state and stop accepting new streams.
|
|
-spec closing(#state{}, Reason :: term()) -> #state{}.
|
|
closing(State=#state{streams=Streams}, Reason) when Streams =:= #{} ->
|
|
terminate(State, Reason);
|
|
closing(State0=#state{http2_status=closing_initiated,
|
|
http2_machine=HTTP2Machine0, socket=Socket, transport=Transport},
|
|
Reason) ->
|
|
%% Stop accepting new streams.
|
|
{LastStreamID, HTTP2Machine} =
|
|
cow_http2_machine:set_last_streamid(HTTP2Machine0),
|
|
State = State0#state{http2_status=closing, http2_machine=HTTP2Machine},
|
|
ok = maybe_socket_error(State, Transport:send(Socket,
|
|
cow_http2:goaway(LastStreamID, no_error, <<>>))),
|
|
closing(State, Reason);
|
|
closing(State=#state{http2_status=closing, opts=Opts}, Reason) ->
|
|
%% If client sent GOAWAY, we may already be in 'closing' but without the
|
|
%% goaway complete timeout set.
|
|
Timeout = maps:get(goaway_complete_timeout, Opts, 3000),
|
|
Message = {goaway_complete_timeout, Reason},
|
|
set_timeout(State, Timeout, Message).
|
|
|
|
stop_reason({stop, Reason, _}) -> Reason;
|
|
stop_reason(Reason) -> Reason.
|
|
|
|
%% Function copied from cowboy_http.
|
|
maybe_socket_error(State, {error, closed}) ->
|
|
terminate(State, {socket_error, closed, 'The socket has been closed.'});
|
|
maybe_socket_error(State, Reason) ->
|
|
maybe_socket_error(State, Reason, 'An error has occurred on the socket.').
|
|
|
|
maybe_socket_error(_, Result = ok, _) ->
|
|
Result;
|
|
maybe_socket_error(_, Result = {ok, _}, _) ->
|
|
Result;
|
|
maybe_socket_error(State, {error, Reason}, Human) ->
|
|
terminate(State, {socket_error, Reason, Human}).
|
|
|
|
-spec terminate(#state{}, _) -> no_return().
|
|
terminate(undefined, Reason) ->
|
|
exit({shutdown, Reason});
|
|
terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status,
|
|
http2_machine=HTTP2Machine, streams=Streams, children=Children}, Reason)
|
|
when Status =:= connected; Status =:= closing_initiated; Status =:= closing ->
|
|
%% @todo We might want to optionally send the Reason value
|
|
%% as debug data in the GOAWAY frame here. Perhaps more.
|
|
if
|
|
Status =:= connected; Status =:= closing_initiated ->
|
|
%% We are terminating so it's OK if we can't send the GOAWAY anymore.
|
|
_ = Transport:send(Socket, cow_http2:goaway(
|
|
cow_http2_machine:get_last_streamid(HTTP2Machine),
|
|
terminate_reason(Reason), <<>>));
|
|
%% We already sent the GOAWAY frame.
|
|
Status =:= closing ->
|
|
ok
|
|
end,
|
|
terminate_all_streams(State, maps:to_list(Streams), Reason),
|
|
cowboy_children:terminate(Children),
|
|
%% @todo Don't linger on connection errors.
|
|
terminate_linger(State),
|
|
exit({shutdown, Reason});
|
|
%% We are not fully connected so we can just terminate the connection.
|
|
terminate(_State, Reason) ->
|
|
exit({shutdown, Reason}).
|
|
|
|
terminate_reason({connection_error, Reason, _}) -> Reason;
|
|
terminate_reason({stop, _, _}) -> no_error;
|
|
terminate_reason({socket_error, _, _}) -> internal_error;
|
|
terminate_reason({internal_error, _, _}) -> internal_error.
|
|
|
|
terminate_all_streams(_, [], _) ->
|
|
ok;
|
|
terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) ->
|
|
terminate_stream_handler(State, StreamID, Reason, StreamState),
|
|
terminate_all_streams(State, Tail, Reason).
|
|
|
|
%% This code is copied from cowboy_http.
|
|
terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
|
|
case Transport:shutdown(Socket, write) of
|
|
ok ->
|
|
case maps:get(linger_timeout, Opts, 1000) of
|
|
0 ->
|
|
ok;
|
|
infinity ->
|
|
terminate_linger_before_loop(State, undefined, Transport:messages());
|
|
Timeout ->
|
|
TimerRef = erlang:start_timer(Timeout, self(), linger_timeout),
|
|
terminate_linger_before_loop(State, TimerRef, Transport:messages())
|
|
end;
|
|
{error, _} ->
|
|
ok
|
|
end.
|
|
|
|
terminate_linger_before_loop(State, TimerRef, Messages) ->
|
|
%% We may already be in active mode when we do this
|
|
%% but it's OK because we are shutting down anyway.
|
|
%%
|
|
%% We specially handle the socket error to terminate
|
|
%% when an error occurs.
|
|
case setopts_active(State) of
|
|
ok ->
|
|
terminate_linger_loop(State, TimerRef, Messages);
|
|
{error, _} ->
|
|
ok
|
|
end.
|
|
|
|
terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
|
|
receive
|
|
{OK, Socket, _} when OK =:= element(1, Messages) ->
|
|
terminate_linger_loop(State, TimerRef, Messages);
|
|
{Closed, Socket} when Closed =:= element(2, Messages) ->
|
|
ok;
|
|
{Error, Socket, _} when Error =:= element(3, Messages) ->
|
|
ok;
|
|
{Passive, Socket} when Passive =:= tcp_passive; Passive =:= ssl_passive ->
|
|
terminate_linger_before_loop(State, TimerRef, Messages);
|
|
{timeout, TimerRef, linger_timeout} ->
|
|
ok;
|
|
_ ->
|
|
terminate_linger_loop(State, TimerRef, Messages)
|
|
end.
|
|
|
|
%% @todo Don't send an RST_STREAM if one was already sent.
|
|
%%
|
|
%% When resetting the stream we are technically sending data
|
|
%% on the socket. However due to implementation complexities
|
|
%% we do not attempt to reset the idle timeout on send.
|
|
reset_stream(State0=#state{socket=Socket, transport=Transport,
|
|
http2_machine=HTTP2Machine0}, StreamID, Error) ->
|
|
Reason = case Error of
|
|
{internal_error, _, _} -> internal_error;
|
|
{stream_error, Reason0, _} -> Reason0
|
|
end,
|
|
ok = maybe_socket_error(State0, Transport:send(Socket,
|
|
cow_http2:rst_stream(StreamID, Reason))),
|
|
State1 = case cow_http2_machine:reset_stream(StreamID, HTTP2Machine0) of
|
|
{ok, HTTP2Machine} ->
|
|
terminate_stream(State0#state{http2_machine=HTTP2Machine}, StreamID, Error);
|
|
{error, not_found} ->
|
|
terminate_stream(State0, StreamID, Error)
|
|
end,
|
|
case reset_rate(State1) of
|
|
{ok, State} ->
|
|
State;
|
|
error ->
|
|
terminate(State1, {connection_error, enhance_your_calm,
|
|
'Stream reset rate larger than configuration allows. Flood? (CVE-2019-9514)'})
|
|
end.
|
|
|
|
reset_rate(State0=#state{reset_rate_num=Num0, reset_rate_time=Time}) ->
|
|
case Num0 - 1 of
|
|
0 ->
|
|
CurrentTime = erlang:monotonic_time(millisecond),
|
|
if
|
|
CurrentTime < Time ->
|
|
error;
|
|
true ->
|
|
%% When the option has a period of infinity we cannot reach this clause.
|
|
{ok, init_reset_rate_limiting(State0, CurrentTime)}
|
|
end;
|
|
Num ->
|
|
{ok, State0#state{reset_rate_num=Num}}
|
|
end.
|
|
|
|
stop_stream(State=#state{http2_machine=HTTP2Machine}, StreamID) ->
|
|
case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of
|
|
%% When the stream terminates normally (without sending RST_STREAM)
|
|
%% and no response was sent, we need to send a proper response back to the client.
|
|
%% We delay the termination of the stream until the response is fully sent.
|
|
{ok, idle, _} ->
|
|
info(stopping(State, StreamID), StreamID, {response, 204, #{}, <<>>});
|
|
%% When a response was sent but not terminated, we need to close the stream.
|
|
%% We delay the termination of the stream until the response is fully sent.
|
|
{ok, nofin, fin} ->
|
|
stopping(State, StreamID);
|
|
%% We only send a final DATA frame if there isn't one queued yet.
|
|
{ok, nofin, _} ->
|
|
info(stopping(State, StreamID), StreamID, {data, fin, <<>>});
|
|
%% When a response was sent fully we can terminate the stream,
|
|
%% regardless of the stream being in half-closed or closed state.
|
|
_ ->
|
|
terminate_stream(State, StreamID)
|
|
end.
|
|
|
|
stopping(State=#state{streams=Streams}, StreamID) ->
|
|
#{StreamID := Stream} = Streams,
|
|
State#state{streams=Streams#{StreamID => Stream#stream{status=stopping}}}.
|
|
|
|
%% If we finished sending data and the stream is stopping, terminate it.
|
|
maybe_terminate_stream(State=#state{streams=Streams}, StreamID, fin) ->
|
|
case Streams of
|
|
#{StreamID := #stream{status=stopping}} ->
|
|
terminate_stream(State, StreamID);
|
|
_ ->
|
|
State
|
|
end;
|
|
maybe_terminate_stream(State, _, _) ->
|
|
State.
|
|
|
|
%% When the stream stops normally without reading the request
|
|
%% body fully we need to tell the client to stop sending it.
|
|
%% We do this by sending an RST_STREAM with reason NO_ERROR. (RFC7540 8.1.0)
|
|
terminate_stream(State0=#state{socket=Socket, transport=Transport,
|
|
http2_machine=HTTP2Machine0}, StreamID) ->
|
|
State = case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine0) of
|
|
{ok, fin, _} ->
|
|
ok = maybe_socket_error(State0, Transport:send(Socket,
|
|
cow_http2:rst_stream(StreamID, no_error))),
|
|
{ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0),
|
|
State0#state{http2_machine=HTTP2Machine};
|
|
{error, closed} ->
|
|
State0
|
|
end,
|
|
terminate_stream(State, StreamID, normal).
|
|
|
|
%% We remove the stream flow from the connection flow. Any further
|
|
%% data received for this stream is therefore fully contained within
|
|
%% the extra window we allocated for this stream.
|
|
terminate_stream(State=#state{flow=Flow, streams=Streams0, children=Children0}, StreamID, Reason) ->
|
|
case maps:take(StreamID, Streams0) of
|
|
{#stream{flow=StreamFlow, state=StreamState}, Streams} ->
|
|
terminate_stream_handler(State, StreamID, Reason, StreamState),
|
|
Children = cowboy_children:shutdown(Children0, StreamID),
|
|
State#state{flow=Flow - StreamFlow, streams=Streams, children=Children};
|
|
error ->
|
|
State
|
|
end.
|
|
|
|
terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) ->
|
|
try
|
|
cowboy_stream:terminate(StreamID, Reason, StreamState)
|
|
catch Class:Exception:Stacktrace ->
|
|
cowboy:log(cowboy_stream:make_error_log(terminate,
|
|
[StreamID, Reason, StreamState],
|
|
Class, Exception, Stacktrace), Opts)
|
|
end.
|
|
|
|
%% System callbacks.
|
|
|
|
-spec system_continue(_, _, {#state{}, binary()}) -> ok.
|
|
system_continue(_, _, {State, Buffer}) ->
|
|
loop(State, Buffer).
|
|
|
|
-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
|
|
system_terminate(Reason0, _, _, {State, Buffer}) ->
|
|
Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'},
|
|
loop(initiate_closing(State, Reason), Buffer).
|
|
|
|
-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
|
|
system_code_change(Misc, _, _, _) ->
|
|
{ok, Misc}.
|