2017-01-02 19:36:36 +01:00
|
|
|
%% Copyright (c) 2015-2017, Loïc Hoguin <essen@ninenines.eu>
|
2015-06-11 17:04:21 +02:00
|
|
|
%%
|
|
|
|
%% Permission to use, copy, modify, and/or distribute this software for any
|
|
|
|
%% purpose with or without fee is hereby granted, provided that the above
|
|
|
|
%% copyright notice and this permission notice appear in all copies.
|
|
|
|
%%
|
|
|
|
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
|
|
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
|
|
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
|
|
|
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
|
|
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
|
|
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
|
|
|
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
|
|
|
|
|
|
-module(cowboy_http2).

%% Connection entry points: init/5 looks up the peer address itself,
%% init/7 additionally takes the peer and already-buffered data,
%% init/9 takes over a connection upgraded from HTTP/1.1.
-export([init/5, init/7, init/9]).

%% Callbacks used by sys when the loop hands over a system message.
-export([system_continue/3, system_terminate/4, system_code_change/4]).
|
|
|
|
|
|
|
|
%% Per-stream bookkeeping kept by the connection process.
-record(stream, {
	%% Identifier of the stream on this connection.
	id = undefined :: cowboy_stream:streamid(),

	%% The stream handler module together with its state.
	state = undefined :: {module(), any()},

	%% Sending side: idle until something is sent, then nofin/fin.
	local = idle :: idle | cowboy_stream:fin(),

	%% Receiving side: fin once we are done receiving data.
	remote = nofin :: cowboy_stream:fin(),

	%% Number of request body bytes received so far.
	body_length = 0 :: non_neg_integer()
}).

-type stream() :: #stream{}.
|
|
|
|
|
|
|
|
%% Connection-wide state carried through the receive loop.
-record(state, {
	parent = undefined :: pid(),
	ref :: ranch:ref(),
	socket = undefined :: inet:socket(),
	transport :: module(),
	opts = #{} :: map(),

	%% Address and port of the remote end of the connection.
	peer = undefined :: {inet:ip_address(), inet:port_number()},

	%% Each endpoint has its own settings, and settings only become
	%% effective once the other side has acknowledged them.
	%%
	%% @todo The ack is mandatory, so we must time out when it never arrives.
	%% @todo Settings handling has not been thought through enough yet;
	%% the final implementation will be very different.
	local_settings = #{
%		header_table_size => 4096,
%		enable_push => false, %% We are the server. Push is never enabled.
%		max_concurrent_streams => infinity,
%		initial_window_size => 65535,
		max_frame_size => 16384
%		max_header_list_size => infinity
	} :: map(),
	%% @todo A TimerRef is needed to produce SETTINGS_TIMEOUT errors.
	%% Care is required: we may well send two SETTINGS frames
	%% before receiving a single SETTINGS ack.
	next_settings = #{} :: undefined | map(), %% @todo perhaps set to undefined by default
	remote_settings = #{} :: map(),

	%% Stream identifiers.
	client_streamid = 0 :: non_neg_integer(),
	server_streamid = 2 :: pos_integer(),

	%% HTTP/2 streams that are currently active. A stream is started
	%% either by the client or by the server via a PUSH_PROMISE frame.
	streams = [] :: [stream()],

	%% Processes spawned on behalf of streams. This module manages
	%% them when it operates as a supervisor.
	children = [] :: [{pid(), cowboy_stream:streamid()}],

	%% The client opens with a fixed byte sequence (the preface) and then
	%% a SETTINGS frame, which may be empty. After that the connection is
	%% established and frames are handled normally, except that a HEADERS
	%% frame followed by CONTINUATION frames must not be interleaved with
	%% any other frame.
	parse_state = undefined :: {preface, sequence, reference()}
		| {preface, settings, reference()}
		| normal
		| {continuation, cowboy_stream:streamid(), cowboy_stream:fin(), binary()},

	%% HPACK state for decoding and for encoding header blocks.
	decode_state = cow_hpack:init() :: cow_hpack:state(),
	encode_state = cow_hpack:init() :: cow_hpack:state()
}).
|
|
|
|
|
2017-01-16 14:22:43 +01:00
|
|
|
%% Entry point for a connection with no buffered data. The peer address
%% is looked up first; when the lookup fails the process exits through
%% terminate/2 without ever building a connection state.
-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok.
init(Parent, Ref, Socket, Transport, Opts) ->
	PeerResult = Transport:peername(Socket),
	case PeerResult of
		{error, Reason} ->
			%% The peer address cannot be read: the connection is gone.
			terminate(undefined, {socket_error, Reason, 'An error has occurred on the socket.'});
		{ok, Peer} ->
			init(Parent, Ref, Socket, Transport, Opts, Peer, <<>>)
	end.
|
2016-03-10 23:30:49 +01:00
|
|
|
|
2017-01-16 14:22:43 +01:00
|
|
|
%% Entry point when the peer is already known and some bytes may have
%% been read from the socket. Starts the preface timer, sends our
%% SETTINGS frame, then either waits for data or parses the buffer.
-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(),
	{inet:ip_address(), inet:port_number()}, binary()) -> ok.
init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer) ->
	TRef = preface_timeout(Opts),
	State = #state{parent=Parent, ref=Ref, socket=Socket,
		transport=Transport, opts=Opts, peer=Peer,
		parse_state={preface, sequence, TRef}},
	preface(State),
	case Buffer =:= <<>> of
		true -> before_loop(State, Buffer);
		false -> parse(State, Buffer)
	end.
