0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 12:20:24 +00:00

WIP drain

This commit is contained in:
Loïc Hoguin 2025-06-03 13:14:19 +02:00
parent 9a5caad2f1
commit af1692b4c6
No known key found for this signature in database
GPG key ID: 8A9DF795F6FED764
2 changed files with 53 additions and 59 deletions

View file

@ -254,7 +254,7 @@ parse1(State=#state{http3_machine=HTTP3Machine0},
end; end;
%% @todo Handle when IsFin = fin which must terminate the WT session. %% @todo Handle when IsFin = fin which must terminate the WT session.
parse1(State, Stream=#stream{id=SessionID, status=webtransport_session}, Data, IsFin) -> parse1(State, Stream=#stream{id=SessionID, status=webtransport_session}, Data, IsFin) ->
case parse_capsule(Data) of case cow_capsule:parse(Data) of
{ok, drain_wt_session, Rest} -> {ok, drain_wt_session, Rest} ->
webtransport_event(State, SessionID, close_initiated), webtransport_event(State, SessionID, close_initiated),
parse1(State, Stream, Rest, IsFin); parse1(State, Stream, Rest, IsFin);
@ -358,36 +358,6 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
end end
end. end.
parse_capsule(<<2:2, 16#78ae:30, 0, Rest/bits>>) ->
{ok, drain_wt_session, Rest};
parse_capsule(<<1:2, 16#2843:14, Rest0/bits>>) when byte_size(Rest0) >= 5 ->
LenOrError = case Rest0 of
<<0:2, Len0:6, Rest1/bits>> ->
{Len0, Rest1};
<<1:2, Len0:14, Rest1/bits>> when Len0 =< 1028 ->
{Len0, Rest1};
%% AppCode is 4 bytes and AppMsg is up to 1024 bytes.
_ ->
error
end,
case LenOrError of
{Len1, Rest2} ->
AppMsgLen = Len1 - 4,
case Rest2 of
<<AppCode:32, AppMsg:AppMsgLen/unit:8, Rest/bits>> ->
{ok, {close_wt_session, AppCode, AppMsg}, Rest};
_ ->
more
end;
error ->
error
end;
parse_capsule(<<>>) ->
more;
parse_capsule(Data) ->
%% @todo Skip unknown capsules.
error({todo, unknown_capsule, Data}).
%% We may receive multiple frames in a single QUIC packet. %% We may receive multiple frames in a single QUIC packet.
%% The FIN flag applies to the QUIC packet, not to the frame. %% The FIN flag applies to the QUIC packet, not to the frame.
%% We must therefore only consider the frame to have a FIN %% We must therefore only consider the frame to have a FIN
@ -797,10 +767,6 @@ commands(State0, Stream0=#stream{id=StreamID},
[{switch_protocol, Headers, _Mod, _ModState}|Tail]) -> [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}), State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
Stream = stream_get(State, StreamID), Stream = stream_get(State, StreamID),
%% @todo For webtransport we want to stop handling this as a normal stream.
%% This becomes a stream that uses the capsule protocol
%% https://www.rfc-editor.org/rfc/rfc9297#name-the-capsule-protocol
%% and relates to a webtransport session (the request process).
commands(State, Stream, Tail); commands(State, Stream, Tail);
%% Set options dynamically. %% Set options dynamically.
commands(State, Stream, [{set_options, _Opts}|Tail]) -> commands(State, Stream, [{set_options, _Opts}|Tail]) ->
@ -984,7 +950,7 @@ wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, IsFin, Data}|Tai
wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [initiate_close|Tail]) -> wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [initiate_close|Tail]) ->
%% We must send a DRAIN_WEBTRANSPORT_SESSION capsule on the CONNECT stream. %% We must send a DRAIN_WEBTRANSPORT_SESSION capsule on the CONNECT stream.
%% @todo I don't think capsules should go over stream handlers. %% @todo I don't think capsules should go over stream handlers.
Capsule = drain_wt_session_capsule(), Capsule = cow_capsule:drain_wt_session(),
case cowboy_quicer:send(Conn, SessionID, Capsule, nofin) of case cowboy_quicer:send(Conn, SessionID, Capsule, nofin) of
ok -> ok ->
wt_commands(State, Session, Tail) wt_commands(State, Session, Tail)
@ -998,7 +964,7 @@ wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail])
{close, AppCode0} -> {AppCode0, <<>>}; {close, AppCode0} -> {AppCode0, <<>>};
{close, AppCode0, AppMsg0} -> {AppCode0, AppMsg0} {close, AppCode0, AppMsg0} -> {AppCode0, AppMsg0}
end, end,
Capsule = close_wt_session_capsule(AppCode, AppMsg), Capsule = cow_capsule:close_wt_session(AppCode, AppMsg),
case cowboy_quicer:send(Conn, SessionID, Capsule, fin) of case cowboy_quicer:send(Conn, SessionID, Capsule, fin) of
ok -> ok ->
%% @todo The endpoint MAY send a STOP_SENDING to indicate it is no longer reading from the CONNECT stream. %% @todo The endpoint MAY send a STOP_SENDING to indicate it is no longer reading from the CONNECT stream.
@ -1013,16 +979,6 @@ wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail])
%% @todo Handle errors. %% @todo Handle errors.
end. end.
%% @todo Where should I put capsules?
drain_wt_session_capsule() ->
<<2:2, 16#78ae:30, 0>>.
close_wt_session_capsule(AppCode, <<>>) ->
<<1:2, 16#2843:14, 4, AppCode:32>>;
close_wt_session_capsule(AppCode, AppMsg) ->
Len = 4 + iolist_size(AppMsg),
[<<1:2, 16#2843:14>>, cow_http3:encode_int(Len), <<AppCode:32>>, AppMsg].
reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0}, reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
Stream=#stream{id=StreamID}, Error) -> Stream=#stream{id=StreamID}, Error) ->
Reason = case Error of Reason = case Error of

