mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 12:20:24 +00:00
Add timeout to cowboy_loop
This commit is contained in:
parent
879a6b8bc5
commit
ef4b515d2f
7 changed files with 118 additions and 25 deletions
|
@ -31,7 +31,9 @@ for plain HTTP handlers.
|
||||||
The `init/2` function must return a `cowboy_loop` tuple to enable
|
The `init/2` function must return a `cowboy_loop` tuple to enable
|
||||||
loop handler behavior. This tuple may optionally contain
|
loop handler behavior. This tuple may optionally contain
|
||||||
the atom `hibernate` to make the process enter hibernation
|
the atom `hibernate` to make the process enter hibernation
|
||||||
until a message is received.
|
until a message is received. Alternatively, the tuple may
|
||||||
|
optionally contain a positive integer to make the process
|
||||||
|
handle timeouts.
|
||||||
|
|
||||||
This snippet enables the loop handler:
|
This snippet enables the loop handler:
|
||||||
|
|
||||||
|
@ -49,6 +51,14 @@ init(Req, State) ->
|
||||||
{cowboy_loop, Req, State, hibernate}.
|
{cowboy_loop, Req, State, hibernate}.
|
||||||
----
|
----
|
||||||
|
|
||||||
|
This makes the process timeout after 1000ms of idle time:
|
||||||
|
|
||||||
|
[source,erlang]
|
||||||
|
----
|
||||||
|
init(Req, State) ->
|
||||||
|
{cowboy_loop, Req, State, 1000}.
|
||||||
|
----
|
||||||
|
|
||||||
=== Receive loop
|
=== Receive loop
|
||||||
|
|
||||||
Once initialized, Cowboy will wait for messages to arrive
|
Once initialized, Cowboy will wait for messages to arrive
|
||||||
|
@ -123,3 +133,10 @@ messages received. This is done by returning the atom
|
||||||
`hibernate` as part of the `loop` tuple callbacks normally
|
`hibernate` as part of the `loop` tuple callbacks normally
|
||||||
return. Just add the atom at the end and Cowboy will hibernate
|
return. Just add the atom at the end and Cowboy will hibernate
|
||||||
accordingly.
|
accordingly.
|
||||||
|
|
||||||
|
=== Timeout
|
||||||
|
|
||||||
|
You may activate timeout events by returning a positive integer
|
||||||
|
`N` as part of the `loop` tuple callbacks return. The default
|
||||||
|
value is `infinity`. The `info` callback will be called with the
|
||||||
|
atom `timeout` unless a message is received within `N` milliseconds.
|
||||||
|
|
|
@ -28,11 +28,11 @@ Loop handlers implement the following interface:
|
||||||
----
|
----
|
||||||
init(Req, State)
|
init(Req, State)
|
||||||
-> {cowboy_loop, Req, State}
|
-> {cowboy_loop, Req, State}
|
||||||
| {cowboy_loop, Req, State, hibernate}
|
| {cowboy_loop, Req, State, hibernate | timeout()}
|
||||||
|
|
||||||
info(Info, Req, State)
|
info(Info, Req, State)
|
||||||
-> {ok, Req, State}
|
-> {ok, Req, State}
|
||||||
| {ok, Req, State, hibernate}
|
| {ok, Req, State, hibernate | timeout()}
|
||||||
| {stop, Req, State}
|
| {stop, Req, State}
|
||||||
|
|
||||||
terminate(Reason, Req, State) -> ok %% optional
|
terminate(Reason, Req, State) -> ok %% optional
|
||||||
|
|
|
@ -17,12 +17,15 @@
|
||||||
|
|
||||||
-export([upgrade/4]).
|
-export([upgrade/4]).
|
||||||
-export([upgrade/5]).
|
-export([upgrade/5]).
|
||||||
-export([loop/4]).
|
-export([loop/5]).
|
||||||
|
|
||||||
-export([system_continue/3]).
|
-export([system_continue/3]).
|
||||||
-export([system_terminate/4]).
|
-export([system_terminate/4]).
|
||||||
-export([system_code_change/4]).
|
-export([system_code_change/4]).
|
||||||
|
|
||||||
|
%% From gen_server.
|
||||||
|
-define(is_timeout(X), ((X) =:= infinity orelse (is_integer(X) andalso (X) >= 0))).
|
||||||
|
|
||||||
-callback init(Req, any())
|
-callback init(Req, any())
|
||||||
-> {ok | module(), Req, any()}
|
-> {ok | module(), Req, any()}
|
||||||
| {module(), Req, any(), any()}
|
| {module(), Req, any(), any()}
|
||||||
|
@ -41,40 +44,46 @@
|
||||||
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
|
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
|
||||||
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||||
upgrade(Req, Env, Handler, HandlerState) ->
|
upgrade(Req, Env, Handler, HandlerState) ->
|
||||||
loop(Req, Env, Handler, HandlerState).
|
loop(Req, Env, Handler, HandlerState, infinity).
|
||||||
|
|
||||||
-spec upgrade(Req, Env, module(), any(), hibernate)
|
-spec upgrade(Req, Env, module(), any(), hibernate | timeout())
|
||||||
-> {suspend, ?MODULE, loop, [any()]}
|
-> {suspend, ?MODULE, loop, [any()]}
|
||||||
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||||
upgrade(Req, Env, Handler, HandlerState, hibernate) ->
|
upgrade(Req, Env, Handler, HandlerState, hibernate) ->
|
||||||
suspend(Req, Env, Handler, HandlerState).
|
suspend(Req, Env, Handler, HandlerState, infinity);
|
||||||
|
upgrade(Req, Env, Handler, HandlerState, Timeout) when ?is_timeout(Timeout) ->
|
||||||
|
loop(Req, Env, Handler, HandlerState, Timeout).
|
||||||
|
|
||||||
-spec loop(Req, Env, module(), any())
|
-spec loop(Req, Env, module(), any(), timeout())
|
||||||
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
|
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
|
||||||
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||||
%% @todo Handle system messages.
|
%% @todo Handle system messages.
|
||||||
loop(Req=#{pid := Parent}, Env, Handler, HandlerState) ->
|
loop(Req=#{pid := Parent}, Env, Handler, HandlerState, Timeout) ->
|
||||||
receive
|
receive
|
||||||
%% System messages.
|
%% System messages.
|
||||||
{'EXIT', Parent, Reason} ->
|
{'EXIT', Parent, Reason} ->
|
||||||
terminate(Req, Env, Handler, HandlerState, Reason);
|
terminate(Req, Env, Handler, HandlerState, Reason);
|
||||||
{system, From, Request} ->
|
{system, From, Request} ->
|
||||||
sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
|
sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
|
||||||
{Req, Env, Handler, HandlerState});
|
{Req, Env, Handler, HandlerState, Timeout});
|
||||||
%% Calls from supervisor module.
|
%% Calls from supervisor module.
|
||||||
{'$gen_call', From, Call} ->
|
{'$gen_call', From, Call} ->
|
||||||
cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
|
cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
|
||||||
loop(Req, Env, Handler, HandlerState);
|
loop(Req, Env, Handler, HandlerState, Timeout);
|
||||||
Message ->
|
Message ->
|
||||||
call(Req, Env, Handler, HandlerState, Message)
|
call(Req, Env, Handler, HandlerState, Timeout, Message)
|
||||||
|
after Timeout ->
|
||||||
|
call(Req, Env, Handler, HandlerState, Timeout, timeout)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
call(Req0, Env, Handler, HandlerState0, Message) ->
|
call(Req0, Env, Handler, HandlerState0, Timeout, Message) ->
|
||||||
try Handler:info(Message, Req0, HandlerState0) of
|
try Handler:info(Message, Req0, HandlerState0) of
|
||||||
{ok, Req, HandlerState} ->
|
{ok, Req, HandlerState} ->
|
||||||
loop(Req, Env, Handler, HandlerState);
|
loop(Req, Env, Handler, HandlerState, Timeout);
|
||||||
{ok, Req, HandlerState, hibernate} ->
|
{ok, Req, HandlerState, hibernate} ->
|
||||||
suspend(Req, Env, Handler, HandlerState);
|
suspend(Req, Env, Handler, HandlerState, Timeout);
|
||||||
|
{ok, Req, HandlerState, NewTimeout} when ?is_timeout(NewTimeout) ->
|
||||||
|
loop(Req, Env, Handler, HandlerState, NewTimeout);
|
||||||
{stop, Req, HandlerState} ->
|
{stop, Req, HandlerState} ->
|
||||||
terminate(Req, Env, Handler, HandlerState, stop)
|
terminate(Req, Env, Handler, HandlerState, stop)
|
||||||
catch Class:Reason:Stacktrace ->
|
catch Class:Reason:Stacktrace ->
|
||||||
|
@ -82,8 +91,8 @@ call(Req0, Env, Handler, HandlerState0, Message) ->
|
||||||
erlang:raise(Class, Reason, Stacktrace)
|
erlang:raise(Class, Reason, Stacktrace)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
suspend(Req, Env, Handler, HandlerState) ->
|
suspend(Req, Env, Handler, HandlerState, Timeout) ->
|
||||||
{suspend, ?MODULE, loop, [Req, Env, Handler, HandlerState]}.
|
{suspend, ?MODULE, loop, [Req, Env, Handler, HandlerState, Timeout]}.
|
||||||
|
|
||||||
terminate(Req, Env, Handler, HandlerState, Reason) ->
|
terminate(Req, Env, Handler, HandlerState, Reason) ->
|
||||||
Result = cowboy_handler:terminate(Reason, Req, HandlerState, Handler),
|
Result = cowboy_handler:terminate(Reason, Req, HandlerState, Handler),
|
||||||
|
@ -91,15 +100,15 @@ terminate(Req, Env, Handler, HandlerState, Reason) ->
|
||||||
|
|
||||||
%% System callbacks.
|
%% System callbacks.
|
||||||
|
|
||||||
-spec system_continue(_, _, {Req, Env, module(), any()})
|
-spec system_continue(_, _, {Req, Env, module(), any(), timeout()})
|
||||||
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
|
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
|
||||||
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||||
system_continue(_, _, {Req, Env, Handler, HandlerState}) ->
|
system_continue(_, _, {Req, Env, Handler, HandlerState, Timeout}) ->
|
||||||
loop(Req, Env, Handler, HandlerState).
|
loop(Req, Env, Handler, HandlerState, Timeout).
|
||||||
|
|
||||||
-spec system_terminate(any(), _, _, {Req, Env, module(), any()})
|
-spec system_terminate(any(), _, _, {Req, Env, module(), any(), timeout()})
|
||||||
-> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
-> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||||
system_terminate(Reason, _, _, {Req, Env, Handler, HandlerState}) ->
|
system_terminate(Reason, _, _, {Req, Env, Handler, HandlerState, _}) ->
|
||||||
terminate(Req, Env, Handler, HandlerState, Reason).
|
terminate(Req, Env, Handler, HandlerState, Reason).
|
||||||
|
|
||||||
-spec system_code_change(Misc, _, _, _) -> {ok, Misc}
|
-spec system_code_change(Misc, _, _, _) -> {ok, Misc}
|
||||||
|
|
23
test/handlers/loop_idle_timeout_h.erl
Normal file
23
test/handlers/loop_idle_timeout_h.erl
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
%% This module implements a loop handler that reads
|
||||||
|
%% the request query for a timeout value, then sends
|
||||||
|
%% itself a message after 1000ms. It replies a 200 when
|
||||||
|
%% the message does not timeout and a 299 otherwise.
|
||||||
|
|
||||||
|
-module(loop_idle_timeout_h).
|
||||||
|
|
||||||
|
-export([init/2]).
|
||||||
|
-export([info/3]).
|
||||||
|
-export([terminate/3]).
|
||||||
|
|
||||||
|
init(Req, _) ->
|
||||||
|
#{timeout := Timeout} = cowboy_req:match_qs([{timeout, int}], Req),
|
||||||
|
erlang:send_after(1000, self(), message),
|
||||||
|
{cowboy_loop, Req, 2, Timeout}.
|
||||||
|
|
||||||
|
info(message, Req, State) ->
|
||||||
|
{stop, cowboy_req:reply(200, Req), State};
|
||||||
|
info(timeout, Req, State) ->
|
||||||
|
{stop, cowboy_req:reply(<<"299 OK!">>, Req), State}.
|
||||||
|
|
||||||
|
terminate(stop, _, _) ->
|
||||||
|
ok.
|
23
test/handlers/loop_new_timeout_h.erl
Normal file
23
test/handlers/loop_new_timeout_h.erl
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
%% This module implements a loop handler that changes
|
||||||
|
%% the timeout value to 500ms after the first message
|
||||||
|
%% then sends itself another message after 1000ms.
|
||||||
|
%% It is expected to timeout, that is, reply a 299.
|
||||||
|
|
||||||
|
-module(loop_new_timeout_h).
|
||||||
|
|
||||||
|
-export([init/2]).
|
||||||
|
-export([info/3]).
|
||||||
|
-export([terminate/3]).
|
||||||
|
|
||||||
|
init(Req, _) ->
|
||||||
|
self() ! message,
|
||||||
|
{cowboy_loop, Req, 2}.
|
||||||
|
|
||||||
|
info(message, Req, State) ->
|
||||||
|
erlang:send_after(1000, self(), message),
|
||||||
|
{ok, Req, State, 500};
|
||||||
|
info(timeout, Req, State) ->
|
||||||
|
{stop, cowboy_req:reply(<<"299 OK!">>, Req), State}.
|
||||||
|
|
||||||
|
terminate(stop, _, _) ->
|
||||||
|
ok.
|
|
@ -40,7 +40,9 @@ init_dispatch(_) ->
|
||||||
cowboy_router:compile([{'_', [
|
cowboy_router:compile([{'_', [
|
||||||
{"/long_polling", long_polling_h, []},
|
{"/long_polling", long_polling_h, []},
|
||||||
{"/loop_body", loop_handler_body_h, []},
|
{"/loop_body", loop_handler_body_h, []},
|
||||||
{"/loop_timeout", loop_handler_timeout_h, []}
|
{"/loop_timeout", loop_handler_timeout_h, []},
|
||||||
|
{"/loop_idle", loop_idle_timeout_h, []},
|
||||||
|
{"/loop_idle_new", loop_new_timeout_h, []}
|
||||||
]}]).
|
]}]).
|
||||||
|
|
||||||
%% Tests.
|
%% Tests.
|
||||||
|
@ -82,3 +84,22 @@ request_timeout(Config) ->
|
||||||
Ref = gun:get(ConnPid, "/loop_timeout", [{<<"accept-encoding">>, <<"gzip">>}]),
|
Ref = gun:get(ConnPid, "/loop_timeout", [{<<"accept-encoding">>, <<"gzip">>}]),
|
||||||
{response, nofin, 200, _} = gun:await(ConnPid, Ref, 10000),
|
{response, nofin, 200, _} = gun:await(ConnPid, Ref, 10000),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
idle_timeout(Config) ->
|
||||||
|
doc("Check idle timeout."),
|
||||||
|
ConnPid = gun_open(Config),
|
||||||
|
|
||||||
|
Ref = gun:get(ConnPid, "/loop_idle?timeout=2000", [{<<"accept-encoding">>, <<"gzip">>}]),
|
||||||
|
{response, fin, 200, _} = gun:await(ConnPid, Ref),
|
||||||
|
|
||||||
|
Ref2 = gun:get(ConnPid, "/loop_idle?timeout=500", [{<<"accept-encoding">>, <<"gzip">>}]),
|
||||||
|
{response, fin, 299, _} = gun:await(ConnPid, Ref2),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
|
new_timeout(Config) ->
|
||||||
|
doc("Check that the new timeout gets set."),
|
||||||
|
ConnPid = gun_open(Config),
|
||||||
|
Ref = gun:get(ConnPid, "/loop_idle_new", [{<<"accept-encoding">>, <<"gzip">>}]),
|
||||||
|
{response, fin, 299, _} = gun:await(ConnPid, Ref),
|
||||||
|
ok.
|
||||||
|
|
|
@ -659,7 +659,7 @@ sys_get_state_loop(Config) ->
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
SupPid = get_remote_pid_tcp(Socket),
|
SupPid = get_remote_pid_tcp(Socket),
|
||||||
[{_, Pid, _, _}] = supervisor:which_children(SupPid),
|
[{_, Pid, _, _}] = supervisor:which_children(SupPid),
|
||||||
{Req, Env, long_polling_sys_h, undefined} = sys:get_state(Pid),
|
{Req, Env, long_polling_sys_h, undefined, infinity} = sys:get_state(Pid),
|
||||||
#{pid := _, streamid := _} = Req,
|
#{pid := _, streamid := _} = Req,
|
||||||
#{dispatch := _} = Env,
|
#{dispatch := _} = Env,
|
||||||
ok.
|
ok.
|
||||||
|
@ -784,7 +784,7 @@ sys_replace_state_loop(Config) ->
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
SupPid = get_remote_pid_tcp(Socket),
|
SupPid = get_remote_pid_tcp(Socket),
|
||||||
[{_, Pid, _, _}] = supervisor:which_children(SupPid),
|
[{_, Pid, _, _}] = supervisor:which_children(SupPid),
|
||||||
{Req, Env, long_polling_sys_h, undefined} = sys:replace_state(Pid, fun(S) -> S end),
|
{Req, Env, long_polling_sys_h, undefined, infinity} = sys:replace_state(Pid, fun(S) -> S end),
|
||||||
#{pid := _, streamid := _} = Req,
|
#{pid := _, streamid := _} = Req,
|
||||||
#{dispatch := _} = Env,
|
#{dispatch := _} = Env,
|
||||||
ok.
|
ok.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue