0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 20:30:23 +00:00

Ensure stream terminate is called when switching protocols

This commit is contained in:
Loïc Hoguin 2017-10-22 14:53:04 +01:00
parent debaecd49a
commit 4bebe39975
No known key found for this signature in database
GPG key ID: 71366FF21851DF03
5 changed files with 42 additions and 14 deletions

View file

@ -355,7 +355,7 @@ detected.
[source,erlang] [source,erlang]
---- ----
reason() :: normal reason() :: normal | switch_protocol
| {internal_error, timeout | {error | exit | throw, any()}, HumanReadable} | {internal_error, timeout | {error | exit | throw, any()}, HumanReadable}
| {socket_error, closed | atom(), HumanReadable} | {socket_error, closed | atom(), HumanReadable}
| {stream_error, Error, HumanReadable} | {stream_error, Error, HumanReadable}

View file

@ -878,22 +878,17 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
%% @todo This should be the last stream running otherwise we need to wait before switching. %% @todo This should be the last stream running otherwise we need to wait before switching.
%% @todo If there's streams opened after this one, fail instead of 101. %% @todo If there's streams opened after this one, fail instead of 101.
State = cancel_timeout(State0), State = cancel_timeout(State0),
%% @todo When we actually do the upgrade, we only have the one stream left, plus %% Send a 101 response, then terminate the stream.
%% possibly some processes terminating. We need a smart strategy for handling the State = #state{streams=Streams} = commands(State, StreamID, [{inform, 101, Headers}]),
%% children shutdown. We can start with brutal_kill and discarding the EXIT messages #stream{state=StreamState} = lists:keyfind(StreamID, #stream.id, Streams),
%% received before switching to Websocket. Something better would be to let the %% @todo We need to shutdown processes here first.
%% stream processes finish but that implies the Websocket module to know about stream_call_terminate(StreamID, switch_protocol, StreamState),
%% them and filter the messages. For now, kill them all and discard all messages %% Terminate children processes and flush any remaining messages from the mailbox.
%% in the mailbox.
cowboy_children:terminate(Children), cowboy_children:terminate(Children),
flush(), flush(),
%% Everything good, upgrade!
_ = commands(State, StreamID, [{inform, 101, Headers}]),
%% @todo This is no good because commands return a state normally and here it doesn't %% @todo This is no good because commands return a state normally and here it doesn't
%% we need to let this module go entirely. Perhaps it should be handled directly in %% we need to let this module go entirely. Perhaps it should be handled directly in
%% cowboy_clear/cowboy_tls? Perhaps not. We do want that Buffer. %% cowboy_clear/cowboy_tls?
Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState); Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState);
%% Stream shutdown. %% Stream shutdown.
commands(State, StreamID, [stop|Tail]) -> commands(State, StreamID, [stop|Tail]) ->

View file

@ -42,7 +42,7 @@
| stop]. | stop].
-export_type([commands/0]). -export_type([commands/0]).
-type reason() :: normal -type reason() :: normal | switch_protocol
| {internal_error, timeout | {error | exit | throw, any()}, human_reason()} | {internal_error, timeout | {error | exit | throw, any()}, human_reason()}
| {socket_error, closed | atom(), human_reason()} | {socket_error, closed | atom(), human_reason()}
| {stream_error, cow_http2:error(), human_reason()} | {stream_error, cow_http2:error(), human_reason()}

View file

@ -9,6 +9,9 @@
-export([terminate/3]). -export([terminate/3]).
-export([early_error/5]). -export([early_error/5]).
%% For switch_protocol.
-export([takeover/7]).
-record(state, { -record(state, {
pid, pid,
test test
@ -43,6 +46,8 @@ init_commands(_, _, State=#state{test=shutdown_timeout_on_stream_stop}) ->
init_commands(_, _, State=#state{test=shutdown_timeout_on_socket_close}) -> init_commands(_, _, State=#state{test=shutdown_timeout_on_socket_close}) ->
Spawn = init_process(true, State), Spawn = init_process(true, State),
[{headers, 200, #{}}, {spawn, Spawn, 2000}]; [{headers, 200, #{}}, {spawn, Spawn, 2000}];
init_commands(_, _, State=#state{test=terminate_on_switch_protocol}) ->
[{switch_protocol, #{}, ?MODULE, State}];
init_commands(_, _, State=#state{test=terminate_on_stop}) -> init_commands(_, _, State=#state{test=terminate_on_stop}) ->
[{response, 204, #{}, <<>>}]; [{response, 204, #{}, <<>>}];
init_commands(_, _, _) -> init_commands(_, _, _) ->
@ -94,3 +99,8 @@ early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
<<"crash_in_early_error",_/bits>> -> error(crash); <<"crash_in_early_error",_/bits>> -> error(crash);
_ -> Resp _ -> Resp
end. end.
%% @todo It would be good if we could allow this function to return normally.
takeover(Parent, Ref, Socket, Transport, Opts, Buffer, State=#state{pid=Pid}) ->
Pid ! {Pid, self(), takeover, Parent, Ref, Socket, Transport, Opts, Buffer, State},
exit(normal).

View file

@ -361,3 +361,26 @@ terminate_on_stop(Config) ->
%% Confirm terminate/3 is called. %% Confirm terminate/3 is called.
receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end, receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end,
ok. ok.
terminate_on_switch_protocol(Config) ->
case config(protocol, Config) of
http -> do_terminate_on_switch_protocol(Config);
http2 -> doc("The switch_protocol command is not currently supported for HTTP/2.")
end.
do_terminate_on_switch_protocol(Config) ->
doc("Confirm terminate/3 is called after switch_protocol is returned."),
Self = self(),
ConnPid = gun_open(Config),
Ref = gun:get(ConnPid, "/long_polling", [
{<<"accept-encoding">>, <<"gzip">>},
{<<"x-test-case">>, <<"terminate_on_switch_protocol">>},
{<<"x-test-pid">>, pid_to_list(Self)}
]),
%% Confirm init/3 is called and receive the response.
Pid = receive {Self, P, init, _, _, _} -> P after 1000 -> error(timeout) end,
%% Confirm terminate/3 is called.
receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end,
%% Confirm takeover/7 is called.
receive {Self, Pid, takeover, _, _, _, _, _, _, _} -> ok after 1000 -> error(timeout) end,
ok.