mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 12:20:24 +00:00
Preliminary h2 flow control support
Existing tests pass. A number of things remain to be done. Has only been tested with Gun so far. Feedback welcome!
This commit is contained in:
parent
304e3efbf8
commit
f3e5f3e410
5 changed files with 232 additions and 64 deletions
|
@ -1,7 +1,7 @@
|
||||||
{application, 'cowboy', [
|
{application, 'cowboy', [
|
||||||
{description, "Small, fast, modern HTTP server."},
|
{description, "Small, fast, modern HTTP server."},
|
||||||
{vsn, "2.0.0-pre.8"},
|
{vsn, "2.0.0-pre.8"},
|
||||||
{modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_loop','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']},
|
{modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_iolists','cowboy_loop','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']},
|
||||||
{registered, [cowboy_sup,cowboy_clock]},
|
{registered, [cowboy_sup,cowboy_clock]},
|
||||||
{applications, [kernel,stdlib,crypto,cowlib,ranch]},
|
{applications, [kernel,stdlib,crypto,cowlib,ranch]},
|
||||||
{mod, {cowboy_app, []}},
|
{mod, {cowboy_app, []}},
|
||||||
|
|
|
@ -36,11 +36,20 @@
|
||||||
-record(stream, {
|
-record(stream, {
|
||||||
id = undefined :: cowboy_stream:streamid(),
|
id = undefined :: cowboy_stream:streamid(),
|
||||||
%% Stream handlers and their state.
|
%% Stream handlers and their state.
|
||||||
state = undefined :: {module(), any()},
|
state = undefined :: {module(), any()} | flush,
|
||||||
%% Whether we finished sending data.
|
%% Whether we finished sending data.
|
||||||
local = idle :: idle | upgrade | cowboy_stream:fin(),
|
local = idle :: idle | upgrade | cowboy_stream:fin() | flush,
|
||||||
|
%% Local flow control window (how much we can send).
|
||||||
|
local_window :: integer(),
|
||||||
|
%% Buffered data waiting for the flow control window to increase.
|
||||||
|
local_buffer = queue:new() :: queue:queue(
|
||||||
|
{cowboy_stream:fin(), non_neg_integer(), iolist()
|
||||||
|
| {sendfile, non_neg_integer(), pos_integer(), file:name_all()}}),
|
||||||
|
local_buffer_size = 0 :: non_neg_integer(),
|
||||||
%% Whether we finished receiving data.
|
%% Whether we finished receiving data.
|
||||||
remote = nofin :: cowboy_stream:fin(),
|
remote = nofin :: cowboy_stream:fin(),
|
||||||
|
%% Remote flow control window (how much we accept to receive).
|
||||||
|
remote_window :: integer(),
|
||||||
%% Request body length.
|
%% Request body length.
|
||||||
body_length = 0 :: non_neg_integer()
|
body_length = 0 :: non_neg_integer()
|
||||||
}).
|
}).
|
||||||
|
@ -67,7 +76,7 @@
|
||||||
% header_table_size => 4096,
|
% header_table_size => 4096,
|
||||||
% enable_push => false, %% We are the server. Push is never enabled.
|
% enable_push => false, %% We are the server. Push is never enabled.
|
||||||
% max_concurrent_streams => infinity,
|
% max_concurrent_streams => infinity,
|
||||||
% initial_window_size => 65535,
|
initial_window_size => 65535,
|
||||||
max_frame_size => 16384
|
max_frame_size => 16384
|
||||||
% max_header_list_size => infinity
|
% max_header_list_size => infinity
|
||||||
} :: map(),
|
} :: map(),
|
||||||
|
@ -75,7 +84,13 @@
|
||||||
%% We need to be careful there. It's well possible that we send
|
%% We need to be careful there. It's well possible that we send
|
||||||
%% two SETTINGS frames before we receive a SETTINGS ack.
|
%% two SETTINGS frames before we receive a SETTINGS ack.
|
||||||
next_settings = #{} :: undefined | map(), %% @todo perhaps set to undefined by default
|
next_settings = #{} :: undefined | map(), %% @todo perhaps set to undefined by default
|
||||||
remote_settings = #{} :: map(),
|
remote_settings = #{
|
||||||
|
initial_window_size => 65535
|
||||||
|
} :: map(),
|
||||||
|
|
||||||
|
%% Connection-wide flow control window.
|
||||||
|
local_window = 65535 :: integer(), %% How much we can send.
|
||||||
|
remote_window = 65535 :: integer(), %% How much we accept to receive.
|
||||||
|
|
||||||
%% Stream identifiers.
|
%% Stream identifiers.
|
||||||
client_streamid = 0 :: non_neg_integer(),
|
client_streamid = 0 :: non_neg_integer(),
|
||||||
|
@ -269,17 +284,22 @@ parse_settings_preface(State, _, _, _) ->
|
||||||
%% and terminate the stream if this is the end of it.
|
%% and terminate the stream if this is the end of it.
|
||||||
|
|
||||||
%% DATA frame.
|
%% DATA frame.
|
||||||
frame(State=#state{streams=Streams}, {data, StreamID, IsFin0, Data}) ->
|
frame(State=#state{remote_window=ConnWindow, streams=Streams},
|
||||||
|
{data, StreamID, IsFin0, Data}) ->
|
||||||
case lists:keyfind(StreamID, #stream.id, Streams) of
|
case lists:keyfind(StreamID, #stream.id, Streams) of
|
||||||
Stream = #stream{state=StreamState0, remote=nofin, body_length=Len0} ->
|
Stream = #stream{state=StreamState0, remote=nofin,
|
||||||
Len = Len0 + byte_size(Data),
|
remote_window=StreamWindow, body_length=Len0} ->
|
||||||
|
DataLen = byte_size(Data),
|
||||||
|
Len = Len0 + DataLen,
|
||||||
IsFin = case IsFin0 of
|
IsFin = case IsFin0 of
|
||||||
fin -> {fin, Len};
|
fin -> {fin, Len};
|
||||||
nofin -> nofin
|
nofin -> nofin
|
||||||
end,
|
end,
|
||||||
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
|
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
|
||||||
{Commands, StreamState} ->
|
{Commands, StreamState} ->
|
||||||
commands(State, Stream#stream{state=StreamState, body_length=Len}, Commands)
|
commands(State#state{remote_window=ConnWindow - DataLen},
|
||||||
|
Stream#stream{state=StreamState, remote_window=StreamWindow - DataLen,
|
||||||
|
body_length=Len}, Commands)
|
||||||
catch Class:Reason ->
|
catch Class:Reason ->
|
||||||
error_logger:error_msg("Exception occurred in "
|
error_logger:error_msg("Exception occurred in "
|
||||||
"cowboy_stream:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
|
"cowboy_stream:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
|
||||||
|
@ -342,13 +362,22 @@ frame(State, {ping_ack, _Opaque}) ->
|
||||||
frame(State, Frame={goaway, _, _, _}) ->
|
frame(State, Frame={goaway, _, _, _}) ->
|
||||||
terminate(State, {stop, Frame, 'Client is going away.'});
|
terminate(State, {stop, Frame, 'Client is going away.'});
|
||||||
%% Connection-wide WINDOW_UPDATE frame.
|
%% Connection-wide WINDOW_UPDATE frame.
|
||||||
frame(State, {window_update, _Increment}) ->
|
frame(State=#state{local_window=ConnWindow}, {window_update, Increment}) ->
|
||||||
%% @todo control flow
|
send_data(State#state{local_window=ConnWindow + Increment});
|
||||||
State;
|
|
||||||
%% Stream-specific WINDOW_UPDATE frame.
|
%% Stream-specific WINDOW_UPDATE frame.
|
||||||
frame(State, {window_update, _StreamID, _Increment}) ->
|
frame(State0=#state{streams=Streams0}, {window_update, StreamID, Increment}) ->
|
||||||
%% @todo stream-specific control flow
|
case lists:keyfind(StreamID, #stream.id, Streams0) of
|
||||||
State;
|
Stream0 = #stream{local_window=StreamWindow} ->
|
||||||
|
{State, Stream} = send_data(State0,
|
||||||
|
Stream0#stream{local_window=StreamWindow + Increment}),
|
||||||
|
Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream),
|
||||||
|
State#state{streams=Streams};
|
||||||
|
false ->
|
||||||
|
%% @todo Receiving this frame on a stream in the idle state is an error.
|
||||||
|
%% WINDOW_UPDATE frames may be received for a short period of time
|
||||||
|
%% after a stream is closed. They must be ignored.
|
||||||
|
State0
|
||||||
|
end;
|
||||||
%% Unexpected CONTINUATION frame.
|
%% Unexpected CONTINUATION frame.
|
||||||
frame(State, {continuation, _, _, _}) ->
|
frame(State, {continuation, _, _, _}) ->
|
||||||
terminate(State, {connection_error, protocol_error,
|
terminate(State, {connection_error, protocol_error,
|
||||||
|
@ -419,11 +448,8 @@ commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeSta
|
||||||
[{sendfile, fin, O, B, P}|Tail]);
|
[{sendfile, fin, O, B, P}|Tail]);
|
||||||
_ ->
|
_ ->
|
||||||
Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
|
Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
|
||||||
%% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE.
|
{State1, Stream1} = send_data(State, Stream#stream{local=nofin}, fin, Body),
|
||||||
%% Use the length set by the server instead, if any.
|
commands(State1#state{encode_state=EncodeState}, Stream1, Tail)
|
||||||
%% @todo Would be better if we didn't have to convert to binary.
|
|
||||||
send_data(Socket, Transport, StreamID, fin, iolist_to_binary(Body), 16384),
|
|
||||||
commands(State#state{encode_state=EncodeState}, Stream#stream{local=fin}, Tail)
|
|
||||||
end;
|
end;
|
||||||
%% @todo response when local!=idle
|
%% @todo response when local!=idle
|
||||||
%% Send response headers and initiate chunked encoding.
|
%% Send response headers and initiate chunked encoding.
|
||||||
|
@ -445,10 +471,9 @@ commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeSta
|
||||||
%% split into multiple calls and flow control should be used to make
|
%% split into multiple calls and flow control should be used to make
|
||||||
%% sure we only send as fast as the client can receive and don't block
|
%% sure we only send as fast as the client can receive and don't block
|
||||||
%% anything.
|
%% anything.
|
||||||
commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=StreamID, local=nofin},
|
commands(State0, Stream0=#stream{local=nofin}, [{data, IsFin, Data}|Tail]) ->
|
||||||
[{data, IsFin, Data}|Tail]) ->
|
{State, Stream} = send_data(State0, Stream0, IsFin, Data),
|
||||||
Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)),
|
commands(State, Stream, Tail);
|
||||||
commands(State, Stream#stream{local=IsFin}, Tail);
|
|
||||||
|
|
||||||
%% @todo data when local!=nofin
|
%% @todo data when local!=nofin
|
||||||
|
|
||||||
|
@ -463,16 +488,10 @@ commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=Str
|
||||||
%% to ensure the file is sent in chunks (which would require a better
|
%% to ensure the file is sent in chunks (which would require a better
|
||||||
%% flow control at the stream handler level). One thing for sure, the
|
%% flow control at the stream handler level). One thing for sure, the
|
||||||
%% implementation necessarily varies between HTTP/1.1 and HTTP/2.
|
%% implementation necessarily varies between HTTP/1.1 and HTTP/2.
|
||||||
commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=StreamID, local=nofin},
|
commands(State0, Stream0=#stream{local=nofin},
|
||||||
[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
|
[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
|
||||||
%% @todo We currently have a naive implementation without a
|
{State, Stream} = send_data(State0, Stream0, IsFin, {sendfile, Offset, Bytes, Path}),
|
||||||
%% scheduler to prioritize frames that need to be sent.
|
commands(State, Stream, Tail);
|
||||||
%% A future update will need to queue such data frames
|
|
||||||
%% and only send them when there is nothing currently
|
|
||||||
%% being sent. We would probably also benefit from doing
|
|
||||||
%% asynchronous sends.
|
|
||||||
sendfile(Socket, Transport, StreamID, IsFin, Offset, Bytes, Path, 16384),
|
|
||||||
commands(State, Stream#stream{local=IsFin}, Tail);
|
|
||||||
%% @todo sendfile when local!=nofin
|
%% @todo sendfile when local!=nofin
|
||||||
%% Send a push promise.
|
%% Send a push promise.
|
||||||
%%
|
%%
|
||||||
|
@ -500,9 +519,15 @@ commands(State0=#state{socket=Socket, transport=Transport, server_streamid=Promi
|
||||||
State = stream_init(State0#state{server_streamid=PromisedStreamID + 2, encode_state=EncodeState},
|
State = stream_init(State0#state{server_streamid=PromisedStreamID + 2, encode_state=EncodeState},
|
||||||
PromisedStreamID, fin, iolist_to_binary(HeaderBlock)),
|
PromisedStreamID, fin, iolist_to_binary(HeaderBlock)),
|
||||||
commands(State, Stream, Tail);
|
commands(State, Stream, Tail);
|
||||||
%% @todo Update the flow control state.
|
commands(State=#state{socket=Socket, transport=Transport, remote_window=ConnWindow},
|
||||||
commands(State, Stream, [{flow, _Size}|Tail]) ->
|
Stream=#stream{id=StreamID, remote_window=StreamWindow},
|
||||||
commands(State, Stream, Tail);
|
[{flow, Size}|Tail]) ->
|
||||||
|
Transport:send(Socket, [
|
||||||
|
cow_http2:window_update(Size),
|
||||||
|
cow_http2:window_update(StreamID, Size)
|
||||||
|
]),
|
||||||
|
commands(State#state{remote_window=ConnWindow + Size},
|
||||||
|
Stream#stream{remote_window=StreamWindow + Size}, Tail);
|
||||||
%% Supervise a child process.
|
%% Supervise a child process.
|
||||||
commands(State=#state{children=Children}, Stream=#stream{id=StreamID},
|
commands(State=#state{children=Children}, Stream=#stream{id=StreamID},
|
||||||
[{spawn, Pid, _Shutdown}|Tail]) -> %% @todo Shutdown
|
[{spawn, Pid, _Shutdown}|Tail]) -> %% @todo Shutdown
|
||||||
|
@ -536,29 +561,88 @@ status(Status) when is_integer(Status) ->
|
||||||
status(<< H, T, U, _/bits >>) when H >= $1, H =< $9, T >= $0, T =< $9, U >= $0, U =< $9 ->
|
status(<< H, T, U, _/bits >>) when H >= $1, H =< $9, T >= $0, T =< $9, U >= $0, U =< $9 ->
|
||||||
<< H, T, U >>.
|
<< H, T, U >>.
|
||||||
|
|
||||||
%% This same function is found in gun_http2.
|
|
||||||
send_data(Socket, Transport, StreamID, IsFin, Data, Length) ->
|
|
||||||
if
|
|
||||||
Length < byte_size(Data) ->
|
%% @todo Should we ever want to implement the PRIORITY mechanism,
|
||||||
<< Payload:Length/binary, Rest/bits >> = Data,
|
%% this would be the place to do it. Right now, we just go over
|
||||||
Transport:send(Socket, cow_http2:data(StreamID, nofin, Payload)),
|
%% all streams and send what we can until either everything is
|
||||||
send_data(Socket, Transport, StreamID, IsFin, Rest, Length);
|
%% sent or we run out of space in the window.
|
||||||
true ->
|
send_data(State=#state{streams=Streams}) ->
|
||||||
Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data))
|
resume_streams(State, Streams, []).
|
||||||
|
|
||||||
|
%% @todo When streams terminate we need to remove the stream.
|
||||||
|
resume_streams(State, [], Acc) ->
|
||||||
|
State#state{streams=lists:reverse(Acc)};
|
||||||
|
%% While technically we should never get < 0 here, let's be on the safe side.
|
||||||
|
resume_streams(State=#state{local_window=ConnWindow}, Streams, Acc)
|
||||||
|
when ConnWindow =< 0 ->
|
||||||
|
State#state{streams=lists:reverse(Acc, Streams)};
|
||||||
|
%% We rely on send_data/2 to do all the necessary checks about the stream.
|
||||||
|
resume_streams(State0, [Stream0|Tail], Acc) ->
|
||||||
|
{State, Stream} = send_data(State0, Stream0),
|
||||||
|
resume_streams(State, Tail, [Stream|Acc]).
|
||||||
|
|
||||||
|
%% @todo We might want to print an error if local=fin.
|
||||||
|
%%
|
||||||
|
%% @todo It's possible that the stream terminates. We must remove it.
|
||||||
|
send_data(State=#state{local_window=ConnWindow},
|
||||||
|
Stream=#stream{local=IsFin, local_window=StreamWindow, local_buffer_size=BufferSize})
|
||||||
|
when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 ->
|
||||||
|
{State, Stream};
|
||||||
|
send_data(State0, Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize}) ->
|
||||||
|
%% We know there is an item in the queue.
|
||||||
|
{{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0),
|
||||||
|
{State, Stream} = send_data(State0,
|
||||||
|
Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize},
|
||||||
|
IsFin, Data),
|
||||||
|
send_data(State, Stream).
|
||||||
|
|
||||||
|
%% Send data immediately if we can, buffer otherwise.
|
||||||
|
%% @todo We might want to print an error if local=fin.
|
||||||
|
send_data(State=#state{local_window=ConnWindow},
|
||||||
|
Stream=#stream{local_window=StreamWindow}, IsFin, Data)
|
||||||
|
when ConnWindow =< 0; StreamWindow =< 0 ->
|
||||||
|
{State, queue_data(Stream, IsFin, Data)};
|
||||||
|
send_data(State=#state{socket=Socket, transport=Transport, local_window=ConnWindow},
|
||||||
|
Stream=#stream{id=StreamID, local_window=StreamWindow}, IsFin, Data) ->
|
||||||
|
MaxFrameSize = 16384, %% @todo Use the real SETTINGS_MAX_FRAME_SIZE set by the client.
|
||||||
|
SendSize = min(min(ConnWindow, StreamWindow), MaxFrameSize),
|
||||||
|
case Data of
|
||||||
|
{sendfile, Offset, Bytes, Path} when Bytes =< SendSize ->
|
||||||
|
Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)),
|
||||||
|
Transport:sendfile(Socket, Path, Offset, Bytes),
|
||||||
|
{State#state{local_window=ConnWindow - SendSize},
|
||||||
|
Stream#stream{local=IsFin, local_window=StreamWindow - SendSize}};
|
||||||
|
{sendfile, Offset, Bytes, Path} ->
|
||||||
|
Transport:send(Socket, cow_http2:data_header(StreamID, nofin, SendSize)),
|
||||||
|
Transport:sendfile(Socket, Path, Offset, SendSize),
|
||||||
|
send_data(State#state{local_window=ConnWindow - SendSize},
|
||||||
|
Stream#stream{local_window=StreamWindow - SendSize},
|
||||||
|
IsFin, {sendfile, Offset + SendSize, Bytes - SendSize, Path});
|
||||||
|
Iolist0 ->
|
||||||
|
IolistSize = iolist_size(Iolist0),
|
||||||
|
if
|
||||||
|
IolistSize =< SendSize ->
|
||||||
|
Transport:send(Socket, cow_http2:data(StreamID, IsFin, Iolist0)),
|
||||||
|
{State#state{local_window=ConnWindow - SendSize},
|
||||||
|
Stream#stream{local=IsFin, local_window=StreamWindow - SendSize}};
|
||||||
|
true ->
|
||||||
|
{Iolist, More} = cowboy_iolists:split(SendSize, Iolist0),
|
||||||
|
Transport:send(Socket, cow_http2:data(StreamID, nofin, Iolist)),
|
||||||
|
send_data(State#state{local_window=ConnWindow - SendSize},
|
||||||
|
Stream#stream{local_window=StreamWindow - SendSize},
|
||||||
|
IsFin, More)
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @todo This is currently awfully slow. But at least it's correct.
|
queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data) ->
|
||||||
sendfile(Socket, Transport, StreamID, IsFin, Offset, Bytes, Path, Length) ->
|
DataSize = case Data of
|
||||||
if
|
{sendfile, _, Bytes, _} -> Bytes;
|
||||||
Length < Bytes ->
|
Iolist -> iolist_size(Iolist)
|
||||||
Transport:send(Socket, cow_http2:data_header(StreamID, nofin, Length)),
|
end,
|
||||||
Transport:sendfile(Socket, Path, Offset, Length),
|
Q = queue:in({IsFin, DataSize, Data}, Q0),
|
||||||
sendfile(Socket, Transport, StreamID, IsFin,
|
Stream#stream{local_buffer=Q, local_buffer_size=Size0 + DataSize}.
|
||||||
Offset + Length, Bytes - Length, Path, Length);
|
|
||||||
true ->
|
|
||||||
Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)),
|
|
||||||
Transport:sendfile(Socket, Path, Offset, Bytes)
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec terminate(#state{}, _) -> no_return().
|
-spec terminate(#state{}, _) -> no_return().
|
||||||
terminate(undefined, Reason) ->
|
terminate(undefined, Reason) ->
|
||||||
|
@ -641,12 +725,17 @@ stream_init(State0=#state{ref=Ref, socket=Socket, transport=Transport, peer=Peer
|
||||||
'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
|
'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
stream_handler_init(State=#state{opts=Opts}, StreamID, RemoteIsFin, LocalIsFin, Req) ->
|
stream_handler_init(State=#state{opts=Opts,
|
||||||
|
local_settings=#{initial_window_size := RemoteWindow},
|
||||||
|
remote_settings=#{initial_window_size := LocalWindow}},
|
||||||
|
StreamID, RemoteIsFin, LocalIsFin, Req) ->
|
||||||
try cowboy_stream:init(StreamID, Req, Opts) of
|
try cowboy_stream:init(StreamID, Req, Opts) of
|
||||||
{Commands, StreamState} ->
|
{Commands, StreamState} ->
|
||||||
commands(State#state{client_streamid=StreamID},
|
commands(State#state{client_streamid=StreamID},
|
||||||
#stream{id=StreamID, state=StreamState,
|
#stream{id=StreamID, state=StreamState,
|
||||||
remote=RemoteIsFin, local=LocalIsFin}, Commands)
|
remote=RemoteIsFin, local=LocalIsFin,
|
||||||
|
local_window=LocalWindow, remote_window=RemoteWindow},
|
||||||
|
Commands)
|
||||||
catch Class:Reason ->
|
catch Class:Reason ->
|
||||||
error_logger:error_msg("Exception occurred in "
|
error_logger:error_msg("Exception occurred in "
|
||||||
"cowboy_stream:init(~p, ~p, ~p) with reason ~p:~p.",
|
"cowboy_stream:init(~p, ~p, ~p) with reason ~p:~p.",
|
||||||
|
@ -676,12 +765,20 @@ stream_terminate(State=#state{socket=Socket, transport=Transport,
|
||||||
Children = stream_terminate_children(Children0, StreamID, []),
|
Children = stream_terminate_children(Children0, StreamID, []),
|
||||||
State1#state{streams=Streams, children=Children};
|
State1#state{streams=Streams, children=Children};
|
||||||
%% When a response was sent but not terminated, we need to close the stream.
|
%% When a response was sent but not terminated, we need to close the stream.
|
||||||
{value, #stream{state=StreamState, local=nofin}, Streams} when Reason =:= normal ->
|
{value, #stream{state=StreamState, local=nofin, local_buffer_size=0}, Streams}
|
||||||
|
when Reason =:= normal ->
|
||||||
Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)),
|
Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)),
|
||||||
stream_call_terminate(StreamID, Reason, StreamState),
|
stream_call_terminate(StreamID, Reason, StreamState),
|
||||||
Children = stream_terminate_children(Children0, StreamID, []),
|
Children = stream_terminate_children(Children0, StreamID, []),
|
||||||
State#state{streams=Streams, children=Children};
|
State#state{streams=Streams, children=Children};
|
||||||
%% Otherwise we sent an RST_STREAM and the stream is already closed.
|
%% Unless there is still data in the buffer. We can however reset
|
||||||
|
%% a few fields and set a special local state to avoid confusion.
|
||||||
|
{value, Stream=#stream{state=StreamState, local=nofin}, Streams} ->
|
||||||
|
stream_call_terminate(StreamID, Reason, StreamState),
|
||||||
|
Children = stream_terminate_children(Children0, StreamID, []),
|
||||||
|
State#state{streams=[Stream#stream{state=flush, local=flush}|Streams],
|
||||||
|
children=Children};
|
||||||
|
%% Otherwise we sent an RST_STREAM and/or the stream is already closed.
|
||||||
{value, #stream{state=StreamState}, Streams} ->
|
{value, #stream{state=StreamState}, Streams} ->
|
||||||
stream_call_terminate(StreamID, Reason, StreamState),
|
stream_call_terminate(StreamID, Reason, StreamState),
|
||||||
Children = stream_terminate_children(Children0, StreamID, []),
|
Children = stream_terminate_children(Children0, StreamID, []),
|
||||||
|
|
71
src/cowboy_iolists.erl
Normal file
71
src/cowboy_iolists.erl
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
%% Copyright (c) 2017, Loïc Hoguin <essen@ninenines.eu>
|
||||||
|
%%
|
||||||
|
%% Permission to use, copy, modify, and/or distribute this software for any
|
||||||
|
%% purpose with or without fee is hereby granted, provided that the above
|
||||||
|
%% copyright notice and this permission notice appear in all copies.
|
||||||
|
%%
|
||||||
|
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||||
|
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||||
|
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||||
|
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||||
|
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||||
|
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||||
|
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||||
|
|
||||||
|
-module(cowboy_iolists).
|
||||||
|
|
||||||
|
-export([split/2]).
|
||||||
|
|
||||||
|
-spec split(non_neg_integer(), iodata()) -> {iodata(), iodata()}.
|
||||||
|
split(N, Iolist) ->
|
||||||
|
case split(N, Iolist, []) of
|
||||||
|
{ok, Before, After} ->
|
||||||
|
{Before, After};
|
||||||
|
{more, _, Before} ->
|
||||||
|
{lists:reverse(Before), <<>>}
|
||||||
|
end.
|
||||||
|
|
||||||
|
split(0, Rest, Acc) ->
|
||||||
|
{ok, lists:reverse(Acc), Rest};
|
||||||
|
split(N, [], Acc) ->
|
||||||
|
{more, N, Acc};
|
||||||
|
split(N, Binary, Acc) when byte_size(Binary) =< N ->
|
||||||
|
{ok, lists:reverse([Binary|Acc]), <<>>};
|
||||||
|
split(N, Binary, Acc) when is_binary(Binary) ->
|
||||||
|
<< Before:N/binary, After/bits >> = Binary,
|
||||||
|
{ok, lists:reverse([Before|Acc]), After};
|
||||||
|
split(N, [Binary|Tail], Acc) when byte_size(Binary) =< N ->
|
||||||
|
split(N - byte_size(Binary), Tail, [Binary|Acc]);
|
||||||
|
split(N, [Binary|Tail], Acc) when is_binary(Binary) ->
|
||||||
|
<< Before:N/binary, After/bits >> = Binary,
|
||||||
|
{ok, lists:reverse([Before|Acc]), [After|Tail]};
|
||||||
|
split(N, [Char|Tail], Acc) when is_integer(Char) ->
|
||||||
|
split(N - 1, Tail, [Char|Acc]);
|
||||||
|
split(N, [List|Tail], Acc0) ->
|
||||||
|
case split(N, List, Acc0) of
|
||||||
|
{ok, Before, After} ->
|
||||||
|
{ok, Before, [After|Tail]};
|
||||||
|
{more, More, Acc} ->
|
||||||
|
split(More, Tail, Acc)
|
||||||
|
end.
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
split_test_() ->
|
||||||
|
Tests = [
|
||||||
|
{10, "Hello world!", "Hello worl", "d!"},
|
||||||
|
{10, <<"Hello world!">>, "Hello worl", "d!"},
|
||||||
|
{10, ["He", [<<"llo">>], $\s, [["world"], <<"!">>]], "Hello worl", "d!"},
|
||||||
|
{10, ["Hello "|<<"world!">>], "Hello worl", "d!"},
|
||||||
|
{10, "Hello!", "Hello!", ""},
|
||||||
|
{10, <<"Hello!">>, "Hello!", ""},
|
||||||
|
{10, ["He", [<<"ll">>], $o, [["!"]]], "Hello!", ""},
|
||||||
|
{10, ["Hel"|<<"lo!">>], "Hello!", ""}
|
||||||
|
],
|
||||||
|
[{iolist_to_binary(V), fun() ->
|
||||||
|
{B, A} = split(N, V),
|
||||||
|
true = iolist_to_binary(RB) =:= iolist_to_binary(B),
|
||||||
|
true = iolist_to_binary(RA) =:= iolist_to_binary(A)
|
||||||
|
end} || {N, V, RB, RA} <- Tests].
|
||||||
|
|
||||||
|
-endif.
|
|
@ -88,7 +88,7 @@
|
||||||
-export_type([cookie_opts/0]).
|
-export_type([cookie_opts/0]).
|
||||||
|
|
||||||
-type read_body_opts() :: #{
|
-type read_body_opts() :: #{
|
||||||
length => non_neg_integer() | infinity,
|
length => non_neg_integer(),
|
||||||
period => non_neg_integer(),
|
period => non_neg_integer(),
|
||||||
timeout => timeout()
|
timeout => timeout()
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -49,10 +49,10 @@ init_dispatch(Config) ->
|
||||||
{"/resp/:key[/:arg]", resp_h, []},
|
{"/resp/:key[/:arg]", resp_h, []},
|
||||||
{"/multipart[/:key]", multipart_h, []},
|
{"/multipart[/:key]", multipart_h, []},
|
||||||
{"/args/:key/:arg[/:default]", echo_h, []},
|
{"/args/:key/:arg[/:default]", echo_h, []},
|
||||||
{"/crash/:key/period", echo_h, #{length => infinity, period => 1000, crash => true}},
|
{"/crash/:key/period", echo_h, #{length => 999999999, period => 1000, crash => true}},
|
||||||
{"/no-opts/:key", echo_h, #{crash => true}},
|
{"/no-opts/:key", echo_h, #{crash => true}},
|
||||||
{"/opts/:key/length", echo_h, #{length => 1000}},
|
{"/opts/:key/length", echo_h, #{length => 1000}},
|
||||||
{"/opts/:key/period", echo_h, #{length => infinity, period => 1000}},
|
{"/opts/:key/period", echo_h, #{length => 999999999, period => 1000}},
|
||||||
{"/opts/:key/timeout", echo_h, #{timeout => 1000, crash => true}},
|
{"/opts/:key/timeout", echo_h, #{timeout => 1000, crash => true}},
|
||||||
{"/full/:key", echo_h, []},
|
{"/full/:key", echo_h, []},
|
||||||
{"/no/:key", echo_h, []},
|
{"/no/:key", echo_h, []},
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue