0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 04:10:24 +00:00

Implement flow control for HTTP/1.1

We now stop reading from the socket unless asked to,
when we reach the request body. The option
initial_stream_flow_size controls how much data
we read without being asked, as an optimization.
We may also have received additional data along
with the request headers.

This commit also reworks the timeout handling for HTTP/1.1
because the stray timeout message was easily reproducible
after implementing the flow control. The issue should be
gone for good this time.
This commit is contained in:
Loïc Hoguin 2019-10-09 20:54:33 +02:00
parent 0c4103984b
commit cc54c207e3
No known key found for this signature in database
GPG key ID: 8A9DF795F6FED764
3 changed files with 120 additions and 97 deletions

View file

@ -63,6 +63,13 @@ Cowboy 2.7 requires Erlang/OTP 20.0 or greater.
willing to accept. By default it will accept 10
stream resets every 10 seconds.
* Flow control for incoming data has been implemented
for HTTP/1.1. Cowboy will now wait for the user code
to ask for the request body before reading it from
the socket. The option `initial_stream_flow_size`
controls how much data Cowboy will read without
being asked.
* The HTTP/1.1 and HTTP/2 option `logger` is now
documented.
@ -154,7 +161,9 @@ Cowboy 2.7 requires Erlang/OTP 20.0 or greater.
was waiting for more data.
* It was possible for Cowboy to receive stray timeout messages
for HTTP/1.1 connections. This has been addressed.
for HTTP/1.1 connections, resulting in crashes. The timeout
handling in HTTP/1.1 has been reworked and the issue should
no longer occur.
* The type for the Req object has been updated to accept
custom fields as was already documented.

View file

@ -17,25 +17,26 @@ as a Ranch protocol.
[source,erlang]
----
opts() :: #{
chunked => boolean(),
connection_type => worker | supervisor,
http10_keepalive => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
linger_timeout => timeout(),
logger => module(),
max_empty_lines => non_neg_integer(),
max_header_name_length => non_neg_integer(),
max_header_value_length => non_neg_integer(),
max_headers => non_neg_integer(),
max_keepalive => non_neg_integer(),
max_method_length => non_neg_integer(),
max_request_line_length => non_neg_integer(),
max_skip_body_length => non_neg_integer(),
proxy_header => boolean(),
request_timeout => timeout(),
sendfile => boolean(),
stream_handlers => [module()]
chunked => boolean(),
connection_type => worker | supervisor,
http10_keepalive => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
initial_stream_flow_size => non_neg_integer(),
linger_timeout => timeout(),
logger => module(),
max_empty_lines => non_neg_integer(),
max_header_name_length => non_neg_integer(),
max_header_value_length => non_neg_integer(),
max_headers => non_neg_integer(),
max_keepalive => non_neg_integer(),
max_method_length => non_neg_integer(),
max_request_line_length => non_neg_integer(),
max_skip_body_length => non_neg_integer(),
proxy_header => boolean(),
request_timeout => timeout(),
sendfile => boolean(),
stream_handlers => [module()]
}
----
@ -79,6 +80,12 @@ inactivity_timeout (300000)::
Time in ms with nothing received at all before Cowboy closes the connection.
initial_stream_flow_size (65535)::
Amount of data in bytes Cowboy will read from the socket
right after a request was fully received. This is a soft
limit.
linger_timeout (1000)::
Time in ms that Cowboy will wait when closing the connection. This is
@ -144,7 +151,7 @@ Ordered list of stream handlers that will handle all stream events.
== Changelog
* *2.7*: The `logger` option was added.
* *2.7*: The `initial_stream_flow_size` and `logger` options were added.
* *2.6*: The `chunked`, `http10_keepalive`, `proxy_header` and `sendfile` options were added.
* *2.5*: The `linger_timeout` option was added.
* *2.2*: The `max_skip_body_length` option was added.

View file

@ -33,6 +33,7 @@
http10_keepalive => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
initial_stream_flow_size => non_neg_integer(),
linger_timeout => timeout(),
logger => module(),
max_authority_length => non_neg_integer(),
@ -74,15 +75,9 @@
name = undefined :: binary() | undefined
}).
%% @todo We need a state where we wait for the stream process to ask for the body
%% and do not attempt to read from the socket while in that state (we should read
%% up to a certain length, and then wait, basically implementing flow control but
%% by not reading from the socket when the window is empty).
-record(ps_body, {
length :: non_neg_integer() | undefined,
received = 0 :: non_neg_integer(),
%% @todo flow
transfer_decode_fun :: fun(), %% @todo better type
transfer_decode_state :: any() %% @todo better type
}).
@ -136,6 +131,9 @@
%% Parsing state for the current stream or stream-to-be.
in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
%% Flow requested for the current stream.
flow = infinity :: non_neg_integer() | infinity,
%% Identifier for the stream currently being written.
%% Note that out_streamid =< in_streamid.
out_streamid = 1 :: pos_integer(),
@ -174,12 +172,12 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
end,
case {Peer0, Sock0, Cert1} of
{{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
LastStreamID = maps:get(max_keepalive, Opts, 100),
before_loop(set_timeout(#state{
State = #state{
parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
peer=Peer, sock=Sock, cert=Cert,
last_streamid=LastStreamID}));
last_streamid=maps:get(max_keepalive, Opts, 100)},
before_loop(set_timeout(State, request_timeout));
{{error, Reason}, _, _} ->
terminate(undefined, {socket_error, Reason,
'A socket error occurred when retrieving the peer name.'});
@ -191,15 +189,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
'A socket error occurred when retrieving the client TLS certificate.'})
end.
%% Do not read from the socket unless flow is large enough.
before_loop(State=#state{flow=Flow}) when Flow =< 0 ->
loop(State);
before_loop(State=#state{socket=Socket, transport=Transport}) ->
%% @todo disable this when we get to the body, until the stream asks for it?
%% Perhaps have a threshold for how much we're willing to read before waiting.
Transport:setopts(Socket, [{active, once}]),
loop(State).
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
last_streamid=LastStreamID, streams=Streams}) ->
last_streamid=LastStreamID}) ->
Messages = Transport:messages(),
InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
receive
@ -209,12 +208,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
before_loop(State);
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
%% Only reset the timeout if it is idle_timeout (active streams).
State1 = case Streams of
[] -> State;
_ -> set_timeout(State)
end,
parse(<< Buffer/binary, Data/binary >>, State1);
parse(<< Buffer/binary, Data/binary >>, State);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
@ -250,13 +244,23 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
end.
%% We set request_timeout when there are no active streams,
%% and idle_timeout otherwise.
set_timeout(State0=#state{opts=Opts, overriden_opts=Override, streams=Streams}) ->
%% We do not set request_timeout if there are active streams.
set_timeout(State=#state{streams=[_|_]}, request_timeout) ->
State;
%% We do not set request_timeout if we are skipping a body.
set_timeout(State=#state{in_state=#ps_body{}}, request_timeout) ->
State;
%% We do not set idle_timeout if there are no active streams,
%% unless when we are skipping a body.
set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout)
when element(1, InState) =/= ps_body ->
State;
%% Otherwise we can set the timeout.
set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) ->
State = cancel_timeout(State0),
{Name, Default} = case Streams of
[] -> {request_timeout, 5000};
_ -> {idle_timeout, 60000}
Default = case Name of
request_timeout -> 5000;
idle_timeout -> 60000
end,
Timeout = case Override of
%% The timeout may have been overriden for the current stream.
@ -314,9 +318,6 @@ parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}})
State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
Headers, Name));
parse(Buffer, State=#state{in_state=#ps_body{}}) ->
%% @todo We do not want to get the body automatically if the request doesn't ask for it.
%% We may want to get bodies that are below a threshold without waiting, and buffer them
%% until the request asks, though.
after_parse(parse_body(Buffer, State)).
after_parse({request, Req=#{streamid := StreamID, method := Method,
@ -324,14 +325,15 @@ after_parse({request, Req=#{streamid := StreamID, method := Method,
State0=#state{opts=Opts, buffer=Buffer, streams=Streams0}}) ->
try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
Flow = maps:get(initial_stream_flow_size, Opts, 65535),
TE = maps:get(<<"te">>, Headers, undefined),
Streams = [#stream{id=StreamID, state=StreamState,
method=Method, version=Version, te=TE}|Streams0],
State1 = case maybe_req_close(State0, Headers, Version) of
close -> State0#state{streams=Streams, last_streamid=StreamID};
keepalive -> State0#state{streams=Streams}
close -> State0#state{streams=Streams, last_streamid=StreamID, flow=Flow};
keepalive -> State0#state{streams=Streams, flow=Flow}
end,
State = set_timeout(State1),
State = set_timeout(State1, idle_timeout),
parse(Buffer, commands(State, StreamID, Commands))
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(init,
@ -343,26 +345,40 @@ after_parse({request, Req=#{streamid := StreamID, method := Method,
end;
%% Streams are sequential so the body is always about the last stream created
%% unless that stream has terminated.
after_parse({data, StreamID, IsFin, Data, State=#state{opts=Opts, buffer=Buffer,
after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer,
streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}}) ->
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
{Commands, StreamState} ->
Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
Stream#stream{state=StreamState}),
parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
State1 = set_timeout(State0, case IsFin of
fin -> request_timeout;
nofin -> idle_timeout
end),
State = update_flow(IsFin, Data, State1#state{streams=Streams}),
parse(Buffer, commands(State, StreamID, Commands))
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(data,
[StreamID, IsFin, Data, StreamState0],
Class, Exception, erlang:get_stacktrace()), Opts),
stream_reset(State, StreamID, {internal_error, {Class, Exception},
%% @todo Should call parse after this.
stream_terminate(State0, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:data/4.'})
end;
%% No corresponding stream. We must skip the body of the previous request
%% in order to process the next one.
after_parse({data, _, _, _, State}) ->
before_loop(State);
after_parse({data, _, IsFin, _, State}) ->
before_loop(set_timeout(State, case IsFin of
fin -> request_timeout;
nofin -> idle_timeout
end));
after_parse({more, State}) ->
before_loop(State).
before_loop(set_timeout(State, idle_timeout)).
update_flow(fin, _, State) ->
State#state{flow=infinity};
update_flow(nofin, Data, State=#state{flow=Flow0}) ->
State#state{flow=Flow0 - byte_size(Data)}.
%% Request-line.
@ -838,29 +854,25 @@ parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
%% @todo Proper trailers.
try TDecode(Buffer, TState0) of
more ->
%% @todo Asks for 0 or more bytes.
{more, State#state{buffer=Buffer}};
{more, Data, TState} ->
%% @todo Asks for 0 or more bytes.
{data, StreamID, nofin, Data, State#state{buffer= <<>>,
in_state=PS#ps_body{received=Received + byte_size(Data),
transfer_decode_state=TState}}};
{more, Data, _Length, TState} when is_integer(_Length) ->
%% @todo Asks for Length more bytes.
{data, StreamID, nofin, Data, State#state{buffer= <<>>,
in_state=PS#ps_body{received=Received + byte_size(Data),
transfer_decode_state=TState}}};
{more, Data, Rest, TState} ->
%% @todo Asks for 0 or more bytes.
{data, StreamID, nofin, Data, State#state{buffer=Rest,
in_state=PS#ps_body{received=Received + byte_size(Data),
transfer_decode_state=TState}}};
{done, _HasTrailers, Rest} ->
{data, StreamID, fin, <<>>, set_timeout(
State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}})};
{data, StreamID, fin, <<>>,
State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}};
{done, Data, _HasTrailers, Rest} ->
{data, StreamID, fin, Data, set_timeout(
State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}})}
{data, StreamID, fin, Data,
State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}}
catch _:_ ->
Reason = {connection_error, protocol_error,
'Failure to decode the content. (RFC7230 4)'},
@ -896,7 +908,7 @@ info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) ->
cowboy:log(cowboy_stream:make_error_log(info,
[StreamID, Msg, StreamState0],
Class, Exception, erlang:get_stacktrace()), Opts),
stream_reset(State, StreamID, {internal_error, {Class, Exception},
stream_terminate(State, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:info/3.'})
end;
false ->
@ -915,7 +927,7 @@ commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail
StreamID, Tail);
%% Error handling.
commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
commands(stream_reset(State, StreamID, Error), StreamID, Tail);
commands(stream_terminate(State, StreamID, Error), StreamID, Tail);
%% Commands for a stream currently inactive.
commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
when Current =/= StreamID ->
@ -927,13 +939,22 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command
Stream#stream{queue=Queue ++ Commands}),
State#state{streams=Streams};
%% Read the request body.
commands(State, StreamID, [{flow, _Length}|Tail]) ->
%% @todo We only read from socket if buffer is empty, otherwise
%% we decode the buffer.
%% @todo Set the body reading length to min(Length, BodyLength)
commands(State, StreamID, Tail);
commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
[{flow, Size}|Tail]) ->
%% We must read *at least* Size of data otherwise functions
%% like cowboy_req:read_body/1,2 will wait indefinitely.
Flow = if
Flow0 < 0 -> Size;
true -> Flow0 + Size
end,
%% Reenable active mode if necessary.
_ = if
Flow0 =< 0, Flow > 0 ->
Transport:setopts(Socket, [{active, once}]);
true ->
ok
end,
commands(State#state{flow=Flow}, StreamID, Tail);
%% Error responses are sent only if a response wasn't sent already.
commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID,
[{error_response, Status, Headers0, Body}|Tail]) ->
@ -1100,7 +1121,6 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_s
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
%% @todo This should be the last stream running otherwise we need to wait before switching.
%% @todo If there's streams opened after this one, fail instead of 101.
State = cancel_timeout(State0),
%% Before we send the 101 response we need to stop receiving data
@ -1127,7 +1147,8 @@ commands(State0=#state{overriden_opts=Opts},
StreamID, [{set_options, SetOpts}|Tail]) ->
State1 = case SetOpts of
#{idle_timeout := IdleTimeout} ->
set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}});
set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
idle_timeout);
_ ->
State0
end,
@ -1209,16 +1230,6 @@ maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) ->
maybe_terminate(State, StreamID, _Tail) ->
stream_terminate(State, StreamID, normal).
stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->
%% @todo headers
%% @todo Don't send this if there are no streams left.
% Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [
% {<<"content-length">>, <<"0">>}
% ])),
%% @todo update IsFin local
% stream_terminate(State#state{out_state=done}, StreamID, StreamError).
stream_terminate(State, StreamID, StreamError).
stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState,
out_streamid=OutStreamID, out_state=OutState, streams=Streams0,
children=Children0}, StreamID, Reason) ->
@ -1241,15 +1252,12 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
%% Remove the stream from the state and reset the overriden options.
{value, #stream{state=StreamState}, Streams}
= lists:keytake(StreamID, #stream.id, Streams1),
State2 = State1#state{streams=Streams, overriden_opts=#{}},
State2 = State1#state{streams=Streams, overriden_opts=#{}, flow=infinity},
%% Stop the stream.
stream_call_terminate(StreamID, Reason, StreamState, State2),
Children = cowboy_children:shutdown(Children0, StreamID),
%% We reset the timeout if there are no active streams anymore.
State = case Streams of
[] -> set_timeout(State2);
_ -> State2
end,
State = set_timeout(State2#state{streams=Streams, children=Children}, request_timeout),
%% We want to drop the connection if the body was not read fully
%% and we don't know its length or more remains to be read than
%% configuration allows.
@ -1258,10 +1266,10 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
case InState of
#ps_body{length=undefined}
when InStreamID =:= OutStreamID ->
terminate(State#state{streams=Streams, children=Children}, skip_body_unknown_length);
terminate(State, skip_body_unknown_length);
#ps_body{length=Len, received=Received}
when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len ->
terminate(State#state{streams=Streams, children=Children}, skip_body_too_large);
terminate(State, skip_body_too_large);
_ ->
%% Move on to the next stream.
NextOutStreamID = OutStreamID + 1,
@ -1269,12 +1277,11 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
false ->
%% @todo This is clearly wrong, if the stream is gone we need to check if
%% there used to be such a stream, and if there was to send an error.
State#state{out_streamid=NextOutStreamID, out_state=wait,
streams=Streams, children=Children};
State#state{out_streamid=NextOutStreamID, out_state=wait};
#stream{queue=Commands} ->
%% @todo Remove queue from the stream.
commands(State#state{out_streamid=NextOutStreamID, out_state=wait,
streams=Streams, children=Children}, NextOutStreamID, Commands)
commands(State#state{out_streamid=NextOutStreamID, out_state=wait},
NextOutStreamID, Commands)
end
end.