mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 12:20:24 +00:00
Give the ListenerPid to the protocol on startup
Also sends a message 'shoot' that can be received by the protocol to make sure Cowboy has had enough time to fully initialize the socket. This message should be received before any socket-related operations are performed. WebSocket request connections are now moved from the pool 'default' to the pool 'websocket', meaning we can have a lot of running WebSockets despite having a low 'max_connections' setting.
This commit is contained in:
parent
56369d5c1a
commit
43d14b52cd
6 changed files with 50 additions and 24 deletions
14
README.md
14
README.md
|
@ -219,11 +219,15 @@ One of the strengths of Cowboy is of course that you can use it with any
|
|||
protocol you want. The only downside is that if it's not HTTP, you'll
|
||||
probably have to write the protocol handler yourself.
|
||||
|
||||
The only exported function a protocol handler needs is the start_link/3
|
||||
function, with arguments Socket, Transport and Opts. Socket is of course
|
||||
the client socket; Transport is the module name of the chosen transport
|
||||
The only exported function a protocol handler needs is the start_link/4
|
||||
function, with arguments ListenerPid, Socket, Transport and Opts. ListenerPid
|
||||
is the pid to the listener's gen_server, managing the connections. Socket is of
|
||||
course the client socket; Transport is the module name of the chosen transport
|
||||
handler and Opts is protocol options defined when starting the listener.
|
||||
Anything you do past this point is up to you!
|
||||
|
||||
After initializing your protocol, it is recommended to wait to receive a message
|
||||
containing the atom 'shoot', as it will ensure Cowboy has been able to fully
|
||||
initialize the socket. Anything you do past this point is up to you!
|
||||
|
||||
You should definitely look at the cowboy_http_protocol module for a great
|
||||
example of fast request handling if you need to. Otherwise it's probably
|
||||
|
@ -231,7 +235,7 @@ safe to use `{active, once}` mode and handle everything as it comes.
|
|||
|
||||
Note that while you technically can run a protocol handler directly as a
|
||||
gen_server or a gen_fsm, it's probably not a good idea, as the only call
|
||||
you'll ever receive from Cowboy is the start_link/3 call. On the other
|
||||
you'll ever receive from Cowboy is the start_link/4 call. On the other
|
||||
hand, feel free to write a very basic protocol handler which then forwards
|
||||
requests to a gen_server or gen_fsm. By doing so however you must take
|
||||
care to supervise their processes as Cowboy only knows about the protocol
|
||||
|
|
|
@ -36,10 +36,11 @@ acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) ->
|
|||
case Transport:accept(LSocket, 2000) of
|
||||
{ok, CSocket} ->
|
||||
{ok, Pid} = supervisor:start_child(ReqsSup,
|
||||
[CSocket, Transport, Protocol, Opts]),
|
||||
[ListenerPid, CSocket, Transport, Protocol, Opts]),
|
||||
Transport:controlling_process(CSocket, Pid),
|
||||
{ok, NbConns} = cowboy_listener:add_connection(ListenerPid,
|
||||
default, Pid),
|
||||
Pid ! shoot,
|
||||
limit_reqs(ListenerPid, NbConns, MaxConns);
|
||||
{error, timeout} ->
|
||||
ignore;
|
||||
|
|
|
@ -31,12 +31,13 @@
|
|||
%% @see cowboy_http_handler
|
||||
-module(cowboy_http_protocol).
|
||||
|
||||
-export([start_link/3]). %% API.
|
||||
-export([init/3, parse_request/1]). %% FSM.
|
||||
-export([start_link/4]). %% API.
|
||||
-export([init/4, parse_request/1]). %% FSM.
|
||||
|
||||
-include("include/http.hrl").
|
||||
|
||||
-record(state, {
|
||||
listener :: pid(),
|
||||
socket :: inet:socket(),
|
||||
transport :: module(),
|
||||
dispatch :: cowboy_dispatcher:dispatch_rules(),
|
||||
|
@ -51,20 +52,21 @@
|
|||
%% API.
|
||||
|
||||
%% @doc Start an HTTP protocol process.
|
||||
-spec start_link(inet:socket(), module(), any()) -> {ok, pid()}.
|
||||
start_link(Socket, Transport, Opts) ->
|
||||
Pid = spawn_link(?MODULE, init, [Socket, Transport, Opts]),
|
||||
-spec start_link(pid(), inet:socket(), module(), any()) -> {ok, pid()}.
|
||||
start_link(ListenerPid, Socket, Transport, Opts) ->
|
||||
Pid = spawn_link(?MODULE, init, [ListenerPid, Socket, Transport, Opts]),
|
||||
{ok, Pid}.
|
||||
|
||||
%% FSM.
|
||||
|
||||
%% @private
|
||||
-spec init(inet:socket(), module(), any()) -> ok.
|
||||
init(Socket, Transport, Opts) ->
|
||||
-spec init(pid(), inet:socket(), module(), any()) -> ok.
|
||||
init(ListenerPid, Socket, Transport, Opts) ->
|
||||
Dispatch = proplists:get_value(dispatch, Opts, []),
|
||||
MaxEmptyLines = proplists:get_value(max_empty_lines, Opts, 5),
|
||||
Timeout = proplists:get_value(timeout, Opts, 5000),
|
||||
wait_request(#state{socket=Socket, transport=Transport,
|
||||
receive shoot -> ok end,
|
||||
wait_request(#state{listener=ListenerPid, socket=Socket, transport=Transport,
|
||||
dispatch=Dispatch, max_empty_lines=MaxEmptyLines, timeout=Timeout}).
|
||||
|
||||
%% @private
|
||||
|
@ -189,14 +191,14 @@ dispatch(Req=#http_req{host=Host, path=Path},
|
|||
end.
|
||||
|
||||
-spec handler_init(#http_req{}, #state{}) -> ok.
|
||||
handler_init(Req, State=#state{
|
||||
handler_init(Req, State=#state{listener=ListenerPid,
|
||||
transport=Transport, handler={Handler, Opts}}) ->
|
||||
try Handler:init({Transport:name(), http}, Req, Opts) of
|
||||
{ok, Req2, HandlerState} ->
|
||||
handler_loop(HandlerState, Req2, State);
|
||||
%% @todo {upgrade, transport, Module}
|
||||
{upgrade, protocol, Module} ->
|
||||
Module:upgrade(Handler, Opts, Req)
|
||||
Module:upgrade(ListenerPid, Handler, Opts, Req)
|
||||
catch Class:Reason ->
|
||||
error_terminate(500, State),
|
||||
error_logger:error_msg(
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
%% </ul>
|
||||
-module(cowboy_http_websocket).
|
||||
|
||||
-export([upgrade/3]). %% API.
|
||||
-export([upgrade/4]). %% API.
|
||||
-export([handler_loop/4]). %% Internal.
|
||||
|
||||
-include("include/http.hrl").
|
||||
|
@ -45,8 +45,9 @@
|
|||
%% You do not need to call this function manually. To upgrade to the WebSocket
|
||||
%% protocol, you simply need to return <em>{upgrade, protocol, {@module}}</em>
|
||||
%% in your <em>cowboy_http_handler:init/3</em> handler function.
|
||||
-spec upgrade(module(), any(), #http_req{}) -> ok.
|
||||
upgrade(Handler, Opts, Req) ->
|
||||
-spec upgrade(pid(), module(), any(), #http_req{}) -> ok.
|
||||
upgrade(ListenerPid, Handler, Opts, Req) ->
|
||||
cowboy_listener:move_connection(ListenerPid, websocket, self()),
|
||||
EOP = binary:compile_pattern(<< 255 >>),
|
||||
case catch websocket_upgrade(#state{handler=Handler, opts=Opts, eop=EOP}, Req) of
|
||||
{ok, State, Req2} -> handler_init(State, Req2);
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-export([start_link/0, stop/1,
|
||||
add_connection/3, remove_connection/2, wait/3]). %% API.
|
||||
add_connection/3, move_connection/3, remove_connection/2, wait/3]). %% API.
|
||||
-export([init/1, handle_call/3, handle_cast/2,
|
||||
handle_info/2, terminate/2, code_change/3]). %% gen_server.
|
||||
|
||||
|
@ -54,6 +54,11 @@ stop(ServerPid) ->
|
|||
add_connection(ServerPid, Pool, ConnPid) ->
|
||||
gen_server:call(ServerPid, {add_connection, Pool, ConnPid}).
|
||||
|
||||
%% @doc Move a connection from one pool to another.
|
||||
-spec move_connection(pid(), atom(), pid()) -> ok.
|
||||
move_connection(ServerPid, DestPool, ConnPid) ->
|
||||
gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}).
|
||||
|
||||
%% @doc Remove the given connection from its pool.
|
||||
-spec remove_connection(pid(), pid()) -> ok.
|
||||
remove_connection(ServerPid, ConnPid) ->
|
||||
|
@ -107,6 +112,18 @@ handle_call(_Request, _From, State) ->
|
|||
|
||||
%% @private
|
||||
-spec handle_cast(_, State) -> {noreply, State}.
|
||||
handle_cast({move_connection, DestPool, ConnPid}, State=#state{
|
||||
req_pools=Pools, reqs_table=ReqsTable}) ->
|
||||
{MonitorRef, SrcPool} = ets:lookup_element(ReqsTable, ConnPid, 2),
|
||||
ets:insert(ReqsTable, {ConnPid, {MonitorRef, DestPool}}),
|
||||
{SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools),
|
||||
DestNbConns = case lists:keyfind(DestPool, 1, Pools) of
|
||||
false -> 1;
|
||||
{DestPool, NbConns} -> NbConns + 1
|
||||
end,
|
||||
Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)),
|
||||
Pools3 = [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2],
|
||||
{noreply, State#state{req_pools=Pools3}};
|
||||
handle_cast({remove_connection, ConnPid}, State) ->
|
||||
State2 = remove_pid(ConnPid, State),
|
||||
{noreply, State2};
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
-module(cowboy_requests_sup).
|
||||
-behaviour(supervisor).
|
||||
|
||||
-export([start_link/0, start_request/4]). %% API.
|
||||
-export([start_link/0, start_request/5]). %% API.
|
||||
-export([init/1]). %% supervisor.
|
||||
|
||||
%% API.
|
||||
|
@ -25,9 +25,10 @@
|
|||
start_link() ->
|
||||
supervisor:start_link(?MODULE, []).
|
||||
|
||||
-spec start_request(inet:socket(), module(), module(), any()) -> {ok, pid()}.
|
||||
start_request(Socket, Transport, Protocol, Opts) ->
|
||||
Protocol:start_link(Socket, Transport, Opts).
|
||||
-spec start_request(pid(), inet:socket(), module(), module(), any())
|
||||
-> {ok, pid()}.
|
||||
start_request(ListenerPid, Socket, Transport, Protocol, Opts) ->
|
||||
Protocol:start_link(ListenerPid, Socket, Transport, Opts).
|
||||
|
||||
%% supervisor.
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue