0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 12:20:24 +00:00

Fix and optimize sending of WINDOW_UPDATE frames

For long-running connections it was possible for the connection
window to become larger than allowed by the protocol because the
window increases claimed by stream handlers were never reclaimed
even if no data was consumed.

The new code applies heuristics to fix this and reduce the number
of WINDOW_UPDATE frames that are sent. It includes six new options
to control that behavior: margin, max and threshold for both the
connection and stream windows. The margin is some extra space
added on top of the requested read size. The max is the maximum
window size at any given time. The threshold is a minimum window
size that must be reached before we even consider sending more
WINDOW_UPDATE frames. We also avoid sending WINDOW_UPDATE frames
when there is already enough space in the window, or when the
read size is 0.
This commit is contained in:
Loïc Hoguin 2019-09-02 14:48:28 +02:00
parent aedf6379cc
commit eb4735ef7a
No known key found for this signature in database
GPG key ID: 8A9DF795F6FED764
2 changed files with 84 additions and 36 deletions

View file

@ -15,7 +15,7 @@ CT_OPTS += -ct_hooks cowboy_ct_hook [] # -boot start_sasl
LOCAL_DEPS = crypto LOCAL_DEPS = crypto
DEPS = cowlib ranch DEPS = cowlib ranch
dep_cowlib = git https://github.com/ninenines/cowlib 2.7.3 dep_cowlib = git https://github.com/ninenines/cowlib better-window-updates
dep_ranch = git https://github.com/ninenines/ranch 1.7.1 dep_ranch = git https://github.com/ninenines/ranch 1.7.1
DOC_DEPS = asciideck DOC_DEPS = asciideck

View file

@ -30,6 +30,8 @@
compress_buffering => boolean(), compress_buffering => boolean(),
compress_threshold => non_neg_integer(), compress_threshold => non_neg_integer(),
connection_type => worker | supervisor, connection_type => worker | supervisor,
connection_window_margin_size => 0..16#7fffffff,
connection_window_update_threshold => 0..16#7fffffff,
enable_connect_protocol => boolean(), enable_connect_protocol => boolean(),
env => cowboy_middleware:env(), env => cowboy_middleware:env(),
idle_timeout => timeout(), idle_timeout => timeout(),
@ -38,10 +40,12 @@
initial_stream_window_size => 0..16#7fffffff, initial_stream_window_size => 0..16#7fffffff,
logger => module(), logger => module(),
max_concurrent_streams => non_neg_integer() | infinity, max_concurrent_streams => non_neg_integer() | infinity,
max_connection_window_size => 0..16#7fffffff,
max_decode_table_size => non_neg_integer(), max_decode_table_size => non_neg_integer(),
max_encode_table_size => non_neg_integer(), max_encode_table_size => non_neg_integer(),
max_frame_size_received => 16384..16777215, max_frame_size_received => 16384..16777215,
max_frame_size_sent => 16384..16777215 | infinity, max_frame_size_sent => 16384..16777215 | infinity,
max_stream_window_size => 0..16#7fffffff,
metrics_callback => cowboy_metrics_h:metrics_callback(), metrics_callback => cowboy_metrics_h:metrics_callback(),
middlewares => [module()], middlewares => [module()],
preface_timeout => timeout(), preface_timeout => timeout(),
@ -50,6 +54,8 @@
settings_timeout => timeout(), settings_timeout => timeout(),
shutdown_timeout => timeout(), shutdown_timeout => timeout(),
stream_handlers => [module()], stream_handlers => [module()],
stream_window_margin_size => 0..16#7fffffff,
stream_window_update_threshold => 0..16#7fffffff,
tracer_callback => cowboy_tracer_h:tracer_callback(), tracer_callback => cowboy_tracer_h:tracer_callback(),
tracer_match_specs => cowboy_tracer_h:tracer_match_specs(), tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
%% Open ended because configured stream handlers might add options. %% Open ended because configured stream handlers might add options.
@ -57,6 +63,17 @@
}. }.
-export_type([opts/0]). -export_type([opts/0]).
-record(stream, {
%% Whether the stream is currently stopping.
status = running :: running | stopping,
%% Flow requested for this stream.
flow = 0 :: non_neg_integer(),
%% Stream state.
state :: {module, any()}
}).
-record(state, { -record(state, {
parent = undefined :: pid(), parent = undefined :: pid(),
ref :: ranch:ref(), ref :: ranch:ref(),
@ -81,9 +98,12 @@
http2_status :: sequence | settings | upgrade | connected | closing, http2_status :: sequence | settings | upgrade | connected | closing,
http2_machine :: cow_http2_machine:http2_machine(), http2_machine :: cow_http2_machine:http2_machine(),
%% Flow requested for all streams.
flow = 0 :: non_neg_integer(),
%% Currently active HTTP/2 streams. Streams may be initiated either %% Currently active HTTP/2 streams. Streams may be initiated either
%% by the client or by the server through PUSH_PROMISE frames. %% by the client or by the server through PUSH_PROMISE frames.
streams = #{} :: #{cow_http2:streamid() => {running | stopping, {module, any()}}}, streams = #{} :: #{cow_http2:streamid() => #stream{}},
%% Streams can spawn zero or more children which are then managed %% Streams can spawn zero or more children which are then managed
%% by this module if operating as a supervisor. %% by this module if operating as a supervisor.
@ -259,8 +279,8 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame); maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame);
{ok, {data, StreamID, IsFin, Data}, HTTP2Machine} -> {ok, {data, StreamID, IsFin, Data}, HTTP2Machine} ->
data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data); data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data);
{ok, {lingering_data, _StreamID, DataLen}, HTTP2Machine} -> {ok, {lingering_data, StreamID, DataLen}, HTTP2Machine} ->
lingering_data_frame(State#state{http2_machine=HTTP2Machine}, DataLen); lingering_data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, DataLen);
{ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} -> {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} ->
headers_frame(State#state{http2_machine=HTTP2Machine}, headers_frame(State#state{http2_machine=HTTP2Machine},
StreamID, IsFin, Headers, PseudoHeaders, BodyLen); StreamID, IsFin, Headers, PseudoHeaders, BodyLen);
@ -292,30 +312,38 @@ maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) ->
end, end,
State. State.
data_frame(State=#state{opts=Opts, streams=Streams}, StreamID, IsFin, Data) -> data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin, Data) ->
case Streams of case Streams of
#{StreamID := {running, StreamState0}} -> #{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} ->
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
{Commands, StreamState} -> {Commands, StreamState} ->
commands(State#state{streams=Streams#{StreamID => {running, StreamState}}}, %% Remove the amount of data received from the flow.
StreamID, Commands) %% We may receive more data than we requested. We ensure
%% that the flow value doesn't go lower than 0.
Size = byte_size(Data),
State = update_window(State0#state{flow=max(0, Flow - Size),
streams=Streams#{StreamID => Stream#stream{
flow=max(0, StreamFlow - Size), state=StreamState}}},
StreamID),
commands(State, StreamID, Commands)
catch Class:Exception -> catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(data, cowboy:log(cowboy_stream:make_error_log(data,
[StreamID, IsFin, Data, StreamState0], [StreamID, IsFin, Data, StreamState0],
Class, Exception, erlang:get_stacktrace()), Opts), Class, Exception, erlang:get_stacktrace()), Opts),
reset_stream(State, StreamID, {internal_error, {Class, Exception}, reset_stream(State0, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:data/4.'}) 'Unhandled exception in cowboy_stream:data/4.'})
end; end;
%% We ignore DATA frames for streams that are stopping. %% We ignore DATA frames for streams that are stopping.
#{} -> #{} ->
State State0
end. end.
lingering_data_frame(State=#state{socket=Socket, transport=Transport, lingering_data_frame(State, _StreamID, _DataLen) ->
http2_machine=HTTP2Machine0}, DataLen) -> %% We do nothing when receiving a lingering DATA frame.
Transport:send(Socket, cow_http2:window_update(DataLen)), %% We already removed the stream flow from the connection
HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0), %% flow and are therefore already accounting for the window
State#state{http2_machine=HTTP2Machine1}. %% being reduced by these frames.
State.
headers_frame(State, StreamID, IsFin, Headers, headers_frame(State, StreamID, IsFin, Headers,
PseudoHeaders=#{method := <<"CONNECT">>}, _) PseudoHeaders=#{method := <<"CONNECT">>}, _)
@ -410,7 +438,7 @@ headers_frame(State=#state{opts=Opts, streams=Streams}, StreamID, Req) ->
try cowboy_stream:init(StreamID, Req, Opts) of try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} -> {Commands, StreamState} ->
commands(State#state{ commands(State#state{
streams=Streams#{StreamID => {running, StreamState}}}, streams=Streams#{StreamID => #stream{state=StreamState}}},
StreamID, Commands) StreamID, Commands)
catch Class:Exception -> catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(init, cowboy:log(cowboy_stream:make_error_log(init,
@ -449,7 +477,7 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer},
rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) -> rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) ->
case maps:take(StreamID, Streams0) of case maps:take(StreamID, Streams0) of
{{_, StreamState}, Streams} -> {#stream{state=StreamState}, Streams} ->
terminate_stream_handler(State, StreamID, Reason, StreamState), terminate_stream_handler(State, StreamID, Reason, StreamState),
Children = cowboy_children:shutdown(Children0, StreamID), Children = cowboy_children:shutdown(Children0, StreamID),
State#state{streams=Streams, children=Children}; State#state{streams=Streams, children=Children};
@ -494,10 +522,10 @@ down(State=#state{opts=Opts, children=Children0}, Pid, Msg) ->
info(State=#state{opts=Opts, streams=Streams}, StreamID, Msg) -> info(State=#state{opts=Opts, streams=Streams}, StreamID, Msg) ->
case Streams of case Streams of
#{StreamID := {IsRunning, StreamState0}} -> #{StreamID := Stream=#stream{state=StreamState0}} ->
try cowboy_stream:info(StreamID, Msg, StreamState0) of try cowboy_stream:info(StreamID, Msg, StreamState0) of
{Commands, StreamState} -> {Commands, StreamState} ->
commands(State#state{streams=Streams#{StreamID => {IsRunning, StreamState}}}, commands(State#state{streams=Streams#{StreamID => Stream#stream{state=StreamState}}},
StreamID, Commands) StreamID, Commands)
catch Class:Exception -> catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(info, cowboy:log(cowboy_stream:make_error_log(info,
@ -586,15 +614,13 @@ commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Ma
State0 State0
end, end,
commands(State, StreamID, Tail); commands(State, StreamID, Tail);
commands(State=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, %% Read the request body.
StreamID, [{flow, Size}|Tail]) -> commands(State0=#state{flow=Flow, streams=Streams}, StreamID, [{flow, Size}|Tail]) ->
Transport:send(Socket, [ #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams,
cow_http2:window_update(Size), State = update_window(State0#state{flow=Flow + Size,
cow_http2:window_update(StreamID, Size) streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}},
]), StreamID),
HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0), commands(State, StreamID, Tail);
HTTP2Machine = cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1),
commands(State#state{http2_machine=HTTP2Machine}, StreamID, Tail);
%% Supervise a child process. %% Supervise a child process.
commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) -> commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)}, commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
@ -628,6 +654,25 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
cowboy:log(Log, Opts), cowboy:log(Log, Opts),
commands(State, StreamID, Tail). commands(State, StreamID, Tail).
%% Tentatively update the window after the flow was updated.
update_window(State=#state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) ->
#{StreamID := #stream{flow=StreamFlow}} = Streams,
{Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of
ok -> {<<>>, HTTP2Machine0};
{ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1}
end,
{Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of
ok -> {<<>>, HTTP2Machine2};
{ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3}
end,
case {Data1, Data2} of
{<<>>, <<>>} -> ok;
_ -> Transport:send(Socket, [Data1, Data2])
end,
State#state{http2_machine=HTTP2Machine}.
%% Send the response, trailers or data. %% Send the response, trailers or data.
send_response(State0, StreamID, StatusCode, Headers, Body) -> send_response(State0, StreamID, StatusCode, Headers, Body) ->
@ -741,7 +786,7 @@ goaway(State, {goaway, _, Reason, _}) ->
%% Cancel client-initiated streams that are above LastStreamID. %% Cancel client-initiated streams that are above LastStreamID.
goaway_streams(_, [], _, _, Acc) -> goaway_streams(_, [], _, _, Acc) ->
Acc; Acc;
goaway_streams(State, [{StreamID, {_, StreamState}}|Tail], LastStreamID, Reason, Acc) goaway_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], LastStreamID, Reason, Acc)
when StreamID > LastStreamID, (StreamID rem 2) =:= 0 -> when StreamID > LastStreamID, (StreamID rem 2) =:= 0 ->
terminate_stream_handler(State, StreamID, Reason, StreamState), terminate_stream_handler(State, StreamID, Reason, StreamState),
goaway_streams(State, Tail, LastStreamID, Reason, Acc); goaway_streams(State, Tail, LastStreamID, Reason, Acc);
@ -780,7 +825,7 @@ terminate_reason({internal_error, _, _}) -> internal_error.
terminate_all_streams(_, [], _) -> terminate_all_streams(_, [], _) ->
ok; ok;
terminate_all_streams(State, [{StreamID, {_, StreamState}}|Tail], Reason) -> terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) ->
terminate_stream_handler(State, StreamID, Reason, StreamState), terminate_stream_handler(State, StreamID, Reason, StreamState),
terminate_all_streams(State, Tail, Reason). terminate_all_streams(State, Tail, Reason).
@ -820,13 +865,13 @@ stop_stream(State=#state{http2_machine=HTTP2Machine}, StreamID) ->
end. end.
stopping(State=#state{streams=Streams}, StreamID) -> stopping(State=#state{streams=Streams}, StreamID) ->
#{StreamID := {_, StreamState}} = Streams, #{StreamID := Stream} = Streams,
State#state{streams=Streams#{StreamID => {stopping, StreamState}}}. State#state{streams=Streams#{StreamID => Stream#stream{status=stopping}}}.
%% If we finished sending data and the stream is stopping, terminate it. %% If we finished sending data and the stream is stopping, terminate it.
maybe_terminate_stream(State=#state{streams=Streams}, StreamID, fin) -> maybe_terminate_stream(State=#state{streams=Streams}, StreamID, fin) ->
case Streams of case Streams of
#{StreamID := {stopping, _}} -> #{StreamID := #stream{status=stopping}} ->
terminate_stream(State, StreamID); terminate_stream(State, StreamID);
_ -> _ ->
State State
@ -849,12 +894,15 @@ terminate_stream(State0=#state{socket=Socket, transport=Transport,
end, end,
terminate_stream(State, StreamID, normal). terminate_stream(State, StreamID, normal).
terminate_stream(State=#state{streams=Streams0, children=Children0}, StreamID, Reason) -> %% We remove the stream flow from the connection flow. Any further
%% data received for this stream is therefore fully contained within
%% the extra window we allocated for this stream.
terminate_stream(State=#state{flow=Flow, streams=Streams0, children=Children0}, StreamID, Reason) ->
case maps:take(StreamID, Streams0) of case maps:take(StreamID, Streams0) of
{{_, StreamState}, Streams} -> {#stream{flow=StreamFlow, state=StreamState}, Streams} ->
terminate_stream_handler(State, StreamID, Reason, StreamState), terminate_stream_handler(State, StreamID, Reason, StreamState),
Children = cowboy_children:shutdown(Children0, StreamID), Children = cowboy_children:shutdown(Children0, StreamID),
State#state{streams=Streams, children=Children}; State#state{flow=Flow - StreamFlow, streams=Streams, children=Children};
error -> error ->
State State
end. end.