mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 12:20:24 +00:00
WIP webtransport
This commit is contained in:
parent
24d32de931
commit
ee534f4e2b
6 changed files with 387 additions and 10 deletions
|
@ -1,7 +1,7 @@
|
||||||
{application, 'cowboy', [
|
{application, 'cowboy', [
|
||||||
{description, "Small, fast, modern HTTP server."},
|
{description, "Small, fast, modern HTTP server."},
|
||||||
{vsn, "2.13.0"},
|
{vsn, "2.13.0"},
|
||||||
{modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_decompress_h','cowboy_handler','cowboy_http','cowboy_http2','cowboy_http3','cowboy_loop','cowboy_metrics_h','cowboy_middleware','cowboy_quicer','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_tracer_h','cowboy_websocket']},
|
{modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_decompress_h','cowboy_handler','cowboy_http','cowboy_http2','cowboy_http3','cowboy_loop','cowboy_metrics_h','cowboy_middleware','cowboy_quicer','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_tracer_h','cowboy_websocket','cowboy_webtransport']},
|
||||||
{registered, [cowboy_sup,cowboy_clock]},
|
{registered, [cowboy_sup,cowboy_clock]},
|
||||||
{applications, [kernel,stdlib,crypto,cowlib,ranch]},
|
{applications, [kernel,stdlib,crypto,cowlib,ranch]},
|
||||||
{optional_applications, []},
|
{optional_applications, []},
|
||||||
|
|
|
@ -32,10 +32,10 @@
|
||||||
enable_connect_protocol => boolean(),
|
enable_connect_protocol => boolean(),
|
||||||
env => cowboy_middleware:env(),
|
env => cowboy_middleware:env(),
|
||||||
logger => module(),
|
logger => module(),
|
||||||
max_decode_blocked_streams => 0..16#3fffffffffffffff,
|
max_decode_blocked_streams => 0..16#3fffffffffffffff,
|
||||||
max_decode_table_size => 0..16#3fffffffffffffff,
|
max_decode_table_size => 0..16#3fffffffffffffff,
|
||||||
max_encode_blocked_streams => 0..16#3fffffffffffffff,
|
max_encode_blocked_streams => 0..16#3fffffffffffffff,
|
||||||
max_encode_table_size => 0..16#3fffffffffffffff,
|
max_encode_table_size => 0..16#3fffffffffffffff,
|
||||||
max_ignored_frame_size_received => non_neg_integer() | infinity,
|
max_ignored_frame_size_received => non_neg_integer() | infinity,
|
||||||
metrics_callback => cowboy_metrics_h:metrics_callback(),
|
metrics_callback => cowboy_metrics_h:metrics_callback(),
|
||||||
metrics_req_filter => fun((cowboy_req:req()) -> map()),
|
metrics_req_filter => fun((cowboy_req:req()) -> map()),
|
||||||
|
@ -51,12 +51,43 @@
|
||||||
}.
|
}.
|
||||||
-export_type([opts/0]).
|
-export_type([opts/0]).
|
||||||
|
|
||||||
|
%% @todo We have the WT CONNECT stream, using the capsule protocol
|
||||||
|
%% and the WT children stream whose events are redirected directly to cowboy_webtransport.
|
||||||
|
%% We probably need two new statuses to accomodate for that.
|
||||||
|
%% We might need a concept of WT session somewhere, perhaps as part of the status.
|
||||||
|
%% The session can be identified by the StreamID of the CONNECT stream.
|
||||||
|
%% Maybe {webtransport, SessionID, unidi|bidi}
|
||||||
|
%% Basically when we receive data for a webtransport SessionID we need to send it
|
||||||
|
%% to that SessionID's request process. Hmm... but how? The process was started
|
||||||
|
%% by cowboy_stream_h not by the protocol, so we need a way to hand off that
|
||||||
|
%% process. We can get the child by asking cowboy_children but...
|
||||||
|
%% We can give the pid via the switch_protocol in the ModState I guess.
|
||||||
|
%% And then that protocol would just send to the pid? But where do we store the pid then?
|
||||||
|
%% We probably need a webtransport_sessions field in the state
|
||||||
|
%% #{SessionID => pid()}
|
||||||
|
%% Then when we get a new stream we can inform,
|
||||||
|
%% when a stream gets data/closes we can inform,
|
||||||
|
%% when a datagram we can inform (contains quarterstreamid).
|
||||||
|
%% @todo
|
||||||
|
|
||||||
|
%% HTTP/3 or WebTransport stream.
|
||||||
|
%%
|
||||||
|
%% WebTransport sessions involve one bidirectional CONNECT stream
|
||||||
|
%% that must stay open (and can be used for signaling using the
|
||||||
|
%% Capsule Protocol) and an application-defined number of
|
||||||
|
%% unidirectional and bidirectional streams, as well as datagrams.
|
||||||
|
%%
|
||||||
|
%% WebTransport sessions run in the CONNECT request process and
|
||||||
|
%% all events related to the session is sent there as a message.
|
||||||
|
%% The pid of the process is kept in the state.
|
||||||
-record(stream, {
|
-record(stream, {
|
||||||
id :: cow_http3:stream_id(),
|
id :: cow_http3:stream_id(),
|
||||||
|
|
||||||
%% Whether the stream is currently in a special state.
|
%% Whether the stream is currently in a special state.
|
||||||
status :: header | {unidi, control | encoder | decoder}
|
status :: header | {unidi, control | encoder | decoder}
|
||||||
| normal | {data | ignore, non_neg_integer()} | stopping,
|
| normal | {data | ignore, non_neg_integer()} | stopping
|
||||||
|
%% @todo Is unidi | bidi useful to keep?
|
||||||
|
| webtransport_session | {webtransport_stream, cow_http3:stream_id(), unidi | bidi},
|
||||||
|
|
||||||
%% Stream buffer.
|
%% Stream buffer.
|
||||||
buffer = <<>> :: binary(),
|
buffer = <<>> :: binary(),
|
||||||
|
@ -152,6 +183,9 @@ loop(State0=#state{opts=Opts, children=Children}) ->
|
||||||
%% Messages pertaining to a stream.
|
%% Messages pertaining to a stream.
|
||||||
{{Pid, StreamID}, Msg} when Pid =:= self() ->
|
{{Pid, StreamID}, Msg} when Pid =:= self() ->
|
||||||
loop(info(State0, StreamID, Msg));
|
loop(info(State0, StreamID, Msg));
|
||||||
|
%% WebTransport commands.
|
||||||
|
{'$webtransport_commands', SessionID, Commands} ->
|
||||||
|
loop(webtransport_commands(State0, SessionID, Commands));
|
||||||
%% Exit signal from children.
|
%% Exit signal from children.
|
||||||
Msg = {'EXIT', Pid, _} ->
|
Msg = {'EXIT', Pid, _} ->
|
||||||
loop(down(State0, Pid, Msg));
|
loop(down(State0, Pid, Msg));
|
||||||
|
@ -216,6 +250,14 @@ parse1(State=#state{http3_machine=HTTP3Machine0},
|
||||||
{error, Error={connection_error, _, _}, HTTP3Machine} ->
|
{error, Error={connection_error, _, _}, HTTP3Machine} ->
|
||||||
terminate(State#state{http3_machine=HTTP3Machine}, Error)
|
terminate(State#state{http3_machine=HTTP3Machine}, Error)
|
||||||
end;
|
end;
|
||||||
|
%% @todo WT status
|
||||||
|
parse1(State, Stream=#stream{status=webtransport_session}, Data, IsFin) ->
|
||||||
|
%% @todo HTTP Capsules.
|
||||||
|
error({todo, State, Stream, Data, IsFin});
|
||||||
|
parse1(State, #stream{id=StreamID, status={webtransport_stream, SessionID, _}}, Data, IsFin) ->
|
||||||
|
webtransport_event(State, SessionID, {stream_data, StreamID, IsFin, Data}),
|
||||||
|
%% No need to store the stream again, WT streams don't get changed here.
|
||||||
|
loop(State);
|
||||||
parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) ->
|
parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) ->
|
||||||
DataLen = byte_size(Data),
|
DataLen = byte_size(Data),
|
||||||
if
|
if
|
||||||
|
@ -245,7 +287,13 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
|
||||||
case cow_http3:parse(Data) of
|
case cow_http3:parse(Data) of
|
||||||
{ok, Frame, Rest} ->
|
{ok, Frame, Rest} ->
|
||||||
FrameIsFin = is_fin(IsFin, Rest),
|
FrameIsFin = is_fin(IsFin, Rest),
|
||||||
|
%% @todo If we become a webtransport stream we don't want
|
||||||
|
%% to continue parsing as HTTP/3. That's OK we will
|
||||||
|
%% branch off based on stream status. Nothing to do here.
|
||||||
parse(frame(State, Stream, Frame, FrameIsFin), StreamID, Rest, IsFin);
|
parse(frame(State, Stream, Frame, FrameIsFin), StreamID, Rest, IsFin);
|
||||||
|
%% The WebTransport stream header is not a real frame.
|
||||||
|
{webtransport_stream_header, SessionID, Rest} ->
|
||||||
|
become_webtransport_stream(State, Stream, bidi, SessionID, Rest, IsFin);
|
||||||
{more, Frame = {data, _}, Len} ->
|
{more, Frame = {data, _}, Len} ->
|
||||||
%% We're at the end of the data so FrameIsFin is equivalent to IsFin.
|
%% We're at the end of the data so FrameIsFin is equivalent to IsFin.
|
||||||
case IsFin of
|
case IsFin of
|
||||||
|
@ -317,13 +365,24 @@ parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
|
||||||
{error, Error={connection_error, _, _}, HTTP3Machine} ->
|
{error, Error={connection_error, _, _}, HTTP3Machine} ->
|
||||||
terminate(State0#state{http3_machine=HTTP3Machine}, Error)
|
terminate(State0#state{http3_machine=HTTP3Machine}, Error)
|
||||||
end;
|
end;
|
||||||
|
%% @todo Perhaps do this in cow_http3_machine directly.
|
||||||
{ok, push, _} ->
|
{ok, push, _} ->
|
||||||
terminate(State0, {connection_error, h3_stream_creation_error,
|
terminate(State0, {connection_error, h3_stream_creation_error,
|
||||||
'Only servers can push. (RFC9114 6.2.2)'});
|
'Only servers can push. (RFC9114 6.2.2)'});
|
||||||
|
{ok, {webtransport, SessionID}, Rest} ->
|
||||||
|
become_webtransport_stream(State0, Stream0, unidi, SessionID, Rest, IsFin);
|
||||||
%% Unknown stream types must be ignored. We choose to abort the
|
%% Unknown stream types must be ignored. We choose to abort the
|
||||||
%% stream instead of reading and discarding the incoming data.
|
%% stream instead of reading and discarding the incoming data.
|
||||||
{undefined, _} ->
|
{undefined, _} ->
|
||||||
loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error))
|
loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error));
|
||||||
|
%% Very unlikely to happen but WebTransport headers may be fragmented
|
||||||
|
%% as they are more than one byte. The fin flag in this case is an error,
|
||||||
|
%% but because it happens in WebTransport application data (the Session ID)
|
||||||
|
%% we only reset the impacted stream and not the entire connection.
|
||||||
|
more when IsFin =:= fin ->
|
||||||
|
loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error));
|
||||||
|
more ->
|
||||||
|
loop(stream_store(State0, Stream0#stream{buffer=Data}))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
frame(State=#state{http3_machine=HTTP3Machine0},
|
frame(State=#state{http3_machine=HTTP3Machine0},
|
||||||
|
@ -450,6 +509,10 @@ headers_to_map([{Name, Value}|Tail], Acc0) ->
|
||||||
headers_to_map(Tail, Acc).
|
headers_to_map(Tail, Acc).
|
||||||
|
|
||||||
headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) ->
|
headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) ->
|
||||||
|
|
||||||
|
%% @todo For webtransport CONNECT requests we must have extra checks on settings.
|
||||||
|
%% @todo We may also need to defer them if we didn't get settings.
|
||||||
|
|
||||||
try cowboy_stream:init(StreamID, Req, Opts) of
|
try cowboy_stream:init(StreamID, Req, Opts) of
|
||||||
{Commands, StreamState} ->
|
{Commands, StreamState} ->
|
||||||
commands(State, Stream#stream{state=StreamState}, Commands)
|
commands(State, Stream#stream{state=StreamState}, Commands)
|
||||||
|
@ -653,10 +716,29 @@ commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) ->
|
||||||
reset_stream(State, Stream, Error);
|
reset_stream(State, Stream, Error);
|
||||||
%% Use a different protocol within the stream (CONNECT :protocol).
|
%% Use a different protocol within the stream (CONNECT :protocol).
|
||||||
%% @todo Make sure we error out when the feature is disabled.
|
%% @todo Make sure we error out when the feature is disabled.
|
||||||
|
commands(State0=#state{http3_machine=HTTP3Machine0}, Stream0=#stream{id=StreamID},
|
||||||
|
[{switch_protocol, Headers, cowboy_webtransport, WTState=#{}}|Tail]) ->
|
||||||
|
State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
|
||||||
|
Stream1 = #stream{state=StreamState} = stream_get(State, StreamID),
|
||||||
|
%% The stream becomes a WT session at that point. It is the
|
||||||
|
%% parent stream of all streams in this WT session. The
|
||||||
|
%% cowboy_stream state is kept because it will be needed
|
||||||
|
%% to terminate the stream properly.
|
||||||
|
HTTP3Machine = cow_http3_machine:become_webtransport_session(StreamID, HTTP3Machine0),
|
||||||
|
Stream = Stream1#stream{
|
||||||
|
status=webtransport_session,
|
||||||
|
state={cowboy_webtransport, WTState#{stream_state => StreamState}}
|
||||||
|
},
|
||||||
|
%% @todo We must propagate the buffer to capsule handling if any.
|
||||||
|
commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail);
|
||||||
commands(State0, Stream0=#stream{id=StreamID},
|
commands(State0, Stream0=#stream{id=StreamID},
|
||||||
[{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
|
[{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
|
||||||
State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
|
State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
|
||||||
Stream = stream_get(State, StreamID),
|
Stream = stream_get(State, StreamID),
|
||||||
|
%% @todo For webtransport we want to stop handling this as a normal stream.
|
||||||
|
%% This becomes a stream that uses the capsule protocol
|
||||||
|
%% https://www.rfc-editor.org/rfc/rfc9297#name-the-capsule-protocol
|
||||||
|
%% and relates to a webtransport session (the request process).
|
||||||
commands(State, Stream, Tail);
|
commands(State, Stream, Tail);
|
||||||
%% Set options dynamically.
|
%% Set options dynamically.
|
||||||
commands(State, Stream, [{set_options, _Opts}|Tail]) ->
|
commands(State, Stream, [{set_options, _Opts}|Tail]) ->
|
||||||
|
@ -758,6 +840,67 @@ send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID},
|
||||||
cowboy_quicer:send(Conn, EncoderID, EncData)),
|
cowboy_quicer:send(Conn, EncoderID, EncData)),
|
||||||
State.
|
State.
|
||||||
|
|
||||||
|
%% We mark the stream as being a WebTransport stream
|
||||||
|
%% and then continue parsing the data as a WebTransport
|
||||||
|
%% stream. This function is common for incoming unidi
|
||||||
|
%% and bidi streams.
|
||||||
|
become_webtransport_stream(State0=#state{http3_machine=HTTP3Machine0},
|
||||||
|
Stream0=#stream{id=StreamID}, StreamType, SessionID, Rest, IsFin) ->
|
||||||
|
case cow_http3_machine:become_webtransport_stream(StreamID, SessionID, HTTP3Machine0) of
|
||||||
|
{ok, HTTP3Machine} ->
|
||||||
|
State = State0#state{http3_machine=HTTP3Machine},
|
||||||
|
Stream = Stream0#stream{status={webtransport_stream, SessionID, StreamType}},
|
||||||
|
webtransport_event(State, SessionID, {stream_open, StreamID, StreamType}),
|
||||||
|
parse(stream_store(State, Stream), StreamID, Rest, IsFin)
|
||||||
|
%% @todo Error conditions.
|
||||||
|
end.
|
||||||
|
|
||||||
|
webtransport_event(State, SessionID, Event) ->
|
||||||
|
#stream{
|
||||||
|
status=webtransport_session,
|
||||||
|
state={cowboy_webtransport, #{session_pid := SessionPid}}
|
||||||
|
} = stream_get(State, SessionID),
|
||||||
|
SessionPid ! {'$webtransport_event', Event},
|
||||||
|
ok.
|
||||||
|
|
||||||
|
webtransport_commands(State, SessionID, Commands) ->
|
||||||
|
Session = #stream{status=webtransport_session} = stream_get(SessionID, State),
|
||||||
|
wt_commands(State, Session, Commands).
|
||||||
|
|
||||||
|
wt_commands(State, _, []) ->
|
||||||
|
State;
|
||||||
|
wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID},
|
||||||
|
[{open_stream, OpenStreamRef, StreamType, InitialData}|Tail]) ->
|
||||||
|
%% Because opening the stream involves sending a short header
|
||||||
|
%% we necessarily write data. The InitialData variable allows
|
||||||
|
%% providing additional data to be sent in the same packet.
|
||||||
|
StartF = case StreamType of
|
||||||
|
bidi -> start_bidi_stream;
|
||||||
|
unidi -> start_unidi_stream
|
||||||
|
end,
|
||||||
|
Header = cow_http3:webtransport_stream_header(SessionID, StreamType),
|
||||||
|
case cowboy_quicer:StartF(Conn, [Header, InitialData]) of
|
||||||
|
{ok, StreamID} ->
|
||||||
|
%% @todo Pass Session directly?
|
||||||
|
webtransport_event(State, SessionID,
|
||||||
|
{opened_stream_id, OpenStreamRef, StreamID}),
|
||||||
|
%% @todo Save the WT stream in cow_http3_machine AND here.
|
||||||
|
wt_commands(State, Session, Tail)
|
||||||
|
%% @todo Handle errors.
|
||||||
|
end;
|
||||||
|
wt_commands(State, Session, [{close_stream, StreamID, Code}|Tail]) ->
|
||||||
|
%% @todo Check that StreamID belongs to Session.
|
||||||
|
error({todo, State, Session, [{close_stream, StreamID, Code}|Tail]});
|
||||||
|
wt_commands(State, Session, [{send, datagram, Data}|Tail]) ->
|
||||||
|
error({todo, State, Session, [{send, datagram, Data}|Tail]});
|
||||||
|
wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, Data}|Tail]) ->
|
||||||
|
%% @todo Check that StreamID belongs to Session.
|
||||||
|
case cowboy_quicer:send(Conn, StreamID, Data, nofin) of
|
||||||
|
ok ->
|
||||||
|
wt_commands(State, Session, Tail)
|
||||||
|
%% @todo Handle errors.
|
||||||
|
end.
|
||||||
|
|
||||||
reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
|
reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
|
||||||
Stream=#stream{id=StreamID}, Error) ->
|
Stream=#stream{id=StreamID}, Error) ->
|
||||||
Reason = case Error of
|
Reason = case Error of
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
-export([shutdown/2]).
|
-export([shutdown/2]).
|
||||||
|
|
||||||
%% Streams.
|
%% Streams.
|
||||||
|
-export([start_bidi_stream/2]).
|
||||||
-export([start_unidi_stream/2]).
|
-export([start_unidi_stream/2]).
|
||||||
-export([send/3]).
|
-export([send/3]).
|
||||||
-export([send/4]).
|
-export([send/4]).
|
||||||
|
@ -45,6 +46,9 @@ peercert(_) -> no_quicer().
|
||||||
-spec shutdown(_, _) -> no_return().
|
-spec shutdown(_, _) -> no_return().
|
||||||
shutdown(_, _) -> no_quicer().
|
shutdown(_, _) -> no_quicer().
|
||||||
|
|
||||||
|
-spec start_bidi_stream(_, _) -> no_return().
|
||||||
|
start_bidi_stream(_, _) -> no_quicer().
|
||||||
|
|
||||||
-spec start_unidi_stream(_, _) -> no_return().
|
-spec start_unidi_stream(_, _) -> no_return().
|
||||||
start_unidi_stream(_, _) -> no_quicer().
|
start_unidi_stream(_, _) -> no_quicer().
|
||||||
|
|
||||||
|
@ -109,16 +113,26 @@ shutdown(Conn, ErrorCode) ->
|
||||||
|
|
||||||
%% Streams.
|
%% Streams.
|
||||||
|
|
||||||
|
-spec start_bidi_stream(quicer_connection_handle(), iodata())
|
||||||
|
-> {ok, cow_http3:stream_id()}
|
||||||
|
| {error, any()}.
|
||||||
|
|
||||||
|
start_bidi_stream(Conn, InitialData) ->
|
||||||
|
start_stream(Conn, InitialData, ?QUIC_STREAM_OPEN_FLAG_NONE).
|
||||||
|
|
||||||
-spec start_unidi_stream(quicer_connection_handle(), iodata())
|
-spec start_unidi_stream(quicer_connection_handle(), iodata())
|
||||||
-> {ok, cow_http3:stream_id()}
|
-> {ok, cow_http3:stream_id()}
|
||||||
| {error, any()}.
|
| {error, any()}.
|
||||||
|
|
||||||
start_unidi_stream(Conn, HeaderData) ->
|
start_unidi_stream(Conn, InitialData) ->
|
||||||
|
start_stream(Conn, InitialData, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL).
|
||||||
|
|
||||||
|
start_stream(Conn, InitialData, OpenFlag) ->
|
||||||
case quicer:start_stream(Conn, #{
|
case quicer:start_stream(Conn, #{
|
||||||
active => true,
|
active => true,
|
||||||
open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}) of
|
open_flag => OpenFlag}) of
|
||||||
{ok, StreamRef} ->
|
{ok, StreamRef} ->
|
||||||
case quicer:send(StreamRef, HeaderData) of
|
case quicer:send(StreamRef, InitialData) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
{ok, StreamID} = quicer:get_stream_id(StreamRef),
|
{ok, StreamID} = quicer:get_stream_id(StreamRef),
|
||||||
put({quicer_stream, StreamID}, StreamRef),
|
put({quicer_stream, StreamID}, StreamRef),
|
||||||
|
|
|
@ -49,6 +49,7 @@
|
||||||
-type reason() :: normal | switch_protocol
|
-type reason() :: normal | switch_protocol
|
||||||
| {internal_error, timeout | {error | exit | throw, any()}, human_reason()}
|
| {internal_error, timeout | {error | exit | throw, any()}, human_reason()}
|
||||||
| {socket_error, closed | atom(), human_reason()}
|
| {socket_error, closed | atom(), human_reason()}
|
||||||
|
%% @todo Or cow_http3:error().
|
||||||
| {stream_error, cow_http2:error(), human_reason()}
|
| {stream_error, cow_http2:error(), human_reason()}
|
||||||
| {connection_error, cow_http2:error(), human_reason()}
|
| {connection_error, cow_http2:error(), human_reason()}
|
||||||
| {stop, cow_http2:frame() | {exit, any()}, human_reason()}.
|
| {stop, cow_http2:frame() | {exit, any()}, human_reason()}.
|
||||||
|
|
|
@ -402,6 +402,7 @@ before_loop(State, HandlerState, ParseState) ->
|
||||||
|
|
||||||
-spec set_idle_timeout(#state{}, 0..?IDLE_TIMEOUT_TICKS) -> #state{}.
|
-spec set_idle_timeout(#state{}, 0..?IDLE_TIMEOUT_TICKS) -> #state{}.
|
||||||
|
|
||||||
|
%% @todo Do we really need this for HTTP/2?
|
||||||
set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) ->
|
set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) ->
|
||||||
%% Most of the time we don't need to cancel the timer since it
|
%% Most of the time we don't need to cancel the timer since it
|
||||||
%% will have triggered already. But this call is harmless so
|
%% will have triggered already. But this call is harmless so
|
||||||
|
|
218
src/cowboy_webtransport.erl
Normal file
218
src/cowboy_webtransport.erl
Normal file
|
@ -0,0 +1,218 @@
|
||||||
|
%% Copyright (c) Loïc Hoguin <essen@ninenines.eu>
|
||||||
|
%%
|
||||||
|
%% Permission to use, copy, modify, and/or distribute this software for any
|
||||||
|
%% purpose with or without fee is hereby granted, provided that the above
|
||||||
|
%% copyright notice and this permission notice appear in all copies.
|
||||||
|
%%
|
||||||
|
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||||
|
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||||
|
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||||
|
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||||
|
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||||
|
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||||
|
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||||
|
|
||||||
|
%% @todo To enable WebTransport the following options need to be set:
|
||||||
|
%%
|
||||||
|
%% QUIC:
|
||||||
|
%% - max_datagram_frame_size > 0
|
||||||
|
%%
|
||||||
|
%% HTTP/3:
|
||||||
|
%% - SETTINGS_H3_DATAGRAM = 1
|
||||||
|
%% - SETTINGS_ENABLE_CONNECT_PROTOCOL = 1
|
||||||
|
%% - SETTINGS_WEBTRANSPORT_MAX_SESSIONS >= 1
|
||||||
|
|
||||||
|
%% Cowboy supports versions 7 through 12 of the WebTransport drafts.
|
||||||
|
-module(cowboy_webtransport).
|
||||||
|
|
||||||
|
-export([upgrade/4]).
|
||||||
|
-export([upgrade/5]).
|
||||||
|
|
||||||
|
-type opts() :: #{
|
||||||
|
%% @todo
|
||||||
|
}.
|
||||||
|
-export_type([opts/0]).
|
||||||
|
|
||||||
|
-record(state, {
|
||||||
|
parent :: pid(),
|
||||||
|
opts = #{} :: opts(),
|
||||||
|
handler :: module(),
|
||||||
|
hibernate = false :: boolean(),
|
||||||
|
req = #{} :: map()
|
||||||
|
}).
|
||||||
|
|
||||||
|
%% This function mirrors a similar function for Websocket.
|
||||||
|
|
||||||
|
-spec is_upgrade_request(cowboy_req:req()) -> boolean().
|
||||||
|
is_upgrade_request(#{version := Version, method := <<"CONNECT">>, protocol := Protocol})
|
||||||
|
when Version =:= 'HTTP/3' ->
|
||||||
|
%% @todo scheme MUST BE "https"
|
||||||
|
<<"webtransport">> =:= cowboy_bstr:to_lower(Protocol);
|
||||||
|
|
||||||
|
is_upgrade_request(_) ->
|
||||||
|
false.
|
||||||
|
|
||||||
|
%% Stream process.
|
||||||
|
|
||||||
|
-spec upgrade(Req, Env, module(), any())
|
||||||
|
-> {ok, Req, Env}
|
||||||
|
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||||
|
|
||||||
|
upgrade(Req, Env, Handler, HandlerState) ->
|
||||||
|
upgrade(Req, Env, Handler, HandlerState, #{}).
|
||||||
|
|
||||||
|
-spec upgrade(Req, Env, module(), any(), opts())
|
||||||
|
-> {ok, Req, Env}
|
||||||
|
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||||
|
|
||||||
|
%% @todo Immediately crash if a response has already been sent.
|
||||||
|
upgrade(Req=#{version := 'HTTP/3', pid := Pid, streamid := StreamID}, Env, Handler, HandlerState, Opts) ->
|
||||||
|
FilteredReq = case maps:get(req_filter, Opts, undefined) of
|
||||||
|
undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req);
|
||||||
|
FilterFun -> FilterFun(Req)
|
||||||
|
end,
|
||||||
|
%% @todo add parent, ref, streamid here directly
|
||||||
|
State = #state{parent=Pid, opts=Opts, handler=Handler, req=FilteredReq},
|
||||||
|
|
||||||
|
%% @todo Must check is_upgrade_request (rename, not an upgrade)
|
||||||
|
%% and also ensure that all the relevant settings are enabled (quic and h3)
|
||||||
|
|
||||||
|
%% @todo A protocol may be negotiated via
|
||||||
|
%% - WT-Available-Protocols
|
||||||
|
%% - WT-Protocol
|
||||||
|
%% Negotiation is done by the handler in init like Websocket.
|
||||||
|
%% Parsing and building of the headers must be added to Cowlib though.
|
||||||
|
|
||||||
|
%% Considering we must ensure the relevant settings are enabled,
|
||||||
|
%% either we check them BEFORE, or we check them when the handler
|
||||||
|
%% is OK to initiate a webtransport session. Probably need to
|
||||||
|
%% check them BEFORE as we need to become (takeover) the webtransport process
|
||||||
|
%% after we are done with the upgrade. -> we check them in cow_http3_machine OK
|
||||||
|
|
||||||
|
%% After the upgrade we become the process that will receive all data
|
||||||
|
%% relevant to this webtransport session. However the data will not
|
||||||
|
%% go through stream handlers / middlewares anymore, it will be
|
||||||
|
%% a straight cowboy_http3 -> this pid.
|
||||||
|
|
||||||
|
case is_upgrade_request(Req) of
|
||||||
|
true ->
|
||||||
|
Headers = cowboy_req:response_headers(#{}, Req),
|
||||||
|
Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE,
|
||||||
|
#{session_pid => self()}}},
|
||||||
|
webtransport_init(State, HandlerState);
|
||||||
|
%% Use 501 Not Implemented to mirror the recommendation in
|
||||||
|
%% by RFC9220 3 (WebSockets Upgrade over HTTP/3).
|
||||||
|
false ->
|
||||||
|
{ok, cowboy_req:reply(501, Req), Env}
|
||||||
|
end.
|
||||||
|
|
||||||
|
webtransport_init(State=#state{handler=Handler}, HandlerState) ->
|
||||||
|
case erlang:function_exported(Handler, webtransport_init, 1) of
|
||||||
|
true -> handler_call(State, HandlerState, webtransport_init, undefined);
|
||||||
|
false -> before_loop(State, HandlerState)
|
||||||
|
end.
|
||||||
|
|
||||||
|
before_loop(State=#state{hibernate=true}, HandlerState) ->
|
||||||
|
proc_lib:hibernate(?MODULE, loop, [State#state{hibernate=false}, HandlerState]);
|
||||||
|
before_loop(State, HandlerState) ->
|
||||||
|
loop(State, HandlerState).
|
||||||
|
|
||||||
|
-spec loop(#state{}, any()) -> no_return().
|
||||||
|
|
||||||
|
loop(State=#state{parent=Parent%, timeout_ref=TRef
|
||||||
|
}, HandlerState) ->
|
||||||
|
receive
|
||||||
|
%% @todo Parent to filter messages? Nothing?
|
||||||
|
%% @todo Can there be groups of events?
|
||||||
|
{'$webtransport_event', Event} ->
|
||||||
|
handler_call(State, HandlerState, webtransport_handle, Event);
|
||||||
|
%% Timeouts.
|
||||||
|
%% @todo idle_timeout
|
||||||
|
% {timeout, TRef, ?MODULE} ->
|
||||||
|
% tick_idle_timeout(State, HandlerState, ParseState);
|
||||||
|
% {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
|
||||||
|
% before_loop(State, HandlerState, ParseState);
|
||||||
|
%% System messages.
|
||||||
|
{'EXIT', Parent, Reason} ->
|
||||||
|
%% @todo We should exit gracefully.
|
||||||
|
exit(Reason);
|
||||||
|
{system, From, Request} ->
|
||||||
|
sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
|
||||||
|
{State, HandlerState});
|
||||||
|
%% Calls from supervisor module.
|
||||||
|
{'$gen_call', From, Call} ->
|
||||||
|
cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
|
||||||
|
before_loop(State, HandlerState);
|
||||||
|
Message ->
|
||||||
|
handler_call(State, HandlerState, websocket_info, Message)
|
||||||
|
end.
|
||||||
|
|
||||||
|
handler_call(State=#state{handler=Handler}, HandlerState, Callback, Message) ->
|
||||||
|
try case Callback of
|
||||||
|
websocket_init -> Handler:websocket_init(HandlerState);
|
||||||
|
_ -> Handler:Callback(Message, HandlerState)
|
||||||
|
end of
|
||||||
|
{Commands, HandlerState2} when is_list(Commands) ->
|
||||||
|
handler_call_result(State, HandlerState2, Commands);
|
||||||
|
{Commands, HandlerState2, hibernate} when is_list(Commands) ->
|
||||||
|
handler_call_result(State#state{hibernate=true}, HandlerState2, Commands)
|
||||||
|
catch Class:Reason:Stacktrace ->
|
||||||
|
%% @todo
|
||||||
|
% websocket_send_close(State, {crash, Class, Reason}),
|
||||||
|
% handler_terminate(State, HandlerState, {crash, Class, Reason}),
|
||||||
|
erlang:raise(Class, Reason, Stacktrace)
|
||||||
|
end.
|
||||||
|
|
||||||
|
handler_call_result(State0, HandlerState, Commands) ->
|
||||||
|
case commands(Commands, State0, []) of
|
||||||
|
{ok, State} ->
|
||||||
|
before_loop(State, HandlerState);
|
||||||
|
{stop, State} ->
|
||||||
|
terminate(State, HandlerState, stop);
|
||||||
|
{Error = {error, _}, State} ->
|
||||||
|
terminate(State, HandlerState, Error)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% We accumulate the commands that must be sent to the connection process
|
||||||
|
%% because we want to send everything into one message. Other commands are
|
||||||
|
%% processed immediately.
|
||||||
|
|
||||||
|
commands([], State, []) ->
|
||||||
|
{ok, State};
|
||||||
|
commands([], State=#state{parent=Pid}, Commands) ->
|
||||||
|
Pid ! {'$webtransport_commands', lists:reverse(Commands)},
|
||||||
|
{ok, State};
|
||||||
|
%% {open_stream, OpenStreamRef, StreamType, InitialData}.
|
||||||
|
commands([Command={open_stream, _, _, _}|Tail], State, Acc) ->
|
||||||
|
commands(Tail, State, [Command|Acc]);
|
||||||
|
%% {close_stream, StreamID, Code}.
|
||||||
|
commands([Command={close_stream, _, _}|Tail], State, Acc) ->
|
||||||
|
commands(Tail, State, [Command|Acc]);
|
||||||
|
%% {send, StreamID | datagram, Data}.
|
||||||
|
commands([Command={send, _, _}|Tail], State, Acc) ->
|
||||||
|
commands(Tail, State, [Command|Acc]).
|
||||||
|
%% @todo send with IsFin
|
||||||
|
%% @todo stop, {error, Reason} probably. What to do about sending when asked to stop?
|
||||||
|
%% @todo set_options (to increase number of streams? data amounts? or a flow command?)
|
||||||
|
%% @todo shutdown_reason if useful.
|
||||||
|
|
||||||
|
terminate(State, HandlerState, Error) ->
|
||||||
|
error({todo, State, HandlerState, Error}).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
%% WebTransport functions:
|
||||||
|
%%
|
||||||
|
%% webtransport_init(HandlerState)
|
||||||
|
%% webtransport_handle({opened_stream_id, OpenStreamRef, StreamID}, HandlerState)
|
||||||
|
%% webtransport_handle({stream_open, StreamID, unidi | bidi}, HandlerState)
|
||||||
|
%% webtransport_handle({stream_data, StreamID, IsFin, Data}, HandlerState)
|
||||||
|
%% webtransport_handle({stream_reset_at, StreamID, Code, FinalSize}, HandlerState)
|
||||||
|
%% webtransport_handle({stream_stop_sending, StreamID, Code}, HandlerState)
|
||||||
|
%% webtransport_handle({datagram, Data}, HandlerState)
|
||||||
|
%% webtransport_handle(goaway, HandlerState)
|
||||||
|
%% webtransport_info(Message, HandlerState)
|
Loading…
Add table
Add a link
Reference in a new issue