From ee534f4e2b40d08f5c8c68dc3ebe01216b2f1f12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 12 May 2025 13:01:55 +0200 Subject: [PATCH 1/5] WIP webtransport --- ebin/cowboy.app | 2 +- src/cowboy_http3.erl | 155 ++++++++++++++++++++++++- src/cowboy_quicer.erl | 20 +++- src/cowboy_stream.erl | 1 + src/cowboy_websocket.erl | 1 + src/cowboy_webtransport.erl | 218 ++++++++++++++++++++++++++++++++++++ 6 files changed, 387 insertions(+), 10 deletions(-) create mode 100644 src/cowboy_webtransport.erl diff --git a/ebin/cowboy.app b/ebin/cowboy.app index 91569c7a..39be2000 100644 --- a/ebin/cowboy.app +++ b/ebin/cowboy.app @@ -1,7 +1,7 @@ {application, 'cowboy', [ {description, "Small, fast, modern HTTP server."}, {vsn, "2.13.0"}, - {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_decompress_h','cowboy_handler','cowboy_http','cowboy_http2','cowboy_http3','cowboy_loop','cowboy_metrics_h','cowboy_middleware','cowboy_quicer','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_tracer_h','cowboy_websocket']}, + {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_decompress_h','cowboy_handler','cowboy_http','cowboy_http2','cowboy_http3','cowboy_loop','cowboy_metrics_h','cowboy_middleware','cowboy_quicer','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_tracer_h','cowboy_websocket','cowboy_webtransport']}, {registered, [cowboy_sup,cowboy_clock]}, {applications, [kernel,stdlib,crypto,cowlib,ranch]}, {optional_applications, []}, diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index da1312e4..d76683b1 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -32,10 +32,10 @@ enable_connect_protocol => boolean(), env => cowboy_middleware:env(), logger => module(), - max_decode_blocked_streams => 0..16#3fffffffffffffff, - max_decode_table_size => 0..16#3fffffffffffffff, - max_encode_blocked_streams => 0..16#3fffffffffffffff, - max_encode_table_size => 0..16#3fffffffffffffff, + max_decode_blocked_streams => 0..16#3fffffffffffffff, + max_decode_table_size => 0..16#3fffffffffffffff, + max_encode_blocked_streams => 0..16#3fffffffffffffff, + max_encode_table_size => 0..16#3fffffffffffffff, max_ignored_frame_size_received => non_neg_integer() | infinity, metrics_callback => cowboy_metrics_h:metrics_callback(), metrics_req_filter => fun((cowboy_req:req()) -> map()), @@ -51,12 +51,43 @@ }. -export_type([opts/0]). +%% @todo We have the WT CONNECT stream, using the capsule protocol +%% and the WT children stream whose events are redirected directly to cowboy_webtransport. +%% We probably need two new statuses to accomodate for that. +%% We might need a concept of WT session somewhere, perhaps as part of the status. +%% The session can be identified by the StreamID of the CONNECT stream. +%% Maybe {webtransport, SessionID, unidi|bidi} +%% Basically when we receive data for a webtransport SessionID we need to send it +%% to that SessionID's request process. Hmm... but how? The process was started +%% by cowboy_stream_h not by the protocol, so we need a way to hand off that +%% process. We can get the child by asking cowboy_children but... +%% We can give the pid via the switch_protocol in the ModState I guess. +%% And then that protocol would just send to the pid? But where do we store the pid then? +%% We probably need a webtransport_sessions field in the state +%% #{SessionID => pid()} +%% Then when we get a new stream we can inform, +%% when a stream gets data/closes we can inform, +%% when a datagram we can inform (contains quarterstreamid). +%% @todo + +%% HTTP/3 or WebTransport stream. +%% +%% WebTransport sessions involve one bidirectional CONNECT stream +%% that must stay open (and can be used for signaling using the +%% Capsule Protocol) and an application-defined number of +%% unidirectional and bidirectional streams, as well as datagrams. +%% +%% WebTransport sessions run in the CONNECT request process and +%% all events related to the session is sent there as a message. +%% The pid of the process is kept in the state. -record(stream, { id :: cow_http3:stream_id(), %% Whether the stream is currently in a special state. status :: header | {unidi, control | encoder | decoder} - | normal | {data | ignore, non_neg_integer()} | stopping, + | normal | {data | ignore, non_neg_integer()} | stopping + %% @todo Is unidi | bidi useful to keep? + | webtransport_session | {webtransport_stream, cow_http3:stream_id(), unidi | bidi}, %% Stream buffer. buffer = <<>> :: binary(), @@ -152,6 +183,9 @@ loop(State0=#state{opts=Opts, children=Children}) -> %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> loop(info(State0, StreamID, Msg)); + %% WebTransport commands. + {'$webtransport_commands', SessionID, Commands} -> + loop(webtransport_commands(State0, SessionID, Commands)); %% Exit signal from children. Msg = {'EXIT', Pid, _} -> loop(down(State0, Pid, Msg)); @@ -216,6 +250,14 @@ parse1(State=#state{http3_machine=HTTP3Machine0}, {error, Error={connection_error, _, _}, HTTP3Machine} -> terminate(State#state{http3_machine=HTTP3Machine}, Error) end; +%% @todo WT status +parse1(State, Stream=#stream{status=webtransport_session}, Data, IsFin) -> + %% @todo HTTP Capsules. + error({todo, State, Stream, Data, IsFin}); +parse1(State, #stream{id=StreamID, status={webtransport_stream, SessionID, _}}, Data, IsFin) -> + webtransport_event(State, SessionID, {stream_data, StreamID, IsFin, Data}), + %% No need to store the stream again, WT streams don't get changed here. + loop(State); parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) -> DataLen = byte_size(Data), if @@ -245,7 +287,13 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) -> case cow_http3:parse(Data) of {ok, Frame, Rest} -> FrameIsFin = is_fin(IsFin, Rest), + %% @todo If we become a webtransport stream we don't want + %% to continue parsing as HTTP/3. That's OK we will + %% branch off based on stream status. Nothing to do here. parse(frame(State, Stream, Frame, FrameIsFin), StreamID, Rest, IsFin); + %% The WebTransport stream header is not a real frame. + {webtransport_stream_header, SessionID, Rest} -> + become_webtransport_stream(State, Stream, bidi, SessionID, Rest, IsFin); {more, Frame = {data, _}, Len} -> %% We're at the end of the data so FrameIsFin is equivalent to IsFin. case IsFin of @@ -317,13 +365,24 @@ parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0}, {error, Error={connection_error, _, _}, HTTP3Machine} -> terminate(State0#state{http3_machine=HTTP3Machine}, Error) end; + %% @todo Perhaps do this in cow_http3_machine directly. {ok, push, _} -> terminate(State0, {connection_error, h3_stream_creation_error, 'Only servers can push. (RFC9114 6.2.2)'}); + {ok, {webtransport, SessionID}, Rest} -> + become_webtransport_stream(State0, Stream0, unidi, SessionID, Rest, IsFin); %% Unknown stream types must be ignored. We choose to abort the %% stream instead of reading and discarding the incoming data. {undefined, _} -> - loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error)) + loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error)); + %% Very unlikely to happen but WebTransport headers may be fragmented + %% as they are more than one byte. The fin flag in this case is an error, + %% but because it happens in WebTransport application data (the Session ID) + %% we only reset the impacted stream and not the entire connection. + more when IsFin =:= fin -> + loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error)); + more -> + loop(stream_store(State0, Stream0#stream{buffer=Data})) end. frame(State=#state{http3_machine=HTTP3Machine0}, @@ -450,6 +509,10 @@ headers_to_map([{Name, Value}|Tail], Acc0) -> headers_to_map(Tail, Acc). headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) -> + +%% @todo For webtransport CONNECT requests we must have extra checks on settings. +%% @todo We may also need to defer them if we didn't get settings. + try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> commands(State, Stream#stream{state=StreamState}, Commands) @@ -653,10 +716,29 @@ commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) -> reset_stream(State, Stream, Error); %% Use a different protocol within the stream (CONNECT :protocol). %% @todo Make sure we error out when the feature is disabled. +commands(State0=#state{http3_machine=HTTP3Machine0}, Stream0=#stream{id=StreamID}, + [{switch_protocol, Headers, cowboy_webtransport, WTState=#{}}|Tail]) -> + State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}), + Stream1 = #stream{state=StreamState} = stream_get(State, StreamID), + %% The stream becomes a WT session at that point. It is the + %% parent stream of all streams in this WT session. The + %% cowboy_stream state is kept because it will be needed + %% to terminate the stream properly. + HTTP3Machine = cow_http3_machine:become_webtransport_session(StreamID, HTTP3Machine0), + Stream = Stream1#stream{ + status=webtransport_session, + state={cowboy_webtransport, WTState#{stream_state => StreamState}} + }, + %% @todo We must propagate the buffer to capsule handling if any. + commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail); commands(State0, Stream0=#stream{id=StreamID}, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) -> State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}), Stream = stream_get(State, StreamID), + %% @todo For webtransport we want to stop handling this as a normal stream. + %% This becomes a stream that uses the capsule protocol + %% https://www.rfc-editor.org/rfc/rfc9297#name-the-capsule-protocol + %% and relates to a webtransport session (the request process). commands(State, Stream, Tail); %% Set options dynamically. commands(State, Stream, [{set_options, _Opts}|Tail]) -> @@ -758,6 +840,67 @@ send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID}, cowboy_quicer:send(Conn, EncoderID, EncData)), State. +%% We mark the stream as being a WebTransport stream +%% and then continue parsing the data as a WebTransport +%% stream. This function is common for incoming unidi +%% and bidi streams. +become_webtransport_stream(State0=#state{http3_machine=HTTP3Machine0}, + Stream0=#stream{id=StreamID}, StreamType, SessionID, Rest, IsFin) -> + case cow_http3_machine:become_webtransport_stream(StreamID, SessionID, HTTP3Machine0) of + {ok, HTTP3Machine} -> + State = State0#state{http3_machine=HTTP3Machine}, + Stream = Stream0#stream{status={webtransport_stream, SessionID, StreamType}}, + webtransport_event(State, SessionID, {stream_open, StreamID, StreamType}), + parse(stream_store(State, Stream), StreamID, Rest, IsFin) + %% @todo Error conditions. + end. + +webtransport_event(State, SessionID, Event) -> + #stream{ + status=webtransport_session, + state={cowboy_webtransport, #{session_pid := SessionPid}} + } = stream_get(State, SessionID), + SessionPid ! {'$webtransport_event', Event}, + ok. + +webtransport_commands(State, SessionID, Commands) -> + Session = #stream{status=webtransport_session} = stream_get(SessionID, State), + wt_commands(State, Session, Commands). + +wt_commands(State, _, []) -> + State; +wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, + [{open_stream, OpenStreamRef, StreamType, InitialData}|Tail]) -> + %% Because opening the stream involves sending a short header + %% we necessarily write data. The InitialData variable allows + %% providing additional data to be sent in the same packet. + StartF = case StreamType of + bidi -> start_bidi_stream; + unidi -> start_unidi_stream + end, + Header = cow_http3:webtransport_stream_header(SessionID, StreamType), + case cowboy_quicer:StartF(Conn, [Header, InitialData]) of + {ok, StreamID} -> + %% @todo Pass Session directly? + webtransport_event(State, SessionID, + {opened_stream_id, OpenStreamRef, StreamID}), + %% @todo Save the WT stream in cow_http3_machine AND here. + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State, Session, [{close_stream, StreamID, Code}|Tail]) -> + %% @todo Check that StreamID belongs to Session. + error({todo, State, Session, [{close_stream, StreamID, Code}|Tail]}); +wt_commands(State, Session, [{send, datagram, Data}|Tail]) -> + error({todo, State, Session, [{send, datagram, Data}|Tail]}); +wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, Data}|Tail]) -> + %% @todo Check that StreamID belongs to Session. + case cowboy_quicer:send(Conn, StreamID, Data, nofin) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end. + reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0}, Stream=#stream{id=StreamID}, Error) -> Reason = case Error of diff --git a/src/cowboy_quicer.erl b/src/cowboy_quicer.erl index d9f51f3e..fb9de125 100644 --- a/src/cowboy_quicer.erl +++ b/src/cowboy_quicer.erl @@ -23,6 +23,7 @@ -export([shutdown/2]). %% Streams. +-export([start_bidi_stream/2]). -export([start_unidi_stream/2]). -export([send/3]). -export([send/4]). @@ -45,6 +46,9 @@ peercert(_) -> no_quicer(). -spec shutdown(_, _) -> no_return(). shutdown(_, _) -> no_quicer(). +-spec start_bidi_stream(_, _) -> no_return(). +start_bidi_stream(_, _) -> no_quicer(). + -spec start_unidi_stream(_, _) -> no_return(). start_unidi_stream(_, _) -> no_quicer(). @@ -109,16 +113,26 @@ shutdown(Conn, ErrorCode) -> %% Streams. +-spec start_bidi_stream(quicer_connection_handle(), iodata()) + -> {ok, cow_http3:stream_id()} + | {error, any()}. + +start_bidi_stream(Conn, InitialData) -> + start_stream(Conn, InitialData, ?QUIC_STREAM_OPEN_FLAG_NONE). + -spec start_unidi_stream(quicer_connection_handle(), iodata()) -> {ok, cow_http3:stream_id()} | {error, any()}. -start_unidi_stream(Conn, HeaderData) -> +start_unidi_stream(Conn, InitialData) -> + start_stream(Conn, InitialData, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL). + +start_stream(Conn, InitialData, OpenFlag) -> case quicer:start_stream(Conn, #{ active => true, - open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}) of + open_flag => OpenFlag}) of {ok, StreamRef} -> - case quicer:send(StreamRef, HeaderData) of + case quicer:send(StreamRef, InitialData) of {ok, _} -> {ok, StreamID} = quicer:get_stream_id(StreamRef), put({quicer_stream, StreamID}, StreamRef), diff --git a/src/cowboy_stream.erl b/src/cowboy_stream.erl index 79e4357b..6680bdc9 100644 --- a/src/cowboy_stream.erl +++ b/src/cowboy_stream.erl @@ -49,6 +49,7 @@ -type reason() :: normal | switch_protocol | {internal_error, timeout | {error | exit | throw, any()}, human_reason()} | {socket_error, closed | atom(), human_reason()} + %% @todo Or cow_http3:error(). | {stream_error, cow_http2:error(), human_reason()} | {connection_error, cow_http2:error(), human_reason()} | {stop, cow_http2:frame() | {exit, any()}, human_reason()}. diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index dd577307..cb30c3fc 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -402,6 +402,7 @@ before_loop(State, HandlerState, ParseState) -> -spec set_idle_timeout(#state{}, 0..?IDLE_TIMEOUT_TICKS) -> #state{}. +%% @todo Do we really need this for HTTP/2? set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) -> %% Most of the time we don't need to cancel the timer since it %% will have triggered already. But this call is harmless so diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl new file mode 100644 index 00000000..3ef4d9a5 --- /dev/null +++ b/src/cowboy_webtransport.erl @@ -0,0 +1,218 @@ +%% Copyright (c) Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @todo To enable WebTransport the following options need to be set: +%% +%% QUIC: +%% - max_datagram_frame_size > 0 +%% +%% HTTP/3: +%% - SETTINGS_H3_DATAGRAM = 1 +%% - SETTINGS_ENABLE_CONNECT_PROTOCOL = 1 +%% - SETTINGS_WEBTRANSPORT_MAX_SESSIONS >= 1 + +%% Cowboy supports versions 7 through 12 of the WebTransport drafts. +-module(cowboy_webtransport). + +-export([upgrade/4]). +-export([upgrade/5]). + +-type opts() :: #{ + %% @todo +}. +-export_type([opts/0]). + +-record(state, { + parent :: pid(), + opts = #{} :: opts(), + handler :: module(), + hibernate = false :: boolean(), + req = #{} :: map() +}). + +%% This function mirrors a similar function for Websocket. + +-spec is_upgrade_request(cowboy_req:req()) -> boolean(). +is_upgrade_request(#{version := Version, method := <<"CONNECT">>, protocol := Protocol}) + when Version =:= 'HTTP/3' -> + %% @todo scheme MUST BE "https" + <<"webtransport">> =:= cowboy_bstr:to_lower(Protocol); + +is_upgrade_request(_) -> + false. + +%% Stream process. + +-spec upgrade(Req, Env, module(), any()) + -> {ok, Req, Env} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). + +upgrade(Req, Env, Handler, HandlerState) -> + upgrade(Req, Env, Handler, HandlerState, #{}). + +-spec upgrade(Req, Env, module(), any(), opts()) + -> {ok, Req, Env} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). + +%% @todo Immediately crash if a response has already been sent. +upgrade(Req=#{version := 'HTTP/3', pid := Pid, streamid := StreamID}, Env, Handler, HandlerState, Opts) -> + FilteredReq = case maps:get(req_filter, Opts, undefined) of + undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req); + FilterFun -> FilterFun(Req) + end, + %% @todo add parent, ref, streamid here directly + State = #state{parent=Pid, opts=Opts, handler=Handler, req=FilteredReq}, + + %% @todo Must check is_upgrade_request (rename, not an upgrade) + %% and also ensure that all the relevant settings are enabled (quic and h3) + + %% @todo A protocol may be negotiated via + %% - WT-Available-Protocols + %% - WT-Protocol + %% Negotiation is done by the handler in init like Websocket. + %% Parsing and building of the headers must be added to Cowlib though. + + %% Considering we must ensure the relevant settings are enabled, + %% either we check them BEFORE, or we check them when the handler + %% is OK to initiate a webtransport session. Probably need to + %% check them BEFORE as we need to become (takeover) the webtransport process + %% after we are done with the upgrade. -> we check them in cow_http3_machine OK + + %% After the upgrade we become the process that will receive all data + %% relevant to this webtransport session. However the data will not + %% go through stream handlers / middlewares anymore, it will be + %% a straight cowboy_http3 -> this pid. + + case is_upgrade_request(Req) of + true -> + Headers = cowboy_req:response_headers(#{}, Req), + Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, + #{session_pid => self()}}}, + webtransport_init(State, HandlerState); + %% Use 501 Not Implemented to mirror the recommendation in + %% by RFC9220 3 (WebSockets Upgrade over HTTP/3). + false -> + {ok, cowboy_req:reply(501, Req), Env} + end. + +webtransport_init(State=#state{handler=Handler}, HandlerState) -> + case erlang:function_exported(Handler, webtransport_init, 1) of + true -> handler_call(State, HandlerState, webtransport_init, undefined); + false -> before_loop(State, HandlerState) + end. + +before_loop(State=#state{hibernate=true}, HandlerState) -> + proc_lib:hibernate(?MODULE, loop, [State#state{hibernate=false}, HandlerState]); +before_loop(State, HandlerState) -> + loop(State, HandlerState). + +-spec loop(#state{}, any()) -> no_return(). + +loop(State=#state{parent=Parent%, timeout_ref=TRef + }, HandlerState) -> + receive + %% @todo Parent to filter messages? Nothing? + %% @todo Can there be groups of events? + {'$webtransport_event', Event} -> + handler_call(State, HandlerState, webtransport_handle, Event); + %% Timeouts. +%% @todo idle_timeout +% {timeout, TRef, ?MODULE} -> +% tick_idle_timeout(State, HandlerState, ParseState); +% {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> +% before_loop(State, HandlerState, ParseState); + %% System messages. + {'EXIT', Parent, Reason} -> + %% @todo We should exit gracefully. + exit(Reason); + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {State, HandlerState}); + %% Calls from supervisor module. + {'$gen_call', From, Call} -> + cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE), + before_loop(State, HandlerState); + Message -> + handler_call(State, HandlerState, websocket_info, Message) + end. + +handler_call(State=#state{handler=Handler}, HandlerState, Callback, Message) -> + try case Callback of + websocket_init -> Handler:websocket_init(HandlerState); + _ -> Handler:Callback(Message, HandlerState) + end of + {Commands, HandlerState2} when is_list(Commands) -> + handler_call_result(State, HandlerState2, Commands); + {Commands, HandlerState2, hibernate} when is_list(Commands) -> + handler_call_result(State#state{hibernate=true}, HandlerState2, Commands) + catch Class:Reason:Stacktrace -> +%% @todo +% websocket_send_close(State, {crash, Class, Reason}), +% handler_terminate(State, HandlerState, {crash, Class, Reason}), + erlang:raise(Class, Reason, Stacktrace) + end. + +handler_call_result(State0, HandlerState, Commands) -> + case commands(Commands, State0, []) of + {ok, State} -> + before_loop(State, HandlerState); + {stop, State} -> + terminate(State, HandlerState, stop); + {Error = {error, _}, State} -> + terminate(State, HandlerState, Error) + end. + +%% We accumulate the commands that must be sent to the connection process +%% because we want to send everything into one message. Other commands are +%% processed immediately. + +commands([], State, []) -> + {ok, State}; +commands([], State=#state{parent=Pid}, Commands) -> + Pid ! {'$webtransport_commands', lists:reverse(Commands)}, + {ok, State}; +%% {open_stream, OpenStreamRef, StreamType, InitialData}. +commands([Command={open_stream, _, _, _}|Tail], State, Acc) -> + commands(Tail, State, [Command|Acc]); +%% {close_stream, StreamID, Code}. +commands([Command={close_stream, _, _}|Tail], State, Acc) -> + commands(Tail, State, [Command|Acc]); +%% {send, StreamID | datagram, Data}. +commands([Command={send, _, _}|Tail], State, Acc) -> + commands(Tail, State, [Command|Acc]). +%% @todo send with IsFin +%% @todo stop, {error, Reason} probably. What to do about sending when asked to stop? +%% @todo set_options (to increase number of streams? data amounts? or a flow command?) +%% @todo shutdown_reason if useful. + +terminate(State, HandlerState, Error) -> + error({todo, State, HandlerState, Error}). + + + + + + + +%% WebTransport functions: +%% +%% webtransport_init(HandlerState) +%% webtransport_handle({opened_stream_id, OpenStreamRef, StreamID}, HandlerState) +%% webtransport_handle({stream_open, StreamID, unidi | bidi}, HandlerState) +%% webtransport_handle({stream_data, StreamID, IsFin, Data}, HandlerState) +%% webtransport_handle({stream_reset_at, StreamID, Code, FinalSize}, HandlerState) +%% webtransport_handle({stream_stop_sending, StreamID, Code}, HandlerState) +%% webtransport_handle({datagram, Data}, HandlerState) +%% webtransport_handle(goaway, HandlerState) +%% webtransport_info(Message, HandlerState) From 46c53cd4fbdd51cd74fac9a611ea124fa946ee55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Tue, 13 May 2025 14:20:11 +0200 Subject: [PATCH 2/5] WIP fixes for Chromium --- src/cowboy.erl | 8 +++- src/cowboy_http3.erl | 5 ++- src/cowboy_webtransport.erl | 7 ++-- test/cowboy_test.erl | 1 + test/draft_h3_webtransport_SUITE.erl | 59 ++++++++++++++++++++++++++++ test/handlers/wt_echo_h.erl | 21 ++++++++++ 6 files changed, 94 insertions(+), 7 deletions(-) create mode 100644 test/draft_h3_webtransport_SUITE.erl create mode 100644 test/handlers/wt_echo_h.erl diff --git a/src/cowboy.erl b/src/cowboy.erl index d46691f9..29b9788c 100644 --- a/src/cowboy.erl +++ b/src/cowboy.erl @@ -95,8 +95,12 @@ start_quic(Ref, TransOpts, ProtoOpts) -> end, SocketOpts = [ {alpn, ["h3"]}, %% @todo Why not binary? - {peer_unidi_stream_count, 3}, %% We only need control and QPACK enc/dec. - {peer_bidi_stream_count, 100} + {peer_unidi_stream_count, 100}, %% We only need control and QPACK enc/dec. + {peer_bidi_stream_count, 100}, + %% For WebTransport. @todo Also increase default unidi stream count. + %% @todo We probably don't want it enabled if WT isn't used. + {datagram_send_enabled, 1}, + {datagram_receive_enabled, 1} |SocketOpts2], _ListenerPid = spawn(fun() -> {ok, Listener} = quicer:listen(Port, SocketOpts), diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index d76683b1..7ad65c88 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -716,9 +716,10 @@ commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) -> reset_stream(State, Stream, Error); %% Use a different protocol within the stream (CONNECT :protocol). %% @todo Make sure we error out when the feature is disabled. -commands(State0=#state{http3_machine=HTTP3Machine0}, Stream0=#stream{id=StreamID}, +commands(State0, Stream0=#stream{id=StreamID}, [{switch_protocol, Headers, cowboy_webtransport, WTState=#{}}|Tail]) -> State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}), + #state{http3_machine=HTTP3Machine0} = State, Stream1 = #stream{state=StreamState} = stream_get(State, StreamID), %% The stream becomes a WT session at that point. It is the %% parent stream of all streams in this WT session. The @@ -864,7 +865,7 @@ webtransport_event(State, SessionID, Event) -> ok. webtransport_commands(State, SessionID, Commands) -> - Session = #stream{status=webtransport_session} = stream_get(SessionID, State), + Session = #stream{status=webtransport_session} = stream_get(State, SessionID), wt_commands(State, Session, Commands). wt_commands(State, _, []) -> diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl index 3ef4d9a5..6beaba4a 100644 --- a/src/cowboy_webtransport.erl +++ b/src/cowboy_webtransport.erl @@ -34,6 +34,7 @@ -export_type([opts/0]). -record(state, { + id :: cow_http3:stream_id(), parent :: pid(), opts = #{} :: opts(), handler :: module(), @@ -72,7 +73,7 @@ upgrade(Req=#{version := 'HTTP/3', pid := Pid, streamid := StreamID}, Env, Handl FilterFun -> FilterFun(Req) end, %% @todo add parent, ref, streamid here directly - State = #state{parent=Pid, opts=Opts, handler=Handler, req=FilteredReq}, + State = #state{id=StreamID, parent=Pid, opts=Opts, handler=Handler, req=FilteredReq}, %% @todo Must check is_upgrade_request (rename, not an upgrade) %% and also ensure that all the relevant settings are enabled (quic and h3) @@ -179,8 +180,8 @@ handler_call_result(State0, HandlerState, Commands) -> commands([], State, []) -> {ok, State}; -commands([], State=#state{parent=Pid}, Commands) -> - Pid ! {'$webtransport_commands', lists:reverse(Commands)}, +commands([], State=#state{id=SessionID, parent=Pid}, Commands) -> + Pid ! {'$webtransport_commands', SessionID, lists:reverse(Commands)}, {ok, State}; %% {open_stream, OpenStreamRef, StreamType, InitialData}. commands([Command={open_stream, _, _, _}|Tail], State, Acc) -> diff --git a/test/cowboy_test.erl b/test/cowboy_test.erl index 541e8f90..a17eddb7 100644 --- a/test/cowboy_test.erl +++ b/test/cowboy_test.erl @@ -53,6 +53,7 @@ init_http3(Ref, ProtoOpts, Config) -> }, {ok, Listener} = cowboy:start_quic(Ref, TransOpts, ProtoOpts), {ok, {_, Port}} = quicer:sockname(Listener), + ct:pal("port ~p", [Port]), %% @todo Keep listener information around in a better place. persistent_term:put({cowboy_test_quic, Ref}, Listener), [{ref, Ref}, {type, quic}, {protocol, http3}, {port, Port}, {opts, TransOpts}|Config]. diff --git a/test/draft_h3_webtransport_SUITE.erl b/test/draft_h3_webtransport_SUITE.erl new file mode 100644 index 00000000..442ec15b --- /dev/null +++ b/test/draft_h3_webtransport_SUITE.erl @@ -0,0 +1,59 @@ +%% Copyright (c) Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(draft_h3_webtransport_SUITE). +-compile(export_all). +-compile(nowarn_export_all). + +-import(ct_helper, [config/2]). +-import(ct_helper, [doc/1]). + +all() -> + [{group, enabled}]. + +groups() -> + Tests = ct_helper:all(?MODULE), + [{enabled, [], Tests}]. %% @todo Enable parallel when all is better. + +init_per_group(Name = enabled, Config) -> + cowboy_test:init_http3(Name, #{ + enable_connect_protocol => true, + h3_datagram => true, + enable_webtransport => true, %% For compatibility with draft-02. + webtransport_max_sessions => 10, + env => #{dispatch => cowboy_router:compile(init_routes(Config))} + }, Config). + +end_per_group(Name, _) -> + cowboy_test:stop_group(Name). + +init_routes(_) -> [ + {"localhost", [ + {"/wt", wt_echo_h, []} + ]} +]. + +%% Temporary. + +%% To start Chromium the command line is roughly: +%% chromium --ignore-certificate-errors-spki-list=LeLykt63i2FRAm+XO91yBoSjKfrXnAFygqe5xt0zgDA= --ignore-certificate-errors --user-data-dir=/tmp/chromium-wt --allow-insecure-localhost --webtransport-developer-mode --enable-quic https://googlechrome.github.io/samples/webtransport/client.html +%% +%% To find the SPKI the command is roughly: +%% openssl x509 -in ~/ninenines/cowboy/test/rfc9114_SUITE_data/server.pem -pubkey -noout | \ +%% openssl pkey -pubin -outform der | \ +%% openssl dgst -sha256 -binary | \ +%% openssl enc -base64 + +run(_Config) -> + timer:sleep(infinity). diff --git a/test/handlers/wt_echo_h.erl b/test/handlers/wt_echo_h.erl new file mode 100644 index 00000000..97056cf6 --- /dev/null +++ b/test/handlers/wt_echo_h.erl @@ -0,0 +1,21 @@ +%% This module echoes client events back, +%% including creating new streams. + +-module(wt_echo_h). +%% @todo -behavior(cowboy_webtransport). + +-export([init/2]). +-export([webtransport_handle/2]). + +init(Req, _) -> + {cowboy_webtransport, Req, undefined}. + +%% @todo WT handle {stream_open,4,bidi} +%% @todo WT handle {stream_data,4,nofin,<<>>} %% skip? + +webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, HandlerState) -> + ct:pal("WT handle ~p~n", [Event]), + {[{send, StreamID, Data}], HandlerState}; +webtransport_handle(Event, HandlerState) -> + ct:pal("WT handle ~p~n", [Event]), + {[], HandlerState}. From c43384c8de85a4f6b780ba46f26fa2c726346afb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 16 May 2025 17:47:27 +0200 Subject: [PATCH 3/5] WIP chromium + figuring out tests --- src/cowboy_http3.erl | 13 +- src/cowboy_webtransport.erl | 9 +- test/draft_h3_webtransport_SUITE.erl | 195 ++++++++++++++++++++++++++- test/handlers/wt_echo_h.erl | 42 +++++- 4 files changed, 248 insertions(+), 11 deletions(-) diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index 7ad65c88..110a114b 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -852,7 +852,11 @@ become_webtransport_stream(State0=#state{http3_machine=HTTP3Machine0}, State = State0#state{http3_machine=HTTP3Machine}, Stream = Stream0#stream{status={webtransport_stream, SessionID, StreamType}}, webtransport_event(State, SessionID, {stream_open, StreamID, StreamType}), - parse(stream_store(State, Stream), StreamID, Rest, IsFin) + %% We don't need to parse the remaining data if there isn't any. + case {Rest, IsFin} of + {<<>>, nofin} -> loop(stream_store(State, Stream)); + _ -> parse(stream_store(State, Stream), StreamID, Rest, IsFin) + end %% @todo Error conditions. end. @@ -900,6 +904,13 @@ wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, Data}|Tail]) -> ok -> wt_commands(State, Session, Tail) %% @todo Handle errors. + end; +wt_commands(State=#state{conn=Conn}, Session, [Cmd = {send, StreamID, IsFin, Data}|Tail]) -> + %% @todo Check that StreamID belongs to Session. + case cowboy_quicer:send(Conn, StreamID, Data, IsFin) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. end. reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0}, diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl index 6beaba4a..0d7d452b 100644 --- a/src/cowboy_webtransport.erl +++ b/src/cowboy_webtransport.erl @@ -145,12 +145,12 @@ loop(State=#state{parent=Parent%, timeout_ref=TRef cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE), before_loop(State, HandlerState); Message -> - handler_call(State, HandlerState, websocket_info, Message) + handler_call(State, HandlerState, webtransport_info, Message) end. handler_call(State=#state{handler=Handler}, HandlerState, Callback, Message) -> try case Callback of - websocket_init -> Handler:websocket_init(HandlerState); + webtransport_init -> Handler:webtransport_init(HandlerState); _ -> Handler:Callback(Message, HandlerState) end of {Commands, HandlerState2} when is_list(Commands) -> @@ -189,8 +189,12 @@ commands([Command={open_stream, _, _, _}|Tail], State, Acc) -> %% {close_stream, StreamID, Code}. commands([Command={close_stream, _, _}|Tail], State, Acc) -> commands(Tail, State, [Command|Acc]); +%% @todo We must reject send to a remote unidi stream. %% {send, StreamID | datagram, Data}. commands([Command={send, _, _}|Tail], State, Acc) -> + commands(Tail, State, [Command|Acc]); +%% {send, StreamID, IsFin, Data}. +commands([Command={send, _, _, _}|Tail], State, Acc) -> commands(Tail, State, [Command|Acc]). %% @todo send with IsFin %% @todo stop, {error, Reason} probably. What to do about sending when asked to stop? @@ -210,6 +214,7 @@ terminate(State, HandlerState, Error) -> %% %% webtransport_init(HandlerState) %% webtransport_handle({opened_stream_id, OpenStreamRef, StreamID}, HandlerState) +%% @todo opened_stream_error %% webtransport_handle({stream_open, StreamID, unidi | bidi}, HandlerState) %% webtransport_handle({stream_data, StreamID, IsFin, Data}, HandlerState) %% webtransport_handle({stream_reset_at, StreamID, Code, FinalSize}, HandlerState) diff --git a/test/draft_h3_webtransport_SUITE.erl b/test/draft_h3_webtransport_SUITE.erl index 442ec15b..50f61601 100644 --- a/test/draft_h3_webtransport_SUITE.erl +++ b/test/draft_h3_webtransport_SUITE.erl @@ -55,5 +55,196 @@ init_routes(_) -> [ %% openssl dgst -sha256 -binary | \ %% openssl enc -base64 -run(_Config) -> - timer:sleep(infinity). +%run(_Config) -> +% timer:sleep(infinity). + +%% @todo Write tests!! + +%% 3. Session Establishment + +%% 3.1. Establishing a WebTransport-Capable HTTP/3 Connection + +%% In order to indicate support for WebTransport, the server MUST send a SETTINGS_WEBTRANSPORT_MAX_SESSIONS value greater than "0" in its SETTINGS frame. (3.1) + +%% The client MUST NOT send a WebTransport request until it has received the setting indicating WebTransport support from the server. (3.1) + +%% For draft verisons of WebTransport only, the server MUST NOT process any incoming WebTransport requests until the client settings have been received, as the client may be using a version of the WebTransport extension that is different from the one used by the server. (3.1) + +%% Because WebTransport over HTTP/3 requires support for HTTP/3 datagrams and the Capsule Protocol, both the client and the server MUST indicate support for HTTP/3 datagrams by sending a SETTINGS_H3_DATAGRAM value set to 1 in their SETTINGS frame (see Section 2.1.1 of [HTTP-DATAGRAM]). (3.1) + +%% WebTransport over HTTP/3 also requires support for QUIC datagrams. To indicate support, both the client and the server MUST send a max_datagram_frame_size transport parameter with a value greater than 0 (see Section 3 of [QUIC-DATAGRAM]). (3.1) + +%% Any WebTransport requests sent by the client without enabling QUIC and HTTP datagrams MUST be treated as malformed by the server, as described in Section 4.1.2 of [HTTP3]. (3.1) + +%% WebTransport over HTTP/3 relies on the RESET_STREAM_AT frame defined in [RESET-STREAM-AT]. To indicate support, both the client and the server MUST enable the extension as described in Section 3 of [RESET-STREAM-AT]. (3.1) + +%% 3.2. Extended CONNECT in HTTP/3 + +%% [RFC8441] defines an extended CONNECT method in Section 4, enabled by the SETTINGS_ENABLE_CONNECT_PROTOCOL setting. That setting is defined for HTTP/3 by [RFC9220]. A server supporting WebTransport over HTTP/3 MUST send both the SETTINGS_WEBTRANSPORT_MAX_SESSIONS setting with a value greater than "0" and the SETTINGS_ENABLE_CONNECT_PROTOCOL setting with a value of "1". (3.2) + +%% 3.3. Creating a New Session + +%% As WebTransport sessions are established over HTTP/3, they are identified using the https URI scheme ([HTTP], Section 4.2.2). (3.3) + +%% In order to create a new WebTransport session, a client can send an HTTP CONNECT request. The :protocol pseudo-header field ([RFC8441]) MUST be set to webtransport. The :scheme field MUST be https. Both the :authority and the :path value MUST be set; those fields indicate the desired WebTransport server. If the WebTransport session is coming from a browser client, an Origin header [RFC6454] MUST be provided within the request; otherwise, the header is OPTIONAL. (3.3) + +%% If it does not (have a WT server), it SHOULD reply with status code 404 (Section 15.5.5 of [HTTP]). (3.3) + +%% When the request contains the Origin header, the WebTransport server MUST verify the Origin header to ensure that the specified origin is allowed to access the server in question. If the verification fails, the WebTransport server SHOULD reply with status code 403 (Section 15.5.4 of [HTTP]). (3.3) + +%% If all checks pass, the WebTransport server MAY accept the session by replying with a 2xx series status code, as defined in Section 15.3 of [HTTP]. (3.3) + +%% If the server accepts 0-RTT, the server MUST NOT reduce the limit of maximum open WebTransport sessions from the one negotiated during the previous session; such change would be deemed incompatible, and MUST result in a H3_SETTINGS_ERROR connection error. (3.3) + +%% The capsule-protocol header field Section 3.4 of [HTTP-DATAGRAM] is not required by WebTransport and can safely be ignored by WebTransport endpoints. (3.3) + +%% 3.4. Application Protocol Negotiation + +%% The user agent MAY include a WT-Available-Protocols header field in the CONNECT request. The WT-Available-Protocols enumerates the possible protocols in preference order. If the server receives such a header, it MAY include a WT-Protocol field in a successful (2xx) response. If it does, the server SHALL include a single choice from the client's list in that field. Servers MAY reject the request if the client did not include a suitable protocol. (3.4) + +%% Both WT-Available-Protocols and WT-Protocol are Structured Fields [RFC8941]. WT-Available-Protocols is a List of Tokens, and WT-Protocol is a Token. The token in the WT-Protocol response header field MUST be one of the tokens listed in WT-Available-Protocols of the request. (3.4) + +%% @todo 3.5 Prioritization + +%% 4. WebTransport Features + +%% The client MAY optimistically open unidirectional and bidirectional streams, as well as send datagrams, for a session that it has sent the CONNECT request for, even if it has not yet received the server's response to the request. (4) + +%% If at any point a session ID is received that cannot be a valid ID for a client-initiated bidirectional stream, the recipient MUST close the connection with an H3_ID_ERROR error code. (4) + +%% 4.1. Unidirectional streams + +%% WebTransport endpoints can initiate unidirectional streams. (4.1) + +%% 4.2. Bidirectional Streams + +%% WebTransport endpoints can initiate bidirectional streams. (4.2) + +%% Endpoints MUST NOT send WEBTRANSPORT_STREAM as a frame type on HTTP/3 streams other than the very first bytes of a request stream. Receiving this frame type in any other circumstances MUST be treated as a connection error of type H3_FRAME_ERROR. (4.2) + +%% 4.3. Resetting Data Streams + +%% A WebTransport endpoint may send a RESET_STREAM or a STOP_SENDING frame for a WebTransport data stream. Those signals are propagated by the WebTransport implementation to the application. (4.3) + +%% A WebTransport application SHALL provide an error code for those operations. (4.3) + +%% WebTransport implementations MUST use the RESET_STREAM_AT frame [RESET-STREAM-AT] with a Reliable Size set to at least the size of the WebTransport header when resetting a WebTransport data stream. This ensures that the ID field associating the data stream with a WebTransport session is always delivered. (4.3) + +%% WebTransport implementations SHALL forward the error code for a stream associated with a known session to the application that owns that session (4.3) + +%% 4.4. Datagrams + +%% Datagrams can be sent using HTTP Datagrams. The WebTransport datagram payload is sent unmodified in the "HTTP Datagram Payload" field of an HTTP Datagram (Section 2.1 of [HTTP-DATAGRAM]). Note that the payload field directly follows the Quarter Stream ID field, which is at the start of the QUIC DATAGRAM frame payload and refers to the CONNECT stream that established the WebTransport session. (4.4) + +%% 4.5. Buffering Incoming Streams and Datagrams + +%% To handle this case (out of order stream_open/CONNECT), WebTransport endpoints SHOULD buffer streams and datagrams until those can be associated with an established session. (4.5) + +%% To avoid resource exhaustion, the endpoints MUST limit the number of buffered streams and datagrams. When the number of buffered streams is exceeded, a stream SHALL be closed by sending a RESET_STREAM and/or STOP_SENDING with the WEBTRANSPORT_BUFFERED_STREAM_REJECTED error code. When the number of buffered datagrams is exceeded, a datagram SHALL be dropped. It is up to an implementation to choose what stream or datagram to discard. (4.5) + +%% 4.6. Interaction with HTTP/3 GOAWAY frame + +%% A client receiving GOAWAY cannot initiate CONNECT requests for new WebTransport sessions on that HTTP/3 connection; it must open a new HTTP/3 connection to initiate new WebTransport sessions with the same peer. (4.6) + +%% An HTTP/3 GOAWAY frame is also a signal to applications to initiate shutdown for all WebTransport sessions. (4.6) + +%% To shut down a single WebTransport session, either endpoint can send a DRAIN_WEBTRANSPORT_SESSION (0x78ae) capsule. (4.6) + +%% After sending or receiving either a DRAIN_WEBTRANSPORT_SESSION capsule or a HTTP/3 GOAWAY frame, an endpoint MAY continue using the session and MAY open new streams. The signal is intended for the application using WebTransport, which is expected to attempt to gracefully terminate the session as soon as possible. (4.6) + +%% @todo 4.7. Use of Keying Material Exporters + +%% 5. Flow Control + +%% 5.1. Limiting the Number of Simultaneous Sessions + +%% This document defines a SETTINGS_WEBTRANSPORT_MAX_SESSIONS parameter that allows the server to limit the maximum number of concurrent WebTransport sessions on a single HTTP/3 connection. The client MUST NOT open more simultaneous sessions than indicated in the server SETTINGS parameter. The server MUST NOT close the connection if the client opens sessions exceeding this limit, as the client and the server do not have a consistent view of how many sessions are open due to the asynchronous nature of the protocol; instead, it MUST reset all of the CONNECT streams it is not willing to process with the H3_REQUEST_REJECTED status defined in [HTTP3]. (5.1) + +%% 5.2. Limiting the Number of Streams Within a Session + +%% The WT_MAX_STREAMS capsule (Section 5.6.1) establishes a limit on the number of streams within a WebTransport session. (5.2) + +%% Note that the CONNECT stream for the session is not included in either the bidirectional or the unidirectional stream limits (5.2) + +%% The session-level stream limit applies in addition to the QUIC MAX_STREAMS frame, which provides a connection-level stream limit. New streams can only be created within the session if both the stream- and the connection-level limit permit (5.2) + +%% The WT_STREAMS_BLOCKED capsule (Section 5.7) can be sent to indicate that an endpoint was unable to create a stream due to the session-level stream limit. (5.2) + +%% Note that enforcing this limit requires reliable resets for stream headers so that both endpoints can agree on the number of streams that are open. (5.2) + +%% 5.3. Data Limits + +%% The WT_MAX_DATA capsule (Section 5.8) establishes a limit on the amount of data that can be sent within a WebTransport session. This limit counts all data that is sent on streams of the corresponding type, excluding the stream header (see Section 4.1 and Section 4.2). (5.3) + +%% Implementing WT_MAX_DATA requires that the QUIC stack provide the WebTransport implementation with information about the final size of streams; see { {Section 4.5 of !RFC9000}}. This allows both endpoints to agree on how much data was consumed by that stream, although the stream header exclusion above applies. (5.3) + +%% The WT_DATA_BLOCKED capsule (Section 5.9) can be sent to indicate that an endpoint was unable to send data due to a limit set by the WT_MAX_DATA capsule. (5.3) + +%% The WT_MAX_STREAM_DATA and WT_STREAM_DATA_BLOCKED capsules (Part XX of [I-D.ietf-webtrans-http2]) are not used and so are prohibited. Endpoints MUST treat receipt of a WT_MAX_STREAM_DATA or a WT_STREAM_DATA_BLOCKED capsule as a session error. (5.3) + +%% 5.4. Flow Control and Intermediaries + +%% In practice, an intermediary that translates flow control signals between similar WebTransport protocols, such as between two HTTP/3 connections, can often simply reexpress the same limits received on one connection directly on the other connection. (5.4) + +%% 5.5. Flow Control SETTINGS + +%% WT_MAX_STREAMS via SETTINGS_WEBTRANSPORT_INITIAL_MAX_STREAMS_UNI and SETTINGS_WEBTRANSPORT_INITIAL_MAX_STREAMS_BIDI (5.5) + +%% WT_MAX_DATA via SETTINGS_WEBTRANSPORT_INITIAL_MAX_DATA (5.5) + +%% 5.6. Flow Control Capsules + +%% 5.6.1. WT_MAX_STREAMS Capsule + +%% An HTTP capsule [HTTP-DATAGRAM] called WT_MAX_STREAMS is introduced to inform the peer of the cumulative number of streams of a given type it is permitted to open. A WT_MAX_STREAMS capsule with a type of 0x190B4D3F applies to bidirectional streams, and a WT_MAX_STREAMS capsule with a type of 0x190B4D40 applies to unidirectional streams. (5.6.1) + +%% Note that, because Maximum Streams is a cumulative value representing the total allowed number of streams, including previously closed streams, endpoints repeatedly send new WT_MAX_STREAMS capsules with increasing Maximum Streams values as streams are opened. (5.6.1) + +%% Maximum Streams: A count of the cumulative number of streams of the corresponding type that can be opened over the lifetime of the session. This value cannot exceed 260, as it is not possible to encode stream IDs larger than 262-1. (5.6.1) + +%% An endpoint MUST NOT open more streams than permitted by the current stream limit set by its peer. (5.6.1) + +%% Note that this limit includes streams that have been closed as well as those that are open. (5.6.1) + +%% Initial values for these limits MAY be communicated by sending non-zero values for SETTINGS_WEBTRANSPORT_INITIAL_MAX_STREAMS_UNI and SETTINGS_WEBTRANSPORT_INITIAL_MAX_STREAMS_BIDI. (5.6.1) + +%% 5.7. WT_STREAMS_BLOCKED Capsule + +%% A sender SHOULD send a WT_STREAMS_BLOCKED capsule (type=0x190B4D43 for bidi or 0x190B4D44 for unidi) when it wishes to open a stream but is unable to do so due to the maximum stream limit set by its peer. (5.7) + +%% 5.8. WT_MAX_DATA Capsule + +%% An HTTP capsule [HTTP-DATAGRAM] called WT_MAX_DATA (type=0x190B4D3D) is introduced to inform the peer of the maximum amount of data that can be sent on the WebTransport session as a whole. (5.8) + +%% This limit counts all data that is sent on streams of the corresponding type, excluding the stream header (see Section 4.1 and Section 4.2). Implementing WT_MAX_DATA requires that the QUIC stack provide the WebTransport implementation with information about the final size of streams; see Section 4.5 of [RFC9000]. (5.8) + +%% All data sent in WT_STREAM capsules counts toward this limit. The sum of the lengths of Stream Data fields in WT_STREAM capsules MUST NOT exceed the value advertised by a receiver. (5.8) + +%% The initial value for this limit MAY be communicated by sending a non-zero value for SETTINGS_WEBTRANSPORT_INITIAL_MAX_DATA. (5.8) + +%% 5.9. WT_DATA_BLOCKED Capsule + +%% A sender SHOULD send a WT_DATA_BLOCKED capsule (type=0x190B4D41) when it wishes to send data but is unable to do so due to WebTransport session-level flow control. (5.9) + +%% WT_DATA_BLOCKED capsules can be used as input to tuning of flow control algorithms. (5.9) + +%% 6. Session Termination + +%% A WebTransport session over HTTP/3 is considered terminated when either of the following conditions is met: +%% * the CONNECT stream is closed, either cleanly or abruptly, on either side; or +%% * a CLOSE_WEBTRANSPORT_SESSION capsule is either sent or received. +%% (6) + +%% Upon learning that the session has been terminated, the endpoint MUST reset the send side and abort reading on the receive side of all of the streams associated with the session (see Section 2.4 of [RFC9000]) using the WEBTRANSPORT_SESSION_GONE error code; it MUST NOT send any new datagrams or open any new streams. (6) + +%% To terminate a session with a detailed error message, an application MAY send an HTTP capsule [HTTP-DATAGRAM] of type CLOSE_WEBTRANSPORT_SESSION (0x2843). (6) + +%% Application Error Message: A UTF-8 encoded error message string provided by the application closing the session. The message takes up the remainder of the capsule, and its length MUST NOT exceed 1024 bytes. (6) + +%% An endpoint that sends a CLOSE_WEBTRANSPORT_SESSION capsule MUST immediately send a FIN. The endpoint MAY send a STOP_SENDING to indicate it is no longer reading from the CONNECT stream. The recipient MUST either close or reset the stream in response. (6) + +%% If any additional stream data is received on the CONNECT stream after receiving a CLOSE_WEBTRANSPORT_SESSION capsule, the stream MUST be reset with code H3_MESSAGE_ERROR. (6) + +%% Cleanly terminating a CONNECT stream without a CLOSE_WEBTRANSPORT_SESSION capsule SHALL be semantically equivalent to terminating it with a CLOSE_WEBTRANSPORT_SESSION capsule that has an error code of 0 and an empty error string. (6) + +%% the endpoint SHOULD wait until all CONNECT streams have been closed by the peer before sending the CONNECTION_CLOSE (6) diff --git a/test/handlers/wt_echo_h.erl b/test/handlers/wt_echo_h.erl index 97056cf6..50dadb49 100644 --- a/test/handlers/wt_echo_h.erl +++ b/test/handlers/wt_echo_h.erl @@ -6,16 +6,46 @@ -export([init/2]). -export([webtransport_handle/2]). +-export([webtransport_info/2]). init(Req, _) -> - {cowboy_webtransport, Req, undefined}. + {cowboy_webtransport, Req, #{}}. %% @todo WT handle {stream_open,4,bidi} -%% @todo WT handle {stream_data,4,nofin,<<>>} %% skip? -webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, HandlerState) -> +webtransport_handle(Event = {stream_open, StreamID, bidi}, Streams) -> ct:pal("WT handle ~p~n", [Event]), - {[{send, StreamID, Data}], HandlerState}; -webtransport_handle(Event, HandlerState) -> + {[], Streams#{StreamID => bidi}}; +webtransport_handle(Event = {stream_open, StreamID, unidi}, Streams) -> ct:pal("WT handle ~p~n", [Event]), - {[], HandlerState}. + OpenStreamRef = make_ref(), + {[{open_stream, OpenStreamRef, unidi, <<>>}], Streams#{ + StreamID => {unidi_remote, OpenStreamRef}, + OpenStreamRef => {unidi_local, StreamID}}}; +webtransport_handle(Event = {opened_stream_id, OpenStreamRef, OpenStreamID}, Streams) -> + ct:pal("WT handle ~p~n", [Event]), + #{OpenStreamRef := {unidi_local, RemoteStreamID}} = Streams, + #{RemoteStreamID := {unidi_remote, OpenStreamRef}} = Streams, + {[], maps:remove(OpenStreamRef, Streams#{ + RemoteStreamID => {unidi_remote, OpenStreamID}, + OpenStreamID => {unidi_local, RemoteStreamID} + })}; +webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, Streams) -> + ct:pal("WT handle ~p~n", [Event]), + case Streams of + #{StreamID := bidi} -> + {[{send, StreamID, IsFin, Data}], Streams}; + #{StreamID := {unidi_remote, Ref}} when is_reference(Ref) -> + %% The stream isn't ready. We try again later. + erlang:send_after(100, self(), {try_again, Event}), + {[], Streams}; + #{StreamID := {unidi_remote, LocalStreamID}} -> + {[{send, LocalStreamID, IsFin, Data}], Streams} + end; +webtransport_handle(Event, Streams) -> + ct:pal("WT handle ~p~n", [Event]), + {[], Streams}. + +webtransport_info({try_again, Event}, Streams) -> + ct:pal("try_again ~p", [Event]), + webtransport_handle(Event, Streams). From 8c2bdb1e2247962531c9754e86b0e5bc1b6063bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 23 May 2025 14:02:03 +0200 Subject: [PATCH 4/5] WIP --- src/cowboy_http3.erl | 70 +++++++-- src/cowboy_quicer.erl | 3 + src/cowboy_req.erl | 1 + src/cowboy_webtransport.erl | 16 +- test/draft_h3_webtransport_SUITE.erl | 214 ++++++++++++++++++++++++++- test/handlers/wt_echo_h.erl | 39 ++++- test/rfc9220_SUITE.erl | 2 +- 7 files changed, 318 insertions(+), 27 deletions(-) diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index 110a114b..bc87ceb6 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -198,6 +198,8 @@ handle_quic_msg(State0=#state{opts=Opts}, Msg) -> case cowboy_quicer:handle(Msg) of {data, StreamID, IsFin, Data} -> parse(State0, StreamID, Data, IsFin); + {datagram, Data} -> + parse_datagram(State0, Data); {stream_started, StreamID, StreamType} -> State = stream_new_remote(State0, StreamID, StreamType), loop(State); @@ -551,6 +553,36 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer}, send_headers(State0, Stream, fin, StatusCode0, RespHeaders0) end. +%% Datagrams. + +parse_datagram(State, Data) -> + case parse_var_int(Data) of + {ok, QuarterID, Rest} -> + SessionID = QuarterID * 4, + case stream_get(State, SessionID) of + #stream{status=webtransport_session} -> + webtransport_event(State, SessionID, {datagram, Rest}), + loop(State); + _ -> + error(todo) %% @todo Might be a future WT session or an error. + end; + %% Ignore invalid datagrams. @todo Is that behavior correct? + more -> + loop(State) + end. + +%% @todo Move to Cowlib and use in cow_http3. +parse_var_int(<<0:2, Int:6, Rest/bits>>) -> + {ok, Int, Rest}; +parse_var_int(<<1:2, Int:14, Rest/bits>>) -> + {ok, Int, Rest}; +parse_var_int(<<2:2, Int:30, Rest/bits>>) -> + {ok, Int, Rest}; +parse_var_int(<<3:2, Int:62, Rest/bits>>) -> + {ok, Int, Rest}; +parse_var_int(_) -> + more. + %% Erlang messages. down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) -> @@ -874,7 +906,7 @@ webtransport_commands(State, SessionID, Commands) -> wt_commands(State, _, []) -> State; -wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, +wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID}, [{open_stream, OpenStreamRef, StreamType, InitialData}|Tail]) -> %% Because opening the stream involves sending a short header %% we necessarily write data. The InitialData variable allows @@ -887,17 +919,25 @@ wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, case cowboy_quicer:StartF(Conn, [Header, InitialData]) of {ok, StreamID} -> %% @todo Pass Session directly? - webtransport_event(State, SessionID, + webtransport_event(State0, SessionID, {opened_stream_id, OpenStreamRef, StreamID}), - %% @todo Save the WT stream in cow_http3_machine AND here. + State = stream_new_local(State0, StreamID, StreamType, + {webtransport_stream, SessionID, StreamType}), wt_commands(State, Session, Tail) %% @todo Handle errors. end; wt_commands(State, Session, [{close_stream, StreamID, Code}|Tail]) -> %% @todo Check that StreamID belongs to Session. error({todo, State, Session, [{close_stream, StreamID, Code}|Tail]}); -wt_commands(State, Session, [{send, datagram, Data}|Tail]) -> - error({todo, State, Session, [{send, datagram, Data}|Tail]}); +wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, + [{send, datagram, Data}|Tail]) -> + QuarterID = SessionID div 4, + %% @todo Add a function to cowboy_quicer. + case quicer:send_dgram(Conn, [cow_http3:encode_int(QuarterID), Data]) of + {ok, _} -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, Data}|Tail]) -> %% @todo Check that StreamID belongs to Session. case cowboy_quicer:send(Conn, StreamID, Data, nofin) of @@ -1058,15 +1098,25 @@ terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reas stream_get(#state{streams=Streams}, StreamID) -> maps:get(StreamID, Streams, error). -stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams}, - StreamID, StreamType) -> +stream_new_local(State, StreamID, StreamType, Status) -> + stream_new(State, StreamID, StreamType, unidi_local, Status). + +stream_new_remote(State, StreamID, StreamType) -> + Status = case StreamType of + unidi -> header; + bidi -> normal + end, + stream_new(State, StreamID, StreamType, unidi_remote, Status). + +stream_new(State=#state{http3_machine=HTTP3Machine0, streams=Streams}, + StreamID, StreamType, UnidiType, Status) -> {HTTP3Machine, Status} = case StreamType of unidi -> - {cow_http3_machine:init_unidi_stream(StreamID, unidi_remote, HTTP3Machine0), - header}; + {cow_http3_machine:init_unidi_stream(StreamID, UnidiType, HTTP3Machine0), + Status}; bidi -> {cow_http3_machine:init_bidi_stream(StreamID, HTTP3Machine0), - normal} + Status} end, Stream = #stream{id=StreamID, status=Status}, State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}. diff --git a/src/cowboy_quicer.erl b/src/cowboy_quicer.erl index fb9de125..915f31a3 100644 --- a/src/cowboy_quicer.erl +++ b/src/cowboy_quicer.erl @@ -201,6 +201,9 @@ handle({quic, Data, StreamRef, #{flags := Flags}}) when is_binary(Data) -> _ -> nofin end, {data, StreamID, IsFin, Data}; +%% @todo Match on Conn. +handle({quic, Data, Conn, Flags}) when is_integer(Flags) -> + {datagram, Data}; %% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED. handle({quic, new_stream, StreamRef, #{flags := Flags}}) -> case quicer:setopt(StreamRef, active, true) of diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 933d22e3..550054e8 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -445,6 +445,7 @@ parse_header_fun(<<"sec-websocket-protocol">>) -> fun cow_http_hd:parse_sec_webs parse_header_fun(<<"sec-websocket-version">>) -> fun cow_http_hd:parse_sec_websocket_version_req/1; parse_header_fun(<<"trailer">>) -> fun cow_http_hd:parse_trailer/1; parse_header_fun(<<"upgrade">>) -> fun cow_http_hd:parse_upgrade/1; +parse_header_fun(<<"wt-available-protocols">>) -> fun cow_http_hd:parse_wt_available_protocols/1; parse_header_fun(<<"x-forwarded-for">>) -> fun cow_http_hd:parse_x_forwarded_for/1. parse_header(Name, Req, Default, ParseFun) -> diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl index 0d7d452b..324ae5c5 100644 --- a/src/cowboy_webtransport.erl +++ b/src/cowboy_webtransport.erl @@ -27,6 +27,7 @@ -export([upgrade/4]). -export([upgrade/5]). +-export([terminate/3]). -type opts() :: #{ %% @todo @@ -201,8 +202,19 @@ commands([Command={send, _, _, _}|Tail], State, Acc) -> %% @todo set_options (to increase number of streams? data amounts? or a flow command?) %% @todo shutdown_reason if useful. -terminate(State, HandlerState, Error) -> - error({todo, State, HandlerState, Error}). +terminate(State, HandlerState, Reason) -> + %cowboy_stream:terminate(StreamID, Reason, StreamState) +%% @todo This terminate is at the connection level. +% handler_terminate(State, HandlerState, Reason), +% case Shutdown of +% normal -> exit(normal); +% _ -> exit({shutdown, Shutdown}) +% end. +% exit(normal). +%handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) -> +% cowboy_handler:terminate(Reason, Req, HandlerState, Handler). + ok. + diff --git a/test/draft_h3_webtransport_SUITE.erl b/test/draft_h3_webtransport_SUITE.erl index 50f61601..2832774a 100644 --- a/test/draft_h3_webtransport_SUITE.erl +++ b/test/draft_h3_webtransport_SUITE.erl @@ -19,6 +19,10 @@ -import(ct_helper, [config/2]). -import(ct_helper, [doc/1]). +%% @todo -ifdef(COWBOY_QUICER). + +-include_lib("quicer/include/quicer.hrl"). + all() -> [{group, enabled}]. @@ -58,29 +62,38 @@ init_routes(_) -> [ %run(_Config) -> % timer:sleep(infinity). -%% @todo Write tests!! - %% 3. Session Establishment %% 3.1. Establishing a WebTransport-Capable HTTP/3 Connection %% In order to indicate support for WebTransport, the server MUST send a SETTINGS_WEBTRANSPORT_MAX_SESSIONS value greater than "0" in its SETTINGS frame. (3.1) +%% @todo reject_session_disabled +%% @todo accept_session_below +%% @todo accept_session_equal +%% @todo reject_session_above %% The client MUST NOT send a WebTransport request until it has received the setting indicating WebTransport support from the server. (3.1) %% For draft verisons of WebTransport only, the server MUST NOT process any incoming WebTransport requests until the client settings have been received, as the client may be using a version of the WebTransport extension that is different from the one used by the server. (3.1) %% Because WebTransport over HTTP/3 requires support for HTTP/3 datagrams and the Capsule Protocol, both the client and the server MUST indicate support for HTTP/3 datagrams by sending a SETTINGS_H3_DATAGRAM value set to 1 in their SETTINGS frame (see Section 2.1.1 of [HTTP-DATAGRAM]). (3.1) +%% @todo settings_h3_datagram_enabled %% WebTransport over HTTP/3 also requires support for QUIC datagrams. To indicate support, both the client and the server MUST send a max_datagram_frame_size transport parameter with a value greater than 0 (see Section 3 of [QUIC-DATAGRAM]). (3.1) +%% @todo quic_datagram_enabled (if size is too low the CONNECT stream can be used for capsules) %% Any WebTransport requests sent by the client without enabling QUIC and HTTP datagrams MUST be treated as malformed by the server, as described in Section 4.1.2 of [HTTP3]. (3.1) +%% @todo reject_h3_datagram_disabled +%% @todo reject_quic_datagram_disabled %% WebTransport over HTTP/3 relies on the RESET_STREAM_AT frame defined in [RESET-STREAM-AT]. To indicate support, both the client and the server MUST enable the extension as described in Section 3 of [RESET-STREAM-AT]. (3.1) +%% @todo reset_stream_at_enabled %% 3.2. Extended CONNECT in HTTP/3 %% [RFC8441] defines an extended CONNECT method in Section 4, enabled by the SETTINGS_ENABLE_CONNECT_PROTOCOL setting. That setting is defined for HTTP/3 by [RFC9220]. A server supporting WebTransport over HTTP/3 MUST send both the SETTINGS_WEBTRANSPORT_MAX_SESSIONS setting with a value greater than "0" and the SETTINGS_ENABLE_CONNECT_PROTOCOL setting with a value of "1". (3.2) +%% @todo settings_enable_connect_protocol_enabled +%% @todo reject_settings_enable_connect_protocol_disabled %% 3.3. Creating a New Session @@ -92,7 +105,19 @@ init_routes(_) -> [ %% When the request contains the Origin header, the WebTransport server MUST verify the Origin header to ensure that the specified origin is allowed to access the server in question. If the verification fails, the WebTransport server SHOULD reply with status code 403 (Section 15.5.4 of [HTTP]). (3.3) -%% If all checks pass, the WebTransport server MAY accept the session by replying with a 2xx series status code, as defined in Section 15.3 of [HTTP]. (3.3) +accept_session_when_enabled(Config) -> + doc("Confirm that a WebTransport session can be established over HTTP/3. " + "(draft_webtrans_http3 3.3, RFC9220)"), + %% Connect to the WebTransport server. + #{ + conn := Conn, + session_id := SessionID + } = do_webtransport_connect(Config), + %% Create a bidi stream, send Hello, get Hello back. + {ok, BidiStreamRef} = quicer:start_stream(Conn, #{}), + {ok, _} = quicer:send(BidiStreamRef, <<16#41, 0:2, SessionID:6, "Hello">>), + {ok, <<"Hello">>} = rfc9114_SUITE:do_receive_data(BidiStreamRef), + ok. %% If the server accepts 0-RTT, the server MUST NOT reduce the limit of maximum open WebTransport sessions from the one negotiated during the previous session; such change would be deemed incompatible, and MUST result in a H3_SETTINGS_ERROR connection error. (3.3) @@ -102,6 +127,16 @@ init_routes(_) -> [ %% The user agent MAY include a WT-Available-Protocols header field in the CONNECT request. The WT-Available-Protocols enumerates the possible protocols in preference order. If the server receives such a header, it MAY include a WT-Protocol field in a successful (2xx) response. If it does, the server SHALL include a single choice from the client's list in that field. Servers MAY reject the request if the client did not include a suitable protocol. (3.4) +application_protocol_negotiation(Config) -> + doc("Applications can negotiate a protocol to use via WebTransport. " + "(draft_webtrans_http3 3.4)"), + %% Connect to the WebTransport server. + #{ + resp_headers := RespHeaders + } = do_webtransport_connect(Config, [{<<"wt-available-protocols">>, <<"foo, bar">>}]), + {<<"wt-protocol">>, <<"foo">>} = lists:keyfind(<<"wt-protocol">>, 1, RespHeaders), + ok. + %% Both WT-Available-Protocols and WT-Protocol are Structured Fields [RFC8941]. WT-Available-Protocols is a List of Tokens, and WT-Protocol is a Token. The token in the WT-Protocol response header field MUST be one of the tokens listed in WT-Available-Protocols of the request. (3.4) %% @todo 3.5 Prioritization @@ -111,14 +146,65 @@ init_routes(_) -> [ %% The client MAY optimistically open unidirectional and bidirectional streams, as well as send datagrams, for a session that it has sent the CONNECT request for, even if it has not yet received the server's response to the request. (4) %% If at any point a session ID is received that cannot be a valid ID for a client-initiated bidirectional stream, the recipient MUST close the connection with an H3_ID_ERROR error code. (4) +%% @todo Open bidi with Session ID 0, then do the CONNECT request. %% 4.1. Unidirectional streams -%% WebTransport endpoints can initiate unidirectional streams. (4.1) +unidirectional_streams(Config) -> + doc("Both endpoints can open and use unidirectional streams. " + "(draft_webtrans_http3 4.1)"), + %% Connect to the WebTransport server. + #{ + conn := Conn, + session_id := SessionID + } = do_webtransport_connect(Config), + %% Create a unidi stream, send Hello with a Fin flag. + {ok, LocalStreamRef} = quicer:start_stream(Conn, + #{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}), + {ok, _} = quicer:send(LocalStreamRef, + <<16#54, 0:2, SessionID:6, "Hello">>, + ?QUIC_SEND_FLAG_FIN), + %% Accept an identical unidi stream. + {unidi, RemoteStreamRef} = do_receive_new_stream(), + {nofin, <<16#54, 0:2, SessionID:6>>} = do_receive_data(RemoteStreamRef), + {fin, <<"Hello">>} = do_receive_data(RemoteStreamRef), + ok. %% 4.2. Bidirectional Streams -%% WebTransport endpoints can initiate bidirectional streams. (4.2) +bidirectional_streams_client(Config) -> + doc("The WT client can open and use bidirectional streams. " + "(draft_webtrans_http3 4.2)"), + %% Connect to the WebTransport server. + #{ + conn := Conn, + session_id := SessionID + } = do_webtransport_connect(Config), + %% Create a bidi stream, send Hello, get Hello back. + {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), + {ok, _} = quicer:send(LocalStreamRef, <<16#41, 0:2, SessionID:6, "Hello">>), + %% @todo Use the local do_receive_data instead to have the fin flag. + {ok, <<"Hello">>} = rfc9114_SUITE:do_receive_data(LocalStreamRef), + ok. + +bidirectional_streams_server(Config) -> + doc("The WT server can open and use bidirectional streams. " + "(draft_webtrans_http3 4.2)"), + %% Connect to the WebTransport server. + #{ + conn := Conn, + session_id := SessionID + } = do_webtransport_connect(Config), + %% Create a bidi stream, send a special instruction to make it create a bidi stream. + {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), + {ok, _} = quicer:send(LocalStreamRef, <<16#41, 0:2, SessionID:6, "TEST:open_bidi">>), + %% Accept the bidi stream and receive the data. + {bidi, RemoteStreamRef} = do_receive_new_stream(), + {nofin, <<16#41, 0:2, SessionID:6>>} = do_receive_data(RemoteStreamRef), + {ok, _} = quicer:send(RemoteStreamRef, <<"Hello">>, + ?QUIC_SEND_FLAG_FIN), + {fin, <<"Hello">>} = do_receive_data(RemoteStreamRef), + ok. %% Endpoints MUST NOT send WEBTRANSPORT_STREAM as a frame type on HTTP/3 streams other than the very first bytes of a request stream. Receiving this frame type in any other circumstances MUST be treated as a connection error of type H3_FRAME_ERROR. (4.2) @@ -134,7 +220,21 @@ init_routes(_) -> [ %% 4.4. Datagrams -%% Datagrams can be sent using HTTP Datagrams. The WebTransport datagram payload is sent unmodified in the "HTTP Datagram Payload" field of an HTTP Datagram (Section 2.1 of [HTTP-DATAGRAM]). Note that the payload field directly follows the Quarter Stream ID field, which is at the start of the QUIC DATAGRAM frame payload and refers to the CONNECT stream that established the WebTransport session. (4.4) +datagrams(Config) -> + doc("Both endpoints can send and receive datagrams. (draft_webtrans_http3 4.4)"), + %% Connect to the WebTransport server. + #{ + conn := Conn, + session_id := SessionID + } = do_webtransport_connect(Config), + QuarterID = SessionID div 4, + %% Send a Hello datagram. + {ok, _} = quicer:send_dgram(Conn, <<0:2, QuarterID:6, "Hello">>), + %% Receive a Hello datagram back. + {datagram, SessionID, <<"Hello">>} = do_receive_datagram(Conn), + ok. + +%% @todo datagrams_via_capsule? %% 4.5. Buffering Incoming Streams and Datagrams @@ -248,3 +348,105 @@ init_routes(_) -> [ %% Cleanly terminating a CONNECT stream without a CLOSE_WEBTRANSPORT_SESSION capsule SHALL be semantically equivalent to terminating it with a CLOSE_WEBTRANSPORT_SESSION capsule that has an error code of 0 and an empty error string. (6) %% the endpoint SHOULD wait until all CONNECT streams have been closed by the peer before sending the CONNECTION_CLOSE (6) + +%% Helpers. + +do_webtransport_connect(Config) -> + do_webtransport_connect(Config, []). + +do_webtransport_connect(Config, ExtraHeaders) -> + %% Connect to server. + #{conn := Conn, settings := Settings} = rfc9114_SUITE:do_connect(Config, #{ + peer_unidi_stream_count => 100, + datagram_send_enabled => 1, + datagram_receive_enabled => 1 + }), + %% Confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 1. + #{enable_connect_protocol := true} = Settings, + %% Confirm that SETTINGS_WEBTRANSPORT_MAX_SESSIONS >= 1. + #{webtransport_max_sessions := WTMaxSessions} = Settings, + true = WTMaxSessions >= 1, + %% Confirm that SETTINGS_H3_DATAGRAM = 1. + #{h3_datagram := true} = Settings, + %% Confirm that QUIC's max_datagram_size > 0. + receive {quic, dgram_state_changed, Conn, DatagramState} -> + #{ + dgram_max_len := DatagramMaxLen, + dgram_send_enabled := DatagramSendEnabled + } = DatagramState, + true = DatagramMaxLen > 0, + true = DatagramSendEnabled, + ok + after 5000 -> + error({timeout, waiting_for_datagram_state_change}) + end, + %% Send a CONNECT :protocol request to upgrade the stream to Websocket. + {ok, ConnectStreamRef} = quicer:start_stream(Conn, #{}), + {ok, EncodedRequest, _EncData, _EncSt} = cow_qpack:encode_field_section([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"webtransport">>}, + {<<":scheme">>, <<"https">>}, + {<<":path">>, <<"/wt">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"origin">>, <<"https://localhost">>} + |ExtraHeaders], 0, cow_qpack:init(encoder)), + {ok, _} = quicer:send(ConnectStreamRef, [ + <<1>>, %% HEADERS frame. + cow_http3:encode_int(iolist_size(EncodedRequest)), + EncodedRequest + ]), + %% Receive a 200 response. + {ok, Data} = rfc9114_SUITE:do_receive_data(ConnectStreamRef), + {HLenEnc, HLenBits} = rfc9114_SUITE:do_guess_int_encoding(Data), + << + 1, %% HEADERS frame. + HLenEnc:2, HLen:HLenBits, + EncodedResponse:HLen/bytes + >> = Data, + {ok, DecodedResponse, _DecData, _DecSt} + = cow_qpack:decode_field_section(EncodedResponse, 0, cow_qpack:init(decoder)), + #{<<":status">> := <<"200">>} = maps:from_list(DecodedResponse), + %% Retrieve the Session ID. + {ok, SessionID} = quicer:get_stream_id(ConnectStreamRef), + %% Accept QPACK streams to avoid conflicts with unidi streams from tests. + Unidi1 = rfc9114_SUITE:do_accept_qpack_stream(Conn), + Unidi2 = rfc9114_SUITE:do_accept_qpack_stream(Conn), + %% Done. + #{ + conn => Conn, + session_id => SessionID, + resp_headers => DecodedResponse, + enc_or_dec1 => Unidi1, + enc_or_dec2 => Unidi2 + }. + +do_receive_new_stream() -> + receive + {quic, new_stream, StreamRef, #{flags := Flags}} -> + ok = quicer:setopt(StreamRef, active, true), + case quicer:is_unidirectional(Flags) of + true -> {unidi, StreamRef}; + false -> {bidi, StreamRef} + end + after 5000 -> + error({timeout, waiting_for_stream}) + end. + +do_receive_data(StreamRef) -> + receive {quic, Data, StreamRef, #{flags := Flags}} -> + IsFin = case Flags band ?QUIC_RECEIVE_FLAG_FIN of + ?QUIC_RECEIVE_FLAG_FIN -> fin; + _ -> nofin + end, + {IsFin, Data} + after 5000 -> + error({timeout, waiting_for_data}) + end. + +do_receive_datagram(Conn) -> + receive {quic, <<0:2, QuarterID:6, Data/bits>>, Conn, Flags} when is_integer(Flags) -> + {datagram, QuarterID * 4, Data} + after 5000 -> + ct:pal("~p", [process_info(self(), messages)]), + error({timeout, waiting_for_datagram}) + end. diff --git a/test/handlers/wt_echo_h.erl b/test/handlers/wt_echo_h.erl index 50dadb49..fabb612a 100644 --- a/test/handlers/wt_echo_h.erl +++ b/test/handlers/wt_echo_h.erl @@ -8,7 +8,13 @@ -export([webtransport_handle/2]). -export([webtransport_info/2]). -init(Req, _) -> +init(Req0, _) -> + Req = case cowboy_req:parse_header(<<"wt-available-protocols">>, Req0) of + undefined -> + Req0; + [Protocol|_] -> + cowboy_req:set_resp_header(<<"wt-protocol">>, Protocol, Req0) + end, {cowboy_webtransport, Req, #{}}. %% @todo WT handle {stream_open,4,bidi} @@ -24,12 +30,26 @@ webtransport_handle(Event = {stream_open, StreamID, unidi}, Streams) -> OpenStreamRef => {unidi_local, StreamID}}}; webtransport_handle(Event = {opened_stream_id, OpenStreamRef, OpenStreamID}, Streams) -> ct:pal("WT handle ~p~n", [Event]), - #{OpenStreamRef := {unidi_local, RemoteStreamID}} = Streams, - #{RemoteStreamID := {unidi_remote, OpenStreamRef}} = Streams, - {[], maps:remove(OpenStreamRef, Streams#{ - RemoteStreamID => {unidi_remote, OpenStreamID}, - OpenStreamID => {unidi_local, RemoteStreamID} - })}; + case Streams of + #{OpenStreamRef := bidi} -> + {[], maps:remove(OpenStreamRef, Streams#{ + OpenStreamID => bidi + })}; + #{OpenStreamRef := {unidi_local, RemoteStreamID}} -> + #{RemoteStreamID := {unidi_remote, OpenStreamRef}} = Streams, + {[], maps:remove(OpenStreamRef, Streams#{ + RemoteStreamID => {unidi_remote, OpenStreamID}, + OpenStreamID => {unidi_local, RemoteStreamID} + })} + end; +webtransport_handle(Event = {stream_data, StreamID, IsFin, <<"TEST:", Test/bits>>}, Streams) -> + ct:pal("WT handle ~p~n", [Event]), + case Test of + <<"open_bidi">> -> + OpenStreamRef = make_ref(), + {[{open_stream, OpenStreamRef, bidi, <<>>}], + Streams#{OpenStreamRef => bidi}} + end; webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, Streams) -> ct:pal("WT handle ~p~n", [Event]), case Streams of @@ -42,8 +62,11 @@ webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, Streams) -> #{StreamID := {unidi_remote, LocalStreamID}} -> {[{send, LocalStreamID, IsFin, Data}], Streams} end; -webtransport_handle(Event, Streams) -> +webtransport_handle(Event = {datagram, Data}, Streams) -> ct:pal("WT handle ~p~n", [Event]), + {[{send, datagram, Data}], Streams}; +webtransport_handle(Event, Streams) -> + ct:pal("WT handle ignore ~p~n", [Event]), {[], Streams}. webtransport_info({try_again, Event}, Streams) -> diff --git a/test/rfc9220_SUITE.erl b/test/rfc9220_SUITE.erl index fc7a48d1..38a59b2c 100644 --- a/test/rfc9220_SUITE.erl +++ b/test/rfc9220_SUITE.erl @@ -426,7 +426,7 @@ reject_upgrade_header(Config) -> % Examples. accept_handshake_when_enabled(Config) -> - doc("Confirm the example for Websocket over HTTP/2 works. (RFC9220, RFC8441 5.1)"), + doc("Confirm the example for Websocket over HTTP/3 works. (RFC9220, RFC8441 5.1)"), %% Connect to server and confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 1. #{conn := Conn, settings := Settings} = rfc9114_SUITE:do_connect(Config), #{enable_connect_protocol := true} = Settings, From 2fe2edf4ffd79fc722dc313c0ba13b078e2c75fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 26 May 2025 14:48:13 +0200 Subject: [PATCH 5/5] Fix signal int sizes --- src/cowboy_http3.erl | 16 ++-------------- test/draft_h3_webtransport_SUITE.erl | 14 ++++++-------- 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index bc87ceb6..8a01c795 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -556,8 +556,8 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer}, %% Datagrams. parse_datagram(State, Data) -> - case parse_var_int(Data) of - {ok, QuarterID, Rest} -> + case cow_http3:parse_int(Data) of + {QuarterID, Rest} -> SessionID = QuarterID * 4, case stream_get(State, SessionID) of #stream{status=webtransport_session} -> @@ -571,18 +571,6 @@ parse_datagram(State, Data) -> loop(State) end. -%% @todo Move to Cowlib and use in cow_http3. -parse_var_int(<<0:2, Int:6, Rest/bits>>) -> - {ok, Int, Rest}; -parse_var_int(<<1:2, Int:14, Rest/bits>>) -> - {ok, Int, Rest}; -parse_var_int(<<2:2, Int:30, Rest/bits>>) -> - {ok, Int, Rest}; -parse_var_int(<<3:2, Int:62, Rest/bits>>) -> - {ok, Int, Rest}; -parse_var_int(_) -> - more. - %% Erlang messages. down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) -> diff --git a/test/draft_h3_webtransport_SUITE.erl b/test/draft_h3_webtransport_SUITE.erl index 2832774a..4cde6170 100644 --- a/test/draft_h3_webtransport_SUITE.erl +++ b/test/draft_h3_webtransport_SUITE.erl @@ -115,7 +115,7 @@ accept_session_when_enabled(Config) -> } = do_webtransport_connect(Config), %% Create a bidi stream, send Hello, get Hello back. {ok, BidiStreamRef} = quicer:start_stream(Conn, #{}), - {ok, _} = quicer:send(BidiStreamRef, <<16#41, 0:2, SessionID:6, "Hello">>), + {ok, _} = quicer:send(BidiStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "Hello">>), {ok, <<"Hello">>} = rfc9114_SUITE:do_receive_data(BidiStreamRef), ok. @@ -125,8 +125,6 @@ accept_session_when_enabled(Config) -> %% 3.4. Application Protocol Negotiation -%% The user agent MAY include a WT-Available-Protocols header field in the CONNECT request. The WT-Available-Protocols enumerates the possible protocols in preference order. If the server receives such a header, it MAY include a WT-Protocol field in a successful (2xx) response. If it does, the server SHALL include a single choice from the client's list in that field. Servers MAY reject the request if the client did not include a suitable protocol. (3.4) - application_protocol_negotiation(Config) -> doc("Applications can negotiate a protocol to use via WebTransport. " "(draft_webtrans_http3 3.4)"), @@ -162,11 +160,11 @@ unidirectional_streams(Config) -> {ok, LocalStreamRef} = quicer:start_stream(Conn, #{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}), {ok, _} = quicer:send(LocalStreamRef, - <<16#54, 0:2, SessionID:6, "Hello">>, + <<1:2, 16#54:14, 0:2, SessionID:6, "Hello">>, ?QUIC_SEND_FLAG_FIN), %% Accept an identical unidi stream. {unidi, RemoteStreamRef} = do_receive_new_stream(), - {nofin, <<16#54, 0:2, SessionID:6>>} = do_receive_data(RemoteStreamRef), + {nofin, <<1:2, 16#54:14, 0:2, SessionID:6>>} = do_receive_data(RemoteStreamRef), {fin, <<"Hello">>} = do_receive_data(RemoteStreamRef), ok. @@ -182,7 +180,7 @@ bidirectional_streams_client(Config) -> } = do_webtransport_connect(Config), %% Create a bidi stream, send Hello, get Hello back. {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), - {ok, _} = quicer:send(LocalStreamRef, <<16#41, 0:2, SessionID:6, "Hello">>), + {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "Hello">>), %% @todo Use the local do_receive_data instead to have the fin flag. {ok, <<"Hello">>} = rfc9114_SUITE:do_receive_data(LocalStreamRef), ok. @@ -197,10 +195,10 @@ bidirectional_streams_server(Config) -> } = do_webtransport_connect(Config), %% Create a bidi stream, send a special instruction to make it create a bidi stream. {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), - {ok, _} = quicer:send(LocalStreamRef, <<16#41, 0:2, SessionID:6, "TEST:open_bidi">>), + {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "TEST:open_bidi">>), %% Accept the bidi stream and receive the data. {bidi, RemoteStreamRef} = do_receive_new_stream(), - {nofin, <<16#41, 0:2, SessionID:6>>} = do_receive_data(RemoteStreamRef), + {nofin, <<1:2, 16#41:14, 0:2, SessionID:6>>} = do_receive_data(RemoteStreamRef), {ok, _} = quicer:send(RemoteStreamRef, <<"Hello">>, ?QUIC_SEND_FLAG_FIN), {fin, <<"Hello">>} = do_receive_data(RemoteStreamRef),