|
|
|
|
|
2016-03-12 18:25:35 +01:00
|
|
|
%% Entry point taking an already built request in addition to the
%% connection parameters. The request becomes stream 1.
%%
%% @todo Add an argument for the request body.
-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(),
	{inet:ip_address(), inet:port_number()}, binary(), map() | undefined, cowboy_req:req()) -> ok.
init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer, _Settings, Req) ->
	TRef = preface_timeout(Opts),
	Initial = #state{parent=Parent, ref=Ref, socket=Socket,
		transport=Transport, opts=Opts, peer=Peer,
		parse_state={preface, sequence, TRef}},
	preface(Initial),
	%% @todo Apply settings.
	%% An HTTP/1.1 Upgrade request always maps to StreamID 1, and that
	%% stream always starts in the half-closed (remote) state.
	State = stream_handler_init(Initial, 1, fin, Req),
	case Buffer =:= <<>> of
		true -> before_loop(State, Buffer);
		false -> parse(State, Buffer)
	end.
|
|
|
|
|
2016-03-10 23:30:49 +01:00
|
|
|
%% Send the server side of the connection preface: a SETTINGS frame
%% built from next_settings. The defaults stay in use until the
%% client acks. Crashes with badmatch if the send does not return ok.
preface(#state{socket=Socket, transport=Transport, next_settings=Settings}) ->
	SettingsFrame = cow_http2:settings(Settings),
	ok = Transport:send(Socket, SettingsFrame).
|
2015-06-11 17:04:21 +02:00
|
|
|
|
2016-03-13 23:14:57 +01:00
|
|
|
%% Arm the timer that fires a preface_timeout message at this process.
%% The delay comes from the preface_timeout option (5000 ms when unset).
%% Returns the timer reference.
preface_timeout(Opts) ->
	erlang:start_timer(maps:get(preface_timeout, Opts, 5000), self(), preface_timeout).
|
|
|
|
|
2015-06-11 17:04:21 +02:00
|
|
|
%% Hook invoked every time the connection goes back to waiting for
%% messages; for now it enters the receive loop directly.
%%
%% @todo Add the timeout for last time since we heard of connection.
before_loop(ConnState, Pending) ->
	loop(ConnState, Pending).
|
|
|
|
|
2016-03-13 23:14:57 +01:00
|
|
|
%% Main receive loop of the connection process. Re-arms the socket in
%% {active, once} mode, then waits for exactly one message: socket
%% data/close/error, system messages, the preface timer, messages
%% addressed to a stream, child exits or supervisor-style calls.
%% Buffer holds the bytes received so far that are not parsed yet.
loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
		children=Children, parse_state=ParseState}, Buffer) ->
	Transport:setopts(Socket, [{active, once}]),
	{DataTag, ClosedTag, ErrorTag} = Transport:messages(),
	receive
		%% Messages coming from the socket.
		{DataTag, Socket, Bytes} ->
			parse(State, << Buffer/binary, Bytes/binary >>);
		{ClosedTag, Socket} ->
			terminate(State, {socket_error, closed, 'The socket has been closed.'});
		{ErrorTag, Socket, SockReason} ->
			terminate(State, {socket_error, SockReason, 'An error has occurred on the socket.'});
		%% Messages from the parent and from sys.
		{'EXIT', Parent, ExitReason} ->
			exit(ExitReason);
		{system, SysFrom, SysRequest} ->
			sys:handle_system_msg(SysRequest, SysFrom, Parent, ?MODULE, [], {State, Buffer});
		%% The preface timer fired. It only matters when we are still
		%% waiting for the preface guarded by this very timer; any
		%% other preface_timeout message is dropped.
		{timeout, TRef, preface_timeout} ->
			case ParseState of
				{preface, _, TRef} ->
					terminate(State, {connection_error, protocol_error,
						'The preface was not received in a reasonable amount of time.'});
				_ ->
					loop(State, Buffer)
			end;
		%% Messages addressed to one of our streams.
		{{Owner, StreamID}, StreamMsg} when Owner =:= self() ->
			loop(info(State, StreamID, StreamMsg), Buffer);
		%% A linked process (other than the parent) exited.
		ExitMsg = {'EXIT', ChildPid, _} ->
			loop(down(State, ChildPid, ExitMsg), Buffer);
		%% Calls normally sent to a supervisor.
		{'$gen_call', {Caller, Tag}, which_children} ->
			Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _} <- Children],
			Caller ! {Tag, Workers},
			loop(State, Buffer);
		{'$gen_call', {Caller, Tag}, count_children} ->
			NbChildren = length(Children),
			Caller ! {Tag, [{specs, 1}, {active, NbChildren},
				{supervisors, 0}, {workers, NbChildren}]},
			loop(State, Buffer);
		{'$gen_call', {Caller, Tag}, _} ->
			Caller ! {Tag, {error, ?MODULE}},
			loop(State, Buffer);
		%% Anything else is unexpected: log it and carry on.
		Stray ->
			error_logger:error_msg("Received stray message ~p.", [Stray]),
			loop(State, Buffer)
	%% @todo Configurable timeout.
	after 60000 ->
		terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
	end.
