0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 12:20:24 +00:00

Initial WebTransport implementation

This implements the upcoming draft-13
but has bits to make it work with draft-02
that most (all?) Chromium versions use.

Data and events are not going through
cowboy_stream beyond init. Since this
approach appears to work well it may
be a good idea to do the same for
Websocket over HTTP/2+ and improve
its performance.
This commit is contained in:
Loïc Hoguin 2025-05-12 13:01:55 +02:00
parent 24d32de931
commit 58909b0144
No known key found for this signature in database
GPG key ID: 8A9DF795F6FED764
13 changed files with 1574 additions and 22 deletions

View file

@ -32,10 +32,10 @@
enable_connect_protocol => boolean(),
env => cowboy_middleware:env(),
logger => module(),
max_decode_blocked_streams => 0..16#3fffffffffffffff,
max_decode_table_size => 0..16#3fffffffffffffff,
max_encode_blocked_streams => 0..16#3fffffffffffffff,
max_encode_table_size => 0..16#3fffffffffffffff,
max_decode_blocked_streams => 0..16#3fffffffffffffff,
max_decode_table_size => 0..16#3fffffffffffffff,
max_encode_blocked_streams => 0..16#3fffffffffffffff,
max_encode_table_size => 0..16#3fffffffffffffff,
max_ignored_frame_size_received => non_neg_integer() | infinity,
metrics_callback => cowboy_metrics_h:metrics_callback(),
metrics_req_filter => fun((cowboy_req:req()) -> map()),
@ -51,18 +51,30 @@
}.
-export_type([opts/0]).
%% HTTP/3 or WebTransport stream.
%%
%% WebTransport sessions involve one bidirectional CONNECT stream
%% that must stay open (and can be used for signaling using the
%% Capsule Protocol) and an application-defined number of
%% unidirectional and bidirectional streams, as well as datagrams.
%%
%% WebTransport sessions run in the CONNECT request process and
%% all events related to the session is sent there as a message.
%% The pid of the process is kept in the state.
-record(stream, {
id :: cow_http3:stream_id(),
%% Whether the stream is currently in a special state.
status :: header | {unidi, control | encoder | decoder}
| normal | {data | ignore, non_neg_integer()} | stopping,
| normal | {data | ignore, non_neg_integer()} | stopping
| {webtransport_session, normal | {ignore, non_neg_integer()}}
| {webtransport_stream, cow_http3:stream_id()},
%% Stream buffer.
buffer = <<>> :: binary(),
%% Stream state.
state = undefined :: undefined | {module, any()}
state = undefined :: undefined | {module(), any()}
}).
-record(state, {
@ -152,6 +164,9 @@ loop(State0=#state{opts=Opts, children=Children}) ->
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
loop(info(State0, StreamID, Msg));
%% WebTransport commands.
{'$webtransport_commands', SessionID, Commands} ->
loop(webtransport_commands(State0, SessionID, Commands));
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
loop(down(State0, Pid, Msg));
@ -164,12 +179,17 @@ handle_quic_msg(State0=#state{opts=Opts}, Msg) ->
case cowboy_quicer:handle(Msg) of
{data, StreamID, IsFin, Data} ->
parse(State0, StreamID, Data, IsFin);
{datagram, Data} ->
parse_datagram(State0, Data);
{stream_started, StreamID, StreamType} ->
State = stream_new_remote(State0, StreamID, StreamType),
loop(State);
{stream_closed, StreamID, ErrorCode} ->
State = stream_closed(State0, StreamID, ErrorCode),
loop(State);
{peer_send_shutdown, StreamID} ->
State = stream_peer_send_shutdown(State0, StreamID),
loop(State);
closed ->
%% @todo Different error reason if graceful?
Reason = {socket_error, closed, 'The socket has been closed.'},
@ -216,6 +236,56 @@ parse1(State=#state{http3_machine=HTTP3Machine0},
{error, Error={connection_error, _, _}, HTTP3Machine} ->
terminate(State#state{http3_machine=HTTP3Machine}, Error)
end;
%% @todo Handle when IsFin = fin which must terminate the WT session.
parse1(State=#state{conn=Conn}, Stream=#stream{id=SessionID, status=
{webtransport_session, normal}}, Data, IsFin) ->
case cow_capsule:parse(Data) of
{ok, wt_drain_session, Rest} ->
webtransport_event(State, SessionID, close_initiated),
parse1(State, Stream, Rest, IsFin);
{ok, {wt_close_session, AppCode, AppMsg}, Rest} ->
%% This event will be handled specially and lead
%% to the termination of the session process.
webtransport_event(State, SessionID, {closed, AppCode, AppMsg}),
%% Shutdown the CONNECT stream immediately.
cowboy_quicer:shutdown_stream(Conn, SessionID),
%% @todo Will we receive a {stream_closed,...} after that?
%% If any data is received past that point this is an error.
%% @todo Don't crash, error out properly.
<<>> = Rest,
loop(webtransport_terminate_session(State, Stream));
more ->
loop(stream_store(State, Stream#stream{buffer=Data}));
%% Ignore unhandled/unknown capsules.
%% @todo Do this when cow_capsule includes some.
% {ok, _, Rest} ->
% parse1(State, Stream, Rest, IsFin);
% {ok, Rest} ->
% parse1(State, Stream, Rest, IsFin);
%% @todo Make the max length configurable?
{skip, Len} when Len =< 8192 ->
loop(stream_store(State, Stream#stream{
status={webtransport_session, {ignore, Len}}}));
{skip, Len} ->
%% @todo What should be done on capsule error?
error({todo, capsule_too_long, Len});
error ->
%% @todo What should be done on capsule error?
error({todo, capsule_error, Data})
end;
parse1(State, Stream=#stream{status=
{webtransport_session, {ignore, Len}}}, Data, IsFin) ->
case Data of
<<_:Len/unit:8, Rest/bits>> ->
parse1(State, Stream#stream{status={webtransport_session, normal}}, Rest, IsFin);
_ ->
loop(stream_store(State, Stream#stream{
status={webtransport_session, {ignore, Len - byte_size(Data)}}}))
end;
parse1(State, #stream{id=StreamID, status={webtransport_stream, SessionID}}, Data, IsFin) ->
webtransport_event(State, SessionID, {stream_data, StreamID, IsFin, Data}),
%% No need to store the stream again, WT streams don't get changed here.
loop(State);
parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) ->
DataLen = byte_size(Data),
if
@ -246,6 +316,9 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
{ok, Frame, Rest} ->
FrameIsFin = is_fin(IsFin, Rest),
parse(frame(State, Stream, Frame, FrameIsFin), StreamID, Rest, IsFin);
%% The WebTransport stream header is not a real frame.
{webtransport_stream_header, SessionID, Rest} ->
become_webtransport_stream(State, Stream, bidi, SessionID, Rest, IsFin);
{more, Frame = {data, _}, Len} ->
%% We're at the end of the data so FrameIsFin is equivalent to IsFin.
case IsFin of
@ -317,13 +390,24 @@ parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
{error, Error={connection_error, _, _}, HTTP3Machine} ->
terminate(State0#state{http3_machine=HTTP3Machine}, Error)
end;
%% @todo Perhaps do this in cow_http3_machine directly.
{ok, push, _} ->
terminate(State0, {connection_error, h3_stream_creation_error,
'Only servers can push. (RFC9114 6.2.2)'});
{ok, {webtransport, SessionID}, Rest} ->
become_webtransport_stream(State0, Stream0, unidi, SessionID, Rest, IsFin);
%% Unknown stream types must be ignored. We choose to abort the
%% stream instead of reading and discarding the incoming data.
{undefined, _} ->
loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error))
loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error));
%% Very unlikely to happen but WebTransport headers may be fragmented
%% as they are more than one byte. The fin flag in this case is an error,
%% but because it happens in WebTransport application data (the Session ID)
%% we only reset the impacted stream and not the entire connection.
more when IsFin =:= fin ->
loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error));
more ->
loop(stream_store(State0, Stream0#stream{buffer=Data}))
end.
frame(State=#state{http3_machine=HTTP3Machine0},
@ -449,6 +533,8 @@ headers_to_map([{Name, Value}|Tail], Acc0) ->
end,
headers_to_map(Tail, Acc).
%% @todo WebTransport CONNECT requests must have extra checks on settings.
%% @todo We may also need to defer them if we didn't get settings.
headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) ->
try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
@ -488,6 +574,18 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer},
send_headers(State0, Stream, fin, StatusCode0, RespHeaders0)
end.
%% Datagrams.
parse_datagram(State, Data0) ->
{SessionID, Data} = cow_http3:parse_datagram(Data0),
case stream_get(State, SessionID) of
#stream{status={webtransport_session, _}} ->
webtransport_event(State, SessionID, {datagram, Data}),
loop(State);
_ ->
error(todo) %% @todo Might be a future WT session or an error.
end.
%% Erlang messages.
down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
@ -653,6 +751,22 @@ commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) ->
reset_stream(State, Stream, Error);
%% Use a different protocol within the stream (CONNECT :protocol).
%% @todo Make sure we error out when the feature is disabled.
commands(State0, Stream0=#stream{id=StreamID},
[{switch_protocol, Headers, cowboy_webtransport, WTState=#{}}|Tail]) ->
State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
#state{http3_machine=HTTP3Machine0} = State,
Stream1 = #stream{state=StreamState} = stream_get(State, StreamID),
%% The stream becomes a WT session at that point. It is the
%% parent stream of all streams in this WT session. The
%% cowboy_stream state is kept because it will be needed
%% to terminate the stream properly.
HTTP3Machine = cow_http3_machine:become_webtransport_session(StreamID, HTTP3Machine0),
Stream = Stream1#stream{
status={webtransport_session, normal},
state={cowboy_webtransport, WTState#{stream_state => StreamState}}
},
%% @todo We must propagate the buffer to capsule handling if any.
commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail);
commands(State0, Stream0=#stream{id=StreamID},
[{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
@ -758,6 +872,158 @@ send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID},
cowboy_quicer:send(Conn, EncoderID, EncData)),
State.
%% We mark the stream as being a WebTransport stream
%% and then continue parsing the data as a WebTransport
%% stream. This function is common for incoming unidi
%% and bidi streams.
become_webtransport_stream(State0=#state{http3_machine=HTTP3Machine0},
Stream0=#stream{id=StreamID}, StreamType, SessionID, Rest, IsFin) ->
case cow_http3_machine:become_webtransport_stream(StreamID, SessionID, HTTP3Machine0) of
{ok, HTTP3Machine} ->
State = State0#state{http3_machine=HTTP3Machine},
Stream = Stream0#stream{status={webtransport_stream, SessionID}},
webtransport_event(State, SessionID, {stream_open, StreamID, StreamType}),
%% We don't need to parse the remaining data if there isn't any.
case {Rest, IsFin} of
{<<>>, nofin} -> loop(stream_store(State, Stream));
_ -> parse(stream_store(State, Stream), StreamID, Rest, IsFin)
end
%% @todo Error conditions.
end.
webtransport_event(State, SessionID, Event) ->
#stream{
status={webtransport_session, _},
state={cowboy_webtransport, #{session_pid := SessionPid}}
} = stream_get(State, SessionID),
SessionPid ! {'$webtransport_event', SessionID, Event},
ok.
webtransport_commands(State, SessionID, Commands) ->
case stream_get(State, SessionID) of
Session = #stream{status={webtransport_session, _}} ->
wt_commands(State, Session, Commands);
%% The stream has been terminated, ignore pending commands.
error ->
State
end.
wt_commands(State, _, []) ->
State;
wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID},
[{open_stream, OpenStreamRef, StreamType, InitialData}|Tail]) ->
%% Because opening the stream involves sending a short header
%% we necessarily write data. The InitialData variable allows
%% providing additional data to be sent in the same packet.
StartF = case StreamType of
bidi -> start_bidi_stream;
unidi -> start_unidi_stream
end,
Header = cow_http3:webtransport_stream_header(SessionID, StreamType),
case cowboy_quicer:StartF(Conn, [Header, InitialData]) of
{ok, StreamID} ->
%% @todo Pass Session directly?
webtransport_event(State0, SessionID,
{opened_stream_id, OpenStreamRef, StreamID}),
State = stream_new_local(State0, StreamID, StreamType,
{webtransport_stream, SessionID}),
wt_commands(State, Session, Tail)
%% @todo Handle errors.
end;
wt_commands(State, Session, [{close_stream, StreamID, Code}|Tail]) ->
%% @todo Check that StreamID belongs to Session.
error({todo, State, Session, [{close_stream, StreamID, Code}|Tail]});
wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID},
[{send, datagram, Data}|Tail]) ->
case cowboy_quicer:send_datagram(Conn, cow_http3:datagram(SessionID, Data)) of
ok ->
wt_commands(State, Session, Tail)
%% @todo Handle errors.
end;
wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, Data}|Tail]) ->
%% @todo Check that StreamID belongs to Session.
case cowboy_quicer:send(Conn, StreamID, Data, nofin) of
ok ->
wt_commands(State, Session, Tail)
%% @todo Handle errors.
end;
wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, IsFin, Data}|Tail]) ->
%% @todo Check that StreamID belongs to Session.
case cowboy_quicer:send(Conn, StreamID, Data, IsFin) of
ok ->
wt_commands(State, Session, Tail)
%% @todo Handle errors.
end;
wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [initiate_close|Tail]) ->
%% We must send a WT_DRAIN_SESSION capsule on the CONNECT stream.
Capsule = cow_capsule:wt_drain_session(),
case cowboy_quicer:send(Conn, SessionID, Capsule, nofin) of
ok ->
wt_commands(State, Session, Tail)
%% @todo Handle errors.
end;
wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail])
when Cmd =:= close; element(1, Cmd) =:= close ->
%% We must send a WT_CLOSE_SESSION capsule on the CONNECT stream.
{AppCode, AppMsg} = case Cmd of
close -> {0, <<>>};
{close, AppCode0} -> {AppCode0, <<>>};
{close, AppCode0, AppMsg0} -> {AppCode0, AppMsg0}
end,
Capsule = cow_capsule:wt_close_session(AppCode, AppMsg),
case cowboy_quicer:send(Conn, SessionID, Capsule, fin) of
ok ->
%% @todo The endpoint MAY send a STOP_SENDING to indicate it is no longer reading from the CONNECT stream.
State = webtransport_terminate_session(State0, Session),
%% @todo Because the handler is in a separate process
%% we must wait for it to stop and eventually
%% kill the process if it takes too long.
%% @todo We may need to fully close the CONNECT stream (if remote doesn't reset it).
wt_commands(State, Session, Tail)
%% @todo Handle errors.
end.
webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machine0,
streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) ->
%% Reset/abort the WT streams.
Streams = maps:filtermap(fun
(_, #stream{id=StreamID, status={webtransport_session, _}})
when StreamID =:= SessionID ->
%% We remove the session stream but do the shutdown outside this function.
false;
(StreamID, #stream{status={webtransport_stream, StreamSessionID}})
when StreamSessionID =:= SessionID ->
cowboy_quicer:shutdown_stream(Conn, StreamID,
both, cow_http3:error_to_code(webtransport_session_gone)),
false;
(_, _) ->
true
end, Streams0),
%% Keep the streams in lingering state.
%% We only keep up to 100 streams in this state. @todo Make it configurable?
Terminated = maps:keys(Streams0) -- maps:keys(Streams),
Lingering = lists:sublist(Terminated ++ Lingering0, 100),
%% Update the HTTP3 state machine.
HTTP3Machine = cow_http3_machine:close_webtransport_session(SessionID, HTTP3Machine0),
State#state{
http3_machine=HTTP3Machine,
streams=Streams,
lingering_streams=Lingering
}.
stream_peer_send_shutdown(State=#state{conn=Conn}, StreamID) ->
case stream_get(State, StreamID) of
%% Cleanly terminating the CONNECT stream is equivalent
%% to an application error code of 0 and empty message.
Stream = #stream{status={webtransport_session, _}} ->
webtransport_event(State, StreamID, {closed, 0, <<>>}),
%% Shutdown the CONNECT stream fully.
cowboy_quicer:shutdown_stream(Conn, StreamID),
webtransport_terminate_session(State, Stream);
_ ->
State
end.
reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
Stream=#stream{id=StreamID}, Error) ->
Reason = case Error of
@ -903,15 +1169,25 @@ terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reas
stream_get(#state{streams=Streams}, StreamID) ->
maps:get(StreamID, Streams, error).
stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams},
StreamID, StreamType) ->
stream_new_local(State, StreamID, StreamType, Status) ->
stream_new(State, StreamID, StreamType, unidi_local, Status).
stream_new_remote(State, StreamID, StreamType) ->
Status = case StreamType of
unidi -> header;
bidi -> normal
end,
stream_new(State, StreamID, StreamType, unidi_remote, Status).
stream_new(State=#state{http3_machine=HTTP3Machine0, streams=Streams},
StreamID, StreamType, UnidiType, Status) ->
{HTTP3Machine, Status} = case StreamType of
unidi ->
{cow_http3_machine:init_unidi_stream(StreamID, unidi_remote, HTTP3Machine0),
header};
{cow_http3_machine:init_unidi_stream(StreamID, UnidiType, HTTP3Machine0),
Status};
bidi ->
{cow_http3_machine:init_bidi_stream(StreamID, HTTP3Machine0),
normal}
Status}
end,
Stream = #stream{id=StreamID, status=Status},
State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}.
@ -926,6 +1202,11 @@ stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) ->
stream_closed(State=#state{opts=Opts,
streams=Streams0, children=Children0}, StreamID, ErrorCode) ->
case maps:take(StreamID, Streams0) of
%% In the WT session's case, streams will be
%% removed in webtransport_terminate_session.
{Stream=#stream{status={webtransport_session, _}}, _} ->
webtransport_event(State, StreamID, closed_abruptly),
webtransport_terminate_session(State, Stream);
{#stream{state=undefined}, Streams} ->
%% Unidi stream has no handler/children.
stream_closed1(State#state{streams=Streams}, StreamID);