0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 12:20:24 +00:00

Compare commits

..

No commits in common. "839397545966a287592b1700aab379b46cf3f47a" and "b63e18d2cc775f1b030b212a4e918be13302e959" have entirely different histories.

5 changed files with 16 additions and 127 deletions

View file

@ -258,19 +258,6 @@ parse1(State, Stream=#stream{id=SessionID, status=webtransport_session}, Data, I
{ok, drain_wt_session, Rest} ->
webtransport_event(State, SessionID, close_initiated),
parse1(State, Stream, Rest, IsFin);
{ok, {close_wt_session, AppCode, AppMsg}, Rest} ->
%% This event will be handled specially and lead
%% to the termination of the session process.
webtransport_event(State, SessionID, {closed, AppCode, AppMsg}),
%% Shutdown the CONNECT stream immediately.
%% @todo Do this in webtransport_terminate_session?
ConnectStreamRef = get({quicer_stream, SessionID}),
_ = quicer:shutdown_stream(ConnectStreamRef),
%% @todo Will we receive a {stream_closed,...} after that?
%% If any data is received past that point this is an error.
%% @todo Don't crash, error out properly.
<<>> = Rest,
loop(webtransport_terminate_session(State, Stream));
%% Ignore unknown/unhandled capsules.
{ok, _, Rest} ->
parse1(State, Stream, Rest, IsFin);
@ -909,13 +896,8 @@ webtransport_event(State, SessionID, Event) ->
ok.
webtransport_commands(State, SessionID, Commands) ->
case stream_get(State, SessionID) of
Session = #stream{status=webtransport_session} ->
wt_commands(State, Session, Commands);
%% The stream has been terminated, ignore pending commands.
error ->
State
end.
Session = #stream{status=webtransport_session} = stream_get(State, SessionID),
wt_commands(State, Session, Commands).
wt_commands(State, _, []) ->
State;
@ -997,30 +979,6 @@ wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail])
%% @todo Handle errors.
end.
webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machine0,
streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) ->
%% Reset/abort the WT streams.
Streams = maps:filtermap(fun
(StreamID, #stream{status={webtransport_stream, StreamSessionID, _}})
when SessionID =:= StreamSessionID ->
cowboy_quicer:shutdown_stream(Conn, StreamID,
both, cow_http3:error_to_code(webtransport_session_gone)),
false;
(_, _) ->
true
end, Streams0),
%% Keep the streams in lingering state.
%% We only keep up to 100 streams in this state. @todo Make it configurable?
Terminated = maps:keys(Streams0) -- maps:keys(Streams),
Lingering = lists:sublist(Terminated ++ Lingering0, 100),
%% Update the HTTP3 state machine.
HTTP3Machine = cow_http3_machine:close_webtransport_session(SessionID, HTTP3Machine0),
State#state{
http3_machine=HTTP3Machine,
streams=Streams,
lingering_streams=Lingering
}.
reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
Stream=#stream{id=StreamID}, Error) ->
Reason = case Error of
@ -1090,7 +1048,6 @@ terminate_stream(State=#state{streams=Streams0, children=Children0},
stream_linger(State#state{streams=Streams, children=Children}, StreamID).
%% We must dereference the stream state when WebTransport is in use.
%% @todo Do this here or in cowboy_webtransport?
terminate_stream_handler(State, StreamID, Reason,
{cowboy_webtransport, #{stream_state := StreamState}}) ->
terminate_stream_handler(State, StreamID, Reason, StreamState);
@ -1194,18 +1151,6 @@ stream_new(State=#state{http3_machine=HTTP3Machine0, streams=Streams},
Stream = #stream{id=StreamID, status=Status},
State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}.
%% @todo Special handling WT session?
%%
%% cowboy_stream:init as normal
%% upgrade/switch_protocol to cowboy_webtransport
%% - WT handler can stop via command
%% - WT process can crash/exit
%% - Client can send capsule on CONNECT stream to close
%% - Client can terminate CONNECT stream (gracefully/abruptly)
%% - Server can close the connection (gracefully/abruptly)
%% - Client can close the connection (gracefully/abruptly)
%% Note that GOAWAY must not stop the WT session
%% Stream closed message for a local (write-only) unidi stream.
stream_closed(State=#state{local_control_id=StreamID}, StreamID, _) ->
stream_closed1(State, StreamID);

View file

@ -179,7 +179,6 @@ shutdown_stream(_Conn, StreamID, Dir, ErrorCode) ->
_ = quicer:shutdown_stream(StreamRef, shutdown_flag(Dir), ErrorCode, infinity),
ok.
%% @todo Are these flags correct for what we want?
shutdown_flag(both) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT;
shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE.

View file

@ -27,6 +27,7 @@
-export([upgrade/4]).
-export([upgrade/5]).
-export([terminate/3]).
-type opts() :: #{
%% @todo
@ -126,8 +127,6 @@ loop(State=#state{parent=Parent%, timeout_ref=TRef
receive
%% @todo Parent to filter messages? Nothing?
%% @todo Can there be groups of events?
{'$webtransport_event', Event={closed, _, _}} ->
terminate(State, HandlerState, Event);
{'$webtransport_event', Event} ->
handler_call(State, HandlerState, webtransport_handle, Event);
%% Timeouts.
@ -214,16 +213,22 @@ commands([Command={close, _, _}|Tail], State, _, Acc) ->
%% @todo set_options (to increase number of streams? data amounts? or a flow command?)
%% @todo shutdown_reason if useful.
terminate(State, HandlerState, Reason) ->
handler_terminate(State, HandlerState, Reason),
terminate(State=#state{req=Req}, HandlerState, Reason) ->
%cowboy_stream:terminate(StreamID, Reason, StreamState)
%% @todo This terminate is at the connection level.
% handler_terminate(State, HandlerState, Reason),
% case Shutdown of
% normal -> exit(normal);
% _ -> exit({shutdown, Shutdown})
% end.
exit(normal).
% exit(normal).
%handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) ->
% cowboy_handler:terminate(Reason, Req, HandlerState, Handler).
%% @todo I think we must call terminate ourselves.
{ok, Req, Reason}.
handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) ->
cowboy_handler:terminate(Reason, Req, HandlerState, Handler).

View file

@ -431,30 +431,7 @@ drain_wt_session_continue_server(Config) ->
%% @todo connect_stream_closed_cleanly_client
%% @todo connect_stream_closed_abruptly_client
close_wt_session_client(Config) ->
doc("The WT client can close a single session. (draft_webtrans_http3 4.6)"),
%% Connect to the WebTransport server.
#{
conn := Conn,
connect_stream_ref := ConnectStreamRef,
session_id := SessionID
} = do_webtransport_connect(Config),
%% Send the CLOSE_WEBTRANSPORT_SESSION capsule on the CONNECT stream.
{ok, _} = quicer:send(ConnectStreamRef,
cow_capsule:close_wt_session(0, <<>>),
?QUIC_SEND_FLAG_FIN),
%% Normally we should also stop reading but in order to detect
%% that the server stops the stream we must not otherwise the
%% stream will be de facto closed on our end.
%%
%% The recipient must close or reset the stream in response.
receive
{quic, stream_closed, ConnectStreamRef, _} ->
ok
after 1000 ->
error({timeout, waiting_for_stream_closed})
end.
%% @todo close_wt_session_client
close_wt_session_server(Config) ->
doc("The WT server can close a single session. (draft_webtrans_http3 4.6)"),
@ -482,32 +459,6 @@ close_wt_session_server(Config) ->
%% Application Error Message: A UTF-8 encoded error message string provided by the application closing the session. The message takes up the remainder of the capsule, and its length MUST NOT exceed 1024 bytes. (6)
%% @todo close_wt_session_app_code_msg_client
close_wt_session_app_code_msg_client(Config) ->
doc("The WT client can close a single session with an application error code "
"and an application error message. (draft_webtrans_http3 4.6)"),
%% Connect to the WebTransport server.
#{
conn := Conn,
connect_stream_ref := ConnectStreamRef,
session_id := SessionID
} = do_webtransport_connect(Config),
%% Create a bidi stream, send a special instruction to make it propagate events.
{ok, LocalStreamRef} = quicer:start_stream(Conn, #{}),
EventPidBin = term_to_binary(self()),
{ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6,
"TEST:event_pid:", EventPidBin/binary>>),
%% Send the CLOSE_WEBTRANSPORT_SESSION capsule on the CONNECT stream.
{ok, _} = quicer:send(ConnectStreamRef,
cow_capsule:close_wt_session(17, <<"seventeen">>),
?QUIC_SEND_FLAG_FIN),
%% @todo Stop reading from the CONNECt stream too. (STOP_SENDING)
%% Receive the terminate event from the WT handler.
receive
{'$wt_echo_h', terminate, {closed, 17, <<"seventeen">>}, _, _} ->
ok
after 1000 ->
error({timeout, waiting_for_terminate_event})
end.
close_wt_session_app_code_server(Config) ->
doc("The WT server can close a single session with an application error code. "

View file

@ -7,7 +7,6 @@
-export([init/2]).
-export([webtransport_handle/2]).
-export([webtransport_info/2]).
-export([terminate/3]).
init(Req0, _) ->
Req = case cowboy_req:parse_header(<<"wt-available-protocols">>, Req0) of
@ -57,9 +56,7 @@ webtransport_handle(Event = {stream_data, _StreamID, _IsFin, <<"TEST:", Test/bit
<<"close_app_code">> ->
{[{close, 1234567890}], Streams};
<<"close_app_code_msg">> ->
{[{close, 1234567890, <<"onetwothreefourfivesixseveneightnineten">>}], Streams};
<<"event_pid:", EventPidBin/bits>> ->
{[], Streams#{event_pid => binary_to_term(EventPidBin)}}
{[{close, 1234567890, <<"onetwothreefourfivesixseveneightnineten">>}], Streams}
end;
webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, Streams) ->
ct:pal("WT handle ~p~n", [Event]),
@ -86,11 +83,3 @@ webtransport_handle(Event, Streams) ->
webtransport_info({try_again, Event}, Streams) ->
ct:pal("try_again ~p", [Event]),
webtransport_handle(Event, Streams).
terminate(Reason, Req, State=#{event_pid := EventPid}) ->
ct:pal("terminate ~p ~p ~p", [Reason, Req, State]),
EventPid ! {'$wt_echo_h', terminate, Reason, Req, State},
ok;
terminate(Reason, Req, State) ->
ct:pal("terminate ~p ~p ~p", [Reason, Req, State]),
ok.