View file

@ -116,7 +116,7 @@ accept_session_when_enabled(Config) ->
%% Create a bidi stream, send Hello, get Hello back. %% Create a bidi stream, send Hello, get Hello back.
{ok, BidiStreamRef} = quicer:start_stream(Conn, #{}), {ok, BidiStreamRef} = quicer:start_stream(Conn, #{}),
{ok, _} = quicer:send(BidiStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "Hello">>), {ok, _} = quicer:send(BidiStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "Hello">>),
{ok, <<"Hello">>} = rfc9114_SUITE:do_receive_data(BidiStreamRef), {nofin, <<"Hello">>} = do_receive_data(BidiStreamRef),
ok. ok.
%% If the server accepts 0-RTT, the server MUST NOT reduce the limit of maximum open WebTransport sessions from the one negotiated during the previous session; such change would be deemed incompatible, and MUST result in a H3_SETTINGS_ERROR connection error. (3.3) %% If the server accepts 0-RTT, the server MUST NOT reduce the limit of maximum open WebTransport sessions from the one negotiated during the previous session; such change would be deemed incompatible, and MUST result in a H3_SETTINGS_ERROR connection error. (3.3)
@ -181,8 +181,7 @@ bidirectional_streams_client(Config) ->
%% Create a bidi stream, send Hello, get Hello back. %% Create a bidi stream, send Hello, get Hello back.
{ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}),
{ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "Hello">>), {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "Hello">>),
%% @todo Use the local do_receive_data instead to have the fin flag. {nofin, <<"Hello">>} = do_receive_data(LocalStreamRef),
{ok, <<"Hello">>} = rfc9114_SUITE:do_receive_data(LocalStreamRef),
ok. ok.
bidirectional_streams_server(Config) -> bidirectional_streams_server(Config) ->
@ -247,7 +246,7 @@ datagrams(Config) ->
%% An HTTP/3 GOAWAY frame is also a signal to applications to initiate shutdown for all WebTransport sessions. (4.6) %% An HTTP/3 GOAWAY frame is also a signal to applications to initiate shutdown for all WebTransport sessions. (4.6)
drain_webtransport_session_client(Config) -> drain_wt_session_client(Config) ->
doc("The WT client can initiate the close of a single session. " doc("The WT client can initiate the close of a single session. "
"(draft_webtrans_http3 4.6)"), "(draft_webtrans_http3 4.6)"),
%% Connect to the WebTransport server. %% Connect to the WebTransport server.
@ -257,13 +256,12 @@ drain_webtransport_session_client(Config) ->
session_id := SessionID session_id := SessionID
} = do_webtransport_connect(Config), } = do_webtransport_connect(Config),
%% Send the DRAIN_WEBTRANSPORT_SESSION capsule on the CONNECT stream. %% Send the DRAIN_WEBTRANSPORT_SESSION capsule on the CONNECT stream.
%% @todo Capsule should be in its own module. {ok, _} = quicer:send(ConnectStreamRef, cow_capsule:drain_wt_session()),
{ok, _} = quicer:send(ConnectStreamRef, <<2:2, 16#78ae:30, 0>>),
%% Receive a datagram indicating processing by the WT handler. %% Receive a datagram indicating processing by the WT handler.
{datagram, SessionID, <<"TEST:close_initiated">>} = do_receive_datagram(Conn), {datagram, SessionID, <<"TEST:close_initiated">>} = do_receive_datagram(Conn),
ok. ok.
drain_webtransport_session_server(Config) -> drain_wt_session_server(Config) ->
doc("The WT server can initiate the close of a single session. " doc("The WT server can initiate the close of a single session. "
"(draft_webtrans_http3 4.6)"), "(draft_webtrans_http3 4.6)"),
%% Connect to the WebTransport server. %% Connect to the WebTransport server.
@ -272,15 +270,55 @@ drain_webtransport_session_server(Config) ->
connect_stream_ref := ConnectStreamRef, connect_stream_ref := ConnectStreamRef,
session_id := SessionID session_id := SessionID
} = do_webtransport_connect(Config), } = do_webtransport_connect(Config),
%% Create a bidi stream, send a special instruction to make it create a bidi stream. %% Create a bidi stream, send a special instruction to make it initiate the close.
{ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}),
{ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "TEST:initiate_close">>), {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "TEST:initiate_close">>),
%% Receive the DRAIN_WEBTRANSPORT_SESSION capsule on the CONNECT stream. %% Receive the DRAIN_WEBTRANSPORT_SESSION capsule on the CONNECT stream.
%% @todo Capsule should be in its own module. DrainWTSessionCapsule = cow_capsule:drain_wt_session(),
{nofin, <<2:2, 16#78ae:30, 0>>} = do_receive_data(ConnectStreamRef), {nofin, DrainWTSessionCapsule} = do_receive_data(ConnectStreamRef),
ok. ok.
%% After sending or receiving either a DRAIN_WEBTRANSPORT_SESSION capsule or a HTTP/3 GOAWAY frame, an endpoint MAY continue using the session and MAY open new streams. The signal is intended for the application using WebTransport, which is expected to attempt to gracefully terminate the session as soon as possible. (4.6) drain_wt_session_continue_client(Config) ->
doc("After the WT client has initiated the close of the session, "
"both client and server can continue using the session and "
"open new streams. (draft_webtrans_http3 4.6)"),
%% Connect to the WebTransport server.
#{
conn := Conn,
connect_stream_ref := ConnectStreamRef,
session_id := SessionID
} = do_webtransport_connect(Config),
%% Send the DRAIN_WEBTRANSPORT_SESSION capsule on the CONNECT stream.
{ok, _} = quicer:send(ConnectStreamRef, cow_capsule:drain_wt_session()),
%% Receive a datagram indicating processing by the WT handler.
{datagram, SessionID, <<"TEST:close_initiated">>} = do_receive_datagram(Conn),
%% Create a new bidi stream, send Hello, get Hello back.
{ok, ContinueStreamRef} = quicer:start_stream(Conn, #{}),
{ok, _} = quicer:send(ContinueStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "Hello">>),
{nofin, <<"Hello">>} = do_receive_data(ContinueStreamRef),
ok.
drain_wt_session_continue_server(Config) ->
doc("After the WT server has initiated the close of the session, "
"both client and server can continue using the session and "
"open new streams. (draft_webtrans_http3 4.6)"),
%% Connect to the WebTransport server.
#{
conn := Conn,
connect_stream_ref := ConnectStreamRef,
session_id := SessionID
} = do_webtransport_connect(Config),
%% Create a bidi stream, send a special instruction to make it initiate the close.
{ok, LocalStreamRef} = quicer:start_stream(Conn, #{}),
{ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "TEST:initiate_close">>),
%% Receive the DRAIN_WEBTRANSPORT_SESSION capsule on the CONNECT stream.
DrainWTSessionCapsule = cow_capsule:drain_wt_session(),
{nofin, DrainWTSessionCapsule} = do_receive_data(ConnectStreamRef),
%% Create a new bidi stream, send Hello, get Hello back.
{ok, ContinueStreamRef} = quicer:start_stream(Conn, #{}),
{ok, _} = quicer:send(ContinueStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "Hello">>),
{nofin, <<"Hello">>} = do_receive_data(ContinueStreamRef),
ok.
%% @todo 4.7. Use of Keying Material Exporters %% @todo 4.7. Use of Keying Material Exporters
@ -426,7 +464,7 @@ do_webtransport_connect(Config, ExtraHeaders) ->
EncodedRequest EncodedRequest
]), ]),
%% Receive a 200 response. %% Receive a 200 response.
{ok, Data} = rfc9114_SUITE:do_receive_data(ConnectStreamRef), {nofin, Data} = do_receive_data(ConnectStreamRef),
{HLenEnc, HLenBits} = rfc9114_SUITE:do_guess_int_encoding(Data), {HLenEnc, HLenBits} = rfc9114_SUITE:do_guess_int_encoding(Data),
<< <<
1, %% HEADERS frame. 1, %% HEADERS frame.