mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-15 04:30:25 +00:00
Allow passing options to sub protocols
Before this commit we had an issue where configuring a Websocket connection was simply not possible without doing magic, adding callbacks or extra return values. The init/2 function only allowed setting hibernate and timeout options. After this commit, when switching to a different type of handler you can either return {module, Req, State} or {module, Req, State, Opts} where Opts is any value (as far as the sub protocol interface is concerned) and is ultimately checked by the custom handlers. A large protocol like Websocket would accept only a map there, with many different options, while a small interface like loop handlers would allow passing hibernate and nothing else. For Websocket, hibernate must be set from the websocket_init/1 callback, because init/2 executes in a separate process. Sub protocols now have two callbacks: one with the Opts value, one without. The loop handler code was largely reworked and simplified. It does not need to manage a timeout or read from the socket anymore, it's the job of the protocol code. A lot of unnecessary stuff was therefore removed. Websocket compression must now be enabled from the handler options instead of per listener. This means that a project can have two separate Websocket handlers with different options. Compression is still disabled by default, and the idle_timeout value was changed from inifnity to 60000 (60 seconds), as that's safer and is also a good value for mobile devices.
This commit is contained in:
parent
80f8cda7ff
commit
a45813c60f
25 changed files with 171 additions and 244 deletions
|
@ -25,9 +25,7 @@
|
|||
|
||||
-callback init(Req, any())
|
||||
-> {ok | module(), Req, any()}
|
||||
| {module(), Req, any(), hibernate}
|
||||
| {module(), Req, any(), timeout()}
|
||||
| {module(), Req, any(), timeout(), hibernate}
|
||||
| {module(), Req, any(), any()}
|
||||
when Req::cowboy_req:req().
|
||||
|
||||
-callback terminate(any(), cowboy_req:req(), any()) -> ok.
|
||||
|
@ -41,13 +39,9 @@ execute(Req, Env=#{handler := Handler, handler_opts := HandlerOpts}) ->
|
|||
Result = terminate(normal, Req2, State, Handler),
|
||||
{ok, Req2, Env#{result => Result}};
|
||||
{Mod, Req2, State} ->
|
||||
Mod:upgrade(Req2, Env, Handler, State, infinity, run);
|
||||
{Mod, Req2, State, hibernate} ->
|
||||
Mod:upgrade(Req2, Env, Handler, State, infinity, hibernate);
|
||||
{Mod, Req2, State, Timeout} ->
|
||||
Mod:upgrade(Req2, Env, Handler, State, Timeout, run);
|
||||
{Mod, Req2, State, Timeout, hibernate} ->
|
||||
Mod:upgrade(Req2, Env, Handler, State, Timeout, hibernate)
|
||||
Mod:upgrade(Req2, Env, Handler, State);
|
||||
{Mod, Req2, State, Opts} ->
|
||||
Mod:upgrade(Req2, Env, Handler, State, Opts)
|
||||
catch Class:Reason ->
|
||||
terminate({crash, Class, Reason}, Req, HandlerOpts, Handler),
|
||||
erlang:raise(Class, Reason, erlang:get_stacktrace())
|
||||
|
|
|
@ -136,11 +136,10 @@ init(Parent, Ref, Socket, Transport, Opts) ->
|
|||
%% Timeouts:
|
||||
%% - waiting for new request (if no stream is currently running)
|
||||
%% -> request_timeout: for whole request/headers, set at init/when we set ps_request_line{} state
|
||||
%% - waiting for body (if a stream requested the body to be read)
|
||||
%% -> read_body_timeout: amount of time we wait without receiving any data when reading the body
|
||||
%% - waiting for new request, or body (when a stream is currently running)
|
||||
%% -> idle_timeout: amount of time we wait without receiving any data
|
||||
%% - if we skip the body, skip only for a specific duration
|
||||
%% -> skip_body_timeout: also have a skip_body_length
|
||||
%% - none if we have a stream running and it didn't request the body to be read
|
||||
%% - global
|
||||
%% -> inactivity_timeout: max time to wait without anything happening before giving up
|
||||
|
||||
|
|
|
@ -12,27 +12,18 @@
|
|||
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
%% When using loop handlers, we are receiving data from the socket because we
|
||||
%% want to know when the socket gets closed. This is generally not an issue
|
||||
%% because these kinds of requests are generally not pipelined, and don't have
|
||||
%% a body. If they do have a body, this body is often read in the
|
||||
%% <em>init/2</em> callback and this is no problem. Otherwise, this data
|
||||
%% accumulates in a buffer until we reach a certain threshold of 5000 bytes
|
||||
%% by default. This can be configured through the <em>loop_max_buffer</em>
|
||||
%% environment value. The request will be terminated with an
|
||||
%% <em>{error, overflow}</em> reason if this threshold is reached.
|
||||
-module(cowboy_loop).
|
||||
-behaviour(cowboy_sub_protocol).
|
||||
|
||||
-export([upgrade/6]).
|
||||
-export([upgrade/4]).
|
||||
-export([upgrade/5]).
|
||||
-export([loop/4]).
|
||||
|
||||
-callback init(Req, any())
|
||||
-> {ok | module(), Req, any()}
|
||||
| {module(), Req, any(), hibernate}
|
||||
| {module(), Req, any(), timeout()}
|
||||
| {module(), Req, any(), timeout(), hibernate}
|
||||
| {module(), Req, any(), any()}
|
||||
when Req::cowboy_req:req().
|
||||
|
||||
-callback info(any(), Req, State)
|
||||
-> {ok, Req, State}
|
||||
| {ok, Req, State, hibernate}
|
||||
|
@ -42,97 +33,44 @@
|
|||
-callback terminate(any(), cowboy_req:req(), any()) -> ok.
|
||||
-optional_callbacks([terminate/3]).
|
||||
|
||||
-record(state, {
|
||||
env :: cowboy_middleware:env(),
|
||||
hibernate = false :: boolean(),
|
||||
buffer_size = 0 :: non_neg_integer(),
|
||||
max_buffer = 5000 :: non_neg_integer() | infinity,
|
||||
timeout = infinity :: timeout(),
|
||||
timeout_ref = undefined :: undefined | reference()
|
||||
}).
|
||||
|
||||
-spec upgrade(Req, Env, module(), any(), timeout(), run | hibernate)
|
||||
-> {ok, Req, Env} | {suspend, module(), atom(), [any()]}
|
||||
-spec upgrade(Req, Env, module(), any())
|
||||
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
|
||||
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||
upgrade(Req, Env, Handler, HandlerState, Timeout, Hibernate) ->
|
||||
State = #state{env=Env, max_buffer=get_max_buffer(Env), timeout=Timeout,
|
||||
hibernate=Hibernate =:= hibernate},
|
||||
State2 = timeout(State),
|
||||
before_loop(Req, State2, Handler, HandlerState).
|
||||
upgrade(Req, Env, Handler, HandlerState) ->
|
||||
loop(Req, Env, Handler, HandlerState).
|
||||
|
||||
get_max_buffer(#{loop_max_buffer := MaxBuffer}) -> MaxBuffer;
|
||||
get_max_buffer(_) -> 5000.
|
||||
-spec upgrade(Req, Env, module(), any(), hibernate)
|
||||
-> {suspend, ?MODULE, loop, [any()]}
|
||||
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||
upgrade(Req, Env, Handler, HandlerState, hibernate) ->
|
||||
suspend(Req, Env, Handler, HandlerState).
|
||||
|
||||
before_loop(Req, State=#state{hibernate=true}, Handler, HandlerState) ->
|
||||
|
||||
%% @todo Yeah we can't get the socket anymore.
|
||||
%% Everything changes since we are a separate process now.
|
||||
%% Proper flow control at the connection level should be implemented
|
||||
%% instead of what we have here.
|
||||
|
||||
% [Socket, Transport] = cowboy_req:get([socket, transport], Req),
|
||||
% Transport:setopts(Socket, [{active, once}]),
|
||||
{suspend, ?MODULE, loop, [Req, State#state{hibernate=false}, Handler, HandlerState]};
|
||||
before_loop(Req, State, Handler, HandlerState) ->
|
||||
|
||||
%% Same here.
|
||||
|
||||
% [Socket, Transport] = cowboy_req:get([socket, transport], Req),
|
||||
% Transport:setopts(Socket, [{active, once}]),
|
||||
loop(Req, State, Handler, HandlerState).
|
||||
|
||||
%% Almost the same code can be found in cowboy_websocket.
|
||||
timeout(State=#state{timeout=infinity}) ->
|
||||
State#state{timeout_ref=undefined};
|
||||
timeout(State=#state{timeout=Timeout,
|
||||
timeout_ref=PrevRef}) ->
|
||||
_ = case PrevRef of
|
||||
undefined -> ignore%;
|
||||
% @todo PrevRef -> erlang:cancel_timer(PrevRef)
|
||||
end,
|
||||
TRef = erlang:start_timer(Timeout, self(), ?MODULE),
|
||||
State#state{timeout_ref=TRef}.
|
||||
|
||||
-spec loop(Req, #state{}, module(), any())
|
||||
-> {ok, Req, cowboy_middleware:env()} | {suspend, module(), atom(), [any()]}
|
||||
when Req::cowboy_req:req().
|
||||
loop(Req, State=#state{timeout_ref=TRef}, Handler, HandlerState) ->
|
||||
-spec loop(Req, Env, module(), any())
|
||||
-> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]}
|
||||
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||
%% @todo Handle system messages.
|
||||
loop(Req, Env, Handler, HandlerState) ->
|
||||
receive
|
||||
{timeout, TRef, ?MODULE} ->
|
||||
terminate(Req, State, Handler, HandlerState, timeout);
|
||||
{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
|
||||
loop(Req, State, Handler, HandlerState);
|
||||
Message ->
|
||||
call(Req, State, Handler, HandlerState, Message)
|
||||
call(Req, Env, Handler, HandlerState, Message)
|
||||
end.
|
||||
|
||||
call(Req, State, Handler, HandlerState, Message) ->
|
||||
try Handler:info(Message, Req, HandlerState) of
|
||||
{ok, Req2, HandlerState2} ->
|
||||
before_loop(Req2, State, Handler, HandlerState2);
|
||||
{ok, Req2, HandlerState2, hibernate} ->
|
||||
before_loop(Req2, State#state{hibernate=true}, Handler, HandlerState2);
|
||||
{stop, Req2, HandlerState2} ->
|
||||
terminate(Req2, State, Handler, HandlerState2, stop)
|
||||
call(Req0, Env, Handler, HandlerState0, Message) ->
|
||||
try Handler:info(Message, Req0, HandlerState0) of
|
||||
{ok, Req, HandlerState} ->
|
||||
loop(Req, Env, Handler, HandlerState);
|
||||
{ok, Req, HandlerState, hibernate} ->
|
||||
suspend(Req, Env, Handler, HandlerState);
|
||||
{stop, Req, HandlerState} ->
|
||||
terminate(Req, Env, Handler, HandlerState, stop)
|
||||
catch Class:Reason ->
|
||||
cowboy_handler:terminate({crash, Class, Reason}, Req, HandlerState, Handler),
|
||||
cowboy_handler:terminate({crash, Class, Reason}, Req0, HandlerState0, Handler),
|
||||
erlang:raise(Class, Reason, erlang:get_stacktrace())
|
||||
end.
|
||||
|
||||
terminate(Req, #state{env=Env, timeout_ref=TRef},
|
||||
Handler, HandlerState, Reason) ->
|
||||
_ = case TRef of
|
||||
undefined -> ignore;
|
||||
TRef -> erlang:cancel_timer(TRef)
|
||||
end,
|
||||
flush_timeouts(),
|
||||
suspend(Req, Env, Handler, HandlerState) ->
|
||||
{suspend, ?MODULE, loop, [Req, Env, Handler, HandlerState]}.
|
||||
|
||||
terminate(Req, Env, Handler, HandlerState, Reason) ->
|
||||
Result = cowboy_handler:terminate(Reason, Req, HandlerState, Handler),
|
||||
{ok, Req, Env#{result => Result}}.
|
||||
|
||||
flush_timeouts() ->
|
||||
receive
|
||||
{timeout, TRef, ?MODULE} when is_reference(TRef) ->
|
||||
flush_timeouts()
|
||||
after 0 ->
|
||||
ok
|
||||
end.
|
||||
|
|
|
@ -17,15 +17,14 @@
|
|||
-module(cowboy_rest).
|
||||
-behaviour(cowboy_sub_protocol).
|
||||
|
||||
-export([upgrade/6]).
|
||||
-export([upgrade/4]).
|
||||
-export([upgrade/5]).
|
||||
|
||||
%% Common handler callbacks.
|
||||
|
||||
-callback init(Req, any())
|
||||
-> {ok | module(), Req, any()}
|
||||
| {module(), Req, any(), hibernate}
|
||||
| {module(), Req, any(), timeout()}
|
||||
| {module(), Req, any(), timeout(), hibernate}
|
||||
| {module(), Req, any(), any()}
|
||||
when Req::cowboy_req:req().
|
||||
|
||||
-callback terminate(any(), cowboy_req:req(), any()) -> ok.
|
||||
|
@ -232,14 +231,20 @@
|
|||
expires :: undefined | no_call | calendar:datetime() | binary()
|
||||
}).
|
||||
|
||||
-spec upgrade(Req, Env, module(), any(), infinity, run)
|
||||
-spec upgrade(Req, Env, module(), any())
|
||||
-> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||
upgrade(Req0, Env, Handler, HandlerState, infinity, run) ->
|
||||
upgrade(Req0, Env, Handler, HandlerState) ->
|
||||
Method = cowboy_req:method(Req0),
|
||||
{ok, Req, Result} = service_available(Req0, #state{method=Method,
|
||||
handler=Handler, handler_state=HandlerState}),
|
||||
{ok, Req, Env#{result => Result}}.
|
||||
|
||||
-spec upgrade(Req, Env, module(), any(), any())
|
||||
-> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||
%% cowboy_rest takes no options.
|
||||
upgrade(Req, Env, Handler, HandlerState, _Opts) ->
|
||||
upgrade(Req, Env, Handler, HandlerState).
|
||||
|
||||
service_available(Req, State) ->
|
||||
expect(Req, State, service_available, true, fun known_methods/2, 503).
|
||||
|
||||
|
|
|
@ -15,6 +15,10 @@
|
|||
|
||||
-module(cowboy_sub_protocol).
|
||||
|
||||
-callback upgrade(Req, Env, module(), any(), timeout(), run | hibernate)
|
||||
-callback upgrade(Req, Env, module(), any())
|
||||
-> {ok, Req, Env} | {suspend, module(), atom(), [any()]} | {stop, Req}
|
||||
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||
|
||||
-callback upgrade(Req, Env, module(), any(), any())
|
||||
-> {ok, Req, Env} | {suspend, module(), atom(), [any()]} | {stop, Req}
|
||||
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||
|
|
|
@ -17,7 +17,8 @@
|
|||
-module(cowboy_websocket).
|
||||
-behaviour(cowboy_sub_protocol).
|
||||
|
||||
-export([upgrade/6]).
|
||||
-export([upgrade/4]).
|
||||
-export([upgrade/5]).
|
||||
-export([takeover/7]).
|
||||
-export([handler_loop/3]).
|
||||
|
||||
|
@ -34,9 +35,7 @@
|
|||
|
||||
-callback init(Req, any())
|
||||
-> {ok | module(), Req, any()}
|
||||
| {module(), Req, any(), hibernate}
|
||||
| {module(), Req, any(), timeout()}
|
||||
| {module(), Req, any(), timeout(), hibernate}
|
||||
| {module(), Req, any(), any()}
|
||||
when Req::cowboy_req:req().
|
||||
|
||||
-callback websocket_init(State)
|
||||
|
@ -53,6 +52,12 @@
|
|||
-callback terminate(any(), cowboy_req:req(), any()) -> ok.
|
||||
-optional_callbacks([terminate/3]).
|
||||
|
||||
-type opts() :: #{
|
||||
idle_timeout => timeout(),
|
||||
compress => boolean()
|
||||
}.
|
||||
-export_type([opts/0]).
|
||||
|
||||
-record(state, {
|
||||
socket = undefined :: inet:socket() | undefined,
|
||||
transport = undefined :: module(),
|
||||
|
@ -60,6 +65,7 @@
|
|||
key = undefined :: undefined | binary(),
|
||||
timeout = infinity :: timeout(),
|
||||
timeout_ref = undefined :: undefined | reference(),
|
||||
compress = false :: boolean(),
|
||||
messages = undefined :: undefined | {atom(), atom(), atom()},
|
||||
hibernate = false :: boolean(),
|
||||
frag_state = undefined :: cow_ws:frag_state(),
|
||||
|
@ -70,14 +76,22 @@
|
|||
|
||||
%% Stream process.
|
||||
|
||||
-spec upgrade(Req, Env, module(), any(), timeout(), run | hibernate)
|
||||
-spec upgrade(Req, Env, module(), any())
|
||||
-> {ok, Req, Env}
|
||||
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||
upgrade(Req, Env, Handler, HandlerState) ->
|
||||
upgrade(Req, Env, Handler, HandlerState, #{}).
|
||||
|
||||
-spec upgrade(Req, Env, module(), any(), opts())
|
||||
-> {ok, Req, Env}
|
||||
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||
%% @todo Immediately crash if a response has already been sent.
|
||||
%% @todo Error out if HTTP/2.
|
||||
upgrade(Req0, Env, Handler, HandlerState, Timeout, Hibernate) ->
|
||||
try websocket_upgrade(#state{handler=Handler, timeout=Timeout,
|
||||
hibernate=Hibernate =:= hibernate}, Req0) of
|
||||
upgrade(Req0, Env, Handler, HandlerState, Opts) ->
|
||||
Timeout = maps:get(idle_timeout, Opts, 60000),
|
||||
Compress = maps:get(compress, Opts, false),
|
||||
State0 = #state{handler=Handler, timeout=Timeout, compress=Compress},
|
||||
try websocket_upgrade(State0, Req0) of
|
||||
{ok, State, Req} ->
|
||||
websocket_handshake(State, Req, HandlerState, Env)
|
||||
catch _:_ ->
|
||||
|
@ -104,14 +118,13 @@ websocket_upgrade(State, Req) ->
|
|||
|
||||
-spec websocket_extensions(#state{}, Req)
|
||||
-> {ok, #state{}, Req} when Req::cowboy_req:req().
|
||||
websocket_extensions(State, Req=#{ref := Ref}) ->
|
||||
websocket_extensions(State=#state{compress=Compress}, Req) ->
|
||||
%% @todo We want different options for this. For example
|
||||
%% * compress everything auto
|
||||
%% * compress only text auto
|
||||
%% * compress only binary auto
|
||||
%% * compress nothing auto (but still enabled it)
|
||||
%% * disable compression
|
||||
Compress = maps:get(websocket_compress, ranch:get_protocol_options(Ref), false),
|
||||
case {Compress, cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req)} of
|
||||
{true, Extensions} when Extensions =/= undefined ->
|
||||
websocket_extensions(State, Req, Extensions, []);
|
||||
|
@ -170,6 +183,7 @@ websocket_handshake(State=#state{key=Key},
|
|||
{#state{}, any()}) -> ok.
|
||||
takeover(_Parent, Ref, Socket, Transport, _Opts, Buffer,
|
||||
{State0=#state{handler=Handler}, HandlerState}) ->
|
||||
%% @todo We should have an option to disable this behavior.
|
||||
ranch:remove_connection(Ref),
|
||||
State1 = handler_loop_timeout(State0#state{socket=Socket, transport=Transport}),
|
||||
State = State1#state{key=undefined, messages=Transport:messages()},
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue