0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 04:10:24 +00:00

Fix and optimize sending of WINDOW_UPDATE frames

For long-running connections it was possible for the connection
window to become larger than allowed by the protocol because the
window increases claimed by stream handlers were never reclaimed
even if no data was consumed.

The new code applies heuristics to fix this and reduce the number
of WINDOW_UPDATE frames that are sent. It includes six new options
to control that behavior: margin, max and threshold for both the
connection and stream windows. The margin is some extra space
added on top of the requested read size. The max is the maximum
window size at any given time. The threshold is a minimum window
size that must be reached before we even consider sending more
WINDOW_UPDATE frames. We also avoid sending WINDOW_UPDATE frames
when there is already enough space in the window, or when the
read size is 0.

Cowlib is set to master until a new tag is done.
This commit is contained in:
Loïc Hoguin 2019-09-02 14:48:28 +02:00
parent aedf6379cc
commit 48f417ac8f
No known key found for this signature in database
GPG key ID: 8A9DF795F6FED764
6 changed files with 136 additions and 93 deletions

View file

@ -15,7 +15,7 @@ CT_OPTS += -ct_hooks cowboy_ct_hook [] # -boot start_sasl
LOCAL_DEPS = crypto
DEPS = cowlib ranch
dep_cowlib = git https://github.com/ninenines/cowlib 2.7.3
dep_cowlib = git https://github.com/ninenines/cowlib master
dep_ranch = git https://github.com/ninenines/ranch 1.7.1
DOC_DEPS = asciideck

View file

@ -18,21 +18,27 @@ as a Ranch protocol.
----
opts() :: #{
connection_type => worker | supervisor,
connection_window_margin_size => 0..16#7fffffff,
connection_window_update_threshold => 0..16#7fffffff,
enable_connect_protocol => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
initial_connection_window_size => 65535..16#7fffffff,
initial_stream_window_size => 0..16#7fffffff,
max_concurrent_streams => non_neg_integer() | infinity,
max_connection_window_size => 0..16#7fffffff,
max_decode_table_size => non_neg_integer(),
max_encode_table_size => non_neg_integer(),
max_frame_size_received => 16384..16777215,
max_frame_size_sent => 16384..16777215 | infinity,
max_stream_window_size => 0..16#7fffffff,
preface_timeout => timeout(),
proxy_header => boolean(),
sendfile => boolean(),
settings_timeout => timeout(),
stream_handlers => [module()]
stream_handlers => [module()],
stream_window_margin_size => 0..16#7fffffff,
stream_window_update_threshold => 0..16#7fffffff
}
----
@ -51,6 +57,19 @@ connection_type (supervisor)::
Whether the connection process also acts as a supervisor.
connection_window_margin_size (65535)::
Extra amount to be added to the window size when
updating the connection window. This is used to
ensure that there is always some space available in
the window.
connection_window_update_threshold (163840)::
The connection window will only get updated when its size
becomes lower than this threshold. This is to avoid sending
too many `WINDOW_UPDATE` frames.
enable_connect_protocol (false)::
Whether to enable the extended CONNECT method to allow
@ -84,6 +103,12 @@ max_concurrent_streams (infinity)::
Maximum number of concurrent streams allowed on the connection.
max_connection_window_size (16#7fffffff)::
Maximum connection window size. This is used as an upper bound
when calculating the window size, either when reading the request
body or receiving said body.
max_decode_table_size (4096)::
Maximum header table size used by the decoder. This is the value advertised
@ -111,6 +136,12 @@ following the client's advertised maximum.
Note that actual frame sizes may be lower than the limit when
there is not enough space left in the flow control window.
max_stream_window_size (16#7fffffff)::
Maximum stream window size. This is used as an upper bound
when calculating the window size, either when reading the request
body or receiving said body.
preface_timeout (5000)::
Time in ms Cowboy is willing to wait for the connection preface.
@ -135,8 +166,27 @@ stream_handlers ([cowboy_stream_h])::
Ordered list of stream handlers that will handle all stream events.
stream_window_margin_size (65535)::
Extra amount to be added to the window size when
updating a stream's window. This is used to
ensure that there is always some space available in
the window.
stream_window_update_threshold (163840)::
A stream's window will only get updated when its size
becomes lower than this threshold. This is to avoid sending
too many `WINDOW_UPDATE` frames.
== Changelog
* *2.7*: Add the options `connection_window_margin_size`,
`connection_window_update_threshold`,
`max_connection_window_size`, `max_stream_window_size`,
`stream_window_margin_size` and
`stream_window_update_threshold` to configure
behavior on sending WINDOW_UPDATE frames.
* *2.6*: The `proxy_header` and `sendfile` options were added.
* *2.4*: Add the options `initial_connection_window_size`,
`initial_stream_window_size`, `max_concurrent_streams`,

View file

@ -1,4 +1,4 @@
{deps, [
{cowlib,".*",{git,"https://github.com/ninenines/cowlib","2.7.3"}},{ranch,".*",{git,"https://github.com/ninenines/ranch","1.7.1"}}
{cowlib,".*",{git,"https://github.com/ninenines/cowlib","master"}},{ranch,".*",{git,"https://github.com/ninenines/ranch","1.7.1"}}
]}.
{erl_opts, [debug_info,warn_export_vars,warn_shadow_vars,warn_obsolete_guard,warn_missing_spec,warn_untyped_record]}.

View file

@ -30,6 +30,8 @@
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
connection_type => worker | supervisor,
connection_window_margin_size => 0..16#7fffffff,
connection_window_update_threshold => 0..16#7fffffff,
enable_connect_protocol => boolean(),
env => cowboy_middleware:env(),
idle_timeout => timeout(),
@ -38,10 +40,12 @@
initial_stream_window_size => 0..16#7fffffff,
logger => module(),
max_concurrent_streams => non_neg_integer() | infinity,
max_connection_window_size => 0..16#7fffffff,
max_decode_table_size => non_neg_integer(),
max_encode_table_size => non_neg_integer(),
max_frame_size_received => 16384..16777215,
max_frame_size_sent => 16384..16777215 | infinity,
max_stream_window_size => 0..16#7fffffff,
metrics_callback => cowboy_metrics_h:metrics_callback(),
middlewares => [module()],
preface_timeout => timeout(),
@ -50,6 +54,8 @@
settings_timeout => timeout(),
shutdown_timeout => timeout(),
stream_handlers => [module()],
stream_window_margin_size => 0..16#7fffffff,
stream_window_update_threshold => 0..16#7fffffff,
tracer_callback => cowboy_tracer_h:tracer_callback(),
tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
%% Open ended because configured stream handlers might add options.
@ -57,6 +63,17 @@
}.
-export_type([opts/0]).
-record(stream, {
%% Whether the stream is currently stopping.
status = running :: running | stopping,
%% Flow requested for this stream.
flow = 0 :: non_neg_integer(),
%% Stream state.
state :: {module, any()}
}).
-record(state, {
parent = undefined :: pid(),
ref :: ranch:ref(),
@ -81,9 +98,12 @@
http2_status :: sequence | settings | upgrade | connected | closing,
http2_machine :: cow_http2_machine:http2_machine(),
%% Flow requested for all streams.
flow = 0 :: non_neg_integer(),
%% Currently active HTTP/2 streams. Streams may be initiated either
%% by the client or by the server through PUSH_PROMISE frames.
streams = #{} :: #{cow_http2:streamid() => {running | stopping, {module, any()}}},
streams = #{} :: #{cow_http2:streamid() => #stream{}},
%% Streams can spawn zero or more children which are then managed
%% by this module if operating as a supervisor.
@ -259,8 +279,8 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame);
{ok, {data, StreamID, IsFin, Data}, HTTP2Machine} ->
data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data);
{ok, {lingering_data, _StreamID, DataLen}, HTTP2Machine} ->
lingering_data_frame(State#state{http2_machine=HTTP2Machine}, DataLen);
{ok, {lingering_data, StreamID, DataLen}, HTTP2Machine} ->
lingering_data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, DataLen);
{ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} ->
headers_frame(State#state{http2_machine=HTTP2Machine},
StreamID, IsFin, Headers, PseudoHeaders, BodyLen);
@ -292,30 +312,38 @@ maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) ->
end,
State.
data_frame(State=#state{opts=Opts, streams=Streams}, StreamID, IsFin, Data) ->
data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin, Data) ->
case Streams of
#{StreamID := {running, StreamState0}} ->
#{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} ->
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
{Commands, StreamState} ->
commands(State#state{streams=Streams#{StreamID => {running, StreamState}}},
StreamID, Commands)
%% Remove the amount of data received from the flow.
%% We may receive more data than we requested. We ensure
%% that the flow value doesn't go lower than 0.
Size = byte_size(Data),
State = update_window(State0#state{flow=max(0, Flow - Size),
streams=Streams#{StreamID => Stream#stream{
flow=max(0, StreamFlow - Size), state=StreamState}}},
StreamID),
commands(State, StreamID, Commands)
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(data,
[StreamID, IsFin, Data, StreamState0],
Class, Exception, erlang:get_stacktrace()), Opts),
reset_stream(State, StreamID, {internal_error, {Class, Exception},
reset_stream(State0, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:data/4.'})
end;
%% We ignore DATA frames for streams that are stopping.
#{} ->
State
State0
end.
lingering_data_frame(State=#state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0}, DataLen) ->
Transport:send(Socket, cow_http2:window_update(DataLen)),
HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0),
State#state{http2_machine=HTTP2Machine1}.
lingering_data_frame(State, _StreamID, _DataLen) ->
%% We do nothing when receiving a lingering DATA frame.
%% We already removed the stream flow from the connection
%% flow and are therefore already accounting for the window
%% being reduced by these frames.
State.
headers_frame(State, StreamID, IsFin, Headers,
PseudoHeaders=#{method := <<"CONNECT">>}, _)
@ -410,7 +438,7 @@ headers_frame(State=#state{opts=Opts, streams=Streams}, StreamID, Req) ->
try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
commands(State#state{
streams=Streams#{StreamID => {running, StreamState}}},
streams=Streams#{StreamID => #stream{state=StreamState}}},
StreamID, Commands)
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(init,
@ -449,7 +477,7 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer},
rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) ->
case maps:take(StreamID, Streams0) of
{{_, StreamState}, Streams} ->
{#stream{state=StreamState}, Streams} ->
terminate_stream_handler(State, StreamID, Reason, StreamState),
Children = cowboy_children:shutdown(Children0, StreamID),
State#state{streams=Streams, children=Children};
@ -494,10 +522,10 @@ down(State=#state{opts=Opts, children=Children0}, Pid, Msg) ->
info(State=#state{opts=Opts, streams=Streams}, StreamID, Msg) ->
case Streams of
#{StreamID := {IsRunning, StreamState0}} ->
#{StreamID := Stream=#stream{state=StreamState0}} ->
try cowboy_stream:info(StreamID, Msg, StreamState0) of
{Commands, StreamState} ->
commands(State#state{streams=Streams#{StreamID => {IsRunning, StreamState}}},
commands(State#state{streams=Streams#{StreamID => Stream#stream{state=StreamState}}},
StreamID, Commands)
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(info,
@ -586,15 +614,13 @@ commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Ma
State0
end,
commands(State, StreamID, Tail);
commands(State=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
StreamID, [{flow, Size}|Tail]) ->
Transport:send(Socket, [
cow_http2:window_update(Size),
cow_http2:window_update(StreamID, Size)
]),
HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0),
HTTP2Machine = cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1),
commands(State#state{http2_machine=HTTP2Machine}, StreamID, Tail);
%% Read the request body.
commands(State0=#state{flow=Flow, streams=Streams}, StreamID, [{flow, Size}|Tail]) ->
#{StreamID := Stream=#stream{flow=StreamFlow}} = Streams,
State = update_window(State0#state{flow=Flow + Size,
streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}},
StreamID),
commands(State, StreamID, Tail);
%% Supervise a child process.
commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
@ -628,6 +654,25 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
cowboy:log(Log, Opts),
commands(State, StreamID, Tail).
%% Tentatively update the window after the flow was updated.
update_window(State=#state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) ->
#{StreamID := #stream{flow=StreamFlow}} = Streams,
{Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of
ok -> {<<>>, HTTP2Machine0};
{ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1}
end,
{Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of
ok -> {<<>>, HTTP2Machine2};
{ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3}
end,
case {Data1, Data2} of
{<<>>, <<>>} -> ok;
_ -> Transport:send(Socket, [Data1, Data2])
end,
State#state{http2_machine=HTTP2Machine}.
%% Send the response, trailers or data.
send_response(State0, StreamID, StatusCode, Headers, Body) ->
@ -741,7 +786,7 @@ goaway(State, {goaway, _, Reason, _}) ->
%% Cancel client-initiated streams that are above LastStreamID.
goaway_streams(_, [], _, _, Acc) ->
Acc;
goaway_streams(State, [{StreamID, {_, StreamState}}|Tail], LastStreamID, Reason, Acc)
goaway_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], LastStreamID, Reason, Acc)
when StreamID > LastStreamID, (StreamID rem 2) =:= 0 ->
terminate_stream_handler(State, StreamID, Reason, StreamState),
goaway_streams(State, Tail, LastStreamID, Reason, Acc);
@ -780,7 +825,7 @@ terminate_reason({internal_error, _, _}) -> internal_error.
terminate_all_streams(_, [], _) ->
ok;
terminate_all_streams(State, [{StreamID, {_, StreamState}}|Tail], Reason) ->
terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) ->
terminate_stream_handler(State, StreamID, Reason, StreamState),
terminate_all_streams(State, Tail, Reason).
@ -820,13 +865,13 @@ stop_stream(State=#state{http2_machine=HTTP2Machine}, StreamID) ->
end.
stopping(State=#state{streams=Streams}, StreamID) ->
#{StreamID := {_, StreamState}} = Streams,
State#state{streams=Streams#{StreamID => {stopping, StreamState}}}.
#{StreamID := Stream} = Streams,
State#state{streams=Streams#{StreamID => Stream#stream{status=stopping}}}.
%% If we finished sending data and the stream is stopping, terminate it.
maybe_terminate_stream(State=#state{streams=Streams}, StreamID, fin) ->
case Streams of
#{StreamID := {stopping, _}} ->
#{StreamID := #stream{status=stopping}} ->
terminate_stream(State, StreamID);
_ ->
State
@ -849,12 +894,15 @@ terminate_stream(State0=#state{socket=Socket, transport=Transport,
end,
terminate_stream(State, StreamID, normal).
terminate_stream(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) ->
%% We remove the stream flow from the connection flow. Any further
%% data received for this stream is therefore fully contained within
%% the extra window we allocated for this stream.
terminate_stream(State=#state{flow=Flow, streams=Streams0, children=Children0}, StreamID, Reason) ->
case maps:take(StreamID, Streams0) of
{{_, StreamState}, Streams} ->
{#stream{flow=StreamFlow, state=StreamState}, Streams} ->
terminate_stream_handler(State, StreamID, Reason, StreamState),
Children = cowboy_children:shutdown(Children0, StreamID),
State#state{streams=Streams, children=Children};
State#state{flow=Flow - StreamFlow, streams=Streams, children=Children};
error ->
State
end.

View file

@ -3117,56 +3117,6 @@ data_reject_overflow_stream(Config0) ->
cowboy:stop_listener(?FUNCTION_NAME)
end.
lingering_data_counts_toward_connection_window(Config0) ->
doc("DATA frames received after sending RST_STREAM must be counted "
"toward the connection flow-control window. (RFC7540 5.1)"),
Config = cowboy_test:init_http(?FUNCTION_NAME, #{
env => #{dispatch => cowboy_router:compile(init_routes(Config0))},
initial_connection_window_size => 100000
}, Config0),
try
%% We need to do the handshake manually because a WINDOW_UPDATE
%% frame will be sent to update the connection window.
{ok, Socket} = gen_tcp:connect("localhost", config(port, Config), [binary, {active, false}]),
%% Send a valid preface.
ok = gen_tcp:send(Socket, ["PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n", cow_http2:settings(#{})]),
%% Receive the server preface.
{ok, << Len1:24 >>} = gen_tcp:recv(Socket, 3, 1000),
{ok, << 4:8, 0:40, _:Len1/binary >>} = gen_tcp:recv(Socket, 6 + Len1, 1000),
%% Send the SETTINGS ack.
ok = gen_tcp:send(Socket, cow_http2:settings_ack()),
%% Receive the WINDOW_UPDATE for the connection.
{ok, << 4:24, 8:8, 0:40, _:32 >>} = gen_tcp:recv(Socket, 13, 1000),
%% Receive the SETTINGS ack.
{ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000),
Headers = [
{<<":method">>, <<"POST">>},
{<<":scheme">>, <<"http">>},
{<<":authority">>, <<"localhost">>}, %% @todo Correct port number.
{<<":path">>, <<"/loop_handler_abort">>}
],
{HeadersBlock, _} = cow_hpack:encode(Headers),
ok = gen_tcp:send(Socket, [
cow_http2:headers(1, nofin, HeadersBlock),
cow_http2:data(1, nofin, <<0:1000/unit:8>>)
]),
% Make sure server send RST_STREAM.
timer:sleep(100),
ok = gen_tcp:send(Socket, [
cow_http2:data(1, nofin, <<0:0/unit:8>>),
cow_http2:data(1, fin, <<0:1000/unit:8>>)
]),
{ok, << SkipLen:24, 1:8, _:8, 1:32 >>} = gen_tcp:recv(Socket, 9, 1000),
% Skip the header.
{ok, _} = gen_tcp:recv(Socket, SkipLen, 1000),
% Skip RST_STREAM.
{ok, << 4:24, 3:8, 1:40, _:32 >>} = gen_tcp:recv(Socket, 13, 1000),
% Received a WINDOW_UPDATE frame after we got RST_STREAM.
{ok, << 4:24, 8:8, 0:40, 1000:32 >>} = gen_tcp:recv(Socket, 13, 1000)
after
cowboy:stop_listener(?FUNCTION_NAME)
end.
%% (RFC7540 6.9.1)
% Frames with zero length with the END_STREAM flag set (that
% is, an empty DATA frame) MAY be sent if there is no available space

View file

@ -389,15 +389,10 @@ accept_handshake_when_enabled(Config) ->
{RespHeaders, _} = cow_hpack:decode(RespHeadersBlock),
{_, <<"200">>} = lists:keyfind(<<":status">>, 1, RespHeaders),
%% Masked text hello echoed back clear by the server.
%%
%% We receive WINDOW_UPDATE frames before the actual data
%% due to flow control updates every time a data frame is received.
Mask = 16#37fa213d,
MaskedHello = ws_SUITE:do_mask(<<"Hello">>, Mask, <<>>),
ok = gen_tcp:send(Socket, cow_http2:data(1, nofin,
<<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedHello/binary>>)),
{ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000),
{ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000),
{ok, <<Len2:24, _:8, _:8, _:32>>} = gen_tcp:recv(Socket, 9, 1000),
{ok, <<1:1, 0:3, 1:4, 0:1, 5:7, "Hello">>} = gen_tcp:recv(Socket, Len2, 1000),
ok.