0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 12:20:24 +00:00

Cancel timer only on websocket_data receives or sends

This commit prevents erlang messages from keeping a websocket connection
alive. Previously, the timer was canceled upon any activity. Now, the
timeout is only canceled when actual data is sent from the client. The
handler_loop_timeout/1 function is called from websocket_data/4 instead
of handler_before_loop/4. It is also called after every successful reply
in handler_call/4.
This commit is contained in:
Jeremy Ong 2012-12-19 11:34:44 -08:00 committed by Loïc Hoguin
parent a07d063fd8
commit 5f122d9fa6
3 changed files with 126 additions and 10 deletions

View file

@ -188,7 +188,8 @@ websocket_handshake(State=#state{transport=Transport, challenge=Challenge},
Req), Req),
%% Flush the resp_sent message before moving on. %% Flush the resp_sent message before moving on.
receive {cowboy_req, resp_sent} -> ok after 0 -> ok end, receive {cowboy_req, resp_sent} -> ok after 0 -> ok end,
handler_before_loop(State#state{messages=Transport:messages()}, State2 = handler_loop_timeout(State),
handler_before_loop(State2#state{messages=Transport:messages()},
Req2, HandlerState, <<>>). Req2, HandlerState, <<>>).
-spec handler_before_loop(#state{}, cowboy_req:req(), any(), binary()) -> closed. -spec handler_before_loop(#state{}, cowboy_req:req(), any(), binary()) -> closed.
@ -196,15 +197,13 @@ handler_before_loop(State=#state{
socket=Socket, transport=Transport, hibernate=true}, socket=Socket, transport=Transport, hibernate=true},
Req, HandlerState, SoFar) -> Req, HandlerState, SoFar) ->
Transport:setopts(Socket, [{active, once}]), Transport:setopts(Socket, [{active, once}]),
State2 = handler_loop_timeout(State),
catch erlang:hibernate(?MODULE, handler_loop, catch erlang:hibernate(?MODULE, handler_loop,
[State2#state{hibernate=false}, Req, HandlerState, SoFar]), [State#state{hibernate=false}, Req, HandlerState, SoFar]),
closed; closed;
handler_before_loop(State=#state{socket=Socket, transport=Transport}, handler_before_loop(State=#state{socket=Socket, transport=Transport},
Req, HandlerState, SoFar) -> Req, HandlerState, SoFar) ->
Transport:setopts(Socket, [{active, once}]), Transport:setopts(Socket, [{active, once}]),
State2 = handler_loop_timeout(State), handler_loop(State, Req, HandlerState, SoFar).
handler_loop(State2, Req, HandlerState, SoFar).
-spec handler_loop_timeout(#state{}) -> #state{}. -spec handler_loop_timeout(#state{}) -> #state{}.
handler_loop_timeout(State=#state{timeout=infinity}) -> handler_loop_timeout(State=#state{timeout=infinity}) ->
@ -222,7 +221,8 @@ handler_loop(State=#state{
Req, HandlerState, SoFar) -> Req, HandlerState, SoFar) ->
receive receive
{OK, Socket, Data} -> {OK, Socket, Data} ->
websocket_data(State, Req, HandlerState, State2 = handler_loop_timeout(State),
websocket_data(State2, Req, HandlerState,
<< SoFar/binary, Data/binary >>); << SoFar/binary, Data/binary >>);
{Closed, Socket} -> {Closed, Socket} ->
handler_terminate(State, Req, HandlerState, {error, closed}); handler_terminate(State, Req, HandlerState, {error, closed});
@ -460,7 +460,8 @@ handler_call(State=#state{handler=Handler, opts=Opts}, Req, HandlerState,
when is_tuple(Payload) -> when is_tuple(Payload) ->
case websocket_send(Payload, State) of case websocket_send(Payload, State) of
ok -> ok ->
NextState(State, Req2, HandlerState2, RemainingData); State2 = handler_loop_timeout(State),
NextState(State2, Req2, HandlerState2, RemainingData);
shutdown -> shutdown ->
handler_terminate(State, Req2, HandlerState, handler_terminate(State, Req2, HandlerState,
{normal, shutdown}); {normal, shutdown});
@ -471,7 +472,8 @@ handler_call(State=#state{handler=Handler, opts=Opts}, Req, HandlerState,
when is_tuple(Payload) -> when is_tuple(Payload) ->
case websocket_send(Payload, State) of case websocket_send(Payload, State) of
ok -> ok ->
NextState(State#state{hibernate=true}, State2 = handler_loop_timeout(State),
NextState(State2#state{hibernate=true},
Req2, HandlerState2, RemainingData); Req2, HandlerState2, RemainingData);
shutdown -> shutdown ->
handler_terminate(State, Req2, HandlerState, handler_terminate(State, Req2, HandlerState,
@ -483,7 +485,8 @@ handler_call(State=#state{handler=Handler, opts=Opts}, Req, HandlerState,
when is_list(Payload) -> when is_list(Payload) ->
case websocket_send_many(Payload, State) of case websocket_send_many(Payload, State) of
ok -> ok ->
NextState(State, Req2, HandlerState2, RemainingData); State2 = handler_loop_timeout(State),
NextState(State2, Req2, HandlerState2, RemainingData);
shutdown -> shutdown ->
handler_terminate(State, Req2, HandlerState, handler_terminate(State, Req2, HandlerState,
{normal, shutdown}); {normal, shutdown});
@ -494,7 +497,8 @@ handler_call(State=#state{handler=Handler, opts=Opts}, Req, HandlerState,
when is_list(Payload) -> when is_list(Payload) ->
case websocket_send_many(Payload, State) of case websocket_send_many(Payload, State) of
ok -> ok ->
NextState(State#state{hibernate=true}, State2 = handler_loop_timeout(State),
NextState(State2#state{hibernate=true},
Req2, HandlerState2, RemainingData); Req2, HandlerState2, RemainingData);
shutdown -> shutdown ->
handler_terminate(State, Req2, HandlerState, handler_terminate(State, Req2, HandlerState,

View file

@ -35,6 +35,8 @@
-export([ws_send_many/1]). -export([ws_send_many/1]).
-export([ws_text_fragments/1]). -export([ws_text_fragments/1]).
-export([ws_timeout_hibernate/1]). -export([ws_timeout_hibernate/1]).
-export([ws_timeout_cancel/1]).
-export([ws_timeout_reset/1]).
-export([ws_upgrade_with_opts/1]). -export([ws_upgrade_with_opts/1]).
%% ct. %% ct.
@ -54,6 +56,8 @@ groups() ->
ws_send_many, ws_send_many,
ws_text_fragments, ws_text_fragments,
ws_timeout_hibernate, ws_timeout_hibernate,
ws_timeout_cancel,
ws_timeout_reset,
ws_upgrade_with_opts ws_upgrade_with_opts
], ],
[{ws, [], BaseTests}]. [{ws, [], BaseTests}].
@ -110,6 +114,7 @@ init_dispatch() ->
{text, <<"won't be received">>}]} {text, <<"won't be received">>}]}
]}, ]},
{[<<"ws_timeout_hibernate">>], ws_timeout_hibernate_handler, []}, {[<<"ws_timeout_hibernate">>], ws_timeout_hibernate_handler, []},
{[<<"ws_timeout_cancel">>], ws_timeout_cancel_handler, []},
{[<<"ws_upgrade_with_opts">>], ws_upgrade_with_opts_handler, {[<<"ws_upgrade_with_opts">>], ws_upgrade_with_opts_handler,
<<"failure">>} <<"failure">>}
]} ]}
@ -506,6 +511,80 @@ ws_timeout_hibernate(Config) ->
{error, closed} = gen_tcp:recv(Socket, 0, 6000), {error, closed} = gen_tcp:recv(Socket, 0, 6000),
ok. ok.
ws_timeout_cancel(Config) ->
%% Erlang messages to a socket should not cancel the timeout
{port, Port} = lists:keyfind(port, 1, Config),
{ok, Socket} = gen_tcp:connect("localhost", Port,
[binary, {active, false}, {packet, raw}]),
ok = gen_tcp:send(Socket, [
"GET /ws_timeout_cancel HTTP/1.1\r\n"
"Host: localhost\r\n"
"Connection: Upgrade\r\n"
"Upgrade: websocket\r\n"
"Sec-WebSocket-Origin: http://localhost\r\n"
"Sec-WebSocket-Version: 8\r\n"
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
"\r\n"]),
{ok, Handshake} = gen_tcp:recv(Socket, 0, 6000),
{ok, {http_response, {1, 1}, 101, "Switching Protocols"}, Rest}
= erlang:decode_packet(http, Handshake, []),
[Headers, <<>>] = websocket_headers(
erlang:decode_packet(httph, Rest, []), []),
{'Connection', "Upgrade"} = lists:keyfind('Connection', 1, Headers),
{'Upgrade', "websocket"} = lists:keyfind('Upgrade', 1, Headers),
{"sec-websocket-accept", "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="}
= lists:keyfind("sec-websocket-accept", 1, Headers),
{ok, << 1:1, 0:3, 8:4, 0:8 >>} = gen_tcp:recv(Socket, 0, 6000),
{error, closed} = gen_tcp:recv(Socket, 0, 6000),
ok.
ws_timeout_reset(Config) ->
%% Erlang messages across a socket should reset the timeout
{port, Port} = lists:keyfind(port, 1, Config),
{ok, Socket} = gen_tcp:connect("localhost", Port,
[binary, {active, false}, {packet, raw}]),
ok = gen_tcp:send(Socket, [
"GET /ws_timeout_cancel HTTP/1.1\r\n"
"Host: localhost\r\n"
"Connection: Upgrade\r\n"
"Upgrade: WebSocket\r\n"
"Origin: http://localhost\r\n"
"Sec-Websocket-Key1: Y\" 4 1Lj!957b8@0H756!i\r\n"
"Sec-Websocket-Key2: 1711 M;4\\74 80<6\r\n"
"\r\n"]),
{ok, Handshake} = gen_tcp:recv(Socket, 0, 6000),
{ok, {http_response, {1, 1}, 101, "WebSocket Protocol Handshake"}, Rest}
= erlang:decode_packet(http, Handshake, []),
[Headers, <<>>] = websocket_headers(
erlang:decode_packet(httph, Rest, []), []),
{'Connection', "Upgrade"} = lists:keyfind('Connection', 1, Headers),
{'Upgrade', "WebSocket"} = lists:keyfind('Upgrade', 1, Headers),
{"sec-websocket-location", "ws://localhost/ws_timeout_cancel"}
= lists:keyfind("sec-websocket-location", 1, Headers),
{"sec-websocket-origin", "http://localhost"}
= lists:keyfind("sec-websocket-origin", 1, Headers),
ok = gen_tcp:send(Socket, <<15,245,8,18,2,204,133,33>>),
{ok, Body} = gen_tcp:recv(Socket, 0, 6000),
<<169,244,191,103,146,33,149,59,74,104,67,5,99,118,171,236>> = Body,
ok = gen_tcp:send(Socket, << 0, "msg sent", 255 >>),
{ok, << 0, "msg sent", 255 >>}
= gen_tcp:recv(Socket, 0, 6000),
ok = timer:sleep(500),
ok = gen_tcp:send(Socket, << 0, "msg sent", 255 >>),
{ok, << 0, "msg sent", 255 >>}
= gen_tcp:recv(Socket, 0, 6000),
ok = timer:sleep(500),
ok = gen_tcp:send(Socket, << 0, "msg sent", 255 >>),
{ok, << 0, "msg sent", 255 >>}
= gen_tcp:recv(Socket, 0, 6000),
ok = timer:sleep(500),
ok = gen_tcp:send(Socket, << 0, "msg sent", 255 >>),
{ok, << 0, "msg sent", 255 >>}
= gen_tcp:recv(Socket, 0, 6000),
{ok, << 255, 0 >>} = gen_tcp:recv(Socket, 0, 6000),
{error, closed} = gen_tcp:recv(Socket, 0, 6000),
ok.
ws_upgrade_with_opts(Config) -> ws_upgrade_with_opts(Config) ->
{port, Port} = lists:keyfind(port, 1, Config), {port, Port} = lists:keyfind(port, 1, Config),
{ok, Socket} = gen_tcp:connect("localhost", Port, {ok, Socket} = gen_tcp:connect("localhost", Port,

View file

@ -0,0 +1,33 @@
%% Feel free to use, reuse and abuse the code in this file.
-module(ws_timeout_cancel_handler).
-behaviour(cowboy_http_handler).
-behaviour(cowboy_websocket_handler).
-export([init/3, handle/2, terminate/2]).
-export([websocket_init/3, websocket_handle/3,
websocket_info/3, websocket_terminate/3]).
init(_Any, _Req, _Opts) ->
{upgrade, protocol, cowboy_websocket}.
handle(_Req, _State) ->
exit(badarg).
terminate(_Req, _State) ->
exit(badarg).
websocket_init(_TransportName, Req, _Opts) ->
erlang:start_timer(500, self(), should_not_cancel_timer),
{ok, Req, undefined, 1000}.
websocket_handle({text, Data}, Req, State) ->
{reply, {text, Data}, Req, State};
websocket_handle({binary, Data}, Req, State) ->
{reply, {binary, Data}, Req, State}.
websocket_info(_Info, Req, State) ->
erlang:start_timer(500, self(), should_not_cancel_timer),
{ok, Req, State}.
websocket_terminate(_Reason, _Req, _State) ->
ok.