0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 12:20:24 +00:00

Compare commits

...

2 commits

Author SHA1 Message Date
Loïc Hoguin
a8cc1d8144
Cleanup logs 2025-06-06 14:27:58 +02:00
Loïc Hoguin
ee6ef37643
More progress on close
Need to fix the crashes and we should be in a pretty good state
for PoC and whatnot.
2025-06-06 13:43:45 +02:00
5 changed files with 171 additions and 30 deletions

View file

@ -206,6 +206,9 @@ handle_quic_msg(State0=#state{opts=Opts}, Msg) ->
{stream_closed, StreamID, ErrorCode} -> {stream_closed, StreamID, ErrorCode} ->
State = stream_closed(State0, StreamID, ErrorCode), State = stream_closed(State0, StreamID, ErrorCode),
loop(State); loop(State);
{peer_send_shutdown, StreamID} ->
State = stream_peer_send_shutdown(State0, StreamID),
loop(State);
closed -> closed ->
%% @todo Different error reason if graceful? %% @todo Different error reason if graceful?
Reason = {socket_error, closed, 'The socket has been closed.'}, Reason = {socket_error, closed, 'The socket has been closed.'},
@ -974,7 +977,7 @@ wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [initiate_cl
wt_commands(State, Session, Tail) wt_commands(State, Session, Tail)
%% @todo Handle errors. %% @todo Handle errors.
end; end;
wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail]) wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail])
when Cmd =:= close; element(1, Cmd) =:= close -> when Cmd =:= close; element(1, Cmd) =:= close ->
%% We must send a CLOSE_WEBTRANSPORT_SESSION capsule on the CONNECT stream. %% We must send a CLOSE_WEBTRANSPORT_SESSION capsule on the CONNECT stream.
{AppCode, AppMsg} = case Cmd of {AppCode, AppMsg} = case Cmd of
@ -986,13 +989,12 @@ wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail])
case cowboy_quicer:send(Conn, SessionID, Capsule, fin) of case cowboy_quicer:send(Conn, SessionID, Capsule, fin) of
ok -> ok ->
%% @todo The endpoint MAY send a STOP_SENDING to indicate it is no longer reading from the CONNECT stream. %% @todo The endpoint MAY send a STOP_SENDING to indicate it is no longer reading from the CONNECT stream.
%% @todo We must then terminate the WT session. State = webtransport_terminate_session(State0, Session),
%% @todo Because the handler is in a separate process %% @todo Because the handler is in a separate process
%% we must wait for it to stop and eventually %% we must wait for it to stop and eventually
%% kill the process if it takes too long. %% kill the process if it takes too long.
%% @todo This is because we initiated, not the remote endpoint. %% @todo This is because we initiated, not the remote endpoint.
%% @todo We must also close the streams that are still open, %% @todo We may need to fully close the CONNECT stream (if remote doesn't reset it).
%% and finally close the CONNECT stream (cleanly).
wt_commands(State, Session, Tail) wt_commands(State, Session, Tail)
%% @todo Handle errors. %% @todo Handle errors.
end. end.
@ -1021,6 +1023,17 @@ webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machin
lingering_streams=Lingering lingering_streams=Lingering
}. }.
stream_peer_send_shutdown(State, StreamID) ->
case stream_get(State, StreamID) of
%% Cleanly terminating the CONNECT stream is equivalent
%% to an application error code of 0 and empty message.
Stream = #stream{status=webtransport_session} ->
webtransport_event(State, StreamID, {closed, 0, <<>>}),
webtransport_terminate_session(State, Stream);
_ ->
State
end.
reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0}, reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
Stream=#stream{id=StreamID}, Error) -> Stream=#stream{id=StreamID}, Error) ->
Reason = case Error of Reason = case Error of
@ -1216,6 +1229,10 @@ stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) ->
stream_closed(State=#state{opts=Opts, stream_closed(State=#state{opts=Opts,
streams=Streams0, children=Children0}, StreamID, ErrorCode) -> streams=Streams0, children=Children0}, StreamID, ErrorCode) ->
case maps:take(StreamID, Streams0) of case maps:take(StreamID, Streams0) of
{Stream=#stream{status=webtransport_session}, Streams} ->
%% @todo Ensure that we don't double call in some cases.
webtransport_event(State, StreamID, closed_abruptly),
webtransport_terminate_session(State#state{streams=Streams}, Stream);
{#stream{state=undefined}, Streams} -> {#stream{state=undefined}, Streams} ->
%% Unidi stream has no handler/children. %% Unidi stream has no handler/children.
stream_closed1(State#state{streams=Streams}, StreamID); stream_closed1(State#state{streams=Streams}, StreamID);

View file

@ -237,8 +237,9 @@ handle({quic, dgram_state_changed, _Conn, _Props}) ->
%% QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT %% QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT
handle({quic, transport_shutdown, _Conn, _Flags}) -> handle({quic, transport_shutdown, _Conn, _Flags}) ->
ok; ok;
handle({quic, peer_send_shutdown, _StreamRef, undefined}) -> handle({quic, peer_send_shutdown, StreamRef, undefined}) ->
ok; {ok, StreamID} = quicer:get_stream_id(StreamRef),
{peer_send_shutdown, StreamID};
handle({quic, send_shutdown_complete, _StreamRef, _IsGraceful}) -> handle({quic, send_shutdown_complete, _StreamRef, _IsGraceful}) ->
ok; ok;
handle({quic, shutdown, _Conn, success}) -> handle({quic, shutdown, _Conn, success}) ->

View file

@ -128,6 +128,8 @@ loop(State=#state{parent=Parent%, timeout_ref=TRef
%% @todo Can there be groups of events? %% @todo Can there be groups of events?
{'$webtransport_event', Event={closed, _, _}} -> {'$webtransport_event', Event={closed, _, _}} ->
terminate(State, HandlerState, Event); terminate(State, HandlerState, Event);
{'$webtransport_event', Event=closed_abruptly} ->
terminate(State, HandlerState, Event);
{'$webtransport_event', Event} -> {'$webtransport_event', Event} ->
handler_call(State, HandlerState, webtransport_handle, Event); handler_call(State, HandlerState, webtransport_handle, Event);
%% Timeouts. %% Timeouts.

View file

@ -430,9 +430,6 @@ drain_wt_session_continue_server(Config) ->
%% * a CLOSE_WEBTRANSPORT_SESSION capsule is either sent or received. %% * a CLOSE_WEBTRANSPORT_SESSION capsule is either sent or received.
%% (6) %% (6)
%% @todo connect_stream_closed_cleanly_client
%% @todo connect_stream_closed_abruptly_client
close_wt_session_client(Config) -> close_wt_session_client(Config) ->
doc("The WT client can close a single session. (draft_webtrans_http3 4.6)"), doc("The WT client can close a single session. (draft_webtrans_http3 4.6)"),
%% Connect to the WebTransport server. %% Connect to the WebTransport server.
@ -510,8 +507,46 @@ wt_session_gone_client(Config) ->
#{reason := webtransport_session_gone} = do_wait_stream_aborted(RemoteBidiStreamRef), #{reason := webtransport_session_gone} = do_wait_stream_aborted(RemoteBidiStreamRef),
ok. ok.
%% @todo wt_session_gone_server wt_session_gone_server(Config) ->
%% Upon learning that the session has been terminated, the endpoint MUST reset the send side and abort reading on the receive side of all of the streams associated with the session (see Section 2.4 of [RFC9000]) using the WEBTRANSPORT_SESSION_GONE error code; it MUST NOT send any new datagrams or open any new streams. (6) doc("After the session has been terminated by the WT server, "
"the WT server must reset associated streams with the "
"WEBTRANSPORT_SESSION_GONE error code. (draft_webtrans_http3 4.6)"),
%% Connect to the WebTransport server.
#{
conn := Conn,
connect_stream_ref := ConnectStreamRef,
session_id := SessionID
} = do_webtransport_connect(Config),
%% Create a unidi stream.
{ok, LocalUnidiStreamRef} = quicer:start_stream(Conn,
#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}),
{ok, _} = quicer:send(LocalUnidiStreamRef,
<<1:2, 16#54:14, 0:2, SessionID:6, "Hello">>),
%% Accept an identical unidi stream.
{unidi, RemoteUnidiStreamRef} = do_receive_new_stream(),
{nofin, <<1:2, 16#54:14, 0:2, SessionID:6>>} = do_receive_data(RemoteUnidiStreamRef),
{nofin, <<"Hello">>} = do_receive_data(RemoteUnidiStreamRef),
%% Create a bidi stream, send a special instruction
%% to make the server create another bidi stream.
{ok, LocalBidiStreamRef} = quicer:start_stream(Conn, #{}),
{ok, _} = quicer:send(LocalBidiStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "TEST:open_bidi">>),
%% Accept the bidi stream and receive the data.
{bidi, RemoteBidiStreamRef} = do_receive_new_stream(),
{nofin, <<1:2, 16#41:14, 0:2, SessionID:6>>} = do_receive_data(RemoteBidiStreamRef),
{ok, _} = quicer:send(RemoteBidiStreamRef, <<"Hello">>),
{nofin, <<"Hello">>} = do_receive_data(RemoteBidiStreamRef),
%% Send a special instruction to make the server initiate the close.
{ok, _} = quicer:send(LocalBidiStreamRef, <<"TEST:close">>),
%% Receive the CLOSE_WEBTRANSPORT_SESSION capsule on the CONNECT stream.
CloseWTSessionCapsule = cow_capsule:close_wt_session(0, <<>>),
{fin, CloseWTSessionCapsule} = do_receive_data(ConnectStreamRef),
%% All streams from that WT session have been aborted.
#{reason := webtransport_session_gone} = do_wait_stream_aborted(LocalUnidiStreamRef),
#{reason := webtransport_session_gone} = do_wait_stream_aborted(RemoteUnidiStreamRef),
#{reason := webtransport_session_gone} = do_wait_stream_aborted(LocalBidiStreamRef),
#{reason := webtransport_session_gone} = do_wait_stream_aborted(RemoteBidiStreamRef),
ok.
%% Application Error Message: A UTF-8 encoded error message string provided by the application closing the session. The message takes up the remainder of the capsule, and its length MUST NOT exceed 1024 bytes. (6) %% Application Error Message: A UTF-8 encoded error message string provided by the application closing the session. The message takes up the remainder of the capsule, and its length MUST NOT exceed 1024 bytes. (6)
%% @todo What if it's larger? %% @todo What if it's larger?
@ -587,8 +622,85 @@ close_wt_session_app_code_msg_server(Config) ->
%% If any additional stream data is received on the CONNECT stream after receiving a CLOSE_WEBTRANSPORT_SESSION capsule, the stream MUST be reset with code H3_MESSAGE_ERROR. (6) %% If any additional stream data is received on the CONNECT stream after receiving a CLOSE_WEBTRANSPORT_SESSION capsule, the stream MUST be reset with code H3_MESSAGE_ERROR. (6)
%% @todo close_wt_session_followed_by_data %% @todo close_wt_session_followed_by_data
%% Cleanly terminating a CONNECT stream without a CLOSE_WEBTRANSPORT_SESSION capsule SHALL be semantically equivalent to terminating it with a CLOSE_WEBTRANSPORT_SESSION capsule that has an error code of 0 and an empty error string. (6) connect_stream_closed_cleanly_fin(Config) ->
%% @todo connect_stream_closed_app_code_0_empty_msg doc("The WT client closing the CONNECT stream cleanly "
"is equivalent to a capsule with an application error code of 0 "
"and an empty error string. (draft_webtrans_http3 4.6)"),
%% Connect to the WebTransport server.
#{
conn := Conn,
connect_stream_ref := ConnectStreamRef,
session_id := SessionID
} = do_webtransport_connect(Config),
%% Create a bidi stream, send a special instruction to make it propagate events.
{ok, LocalStreamRef} = quicer:start_stream(Conn, #{}),
EventPidBin = term_to_binary(self()),
{ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6,
"TEST:event_pid:", EventPidBin/binary>>),
{nofin, <<"event_pid_received">>} = do_receive_data(LocalStreamRef),
%% Cleanly terminate the CONNECT stream.
{ok, _} = quicer:send(ConnectStreamRef, <<>>, ?QUIC_SEND_FLAG_FIN),
%% Receive the terminate event from the WT handler.
receive
{'$wt_echo_h', terminate, {closed, 0, <<>>}, _, _} ->
ok
after 1000 ->
error({timeout, waiting_for_terminate_event})
end.
connect_stream_closed_cleanly_shutdown(Config) ->
doc("The WT client closing the CONNECT stream cleanly "
"is equivalent to a capsule with an application error code of 0 "
"and an empty error string. (draft_webtrans_http3 4.6)"),
%% Connect to the WebTransport server.
#{
conn := Conn,
connect_stream_ref := ConnectStreamRef,
session_id := SessionID
} = do_webtransport_connect(Config),
%% Create a bidi stream, send a special instruction to make it propagate events.
{ok, LocalStreamRef} = quicer:start_stream(Conn, #{}),
EventPidBin = term_to_binary(self()),
{ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6,
"TEST:event_pid:", EventPidBin/binary>>),
{nofin, <<"event_pid_received">>} = do_receive_data(LocalStreamRef),
%% Cleanly terminate the CONNECT stream.
_ = quicer:shutdown_stream(ConnectStreamRef),
%% Receive the terminate event from the WT handler.
receive
{'$wt_echo_h', terminate, {closed, 0, <<>>}, _, _} ->
ok
after 1000 ->
error({timeout, waiting_for_terminate_event})
end.
connect_stream_closed_abruptly(Config) ->
doc("The WT client may close the CONNECT stream abruptly. "
"(draft_webtrans_http3 4.6)"),
%% Connect to the WebTransport server.
#{
conn := Conn,
connect_stream_ref := ConnectStreamRef,
session_id := SessionID
} = do_webtransport_connect(Config),
%% Create a bidi stream, send a special instruction to make it propagate events.
{ok, LocalStreamRef} = quicer:start_stream(Conn, #{}),
EventPidBin = term_to_binary(self()),
{ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6,
"TEST:event_pid:", EventPidBin/binary>>),
{nofin, <<"event_pid_received">>} = do_receive_data(LocalStreamRef),
%% Abruptly terminate the CONNECT stream.
_ = quicer:shutdown_stream(ConnectStreamRef, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT,
0, infinity),
%% Receive the terminate event from the WT handler.
receive
%% @todo It would be good to forward a stream error as well
%% so that a WT error can be sent, but I have been unsuccessful.
{'$wt_echo_h', terminate, closed_abruptly, _, _} ->
ok
after 1000 ->
error({timeout, waiting_for_terminate_event})
end.
%% @todo This one is about gracefully closing HTTP/3 connection with WT sessions. %% @todo This one is about gracefully closing HTTP/3 connection with WT sessions.
%% the endpoint SHOULD wait until all CONNECT streams have been closed by the peer before sending the CONNECTION_CLOSE (6) %% the endpoint SHOULD wait until all CONNECT streams have been closed by the peer before sending the CONNECTION_CLOSE (6)

View file

@ -9,7 +9,15 @@
-export([webtransport_info/2]). -export([webtransport_info/2]).
-export([terminate/3]). -export([terminate/3]).
%% -define(DEBUG, 1).
-ifdef(DEBUG).
-define(LOG(Fmt, Args), ct:pal(Fmt, Args)).
-else.
-define(LOG(Fmt, Args), _ = Fmt, _ = Args, ok).
-endif.
init(Req0, _) -> init(Req0, _) ->
?LOG("WT init ~p~n", [Req0]),
Req = case cowboy_req:parse_header(<<"wt-available-protocols">>, Req0) of Req = case cowboy_req:parse_header(<<"wt-available-protocols">>, Req0) of
undefined -> undefined ->
Req0; Req0;
@ -21,16 +29,16 @@ init(Req0, _) ->
%% @todo WT handle {stream_open,4,bidi} %% @todo WT handle {stream_open,4,bidi}
webtransport_handle(Event = {stream_open, StreamID, bidi}, Streams) -> webtransport_handle(Event = {stream_open, StreamID, bidi}, Streams) ->
ct:pal("WT handle ~p~n", [Event]), ?LOG("WT handle ~p~n", [Event]),
{[], Streams#{StreamID => bidi}}; {[], Streams#{StreamID => bidi}};
webtransport_handle(Event = {stream_open, StreamID, unidi}, Streams) -> webtransport_handle(Event = {stream_open, StreamID, unidi}, Streams) ->
ct:pal("WT handle ~p~n", [Event]), ?LOG("WT handle ~p~n", [Event]),
OpenStreamRef = make_ref(), OpenStreamRef = make_ref(),
{[{open_stream, OpenStreamRef, unidi, <<>>}], Streams#{ {[{open_stream, OpenStreamRef, unidi, <<>>}], Streams#{
StreamID => {unidi_remote, OpenStreamRef}, StreamID => {unidi_remote, OpenStreamRef},
OpenStreamRef => {unidi_local, StreamID}}}; OpenStreamRef => {unidi_local, StreamID}}};
webtransport_handle(Event = {opened_stream_id, OpenStreamRef, OpenStreamID}, Streams) -> webtransport_handle(Event = {opened_stream_id, OpenStreamRef, OpenStreamID}, Streams) ->
ct:pal("WT handle ~p~n", [Event]), ?LOG("WT handle ~p~n", [Event]),
case Streams of case Streams of
#{OpenStreamRef := bidi} -> #{OpenStreamRef := bidi} ->
{[], maps:remove(OpenStreamRef, Streams#{ {[], maps:remove(OpenStreamRef, Streams#{
@ -43,8 +51,8 @@ webtransport_handle(Event = {opened_stream_id, OpenStreamRef, OpenStreamID}, Str
OpenStreamID => {unidi_local, RemoteStreamID} OpenStreamID => {unidi_local, RemoteStreamID}
})} })}
end; end;
webtransport_handle(Event = {stream_data, _StreamID, _IsFin, <<"TEST:", Test/bits>>}, Streams) -> webtransport_handle(Event = {stream_data, StreamID, _IsFin, <<"TEST:", Test/bits>>}, Streams) ->
ct:pal("WT handle ~p~n", [Event]), ?LOG("WT handle ~p~n", [Event]),
case Test of case Test of
<<"open_bidi">> -> <<"open_bidi">> ->
OpenStreamRef = make_ref(), OpenStreamRef = make_ref(),
@ -59,10 +67,11 @@ webtransport_handle(Event = {stream_data, _StreamID, _IsFin, <<"TEST:", Test/bit
<<"close_app_code_msg">> -> <<"close_app_code_msg">> ->
{[{close, 1234567890, <<"onetwothreefourfivesixseveneightnineten">>}], Streams}; {[{close, 1234567890, <<"onetwothreefourfivesixseveneightnineten">>}], Streams};
<<"event_pid:", EventPidBin/bits>> -> <<"event_pid:", EventPidBin/bits>> ->
{[], Streams#{event_pid => binary_to_term(EventPidBin)}} {[{send, StreamID, nofin, <<"event_pid_received">>}],
Streams#{event_pid => binary_to_term(EventPidBin)}}
end; end;
webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, Streams) -> webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, Streams) ->
ct:pal("WT handle ~p~n", [Event]), ?LOG("WT handle ~p~n", [Event]),
case Streams of case Streams of
#{StreamID := bidi} -> #{StreamID := bidi} ->
{[{send, StreamID, IsFin, Data}], Streams}; {[{send, StreamID, IsFin, Data}], Streams};
@ -74,23 +83,23 @@ webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, Streams) ->
{[{send, LocalStreamID, IsFin, Data}], Streams} {[{send, LocalStreamID, IsFin, Data}], Streams}
end; end;
webtransport_handle(Event = {datagram, Data}, Streams) -> webtransport_handle(Event = {datagram, Data}, Streams) ->
ct:pal("WT handle ~p~n", [Event]), ?LOG("WT handle ~p~n", [Event]),
{[{send, datagram, Data}], Streams}; {[{send, datagram, Data}], Streams};
webtransport_handle(Event = close_initiated, Streams) -> webtransport_handle(Event = close_initiated, Streams) ->
ct:pal("WT handle ~p~n", [Event]), ?LOG("WT handle ~p~n", [Event]),
{[{send, datagram, <<"TEST:close_initiated">>}], Streams}; {[{send, datagram, <<"TEST:close_initiated">>}], Streams};
webtransport_handle(Event, Streams) -> webtransport_handle(Event, Streams) ->
ct:pal("WT handle ignore ~p~n", [Event]), ?LOG("WT handle ignore ~p~n", [Event]),
{[], Streams}. {[], Streams}.
webtransport_info({try_again, Event}, Streams) -> webtransport_info({try_again, Event}, Streams) ->
ct:pal("try_again ~p", [Event]), ?LOG("WT try_again ~p", [Event]),
webtransport_handle(Event, Streams). webtransport_handle(Event, Streams).
terminate(Reason, Req, State=#{event_pid := EventPid}) -> terminate(Reason, Req, State=#{event_pid := EventPid}) ->
ct:pal("terminate ~p ~p ~p", [Reason, Req, State]), ?LOG("WT terminate ~p ~p ~p", [Reason, Req, State]),
EventPid ! {'$wt_echo_h', terminate, Reason, Req, State}, EventPid ! {'$wt_echo_h', terminate, Reason, Req, State},
ok; ok;
terminate(Reason, Req, State) -> terminate(Reason, Req, State) ->
ct:pal("terminate ~p ~p ~p", [Reason, Req, State]), ?LOG("WT terminate ~p ~p ~p", [Reason, Req, State]),
ok. ok.