0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 04:10:24 +00:00

Start of drain/close WT session

This commit is contained in:
Loïc Hoguin 2025-06-02 15:55:45 +02:00
parent 2fe2edf4ff
commit 9a5caad2f1
No known key found for this signature in database
GPG key ID: 8A9DF795F6FED764
4 changed files with 154 additions and 24 deletions

View file

@ -252,10 +252,21 @@ parse1(State=#state{http3_machine=HTTP3Machine0},
{error, Error={connection_error, _, _}, HTTP3Machine} ->
terminate(State#state{http3_machine=HTTP3Machine}, Error)
end;
%% @todo WT status
parse1(State, Stream=#stream{status=webtransport_session}, Data, IsFin) ->
%% @todo HTTP Capsules.
error({todo, State, Stream, Data, IsFin});
%% @todo Handle when IsFin = fin which must terminate the WT session.
parse1(State, Stream=#stream{id=SessionID, status=webtransport_session}, Data, IsFin) ->
case parse_capsule(Data) of
{ok, drain_wt_session, Rest} ->
webtransport_event(State, SessionID, close_initiated),
parse1(State, Stream, Rest, IsFin);
%% Ignore unknown/unhandled capsules.
{ok, _, Rest} ->
parse1(State, Stream, Rest, IsFin);
error ->
%% @todo What should be done on capsule error?
error({todo, capsule_error, Data});
more ->
loop(stream_store(State, Stream#stream{buffer=Data}))
end;
parse1(State, #stream{id=StreamID, status={webtransport_stream, SessionID, _}}, Data, IsFin) ->
webtransport_event(State, SessionID, {stream_data, StreamID, IsFin, Data}),
%% No need to store the stream again, WT streams don't get changed here.
@ -347,6 +358,36 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
end
end.
parse_capsule(<<2:2, 16#78ae:30, 0, Rest/bits>>) ->
{ok, drain_wt_session, Rest};
parse_capsule(<<1:2, 16#2843:14, Rest0/bits>>) when byte_size(Rest0) >= 5 ->
LenOrError = case Rest0 of
<<0:2, Len0:6, Rest1/bits>> ->
{Len0, Rest1};
<<1:2, Len0:14, Rest1/bits>> when Len0 =< 1028 ->
{Len0, Rest1};
%% AppCode is 4 bytes and AppMsg is up to 1024 bytes.
_ ->
error
end,
case LenOrError of
{Len1, Rest2} ->
AppMsgLen = Len1 - 4,
case Rest2 of
<<AppCode:32, AppMsg:AppMsgLen/unit:8, Rest/bits>> ->
{ok, {close_wt_session, AppCode, AppMsg}, Rest};
_ ->
more
end;
error ->
error
end;
parse_capsule(<<>>) ->
more;
parse_capsule(Data) ->
%% @todo Skip unknown capsules.
error({todo, unknown_capsule, Data}).
%% We may receive multiple frames in a single QUIC packet.
%% The FIN flag applies to the QUIC packet, not to the frame.
%% We must therefore only consider the frame to have a FIN
@ -933,14 +974,55 @@ wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, Data}|Tail]) ->
wt_commands(State, Session, Tail)
%% @todo Handle errors.
end;
wt_commands(State=#state{conn=Conn}, Session, [Cmd = {send, StreamID, IsFin, Data}|Tail]) ->
wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, IsFin, Data}|Tail]) ->
%% @todo Check that StreamID belongs to Session.
case cowboy_quicer:send(Conn, StreamID, Data, IsFin) of
ok ->
wt_commands(State, Session, Tail)
%% @todo Handle errors.
end;
wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [initiate_close|Tail]) ->
%% We must send a DRAIN_WEBTRANSPORT_SESSION capsule on the CONNECT stream.
%% @todo I don't think capsules should go over stream handlers.
Capsule = drain_wt_session_capsule(),
case cowboy_quicer:send(Conn, SessionID, Capsule, nofin) of
ok ->
wt_commands(State, Session, Tail)
%% @todo Handle errors.
end;
wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail])
when Cmd =:= close; element(1, Cmd) =:= close ->
%% We must send a CLOSE_WEBTRANSPORT_SESSION capsule on the CONNECT stream.
{AppCode, AppMsg} = case Cmd of
close -> {0, <<>>};
{close, AppCode0} -> {AppCode0, <<>>};
{close, AppCode0, AppMsg0} -> {AppCode0, AppMsg0}
end,
Capsule = close_wt_session_capsule(AppCode, AppMsg),
case cowboy_quicer:send(Conn, SessionID, Capsule, fin) of
ok ->
%% @todo The endpoint MAY send a STOP_SENDING to indicate it is no longer reading from the CONNECT stream.
%% @todo We must then terminate the WT session.
%% @todo Because the handler is in a separate process
%% we must wait for it to stop and eventually
%% kill the process if it takes too long.
%% @todo This is because we initiated, not the remote endpoint.
%% @todo We must also close the streams that are still open,
%% and finally close the CONNECT stream (cleanly).
wt_commands(State, Session, Tail)
%% @todo Handle errors.
end.
%% @todo Where should I put capsules?
drain_wt_session_capsule() ->
<<2:2, 16#78ae:30, 0>>.
close_wt_session_capsule(AppCode, <<>>) ->
<<1:2, 16#2843:14, 4, AppCode:32>>;
close_wt_session_capsule(AppCode, AppMsg) ->
Len = 4 + iolist_size(AppMsg),
[<<1:2, 16#2843:14>>, cow_http3:encode_int(Len), <<AppCode:32>>, AppMsg].
reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
Stream=#stream{id=StreamID}, Error) ->
Reason = case Error of

View file

@ -166,11 +166,12 @@ handler_call(State=#state{handler=Handler}, HandlerState, Callback, Message) ->
end.
handler_call_result(State0, HandlerState, Commands) ->
case commands(Commands, State0, []) of
case commands(Commands, State0, ok, []) of
{ok, State} ->
before_loop(State, HandlerState);
{stop, State} ->
terminate(State, HandlerState, stop);
%% @todo Do we need this here?
{Error = {error, _}, State} ->
terminate(State, HandlerState, Error)
end.
@ -179,26 +180,35 @@ handler_call_result(State0, HandlerState, Commands) ->
%% because we want to send everything into one message. Other commands are
%% processed immediately.
commands([], State, []) ->
{ok, State};
commands([], State=#state{id=SessionID, parent=Pid}, Commands) ->
commands([], State, Res, []) ->
{Res, State};
commands([], State=#state{id=SessionID, parent=Pid}, Res, Commands) ->
Pid ! {'$webtransport_commands', SessionID, lists:reverse(Commands)},
{ok, State};
{Res, State};
%% {open_stream, OpenStreamRef, StreamType, InitialData}.
commands([Command={open_stream, _, _, _}|Tail], State, Acc) ->
commands(Tail, State, [Command|Acc]);
commands([Command={open_stream, _, _, _}|Tail], State, Res, Acc) ->
commands(Tail, State, Res, [Command|Acc]);
%% {close_stream, StreamID, Code}.
commands([Command={close_stream, _, _}|Tail], State, Acc) ->
commands(Tail, State, [Command|Acc]);
commands([Command={close_stream, _, _}|Tail], State, Res, Acc) ->
commands(Tail, State, Res, [Command|Acc]);
%% @todo We must reject send to a remote unidi stream.
%% {send, StreamID | datagram, Data}.
commands([Command={send, _, _}|Tail], State, Acc) ->
commands(Tail, State, [Command|Acc]);
commands([Command={send, _, _}|Tail], State, Res, Acc) ->
commands(Tail, State, Res, [Command|Acc]);
%% {send, StreamID, IsFin, Data}.
commands([Command={send, _, _, _}|Tail], State, Acc) ->
commands(Tail, State, [Command|Acc]).
%% @todo send with IsFin
%% @todo stop, {error, Reason} probably. What to do about sending when asked to stop?
commands([Command={send, _, _, _}|Tail], State, Res, Acc) ->
commands(Tail, State, Res, [Command|Acc]);
%% initiate_close - DRAIN_WEBTRANSPORT_SESSION
commands([Command=initiate_close|Tail], State, Res, Acc) ->
commands(Tail, State, Res, [Command|Acc]);
%% close | {close, Code} | {close, Code, Msg} - CLOSE_WEBTRANSPORT_SESSION
%% @todo At this point the handler must not issue stream or send commands.
commands([Command=close|Tail], State, _, Acc) ->
commands(Tail, State, stop, [Command|Acc]);
commands([Command={close, _}|Tail], State, _, Acc) ->
commands(Tail, State, stop, [Command|Acc]);
commands([Command={close, _, _}|Tail], State, _, Acc) ->
commands(Tail, State, stop, [Command|Acc]).
%% @todo set_options (to increase number of streams? data amounts? or a flow command?)
%% @todo shutdown_reason if useful.

View file

@ -193,7 +193,8 @@ bidirectional_streams_server(Config) ->
conn := Conn,
session_id := SessionID
} = do_webtransport_connect(Config),
%% Create a bidi stream, send a special instruction to make it create a bidi stream.
%% Create a bidi stream, send a special instruction
%% to make the server create another bidi stream.
{ok, LocalStreamRef} = quicer:start_stream(Conn, #{}),
{ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "TEST:open_bidi">>),
%% Accept the bidi stream and receive the data.
@ -246,7 +247,38 @@ datagrams(Config) ->
%% An HTTP/3 GOAWAY frame is also a signal to applications to initiate shutdown for all WebTransport sessions. (4.6)
%% To shut down a single WebTransport session, either endpoint can send a DRAIN_WEBTRANSPORT_SESSION (0x78ae) capsule. (4.6)
drain_webtransport_session_client(Config) ->
doc("The WT client can initiate the close of a single session. "
"(draft_webtrans_http3 4.6)"),
%% Connect to the WebTransport server.
#{
conn := Conn,
connect_stream_ref := ConnectStreamRef,
session_id := SessionID
} = do_webtransport_connect(Config),
%% Send the DRAIN_WEBTRANSPORT_SESSION capsule on the CONNECT stream.
%% @todo Capsule should be in its own module.
{ok, _} = quicer:send(ConnectStreamRef, <<2:2, 16#78ae:30, 0>>),
%% Receive a datagram indicating processing by the WT handler.
{datagram, SessionID, <<"TEST:close_initiated">>} = do_receive_datagram(Conn),
ok.
drain_webtransport_session_server(Config) ->
doc("The WT server can initiate the close of a single session. "
"(draft_webtrans_http3 4.6)"),
%% Connect to the WebTransport server.
#{
conn := Conn,
connect_stream_ref := ConnectStreamRef,
session_id := SessionID
} = do_webtransport_connect(Config),
%% Create a bidi stream, send a special instruction to make it create a bidi stream.
{ok, LocalStreamRef} = quicer:start_stream(Conn, #{}),
{ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "TEST:initiate_close">>),
%% Receive the DRAIN_WEBTRANSPORT_SESSION capsule on the CONNECT stream.
%% @todo Capsule should be in its own module.
{nofin, <<2:2, 16#78ae:30, 0>>} = do_receive_data(ConnectStreamRef),
ok.
%% After sending or receiving either a DRAIN_WEBTRANSPORT_SESSION capsule or a HTTP/3 GOAWAY frame, an endpoint MAY continue using the session and MAY open new streams. The signal is intended for the application using WebTransport, which is expected to attempt to gracefully terminate the session as soon as possible. (4.6)
@ -412,6 +444,7 @@ do_webtransport_connect(Config, ExtraHeaders) ->
%% Done.
#{
conn => Conn,
connect_stream_ref => ConnectStreamRef,
session_id => SessionID,
resp_headers => DecodedResponse,
enc_or_dec1 => Unidi1,

View file

@ -42,13 +42,15 @@ webtransport_handle(Event = {opened_stream_id, OpenStreamRef, OpenStreamID}, Str
OpenStreamID => {unidi_local, RemoteStreamID}
})}
end;
webtransport_handle(Event = {stream_data, StreamID, IsFin, <<"TEST:", Test/bits>>}, Streams) ->
webtransport_handle(Event = {stream_data, _StreamID, _IsFin, <<"TEST:", Test/bits>>}, Streams) ->
ct:pal("WT handle ~p~n", [Event]),
case Test of
<<"open_bidi">> ->
OpenStreamRef = make_ref(),
{[{open_stream, OpenStreamRef, bidi, <<>>}],
Streams#{OpenStreamRef => bidi}}
Streams#{OpenStreamRef => bidi}};
<<"initiate_close">> ->
{[initiate_close], Streams}
end;
webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, Streams) ->
ct:pal("WT handle ~p~n", [Event]),
@ -65,6 +67,9 @@ webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, Streams) ->
webtransport_handle(Event = {datagram, Data}, Streams) ->
ct:pal("WT handle ~p~n", [Event]),
{[{send, datagram, Data}], Streams};
webtransport_handle(Event = close_initiated, Streams) ->
ct:pal("WT handle ~p~n", [Event]),
{[{send, datagram, <<"TEST:close_initiated">>}], Streams};
webtransport_handle(Event, Streams) ->
ct:pal("WT handle ignore ~p~n", [Event]),
{[], Streams}.