|
|
|
|
|
2016-03-13 23:14:57 +01:00
|
|
|
%% Parse the bytes received so far.
%%
%% While the client preface sequence is still expected, the data must be
%% that sequence or a prefix of it. On a complete match we move on to
%% expecting the SETTINGS frame; on a partial match we wait for more
%% data (the preface timer bounds that wait); on a mismatch the socket
%% is closed and the process exits.
parse(State=#state{socket=Socket, transport=Transport, parse_state={preface, sequence, TRef}}, Data) ->
	case Data of
		<< "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n", Rest/bits >> ->
			parse(State#state{parse_state={preface, settings, TRef}}, Rest);
		_ ->
			Sequence = <<"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>,
			Len = byte_size(Data),
			%% Fewer than 24 bytes and identical to the start of the
			%% sequence: incomplete but still valid so far.
			case Len < 24 andalso binary:part(Sequence, 0, Len) =:= Data of
				true ->
					before_loop(State, Data);
				false ->
					Transport:close(Socket),
					exit({shutdown, {connection_error, protocol_error,
						'The connection preface was invalid. (RFC7540 3.5)'}})
			end
	end;
%% Past the preface sequence, frames are cut out of the data one at a
%% time and dispatched according to the current parse state.
%%
%% @todo Perhaps instead of just more we can have {more, Len} to avoid all the checks.
parse(State=#state{local_settings=#{max_frame_size := MaxFrameSize},
		parse_state=ParseState}, Data) ->
	case cow_http2:parse(Data, MaxFrameSize) of
		more ->
			%% Not enough bytes for a full frame yet.
			before_loop(State, Data);
		{ok, Frame, Rest} ->
			case ParseState of
				{preface, settings, TRef} ->
					parse_settings_preface(State, Frame, Rest, TRef);
				{continuation, _, _, _} ->
					parse(continuation_frame(State, Frame), Rest);
				normal ->
					parse(frame(State, Frame), Rest)
			end;
		{stream_error, StreamID, Reason, Human, Rest} ->
			parse(stream_reset(State, StreamID, {stream_error, Reason, Human}), Rest);
		ConnError = {connection_error, _, _} ->
			terminate(State, ConnError)
	end.
|
|
|
|
|
2016-03-13 23:14:57 +01:00
|
|
|
%% Handle the first frame following the preface sequence. It has to be
%% a SETTINGS frame: in that case the preface timer is cancelled, the
%% connection switches to normal parsing and the frame is processed.
%% Any other frame is a connection error.
parse_settings_preface(State, Frame, Rest, TRef) ->
	case Frame of
		{settings, _} ->
			_ = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
			Established = State#state{parse_state=normal},
			parse(frame(Established, Frame), Rest);
		_ ->
			terminate(State, {connection_error, protocol_error,
				'The preface sequence must be followed by a SETTINGS frame. (RFC7540 3.5)'})
	end.
|
|
|
|
|
2015-06-11 17:04:21 +02:00
|
|
|
%% Process one parsed frame while the connection is in the normal state.
%% Returns the updated connection state, or does not return when the
%% frame causes the connection to be terminated.
%%
%% @todo When we get a 'fin' we need to check if the stream had a 'fin' sent back
%% and terminate the stream if this is the end of it.

%% DATA frame.
%%
%% Only accepted for a known stream whose remote side is still open.
%% The running body length is updated and passed to the stream handler
%% along with the final chunk.
frame(State=#state{streams=Streams}, {data, StreamID, IsFin0, Data}) ->
	case lists:keyfind(StreamID, #stream.id, Streams) of
		Stream = #stream{state=StreamState0, remote=nofin, body_length=Len0} ->
			Len = Len0 + byte_size(Data),
			IsFin = case IsFin0 of
				fin -> {fin, Len};
				nofin -> nofin
			end,
			try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
				{Commands, StreamState} ->
					%% Remember whether the client finished sending. Without
					%% this the remote=nofin match above would keep accepting
					%% DATA frames after END_STREAM instead of rejecting them
					%% with stream_closed.
					commands(State, Stream#stream{state=StreamState,
						remote=IsFin0, body_length=Len}, Commands)
			catch Class:Reason ->
				error_logger:error_msg("Exception occurred in "
					"cowboy_stream:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
					[StreamID, IsFin0, Data, StreamState0, Class, Reason]),
				stream_reset(State, StreamID, {internal_error, {Class, Reason},
					'Exception occurred in cowboy_stream:data/4.'})
			end;
		_ ->
			stream_reset(State, StreamID, {stream_error, stream_closed,
				'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'})
	end;
%% Single HEADERS frame headers block.
frame(State, {headers, StreamID, IsFin, head_fin, HeaderBlock}) ->
	%% @todo We probably need to validate StreamID here and in 4 next clauses.
	stream_init(State, StreamID, IsFin, HeaderBlock);
%% HEADERS frame starting a headers block. Enter continuation mode.
frame(State, {headers, StreamID, IsFin, head_nofin, HeaderBlockFragment}) ->
	State#state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}};
%% Single HEADERS frame headers block with priority.
frame(State, {headers, StreamID, IsFin, head_fin,
		_IsExclusive, _DepStreamID, _Weight, HeaderBlock}) ->
	%% @todo Handle priority.
	stream_init(State, StreamID, IsFin, HeaderBlock);
%% HEADERS frame with priority starting a headers block. Enter continuation mode.
frame(State, {headers, StreamID, IsFin, head_nofin,
		_IsExclusive, _DepStreamID, _Weight, HeaderBlockFragment}) ->
	%% @todo Handle priority.
	State#state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}};
%% PRIORITY frame. Currently ignored.
frame(State, {priority, _StreamID, _IsExclusive, _DepStreamID, _Weight}) ->
	%% @todo Validate StreamID?
	%% @todo Handle priority.
	State;
%% RST_STREAM frame.
frame(State, {rst_stream, StreamID, Reason}) ->
	stream_reset(State, StreamID, {stream_error, Reason, 'Stream reset requested by client.'});
%% SETTINGS frame. Acknowledged right away; the values are not applied yet.
frame(State=#state{socket=Socket, transport=Transport}, {settings, _Settings}) ->
	%% @todo Apply SETTINGS.
	Transport:send(Socket, cow_http2:settings_ack()),
	State;
%% Ack for a previously sent SETTINGS frame.
frame(State=#state{next_settings=_NextSettings}, settings_ack) ->
	%% @todo Apply SETTINGS that require synchronization.
	State;
%% Unexpected PUSH_PROMISE frame.
frame(State, {push_promise, _, _, _, _}) ->
	terminate(State, {connection_error, protocol_error,
		'PUSH_PROMISE frames MUST only be sent on a peer-initiated stream. (RFC7540 6.6)'});
%% PING frame. Answered with an ack carrying the same opaque data.
frame(State=#state{socket=Socket, transport=Transport}, {ping, Opaque}) ->
	Transport:send(Socket, cow_http2:ping_ack(Opaque)),
	State;
%% Ack for a previously sent PING frame.
%%
%% @todo Might want to check contents but probably a waste of time.
frame(State, {ping_ack, _Opaque}) ->
	State;
%% GOAWAY frame.
frame(State, Frame={goaway, _, _, _}) ->
	terminate(State, {stop, Frame, 'Client is going away.'});
%% Connection-wide WINDOW_UPDATE frame.
frame(State, {window_update, _Increment}) ->
	%% @todo control flow
	State;
%% Stream-specific WINDOW_UPDATE frame.
frame(State, {window_update, _StreamID, _Increment}) ->
	%% @todo stream-specific control flow
	State;
%% Unexpected CONTINUATION frame.
frame(State, {continuation, _, _, _}) ->
	terminate(State, {connection_error, protocol_error,
		'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}).
|
|
|
|
|
|
|
|
%% Process a frame received while a headers block is being assembled.
%% Only a CONTINUATION frame for the same stream is acceptable. Its
%% fragment is appended to what was collected so far; when it closes the
%% block the stream is initialized and parsing returns to normal,
%% otherwise we stay in continuation mode. Anything else is a
%% connection error.
continuation_frame(State=#state{parse_state={continuation, StreamID, IsFin, Collected}},
		{continuation, StreamID, HeadFin, Fragment})
		when HeadFin =:= head_fin; HeadFin =:= head_nofin ->
	HeaderBlock = << Collected/binary, Fragment/binary >>,
	case HeadFin of
		head_fin ->
			stream_init(State#state{parse_state=normal}, StreamID, IsFin, HeaderBlock);
		head_nofin ->
			State#state{parse_state={continuation, StreamID, IsFin, HeaderBlock}}
	end;
continuation_frame(State, _) ->
	terminate(State, {connection_error, protocol_error,
		'An invalid frame was received while expecting a CONTINUATION frame. (RFC7540 6.2)'}).
|
|
|
|
|
|
|
|
%% Handle the exit of a linked process. A known child is removed from
%% the children list and the exit message is forwarded to the stream
%% that owns it. An unknown pid is logged and otherwise ignored.
down(State=#state{children=Known}, Pid, Msg) ->
	case lists:keytake(Pid, 1, Known) of
		false ->
			error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.", [Msg, Pid]),
			State;
		{value, {_, StreamID}, Remaining} ->
			info(State#state{children=Remaining}, StreamID, Msg)
	end.
|
|
|
|
|
2017-01-16 14:22:43 +01:00
|
|
|
%% Deliver a message to the handler of the given stream and run the
%% commands it returns. An exception in the handler is logged and the
%% stream is reset. A message for an unknown stream is logged and dropped.
info(State=#state{streams=Streams}, StreamID, Msg) ->
	case lists:keyfind(StreamID, #stream.id, Streams) of
		false ->
			error_logger:error_msg("Received message ~p for unknown stream ~p.", [Msg, StreamID]),
			State;
		Stream = #stream{state=HandlerState0} ->
			try cowboy_stream:info(StreamID, Msg, HandlerState0) of
				{Commands, HandlerState} ->
					commands(State, Stream#stream{state=HandlerState}, Commands)
			catch Class:Reason ->
				error_logger:error_msg("Exception occurred in "
					"cowboy_stream:info(~p, ~p, ~p) with reason ~p:~p.",
					[StreamID, Msg, HandlerState0, Class, Reason]),
				stream_reset(State, StreamID, {internal_error, {Class, Reason},
					'Exception occurred in cowboy_stream:info/3.'})
			end
	end.
|
|
|
|
|
2016-06-16 19:46:57 +02:00
|
|
|
%% Run the list of commands returned by a stream handler, in order.
%% Both the connection state and the stream record are threaded through
%% the commands; once the list is exhausted the stream is written back
%% into the connection state and the connection state is returned.
commands(State, Stream, []) ->
	after_commands(State, Stream);
%% Error responses are sent only if a response wasn't sent already.
commands(State, Stream=#stream{local=idle}, [{error_response, StatusCode, Headers, Body}|Tail]) ->
	commands(State, Stream, [{response, StatusCode, Headers, Body}|Tail]);
commands(State, Stream, [{error_response, _, _, _}|Tail]) ->
	commands(State, Stream, Tail);
%% Send response headers.
%%
%% @todo Kill the stream if it sent a response when one has already been sent.
%% @todo Keep IsFin in the state.
%% @todo Same two things above apply to DATA, possibly promise too.
commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeState0},
		Stream=#stream{id=StreamID, local=idle}, [{response, StatusCode, Headers0, Body}|Tail]) ->
	Headers = Headers0#{<<":status">> => status(StatusCode)},
	{HeaderBlock, EncodeState} = headers_encode(Headers, EncodeState0),
	case Body of
		<<>> ->
			%% No body: the HEADERS frame also ends the stream.
			Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
			commands(State#state{encode_state=EncodeState}, Stream#stream{local=fin}, Tail);
		{sendfile, O, B, P} ->
			%% The file is sent by the sendfile command queued right after.
			Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
			commands(State#state{encode_state=EncodeState}, Stream#stream{local=nofin},
				[{sendfile, fin, O, B, P}|Tail]);
		_ ->
			Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
			%% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE.
			%% Use the length set by the server instead, if any.
			%% @todo Would be better if we didn't have to convert to binary.
			send_data(Socket, Transport, StreamID, fin, iolist_to_binary(Body), 16384),
			commands(State#state{encode_state=EncodeState}, Stream#stream{local=fin}, Tail)
	end;
%% @todo response when local!=idle
%% Send response headers and initiate chunked encoding.
commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeState0},
		Stream=#stream{id=StreamID, local=idle}, [{headers, StatusCode, Headers0}|Tail]) ->
	Headers = Headers0#{<<":status">> => status(StatusCode)},
	{HeaderBlock, EncodeState} = headers_encode(Headers, EncodeState0),
	Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
	commands(State#state{encode_state=EncodeState}, Stream#stream{local=nofin}, Tail);
%% @todo headers when local!=idle
%% Send a response body chunk.
%%
%% @todo WINDOW_UPDATE stuff require us to buffer some data.
%%
%% When the body is sent using sendfile, the current solution is not
%% very good. The body could be too large, blocking the connection.
%% Also sendfile technically only works over TCP, so it's not that
%% useful for HTTP/2. At the very least the sendfile call should be
%% split into multiple calls and flow control should be used to make
%% sure we only send as fast as the client can receive and don't block
%% anything.
commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=StreamID, local=nofin},
		[{data, IsFin, Data}|Tail]) ->
	%% A chunk may be larger than a single frame is allowed to be, so it
	%% is split the same way a full response body is.
	%% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE.
	%% Use the length set by the server instead, if any.
	send_data(Socket, Transport, StreamID, IsFin, iolist_to_binary(Data), 16384),
	commands(State, Stream#stream{local=IsFin}, Tail);
%% @todo data when local!=nofin
%% Send a file.
%%
%% @todo This implementation is terrible. A good implementation would
%% need to check that Bytes is exact (or we need to document that we
%% trust it to be exact), and would need to send the file asynchronously
%% in many data frames. Perhaps a sendfile call should result in a
%% process being created specifically for this purpose. Or perhaps
%% the protocol should be "dumb" and the stream handler be the one
%% to ensure the file is sent in chunks (which would require a better
%% flow control at the stream handler level). One thing for sure, the
%% implementation necessarily varies between HTTP/1.1 and HTTP/2.
commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=StreamID, local=nofin},
		[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
	%% @todo We currently have a naive implementation without a
	%% scheduler to prioritize frames that need to be sent.
	%% A future update will need to queue such data frames
	%% and only send them when there is nothing currently
	%% being sent. We would probably also benefit from doing
	%% asynchronous sends.
	sendfile(Socket, Transport, StreamID, IsFin, Offset, Bytes, Path, 16384),
	commands(State, Stream#stream{local=IsFin}, Tail);
%% @todo sendfile when local!=nofin
%% Send a push promise.
%%
%% @todo We need to keep track of what promises we made so that we don't
%% end up with an infinite loop of promises.
commands(State0=#state{socket=Socket, transport=Transport, server_streamid=PromisedStreamID,
		encode_state=EncodeState0}, Stream=#stream{id=StreamID},
		[{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) ->
	%% The port is left out of the authority when it is the default
	%% one for the scheme.
	Authority = case {Scheme, Port} of
		{<<"http">>, 80} -> Host;
		{<<"https">>, 443} -> Host;
		_ -> [Host, $:, integer_to_binary(Port)]
	end,
	PathWithQs = case Qs of
		<<>> -> Path;
		_ -> [Path, $?, Qs]
	end,
	Headers = Headers0#{<<":method">> => Method,
		<<":scheme">> => Scheme,
		<<":authority">> => Authority,
		<<":path">> => PathWithQs},
	{HeaderBlock, EncodeState} = headers_encode(Headers, EncodeState0),
	Transport:send(Socket, cow_http2:push_promise(StreamID, PromisedStreamID, HeaderBlock)),
	%% The promised stream is initialized from the header block that was
	%% just sent; the next server stream identifier advances by 2.
	%% @todo iolist_to_binary(HeaderBlock) isn't optimal. Need a shortcut.
	State = stream_init(State0#state{server_streamid=PromisedStreamID + 2, encode_state=EncodeState},
		PromisedStreamID, fin, iolist_to_binary(HeaderBlock)),
	commands(State, Stream, Tail);
%% @todo Update the flow control state.
commands(State, Stream, [{flow, _Size}|Tail]) ->
	commands(State, Stream, Tail);
%% Supervise a child process.
commands(State=#state{children=Children}, Stream=#stream{id=StreamID},
		[{spawn, Pid, _Shutdown}|Tail]) -> %% @todo Shutdown
	commands(State#state{children=[{Pid, StreamID}|Children]}, Stream, Tail);
%% Error handling. The remaining commands are dropped.
commands(State, Stream=#stream{id=StreamID}, [Error = {internal_error, _, _}|_Tail]) ->
	%% @todo Do we want to run the commands after an internal_error?
	%% @todo Do we even allow commands after?
	%% @todo Only reset when the stream still exists.
	stream_reset(after_commands(State, Stream), StreamID, Error);
%% @todo HTTP/2 has no support for the Upgrade mechanism.
commands(State, Stream, [{switch_protocol, _Headers, _Mod, _ModState}|Tail]) ->
	%% @todo This is an error. Not sure what to do here yet.
	commands(State, Stream, Tail);
%% Stop the stream. The remaining commands are dropped.
commands(State, Stream=#stream{id=StreamID}, [stop|_Tail]) ->
	%% @todo Do we want to run the commands after a stop?
	%% @todo Do we even allow commands after?
	stream_terminate(after_commands(State, Stream), StreamID, normal).
|
|
|
|
|
|
|
|
%% Write the stream back into the connection state, replacing the entry
%% with the same identifier or adding it when there is none.
after_commands(State=#state{streams=Streams}, Stream=#stream{id=StreamID}) ->
	State#state{streams=lists:keystore(StreamID, #stream.id, Streams, Stream)}.
|
2015-06-11 17:04:21 +02:00
|
|
|
|
2016-08-10 11:49:31 +02:00
|
|
|
%% Build the value of the :status pseudo-header. An integer status is
%% converted to its decimal representation; a binary status must start
%% with three digits (the first one not being 0) and only those three
%% digits are kept, dropping any reason phrase that follows.
status(Code) when is_integer(Code) ->
	integer_to_binary(Code);
status(<< D1, D2, D3, _/bits >>)
		when D1 >= $1 andalso D1 =< $9
		andalso D2 >= $0 andalso D2 =< $9
		andalso D3 >= $0 andalso D3 =< $9 ->
	<< D1, D2, D3 >>.
|
|
|
|
|
|
|
|
%% Send Data on the stream as one or more DATA frames, none carrying
%% more than Length bytes. All frames but the last are sent with nofin;
%% the last one carries IsFin.
%%
%% This same function is found in gun_http2.
send_data(Socket, Transport, StreamID, IsFin, Data, Length) when Length < byte_size(Data) ->
	<< Chunk:Length/binary, Remaining/bits >> = Data,
	Transport:send(Socket, cow_http2:data(StreamID, nofin, Chunk)),
	send_data(Socket, Transport, StreamID, IsFin, Remaining, Length);
send_data(Socket, Transport, StreamID, IsFin, Data, _Length) ->
	Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)).
|
|
|
|
|
2017-02-05 13:45:35 +01:00
|
|
|
%% @todo This is currently awfully slow. But at least it's correct.
%%
%% Send Bytes bytes of the file at Path, starting at Offset, in pieces
%% of at most Length bytes. Each piece is preceded by its own DATA frame
%% header; only the header of the final piece carries IsFin.
sendfile(Socket, Transport, StreamID, IsFin, Offset, Bytes, Path, Length)
        when Length < Bytes ->
    Transport:send(Socket, cow_http2:data_header(StreamID, nofin, Length)),
    Transport:sendfile(Socket, Path, Offset, Length),
    sendfile(Socket, Transport, StreamID, IsFin,
        Offset + Length, Bytes - Length, Path, Length);
sendfile(Socket, Transport, StreamID, IsFin, Offset, Bytes, Path, _Length) ->
    Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)),
    Transport:sendfile(Socket, Path, Offset, Bytes).
|
|
|
|
|
2017-01-02 18:27:03 +01:00
|
|
|
%% Stop the connection process; this function never returns.
%%
%% When no state exists yet (undefined) the process simply exits.
%% Otherwise a GOAWAY frame is sent with the error code derived from
%% Reason, every remaining stream is terminated, the socket is closed
%% and the process exits with {shutdown, Reason}.
%%
%% The spec includes undefined because the first clause accepts it.
-spec terminate(#state{} | undefined, _) -> no_return().
terminate(undefined, Reason) ->
    exit({shutdown, Reason});
terminate(#state{socket=Socket, transport=Transport, client_streamid=LastStreamID,
        streams=Streams, children=Children}, Reason) ->
    %% @todo We might want to optionally send the Reason value
    %% as debug data in the GOAWAY frame here. Perhaps more.
    Transport:send(Socket, cow_http2:goaway(LastStreamID, terminate_reason(Reason), <<>>)),
    terminate_all_streams(Streams, Reason, Children),
    Transport:close(Socket),
    exit({shutdown, Reason}).
|
|
|
|
|
2017-02-25 20:05:31 +01:00
|
|
|
%% Map a termination reason to the error code sent in the GOAWAY frame.
%% Connection errors carry their own code, a stop maps to no_error, and
%% socket or internal errors are both reported as internal_error.
terminate_reason({connection_error, ErrorCode, _}) ->
    ErrorCode;
terminate_reason({stop, _, _}) ->
    no_error;
terminate_reason({Kind, _, _}) when Kind =:= socket_error; Kind =:= internal_error ->
    internal_error.
|
|
|
|
|
2017-01-16 14:22:43 +01:00
|
|
|
%% Terminate every remaining stream: call its handler's terminate
%% callback, then kill the processes registered for that stream.
%% Once the stream list is exhausted the children list must be empty
%% as well; otherwise no clause matches.
terminate_all_streams([], _Reason, []) ->
    ok;
terminate_all_streams([#stream{id=ID, state=HandlerState}|Streams], Reason, Children) ->
    stream_call_terminate(ID, Reason, HandlerState),
    terminate_all_streams(Streams, Reason,
        stream_terminate_children(Children, ID, [])).
|
2015-06-11 17:04:21 +02:00
|
|
|
|
|
|
|
%% Stream functions.
|
|
|
|
|
2016-06-20 17:28:59 +02:00
|
|
|
%% Decode the header block of a new stream and, when it describes a
%% well-formed request, build the Req map and start the stream handler.
%%
%% - A header block that cannot be HPACK-decoded terminates the whole
%%   connection with a compression_error.
%% - A decoded block missing one of the :method, :scheme, :authority or
%%   :path pseudo-headers resets the stream with protocol_error.
%% - An invalid content-length value resets the stream with
%%   protocol_error. It used to call terminate/2 with a reason that
%%   terminate_reason/1 has no clause for, crashing the process.
%%
%% Returns the updated connection state.
stream_init(State0=#state{ref=Ref, socket=Socket, transport=Transport, peer=Peer, decode_state=DecodeState0},
        StreamID, IsFin, HeaderBlock) ->
    %% @todo Add clause for CONNECT requests (no scheme/path).
    try headers_decode(HeaderBlock, DecodeState0) of
        {Headers0=#{
                <<":method">> := Method,
                <<":scheme">> := Scheme,
                <<":authority">> := Authority,
                <<":path">> := PathWithQs}, DecodeState} ->
            %% The decoder state must be kept even if the stream is rejected below.
            State = State0#state{decode_state=DecodeState},
            Headers = maps:without([<<":method">>, <<":scheme">>, <<":authority">>, <<":path">>], Headers0),
            case stream_init_body_length(IsFin, Headers) of
                {ok, BodyLength} ->
                    {Host, Port} = cow_http_hd:parse_host(Authority),
                    {Path, Qs} = cow_http:parse_fullpath(PathWithQs),
                    Req = #{
                        ref => Ref,
                        pid => self(),
                        streamid => StreamID,
                        peer => Peer,
                        method => Method,
                        scheme => Scheme,
                        host => Host,
                        port => Port,
                        path => Path,
                        qs => Qs,
                        version => 'HTTP/2',
                        headers => Headers,
                        has_body => IsFin =:= nofin,
                        body_length => BodyLength
                    },
                    stream_handler_init(State, StreamID, IsFin, Req);
                error ->
                    stream_reset(State, StreamID, {stream_error, protocol_error,
                        'The content-length header is invalid. (RFC7230 3.3.2)'})
            end;
        {_, DecodeState} ->
            Transport:send(Socket, cow_http2:rst_stream(StreamID, protocol_error)),
            State0#state{decode_state=DecodeState}
    catch _:_ ->
        terminate(State0, {connection_error, compression_error,
            'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
    end.

%% Determine the request body length announced by the headers.
%% Returns {ok, 0} when the stream is already finished or the
%% content-length is <<"0">>, {ok, Length} for a valid content-length,
%% {ok, undefined} when there is no content-length header, and error
%% when the content-length value cannot be parsed.
stream_init_body_length(fin, _Headers) ->
    {ok, 0};
stream_init_body_length(_IsFin, #{<<"content-length">> := <<"0">>}) ->
    {ok, 0};
stream_init_body_length(_IsFin, #{<<"content-length">> := BinLength}) ->
    try cow_http_hd:parse_content_length(BinLength) of
        Length -> {ok, Length}
    catch _:_ ->
        error
    end;
stream_init_body_length(_IsFin, _Headers) ->
    {ok, undefined}.
|
|
|
|
|
2017-01-16 14:22:43 +01:00
|
|
|
%% Initialize the stream handlers for a new request and run the
%% commands they return. The stream is recorded as the most recent
%% client stream ID before the commands are executed.
%%
%% If cowboy_stream:init/3 raises, the exception is logged and the
%% stream is reset with an internal_error.
stream_handler_init(State=#state{opts=Opts}, StreamID, IsFin, Req) ->
    try cowboy_stream:init(StreamID, Req, Opts) of
        {Commands, StreamState} ->
            commands(State#state{client_streamid=StreamID},
                #stream{id=StreamID, state=StreamState, remote=IsFin}, Commands)
    catch Class:Reason ->
        %% Log the arguments that were actually given to cowboy_stream:init/3.
        error_logger:error_msg("Exception occurred in "
            "cowboy_stream:init(~p, ~p, ~p) with reason ~p:~p.",
            [StreamID, Req, Opts, Class, Reason]),
        stream_reset(State, StreamID, {internal_error, {Class, Reason},
            'Exception occurred in cowboy_stream:init/3.'})
    end.
|
|
|
|
|
2015-06-11 17:04:21 +02:00
|
|
|
%% @todo We might need to keep track of which stream has been reset so we don't send lots of them.
%%
%% Send RST_STREAM for the given stream, then terminate it locally.
%% An internal_error is always reported with the internal_error code;
%% a stream_error is reported with the code it carries.
stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID,
        StreamError={Kind, Detail, _})
        when Kind =:= internal_error; Kind =:= stream_error ->
    ErrorCode = case Kind of
        internal_error -> internal_error;
        stream_error -> Detail
    end,
    Transport:send(Socket, cow_http2:rst_stream(StreamID, ErrorCode)),
    stream_terminate(State, StreamID, StreamError).
|
|
|
|
|
2017-01-16 14:22:43 +01:00
|
|
|
%% Remove a stream from the connection state and clean up after it.
%%
%% When the stream ends normally without a complete response, one is
%% finished on its behalf first: a 204 response if nothing was sent
%% yet (local=idle), or an empty final DATA frame if the response was
%% started but not finished (local=nofin). The stream handler's
%% terminate callback is then called and the stream's processes killed.
%% An unknown stream ID leaves the state untouched.
stream_terminate(State=#state{socket=Socket, transport=Transport,
        streams=Streams0, children=Children0, encode_state=EncodeState0}, StreamID, Reason) ->
    case lists:keytake(StreamID, #stream.id, Streams0) of
        false ->
            %% @todo Unknown stream. Not sure what to do here. Check again once all
            %% terminate calls have been written.
            State;
        {value, #stream{state=StreamState, local=Local}, Streams} ->
            EncodeState = case {Reason, Local} of
                {normal, idle} ->
                    {HeaderBlock, EncodeState1} = headers_encode(
                        #{<<":status">> => <<"204">>}, EncodeState0),
                    Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
                    EncodeState1;
                {normal, nofin} ->
                    Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)),
                    EncodeState0;
                _ ->
                    EncodeState0
            end,
            stream_call_terminate(StreamID, Reason, StreamState),
            Children = stream_terminate_children(Children0, StreamID, []),
            State#state{streams=Streams, children=Children, encode_state=EncodeState}
    end.
|
|
|
|
|
2017-01-16 14:22:43 +01:00
|
|
|
%% Call the stream handlers' terminate callback, logging any exception
%% it raises instead of letting it propagate.
%%
%% The exception reason is bound to a fresh variable. Reusing Reason
%% in the catch pattern would only catch exceptions whose reason is
%% equal to the terminate reason and let every other one through.
stream_call_terminate(StreamID, Reason, StreamState) ->
    try
        cowboy_stream:terminate(StreamID, Reason, StreamState)
    catch Class:Exception ->
        error_logger:error_msg("Exception occurred in "
            "cowboy_stream:terminate(~p, ~p, ~p) with reason ~p:~p.",
            [StreamID, Reason, StreamState, Class, Exception])
    end.
|
|
|
|
|
|
|
|
%% Kill every child process registered for StreamID and return the
%% remaining children prepended, in reverse order, to Acc.
stream_terminate_children(Children, StreamID, Acc0) ->
    lists:foldl(fun
        ({Pid, ID}, Acc) when ID =:= StreamID ->
            exit(Pid, kill),
            Acc;
        (Other, Acc) ->
            [Other|Acc]
    end, Acc0, Children).
|
|
|
|
|
|
|
|
%% Headers encode/decode.
|
|
|
|
|
|
|
|
%% Decode an HPACK header block and return the headers as a map along
%% with the updated decoder state.
headers_decode(HeaderBlock, DecodeState0) ->
    {HeaderList, DecodeState} = cow_hpack:decode(HeaderBlock, DecodeState0),
    HeaderMap = headers_to_map(HeaderList, #{}),
    {HeaderMap, DecodeState}.
|
2015-06-11 17:04:21 +02:00
|
|
|
|
2016-08-10 11:49:31 +02:00
|
|
|
%% This function is necessary to properly handle duplicate headers
%% and the special-case cookie header.
%%
%% Fold a list of {Name, Value} headers into the map Acc0. A header
%% seen more than once has its values joined in order of appearance,
%% separated by ", " or, for the cookie header, by "; ".
headers_to_map(Headers, Acc0) ->
    lists:foldl(fun({Name, Value}, Acc) ->
        case Acc of
            #{Name := Previous} ->
                %% The cookie header does not use proper HTTP header lists.
                Separator = case Name of
                    <<"cookie">> -> <<"; ">>;
                    _ -> <<", ">>
                end,
                Acc#{Name => << Previous/binary, Separator/binary, Value/binary >>};
            _ ->
                Acc#{Name => Value}
        end
    end, Acc0, Headers).
|
|
|
|
|
|
|
|
%% The set-cookie header is special; we can only send one cookie per header.
%%
%% Turn the header map into a list and HPACK-encode it. The set-cookie
%% entry, when present, holds a list of values; each value is emitted
%% as its own set-cookie header after all the other headers.
headers_encode(Headers0, EncodeState) ->
    HeaderList = case Headers0 of
        #{<<"set-cookie">> := SetCookies} ->
            maps:to_list(maps:remove(<<"set-cookie">>, Headers0))
                ++ [{<<"set-cookie">>, Cookie} || Cookie <- SetCookies];
        _ ->
            maps:to_list(Headers0)
    end,
    cow_hpack:encode(HeaderList, EncodeState).
|
|
|
|
|
|
|
|
%% System callbacks.
|
|
|
|
|
2017-01-02 18:27:03 +01:00
|
|
|
%% System callback: resume the connection loop with the state and
%% buffer that were handed over when the system message was processed.
-spec system_continue(_, _, {#state{}, binary()}) -> ok.
system_continue(_Parent, _Debug, {State, Buffer}) ->
    loop(State, Buffer).
|
|
|
|
|
|
|
|
%% System callback: exit the process with the given reason.
-spec system_terminate(any(), _, _, _) -> no_return().
system_terminate(Reason, _Parent, _Debug, _Misc) ->
    exit(Reason).
|
|
|
|
|
|
|
|
%% System callback: code change needs no conversion, so the
%% {State, Buffer} pair is returned as is.
-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
    {ok, Misc}.
|