mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 12:20:24 +00:00
Better handle socket closing with loop handlers
We now read from the socket to be able to detect errors or TCP close events, and buffer the data if any. Once the data receive goes over a certain limit, which defaults to 5000 bytes, we simply close the connection with an {error, overflow} reason.
This commit is contained in:
parent
65ed13d2da
commit
40b8d0befc
4 changed files with 84 additions and 17 deletions
|
@ -18,6 +18,16 @@
|
||||||
%% environment values. The result of this execution is added to the
|
%% environment values. The result of this execution is added to the
|
||||||
%% environment under the <em>result</em> value.
|
%% environment under the <em>result</em> value.
|
||||||
%%
|
%%
|
||||||
|
%% When using loop handlers, we are receiving data from the socket because we
|
||||||
|
%% want to know when the socket gets closed. This is generally not an issue
|
||||||
|
%% because these kinds of requests are generally not pipelined, and don't have
|
||||||
|
%% a body. If they do have a body, this body is often read in the
|
||||||
|
%% <em>init/3</em> callback and this is no problem. Otherwise, this data
|
||||||
|
%% accumulates in a buffer until we reach a certain threshold of 5000 bytes
|
||||||
|
%% by default. This can be configured through the <em>loop_max_buffer</em>
|
||||||
|
%% environment value. The request will be terminated with an
|
||||||
|
%% <em>{error, overflow}</em> reason if this threshold is reached.
|
||||||
|
%%
|
||||||
%% @see cowboy_http_handler
|
%% @see cowboy_http_handler
|
||||||
-module(cowboy_handler).
|
-module(cowboy_handler).
|
||||||
-behaviour(cowboy_middleware).
|
-behaviour(cowboy_middleware).
|
||||||
|
@ -28,8 +38,10 @@
|
||||||
-record(state, {
|
-record(state, {
|
||||||
env :: cowboy_middleware:env(),
|
env :: cowboy_middleware:env(),
|
||||||
hibernate = false :: boolean(),
|
hibernate = false :: boolean(),
|
||||||
|
loop_buffer_size = 0 :: non_neg_integer(),
|
||||||
|
loop_max_buffer = 5000 :: non_neg_integer() | infinity,
|
||||||
loop_timeout = infinity :: timeout(),
|
loop_timeout = infinity :: timeout(),
|
||||||
loop_timeout_ref :: undefined | reference(),
|
loop_timeout_ref = undefined :: undefined | reference(),
|
||||||
resp_sent = false :: boolean()
|
resp_sent = false :: boolean()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -41,7 +53,12 @@
|
||||||
execute(Req, Env) ->
|
execute(Req, Env) ->
|
||||||
{_, Handler} = lists:keyfind(handler, 1, Env),
|
{_, Handler} = lists:keyfind(handler, 1, Env),
|
||||||
{_, HandlerOpts} = lists:keyfind(handler_opts, 1, Env),
|
{_, HandlerOpts} = lists:keyfind(handler_opts, 1, Env),
|
||||||
handler_init(Req, #state{env=Env}, Handler, HandlerOpts).
|
case lists:keyfind(loop_max_buffer, 1, Env) of
|
||||||
|
false -> MaxBuffer = 5000, ok;
|
||||||
|
{_, MaxBuffer} -> ok
|
||||||
|
end,
|
||||||
|
handler_init(Req, #state{env=Env, loop_max_buffer=MaxBuffer},
|
||||||
|
Handler, HandlerOpts).
|
||||||
|
|
||||||
-spec handler_init(Req, #state{}, module(), any())
|
-spec handler_init(Req, #state{}, module(), any())
|
||||||
-> {ok, Req, cowboy_middleware:env()}
|
-> {ok, Req, cowboy_middleware:env()}
|
||||||
|
@ -53,17 +70,17 @@ handler_init(Req, State, Handler, HandlerOpts) ->
|
||||||
{ok, Req2, HandlerState} ->
|
{ok, Req2, HandlerState} ->
|
||||||
handler_handle(Req2, State, Handler, HandlerState);
|
handler_handle(Req2, State, Handler, HandlerState);
|
||||||
{loop, Req2, HandlerState} ->
|
{loop, Req2, HandlerState} ->
|
||||||
handler_before_loop(Req2, State#state{hibernate=false},
|
handler_before_loop(Req2, State, Handler, HandlerState);
|
||||||
Handler, HandlerState);
|
|
||||||
{loop, Req2, HandlerState, hibernate} ->
|
{loop, Req2, HandlerState, hibernate} ->
|
||||||
handler_before_loop(Req2, State#state{hibernate=true},
|
handler_before_loop(Req2, State#state{hibernate=true},
|
||||||
Handler, HandlerState);
|
Handler, HandlerState);
|
||||||
{loop, Req2, HandlerState, Timeout} ->
|
{loop, Req2, HandlerState, Timeout} ->
|
||||||
handler_before_loop(Req2, State#state{loop_timeout=Timeout},
|
State2 = handler_loop_timeout(State#state{loop_timeout=Timeout}),
|
||||||
Handler, HandlerState);
|
handler_before_loop(Req2, State2, Handler, HandlerState);
|
||||||
{loop, Req2, HandlerState, Timeout, hibernate} ->
|
{loop, Req2, HandlerState, Timeout, hibernate} ->
|
||||||
handler_before_loop(Req2, State#state{
|
State2 = handler_loop_timeout(State#state{
|
||||||
hibernate=true, loop_timeout=Timeout}, Handler, HandlerState);
|
hibernate=true, loop_timeout=Timeout}),
|
||||||
|
handler_before_loop(Req2, State2, Handler, HandlerState);
|
||||||
{shutdown, Req2, HandlerState} ->
|
{shutdown, Req2, HandlerState} ->
|
||||||
terminate_request(Req2, State, Handler, HandlerState,
|
terminate_request(Req2, State, Handler, HandlerState,
|
||||||
{normal, shutdown});
|
{normal, shutdown});
|
||||||
|
@ -123,12 +140,14 @@ handler_handle(Req, State, Handler, HandlerState) ->
|
||||||
| {error, 500, Req} | {suspend, module(), function(), [any()]}
|
| {error, 500, Req} | {suspend, module(), function(), [any()]}
|
||||||
when Req::cowboy_req:req().
|
when Req::cowboy_req:req().
|
||||||
handler_before_loop(Req, State=#state{hibernate=true}, Handler, HandlerState) ->
|
handler_before_loop(Req, State=#state{hibernate=true}, Handler, HandlerState) ->
|
||||||
State2 = handler_loop_timeout(State),
|
[Socket, Transport] = cowboy_req:get([socket, transport], Req),
|
||||||
|
Transport:setopts(Socket, [{active, once}]),
|
||||||
{suspend, ?MODULE, handler_loop,
|
{suspend, ?MODULE, handler_loop,
|
||||||
[Req, State2#state{hibernate=false}, Handler, HandlerState]};
|
[Req, State#state{hibernate=false}, Handler, HandlerState]};
|
||||||
handler_before_loop(Req, State, Handler, HandlerState) ->
|
handler_before_loop(Req, State, Handler, HandlerState) ->
|
||||||
State2 = handler_loop_timeout(State),
|
[Socket, Transport] = cowboy_req:get([socket, transport], Req),
|
||||||
handler_loop(Req, State2, Handler, HandlerState).
|
Transport:setopts(Socket, [{active, once}]),
|
||||||
|
handler_loop(Req, State, Handler, HandlerState).
|
||||||
|
|
||||||
%% Almost the same code can be found in cowboy_websocket.
|
%% Almost the same code can be found in cowboy_websocket.
|
||||||
-spec handler_loop_timeout(#state{}) -> #state{}.
|
-spec handler_loop_timeout(#state{}) -> #state{}.
|
||||||
|
@ -136,8 +155,10 @@ handler_loop_timeout(State=#state{loop_timeout=infinity}) ->
|
||||||
State#state{loop_timeout_ref=undefined};
|
State#state{loop_timeout_ref=undefined};
|
||||||
handler_loop_timeout(State=#state{loop_timeout=Timeout,
|
handler_loop_timeout(State=#state{loop_timeout=Timeout,
|
||||||
loop_timeout_ref=PrevRef}) ->
|
loop_timeout_ref=PrevRef}) ->
|
||||||
_ = case PrevRef of undefined -> ignore; PrevRef ->
|
_ = case PrevRef of
|
||||||
erlang:cancel_timer(PrevRef) end,
|
undefined -> ignore;
|
||||||
|
PrevRef -> erlang:cancel_timer(PrevRef)
|
||||||
|
end,
|
||||||
TRef = erlang:start_timer(Timeout, self(), ?MODULE),
|
TRef = erlang:start_timer(Timeout, self(), ?MODULE),
|
||||||
State#state{loop_timeout_ref=TRef}.
|
State#state{loop_timeout_ref=TRef}.
|
||||||
|
|
||||||
|
@ -146,16 +167,38 @@ handler_loop_timeout(State=#state{loop_timeout=Timeout,
|
||||||
-> {ok, Req, cowboy_middleware:env()}
|
-> {ok, Req, cowboy_middleware:env()}
|
||||||
| {error, 500, Req} | {suspend, module(), function(), [any()]}
|
| {error, 500, Req} | {suspend, module(), function(), [any()]}
|
||||||
when Req::cowboy_req:req().
|
when Req::cowboy_req:req().
|
||||||
handler_loop(Req, State=#state{loop_timeout_ref=TRef}, Handler, HandlerState) ->
|
handler_loop(Req, State=#state{loop_buffer_size=NbBytes,
|
||||||
|
loop_max_buffer=Threshold, loop_timeout_ref=TRef},
|
||||||
|
Handler, HandlerState) ->
|
||||||
|
[Socket, Transport] = cowboy_req:get([socket, transport], Req),
|
||||||
|
{OK, Closed, Error} = Transport:messages(),
|
||||||
receive
|
receive
|
||||||
|
{OK, Socket, Data} ->
|
||||||
|
NbBytes2 = NbBytes + byte_size(Data),
|
||||||
|
if NbBytes2 > Threshold ->
|
||||||
|
_ = handler_terminate(Req, Handler, HandlerState,
|
||||||
|
{error, overflow}),
|
||||||
|
error_terminate(Req, State);
|
||||||
|
true ->
|
||||||
|
Req2 = cowboy_req:append_buffer(Data, Req),
|
||||||
|
State2 = handler_loop_timeout(State#state{
|
||||||
|
loop_buffer_size=NbBytes2}),
|
||||||
|
handler_loop(Req2, State2, Handler, HandlerState)
|
||||||
|
end;
|
||||||
|
{Closed, Socket} ->
|
||||||
|
terminate_request(Req, State, Handler, HandlerState,
|
||||||
|
{error, closed});
|
||||||
|
{Error, Socket, Reason} ->
|
||||||
|
terminate_request(Req, State, Handler, HandlerState,
|
||||||
|
{error, Reason});
|
||||||
{cowboy_req, resp_sent} ->
|
{cowboy_req, resp_sent} ->
|
||||||
handler_loop(Req, State#state{resp_sent=true},
|
handler_before_loop(Req, State#state{resp_sent=true},
|
||||||
Handler, HandlerState);
|
Handler, HandlerState);
|
||||||
{timeout, TRef, ?MODULE} ->
|
{timeout, TRef, ?MODULE} ->
|
||||||
terminate_request(Req, State, Handler, HandlerState,
|
terminate_request(Req, State, Handler, HandlerState,
|
||||||
{normal, timeout});
|
{normal, timeout});
|
||||||
{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
|
{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
|
||||||
handler_loop(Req, State, Handler, HandlerState);
|
handler_before_loop(Req, State, Handler, HandlerState);
|
||||||
Message ->
|
Message ->
|
||||||
handler_call(Req, State, Handler, HandlerState, Message)
|
handler_call(Req, State, Handler, HandlerState, Message)
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -104,6 +104,7 @@
|
||||||
-export([ensure_response/2]).
|
-export([ensure_response/2]).
|
||||||
|
|
||||||
%% Private setter/getter API.
|
%% Private setter/getter API.
|
||||||
|
-export([append_buffer/2]).
|
||||||
-export([get/2]).
|
-export([get/2]).
|
||||||
-export([set/2]).
|
-export([set/2]).
|
||||||
-export([set_bindings/4]).
|
-export([set_bindings/4]).
|
||||||
|
@ -1065,6 +1066,11 @@ ensure_response(#http_req{socket=Socket, transport=Transport,
|
||||||
|
|
||||||
%% Private setter/getter API.
|
%% Private setter/getter API.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
-spec append_buffer(binary(), Req) -> Req when Req::req().
|
||||||
|
append_buffer(Suffix, Req=#http_req{buffer=Buffer}) ->
|
||||||
|
Req#http_req{buffer= << Buffer/binary, Suffix/binary >>}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
-spec get(atom(), req()) -> any(); ([atom()], req()) -> any().
|
-spec get(atom(), req()) -> any(); ([atom()], req()) -> any().
|
||||||
get(List, Req) when is_list(List) ->
|
get(List, Req) when is_list(List) ->
|
||||||
|
|
|
@ -49,6 +49,7 @@
|
||||||
-export([onresponse_crash/1]).
|
-export([onresponse_crash/1]).
|
||||||
-export([onresponse_reply/1]).
|
-export([onresponse_reply/1]).
|
||||||
-export([pipeline/1]).
|
-export([pipeline/1]).
|
||||||
|
-export([pipeline_long_polling/1]).
|
||||||
-export([rest_bad_accept/1]).
|
-export([rest_bad_accept/1]).
|
||||||
-export([rest_created_path/1]).
|
-export([rest_created_path/1]).
|
||||||
-export([rest_expires/1]).
|
-export([rest_expires/1]).
|
||||||
|
@ -112,6 +113,7 @@ groups() ->
|
||||||
nc_rand,
|
nc_rand,
|
||||||
nc_zero,
|
nc_zero,
|
||||||
pipeline,
|
pipeline,
|
||||||
|
pipeline_long_polling,
|
||||||
rest_bad_accept,
|
rest_bad_accept,
|
||||||
rest_created_path,
|
rest_created_path,
|
||||||
rest_expires,
|
rest_expires,
|
||||||
|
@ -432,6 +434,8 @@ The document has moved
|
||||||
<A HREF=\"http://www.google.co.il/\">here</A>.
|
<A HREF=\"http://www.google.co.il/\">here</A>.
|
||||||
</BODY></HTML>",
|
</BODY></HTML>",
|
||||||
Tests = [
|
Tests = [
|
||||||
|
{102, <<"GET /long_polling HTTP/1.1\r\nHost: localhost\r\n"
|
||||||
|
"Content-Length: 5000\r\n\r\n", 0:5000/unit:8 >>},
|
||||||
{200, ["GET / HTTP/1.0\r\nHost: localhost\r\n"
|
{200, ["GET / HTTP/1.0\r\nHost: localhost\r\n"
|
||||||
"Set-Cookie: ", HugeCookie, "\r\n\r\n"]},
|
"Set-Cookie: ", HugeCookie, "\r\n\r\n"]},
|
||||||
{200, "\r\n\r\n\r\n\r\n\r\nGET / HTTP/1.1\r\nHost: localhost\r\n\r\n"},
|
{200, "\r\n\r\n\r\n\r\n\r\nGET / HTTP/1.1\r\nHost: localhost\r\n\r\n"},
|
||||||
|
@ -449,6 +453,8 @@ The document has moved
|
||||||
{408, "GET / HTTP/1.1\r\nHost: localhost\r\n\r"},
|
{408, "GET / HTTP/1.1\r\nHost: localhost\r\n\r"},
|
||||||
{414, Huge},
|
{414, Huge},
|
||||||
{400, "GET / HTTP/1.1\r\n" ++ Huge},
|
{400, "GET / HTTP/1.1\r\n" ++ Huge},
|
||||||
|
{500, <<"GET /long_polling HTTP/1.1\r\nHost: localhost\r\n"
|
||||||
|
"Content-Length: 100000\r\n\r\n", 0:100000/unit:8 >>},
|
||||||
{505, "GET / HTTP/1.2\r\nHost: localhost\r\n\r\n"},
|
{505, "GET / HTTP/1.2\r\nHost: localhost\r\n\r\n"},
|
||||||
{closed, ""},
|
{closed, ""},
|
||||||
{closed, "\r\n"},
|
{closed, "\r\n"},
|
||||||
|
@ -758,6 +764,16 @@ pipeline(Config) ->
|
||||||
{ok, 200, _, Client11} = cowboy_client:response(Client10),
|
{ok, 200, _, Client11} = cowboy_client:response(Client10),
|
||||||
{error, closed} = cowboy_client:response(Client11).
|
{error, closed} = cowboy_client:response(Client11).
|
||||||
|
|
||||||
|
pipeline_long_polling(Config) ->
|
||||||
|
Client = ?config(client, Config),
|
||||||
|
{ok, Client2} = cowboy_client:request(<<"GET">>,
|
||||||
|
build_url("/long_polling", Config), Client),
|
||||||
|
{ok, Client3} = cowboy_client:request(<<"GET">>,
|
||||||
|
build_url("/long_polling", Config), Client2),
|
||||||
|
{ok, 102, _, Client4} = cowboy_client:response(Client3),
|
||||||
|
{ok, 102, _, Client5} = cowboy_client:response(Client4),
|
||||||
|
{error, closed} = cowboy_client:response(Client5).
|
||||||
|
|
||||||
rest_bad_accept(Config) ->
|
rest_bad_accept(Config) ->
|
||||||
Client = ?config(client, Config),
|
Client = ?config(client, Config),
|
||||||
{ok, Client2} = cowboy_client:request(<<"GET">>,
|
{ok, Client2} = cowboy_client:request(<<"GET">>,
|
||||||
|
|
|
@ -19,4 +19,6 @@ info(timeout, Req, State) ->
|
||||||
{loop, Req, State - 1, hibernate}.
|
{loop, Req, State - 1, hibernate}.
|
||||||
|
|
||||||
terminate({normal, shutdown}, _, _) ->
|
terminate({normal, shutdown}, _, _) ->
|
||||||
|
ok;
|
||||||
|
terminate({error, overflow}, _, _) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue