From 804dbb77a6d3085039abac69a347348780c631bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Tue, 10 Jun 2025 13:31:51 +0200 Subject: [PATCH] WIP close --- src/cowboy_http3.erl | 23 ++++++++++++++--------- src/cowboy_webtransport.erl | 31 ++++++++++++++++++++++++++----- 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index 43a1ec6a..21729073 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -1003,8 +1003,12 @@ webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machin streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) -> %% Reset/abort the WT streams. Streams = maps:filtermap(fun + (_, #stream{id=StreamID, status=webtransport_session}) + when StreamID =:= SessionID -> + %% We remove the session stream but do the shutdown outside this function. + false; (StreamID, #stream{status={webtransport_stream, StreamSessionID, _}}) - when SessionID =:= StreamSessionID -> + when StreamSessionID =:= SessionID -> cowboy_quicer:shutdown_stream(Conn, StreamID, both, cow_http3:error_to_code(webtransport_session_gone)), false; @@ -1029,6 +1033,11 @@ stream_peer_send_shutdown(State, StreamID) -> %% to an application error code of 0 and empty message. Stream = #stream{status=webtransport_session} -> webtransport_event(State, StreamID, {closed, 0, <<>>}), + %% Shutdown the CONNECT stream fully. + %% @todo This needs a cowboy_quicer function. + %% @todo What error code should be used here? If any. + ConnectStreamRef = get({quicer_stream, StreamID}), + _ = quicer:shutdown_stream(ConnectStreamRef), webtransport_terminate_session(State, Stream); _ -> State @@ -1102,11 +1111,6 @@ terminate_stream(State=#state{streams=Streams0, children=Children0}, Children = cowboy_children:shutdown(Children0, StreamID), stream_linger(State#state{streams=Streams, children=Children}, StreamID). -%% We must dereference the stream state when WebTransport is in use. -%% @todo Do this here or in cowboy_webtransport? -terminate_stream_handler(State, StreamID, Reason, - {cowboy_webtransport, #{stream_state := StreamState}}) -> - terminate_stream_handler(State, StreamID, Reason, StreamState); terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) -> try cowboy_stream:terminate(StreamID, Reason, StreamState) @@ -1229,10 +1233,11 @@ stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) -> stream_closed(State=#state{opts=Opts, streams=Streams0, children=Children0}, StreamID, ErrorCode) -> case maps:take(StreamID, Streams0) of - {Stream=#stream{status=webtransport_session}, Streams} -> - %% @todo Ensure that we don't double call in some cases. + %% In the WT session's case, streams will be + %% removed in webtransport_terminate_session. + {Stream=#stream{status=webtransport_session}, _} -> webtransport_event(State, StreamID, closed_abruptly), - webtransport_terminate_session(State#state{streams=Streams}, Stream); + webtransport_terminate_session(State, Stream); {#stream{state=undefined}, Streams} -> %% Unidi stream has no handler/children. stream_closed1(State#state{streams=Streams}, StreamID); diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl index cdd98a7d..0695e6ae 100644 --- a/src/cowboy_webtransport.erl +++ b/src/cowboy_webtransport.erl @@ -28,6 +28,10 @@ -export([upgrade/4]). -export([upgrade/5]). +%% cowboy_stream. +-export([info/3]). +-export([terminate/3]). + -type opts() :: #{ %% @todo }. @@ -127,9 +131,9 @@ loop(State=#state{parent=Parent%, timeout_ref=TRef %% @todo Parent to filter messages? Nothing? %% @todo Can there be groups of events? {'$webtransport_event', Event={closed, _, _}} -> - terminate(State, HandlerState, Event); + terminate_proc(State, HandlerState, Event); {'$webtransport_event', Event=closed_abruptly} -> - terminate(State, HandlerState, Event); + terminate_proc(State, HandlerState, Event); {'$webtransport_event', Event} -> handler_call(State, HandlerState, webtransport_handle, Event); %% Timeouts. @@ -174,10 +178,10 @@ handler_call_result(State0, HandlerState, Commands) -> {ok, State} -> before_loop(State, HandlerState); {stop, State} -> - terminate(State, HandlerState, stop); + terminate_proc(State, HandlerState, stop); %% @todo Do we need this here? {Error = {error, _}, State} -> - terminate(State, HandlerState, Error) + terminate_proc(State, HandlerState, Error) end. %% We accumulate the commands that must be sent to the connection process @@ -216,7 +220,7 @@ commands([Command={close, _, _}|Tail], State, _, Acc) -> %% @todo set_options (to increase number of streams? data amounts? or a flow command?) %% @todo shutdown_reason if useful. -terminate(State, HandlerState, Reason) -> +terminate_proc(State, HandlerState, Reason) -> handler_terminate(State, HandlerState, Reason), % case Shutdown of % normal -> exit(normal); @@ -243,3 +247,20 @@ handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) -> %% webtransport_handle({datagram, Data}, HandlerState) %% webtransport_handle(goaway, HandlerState) %% webtransport_info(Message, HandlerState) + + + +%% cowboy_stream callbacks. +%% +%% We shortcut stream handlers but still need to process some events +%% such as process exiting or termination. We implement the relevant +%% callbacks here. Note that as far as WebTransport is concerned, +%% receiving stream data here would be an error therefore the data +%% callback is not implemented. + +info(StreamID, Msg, WTState=#{stream_state := StreamState0}) -> + {Commands, StreamState} = cowboy_stream:info(StreamID, Msg, StreamState0), + {Commands, WTState#{stream_state => StreamState}}. + +terminate(StreamID, Reason, #{stream_state := StreamState}) -> + cowboy_stream:terminate(StreamID, Reason, StreamState).