0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 12:20:24 +00:00

Add more flow control tests to rfc7540 and fix related issues

This commit is contained in:
Loïc Hoguin 2017-11-27 22:49:50 +01:00
parent 7f80ff28a5
commit 4cdd1aa70e
No known key found for this signature in database
GPG key ID: 8A9DF795F6FED764
2 changed files with 104 additions and 7 deletions

View file

@ -400,10 +400,17 @@ frame(State=#state{client_streamid=LastStreamID}, {rst_stream, StreamID, _})
frame(State, {rst_stream, StreamID, Reason}) -> frame(State, {rst_stream, StreamID, Reason}) ->
stream_terminate(State, StreamID, {stream_error, Reason, 'Stream reset requested by client.'}); stream_terminate(State, StreamID, {stream_error, Reason, 'Stream reset requested by client.'});
%% SETTINGS frame. %% SETTINGS frame.
frame(State=#state{socket=Socket, transport=Transport, remote_settings=Settings0}, frame(State0=#state{socket=Socket, transport=Transport, remote_settings=Settings0},
{settings, Settings}) -> {settings, Settings}) ->
Transport:send(Socket, cow_http2:settings_ack()), Transport:send(Socket, cow_http2:settings_ack()),
State#state{remote_settings=maps:merge(Settings0, Settings)}; State = State0#state{remote_settings=maps:merge(Settings0, Settings)},
case Settings of
#{initial_window_size := NewWindowSize} ->
OldWindowSize = maps:get(initial_window_size, Settings0, 65535),
update_stream_windows(State, NewWindowSize - OldWindowSize);
_ ->
State
end;
%% Ack for a previously sent SETTINGS frame. %% Ack for a previously sent SETTINGS frame.
frame(State=#state{next_settings=_NextSettings}, settings_ack) -> frame(State=#state{next_settings=_NextSettings}, settings_ack) ->
%% @todo Apply SETTINGS that require synchronization. %% @todo Apply SETTINGS that require synchronization.
@ -426,7 +433,7 @@ frame(State, Frame={goaway, _, _, _}) ->
terminate(State, {stop, Frame, 'Client is going away.'}); terminate(State, {stop, Frame, 'Client is going away.'});
%% Connection-wide WINDOW_UPDATE frame. %% Connection-wide WINDOW_UPDATE frame.
frame(State=#state{local_window=ConnWindow}, {window_update, Increment}) frame(State=#state{local_window=ConnWindow}, {window_update, Increment})
when ConnWindow + Increment > 2147483647 -> when ConnWindow + Increment > 16#7fffffff ->
terminate(State, {connection_error, flow_control_error, terminate(State, {connection_error, flow_control_error,
'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'}); 'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'});
frame(State=#state{local_window=ConnWindow}, {window_update, Increment}) -> frame(State=#state{local_window=ConnWindow}, {window_update, Increment}) ->
@ -438,7 +445,7 @@ frame(State=#state{client_streamid=LastStreamID}, {window_update, StreamID, _})
'WINDOW_UPDATE frame received on a stream in idle state. (RFC7540 5.1)'}); 'WINDOW_UPDATE frame received on a stream in idle state. (RFC7540 5.1)'});
frame(State0=#state{streams=Streams0}, {window_update, StreamID, Increment}) -> frame(State0=#state{streams=Streams0}, {window_update, StreamID, Increment}) ->
case lists:keyfind(StreamID, #stream.id, Streams0) of case lists:keyfind(StreamID, #stream.id, Streams0) of
#stream{local_window=StreamWindow} when StreamWindow + Increment > 2147483647 -> #stream{local_window=StreamWindow} when StreamWindow + Increment > 16#7fffffff ->
stream_reset(State0, StreamID, {stream_error, flow_control_error, stream_reset(State0, StreamID, {stream_error, flow_control_error,
'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'}); 'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'});
Stream0 = #stream{local_window=StreamWindow} -> Stream0 = #stream{local_window=StreamWindow} ->
@ -656,6 +663,15 @@ status(<< H, T, U, _/bits >>) when H >= $1, H =< $9, T >= $0, T =< $9, U >= $0,
send_data(State=#state{streams=Streams}) -> send_data(State=#state{streams=Streams}) ->
resume_streams(State, Streams, []). resume_streams(State, Streams, []).
%% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update
%% the stream windows for all active streams and perhaps resume
%% sending data.
update_stream_windows(State=#state{streams=Streams0}, Increment) ->
Streams = [
S#stream{local_window=StreamWindow + Increment}
|| S=#stream{local_window=StreamWindow} <- Streams0],
resume_streams(State, Streams, []).
resume_streams(State, [], Acc) -> resume_streams(State, [], Acc) ->
State#state{streams=lists:reverse(Acc)}; State#state{streams=lists:reverse(Acc)};
%% While technically we should never get < 0 here, let's be on the safe side. %% While technically we should never get < 0 here, let's be on the safe side.

View file

@ -2622,9 +2622,9 @@ window_update_reject_overflow(Config) ->
"window to exceed 2^31-1 must be rejected with a " "window to exceed 2^31-1 must be rejected with a "
"FLOW_CONTROL_ERROR connection error. (RFC7540 6.9.1)"), "FLOW_CONTROL_ERROR connection error. (RFC7540 6.9.1)"),
{ok, Socket} = do_handshake(Config), {ok, Socket} = do_handshake(Config),
%% Send connection-wide WINDOW_UPDATE frame that causes the window to overflow. %% Send a connection-wide WINDOW_UPDATE frame that causes the window to overflow.
ok = gen_tcp:send(Socket, [ ok = gen_tcp:send(Socket, [
cow_http2:window_update(2147483647) cow_http2:window_update(16#7fffffff)
]), ]),
%% Receive a FLOW_CONTROL_ERROR connection error. %% Receive a FLOW_CONTROL_ERROR connection error.
{ok, << _:24, 7:8, _:72, 3:32 >>} = gen_tcp:recv(Socket, 17, 6000), {ok, << _:24, 7:8, _:72, 3:32 >>} = gen_tcp:recv(Socket, 17, 6000),
@ -2645,8 +2645,89 @@ window_update_reject_overflow_stream(Config) ->
]), ]),
ok = gen_tcp:send(Socket, [ ok = gen_tcp:send(Socket, [
cow_http2:headers(1, fin, HeadersBlock), cow_http2:headers(1, fin, HeadersBlock),
cow_http2:window_update(1, 2147483647) cow_http2:window_update(1, 16#7fffffff)
]), ]),
%% Receive a FLOW_CONTROL_ERROR stream error. %% Receive a FLOW_CONTROL_ERROR stream error.
{ok, << _:24, 3:8, _:8, 1:32, 3:32 >>} = gen_tcp:recv(Socket, 13, 6000), {ok, << _:24, 3:8, _:8, 1:32, 3:32 >>} = gen_tcp:recv(Socket, 13, 6000),
ok. ok.
settings_initial_window_size_changes(Config) ->
doc("When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, the server "
"must adjust the size of the flow control windows of the active "
"streams. (RFC7540 6.9.2)"),
{ok, Socket} = do_handshake(Config),
%% Set SETTINGS_INITIAL_WINDOW_SIZE to 0 to prevent sending of DATA.
ok = gen_tcp:send(Socket, cow_http2:settings(#{initial_window_size => 0})),
%% Receive the SETTINGS ack.
{ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000),
%% Send a HEADERS frame.
{HeadersBlock, _} = cow_hpack:encode([
{<<":method">>, <<"GET">>},
{<<":scheme">>, <<"http">>},
{<<":authority">>, <<"localhost">>}, %% @todo Correct port number.
{<<":path">>, <<"/">>}
]),
ok = gen_tcp:send(Socket, cow_http2:headers(1, fin, HeadersBlock)),
%% Receive a response but no DATA frames are coming.
{ok, << SkipLen:24, 1:8, _:8, 1:32 >>} = gen_tcp:recv(Socket, 9, 1000),
{ok, _} = gen_tcp:recv(Socket, SkipLen, 1000),
{error, timeout} = gen_tcp:recv(Socket, 9, 1000),
%% Set SETTINGS_INITIAL_WINDOW_SIZE to a larger value.
ok = gen_tcp:send(Socket, cow_http2:settings(#{initial_window_size => 5})),
%% Receive the SETTINGS ack.
{ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000),
%% Receive a DATA frame of that size and no other.
{ok, << 5:24, 0:8, 0:8, 1:32, "Hello" >>} = gen_tcp:recv(Socket, 14, 1000),
{error, timeout} = gen_tcp:recv(Socket, 9, 1000),
%% Set SETTINGS_INITIAL_WINDOW_SIZE to exactly the size in the body.
ok = gen_tcp:send(Socket, cow_http2:settings(#{initial_window_size => 12})),
%% Receive the SETTINGS ack.
{ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000),
%% Receive the rest of the response.
{ok, << 7:24, 0:8, 1:8, 1:32, " world!" >>} = gen_tcp:recv(Socket, 16, 1000),
ok.
settings_initial_window_size_changes_negative(Config) ->
doc("When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, the server "
"must adjust the size of the flow control windows of the active "
"streams even if their window end up negative. (RFC7540 6.9.2)"),
{ok, Socket} = do_handshake(Config),
%% Set SETTINGS_INITIAL_WINDOW_SIZE to 5.
ok = gen_tcp:send(Socket, cow_http2:settings(#{initial_window_size => 5})),
%% Receive the SETTINGS ack.
{ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000),
%% Send a HEADERS frame.
{HeadersBlock, _} = cow_hpack:encode([
{<<":method">>, <<"GET">>},
{<<":scheme">>, <<"http">>},
{<<":authority">>, <<"localhost">>}, %% @todo Correct port number.
{<<":path">>, <<"/">>}
]),
ok = gen_tcp:send(Socket, cow_http2:headers(1, fin, HeadersBlock)),
%% Receive a response with a single DATA frame of the initial size we set.
{ok, << SkipLen:24, 1:8, _:8, 1:32 >>} = gen_tcp:recv(Socket, 9, 1000),
{ok, _} = gen_tcp:recv(Socket, SkipLen, 1000),
{ok, << 5:24, 0:8, 0:8, 1:32, "Hello" >>} = gen_tcp:recv(Socket, 14, 1000),
{error, timeout} = gen_tcp:recv(Socket, 9, 1000),
%% Set SETTINGS_INITIAL_WINDOW_SIZE to 0 to make the stream's window negative.
ok = gen_tcp:send(Socket, cow_http2:settings(#{initial_window_size => 0})),
%% Receive the SETTINGS ack.
{ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000),
%% Set SETTINGS_INITIAL_WINDOW_SIZE to exactly the size in the body.
ok = gen_tcp:send(Socket, cow_http2:settings(#{initial_window_size => 12})),
%% Receive the SETTINGS ack.
{ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000),
%% Receive the rest of the response.
{ok, << 7:24, 0:8, 1:8, 1:32, " world!" >>} = gen_tcp:recv(Socket, 16, 1000),
ok.
settings_initial_window_size_reject_overflow(Config) ->
doc("A SETTINGS_INITIAL_WINDOW_SIZE that causes a flow control window "
"to exceed 2^31-1 must be rejected with a FLOW_CONTROL_ERROR "
"connection error. (RFC7540 6.9.2)"),
{ok, Socket} = do_handshake(Config),
%% Set SETTINGS_INITIAL_WINDOW_SIZE to 2^31.
ok = gen_tcp:send(Socket, cow_http2:settings(#{initial_window_size => 16#80000000})),
%% Receive a FLOW_CONTROL_ERROR connection error.
{ok, << _:24, 7:8, _:72, 3:32 >>} = gen_tcp:recv(Socket, 17, 6000),
ok.