mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 12:20:24 +00:00
Use cow_http2_machine's timer handling
This commit is contained in:
parent
7d118b547f
commit
f1018fd1c1
2 changed files with 26 additions and 56 deletions
|
@ -1,4 +1,4 @@
|
||||||
{deps, [
|
{deps, [
|
||||||
{cowlib,".*",{git,"https://github.com/ninenines/cowlib","2.6.0"}},{ranch,".*",{git,"https://github.com/ninenines/ranch","1.6.2"}}
|
{cowlib,".*",{git,"https://github.com/ninenines/cowlib","master"}},{ranch,".*",{git,"https://github.com/ninenines/ranch","1.6.2"}}
|
||||||
]}.
|
]}.
|
||||||
{erl_opts, [debug_info,warn_export_vars,warn_shadow_vars,warn_obsolete_guard,warn_missing_spec,warn_untyped_record]}.
|
{erl_opts, [debug_info,warn_export_vars,warn_shadow_vars,warn_obsolete_guard,warn_missing_spec,warn_untyped_record]}.
|
||||||
|
|
|
@ -63,11 +63,8 @@
|
||||||
%% Client certificate (TLS only).
|
%% Client certificate (TLS only).
|
||||||
cert :: undefined | binary(),
|
cert :: undefined | binary(),
|
||||||
|
|
||||||
%% HTTP/2 timers.
|
|
||||||
preface = undefined :: {sequence | settings, undefined | reference()} | upgrade | complete,
|
|
||||||
next_settings_timer = undefined :: undefined | reference(),
|
|
||||||
|
|
||||||
%% HTTP/2 state machine.
|
%% HTTP/2 state machine.
|
||||||
|
http2_init :: sequence | upgrade | complete,
|
||||||
http2_machine :: cow_http2_machine:http2_machine(),
|
http2_machine :: cow_http2_machine:http2_machine(),
|
||||||
|
|
||||||
%% Currently active HTTP/2 streams. Streams may be initiated either
|
%% Currently active HTTP/2 streams. Streams may be initiated either
|
||||||
|
@ -115,9 +112,7 @@ init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer) ->
|
||||||
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts),
|
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts),
|
||||||
State = #state{parent=Parent, ref=Ref, socket=Socket,
|
State = #state{parent=Parent, ref=Ref, socket=Socket,
|
||||||
transport=Transport, opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
transport=Transport, opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
||||||
preface={sequence, start_timer(preface_timeout, Opts)},
|
http2_init=sequence, http2_machine=HTTP2Machine},
|
||||||
http2_machine=HTTP2Machine,
|
|
||||||
next_settings_timer=start_timer(settings_timeout, Opts)},
|
|
||||||
Transport:send(Socket, Preface),
|
Transport:send(Socket, Preface),
|
||||||
case Buffer of
|
case Buffer of
|
||||||
<<>> -> before_loop(State, Buffer);
|
<<>> -> before_loop(State, Buffer);
|
||||||
|
@ -135,35 +130,28 @@ init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer,
|
||||||
= cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0),
|
= cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0),
|
||||||
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
|
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
|
||||||
transport=Transport, opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
transport=Transport, opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
||||||
preface=upgrade, next_settings_timer=start_timer(settings_timeout, Opts),
|
http2_init=upgrade, http2_machine=HTTP2Machine},
|
||||||
http2_machine=HTTP2Machine},
|
|
||||||
State1 = headers_frame(State0#state{
|
State1 = headers_frame(State0#state{
|
||||||
http2_machine=HTTP2Machine}, StreamID, Req),
|
http2_machine=HTTP2Machine}, StreamID, Req),
|
||||||
%% We assume that the upgrade will be applied. A stream handler
|
%% We assume that the upgrade will be applied. A stream handler
|
||||||
%% must not prevent the normal operations of the server.
|
%% must not prevent the normal operations of the server.
|
||||||
State = info(State1, 1, {switch_protocol, #{
|
State2 = info(State1, 1, {switch_protocol, #{
|
||||||
<<"connection">> => <<"Upgrade">>,
|
<<"connection">> => <<"Upgrade">>,
|
||||||
<<"upgrade">> => <<"h2c">>
|
<<"upgrade">> => <<"h2c">>
|
||||||
}, ?MODULE, undefined}), %% @todo undefined or #{}?
|
}, ?MODULE, undefined}), %% @todo undefined or #{}?
|
||||||
|
State = State2#state{http2_init=sequence},
|
||||||
Transport:send(Socket, Preface),
|
Transport:send(Socket, Preface),
|
||||||
case Buffer of
|
case Buffer of
|
||||||
<<>> -> before_loop(State, Buffer);
|
<<>> -> before_loop(State, Buffer);
|
||||||
_ -> parse(State, Buffer)
|
_ -> parse(State, Buffer)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
start_timer(Name, Opts) ->
|
|
||||||
case maps:get(Name, Opts, 5000) of
|
|
||||||
infinity -> undefined;
|
|
||||||
Timeout -> erlang:start_timer(Timeout, self(), Name)
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @todo Add the timeout for last time since we heard of connection.
|
%% @todo Add the timeout for last time since we heard of connection.
|
||||||
before_loop(State, Buffer) ->
|
before_loop(State, Buffer) ->
|
||||||
loop(State, Buffer).
|
loop(State, Buffer).
|
||||||
|
|
||||||
loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
||||||
opts=Opts, children=Children, next_settings_timer=SettingsTRef,
|
opts=Opts, children=Children}, Buffer) ->
|
||||||
preface=PrefaceTimer}, Buffer) ->
|
|
||||||
Transport:setopts(Socket, [{active, once}]),
|
Transport:setopts(Socket, [{active, once}]),
|
||||||
{OK, Closed, Error} = Transport:messages(),
|
{OK, Closed, Error} = Transport:messages(),
|
||||||
InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
|
InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
|
||||||
|
@ -185,17 +173,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
||||||
{timeout, Ref, {shutdown, Pid}} ->
|
{timeout, Ref, {shutdown, Pid}} ->
|
||||||
cowboy_children:shutdown_timeout(Children, Ref, Pid),
|
cowboy_children:shutdown_timeout(Children, Ref, Pid),
|
||||||
loop(State, Buffer);
|
loop(State, Buffer);
|
||||||
{timeout, TRef, preface_timeout} ->
|
{timeout, TRef, {cow_http2_machine, Name}} ->
|
||||||
case PrefaceTimer of
|
loop(timeout(State, Name, TRef), Buffer);
|
||||||
{_, TRef} ->
|
|
||||||
terminate(State, {connection_error, protocol_error,
|
|
||||||
'The preface was not received in a reasonable amount of time.'});
|
|
||||||
_ ->
|
|
||||||
loop(State, Buffer)
|
|
||||||
end;
|
|
||||||
{timeout, SettingsTRef, settings_timeout} ->
|
|
||||||
terminate(State, {connection_error, settings_timeout,
|
|
||||||
'The SETTINGS ack was not received within the configured time. (RFC7540 6.5.3)'});
|
|
||||||
%% Messages pertaining to a stream.
|
%% Messages pertaining to a stream.
|
||||||
{{Pid, StreamID}, Msg} when Pid =:= self() ->
|
{{Pid, StreamID}, Msg} when Pid =:= self() ->
|
||||||
loop(info(State, StreamID, Msg), Buffer);
|
loop(info(State, StreamID, Msg), Buffer);
|
||||||
|
@ -215,10 +194,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
||||||
|
|
||||||
%% HTTP/2 protocol parsing.
|
%% HTTP/2 protocol parsing.
|
||||||
|
|
||||||
parse(State=#state{preface={sequence, TRef}}, Data) ->
|
parse(State=#state{http2_init=sequence}, Data) ->
|
||||||
case cow_http2:parse_sequence(Data) of
|
case cow_http2:parse_sequence(Data) of
|
||||||
{ok, Rest} ->
|
{ok, Rest} ->
|
||||||
parse(State#state{preface={settings, TRef}}, Rest);
|
parse(State#state{http2_init=complete}, Rest);
|
||||||
more ->
|
more ->
|
||||||
before_loop(State, Data);
|
before_loop(State, Data);
|
||||||
Error = {connection_error, _, _} ->
|
Error = {connection_error, _, _} ->
|
||||||
|
@ -244,7 +223,7 @@ parse(State=#state{http2_machine=HTTP2Machine}, Data) ->
|
||||||
frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
|
frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
|
||||||
case cow_http2_machine:frame(Frame, HTTP2Machine0) of
|
case cow_http2_machine:frame(Frame, HTTP2Machine0) of
|
||||||
{ok, HTTP2Machine} ->
|
{ok, HTTP2Machine} ->
|
||||||
after_frame(State#state{http2_machine=HTTP2Machine}, Frame);
|
maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame);
|
||||||
{ok, {data, StreamID, IsFin, Data}, HTTP2Machine} ->
|
{ok, {data, StreamID, IsFin, Data}, HTTP2Machine} ->
|
||||||
data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data);
|
data_frame(State#state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data);
|
||||||
{ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} ->
|
{ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} ->
|
||||||
|
@ -259,7 +238,7 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
|
||||||
terminate(State#state{http2_machine=HTTP2Machine},
|
terminate(State#state{http2_machine=HTTP2Machine},
|
||||||
{stop, Frame, 'Client is going away.'});
|
{stop, Frame, 'Client is going away.'});
|
||||||
{send, SendData, HTTP2Machine} ->
|
{send, SendData, HTTP2Machine} ->
|
||||||
send_data(after_frame(State#state{http2_machine=HTTP2Machine}, Frame), SendData);
|
send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData);
|
||||||
%% @todo Yeah StreamID would be better out of this, {error, StreamID, Tuple, State}
|
%% @todo Yeah StreamID would be better out of this, {error, StreamID, Tuple, State}
|
||||||
{error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} ->
|
{error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} ->
|
||||||
stream_reset(State#state{http2_machine=HTTP2Machine},
|
stream_reset(State#state{http2_machine=HTTP2Machine},
|
||||||
|
@ -268,24 +247,6 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
|
||||||
terminate(State#state{http2_machine=HTTP2Machine}, Error)
|
terminate(State#state{http2_machine=HTTP2Machine}, Error)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% If we are waiting for a a SETTINGS ack and receive one, stop the timer.
|
|
||||||
after_frame(State=#state{next_settings_timer=TRef}, settings_ack) ->
|
|
||||||
ok = case TRef of
|
|
||||||
undefined -> ok;
|
|
||||||
_ -> erlang:cancel_timer(TRef, [{async, true}, {info, false}])
|
|
||||||
end,
|
|
||||||
State#state{next_settings_timer=undefined};
|
|
||||||
%% If we are still waiting for the preface and receive the SETTINGS
|
|
||||||
%% frame, we can mark the preface as complete and stop the timer.
|
|
||||||
after_frame(State=#state{preface={settings, TRef}}, Frame={settings, _}) ->
|
|
||||||
ok = case TRef of
|
|
||||||
undefined -> ok;
|
|
||||||
_ -> erlang:cancel_timer(TRef, [{async, true}, {info, false}])
|
|
||||||
end,
|
|
||||||
maybe_ack(State#state{preface=complete}, Frame);
|
|
||||||
after_frame(State, Frame) ->
|
|
||||||
maybe_ack(State, Frame).
|
|
||||||
|
|
||||||
maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) ->
|
maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) ->
|
||||||
case Frame of
|
case Frame of
|
||||||
{settings, _} -> Transport:send(Socket, cow_http2:settings_ack());
|
{settings, _} -> Transport:send(Socket, cow_http2:settings_ack());
|
||||||
|
@ -454,6 +415,16 @@ ignored_frame(State=#state{http2_machine=HTTP2Machine0}) ->
|
||||||
terminate(State#state{http2_machine=HTTP2Machine}, Error)
|
terminate(State#state{http2_machine=HTTP2Machine}, Error)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% HTTP/2 timeouts.
|
||||||
|
|
||||||
|
timeout(State=#state{http2_machine=HTTP2Machine0}, TRef, Name) ->
|
||||||
|
case cow_http2_machine:timeout(Name, TRef, HTTP2Machine0) of
|
||||||
|
{ok, HTTP2Machine} ->
|
||||||
|
State#state{http2_machine=HTTP2Machine};
|
||||||
|
{error, Error={connection_error, _, _}, HTTP2Machine} ->
|
||||||
|
terminate(State#state{http2_machine=HTTP2Machine}, Error)
|
||||||
|
end.
|
||||||
|
|
||||||
%% Erlang messages.
|
%% Erlang messages.
|
||||||
|
|
||||||
down(State=#state{opts=Opts, children=Children0}, Pid, Msg) ->
|
down(State=#state{opts=Opts, children=Children0}, Pid, Msg) ->
|
||||||
|
@ -588,12 +559,11 @@ commands(State, StreamID, [Error = {internal_error, _, _}|_Tail]) ->
|
||||||
%% @todo Only reset when the stream still exists.
|
%% @todo Only reset when the stream still exists.
|
||||||
stream_reset(State, StreamID, Error);
|
stream_reset(State, StreamID, Error);
|
||||||
%% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself.
|
%% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself.
|
||||||
commands(State=#state{socket=Socket, transport=Transport, opts=Opts, preface=upgrade},
|
commands(State=#state{socket=Socket, transport=Transport, http2_init=upgrade},
|
||||||
StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) ->
|
StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) ->
|
||||||
%% @todo This 101 response needs to be passed through stream handlers.
|
%% @todo This 101 response needs to be passed through stream handlers.
|
||||||
Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))),
|
Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))),
|
||||||
commands(State#state{preface={sequence, start_timer(preface_timeout, Opts)}},
|
commands(State, StreamID, Tail);
|
||||||
StreamID, Tail);
|
|
||||||
%% Use a different protocol within the stream (CONNECT :protocol).
|
%% Use a different protocol within the stream (CONNECT :protocol).
|
||||||
%% @todo Make sure we error out when the feature is disabled.
|
%% @todo Make sure we error out when the feature is disabled.
|
||||||
commands(State0, StreamID, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
|
commands(State0, StreamID, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
|
||||||
|
@ -689,7 +659,7 @@ send_data_frame(State=#state{socket=Socket, transport=Transport,
|
||||||
-spec terminate(#state{}, _) -> no_return().
|
-spec terminate(#state{}, _) -> no_return().
|
||||||
terminate(undefined, Reason) ->
|
terminate(undefined, Reason) ->
|
||||||
exit({shutdown, Reason});
|
exit({shutdown, Reason});
|
||||||
terminate(State=#state{socket=Socket, transport=Transport, preface=complete,
|
terminate(State=#state{socket=Socket, transport=Transport, http2_init=complete,
|
||||||
http2_machine=HTTP2Machine, streams=Streams, children=Children}, Reason) ->
|
http2_machine=HTTP2Machine, streams=Streams, children=Children}, Reason) ->
|
||||||
%% @todo We might want to optionally send the Reason value
|
%% @todo We might want to optionally send the Reason value
|
||||||
%% as debug data in the GOAWAY frame here. Perhaps more.
|
%% as debug data in the GOAWAY frame here. Perhaps more.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue