mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 04:10:24 +00:00
WIP client close WT session
This commit is contained in:
parent
b63e18d2cc
commit
cd327437ba
5 changed files with 94 additions and 15 deletions
|
@ -258,6 +258,19 @@ parse1(State, Stream=#stream{id=SessionID, status=webtransport_session}, Data, I
|
||||||
{ok, drain_wt_session, Rest} ->
|
{ok, drain_wt_session, Rest} ->
|
||||||
webtransport_event(State, SessionID, close_initiated),
|
webtransport_event(State, SessionID, close_initiated),
|
||||||
parse1(State, Stream, Rest, IsFin);
|
parse1(State, Stream, Rest, IsFin);
|
||||||
|
{ok, {close_wt_session, AppCode, AppMsg}, Rest} ->
|
||||||
|
%% This event will be handled specially and lead
|
||||||
|
%% to the termination of the session process.
|
||||||
|
webtransport_event(State, SessionID, {closed, AppCode, AppMsg}),
|
||||||
|
%% Shutdown the CONNECT stream immediately.
|
||||||
|
%% @todo Do this in webtransport_terminate_session?
|
||||||
|
ConnectStreamRef = get({quicer_stream, SessionID}),
|
||||||
|
_ = quicer:shutdown_stream(ConnectStreamRef),
|
||||||
|
%% @todo Will we receive a {stream_closed,...} after that?
|
||||||
|
%% If any data is received past that point this is an error.
|
||||||
|
%% @todo Don't crash, error out properly.
|
||||||
|
<<>> = Rest,
|
||||||
|
loop(webtransport_terminate_session(State, Stream));
|
||||||
%% Ignore unknown/unhandled capsules.
|
%% Ignore unknown/unhandled capsules.
|
||||||
{ok, _, Rest} ->
|
{ok, _, Rest} ->
|
||||||
parse1(State, Stream, Rest, IsFin);
|
parse1(State, Stream, Rest, IsFin);
|
||||||
|
@ -896,8 +909,13 @@ webtransport_event(State, SessionID, Event) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
webtransport_commands(State, SessionID, Commands) ->
|
webtransport_commands(State, SessionID, Commands) ->
|
||||||
Session = #stream{status=webtransport_session} = stream_get(State, SessionID),
|
case stream_get(State, SessionID) of
|
||||||
wt_commands(State, Session, Commands).
|
Session = #stream{status=webtransport_session} ->
|
||||||
|
wt_commands(State, Session, Commands);
|
||||||
|
%% The stream has been terminated, ignore pending commands.
|
||||||
|
error ->
|
||||||
|
State
|
||||||
|
end.
|
||||||
|
|
||||||
wt_commands(State, _, []) ->
|
wt_commands(State, _, []) ->
|
||||||
State;
|
State;
|
||||||
|
@ -979,6 +997,30 @@ wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail])
|
||||||
%% @todo Handle errors.
|
%% @todo Handle errors.
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machine0,
|
||||||
|
streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) ->
|
||||||
|
%% Reset/abort the WT streams.
|
||||||
|
Streams = maps:filtermap(fun
|
||||||
|
(StreamID, #stream{status={webtransport_stream, StreamSessionID, _}})
|
||||||
|
when SessionID =:= StreamSessionID ->
|
||||||
|
cowboy_quicer:shutdown_stream(Conn, StreamID,
|
||||||
|
both, cow_http3:error_to_code(webtransport_session_gone)),
|
||||||
|
false;
|
||||||
|
(_, _) ->
|
||||||
|
true
|
||||||
|
end, Streams0),
|
||||||
|
%% Keep the streams in lingering state.
|
||||||
|
%% We only keep up to 100 streams in this state. @todo Make it configurable?
|
||||||
|
Terminated = maps:keys(Streams0) -- maps:keys(Streams),
|
||||||
|
Lingering = lists:sublist(Terminated ++ Lingering0, 100),
|
||||||
|
%% Update the HTTP3 state machine.
|
||||||
|
HTTP3Machine = cow_http3_machine:close_webtransport_session(SessionID, HTTP3Machine0),
|
||||||
|
State#state{
|
||||||
|
http3_machine=HTTP3Machine,
|
||||||
|
streams=Streams,
|
||||||
|
lingering_streams=Lingering
|
||||||
|
}.
|
||||||
|
|
||||||
reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
|
reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
|
||||||
Stream=#stream{id=StreamID}, Error) ->
|
Stream=#stream{id=StreamID}, Error) ->
|
||||||
Reason = case Error of
|
Reason = case Error of
|
||||||
|
@ -1048,6 +1090,7 @@ terminate_stream(State=#state{streams=Streams0, children=Children0},
|
||||||
stream_linger(State#state{streams=Streams, children=Children}, StreamID).
|
stream_linger(State#state{streams=Streams, children=Children}, StreamID).
|
||||||
|
|
||||||
%% We must dereference the stream state when WebTransport is in use.
|
%% We must dereference the stream state when WebTransport is in use.
|
||||||
|
%% @todo Do this here or in cowboy_webtransport?
|
||||||
terminate_stream_handler(State, StreamID, Reason,
|
terminate_stream_handler(State, StreamID, Reason,
|
||||||
{cowboy_webtransport, #{stream_state := StreamState}}) ->
|
{cowboy_webtransport, #{stream_state := StreamState}}) ->
|
||||||
terminate_stream_handler(State, StreamID, Reason, StreamState);
|
terminate_stream_handler(State, StreamID, Reason, StreamState);
|
||||||
|
@ -1151,6 +1194,18 @@ stream_new(State=#state{http3_machine=HTTP3Machine0, streams=Streams},
|
||||||
Stream = #stream{id=StreamID, status=Status},
|
Stream = #stream{id=StreamID, status=Status},
|
||||||
State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}.
|
State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}.
|
||||||
|
|
||||||
|
%% @todo Special handling WT session?
|
||||||
|
%%
|
||||||
|
%% cowboy_stream:init as normal
|
||||||
|
%% upgrade/switch_protocol to cowboy_webtransport
|
||||||
|
%% - WT handler can stop via command
|
||||||
|
%% - WT process can crash/exit
|
||||||
|
%% - Client can send capsule on CONNECT stream to close
|
||||||
|
%% - Client can terminate CONNECT stream (gracefully/abruptly)
|
||||||
|
%% - Server can close the connection (gracefully/abruptly)
|
||||||
|
%% - Client can close the connection (gracefully/abruptly)
|
||||||
|
%% Note that GOAWAY must not stop the WT session
|
||||||
|
|
||||||
%% Stream closed message for a local (write-only) unidi stream.
|
%% Stream closed message for a local (write-only) unidi stream.
|
||||||
stream_closed(State=#state{local_control_id=StreamID}, StreamID, _) ->
|
stream_closed(State=#state{local_control_id=StreamID}, StreamID, _) ->
|
||||||
stream_closed1(State, StreamID);
|
stream_closed1(State, StreamID);
|
||||||
|
|
|
@ -179,6 +179,7 @@ shutdown_stream(_Conn, StreamID, Dir, ErrorCode) ->
|
||||||
_ = quicer:shutdown_stream(StreamRef, shutdown_flag(Dir), ErrorCode, infinity),
|
_ = quicer:shutdown_stream(StreamRef, shutdown_flag(Dir), ErrorCode, infinity),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% @todo Are these flags correct for what we want?
|
||||||
shutdown_flag(both) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT;
|
shutdown_flag(both) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT;
|
||||||
shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE.
|
shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE.
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,6 @@
|
||||||
|
|
||||||
-export([upgrade/4]).
|
-export([upgrade/4]).
|
||||||
-export([upgrade/5]).
|
-export([upgrade/5]).
|
||||||
-export([terminate/3]).
|
|
||||||
|
|
||||||
-type opts() :: #{
|
-type opts() :: #{
|
||||||
%% @todo
|
%% @todo
|
||||||
|
@ -127,6 +126,8 @@ loop(State=#state{parent=Parent%, timeout_ref=TRef
|
||||||
receive
|
receive
|
||||||
%% @todo Parent to filter messages? Nothing?
|
%% @todo Parent to filter messages? Nothing?
|
||||||
%% @todo Can there be groups of events?
|
%% @todo Can there be groups of events?
|
||||||
|
{'$webtransport_event', Event={closed, _, _}} ->
|
||||||
|
terminate(State, HandlerState, Event);
|
||||||
{'$webtransport_event', Event} ->
|
{'$webtransport_event', Event} ->
|
||||||
handler_call(State, HandlerState, webtransport_handle, Event);
|
handler_call(State, HandlerState, webtransport_handle, Event);
|
||||||
%% Timeouts.
|
%% Timeouts.
|
||||||
|
@ -213,22 +214,16 @@ commands([Command={close, _, _}|Tail], State, _, Acc) ->
|
||||||
%% @todo set_options (to increase number of streams? data amounts? or a flow command?)
|
%% @todo set_options (to increase number of streams? data amounts? or a flow command?)
|
||||||
%% @todo shutdown_reason if useful.
|
%% @todo shutdown_reason if useful.
|
||||||
|
|
||||||
terminate(State=#state{req=Req}, HandlerState, Reason) ->
|
terminate(State, HandlerState, Reason) ->
|
||||||
%cowboy_stream:terminate(StreamID, Reason, StreamState)
|
handler_terminate(State, HandlerState, Reason),
|
||||||
%% @todo This terminate is at the connection level.
|
|
||||||
% handler_terminate(State, HandlerState, Reason),
|
|
||||||
% case Shutdown of
|
% case Shutdown of
|
||||||
% normal -> exit(normal);
|
% normal -> exit(normal);
|
||||||
% _ -> exit({shutdown, Shutdown})
|
% _ -> exit({shutdown, Shutdown})
|
||||||
% end.
|
% end.
|
||||||
% exit(normal).
|
exit(normal).
|
||||||
%handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) ->
|
|
||||||
% cowboy_handler:terminate(Reason, Req, HandlerState, Handler).
|
|
||||||
%% @todo I think we must call terminate ourselves.
|
|
||||||
{ok, Req, Reason}.
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) ->
|
||||||
|
cowboy_handler:terminate(Reason, Req, HandlerState, Handler).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -431,7 +431,30 @@ drain_wt_session_continue_server(Config) ->
|
||||||
|
|
||||||
%% @todo connect_stream_closed_cleanly_client
|
%% @todo connect_stream_closed_cleanly_client
|
||||||
%% @todo connect_stream_closed_abruptly_client
|
%% @todo connect_stream_closed_abruptly_client
|
||||||
%% @todo close_wt_session_client
|
|
||||||
|
close_wt_session_client(Config) ->
|
||||||
|
doc("The WT client can close a single session. (draft_webtrans_http3 4.6)"),
|
||||||
|
%% Connect to the WebTransport server.
|
||||||
|
#{
|
||||||
|
conn := Conn,
|
||||||
|
connect_stream_ref := ConnectStreamRef,
|
||||||
|
session_id := SessionID
|
||||||
|
} = do_webtransport_connect(Config),
|
||||||
|
%% Send the CLOSE_WEBTRANSPORT_SESSION capsule on the CONNECT stream.
|
||||||
|
{ok, _} = quicer:send(ConnectStreamRef,
|
||||||
|
cow_capsule:close_wt_session(0, <<>>),
|
||||||
|
?QUIC_SEND_FLAG_FIN),
|
||||||
|
%% Normally we should also stop reading but in order to detect
|
||||||
|
%% that the server stops the stream we must not otherwise the
|
||||||
|
%% stream will be de facto closed on our end.
|
||||||
|
%%
|
||||||
|
%% The recipient must close or reset the stream in response.
|
||||||
|
receive
|
||||||
|
{quic, stream_closed, ConnectStreamRef, _} ->
|
||||||
|
ok
|
||||||
|
after 1000 ->
|
||||||
|
error({timeout, waiting_for_stream_closed})
|
||||||
|
end.
|
||||||
|
|
||||||
close_wt_session_server(Config) ->
|
close_wt_session_server(Config) ->
|
||||||
doc("The WT server can close a single session. (draft_webtrans_http3 4.6)"),
|
doc("The WT server can close a single session. (draft_webtrans_http3 4.6)"),
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
-export([init/2]).
|
-export([init/2]).
|
||||||
-export([webtransport_handle/2]).
|
-export([webtransport_handle/2]).
|
||||||
-export([webtransport_info/2]).
|
-export([webtransport_info/2]).
|
||||||
|
-export([terminate/3]).
|
||||||
|
|
||||||
init(Req0, _) ->
|
init(Req0, _) ->
|
||||||
Req = case cowboy_req:parse_header(<<"wt-available-protocols">>, Req0) of
|
Req = case cowboy_req:parse_header(<<"wt-available-protocols">>, Req0) of
|
||||||
|
@ -83,3 +84,7 @@ webtransport_handle(Event, Streams) ->
|
||||||
webtransport_info({try_again, Event}, Streams) ->
|
webtransport_info({try_again, Event}, Streams) ->
|
||||||
ct:pal("try_again ~p", [Event]),
|
ct:pal("try_again ~p", [Event]),
|
||||||
webtransport_handle(Event, Streams).
|
webtransport_handle(Event, Streams).
|
||||||
|
|
||||||
|
terminate(Reason, Req, State) ->
|
||||||
|
ct:pal("terminate ~p ~p ~p", [Reason, Req, State]),
|
||||||
|
ok.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue