0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 12:20:24 +00:00

WIP close

This commit is contained in:
Loïc Hoguin 2025-06-10 13:31:51 +02:00
parent a8cc1d8144
commit 804dbb77a6
No known key found for this signature in database
GPG key ID: 8A9DF795F6FED764
2 changed files with 40 additions and 14 deletions

View file

@ -1003,8 +1003,12 @@ webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machin
streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) -> streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) ->
%% Reset/abort the WT streams. %% Reset/abort the WT streams.
Streams = maps:filtermap(fun Streams = maps:filtermap(fun
(_, #stream{id=StreamID, status=webtransport_session})
when StreamID =:= SessionID ->
%% We remove the session stream but do the shutdown outside this function.
false;
(StreamID, #stream{status={webtransport_stream, StreamSessionID, _}}) (StreamID, #stream{status={webtransport_stream, StreamSessionID, _}})
when SessionID =:= StreamSessionID -> when StreamSessionID =:= SessionID ->
cowboy_quicer:shutdown_stream(Conn, StreamID, cowboy_quicer:shutdown_stream(Conn, StreamID,
both, cow_http3:error_to_code(webtransport_session_gone)), both, cow_http3:error_to_code(webtransport_session_gone)),
false; false;
@ -1029,6 +1033,11 @@ stream_peer_send_shutdown(State, StreamID) ->
%% to an application error code of 0 and empty message. %% to an application error code of 0 and empty message.
Stream = #stream{status=webtransport_session} -> Stream = #stream{status=webtransport_session} ->
webtransport_event(State, StreamID, {closed, 0, <<>>}), webtransport_event(State, StreamID, {closed, 0, <<>>}),
%% Shutdown the CONNECT stream fully.
%% @todo This needs a cowboy_quicer function.
%% @todo What error code should be used here? If any.
ConnectStreamRef = get({quicer_stream, StreamID}),
_ = quicer:shutdown_stream(ConnectStreamRef),
webtransport_terminate_session(State, Stream); webtransport_terminate_session(State, Stream);
_ -> _ ->
State State
@ -1102,11 +1111,6 @@ terminate_stream(State=#state{streams=Streams0, children=Children0},
Children = cowboy_children:shutdown(Children0, StreamID), Children = cowboy_children:shutdown(Children0, StreamID),
stream_linger(State#state{streams=Streams, children=Children}, StreamID). stream_linger(State#state{streams=Streams, children=Children}, StreamID).
%% We must dereference the stream state when WebTransport is in use.
%% @todo Do this here or in cowboy_webtransport?
terminate_stream_handler(State, StreamID, Reason,
{cowboy_webtransport, #{stream_state := StreamState}}) ->
terminate_stream_handler(State, StreamID, Reason, StreamState);
terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) -> terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) ->
try try
cowboy_stream:terminate(StreamID, Reason, StreamState) cowboy_stream:terminate(StreamID, Reason, StreamState)
@ -1229,10 +1233,11 @@ stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) ->
stream_closed(State=#state{opts=Opts, stream_closed(State=#state{opts=Opts,
streams=Streams0, children=Children0}, StreamID, ErrorCode) -> streams=Streams0, children=Children0}, StreamID, ErrorCode) ->
case maps:take(StreamID, Streams0) of case maps:take(StreamID, Streams0) of
{Stream=#stream{status=webtransport_session}, Streams} -> %% In the WT session's case, streams will be
%% @todo Ensure that we don't double call in some cases. %% removed in webtransport_terminate_session.
{Stream=#stream{status=webtransport_session}, _} ->
webtransport_event(State, StreamID, closed_abruptly), webtransport_event(State, StreamID, closed_abruptly),
webtransport_terminate_session(State#state{streams=Streams}, Stream); webtransport_terminate_session(State, Stream);
{#stream{state=undefined}, Streams} -> {#stream{state=undefined}, Streams} ->
%% Unidi stream has no handler/children. %% Unidi stream has no handler/children.
stream_closed1(State#state{streams=Streams}, StreamID); stream_closed1(State#state{streams=Streams}, StreamID);

View file

@ -28,6 +28,10 @@
-export([upgrade/4]). -export([upgrade/4]).
-export([upgrade/5]). -export([upgrade/5]).
%% cowboy_stream.
-export([info/3]).
-export([terminate/3]).
-type opts() :: #{ -type opts() :: #{
%% @todo %% @todo
}. }.
@ -127,9 +131,9 @@ loop(State=#state{parent=Parent%, timeout_ref=TRef
%% @todo Parent to filter messages? Nothing? %% @todo Parent to filter messages? Nothing?
%% @todo Can there be groups of events? %% @todo Can there be groups of events?
{'$webtransport_event', Event={closed, _, _}} -> {'$webtransport_event', Event={closed, _, _}} ->
terminate(State, HandlerState, Event); terminate_proc(State, HandlerState, Event);
{'$webtransport_event', Event=closed_abruptly} -> {'$webtransport_event', Event=closed_abruptly} ->
terminate(State, HandlerState, Event); terminate_proc(State, HandlerState, Event);
{'$webtransport_event', Event} -> {'$webtransport_event', Event} ->
handler_call(State, HandlerState, webtransport_handle, Event); handler_call(State, HandlerState, webtransport_handle, Event);
%% Timeouts. %% Timeouts.
@ -174,10 +178,10 @@ handler_call_result(State0, HandlerState, Commands) ->
{ok, State} -> {ok, State} ->
before_loop(State, HandlerState); before_loop(State, HandlerState);
{stop, State} -> {stop, State} ->
terminate(State, HandlerState, stop); terminate_proc(State, HandlerState, stop);
%% @todo Do we need this here? %% @todo Do we need this here?
{Error = {error, _}, State} -> {Error = {error, _}, State} ->
terminate(State, HandlerState, Error) terminate_proc(State, HandlerState, Error)
end. end.
%% We accumulate the commands that must be sent to the connection process %% We accumulate the commands that must be sent to the connection process
@ -216,7 +220,7 @@ commands([Command={close, _, _}|Tail], State, _, Acc) ->
%% @todo set_options (to increase number of streams? data amounts? or a flow command?) %% @todo set_options (to increase number of streams? data amounts? or a flow command?)
%% @todo shutdown_reason if useful. %% @todo shutdown_reason if useful.
terminate(State, HandlerState, Reason) -> terminate_proc(State, HandlerState, Reason) ->
handler_terminate(State, HandlerState, Reason), handler_terminate(State, HandlerState, Reason),
% case Shutdown of % case Shutdown of
% normal -> exit(normal); % normal -> exit(normal);
@ -243,3 +247,20 @@ handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) ->
%% webtransport_handle({datagram, Data}, HandlerState) %% webtransport_handle({datagram, Data}, HandlerState)
%% webtransport_handle(goaway, HandlerState) %% webtransport_handle(goaway, HandlerState)
%% webtransport_info(Message, HandlerState) %% webtransport_info(Message, HandlerState)
%% cowboy_stream callbacks.
%%
%% We shortcut stream handlers but still need to process some events
%% such as process exiting or termination. We implement the relevant
%% callbacks here. Note that as far as WebTransport is concerned,
%% receiving stream data here would be an error therefore the data
%% callback is not implemented.
info(StreamID, Msg, WTState=#{stream_state := StreamState0}) ->
{Commands, StreamState} = cowboy_stream:info(StreamID, Msg, StreamState0),
{Commands, WTState#{stream_state => StreamState}}.
terminate(StreamID, Reason, #{stream_state := StreamState}) ->
cowboy_stream:terminate(StreamID, Reason, StreamState).