0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 12:20:24 +00:00

Experiment with {active,N}

This commit is contained in:
Loïc Hoguin 2019-12-04 11:17:34 +01:00
parent 63b17e4edf
commit cd7870df15
No known key found for this signature in database
GPG key ID: 8A9DF795F6FED764
2 changed files with 26 additions and 12 deletions

View file

@ -29,10 +29,10 @@ dep_gun = git https://github.com/ninenines/gun master
dep_ci.erlang.mk = git https://github.com/ninenines/ci.erlang.mk master
DEP_EARLY_PLUGINS = ci.erlang.mk
AUTO_CI_OTP ?= OTP-LATEST-20+
AUTO_CI_OTP ?= OTP-LATEST-22+
AUTO_CI_HIPE ?= OTP-LATEST
# AUTO_CI_ERLLVM ?= OTP-LATEST
AUTO_CI_WINDOWS ?= OTP-LATEST-20+
AUTO_CI_WINDOWS ?= OTP-LATEST-22+
# Standard targets.

View file

@ -125,6 +125,9 @@
timer = undefined :: undefined | reference(),
%% Whether we are currently receiving data from the socket.
active = false :: boolean(),
%% Identifier for the stream currently being read (or waiting to be received).
in_streamid = 1 :: pos_integer(),
@ -189,12 +192,21 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
'A socket error occurred when retrieving the client TLS certificate.'})
end.
active(State=#state{socket=Socket, transport=Transport}) ->
Transport:setopts(Socket, [{active, 100}]),
State#state{active=true}.
passive(State=#state{socket=Socket, transport=Transport}) ->
Transport:setopts(Socket, [{active, false}]),
State#state{active=false}.
before_loop(State=#state{active=true}) ->
loop(State);
%% Do not read from the socket unless flow is large enough.
before_loop(State=#state{flow=Flow}) when Flow =< 0 ->
loop(State);
before_loop(State=#state{socket=Socket, transport=Transport}) ->
Transport:setopts(Socket, [{active, once}]),
loop(State).
before_loop(State) ->
loop(active(State)).
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
@ -213,6 +225,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
{Passive, Socket} when Passive =:= tcp_passive; Passive =:= ssl_passive ->
loop(active(State));
%% Timeouts.
{timeout, Ref, {shutdown, Pid}} ->
cowboy_children:shutdown_timeout(Children, Ref, Pid),
@ -939,8 +953,7 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command
Stream#stream{queue=Queue ++ Commands}),
State#state{streams=Streams};
%% Read the request body.
commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
[{flow, Size}|Tail]) ->
commands(State0=#state{flow=Flow0}, StreamID, [{flow, Size}|Tail]) ->
%% We must read *at least* Size of data otherwise functions
%% like cowboy_req:read_body/1,2 will wait indefinitely.
Flow = if
@ -948,11 +961,11 @@ commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
true -> Flow0 + Size
end,
%% Reenable active mode if necessary.
_ = if
State = if
Flow0 =< 0, Flow > 0 ->
Transport:setopts(Socket, [{active, once}]);
active(State0);
true ->
ok
State0
end,
commands(State#state{flow=Flow}, StreamID, Tail);
%% Error responses are sent only if a response wasn't sent already.
@ -1122,14 +1135,14 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
%% @todo If there's streams opened after this one, fail instead of 101.
State = cancel_timeout(State0),
State1 = cancel_timeout(State0),
%% Before we send the 101 response we need to stop receiving data
%% from the socket, otherwise the data might be receive before the
%% call to flush/0 and we end up inadvertently dropping a packet.
%%
%% @todo Handle cases where the request came with a body. We need
%% to process or skip the body before the upgrade can be completed.
Transport:setopts(Socket, [{active, false}]),
State = passive(State1),
%% Send a 101 response if necessary, then terminate the stream.
#state{streams=Streams} = case OutState of
wait -> info(State, StreamID, {inform, 101, Headers});
@ -1432,6 +1445,7 @@ terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef
Messages = Transport:messages(),
%% We may already have a message in the mailbox when we do this
%% but it's OK because we are shutting down anyway.
%% @todo Use active,N here as well.
case Transport:setopts(Socket, [{active, once}]) of
ok ->
receive