0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 20:30:23 +00:00

Add options controlling initial control flow windows

This commit is contained in:
Loïc Hoguin 2018-04-26 22:08:05 +02:00
parent b2f16d462a
commit d38d86c4a9
No known key found for this signature in database
GPG key ID: 8A9DF795F6FED764
4 changed files with 309 additions and 39 deletions

View file

@ -27,6 +27,8 @@
enable_connect_protocol => boolean(),
env => cowboy_middleware:env(),
inactivity_timeout => timeout(),
initial_connection_window_size => 65535..16#7fffffff,
initial_stream_window_size => 0..16#7fffffff,
max_concurrent_streams => non_neg_integer() | infinity,
max_decode_table_size => non_neg_integer(),
max_encode_table_size => non_neg_integer(),
@ -171,6 +173,7 @@ init(Parent, Ref, Socket, Transport, Opts) ->
init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer) ->
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, opts=Opts, peer=Peer, sock=Sock, cert=Cert,
remote_window=maps:get(initial_connection_window_size, Opts, 65535),
parse_state={preface, sequence, preface_timeout(Opts)}},
State = settings_init(State0, Opts),
preface(State),
@ -186,6 +189,7 @@ init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer) ->
init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer, _Settings, Req) ->
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, opts=Opts, peer=Peer, sock=Sock, cert=Cert,
remote_window=maps:get(initial_connection_window_size, Opts, 65535),
parse_state={preface, sequence, preface_timeout(Opts)}},
%% @todo Apply settings.
%% StreamID from HTTP/1.1 Upgrade requests is always 1.
@ -209,10 +213,11 @@ settings_init(State, Opts) ->
header_table_size, 4096),
S1 = setting_from_opt(S0, Opts, max_concurrent_streams,
max_concurrent_streams, infinity),
%% @todo initial_window_size
S2 = setting_from_opt(S1, Opts, initial_stream_window_size,
initial_window_size, 65535),
%% @todo max_frame_size
%% @todo max_header_list_size
Settings = setting_from_opt(S1, Opts, enable_connect_protocol,
Settings = setting_from_opt(S2, Opts, enable_connect_protocol,
enable_connect_protocol, false),
State#state{next_settings=Settings}.
@ -222,9 +227,16 @@ setting_from_opt(Settings, Opts, OptName, SettingName, Default) ->
Value -> Settings#{SettingName => Value}
end.
preface(#state{socket=Socket, transport=Transport, next_settings=Settings}) ->
%% We send next_settings and use defaults until we get a ack.
Transport:send(Socket, cow_http2:settings(Settings)).
%% We send next_settings and use defaults until we get an ack.
%%
%% We also send a WINDOW_UPDATE frame for the connection when
%% the user specified an initial_connection_window_size.
preface(#state{socket=Socket, transport=Transport, opts=Opts, next_settings=Settings}) ->
MaybeWindowUpdate = case maps:get(initial_connection_window_size, Opts, 65535) of
65535 -> <<>>;
Size -> cow_http2:window_update(Size - 65535)
end,
Transport:send(Socket, [cow_http2:settings(Settings), MaybeWindowUpdate]).
preface_timeout(Opts) ->
case maps:get(preface_timeout, Opts, 5000) of
@ -348,11 +360,18 @@ frame(State=#state{client_streamid=LastStreamID}, {data, StreamID, _, _})
when StreamID > LastStreamID ->
terminate(State, {connection_error, protocol_error,
'DATA frame received on a stream in idle state. (RFC7540 5.1)'});
frame(State=#state{remote_window=ConnWindow}, {data, _, _, Data})
when byte_size(Data) > ConnWindow ->
terminate(State, {connection_error, flow_control_error,
'DATA frame overflowed the connection flow control window. (RFC7540 6.9, RFC7540 6.9.1)'});
frame(State0=#state{remote_window=ConnWindow, streams=Streams, lingering_streams=Lingering},
{data, StreamID, IsFin, Data}) ->
DataLen = byte_size(Data),
State = State0#state{remote_window=ConnWindow - DataLen},
case lists:keyfind(StreamID, #stream.id, Streams) of
#stream{remote_window=StreamWindow} when StreamWindow < DataLen ->
stream_reset(State, StreamID, {stream_error, flow_control_error,
'DATA frame overflowed the stream flow control window. (RFC7540 6.9, RFC7540 6.9.1)'});
Stream = #stream{state=flush, remote=nofin, remote_window=StreamWindow} ->
after_commands(State, Stream#stream{remote=IsFin, remote_window=StreamWindow - DataLen});
Stream = #stream{state=StreamState0, remote=nofin, remote_window=StreamWindow} ->
@ -436,7 +455,7 @@ frame(State0=#state{socket=Socket, transport=Transport, opts=Opts,
State#state{encode_state=EncodeState};
(initial_window_size, NewWindowSize, State) ->
OldWindowSize = maps:get(initial_window_size, Settings0, 65535),
update_stream_windows(State, NewWindowSize - OldWindowSize);
update_streams_local_window(State, NewWindowSize - OldWindowSize);
(_, _, State) ->
State
end, State1, Settings);
@ -448,6 +467,9 @@ frame(State0=#state{local_settings=Local0, next_settings=NextSettings}, settings
(header_table_size, MaxSize, State=#state{decode_state=DecodeState0}) ->
DecodeState = cow_hpack:set_max_size(MaxSize, DecodeState0),
State#state{decode_state=DecodeState};
(initial_window_size, NewWindowSize, State) ->
OldWindowSize = maps:get(initial_window_size, Local0, 65535),
update_streams_remote_window(State, NewWindowSize - OldWindowSize);
(_, _, State) ->
State
end, State1, NextSettings);
@ -718,14 +740,22 @@ send_data(State=#state{streams=Streams}) ->
resume_streams(State, Streams, []).
%% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update
%% the stream windows for all active streams and perhaps resume
%% sending data.
update_stream_windows(State=#state{streams=Streams0}, Increment) ->
%% the local stream windows for all active streams and perhaps
%% resume sending data.
update_streams_local_window(State=#state{streams=Streams0}, Increment) ->
Streams = [
S#stream{local_window=StreamWindow + Increment}
|| S=#stream{local_window=StreamWindow} <- Streams0],
resume_streams(State, Streams, []).
%% When we receive an ack to a SETTINGS frame we sent we need to update
%% the remote stream windows for all active streams.
update_streams_remote_window(State=#state{streams=Streams0}, Increment) ->
Streams = [
S#stream{remote_window=StreamWindow + Increment}
|| S=#stream{remote_window=StreamWindow} <- Streams0],
State#state{streams=Streams}.
resume_streams(State, [], Acc) ->
State#state{streams=lists:reverse(Acc)};
%% While technically we should never get < 0 here, let's be on the safe side.