mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 04:10:24 +00:00
Avoid resetting Websocket idle_timeout timer too often
`perf` has shown that Cowboy spends a lot of time cancelling and starting this timer. Instead of resetting for every data received, we now only reset a field in the state. Before it was working like this: - start idle timeout timer - on trigger, close the connection - on data, cancel and start again Now it's working like this: - start idle timeout timer for a tenth of its duration, with tick number = 0 - on trigger, if tick number != 10 - start the timer again, again for a tenth of its duration - increment tick number - on trigger, if tick number = 10 - close the connection - on data, set tick number to 0
This commit is contained in:
parent
643b335ba8
commit
086f60cca4
3 changed files with 45 additions and 15 deletions
1
Makefile
1
Makefile
|
@ -9,6 +9,7 @@ PROJECT_REGISTERED = cowboy_clock
|
||||||
|
|
||||||
PLT_APPS = public_key ssl # ct_helper gun common_test inets
|
PLT_APPS = public_key ssl # ct_helper gun common_test inets
|
||||||
CT_OPTS += -ct_hooks cowboy_ct_hook [] # -boot start_sasl
|
CT_OPTS += -ct_hooks cowboy_ct_hook [] # -boot start_sasl
|
||||||
|
#CT_OPTS += +JPperf true +S 1
|
||||||
|
|
||||||
# Dependencies.
|
# Dependencies.
|
||||||
|
|
||||||
|
|
|
@ -76,6 +76,14 @@
|
||||||
}.
|
}.
|
||||||
-export_type([opts/0]).
|
-export_type([opts/0]).
|
||||||
|
|
||||||
|
%% We don't want to reset the idle timeout too often,
|
||||||
|
%% so we don't reset it on data. Instead we reset the
|
||||||
|
%% number of ticks we have observed. We divide the
|
||||||
|
%% timeout value by a value and that value becomes
|
||||||
|
%% the number of ticks at which point we can drop
|
||||||
|
%% the connection. This value is the number of ticks.
|
||||||
|
-define(IDLE_TIMEOUT_TICKS, 10).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
parent :: undefined | pid(),
|
parent :: undefined | pid(),
|
||||||
ref :: ranch:ref(),
|
ref :: ranch:ref(),
|
||||||
|
@ -86,6 +94,7 @@
|
||||||
handler :: module(),
|
handler :: module(),
|
||||||
key = undefined :: undefined | binary(),
|
key = undefined :: undefined | binary(),
|
||||||
timeout_ref = undefined :: undefined | reference(),
|
timeout_ref = undefined :: undefined | reference(),
|
||||||
|
timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS,
|
||||||
messages = undefined :: undefined | {atom(), atom(), atom()}
|
messages = undefined :: undefined | {atom(), atom(), atom()}
|
||||||
| {atom(), atom(), atom(), atom()},
|
| {atom(), atom(), atom(), atom()},
|
||||||
hibernate = false :: boolean(),
|
hibernate = false :: boolean(),
|
||||||
|
@ -297,9 +306,9 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
|
||||||
undefined -> undefined;
|
undefined -> undefined;
|
||||||
_ -> Transport:messages()
|
_ -> Transport:messages()
|
||||||
end,
|
end,
|
||||||
State = loop_timeout(State0#state{parent=Parent,
|
State = set_idle_timeout(State0#state{parent=Parent,
|
||||||
ref=Ref, socket=Socket, transport=Transport,
|
ref=Ref, socket=Socket, transport=Transport,
|
||||||
key=undefined, messages=Messages}),
|
key=undefined, messages=Messages}, 0),
|
||||||
%% We call parse_header/3 immediately because there might be
|
%% We call parse_header/3 immediately because there might be
|
||||||
%% some data in the buffer that was sent along with the handshake.
|
%% some data in the buffer that was sent along with the handshake.
|
||||||
%% While it is not allowed by the protocol to send frames immediately,
|
%% While it is not allowed by the protocol to send frames immediately,
|
||||||
|
@ -373,28 +382,39 @@ before_loop(State=#state{hibernate=true}, HandlerState, ParseState) ->
|
||||||
before_loop(State, HandlerState, ParseState) ->
|
before_loop(State, HandlerState, ParseState) ->
|
||||||
loop(State, HandlerState, ParseState).
|
loop(State, HandlerState, ParseState).
|
||||||
|
|
||||||
-spec loop_timeout(#state{}) -> #state{}.
|
-spec set_idle_timeout(#state{}, 0..?IDLE_TIMEOUT_TICKS) -> #state{}.
|
||||||
loop_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}) ->
|
|
||||||
|
set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) ->
|
||||||
|
%% Most of the time we don't need to cancel the timer since it
|
||||||
|
%% will have triggered already. But this call is harmless so
|
||||||
|
%% it is kept to simplify the code as we do need to cancel when
|
||||||
|
%% options are changed dynamically.
|
||||||
_ = case PrevRef of
|
_ = case PrevRef of
|
||||||
undefined -> ignore;
|
undefined -> ignore;
|
||||||
PrevRef -> erlang:cancel_timer(PrevRef)
|
PrevRef -> erlang:cancel_timer(PrevRef)
|
||||||
end,
|
end,
|
||||||
case maps:get(idle_timeout, Opts, 60000) of
|
case maps:get(idle_timeout, Opts, 60000) of
|
||||||
infinity ->
|
infinity ->
|
||||||
State#state{timeout_ref=undefined};
|
State#state{timeout_ref=undefined, timeout_num=TimeoutNum};
|
||||||
Timeout ->
|
Timeout ->
|
||||||
TRef = erlang:start_timer(Timeout, self(), ?MODULE),
|
TRef = erlang:start_timer(Timeout div ?IDLE_TIMEOUT_TICKS, self(), ?MODULE),
|
||||||
State#state{timeout_ref=TRef}
|
State#state{timeout_ref=TRef, timeout_num=TimeoutNum}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-define(reset_idle_timeout(State), State#state{timeout_num=0}).
|
||||||
|
|
||||||
|
tick_idle_timeout(State=#state{timeout_num=?IDLE_TIMEOUT_TICKS}, HandlerState, _) ->
|
||||||
|
websocket_close(State, HandlerState, timeout);
|
||||||
|
tick_idle_timeout(State=#state{timeout_num=TimeoutNum}, HandlerState, ParseState) ->
|
||||||
|
before_loop(set_idle_timeout(State, TimeoutNum + 1), HandlerState, ParseState).
|
||||||
|
|
||||||
-spec loop(#state{}, any(), parse_state()) -> no_return().
|
-spec loop(#state{}, any(), parse_state()) -> no_return().
|
||||||
loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
|
loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
|
||||||
timeout_ref=TRef}, HandlerState, ParseState) ->
|
timeout_ref=TRef}, HandlerState, ParseState) ->
|
||||||
receive
|
receive
|
||||||
%% Socket messages. (HTTP/1.1)
|
%% Socket messages. (HTTP/1.1)
|
||||||
{OK, Socket, Data} when OK =:= element(1, Messages) ->
|
{OK, Socket, Data} when OK =:= element(1, Messages) ->
|
||||||
State2 = loop_timeout(State),
|
parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
|
||||||
parse(State2, HandlerState, ParseState, Data);
|
|
||||||
{Closed, Socket} when Closed =:= element(2, Messages) ->
|
{Closed, Socket} when Closed =:= element(2, Messages) ->
|
||||||
terminate(State, HandlerState, {error, closed});
|
terminate(State, HandlerState, {error, closed});
|
||||||
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
|
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
|
||||||
|
@ -407,18 +427,16 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
|
||||||
%% Body reading messages. (HTTP/2)
|
%% Body reading messages. (HTTP/2)
|
||||||
{request_body, _Ref, nofin, Data} ->
|
{request_body, _Ref, nofin, Data} ->
|
||||||
maybe_read_body(State),
|
maybe_read_body(State),
|
||||||
State2 = loop_timeout(State),
|
parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
|
||||||
parse(State2, HandlerState, ParseState, Data);
|
|
||||||
%% @todo We need to handle this case as if it was an {error, closed}
|
%% @todo We need to handle this case as if it was an {error, closed}
|
||||||
%% but not before we finish processing frames. We probably should have
|
%% but not before we finish processing frames. We probably should have
|
||||||
%% a check in before_loop to let us stop looping if a flag is set.
|
%% a check in before_loop to let us stop looping if a flag is set.
|
||||||
{request_body, _Ref, fin, _, Data} ->
|
{request_body, _Ref, fin, _, Data} ->
|
||||||
maybe_read_body(State),
|
maybe_read_body(State),
|
||||||
State2 = loop_timeout(State),
|
parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
|
||||||
parse(State2, HandlerState, ParseState, Data);
|
|
||||||
%% Timeouts.
|
%% Timeouts.
|
||||||
{timeout, TRef, ?MODULE} ->
|
{timeout, TRef, ?MODULE} ->
|
||||||
websocket_close(State, HandlerState, timeout);
|
tick_idle_timeout(State, HandlerState, ParseState);
|
||||||
{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
|
{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
|
||||||
before_loop(State, HandlerState, ParseState);
|
before_loop(State, HandlerState, ParseState);
|
||||||
%% System messages.
|
%% System messages.
|
||||||
|
@ -600,7 +618,8 @@ commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) ->
|
||||||
commands([{set_options, SetOpts}|Tail], State0=#state{opts=Opts}, Data) ->
|
commands([{set_options, SetOpts}|Tail], State0=#state{opts=Opts}, Data) ->
|
||||||
State = case SetOpts of
|
State = case SetOpts of
|
||||||
#{idle_timeout := IdleTimeout} ->
|
#{idle_timeout := IdleTimeout} ->
|
||||||
loop_timeout(State0#state{opts=Opts#{idle_timeout => IdleTimeout}});
|
%% We reset the number of ticks when changing the idle_timeout option.
|
||||||
|
set_idle_timeout(State0#state{opts=Opts#{idle_timeout => IdleTimeout}}, 0);
|
||||||
_ ->
|
_ ->
|
||||||
State0
|
State0
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -37,6 +37,14 @@ groups() ->
|
||||||
{japanese, [], SubGroups}
|
{japanese, [], SubGroups}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
%% Optionally enable `perf` for the current node.
|
||||||
|
% spawn(fun() -> ct:pal(os:cmd("perf record -g -F 9999 -o /tmp/ws_perf.data -p " ++ os:getpid() ++ " -- sleep 11")) end),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
init_per_group(Name=http, Config) ->
|
init_per_group(Name=http, Config) ->
|
||||||
ct:pal("Websocket over cleartext HTTP/1.1 (~s)",
|
ct:pal("Websocket over cleartext HTTP/1.1 (~s)",
|
||||||
[init_data_info(Config)]),
|
[init_data_info(Config)]),
|
||||||
|
@ -185,6 +193,8 @@ do_full(Config, What, Num, FrameSize) ->
|
||||||
text -> do_text_data(Config, FrameSize);
|
text -> do_text_data(Config, FrameSize);
|
||||||
binary -> rand:bytes(FrameSize)
|
binary -> rand:bytes(FrameSize)
|
||||||
end,
|
end,
|
||||||
|
%% Heat up the processes before doing the real run.
|
||||||
|
% do_full1(ConnPid, StreamRef, Num, FrameType, FrameData),
|
||||||
{Time, _} = timer:tc(?MODULE, do_full1, [ConnPid, StreamRef, Num, FrameType, FrameData]),
|
{Time, _} = timer:tc(?MODULE, do_full1, [ConnPid, StreamRef, Num, FrameType, FrameData]),
|
||||||
do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]),
|
do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]),
|
||||||
gun:ws_send(ConnPid, StreamRef, close),
|
gun:ws_send(ConnPid, StreamRef, close),
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue