diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index 44ef66e0..43a1ec6a 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -206,6 +206,9 @@ handle_quic_msg(State0=#state{opts=Opts}, Msg) -> {stream_closed, StreamID, ErrorCode} -> State = stream_closed(State0, StreamID, ErrorCode), loop(State); + {peer_send_shutdown, StreamID} -> + State = stream_peer_send_shutdown(State0, StreamID), + loop(State); closed -> %% @todo Different error reason if graceful? Reason = {socket_error, closed, 'The socket has been closed.'}, @@ -974,7 +977,7 @@ wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [initiate_cl wt_commands(State, Session, Tail) %% @todo Handle errors. end; -wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail]) +wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail]) when Cmd =:= close; element(1, Cmd) =:= close -> %% We must send a CLOSE_WEBTRANSPORT_SESSION capsule on the CONNECT stream. {AppCode, AppMsg} = case Cmd of @@ -986,13 +989,12 @@ wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail]) case cowboy_quicer:send(Conn, SessionID, Capsule, fin) of ok -> %% @todo The endpoint MAY send a STOP_SENDING to indicate it is no longer reading from the CONNECT stream. - %% @todo We must then terminate the WT session. - %% @todo Because the handler is in a separate process - %% we must wait for it to stop and eventually - %% kill the process if it takes too long. - %% @todo This is because we initiated, not the remote endpoint. - %% @todo We must also close the streams that are still open, - %% and finally close the CONNECT stream (cleanly). + State = webtransport_terminate_session(State0, Session), + %% @todo Because the handler is in a separate process + %% we must wait for it to stop and eventually + %% kill the process if it takes too long. + %% @todo This is because we initiated, not the remote endpoint. + %% @todo We may need to fully close the CONNECT stream (if remote doesn't reset it). wt_commands(State, Session, Tail) %% @todo Handle errors. end. @@ -1021,6 +1023,17 @@ webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machin lingering_streams=Lingering }. +stream_peer_send_shutdown(State, StreamID) -> + case stream_get(State, StreamID) of + %% Cleanly terminating the CONNECT stream is equivalent + %% to an application error code of 0 and empty message. + Stream = #stream{status=webtransport_session} -> + webtransport_event(State, StreamID, {closed, 0, <<>>}), + webtransport_terminate_session(State, Stream); + _ -> + State + end. + reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0}, Stream=#stream{id=StreamID}, Error) -> Reason = case Error of @@ -1216,6 +1229,10 @@ stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) -> stream_closed(State=#state{opts=Opts, streams=Streams0, children=Children0}, StreamID, ErrorCode) -> case maps:take(StreamID, Streams0) of + {Stream=#stream{status=webtransport_session}, Streams} -> + %% @todo Ensure that we don't double call in some cases. + webtransport_event(State, StreamID, closed_abruptly), + webtransport_terminate_session(State#state{streams=Streams}, Stream); {#stream{state=undefined}, Streams} -> %% Unidi stream has no handler/children. stream_closed1(State#state{streams=Streams}, StreamID); diff --git a/src/cowboy_quicer.erl b/src/cowboy_quicer.erl index 81e16584..58382175 100644 --- a/src/cowboy_quicer.erl +++ b/src/cowboy_quicer.erl @@ -237,8 +237,9 @@ handle({quic, dgram_state_changed, _Conn, _Props}) -> %% QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT handle({quic, transport_shutdown, _Conn, _Flags}) -> ok; -handle({quic, peer_send_shutdown, _StreamRef, undefined}) -> - ok; +handle({quic, peer_send_shutdown, StreamRef, undefined}) -> + {ok, StreamID} = quicer:get_stream_id(StreamRef), + {peer_send_shutdown, StreamID}; handle({quic, send_shutdown_complete, _StreamRef, _IsGraceful}) -> ok; handle({quic, shutdown, _Conn, success}) -> diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl index d194e7e3..cdd98a7d 100644 --- a/src/cowboy_webtransport.erl +++ b/src/cowboy_webtransport.erl @@ -128,6 +128,8 @@ loop(State=#state{parent=Parent%, timeout_ref=TRef %% @todo Can there be groups of events? {'$webtransport_event', Event={closed, _, _}} -> terminate(State, HandlerState, Event); + {'$webtransport_event', Event=closed_abruptly} -> + terminate(State, HandlerState, Event); {'$webtransport_event', Event} -> handler_call(State, HandlerState, webtransport_handle, Event); %% Timeouts. diff --git a/test/draft_h3_webtransport_SUITE.erl b/test/draft_h3_webtransport_SUITE.erl index 5e70f99a..d962a2f0 100644 --- a/test/draft_h3_webtransport_SUITE.erl +++ b/test/draft_h3_webtransport_SUITE.erl @@ -430,9 +430,6 @@ drain_wt_session_continue_server(Config) -> %% * a CLOSE_WEBTRANSPORT_SESSION capsule is either sent or received. %% (6) -%% @todo connect_stream_closed_cleanly_client -%% @todo connect_stream_closed_abruptly_client - close_wt_session_client(Config) -> doc("The WT client can close a single session. (draft_webtrans_http3 4.6)"), %% Connect to the WebTransport server. @@ -510,8 +507,46 @@ wt_session_gone_client(Config) -> #{reason := webtransport_session_gone} = do_wait_stream_aborted(RemoteBidiStreamRef), ok. -%% @todo wt_session_gone_server -%% Upon learning that the session has been terminated, the endpoint MUST reset the send side and abort reading on the receive side of all of the streams associated with the session (see Section 2.4 of [RFC9000]) using the WEBTRANSPORT_SESSION_GONE error code; it MUST NOT send any new datagrams or open any new streams. (6) +wt_session_gone_server(Config) -> + doc("After the session has been terminated by the WT server, " + "the WT server must reset associated streams with the " + "WEBTRANSPORT_SESSION_GONE error code. (draft_webtrans_http3 4.6)"), + %% Connect to the WebTransport server. + #{ + conn := Conn, + connect_stream_ref := ConnectStreamRef, + session_id := SessionID + } = do_webtransport_connect(Config), + %% Create a unidi stream. + {ok, LocalUnidiStreamRef} = quicer:start_stream(Conn, + #{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}), + {ok, _} = quicer:send(LocalUnidiStreamRef, + <<1:2, 16#54:14, 0:2, SessionID:6, "Hello">>), + %% Accept an identical unidi stream. + {unidi, RemoteUnidiStreamRef} = do_receive_new_stream(), + {nofin, <<1:2, 16#54:14, 0:2, SessionID:6>>} = do_receive_data(RemoteUnidiStreamRef), + {nofin, <<"Hello">>} = do_receive_data(RemoteUnidiStreamRef), + %% Create a bidi stream, send a special instruction + %% to make the server create another bidi stream. + {ok, LocalBidiStreamRef} = quicer:start_stream(Conn, #{}), + {ok, _} = quicer:send(LocalBidiStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "TEST:open_bidi">>), + %% Accept the bidi stream and receive the data. + {bidi, RemoteBidiStreamRef} = do_receive_new_stream(), + {nofin, <<1:2, 16#41:14, 0:2, SessionID:6>>} = do_receive_data(RemoteBidiStreamRef), + {ok, _} = quicer:send(RemoteBidiStreamRef, <<"Hello">>), + {nofin, <<"Hello">>} = do_receive_data(RemoteBidiStreamRef), + + %% Send a special instruction to make the server initiate the close. + {ok, _} = quicer:send(LocalBidiStreamRef, <<"TEST:close">>), + %% Receive the CLOSE_WEBTRANSPORT_SESSION capsule on the CONNECT stream. + CloseWTSessionCapsule = cow_capsule:close_wt_session(0, <<>>), + {fin, CloseWTSessionCapsule} = do_receive_data(ConnectStreamRef), + %% All streams from that WT session have been aborted. + #{reason := webtransport_session_gone} = do_wait_stream_aborted(LocalUnidiStreamRef), + #{reason := webtransport_session_gone} = do_wait_stream_aborted(RemoteUnidiStreamRef), + #{reason := webtransport_session_gone} = do_wait_stream_aborted(LocalBidiStreamRef), + #{reason := webtransport_session_gone} = do_wait_stream_aborted(RemoteBidiStreamRef), + ok. %% Application Error Message: A UTF-8 encoded error message string provided by the application closing the session. The message takes up the remainder of the capsule, and its length MUST NOT exceed 1024 bytes. (6) %% @todo What if it's larger? @@ -587,8 +622,85 @@ close_wt_session_app_code_msg_server(Config) -> %% If any additional stream data is received on the CONNECT stream after receiving a CLOSE_WEBTRANSPORT_SESSION capsule, the stream MUST be reset with code H3_MESSAGE_ERROR. (6) %% @todo close_wt_session_followed_by_data -%% Cleanly terminating a CONNECT stream without a CLOSE_WEBTRANSPORT_SESSION capsule SHALL be semantically equivalent to terminating it with a CLOSE_WEBTRANSPORT_SESSION capsule that has an error code of 0 and an empty error string. (6) -%% @todo connect_stream_closed_app_code_0_empty_msg +connect_stream_closed_cleanly_fin(Config) -> + doc("The WT client closing the CONNECT stream cleanly " + "is equivalent to a capsule with an application error code of 0 " + "and an empty error string. (draft_webtrans_http3 4.6)"), + %% Connect to the WebTransport server. + #{ + conn := Conn, + connect_stream_ref := ConnectStreamRef, + session_id := SessionID + } = do_webtransport_connect(Config), + %% Create a bidi stream, send a special instruction to make it propagate events. + {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), + EventPidBin = term_to_binary(self()), + {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, + "TEST:event_pid:", EventPidBin/binary>>), + {nofin, <<"event_pid_received">>} = do_receive_data(LocalStreamRef), + %% Cleanly terminate the CONNECT stream. + {ok, _} = quicer:send(ConnectStreamRef, <<>>, ?QUIC_SEND_FLAG_FIN), + %% Receive the terminate event from the WT handler. + receive + {'$wt_echo_h', terminate, {closed, 0, <<>>}, _, _} -> + ok + after 1000 -> + error({timeout, waiting_for_terminate_event}) + end. + +connect_stream_closed_cleanly_shutdown(Config) -> + doc("The WT client closing the CONNECT stream cleanly " + "is equivalent to a capsule with an application error code of 0 " + "and an empty error string. (draft_webtrans_http3 4.6)"), + %% Connect to the WebTransport server. + #{ + conn := Conn, + connect_stream_ref := ConnectStreamRef, + session_id := SessionID + } = do_webtransport_connect(Config), + %% Create a bidi stream, send a special instruction to make it propagate events. + {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), + EventPidBin = term_to_binary(self()), + {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, + "TEST:event_pid:", EventPidBin/binary>>), + {nofin, <<"event_pid_received">>} = do_receive_data(LocalStreamRef), + %% Cleanly terminate the CONNECT stream. + _ = quicer:shutdown_stream(ConnectStreamRef), + %% Receive the terminate event from the WT handler. + receive + {'$wt_echo_h', terminate, {closed, 0, <<>>}, _, _} -> + ok + after 1000 -> + error({timeout, waiting_for_terminate_event}) + end. + +connect_stream_closed_abruptly(Config) -> + doc("The WT client may close the CONNECT stream abruptly. " + "(draft_webtrans_http3 4.6)"), + %% Connect to the WebTransport server. + #{ + conn := Conn, + connect_stream_ref := ConnectStreamRef, + session_id := SessionID + } = do_webtransport_connect(Config), + %% Create a bidi stream, send a special instruction to make it propagate events. + {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), + EventPidBin = term_to_binary(self()), + {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, + "TEST:event_pid:", EventPidBin/binary>>), + {nofin, <<"event_pid_received">>} = do_receive_data(LocalStreamRef), + %% Abruptly terminate the CONNECT stream. + _ = quicer:shutdown_stream(ConnectStreamRef, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, + 0, infinity), + %% Receive the terminate event from the WT handler. + receive + %% @todo It would be good to forward a stream error as well + %% so that a WT error can be sent, but I have been unsuccessful. + {'$wt_echo_h', terminate, closed_abruptly, _, _} -> + ok + after 1000 -> + error({timeout, waiting_for_terminate_event}) + end. %% @todo This one is about gracefully closing HTTP/3 connection with WT sessions. %% the endpoint SHOULD wait until all CONNECT streams have been closed by the peer before sending the CONNECTION_CLOSE (6) diff --git a/test/handlers/wt_echo_h.erl b/test/handlers/wt_echo_h.erl index bb68e392..12f9872e 100644 --- a/test/handlers/wt_echo_h.erl +++ b/test/handlers/wt_echo_h.erl @@ -9,7 +9,15 @@ -export([webtransport_info/2]). -export([terminate/3]). +%% -define(DEBUG, 1). +-ifdef(DEBUG). +-define(LOG(Fmt, Args), ct:pal(Fmt, Args)). +-else. +-define(LOG(Fmt, Args), _ = Fmt, _ = Args, ok). +-endif. + init(Req0, _) -> + ?LOG("WT init ~p~n", [Req0]), Req = case cowboy_req:parse_header(<<"wt-available-protocols">>, Req0) of undefined -> Req0; @@ -21,16 +29,16 @@ init(Req0, _) -> %% @todo WT handle {stream_open,4,bidi} webtransport_handle(Event = {stream_open, StreamID, bidi}, Streams) -> - ct:pal("WT handle ~p~n", [Event]), + ?LOG("WT handle ~p~n", [Event]), {[], Streams#{StreamID => bidi}}; webtransport_handle(Event = {stream_open, StreamID, unidi}, Streams) -> - ct:pal("WT handle ~p~n", [Event]), + ?LOG("WT handle ~p~n", [Event]), OpenStreamRef = make_ref(), {[{open_stream, OpenStreamRef, unidi, <<>>}], Streams#{ StreamID => {unidi_remote, OpenStreamRef}, OpenStreamRef => {unidi_local, StreamID}}}; webtransport_handle(Event = {opened_stream_id, OpenStreamRef, OpenStreamID}, Streams) -> - ct:pal("WT handle ~p~n", [Event]), + ?LOG("WT handle ~p~n", [Event]), case Streams of #{OpenStreamRef := bidi} -> {[], maps:remove(OpenStreamRef, Streams#{ @@ -43,8 +51,8 @@ webtransport_handle(Event = {opened_stream_id, OpenStreamRef, OpenStreamID}, Str OpenStreamID => {unidi_local, RemoteStreamID} })} end; -webtransport_handle(Event = {stream_data, _StreamID, _IsFin, <<"TEST:", Test/bits>>}, Streams) -> - ct:pal("WT handle ~p~n", [Event]), +webtransport_handle(Event = {stream_data, StreamID, _IsFin, <<"TEST:", Test/bits>>}, Streams) -> + ?LOG("WT handle ~p~n", [Event]), case Test of <<"open_bidi">> -> OpenStreamRef = make_ref(), @@ -59,10 +67,11 @@ webtransport_handle(Event = {stream_data, _StreamID, _IsFin, <<"TEST:", Test/bit <<"close_app_code_msg">> -> {[{close, 1234567890, <<"onetwothreefourfivesixseveneightnineten">>}], Streams}; <<"event_pid:", EventPidBin/bits>> -> - {[], Streams#{event_pid => binary_to_term(EventPidBin)}} + {[{send, StreamID, nofin, <<"event_pid_received">>}], + Streams#{event_pid => binary_to_term(EventPidBin)}} end; webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, Streams) -> - ct:pal("WT handle ~p~n", [Event]), + ?LOG("WT handle ~p~n", [Event]), case Streams of #{StreamID := bidi} -> {[{send, StreamID, IsFin, Data}], Streams}; @@ -74,23 +83,23 @@ webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, Streams) -> {[{send, LocalStreamID, IsFin, Data}], Streams} end; webtransport_handle(Event = {datagram, Data}, Streams) -> - ct:pal("WT handle ~p~n", [Event]), + ?LOG("WT handle ~p~n", [Event]), {[{send, datagram, Data}], Streams}; webtransport_handle(Event = close_initiated, Streams) -> - ct:pal("WT handle ~p~n", [Event]), + ?LOG("WT handle ~p~n", [Event]), {[{send, datagram, <<"TEST:close_initiated">>}], Streams}; webtransport_handle(Event, Streams) -> - ct:pal("WT handle ignore ~p~n", [Event]), + ?LOG("WT handle ignore ~p~n", [Event]), {[], Streams}. webtransport_info({try_again, Event}, Streams) -> - ct:pal("try_again ~p", [Event]), + ?LOG("WT try_again ~p", [Event]), webtransport_handle(Event, Streams). terminate(Reason, Req, State=#{event_pid := EventPid}) -> - ct:pal("terminate ~p ~p ~p", [Reason, Req, State]), + ?LOG("WT terminate ~p ~p ~p", [Reason, Req, State]), EventPid ! {'$wt_echo_h', terminate, Reason, Req, State}, ok; terminate(Reason, Req, State) -> - ct:pal("terminate ~p ~p ~p", [Reason, Req, State]), + ?LOG("WT terminate ~p ~p ~p", [Reason, Req, State]), ok.