0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 04:10:24 +00:00
This commit is contained in:
Loïc Hoguin 2025-06-18 16:14:58 +02:00
parent 28b6ebe9e7
commit 3841edbd5e
No known key found for this signature in database
GPG key ID: 8A9DF795F6FED764
4 changed files with 49 additions and 34 deletions

View file

@ -86,8 +86,9 @@
%% Whether the stream is currently in a special state.
status :: header | {unidi, control | encoder | decoder}
| normal | {data | ignore, non_neg_integer()} | stopping
| {webtransport_session, normal | {ignore, non_neg_integer()}}
%% @todo Is unidi | bidi useful to keep?
| webtransport_session | {webtransport_stream, cow_http3:stream_id(), unidi | bidi},
| {webtransport_stream, cow_http3:stream_id(), unidi | bidi},
%% Stream buffer.
buffer = <<>> :: binary(),
@ -256,7 +257,8 @@ parse1(State=#state{http3_machine=HTTP3Machine0},
terminate(State#state{http3_machine=HTTP3Machine}, Error)
end;
%% @todo Handle when IsFin = fin which must terminate the WT session.
parse1(State, Stream=#stream{id=SessionID, status=webtransport_session}, Data, IsFin) ->
parse1(State, Stream=#stream{id=SessionID, status=
{webtransport_session, normal}}, Data, IsFin) ->
case cow_capsule:parse(Data) of
{ok, wt_drain_session, Rest} ->
webtransport_event(State, SessionID, close_initiated),
@ -274,14 +276,32 @@ parse1(State, Stream=#stream{id=SessionID, status=webtransport_session}, Data, I
%% @todo Don't crash, error out properly.
<<>> = Rest,
loop(webtransport_terminate_session(State, Stream));
%% Ignore unknown/unhandled capsules.
more ->
loop(stream_store(State, Stream#stream{buffer=Data}));
%% Ignore unhandled/unknown capsules.
{ok, _, Rest} ->
parse1(State, Stream, Rest, IsFin);
{ok, Rest} ->
parse1(State, Stream, Rest, IsFin);
%% @todo Make the max length configurable?
{skip, Len} when Len =< 8192 ->
loop(stream_store(State, Stream#stream{
status={webtransport_session, {ignore, Len}}}));
{skip, Len} ->
%% @todo What should be done on capsule error?
error({todo, capsule_too_long, Len});
error ->
%% @todo What should be done on capsule error?
error({todo, capsule_error, Data});
more ->
loop(stream_store(State, Stream#stream{buffer=Data}))
error({todo, capsule_error, Data})
end;
parse1(State, Stream=#stream{status=
{webtransport_session, {ignore, Len}}}, Data, IsFin) ->
case Data of
<<_:Len/unit:8, Rest/bits>> ->
parse1(State, Stream#stream{status={webtransport_session, normal}}, Rest, IsFin);
_ ->
loop(stream_store(State, Stream#stream{
status={webtransport_session, {ignore, Len - byte_size(Data)}}}))
end;
parse1(State, #stream{id=StreamID, status={webtransport_stream, SessionID, _}}, Data, IsFin) ->
webtransport_event(State, SessionID, {stream_data, StreamID, IsFin, Data}),
@ -582,20 +602,14 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer},
%% Datagrams.
parse_datagram(State, Data) ->
case cow_http3:parse_int(Data) of
{QuarterID, Rest} ->
SessionID = QuarterID * 4,
parse_datagram(State, Data0) ->
{SessionID, Data} = cow_http3:parse_datagram(Data0),
case stream_get(State, SessionID) of
#stream{status=webtransport_session} ->
webtransport_event(State, SessionID, {datagram, Rest}),
#stream{status={webtransport_session, _}} ->
webtransport_event(State, SessionID, {datagram, Data}),
loop(State);
_ ->
error(todo) %% @todo Might be a future WT session or an error.
end;
%% Ignore invalid datagrams. @todo Is that behavior correct?
more ->
loop(State)
end.
%% Erlang messages.
@ -774,7 +788,7 @@ commands(State0, Stream0=#stream{id=StreamID},
%% to terminate the stream properly.
HTTP3Machine = cow_http3_machine:become_webtransport_session(StreamID, HTTP3Machine0),
Stream = Stream1#stream{
status=webtransport_session,
status={webtransport_session, normal},
state={cowboy_webtransport, WTState#{stream_state => StreamState}}
},
%% @todo We must propagate the buffer to capsule handling if any.
@ -905,7 +919,7 @@ become_webtransport_stream(State0=#state{http3_machine=HTTP3Machine0},
webtransport_event(State, SessionID, Event) ->
#stream{
status=webtransport_session,
status={webtransport_session, _},
state={cowboy_webtransport, #{session_pid := SessionPid}}
} = stream_get(State, SessionID),
SessionPid ! {'$webtransport_event', Event},
@ -913,7 +927,7 @@ webtransport_event(State, SessionID, Event) ->
webtransport_commands(State, SessionID, Commands) ->
case stream_get(State, SessionID) of
Session = #stream{status=webtransport_session} ->
Session = #stream{status={webtransport_session, _}} ->
wt_commands(State, Session, Commands);
%% The stream has been terminated, ignore pending commands.
error ->
@ -947,9 +961,8 @@ wt_commands(State, Session, [{close_stream, StreamID, Code}|Tail]) ->
error({todo, State, Session, [{close_stream, StreamID, Code}|Tail]});
wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID},
[{send, datagram, Data}|Tail]) ->
QuarterID = SessionID div 4,
%% @todo Add a function to cowboy_quicer.
case quicer:send_dgram(Conn, [cow_http3:encode_int(QuarterID), Data]) of
case quicer:send_dgram(Conn, cow_http3:datagram(SessionID, Data)) of
{ok, _} ->
wt_commands(State, Session, Tail)
%% @todo Handle errors.
@ -1003,7 +1016,7 @@ webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machin
streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) ->
%% Reset/abort the WT streams.
Streams = maps:filtermap(fun
(_, #stream{id=StreamID, status=webtransport_session})
(_, #stream{id=StreamID, status={webtransport_session, _}})
when StreamID =:= SessionID ->
%% We remove the session stream but do the shutdown outside this function.
false;
@ -1031,7 +1044,7 @@ stream_peer_send_shutdown(State, StreamID) ->
case stream_get(State, StreamID) of
%% Cleanly terminating the CONNECT stream is equivalent
%% to an application error code of 0 and empty message.
Stream = #stream{status=webtransport_session} ->
Stream = #stream{status={webtransport_session, _}} ->
webtransport_event(State, StreamID, {closed, 0, <<>>}),
%% Shutdown the CONNECT stream fully.
%% @todo This needs a cowboy_quicer function.
@ -1235,7 +1248,7 @@ stream_closed(State=#state{opts=Opts,
case maps:take(StreamID, Streams0) of
%% In the WT session's case, streams will be
%% removed in webtransport_terminate_session.
{Stream=#stream{status=webtransport_session}, _} ->
{Stream=#stream{status={webtransport_session, _}}, _} ->
webtransport_event(State, StreamID, closed_abruptly),
webtransport_terminate_session(State, Stream);
{#stream{state=undefined}, Streams} ->

View file

@ -203,7 +203,7 @@ handle({quic, Data, StreamRef, #{flags := Flags}}) when is_binary(Data) ->
end,
{data, StreamID, IsFin, Data};
%% @todo Match on Conn.
handle({quic, Data, Conn, Flags}) when is_integer(Flags) ->
handle({quic, Data, Conn, Flags}) when is_binary(Data), is_integer(Flags) ->
{datagram, Data};
%% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED.
handle({quic, new_stream, StreamRef, #{flags := Flags}}) ->

View file

@ -130,10 +130,12 @@ application_protocol_negotiation(Config) ->
doc("Applications can negotiate a protocol to use via WebTransport. "
"(draft_webtrans_http3 3.4)"),
%% Connect to the WebTransport server.
WTAvailableProtocols = cow_http_hd:wt_available_protocols([<<"foo">>, <<"bar">>]),
#{
resp_headers := RespHeaders
} = do_webtransport_connect(Config, [{<<"wt-available-protocols">>, <<"foo, bar">>}]),
{<<"wt-protocol">>, <<"foo">>} = lists:keyfind(<<"wt-protocol">>, 1, RespHeaders),
} = do_webtransport_connect(Config, [{<<"wt-available-protocols">>, WTAvailableProtocols}]),
{<<"wt-protocol">>, WTProtocol} = lists:keyfind(<<"wt-protocol">>, 1, RespHeaders),
<<"foo">> = iolist_to_binary(cow_http_hd:parse_wt_protocol(WTProtocol)),
ok.
%% Both WT-Available-Protocols and WT-Protocol are Structured Fields [RFC8941]. WT-Available-Protocols is a List of Tokens, and WT-Protocol is a Token. The token in the WT-Protocol response header field MUST be one of the tokens listed in WT-Available-Protocols of the request. (3.4)

View file

@ -22,7 +22,7 @@ init(Req0, _) ->
undefined ->
Req0;
[Protocol|_] ->
cowboy_req:set_resp_header(<<"wt-protocol">>, Protocol, Req0)
cowboy_req:set_resp_header(<<"wt-protocol">>, cow_http_hd:wt_protocol(Protocol), Req0)
end,
{cowboy_webtransport, Req, #{}}.
@ -97,9 +97,9 @@ webtransport_info({try_again, Event}, Streams) ->
webtransport_handle(Event, Streams).
terminate(Reason, Req, State=#{event_pid := EventPid}) ->
?LOG("WT terminate ~p ~p ~p", [Reason, Req, State]),
?LOG("WT terminate ~0p~n~0p~n~0p", [Reason, Req, State]),
EventPid ! {'$wt_echo_h', terminate, Reason, Req, State},
ok;
terminate(Reason, Req, State) ->
?LOG("WT terminate ~p ~p ~p", [Reason, Req, State]),
?LOG("WT terminate ~0p~n~0p~n~0p", [Reason, Req, State]),
ok.