0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 12:20:24 +00:00

Add support for multiple stream handlers

The stream handlers can be specified using the protocol
option 'stream_handlers'. It defaults to [cowboy_stream_h].

The cowboy_stream_h module currently does not forward the
calls to further stream handlers. It feels like an edge
case; usually we'd want to put our own handlers between
the protocol code and the request process. I am therefore
going to focus on other things for now.

The various types and specifications for stream handlers
have been updated and the cowboy_stream module can now
be safely used as a behavior. The interface might change
a little more, though.

This commit does not include tests or documentation.
They will follow separately.
This commit is contained in:
Loïc Hoguin 2017-01-16 14:22:43 +01:00
parent e5a8088e68
commit 0f8452cafa
No known key found for this signature in database
GPG key ID: 71366FF21851DF03
7 changed files with 209 additions and 140 deletions

View file

@ -41,26 +41,29 @@
%% doesn't let us do that yet. %% doesn't let us do that yet.
-spec start_clear(ranch:ref(), non_neg_integer(), ranch_tcp:opts(), opts()) -spec start_clear(ranch:ref(), non_neg_integer(), ranch_tcp:opts(), opts())
-> {ok, pid()} | {error, any()}. -> {ok, pid()} | {error, any()}.
start_clear(Ref, NbAcceptors, TransOpts0, ProtoOpts) start_clear(Ref, NbAcceptors, TransOpts0, ProtoOpts0)
when is_integer(NbAcceptors), NbAcceptors > 0 -> when is_integer(NbAcceptors), NbAcceptors > 0 ->
TransOpts = [connection_type(ProtoOpts)|TransOpts0], {TransOpts, ConnectionType} = ensure_connection_type(TransOpts0),
ProtoOpts = ProtoOpts0#{connection_type => ConnectionType},
ranch:start_listener(Ref, NbAcceptors, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts). ranch:start_listener(Ref, NbAcceptors, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts).
-spec start_tls(ranch:ref(), non_neg_integer(), ranch_ssl:opts(), opts()) -spec start_tls(ranch:ref(), non_neg_integer(), ranch_ssl:opts(), opts())
-> {ok, pid()} | {error, any()}. -> {ok, pid()} | {error, any()}.
start_tls(Ref, NbAcceptors, TransOpts0, ProtoOpts) start_tls(Ref, NbAcceptors, TransOpts0, ProtoOpts0)
when is_integer(NbAcceptors), NbAcceptors > 0 -> when is_integer(NbAcceptors), NbAcceptors > 0 ->
{TransOpts1, ConnectionType} = ensure_connection_type(TransOpts0),
TransOpts = [ TransOpts = [
connection_type(ProtoOpts),
{next_protocols_advertised, [<<"h2">>, <<"http/1.1">>]}, {next_protocols_advertised, [<<"h2">>, <<"http/1.1">>]},
{alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]} {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}
|TransOpts0], |TransOpts1],
ProtoOpts = ProtoOpts0#{connection_type => ConnectionType},
ranch:start_listener(Ref, NbAcceptors, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts). ranch:start_listener(Ref, NbAcceptors, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts).
-spec connection_type(opts()) -> {connection_type, worker | supervisor}. ensure_connection_type(TransOpts) ->
connection_type(ProtoOpts) -> case proplists:get_value(connection_type, TransOpts) of
{_, Type} = maps:get(stream_handler, ProtoOpts, {cowboy_stream_h, supervisor}), undefined -> {[{connection_type, supervisor}|TransOpts], supervisor};
{connection_type, Type}. ConnectionType -> {TransOpts, ConnectionType}
end.
-spec stop_listener(ranch:ref()) -> ok | {error, not_found}. -spec stop_listener(ranch:ref()) -> ok | {error, not_found}.
stop_listener(Ref) -> stop_listener(Ref) ->

View file

@ -40,9 +40,8 @@ init(Parent, Ref, Socket, Transport, Opts) ->
init(Parent, Ref, Socket, Transport, Opts, cowboy_http). init(Parent, Ref, Socket, Transport, Opts, cowboy_http).
init(Parent, Ref, Socket, Transport, Opts, Protocol) -> init(Parent, Ref, Socket, Transport, Opts, Protocol) ->
{Handler, Type} = maps:get(stream_handler, Opts, {cowboy_stream_h, supervisor}), _ = case maps:get(connection_type, Opts, supervisor) of
_ = case Type of
worker -> ok; worker -> ok;
supervisor -> process_flag(trap_exit, true) supervisor -> process_flag(trap_exit, true)
end, end,
Protocol:init(Parent, Ref, Socket, Transport, Opts, Handler). Protocol:init(Parent, Ref, Socket, Transport, Opts).

View file

@ -14,7 +14,7 @@
-module(cowboy_http). -module(cowboy_http).
-export([init/6]). -export([init/5]).
-export([system_continue/3]). -export([system_continue/3]).
-export([system_terminate/4]). -export([system_terminate/4]).
@ -67,17 +67,13 @@
}). }).
-record(stream, { -record(stream, {
%% Stream identifier.
id = undefined :: cowboy_stream:streamid(), id = undefined :: cowboy_stream:streamid(),
%% Stream handlers and their state.
%% Stream handler state. state = undefined :: {module(), any()},
state = undefined :: any(),
%% Client HTTP version for this stream. %% Client HTTP version for this stream.
version = undefined :: cowboy:http_version(), version = undefined :: cowboy:http_version(),
%% Commands queued. %% Commands queued.
queue = [] :: [] %% @todo better type queue = [] :: cowboy_stream:commands()
}). }).
-type stream() :: #stream{}. -type stream() :: #stream{}.
@ -88,7 +84,6 @@
socket :: inet:socket(), socket :: inet:socket(),
transport :: module(), transport :: module(),
opts = #{} :: map(), opts = #{} :: map(),
handler :: module(),
%% Remote address and port for the connection. %% Remote address and port for the connection.
peer = undefined :: {inet:ip_address(), inet:port_number()}, peer = undefined :: {inet:ip_address(), inet:port_number()},
@ -124,14 +119,14 @@
-include_lib("cowlib/include/cow_inline.hrl"). -include_lib("cowlib/include/cow_inline.hrl").
-include_lib("cowlib/include/cow_parse.hrl"). -include_lib("cowlib/include/cow_parse.hrl").
-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module()) -> ok. -spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok.
init(Parent, Ref, Socket, Transport, Opts, Handler) -> init(Parent, Ref, Socket, Transport, Opts) ->
case Transport:peername(Socket) of case Transport:peername(Socket) of
{ok, Peer} -> {ok, Peer} ->
LastStreamID = maps:get(max_keepalive, Opts, 100), LastStreamID = maps:get(max_keepalive, Opts, 100),
before_loop(set_request_timeout(#state{ before_loop(set_request_timeout(#state{
parent=Parent, ref=Ref, socket=Socket, parent=Parent, ref=Ref, socket=Socket,
transport=Transport, opts=Opts, handler=Handler, transport=Transport, opts=Opts,
peer=Peer, last_streamid=LastStreamID}), <<>>); peer=Peer, last_streamid=LastStreamID}), <<>>);
{error, Reason} -> {error, Reason} ->
%% Couldn't read the peer address; connection is gone. %% Couldn't read the peer address; connection is gone.
@ -159,7 +154,7 @@ before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) ->
loop(State, Buffer). loop(State, Buffer).
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
handler=_Handler, timer=TimerRef, children=Children}, Buffer) -> timer=TimerRef, children=Children}, Buffer) ->
{OK, Closed, Error} = Transport:messages(), {OK, Closed, Error} = Transport:messages(),
receive receive
%% Socket messages. %% Socket messages.
@ -257,9 +252,8 @@ parse(Buffer, State=#state{in_state=#ps_body{}}) ->
%% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now. %% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now.
after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := Version}, after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := Version},
State0=#state{handler=Handler, opts=Opts, streams=Streams0}, Buffer}) -> State0=#state{opts=Opts, streams=Streams0}, Buffer}) ->
%% @todo Opts at the end. Maybe pass the same Opts we got? try cowboy_stream:init(StreamID, Req, Opts) of
try Handler:init(StreamID, Req, Opts) of
{Commands, StreamState} -> {Commands, StreamState} ->
Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0], Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0],
State = case maybe_req_close(State0, Headers, Version) of State = case maybe_req_close(State0, Headers, Version) of
@ -268,27 +262,27 @@ after_parse({request, Req=#{streamid := StreamID, headers := Headers, version :=
end, end,
parse(Buffer, commands(State, StreamID, Commands)) parse(Buffer, commands(State, StreamID, Commands))
catch Class:Reason -> catch Class:Reason ->
error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) " error_logger:error_msg("Exception occurred in "
"with reason ~p:~p.", "cowboy_stream:init(~p, ~p, ~p) with reason ~p:~p.",
[Handler, StreamID, Req, Opts, Class, Reason]), [StreamID, Req, Opts, Class, Reason]),
%% @todo Bad value returned here. Crashes. ok %% @todo send a proper response, etc. note that terminate must NOT be called
ok
%% @todo Status code. %% @todo Status code.
% stream_reset(State, StreamID, {internal_error, {Class, Reason}, % stream_reset(State, StreamID, {internal_error, {Class, Reason},
% 'Exception occurred in StreamHandler:init/10 call.'}) %% @todo Check final arity. % 'Exception occurred in StreamHandler:init/10 call.'}) %% @todo Check final arity.
end; end;
%% Streams are sequential so the body is always about the last stream created %% Streams are sequential so the body is always about the last stream created
%% unless that stream has terminated. %% unless that stream has terminated.
after_parse({data, StreamID, IsFin, Data, State=#state{handler=Handler, after_parse({data, StreamID, IsFin, Data, State=#state{
streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) -> streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) ->
try Handler:data(StreamID, IsFin, Data, StreamState0) of try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
{Commands, StreamState} -> {Commands, StreamState} ->
Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
Stream#stream{state=StreamState}), Stream#stream{state=StreamState}),
parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands)) parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
catch Class:Reason -> catch Class:Reason ->
error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.", error_logger:error_msg("Exception occurred in "
[Handler, StreamID, IsFin, Data, StreamState0, Class, Reason]), "cowboy_stream:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
[StreamID, IsFin, Data, StreamState0, Class, Reason]),
%% @todo Bad value returned here. Crashes. %% @todo Bad value returned here. Crashes.
ok ok
%% @todo %% @todo
@ -669,18 +663,18 @@ is_http2_upgrade(_, _) ->
%% Prior knowledge upgrade, without an HTTP/1.1 request. %% Prior knowledge upgrade, without an HTTP/1.1 request.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
opts=Opts, handler=Handler, peer=Peer}, Buffer) -> opts=Opts, peer=Peer}, Buffer) ->
case Transport:secure() of case Transport:secure() of
false -> false ->
_ = cancel_request_timeout(State), _ = cancel_request_timeout(State),
cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer); cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer);
true -> true ->
error_terminate(400, State, {connection_error, protocol_error, error_terminate(400, State, {connection_error, protocol_error,
'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'}) 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
end. end.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
opts=Opts, handler=Handler, peer=Peer}, Buffer, HTTP2Settings, Req) -> opts=Opts, peer=Peer}, Buffer, HTTP2Settings, Req) ->
%% @todo %% @todo
%% However if the client sent a body, we need to read the body in full %% However if the client sent a body, we need to read the body in full
%% and if we can't do that, return a 413 response. Some options are in order. %% and if we can't do that, return a 413 response. Some options are in order.
@ -695,7 +689,7 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
%% @todo Possibly redirect the request if it was https. %% @todo Possibly redirect the request if it was https.
_ = cancel_request_timeout(State), _ = cancel_request_timeout(State),
cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer, Settings, Req) cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer, Settings, Req)
catch _:_ -> catch _:_ ->
error_terminate(400, State, {connection_error, protocol_error, error_terminate(400, State, {connection_error, protocol_error,
'The HTTP2-Settings header contains a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'}) 'The HTTP2-Settings header contains a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
@ -748,17 +742,18 @@ down(State=#state{children=Children0}, Pid, Msg) ->
State State
end. end.
info(State=#state{handler=Handler, streams=Streams0}, StreamID, Msg) -> info(State=#state{streams=Streams0}, StreamID, Msg) ->
case lists:keyfind(StreamID, #stream.id, Streams0) of case lists:keyfind(StreamID, #stream.id, Streams0) of
Stream = #stream{state=StreamState0} -> Stream = #stream{state=StreamState0} ->
try Handler:info(StreamID, Msg, StreamState0) of try cowboy_stream:info(StreamID, Msg, StreamState0) of
{Commands, StreamState} -> {Commands, StreamState} ->
Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
Stream#stream{state=StreamState}), Stream#stream{state=StreamState}),
commands(State#state{streams=Streams}, StreamID, Commands) commands(State#state{streams=Streams}, StreamID, Commands)
catch Class:Reason -> catch Class:Reason ->
error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.", error_logger:error_msg("Exception occurred in "
[Handler, StreamID, Msg, StreamState0, Class, Reason]), "cowboy_stream:info(~p, ~p, ~p) with reason ~p:~p.",
[StreamID, Msg, StreamState0, Class, Reason]),
ok ok
%% @todo %% @todo
% stream_reset(State, StreamID, {internal_error, {Class, Reason}, % stream_reset(State, StreamID, {internal_error, {Class, Reason},
@ -926,7 +921,7 @@ stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->
% stream_terminate(State#state{out_state=done}, StreamID, StreamError). % stream_terminate(State#state{out_state=done}, StreamID, StreamError).
stream_terminate(State, StreamID, StreamError). stream_terminate(State, StreamID, StreamError).
stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handler, stream_terminate(State=#state{socket=Socket, transport=Transport,
out_streamid=OutStreamID, out_state=OutState, out_streamid=OutStreamID, out_state=OutState,
streams=Streams0, children=Children0}, StreamID, Reason) -> streams=Streams0, children=Children0}, StreamID, Reason) ->
{value, #stream{state=StreamState, version=Version}, Streams} {value, #stream{state=StreamState, version=Version}, Streams}
@ -940,7 +935,7 @@ stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handle
ok ok
end, end,
stream_call_terminate(StreamID, Reason, Handler, StreamState), stream_call_terminate(StreamID, Reason, StreamState),
%% @todo initiate children shutdown %% @todo initiate children shutdown
% Children = stream_terminate_children(Children0, StreamID, []), % Children = stream_terminate_children(Children0, StreamID, []),
Children = [case C of Children = [case C of
@ -964,13 +959,13 @@ stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handle
end. end.
%% @todo Taken directly from _http2 %% @todo Taken directly from _http2
stream_call_terminate(StreamID, Reason, Handler, StreamState) -> stream_call_terminate(StreamID, Reason, StreamState) ->
try try
Handler:terminate(StreamID, Reason, StreamState), cowboy_stream:terminate(StreamID, Reason, StreamState)
ok
catch Class:Reason -> catch Class:Reason ->
error_logger:error_msg("Exception occurred in ~s:terminate(~p, ~p, ~p) with reason ~p:~p.", error_logger:error_msg("Exception occurred in "
[Handler, StreamID, Reason, StreamState, Class, Reason]) "cowboy_stream:terminate(~p, ~p, ~p) with reason ~p:~p.",
[StreamID, Reason, StreamState, Class, Reason])
end. end.
%stream_terminate_children([], _, Acc) -> %stream_terminate_children([], _, Acc) ->

View file

@ -14,9 +14,9 @@
-module(cowboy_http2). -module(cowboy_http2).
-export([init/6]). -export([init/5]).
-export([init/8]). -export([init/7]).
-export([init/10]). -export([init/9]).
-export([system_continue/3]). -export([system_continue/3]).
-export([system_terminate/4]). -export([system_terminate/4]).
@ -24,7 +24,8 @@
-record(stream, { -record(stream, {
id = undefined :: cowboy_stream:streamid(), id = undefined :: cowboy_stream:streamid(),
state = undefined :: any(), %% Stream handlers and their state.
state = undefined :: {module(), any()},
%% Whether we finished sending data. %% Whether we finished sending data.
local = idle :: idle | cowboy_stream:fin(), local = idle :: idle | cowboy_stream:fin(),
%% Whether we finished receiving data. %% Whether we finished receiving data.
@ -44,7 +45,6 @@
socket = undefined :: inet:socket(), socket = undefined :: inet:socket(),
transport :: module(), transport :: module(),
opts = #{} :: map(), opts = #{} :: map(),
handler :: module(),
%% Remote address and port for the connection. %% Remote address and port for the connection.
peer = undefined :: {inet:ip_address(), inet:port_number()}, peer = undefined :: {inet:ip_address(), inet:port_number()},
@ -89,21 +89,21 @@
encode_state = cow_hpack:init() :: cow_hpack:state() encode_state = cow_hpack:init() :: cow_hpack:state()
}). }).
-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module()) -> ok. -spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok.
init(Parent, Ref, Socket, Transport, Opts, Handler) -> init(Parent, Ref, Socket, Transport, Opts) ->
case Transport:peername(Socket) of case Transport:peername(Socket) of
{ok, Peer} -> {ok, Peer} ->
init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, <<>>); init(Parent, Ref, Socket, Transport, Opts, Peer, <<>>);
{error, Reason} -> {error, Reason} ->
%% Couldn't read the peer address; connection is gone. %% Couldn't read the peer address; connection is gone.
terminate(undefined, {socket_error, Reason, 'An error has occurred on the socket.'}) terminate(undefined, {socket_error, Reason, 'An error has occurred on the socket.'})
end. end.
-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module(), -spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(),
{inet:ip_address(), inet:port_number()}, binary()) -> ok. {inet:ip_address(), inet:port_number()}, binary()) -> ok.
init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer) -> init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer) ->
State = #state{parent=Parent, ref=Ref, socket=Socket, State = #state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, opts=Opts, handler=Handler, peer=Peer, transport=Transport, opts=Opts, peer=Peer,
parse_state={preface, sequence, preface_timeout(Opts)}}, parse_state={preface, sequence, preface_timeout(Opts)}},
preface(State), preface(State),
case Buffer of case Buffer of
@ -112,11 +112,11 @@ init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer) ->
end. end.
%% @todo Add an argument for the request body. %% @todo Add an argument for the request body.
-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module(), -spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(),
{inet:ip_address(), inet:port_number()}, binary(), map() | undefined, cowboy_req:req()) -> ok. {inet:ip_address(), inet:port_number()}, binary(), map() | undefined, cowboy_req:req()) -> ok.
init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer, _Settings, Req) -> init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer, _Settings, Req) ->
State0 = #state{parent=Parent, ref=Ref, socket=Socket, State0 = #state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, opts=Opts, handler=Handler, peer=Peer, transport=Transport, opts=Opts, peer=Peer,
parse_state={preface, sequence, preface_timeout(Opts)}}, parse_state={preface, sequence, preface_timeout(Opts)}},
preface(State0), preface(State0),
%% @todo Apply settings. %% @todo Apply settings.
@ -245,7 +245,7 @@ parse_settings_preface(State, _, _, _) ->
%% and terminate the stream if this is the end of it. %% and terminate the stream if this is the end of it.
%% DATA frame. %% DATA frame.
frame(State=#state{handler=Handler, streams=Streams}, {data, StreamID, IsFin0, Data}) -> frame(State=#state{streams=Streams}, {data, StreamID, IsFin0, Data}) ->
case lists:keyfind(StreamID, #stream.id, Streams) of case lists:keyfind(StreamID, #stream.id, Streams) of
Stream = #stream{state=StreamState0, remote=nofin, body_length=Len0} -> Stream = #stream{state=StreamState0, remote=nofin, body_length=Len0} ->
Len = Len0 + byte_size(Data), Len = Len0 + byte_size(Data),
@ -253,14 +253,15 @@ frame(State=#state{handler=Handler, streams=Streams}, {data, StreamID, IsFin0, D
fin -> {fin, Len}; fin -> {fin, Len};
nofin -> nofin nofin -> nofin
end, end,
try Handler:data(StreamID, IsFin, Data, StreamState0) of try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
{Commands, StreamState} -> {Commands, StreamState} ->
commands(State, Stream#stream{state=StreamState, body_length=Len}, Commands) commands(State, Stream#stream{state=StreamState, body_length=Len}, Commands)
catch Class:Reason -> catch Class:Reason ->
error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.", error_logger:error_msg("Exception occurred in "
[Handler, StreamID, IsFin0, Data, StreamState0, Class, Reason]), "cowboy_stream:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
[StreamID, IsFin0, Data, StreamState0, Class, Reason]),
stream_reset(State, StreamID, {internal_error, {Class, Reason}, stream_reset(State, StreamID, {internal_error, {Class, Reason},
'Exception occurred in StreamHandler:data/4 call.'}) 'Exception occurred in cowboy_stream:data/4.'})
end; end;
_ -> _ ->
stream_reset(State, StreamID, {stream_error, stream_closed, stream_reset(State, StreamID, {stream_error, stream_closed,
@ -350,17 +351,18 @@ down(State=#state{children=Children0}, Pid, Msg) ->
State State
end. end.
info(State=#state{handler=Handler, streams=Streams}, StreamID, Msg) -> info(State=#state{streams=Streams}, StreamID, Msg) ->
case lists:keyfind(StreamID, #stream.id, Streams) of case lists:keyfind(StreamID, #stream.id, Streams) of
Stream = #stream{state=StreamState0} -> Stream = #stream{state=StreamState0} ->
try Handler:info(StreamID, Msg, StreamState0) of try cowboy_stream:info(StreamID, Msg, StreamState0) of
{Commands, StreamState} -> {Commands, StreamState} ->
commands(State, Stream#stream{state=StreamState}, Commands) commands(State, Stream#stream{state=StreamState}, Commands)
catch Class:Reason -> catch Class:Reason ->
error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.", error_logger:error_msg("Exception occurred in "
[Handler, StreamID, Msg, StreamState0, Class, Reason]), "cowboy_stream:info(~p, ~p, ~p) with reason ~p:~p.",
[StreamID, Msg, StreamState0, Class, Reason]),
stream_reset(State, StreamID, {internal_error, {Class, Reason}, stream_reset(State, StreamID, {internal_error, {Class, Reason},
'Exception occurred in StreamHandler:info/3 call.'}) 'Exception occurred in cowboy_stream:info/3.'})
end; end;
false -> false ->
error_logger:error_msg("Received message ~p for unknown stream ~p.", [Msg, StreamID]), error_logger:error_msg("Received message ~p for unknown stream ~p.", [Msg, StreamID]),
@ -482,14 +484,8 @@ commands(State, Stream=#stream{id=StreamID}, [Error = {internal_error, _, _}|_Ta
%% @todo Do we even allow commands after? %% @todo Do we even allow commands after?
%% @todo Only reset when the stream still exists. %% @todo Only reset when the stream still exists.
stream_reset(after_commands(State, Stream), StreamID, Error); stream_reset(after_commands(State, Stream), StreamID, Error);
%% Upgrade to a new protocol. %% @todo HTTP/2 has no support for the Upgrade mechanism.
%% commands(State, Stream, [{switch_protocol, _Headers, _Mod, _ModState}|Tail]) ->
%% @todo Implementation.
%% @todo Can only upgrade if: there are no other streams and there are no children left alive.
%% @todo For HTTP/1.1 we should reject upgrading if pipelining is used.
commands(State, Stream, [{upgrade, _Mod, _ModState}]) ->
commands(State, Stream, []);
commands(State, Stream, [{upgrade, _Mod, _ModState}|Tail]) ->
%% @todo This is an error. Not sure what to do here yet. %% @todo This is an error. Not sure what to do here yet.
commands(State, Stream, Tail); commands(State, Stream, Tail);
commands(State, Stream=#stream{id=StreamID}, [stop|_Tail]) -> commands(State, Stream=#stream{id=StreamID}, [stop|_Tail]) ->
@ -518,19 +514,19 @@ send_data(Socket, Transport, StreamID, IsFin, Data, Length) ->
end. end.
-spec terminate(#state{}, _) -> no_return(). -spec terminate(#state{}, _) -> no_return().
terminate(#state{socket=Socket, transport=Transport, handler=Handler, terminate(#state{socket=Socket, transport=Transport,
streams=Streams, children=Children}, Reason) -> streams=Streams, children=Children}, Reason) ->
%% @todo Send GOAWAY frame; need to keep track of last good stream id; how? %% @todo Send GOAWAY frame; need to keep track of last good stream id; how?
terminate_all_streams(Streams, Reason, Handler, Children), terminate_all_streams(Streams, Reason, Children),
Transport:close(Socket), Transport:close(Socket),
exit({shutdown, Reason}). exit({shutdown, Reason}).
terminate_all_streams([], _, _, []) -> terminate_all_streams([], _, []) ->
ok; ok;
terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason, Handler, Children0) -> terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason, Children0) ->
stream_call_terminate(StreamID, Reason, Handler, StreamState), stream_call_terminate(StreamID, Reason, StreamState),
Children = stream_terminate_children(Children0, StreamID, []), Children = stream_terminate_children(Children0, StreamID, []),
terminate_all_streams(Tail, Reason, Handler, Children). terminate_all_streams(Tail, Reason, Children).
%% Stream functions. %% Stream functions.
@ -593,16 +589,16 @@ stream_init(State0=#state{ref=Ref, socket=Socket, transport=Transport, peer=Peer
'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}) 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
end. end.
stream_handler_init(State=#state{handler=Handler, opts=Opts}, StreamID, IsFin, Req) -> stream_handler_init(State=#state{opts=Opts}, StreamID, IsFin, Req) ->
try Handler:init(StreamID, Req, Opts) of try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} -> {Commands, StreamState} ->
commands(State, #stream{id=StreamID, state=StreamState, remote=IsFin}, Commands) commands(State, #stream{id=StreamID, state=StreamState, remote=IsFin}, Commands)
catch Class:Reason -> catch Class:Reason ->
error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) " error_logger:error_msg("Exception occurred in "
"with reason ~p:~p.", "cowboy_stream:init(~p, ~p, ~p) with reason ~p:~p.",
[Handler, StreamID, IsFin, Req, Class, Reason]), [StreamID, IsFin, Req, Class, Reason]),
stream_reset(State, StreamID, {internal_error, {Class, Reason}, stream_reset(State, StreamID, {internal_error, {Class, Reason},
'Exception occurred in StreamHandler:init/7 call.'}) %% @todo Check final arity. 'Exception occurred in cowboy_stream:init/3.'})
end. end.
%% @todo We might need to keep track of which stream has been reset so we don't send lots of them. %% @todo We might need to keep track of which stream has been reset so we don't send lots of them.
@ -615,23 +611,23 @@ stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID,
Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
stream_terminate(State, StreamID, StreamError). stream_terminate(State, StreamID, StreamError).
stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handler, stream_terminate(State=#state{socket=Socket, transport=Transport,
streams=Streams0, children=Children0, encode_state=EncodeState0}, StreamID, Reason) -> streams=Streams0, children=Children0, encode_state=EncodeState0}, StreamID, Reason) ->
case lists:keytake(StreamID, #stream.id, Streams0) of case lists:keytake(StreamID, #stream.id, Streams0) of
{value, #stream{state=StreamState, local=idle}, Streams} when Reason =:= normal -> {value, #stream{state=StreamState, local=idle}, Streams} when Reason =:= normal ->
Headers = #{<<":status">> => <<"204">>}, Headers = #{<<":status">> => <<"204">>},
{HeaderBlock, EncodeState} = headers_encode(Headers, EncodeState0), {HeaderBlock, EncodeState} = headers_encode(Headers, EncodeState0),
Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
stream_call_terminate(StreamID, Reason, Handler, StreamState), stream_call_terminate(StreamID, Reason, StreamState),
Children = stream_terminate_children(Children0, StreamID, []), Children = stream_terminate_children(Children0, StreamID, []),
State#state{streams=Streams, children=Children, encode_state=EncodeState}; State#state{streams=Streams, children=Children, encode_state=EncodeState};
{value, #stream{state=StreamState, local=nofin}, Streams} when Reason =:= normal -> {value, #stream{state=StreamState, local=nofin}, Streams} when Reason =:= normal ->
Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)), Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)),
stream_call_terminate(StreamID, Reason, Handler, StreamState), stream_call_terminate(StreamID, Reason, StreamState),
Children = stream_terminate_children(Children0, StreamID, []), Children = stream_terminate_children(Children0, StreamID, []),
State#state{streams=Streams, children=Children}; State#state{streams=Streams, children=Children};
{value, #stream{state=StreamState}, Streams} -> {value, #stream{state=StreamState}, Streams} ->
stream_call_terminate(StreamID, Reason, Handler, StreamState), stream_call_terminate(StreamID, Reason, StreamState),
Children = stream_terminate_children(Children0, StreamID, []), Children = stream_terminate_children(Children0, StreamID, []),
State#state{streams=Streams, children=Children}; State#state{streams=Streams, children=Children};
false -> false ->
@ -640,13 +636,13 @@ stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handle
State State
end. end.
stream_call_terminate(StreamID, Reason, Handler, StreamState) -> stream_call_terminate(StreamID, Reason, StreamState) ->
try try
Handler:terminate(StreamID, Reason, StreamState), cowboy_stream:terminate(StreamID, Reason, StreamState)
ok
catch Class:Reason -> catch Class:Reason ->
error_logger:error_msg("Exception occurred in ~s:terminate(~p, ~p, ~p) with reason ~p:~p.", error_logger:error_msg("Exception occurred in "
[Handler, StreamID, Reason, StreamState, Class, Reason]) "cowboy_stream:terminate(~p, ~p, ~p) with reason ~p:~p.",
[StreamID, Reason, StreamState, Class, Reason])
end. end.
stream_terminate_children([], _, Acc) -> stream_terminate_children([], _, Acc) ->

View file

@ -14,31 +14,46 @@
-module(cowboy_stream). -module(cowboy_stream).
-type streamid() :: any().
-type fin() :: fin | nofin.
-type headers() :: map(). %% @todo cowboy:http_headers() when they're maps
-type status_code() :: 100..999. %% @todo cowboy:http_status() when not binary
-type state() :: any(). -type state() :: any().
-type commands() :: [{response, fin(), status_code(), headers()}
| {data, fin(), iodata()}
| {promise, binary(), binary(), binary(), binary(), headers()}
| {flow, auto | integer()}
| {spawn, pid()}
| {upgrade, module(), state()}].
-type human_reason() :: atom(). -type human_reason() :: atom().
-type reason() :: [{internal_error, timeout | {error | exit | throw, any()}, human_reason()}
| {socket_error, closed | atom(), human_reason()}
| {stream_error, cow_http2:error_reason(), human_reason()}
| {connection_error, cow_http2:error_reason(), human_reason()}
| {stop, cow_http2:frame(), human_reason()}].
-callback init(streamid(), fin(), binary(), binary(), binary(), binary(), -type streamid() :: any().
headers(), cowboy:opts()) -> {commands(), state()}. -export_type([streamid/0]).
-type fin() :: fin | nofin.
-export_type([fin/0]).
%% @todo Perhaps it makes more sense to have resp_body in this module?
-type commands() :: [{response, cowboy:http_status(), cowboy:http_headers(), cowboy_req:resp_body()}
| {headers, cowboy:http_status(), cowboy:http_headers()}
| {data, fin(), iodata()}
| {push, binary(), binary(), binary(), inet:port_number(),
binary(), binary(), cowboy:http_headers()}
| {flow, auto | integer()}
| {spawn, pid(), timeout()}
| {error_response, cowboy:http_status(), cowboy:http_headers(), iodata()}
| {internal_error, any(), human_reason()}
| {switch_protocol, cowboy:http_headers(), module(), state()}
%% @todo I'm not convinced we need this 'stop' command.
%% It's used on crashes, but error_response should
%% terminate the request instead. It's also used on
%% normal exits of children. I'm not sure what to do
%% there yet. Investigate.
| stop].
-export_type([commands/0]).
-type reason() :: normal
| {internal_error, timeout | {error | exit | throw, any()}, human_reason()}
| {socket_error, closed | atom(), human_reason()}
| {stream_error, cow_http2:error(), human_reason()}
| {connection_error, cow_http2:error(), human_reason()}
| {stop, cow_http2:frame(), human_reason()}.
-export_type([reason/0]).
-callback init(streamid(), cowboy_req:req(), cowboy:opts()) -> {commands(), state()}.
-callback data(streamid(), fin(), binary(), State) -> {commands(), State} when State::state(). -callback data(streamid(), fin(), binary(), State) -> {commands(), State} when State::state().
-callback info(streamid(), any(), state()) -> {commands(), State} when State::state(). -callback info(streamid(), any(), State) -> {commands(), State} when State::state().
-callback terminate(streamid(), reason(), state()) -> any(). -callback terminate(streamid(), reason(), state()) -> any().
%% @todo To optimize the number of active timers we could have a command %% @todo To optimize the number of active timers we could have a command
@ -51,3 +66,64 @@
%% %%
%% This same timer can be used to try and send PING frames to help detect %% This same timer can be used to try and send PING frames to help detect
%% that the connection is indeed unresponsive. %% that the connection is indeed unresponsive.
-export([init/3]).
-export([data/4]).
-export([info/3]).
-export([terminate/3]).
%% Note that this and other functions in this module do NOT catch
%% exceptions. We want the exception to go all the way down to the
%% protocol code.
%%
%% OK the failure scenario is not so clear. The problem is
%% that the failure at any point in init/3 will result in the
%% corresponding state being lost. I am unfortunately not
%% confident we can do anything about this. If the crashing
%% handler just created a process, we'll never know about it.
%% Therefore at this time I choose to leave all failure handling
%% to the protocol process.
%%
%% Note that a failure in init/3 will result in terminate/3
%% NOT being called. This is because the state is not available.
-spec init(streamid(), cowboy_req:req(), cowboy:opts())
-> {commands(), {module(), state()} | undefined}.
init(StreamID, Req, Opts) ->
case maps:get(stream_handlers, Opts, [cowboy_stream_h]) of
[] ->
{[], undefined};
[Handler|Tail] ->
%% We call the next handler and remove it from the list of
%% stream handlers. This means that handlers that run after
%% it have no knowledge it exists. Should user require this
%% knowledge they can just define a separate option that will
%% be left untouched.
{Commands, State} = Handler:init(StreamID, Req, Opts#{stream_handlers => Tail}),
{Commands, {Handler, State}}
end.
-spec data(streamid(), fin(), binary(), {Handler, State} | undefined)
-> {commands(), {Handler, State} | undefined}
when Handler::module(), State::state().
data(_, _, _, undefined) ->
{[], undefined};
data(StreamID, IsFin, Data, {Handler, State0}) ->
{Commands, State} = Handler:data(StreamID, IsFin, Data, State0),
{Commands, {Handler, State}}.
-spec info(streamid(), any(), {Handler, State} | undefined)
-> {commands(), {Handler, State} | undefined}
when Handler::module(), State::state().
info(_, _, undefined) ->
{[], undefined};
info(StreamID, Info, {Handler, State0}) ->
{Commands, State} = Handler:info(StreamID, Info, State0),
{Commands, {Handler, State}}.
-spec terminate(streamid(), reason(), {module(), state()} | undefined) -> ok.
terminate(_, _, undefined) ->
ok;
terminate(StreamID, Reason, {Handler, State}) ->
_ = Handler:terminate(StreamID, Reason, State),
ok.

View file

@ -13,7 +13,7 @@
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-module(cowboy_stream_h). -module(cowboy_stream_h).
%% @todo -behaviour(cowboy_stream). -behavior(cowboy_stream).
%% @todo Maybe have a callback for the type of process this is, worker or supervisor. %% @todo Maybe have a callback for the type of process this is, worker or supervisor.
-export([init/3]). -export([init/3]).
@ -25,6 +25,8 @@
-export([execute/3]). -export([execute/3]).
-export([resume/5]). -export([resume/5]).
%% @todo Need to call subsequent handlers.
-record(state, { -record(state, {
ref = undefined :: ranch:ref(), ref = undefined :: ranch:ref(),
pid = undefined :: pid(), pid = undefined :: pid(),
@ -39,8 +41,8 @@
%% the stream like supervisors do. So here just send a message to yourself first, %% the stream like supervisors do. So here just send a message to yourself first,
%% and then decide what to do when receiving this message. %% and then decide what to do when receiving this message.
%% @todo proper specs -spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
-spec init(_,_,_) -> _. -> {[{spawn, pid(), timeout()}], #state{}}.
init(_StreamID, Req=#{ref := Ref}, Opts) -> init(_StreamID, Req=#{ref := Ref}, Opts) ->
Env = maps:get(env, Opts, #{}), Env = maps:get(env, Opts, #{}),
Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]), Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
@ -52,9 +54,8 @@ init(_StreamID, Req=#{ref := Ref}, Opts) ->
%% If we accumulated enough data or IsFin=fin, send it. %% If we accumulated enough data or IsFin=fin, send it.
%% If not, buffer it. %% If not, buffer it.
%% If not, buffer it. %% If not, buffer it.
-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
%% @todo proper specs -> {cowboy_stream:commands(), State} when State::#state{}.
-spec data(_,_,_,_) -> _.
data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffer}) -> data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffer}) ->
{[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}}; {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}};
data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length -> data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length ->
@ -65,9 +66,10 @@ data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>}, Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>},
{[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}. {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}.
%% @todo proper specs -spec info(cowboy_stream:streamid(), any(), State)
-spec info(_,_,_) -> _. -> {cowboy_stream:commands(), State} when State::#state{}.
info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) -> info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
%% @todo Do we even reach this clause?
{[stop], State}; {[stop], State};
info(_StreamID, {'EXIT', Pid, {_Reason, [_, {cow_http_hd, _, _, _}|_]}}, State=#state{pid=Pid}) -> info(_StreamID, {'EXIT', Pid, {_Reason, [_, {cow_http_hd, _, _, _}|_]}}, State=#state{pid=Pid}) ->
%% @todo Have an option to enable/disable this specific crash report? %% @todo Have an option to enable/disable this specific crash report?
@ -113,13 +115,12 @@ info(_StreamID, Push = {push, _, _, _, _, _, _, _}, State) ->
info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) -> info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) ->
{[SwitchProtocol], State}; {[SwitchProtocol], State};
%% Stray message. %% Stray message.
info(_StreamID, _Msg, State) -> info(_StreamID, _Info, State) ->
%% @todo Error report. %% @todo Error report.
%% @todo Cleanup if no reply was sent when stream ends. %% @todo Cleanup if no reply was sent when stream ends.
{[], State}. {[], State}.
%% @todo proper specs -spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> ok.
-spec terminate(_,_,_) -> _.
terminate(_StreamID, _Reason, _State) -> terminate(_StreamID, _Reason, _State) ->
ok. ok.

View file

@ -45,9 +45,8 @@ init(Parent, Ref, Socket, Transport, Opts) ->
end. end.
init(Parent, Ref, Socket, Transport, Opts, Protocol) -> init(Parent, Ref, Socket, Transport, Opts, Protocol) ->
{Handler, Type} = maps:get(stream_handler, Opts, {cowboy_stream_h, supervisor}), _ = case maps:get(connection_type, Opts, supervisor) of
_ = case Type of
worker -> ok; worker -> ok;
supervisor -> process_flag(trap_exit, true) supervisor -> process_flag(trap_exit, true)
end, end,
Protocol:init(Parent, Ref, Socket, Transport, Opts, Handler). Protocol:init(Parent, Ref, Socket, Transport, Opts).