diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index f08bb15e..44ef66e0 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -258,6 +258,19 @@ parse1(State, Stream=#stream{id=SessionID, status=webtransport_session}, Data, I {ok, drain_wt_session, Rest} -> webtransport_event(State, SessionID, close_initiated), parse1(State, Stream, Rest, IsFin); + {ok, {close_wt_session, AppCode, AppMsg}, Rest} -> + %% This event will be handled specially and lead + %% to the termination of the session process. + webtransport_event(State, SessionID, {closed, AppCode, AppMsg}), + %% Shutdown the CONNECT stream immediately. + %% @todo Do this in webtransport_terminate_session? + ConnectStreamRef = get({quicer_stream, SessionID}), + _ = quicer:shutdown_stream(ConnectStreamRef), + %% @todo Will we receive a {stream_closed,...} after that? + %% If any data is received past that point this is an error. + %% @todo Don't crash, error out properly. + <<>> = Rest, + loop(webtransport_terminate_session(State, Stream)); %% Ignore unknown/unhandled capsules. {ok, _, Rest} -> parse1(State, Stream, Rest, IsFin); @@ -896,8 +909,13 @@ webtransport_event(State, SessionID, Event) -> ok. webtransport_commands(State, SessionID, Commands) -> - Session = #stream{status=webtransport_session} = stream_get(State, SessionID), - wt_commands(State, Session, Commands). + case stream_get(State, SessionID) of + Session = #stream{status=webtransport_session} -> + wt_commands(State, Session, Commands); + %% The stream has been terminated, ignore pending commands. + error -> + State + end. wt_commands(State, _, []) -> State; @@ -979,6 +997,30 @@ wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail]) %% @todo Handle errors. end. +webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machine0, + streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) -> + %% Reset/abort the WT streams. + Streams = maps:filtermap(fun + (StreamID, #stream{status={webtransport_stream, StreamSessionID, _}}) + when SessionID =:= StreamSessionID -> + cowboy_quicer:shutdown_stream(Conn, StreamID, + both, cow_http3:error_to_code(webtransport_session_gone)), + false; + (_, _) -> + true + end, Streams0), + %% Keep the streams in lingering state. + %% We only keep up to 100 streams in this state. @todo Make it configurable? + Terminated = maps:keys(Streams0) -- maps:keys(Streams), + Lingering = lists:sublist(Terminated ++ Lingering0, 100), + %% Update the HTTP3 state machine. + HTTP3Machine = cow_http3_machine:close_webtransport_session(SessionID, HTTP3Machine0), + State#state{ + http3_machine=HTTP3Machine, + streams=Streams, + lingering_streams=Lingering + }. + reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0}, Stream=#stream{id=StreamID}, Error) -> Reason = case Error of @@ -1048,6 +1090,7 @@ terminate_stream(State=#state{streams=Streams0, children=Children0}, stream_linger(State#state{streams=Streams, children=Children}, StreamID). %% We must dereference the stream state when WebTransport is in use. +%% @todo Do this here or in cowboy_webtransport? terminate_stream_handler(State, StreamID, Reason, {cowboy_webtransport, #{stream_state := StreamState}}) -> terminate_stream_handler(State, StreamID, Reason, StreamState); @@ -1151,6 +1194,18 @@ stream_new(State=#state{http3_machine=HTTP3Machine0, streams=Streams}, Stream = #stream{id=StreamID, status=Status}, State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}. +%% @todo Special handling WT session? +%% +%% cowboy_stream:init as normal +%% upgrade/switch_protocol to cowboy_webtransport +%% - WT handler can stop via command +%% - WT process can crash/exit +%% - Client can send capsule on CONNECT stream to close +%% - Client can terminate CONNECT stream (gracefully/abruptly) +%% - Server can close the connection (gracefully/abruptly) +%% - Client can close the connection (gracefully/abruptly) +%% Note that GOAWAY must not stop the WT session + %% Stream closed message for a local (write-only) unidi stream. stream_closed(State=#state{local_control_id=StreamID}, StreamID, _) -> stream_closed1(State, StreamID); diff --git a/src/cowboy_quicer.erl b/src/cowboy_quicer.erl index 915f31a3..81e16584 100644 --- a/src/cowboy_quicer.erl +++ b/src/cowboy_quicer.erl @@ -179,6 +179,7 @@ shutdown_stream(_Conn, StreamID, Dir, ErrorCode) -> _ = quicer:shutdown_stream(StreamRef, shutdown_flag(Dir), ErrorCode, infinity), ok. +%% @todo Are these flags correct for what we want? shutdown_flag(both) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT; shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE. diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl index 437ab7d4..d194e7e3 100644 --- a/src/cowboy_webtransport.erl +++ b/src/cowboy_webtransport.erl @@ -27,7 +27,6 @@ -export([upgrade/4]). -export([upgrade/5]). --export([terminate/3]). -type opts() :: #{ %% @todo @@ -127,6 +126,8 @@ loop(State=#state{parent=Parent%, timeout_ref=TRef receive %% @todo Parent to filter messages? Nothing? %% @todo Can there be groups of events? + {'$webtransport_event', Event={closed, _, _}} -> + terminate(State, HandlerState, Event); {'$webtransport_event', Event} -> handler_call(State, HandlerState, webtransport_handle, Event); %% Timeouts. @@ -213,22 +214,16 @@ commands([Command={close, _, _}|Tail], State, _, Acc) -> %% @todo set_options (to increase number of streams? data amounts? or a flow command?) %% @todo shutdown_reason if useful. -terminate(State=#state{req=Req}, HandlerState, Reason) -> - %cowboy_stream:terminate(StreamID, Reason, StreamState) -%% @todo This terminate is at the connection level. -% handler_terminate(State, HandlerState, Reason), +terminate(State, HandlerState, Reason) -> + handler_terminate(State, HandlerState, Reason), % case Shutdown of % normal -> exit(normal); % _ -> exit({shutdown, Shutdown}) % end. -% exit(normal). -%handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) -> -% cowboy_handler:terminate(Reason, Req, HandlerState, Handler). - %% @todo I think we must call terminate ourselves. - {ok, Req, Reason}. - - + exit(normal). +handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) -> + cowboy_handler:terminate(Reason, Req, HandlerState, Handler). diff --git a/test/draft_h3_webtransport_SUITE.erl b/test/draft_h3_webtransport_SUITE.erl index 2d5d967e..b271f590 100644 --- a/test/draft_h3_webtransport_SUITE.erl +++ b/test/draft_h3_webtransport_SUITE.erl @@ -431,7 +431,30 @@ drain_wt_session_continue_server(Config) -> %% @todo connect_stream_closed_cleanly_client %% @todo connect_stream_closed_abruptly_client -%% @todo close_wt_session_client + +close_wt_session_client(Config) -> + doc("The WT client can close a single session. (draft_webtrans_http3 4.6)"), + %% Connect to the WebTransport server. + #{ + conn := Conn, + connect_stream_ref := ConnectStreamRef, + session_id := SessionID + } = do_webtransport_connect(Config), + %% Send the CLOSE_WEBTRANSPORT_SESSION capsule on the CONNECT stream. + {ok, _} = quicer:send(ConnectStreamRef, + cow_capsule:close_wt_session(0, <<>>), + ?QUIC_SEND_FLAG_FIN), + %% Normally we should also stop reading but in order to detect + %% that the server stops the stream we must not otherwise the + %% stream will be de facto closed on our end. + %% + %% The recipient must close or reset the stream in response. + receive + {quic, stream_closed, ConnectStreamRef, _} -> + ok + after 1000 -> + error({timeout, waiting_for_stream_closed}) + end. close_wt_session_server(Config) -> doc("The WT server can close a single session. (draft_webtrans_http3 4.6)"), @@ -459,6 +482,32 @@ close_wt_session_server(Config) -> %% Application Error Message: A UTF-8 encoded error message string provided by the application closing the session. The message takes up the remainder of the capsule, and its length MUST NOT exceed 1024 bytes. (6) %% @todo close_wt_session_app_code_msg_client +close_wt_session_app_code_msg_client(Config) -> + doc("The WT client can close a single session with an application error code " + "and an application error message. (draft_webtrans_http3 4.6)"), + %% Connect to the WebTransport server. + #{ + conn := Conn, + connect_stream_ref := ConnectStreamRef, + session_id := SessionID + } = do_webtransport_connect(Config), + %% Create a bidi stream, send a special instruction to make it propagate events. + {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), + EventPidBin = term_to_binary(self()), + {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, + "TEST:event_pid:", EventPidBin/binary>>), + %% Send the CLOSE_WEBTRANSPORT_SESSION capsule on the CONNECT stream. + {ok, _} = quicer:send(ConnectStreamRef, + cow_capsule:close_wt_session(17, <<"seventeen">>), + ?QUIC_SEND_FLAG_FIN), + %% @todo Stop reading from the CONNECt stream too. (STOP_SENDING) + %% Receive the terminate event from the WT handler. + receive + {'$wt_echo_h', terminate, {closed, 17, <<"seventeen">>}, _, _} -> + ok + after 1000 -> + error({timeout, waiting_for_terminate_event}) + end. close_wt_session_app_code_server(Config) -> doc("The WT server can close a single session with an application error code. " diff --git a/test/handlers/wt_echo_h.erl b/test/handlers/wt_echo_h.erl index c533b9ee..bb68e392 100644 --- a/test/handlers/wt_echo_h.erl +++ b/test/handlers/wt_echo_h.erl @@ -7,6 +7,7 @@ -export([init/2]). -export([webtransport_handle/2]). -export([webtransport_info/2]). +-export([terminate/3]). init(Req0, _) -> Req = case cowboy_req:parse_header(<<"wt-available-protocols">>, Req0) of @@ -56,7 +57,9 @@ webtransport_handle(Event = {stream_data, _StreamID, _IsFin, <<"TEST:", Test/bit <<"close_app_code">> -> {[{close, 1234567890}], Streams}; <<"close_app_code_msg">> -> - {[{close, 1234567890, <<"onetwothreefourfivesixseveneightnineten">>}], Streams} + {[{close, 1234567890, <<"onetwothreefourfivesixseveneightnineten">>}], Streams}; + <<"event_pid:", EventPidBin/bits>> -> + {[], Streams#{event_pid => binary_to_term(EventPidBin)}} end; webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, Streams) -> ct:pal("WT handle ~p~n", [Event]), @@ -83,3 +86,11 @@ webtransport_handle(Event, Streams) -> webtransport_info({try_again, Event}, Streams) -> ct:pal("try_again ~p", [Event]), webtransport_handle(Event, Streams). + +terminate(Reason, Req, State=#{event_pid := EventPid}) -> + ct:pal("terminate ~p ~p ~p", [Reason, Req, State]), + EventPid ! {'$wt_echo_h', terminate, Reason, Req, State}, + ok; +terminate(Reason, Req, State) -> + ct:pal("terminate ~p ~p ~p", [Reason, Req, State]), + ok.