mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 12:20:24 +00:00
Add the early_error cowboy_stream callback
This callback is called when an error occurs before the request (including headers, excluding body) was fully received. The init/3 callback will not be called. The callback receives the partial Req object (possibly empty), the reason for the error and the response command that the server will send. It allows you to be aware of the error and possibly modify the response before it is sent.
This commit is contained in:
parent
14a01f71cb
commit
10dfd8c910
4 changed files with 100 additions and 28 deletions
|
@ -19,6 +19,7 @@
|
|||
-export([data/4]).
|
||||
-export([info/3]).
|
||||
-export([terminate/3]).
|
||||
-export([early_error/5]).
|
||||
|
||||
-record(state, {
|
||||
next :: any(),
|
||||
|
@ -55,6 +56,12 @@ terminate(StreamID, Reason, #state{next=Next, deflate=Z}) ->
|
|||
end,
|
||||
cowboy_stream:terminate(StreamID, Reason, Next).
|
||||
|
||||
-spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(),
|
||||
cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp
|
||||
when Resp::cowboy_stream:resp_command().
|
||||
early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
|
||||
cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).
|
||||
|
||||
%% Internal.
|
||||
|
||||
%% Check if the client supports decoding of gzip responses.
|
||||
|
|
|
@ -219,11 +219,10 @@ timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
|
|||
%% @todo If other streams are running, just set the connection to be closed
|
||||
%% and stop trying to read from the socket?
|
||||
terminate(State, {connection_error, timeout, 'No request-line received before timeout.'});
|
||||
timeout(State=#state{socket=Socket, transport=Transport, in_state=#ps_header{}}, request_timeout) ->
|
||||
timeout(State=#state{in_state=#ps_header{}}, request_timeout) ->
|
||||
%% @todo If other streams are running, maybe wait for their reply before sending 408?
|
||||
%% -> Definitely. Either way, stop reading from the socket and make that stream the last.
|
||||
Transport:send(Socket, cow_http:response(408, 'HTTP/1.1', [])),
|
||||
terminate(State, {connection_error, timeout, 'Request headers not received before timeout.'}).
|
||||
error_terminate(408, State, {connection_error, timeout, 'Request headers not received before timeout.'}).
|
||||
|
||||
%% Request-line.
|
||||
parse(<<>>, State) ->
|
||||
|
@ -440,10 +439,12 @@ parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
|
|||
NumHeaders = maps:size(Headers),
|
||||
case match_colon(Buffer, 0) of
|
||||
nomatch when byte_size(Buffer) > MaxLength ->
|
||||
error_terminate(431, State, {connection_error, limit_reached,
|
||||
error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
|
||||
{connection_error, limit_reached,
|
||||
'A header name is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
|
||||
nomatch when NumHeaders >= MaxHeaders ->
|
||||
error_terminate(431, State, {connection_error, limit_reached,
|
||||
error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
|
||||
{connection_error, limit_reached,
|
||||
'The number of headers is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
|
||||
nomatch ->
|
||||
{more, State#state{in_state=PS#ps_header{headers=Headers}}, Buffer};
|
||||
|
@ -460,8 +461,9 @@ match_colon(_, _) ->
|
|||
|
||||
parse_hd_name(<< $:, Rest/bits >>, State, H, SoFar) ->
|
||||
parse_hd_before_value(Rest, State, H, SoFar);
|
||||
parse_hd_name(<< C, _/bits >>, State, _, <<>>) when ?IS_WS(C) ->
|
||||
error_terminate(400, State, {connection_error, protocol_error,
|
||||
parse_hd_name(<< C, _/bits >>, State=#state{in_state=PS}, H, <<>>) when ?IS_WS(C) ->
|
||||
error_terminate(400, State#state{in_state=PS#ps_header{headers=H}},
|
||||
{connection_error, protocol_error,
|
||||
'Whitespace is not allowed between the header name and the colon. (RFC7230 3.2)'});
|
||||
parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) when ?IS_WS(C) ->
|
||||
parse_hd_name_ws(Rest, State, H, SoFar);
|
||||
|
@ -483,7 +485,8 @@ parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, H, N) ->
|
|||
MaxLength = maps:get(max_header_value_length, Opts, 4096),
|
||||
case match_eol(Buffer, 0) of
|
||||
nomatch when byte_size(Buffer) > MaxLength ->
|
||||
error_terminate(431, State, {connection_error, limit_reached,
|
||||
error_terminate(431, State#state{in_state=PS#ps_header{headers=H}},
|
||||
{connection_error, limit_reached,
|
||||
'A header value is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
|
||||
nomatch ->
|
||||
{more, State#state{in_state=PS#ps_header{headers=H, name=N}}, Buffer};
|
||||
|
@ -538,11 +541,12 @@ horse_clean_value_ws_end() ->
|
|||
-endif.
|
||||
|
||||
request(Buffer, State=#state{transport=Transport, in_streamid=StreamID,
|
||||
in_state=#ps_header{version=Version}}, Headers) ->
|
||||
in_state=PS=#ps_header{version=Version}}, Headers) ->
|
||||
case maps:get(<<"host">>, Headers, undefined) of
|
||||
undefined when Version =:= 'HTTP/1.1' ->
|
||||
%% @todo Might want to not close the connection on this and next one.
|
||||
error_terminate(400, State, {stream_error, StreamID, protocol_error,
|
||||
error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
|
||||
{stream_error, StreamID, protocol_error,
|
||||
'HTTP/1.1 requests must include a host header. (RFC7230 5.4)'});
|
||||
undefined ->
|
||||
request(Buffer, State, Headers, <<>>, default_port(Transport:secure()));
|
||||
|
@ -553,7 +557,8 @@ request(Buffer, State=#state{transport=Transport, in_streamid=StreamID,
|
|||
{Host, Port} ->
|
||||
request(Buffer, State, Headers, Host, Port)
|
||||
catch _:_ ->
|
||||
error_terminate(400, State, {stream_error, StreamID, protocol_error,
|
||||
error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
|
||||
{stream_error, StreamID, protocol_error,
|
||||
'The host header is invalid. (RFC7230 5.4)'})
|
||||
end
|
||||
end.
|
||||
|
@ -565,7 +570,7 @@ default_port(_) -> 80.
|
|||
%% End of request parsing.
|
||||
|
||||
request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, in_streamid=StreamID,
|
||||
in_state=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
|
||||
in_state=PS=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
|
||||
Headers, Host, Port) ->
|
||||
Scheme = case Transport:secure() of
|
||||
true -> <<"https">>;
|
||||
|
@ -578,9 +583,9 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, in_stream
|
|||
Length = try
|
||||
cow_http_hd:parse_content_length(BinLength)
|
||||
catch _:_ ->
|
||||
error_terminate(400, State0, {stream_error, StreamID, protocol_error,
|
||||
error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers}},
|
||||
{stream_error, StreamID, protocol_error,
|
||||
'The content-length header is invalid. (RFC7230 3.3.2)'})
|
||||
%% @todo Err should terminate here...
|
||||
end,
|
||||
{true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
|
||||
%% @todo Better handling of transfer decoding.
|
||||
|
@ -622,7 +627,10 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, in_stream
|
|||
end,
|
||||
{request, Req, State, Buffer};
|
||||
{true, HTTP2Settings} ->
|
||||
http2_upgrade(State0, Buffer, HTTP2Settings, Req)
|
||||
%% We save the headers in case the upgrade will fail
|
||||
%% and we need to pass them to cowboy_stream:early_error.
|
||||
http2_upgrade(State0#state{in_state=PS#ps_header{headers=Headers}},
|
||||
Buffer, HTTP2Settings, Req)
|
||||
end.
|
||||
|
||||
%% HTTP/2 upgrade.
|
||||
|
@ -667,6 +675,8 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
|
|||
%% Always half-closed stream coming from this side.
|
||||
try cow_http_hd:parse_http2_settings(HTTP2Settings) of
|
||||
Settings ->
|
||||
%% @todo We should invoke cowboy_stream:info for this stream,
|
||||
%% with a switch_protocol tuple.
|
||||
Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(#{
|
||||
<<"connection">> => <<"Upgrade">>,
|
||||
<<"upgrade">> => <<"h2c">>
|
||||
|
@ -676,7 +686,7 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
|
|||
cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer, Settings, Req)
|
||||
catch _:_ ->
|
||||
error_terminate(400, State, {connection_error, protocol_error,
|
||||
'The HTTP2-Settings header contains a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
|
||||
'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
|
||||
end.
|
||||
|
||||
%% Request body parsing.
|
||||
|
@ -916,6 +926,7 @@ stream_terminate(State=#state{socket=Socket, transport=Transport,
|
|||
= lists:keytake(StreamID, #stream.id, Streams0),
|
||||
_ = case OutState of
|
||||
wait ->
|
||||
%% @todo This should probably go through the stream handler info callback.
|
||||
Transport:send(Socket, cow_http:response(204, 'HTTP/1.1', []));
|
||||
chunked when Version =:= 'HTTP/1.1' ->
|
||||
Transport:send(Socket, <<"0\r\n\r\n">>);
|
||||
|
@ -1005,11 +1016,36 @@ connection_hd_is_close(Conn) ->
|
|||
Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)),
|
||||
lists:member(<<"close">>, Conns).
|
||||
|
||||
%% This function is only called when an error occurs on a new stream.
|
||||
-spec error_terminate(cowboy:http_status(), #state{}, _) -> no_return().
|
||||
error_terminate(StatusCode, State=#state{socket=Socket, transport=Transport}, Reason) ->
|
||||
Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', [
|
||||
{<<"content-length">>, <<"0">>}
|
||||
])),
|
||||
error_terminate(StatusCode0, State=#state{ref=Ref, socket=Socket, transport=Transport,
|
||||
opts=Opts, peer=Peer, in_streamid=StreamID, in_state=StreamState}, Reason) ->
|
||||
PartialReq = case StreamState of
|
||||
#ps_request_line{} ->
|
||||
#{};
|
||||
#ps_header{method=Method, path=Path, qs=Qs,
|
||||
version=Version, headers=ReqHeaders} -> #{
|
||||
ref => Ref,
|
||||
peer => Peer,
|
||||
method => Method,
|
||||
path => Path,
|
||||
qs => Qs,
|
||||
version => Version,
|
||||
headers => case ReqHeaders of
|
||||
undefined -> #{};
|
||||
_ -> ReqHeaders
|
||||
end
|
||||
}
|
||||
end,
|
||||
{response, StatusCode, RespHeaders, RespBody}
|
||||
= cowboy_stream:early_error(StreamID, Reason, PartialReq,
|
||||
{response, StatusCode0, #{
|
||||
<<"content-length">> => <<"0">>
|
||||
}, <<>>}, Opts),
|
||||
Transport:send(Socket, [
|
||||
cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)),
|
||||
RespBody
|
||||
]),
|
||||
terminate(State, Reason).
|
||||
|
||||
-spec terminate(_, _) -> no_return().
|
||||
|
|
|
@ -25,7 +25,11 @@
|
|||
|
||||
%% @todo Perhaps it makes more sense to have resp_body in this module?
|
||||
|
||||
-type commands() :: [{response, cowboy:http_status(), cowboy:http_headers(), cowboy_req:resp_body()}
|
||||
-type resp_command()
|
||||
:: {response, cowboy:http_status(), cowboy:http_headers(), cowboy_req:resp_body()}.
|
||||
-export_type([resp_command/0]).
|
||||
|
||||
-type commands() :: [resp_command()
|
||||
| {headers, cowboy:http_status(), cowboy:http_headers()}
|
||||
| {data, fin(), iodata()}
|
||||
| {push, binary(), binary(), binary(), inet:port_number(),
|
||||
|
@ -51,10 +55,15 @@
|
|||
| {stop, cow_http2:frame(), human_reason()}.
|
||||
-export_type([reason/0]).
|
||||
|
||||
-type partial_req() :: map(). %% @todo Take what's in cowboy_req with everything? optional.
|
||||
-export_type([partial_req/0]).
|
||||
|
||||
-callback init(streamid(), cowboy_req:req(), cowboy:opts()) -> {commands(), state()}.
|
||||
-callback data(streamid(), fin(), binary(), State) -> {commands(), State} when State::state().
|
||||
-callback info(streamid(), any(), State) -> {commands(), State} when State::state().
|
||||
-callback terminate(streamid(), reason(), state()) -> any().
|
||||
-callback early_error(streamid(), reason(), partial_req(), Resp, cowboy:opts())
|
||||
-> Resp when Resp::resp_command().
|
||||
|
||||
%% @todo To optimize the number of active timers we could have a command
|
||||
%% that enables a timeout that is called in the absence of any other call,
|
||||
|
@ -71,6 +80,7 @@
|
|||
-export([data/4]).
|
||||
-export([info/3]).
|
||||
-export([terminate/3]).
|
||||
-export([early_error/5]).
|
||||
|
||||
%% Note that this and other functions in this module do NOT catch
|
||||
%% exceptions. We want the exception to go all the way down to the
|
||||
|
@ -127,3 +137,15 @@ terminate(_, _, undefined) ->
|
|||
terminate(StreamID, Reason, {Handler, State}) ->
|
||||
_ = Handler:terminate(StreamID, Reason, State),
|
||||
ok.
|
||||
|
||||
-spec early_error(streamid(), reason(), partial_req(), Resp, cowboy:opts())
|
||||
-> Resp when Resp::resp_command().
|
||||
early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
|
||||
case maps:get(stream_handlers, Opts, [cowboy_stream_h]) of
|
||||
[] ->
|
||||
Resp;
|
||||
[Handler|Tail] ->
|
||||
%% This is the same behavior as in init/3.
|
||||
Handler:early_error(StreamID, Reason,
|
||||
PartialReq, Resp, Opts#{stream_handlers => Tail})
|
||||
end.
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
-export([data/4]).
|
||||
-export([info/3]).
|
||||
-export([terminate/3]).
|
||||
-export([early_error/5]).
|
||||
|
||||
-export([proc_lib_hack/3]).
|
||||
-export([execute/3]).
|
||||
|
@ -122,6 +123,12 @@ info(_StreamID, _Info, State) ->
|
|||
terminate(_StreamID, _Reason, _State) ->
|
||||
ok.
|
||||
|
||||
-spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(),
|
||||
cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp
|
||||
when Resp::cowboy_stream:resp_command().
|
||||
early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
|
||||
cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).
|
||||
|
||||
%% We use ~999999p here instead of ~w because the latter doesn't
|
||||
%% support printable strings.
|
||||
report_crash(_, _, _, normal, _) ->
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue