From 3841edbd5ef632f74aa20cb254256ed19bd9c87d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 18 Jun 2025 16:14:58 +0200 Subject: [PATCH] WIP --- src/cowboy_http3.erl | 69 +++++++++++++++++----------- src/cowboy_quicer.erl | 2 +- test/draft_h3_webtransport_SUITE.erl | 6 ++- test/handlers/wt_echo_h.erl | 6 +-- 4 files changed, 49 insertions(+), 34 deletions(-) diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index 2fbe9316..f3dcb23c 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -86,8 +86,9 @@ %% Whether the stream is currently in a special state. status :: header | {unidi, control | encoder | decoder} | normal | {data | ignore, non_neg_integer()} | stopping + | {webtransport_session, normal | {ignore, non_neg_integer()}} %% @todo Is unidi | bidi useful to keep? - | webtransport_session | {webtransport_stream, cow_http3:stream_id(), unidi | bidi}, + | {webtransport_stream, cow_http3:stream_id(), unidi | bidi}, %% Stream buffer. buffer = <<>> :: binary(), @@ -256,7 +257,8 @@ parse1(State=#state{http3_machine=HTTP3Machine0}, terminate(State#state{http3_machine=HTTP3Machine}, Error) end; %% @todo Handle when IsFin = fin which must terminate the WT session. -parse1(State, Stream=#stream{id=SessionID, status=webtransport_session}, Data, IsFin) -> +parse1(State, Stream=#stream{id=SessionID, status= + {webtransport_session, normal}}, Data, IsFin) -> case cow_capsule:parse(Data) of {ok, wt_drain_session, Rest} -> webtransport_event(State, SessionID, close_initiated), @@ -274,14 +276,32 @@ parse1(State, Stream=#stream{id=SessionID, status=webtransport_session}, Data, I %% @todo Don't crash, error out properly. <<>> = Rest, loop(webtransport_terminate_session(State, Stream)); - %% Ignore unknown/unhandled capsules. + more -> + loop(stream_store(State, Stream#stream{buffer=Data})); + %% Ignore unhandled/unknown capsules. {ok, _, Rest} -> parse1(State, Stream, Rest, IsFin); + {ok, Rest} -> + parse1(State, Stream, Rest, IsFin); + %% @todo Make the max length configurable? + {skip, Len} when Len =< 8192 -> + loop(stream_store(State, Stream#stream{ + status={webtransport_session, {ignore, Len}}})); + {skip, Len} -> + %% @todo What should be done on capsule error? + error({todo, capsule_too_long, Len}); error -> %% @todo What should be done on capsule error? - error({todo, capsule_error, Data}); - more -> - loop(stream_store(State, Stream#stream{buffer=Data})) + error({todo, capsule_error, Data}) + end; +parse1(State, Stream=#stream{status= + {webtransport_session, {ignore, Len}}}, Data, IsFin) -> + case Data of + <<_:Len/unit:8, Rest/bits>> -> + parse1(State, Stream#stream{status={webtransport_session, normal}}, Rest, IsFin); + _ -> + loop(stream_store(State, Stream#stream{ + status={webtransport_session, {ignore, Len - byte_size(Data)}}})) end; parse1(State, #stream{id=StreamID, status={webtransport_stream, SessionID, _}}, Data, IsFin) -> webtransport_event(State, SessionID, {stream_data, StreamID, IsFin, Data}), @@ -582,20 +602,14 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer}, %% Datagrams. -parse_datagram(State, Data) -> - case cow_http3:parse_int(Data) of - {QuarterID, Rest} -> - SessionID = QuarterID * 4, - case stream_get(State, SessionID) of - #stream{status=webtransport_session} -> - webtransport_event(State, SessionID, {datagram, Rest}), - loop(State); - _ -> - error(todo) %% @todo Might be a future WT session or an error. - end; - %% Ignore invalid datagrams. @todo Is that behavior correct? - more -> - loop(State) +parse_datagram(State, Data0) -> + {SessionID, Data} = cow_http3:parse_datagram(Data0), + case stream_get(State, SessionID) of + #stream{status={webtransport_session, _}} -> + webtransport_event(State, SessionID, {datagram, Data}), + loop(State); + _ -> + error(todo) %% @todo Might be a future WT session or an error. end. %% Erlang messages. @@ -774,7 +788,7 @@ commands(State0, Stream0=#stream{id=StreamID}, %% to terminate the stream properly. HTTP3Machine = cow_http3_machine:become_webtransport_session(StreamID, HTTP3Machine0), Stream = Stream1#stream{ - status=webtransport_session, + status={webtransport_session, normal}, state={cowboy_webtransport, WTState#{stream_state => StreamState}} }, %% @todo We must propagate the buffer to capsule handling if any. @@ -905,7 +919,7 @@ become_webtransport_stream(State0=#state{http3_machine=HTTP3Machine0}, webtransport_event(State, SessionID, Event) -> #stream{ - status=webtransport_session, + status={webtransport_session, _}, state={cowboy_webtransport, #{session_pid := SessionPid}} } = stream_get(State, SessionID), SessionPid ! {'$webtransport_event', Event}, @@ -913,7 +927,7 @@ webtransport_event(State, SessionID, Event) -> webtransport_commands(State, SessionID, Commands) -> case stream_get(State, SessionID) of - Session = #stream{status=webtransport_session} -> + Session = #stream{status={webtransport_session, _}} -> wt_commands(State, Session, Commands); %% The stream has been terminated, ignore pending commands. error -> @@ -947,9 +961,8 @@ wt_commands(State, Session, [{close_stream, StreamID, Code}|Tail]) -> error({todo, State, Session, [{close_stream, StreamID, Code}|Tail]}); wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [{send, datagram, Data}|Tail]) -> - QuarterID = SessionID div 4, %% @todo Add a function to cowboy_quicer. - case quicer:send_dgram(Conn, [cow_http3:encode_int(QuarterID), Data]) of + case quicer:send_dgram(Conn, cow_http3:datagram(SessionID, Data)) of {ok, _} -> wt_commands(State, Session, Tail) %% @todo Handle errors. @@ -1003,7 +1016,7 @@ webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machin streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) -> %% Reset/abort the WT streams. Streams = maps:filtermap(fun - (_, #stream{id=StreamID, status=webtransport_session}) + (_, #stream{id=StreamID, status={webtransport_session, _}}) when StreamID =:= SessionID -> %% We remove the session stream but do the shutdown outside this function. false; @@ -1031,7 +1044,7 @@ stream_peer_send_shutdown(State, StreamID) -> case stream_get(State, StreamID) of %% Cleanly terminating the CONNECT stream is equivalent %% to an application error code of 0 and empty message. - Stream = #stream{status=webtransport_session} -> + Stream = #stream{status={webtransport_session, _}} -> webtransport_event(State, StreamID, {closed, 0, <<>>}), %% Shutdown the CONNECT stream fully. %% @todo This needs a cowboy_quicer function. @@ -1235,7 +1248,7 @@ stream_closed(State=#state{opts=Opts, case maps:take(StreamID, Streams0) of %% In the WT session's case, streams will be %% removed in webtransport_terminate_session. - {Stream=#stream{status=webtransport_session}, _} -> + {Stream=#stream{status={webtransport_session, _}}, _} -> webtransport_event(State, StreamID, closed_abruptly), webtransport_terminate_session(State, Stream); {#stream{state=undefined}, Streams} -> diff --git a/src/cowboy_quicer.erl b/src/cowboy_quicer.erl index 58382175..d4fffd78 100644 --- a/src/cowboy_quicer.erl +++ b/src/cowboy_quicer.erl @@ -203,7 +203,7 @@ handle({quic, Data, StreamRef, #{flags := Flags}}) when is_binary(Data) -> end, {data, StreamID, IsFin, Data}; %% @todo Match on Conn. -handle({quic, Data, Conn, Flags}) when is_integer(Flags) -> +handle({quic, Data, Conn, Flags}) when is_binary(Data), is_integer(Flags) -> {datagram, Data}; %% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED. handle({quic, new_stream, StreamRef, #{flags := Flags}}) -> diff --git a/test/draft_h3_webtransport_SUITE.erl b/test/draft_h3_webtransport_SUITE.erl index 756ec1b1..057a0f9c 100644 --- a/test/draft_h3_webtransport_SUITE.erl +++ b/test/draft_h3_webtransport_SUITE.erl @@ -130,10 +130,12 @@ application_protocol_negotiation(Config) -> doc("Applications can negotiate a protocol to use via WebTransport. " "(draft_webtrans_http3 3.4)"), %% Connect to the WebTransport server. + WTAvailableProtocols = cow_http_hd:wt_available_protocols([<<"foo">>, <<"bar">>]), #{ resp_headers := RespHeaders - } = do_webtransport_connect(Config, [{<<"wt-available-protocols">>, <<"foo, bar">>}]), - {<<"wt-protocol">>, <<"foo">>} = lists:keyfind(<<"wt-protocol">>, 1, RespHeaders), + } = do_webtransport_connect(Config, [{<<"wt-available-protocols">>, WTAvailableProtocols}]), + {<<"wt-protocol">>, WTProtocol} = lists:keyfind(<<"wt-protocol">>, 1, RespHeaders), + <<"foo">> = iolist_to_binary(cow_http_hd:parse_wt_protocol(WTProtocol)), ok. %% Both WT-Available-Protocols and WT-Protocol are Structured Fields [RFC8941]. WT-Available-Protocols is a List of Tokens, and WT-Protocol is a Token. The token in the WT-Protocol response header field MUST be one of the tokens listed in WT-Available-Protocols of the request. (3.4) diff --git a/test/handlers/wt_echo_h.erl b/test/handlers/wt_echo_h.erl index 12f9872e..b5ac3800 100644 --- a/test/handlers/wt_echo_h.erl +++ b/test/handlers/wt_echo_h.erl @@ -22,7 +22,7 @@ init(Req0, _) -> undefined -> Req0; [Protocol|_] -> - cowboy_req:set_resp_header(<<"wt-protocol">>, Protocol, Req0) + cowboy_req:set_resp_header(<<"wt-protocol">>, cow_http_hd:wt_protocol(Protocol), Req0) end, {cowboy_webtransport, Req, #{}}. @@ -97,9 +97,9 @@ webtransport_info({try_again, Event}, Streams) -> webtransport_handle(Event, Streams). terminate(Reason, Req, State=#{event_pid := EventPid}) -> - ?LOG("WT terminate ~p ~p ~p", [Reason, Req, State]), + ?LOG("WT terminate ~0p~n~0p~n~0p", [Reason, Req, State]), EventPid ! {'$wt_echo_h', terminate, Reason, Req, State}, ok; terminate(Reason, Req, State) -> - ?LOG("WT terminate ~p ~p ~p", [Reason, Req, State]), + ?LOG("WT terminate ~0p~n~0p~n~0p", [Reason, Req, State]), ok.