mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 04:10:24 +00:00
Add experimental metrics stream handler
It collects metrics and passes them to a configurable callback once the stream terminates. It will be documented in a future release. More tests incoming.
This commit is contained in:
parent
c602871f86
commit
4211ea41bd
3 changed files with 412 additions and 1 deletions
|
@ -1,7 +1,7 @@
|
|||
{application, 'cowboy', [
|
||||
{description, "Small, fast, modern HTTP server."},
|
||||
{vsn, "2.0.0"},
|
||||
{modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_iolists','cowboy_loop','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']},
|
||||
{modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_iolists','cowboy_loop','cowboy_metrics_h','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']},
|
||||
{registered, [cowboy_sup,cowboy_clock]},
|
||||
{applications, [kernel,stdlib,crypto,cowlib,ranch]},
|
||||
{mod, {cowboy_app, []}},
|
||||
|
|
277
src/cowboy_metrics_h.erl
Normal file
277
src/cowboy_metrics_h.erl
Normal file
|
@ -0,0 +1,277 @@
|
|||
%% Copyright (c) 2017, Loïc Hoguin <essen@ninenines.eu>
|
||||
%%
|
||||
%% Permission to use, copy, modify, and/or distribute this software for any
|
||||
%% purpose with or without fee is hereby granted, provided that the above
|
||||
%% copyright notice and this permission notice appear in all copies.
|
||||
%%
|
||||
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
-module(cowboy_metrics_h).
|
||||
-behavior(cowboy_stream).
|
||||
|
||||
-export([init/3]).
|
||||
-export([data/4]).
|
||||
-export([info/3]).
|
||||
-export([terminate/3]).
|
||||
-export([early_error/5]).
|
||||
|
||||
-type proc_metrics() :: #{pid() => #{
|
||||
%% Time at which the process spawned.
|
||||
spawn := integer(),
|
||||
|
||||
%% Time at which the process exited.
|
||||
exit => integer(),
|
||||
|
||||
%% Reason for the process exit.
|
||||
reason => any()
|
||||
}}.
|
||||
|
||||
-type metrics() :: #{
|
||||
%% The identifier for this listener.
|
||||
ref := ranch:ref(),
|
||||
|
||||
%% The pid for this connection.
|
||||
pid := pid(),
|
||||
|
||||
%% The streamid also indicates the total number of requests on
|
||||
%% this connection (StreamID div 2 + 1).
|
||||
streamid := cowboy_stream:streamid(),
|
||||
|
||||
%% The terminate reason is always useful.
|
||||
reason := cowboy_stream:reason(),
|
||||
|
||||
%% A filtered Req object or a partial Req object
|
||||
%% depending on how far the request got to.
|
||||
req => cowboy_req:req(),
|
||||
partial_req => cowboy_stream:partial_req(),
|
||||
|
||||
%% Response status.
|
||||
resp_status := cowboy:http_status(),
|
||||
|
||||
%% Filtered response headers.
|
||||
resp_headers := cowboy:http_headers(),
|
||||
|
||||
%% Start/end of the processing of the request.
|
||||
%%
|
||||
%% This represents the time from this stream handler's init
|
||||
%% to terminate. Note that this doesn't indicate the response
|
||||
%% has been sent fully, it still may be queued up in a buffer.
|
||||
req_start => integer(),
|
||||
req_end => integer(),
|
||||
|
||||
%% Start/end of the receiving of the request body.
|
||||
%% Begins when the first packet has been received.
|
||||
req_body_start => integer(),
|
||||
req_body_end => integer(),
|
||||
|
||||
%% Start/end of the sending of the response.
|
||||
%% Begins when we send the headers and ends on the final
|
||||
%% packet of the response body. If everything is sent at
|
||||
%% once these values are identical.
|
||||
resp_start => integer(),
|
||||
resp_end => integer(),
|
||||
|
||||
%% For early errors all we get is the time we received it.
|
||||
early_error_time => integer(),
|
||||
|
||||
%% Start/end of spawned processes. This is where most of
|
||||
%% the user code lies, excluding stream handlers. On a
|
||||
%% default Cowboy configuration there should be only one
|
||||
%% process: the request process.
|
||||
procs => proc_metrics(),
|
||||
|
||||
%% Length of the request and response bodies. This does
|
||||
%% not include the framing.
|
||||
req_body_length => non_neg_integer(),
|
||||
resp_body_length => non_neg_integer()
|
||||
}.
|
||||
-export_type([metrics/0]).
|
||||
|
||||
-record(state, {
|
||||
next :: any(),
|
||||
callback :: fun((metrics()) -> any()),
|
||||
resp_headers_filter :: undefined | fun((cowboy:http_headers()) -> cowboy:http_headers()),
|
||||
req :: map(),
|
||||
resp_status :: undefined | cowboy:http_status(),
|
||||
resp_headers :: undefined | cowboy:http_headers(),
|
||||
ref :: ranch:ref(),
|
||||
req_start :: integer(),
|
||||
req_end :: undefined | integer(),
|
||||
req_body_start :: undefined | integer(),
|
||||
req_body_end :: undefined | integer(),
|
||||
resp_start :: undefined | integer(),
|
||||
resp_end :: undefined | integer(),
|
||||
procs = #{} :: proc_metrics(),
|
||||
req_body_length = 0 :: non_neg_integer(),
|
||||
resp_body_length = 0 :: non_neg_integer()
|
||||
}).
|
||||
|
||||
-spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
|
||||
-> {[{spawn, pid(), timeout()}], #state{}}.
|
||||
init(StreamID, Req=#{ref := Ref}, Opts=#{metrics_callback := Fun}) ->
|
||||
ReqStart = erlang:monotonic_time(),
|
||||
{Commands, Next} = cowboy_stream:init(StreamID, Req, Opts),
|
||||
FilteredReq = case maps:get(metrics_req_filter, Opts, undefined) of
|
||||
undefined -> Req;
|
||||
ReqFilter -> ReqFilter(Req)
|
||||
end,
|
||||
RespHeadersFilter = maps:get(metrics_resp_headers_filter, Opts, undefined),
|
||||
{Commands, fold(Commands, #state{
|
||||
next=Next,
|
||||
callback=Fun,
|
||||
resp_headers_filter=RespHeadersFilter,
|
||||
req=FilteredReq,
|
||||
ref=Ref,
|
||||
req_start=ReqStart
|
||||
})}.
|
||||
|
||||
-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
|
||||
-> {cowboy_stream:commands(), State} when State::#state{}.
|
||||
data(StreamID, IsFin=fin, Data, State=#state{req_body_start=undefined}) ->
|
||||
ReqBody = erlang:monotonic_time(),
|
||||
do_data(StreamID, IsFin, Data, State#state{
|
||||
req_body_start=ReqBody,
|
||||
req_body_end=ReqBody,
|
||||
req_body_length=byte_size(Data)
|
||||
});
|
||||
data(StreamID, IsFin=fin, Data, State=#state{req_body_length=ReqBodyLen}) ->
|
||||
ReqBodyEnd = erlang:monotonic_time(),
|
||||
do_data(StreamID, IsFin, Data, State#state{
|
||||
req_body_end=ReqBodyEnd,
|
||||
req_body_length=ReqBodyLen + byte_size(Data)
|
||||
});
|
||||
data(StreamID, IsFin, Data, State=#state{req_body_start=undefined}) ->
|
||||
ReqBodyStart = erlang:monotonic_time(),
|
||||
do_data(StreamID, IsFin, Data, State#state{
|
||||
req_body_start=ReqBodyStart,
|
||||
req_body_length=byte_size(Data)
|
||||
});
|
||||
data(StreamID, IsFin, Data, State) ->
|
||||
do_data(StreamID, IsFin, Data, State).
|
||||
|
||||
do_data(StreamID, IsFin, Data, State0=#state{next=Next0}) ->
|
||||
{Commands, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0),
|
||||
{Commands, fold(Commands, State0#state{next=Next})}.
|
||||
|
||||
-spec info(cowboy_stream:streamid(), any(), State)
|
||||
-> {cowboy_stream:commands(), State} when State::#state{}.
|
||||
info(StreamID, Info={'EXIT', Pid, Reason}, State0=#state{procs=Procs}) ->
|
||||
ProcEnd = erlang:monotonic_time(),
|
||||
P = maps:get(Pid, Procs),
|
||||
State = State0#state{procs=Procs#{Pid => P#{
|
||||
exit => ProcEnd,
|
||||
reason => Reason
|
||||
}}},
|
||||
do_info(StreamID, Info, State);
|
||||
info(StreamID, Info, State) ->
|
||||
do_info(StreamID, Info, State).
|
||||
|
||||
do_info(StreamID, Info, State0=#state{next=Next0}) ->
|
||||
{Commands, Next} = cowboy_stream:info(StreamID, Info, Next0),
|
||||
{Commands, fold(Commands, State0#state{next=Next})}.
|
||||
|
||||
fold([], State) ->
|
||||
State;
|
||||
fold([{spawn, Pid, _}|Tail], State0=#state{procs=Procs}) ->
|
||||
ProcStart = erlang:monotonic_time(),
|
||||
State = State0#state{procs=Procs#{Pid => #{spawn => ProcStart}}},
|
||||
fold(Tail, State);
|
||||
fold([{response, Status, Headers, Body}|Tail],
|
||||
State=#state{resp_headers_filter=RespHeadersFilter}) ->
|
||||
Resp = erlang:monotonic_time(),
|
||||
fold(Tail, State#state{
|
||||
resp_status=Status,
|
||||
resp_headers=case RespHeadersFilter of
|
||||
undefined -> Headers;
|
||||
_ -> RespHeadersFilter(Headers)
|
||||
end,
|
||||
resp_start=Resp,
|
||||
resp_end=Resp,
|
||||
resp_body_length=resp_body_length(Body)
|
||||
});
|
||||
fold([{headers, Status, Headers}|Tail],
|
||||
State=#state{resp_headers_filter=RespHeadersFilter}) ->
|
||||
RespStart = erlang:monotonic_time(),
|
||||
fold(Tail, State#state{
|
||||
resp_status=Status,
|
||||
resp_headers=case RespHeadersFilter of
|
||||
undefined -> Headers;
|
||||
_ -> RespHeadersFilter(Headers)
|
||||
end,
|
||||
resp_start=RespStart
|
||||
});
|
||||
fold([{data, nofin, Data}|Tail], State=#state{resp_body_length=RespBodyLen}) ->
|
||||
fold(Tail, State#state{
|
||||
resp_body_length=RespBodyLen + resp_body_length(Data)
|
||||
});
|
||||
fold([{data, fin, Data}|Tail], State=#state{resp_body_length=RespBodyLen}) ->
|
||||
RespEnd = erlang:monotonic_time(),
|
||||
fold(Tail, State#state{
|
||||
resp_end=RespEnd,
|
||||
resp_body_length=RespBodyLen + resp_body_length(Data)
|
||||
});
|
||||
fold([_|Tail], State) ->
|
||||
fold(Tail, State).
|
||||
|
||||
-spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> any().
|
||||
terminate(StreamID, Reason, #state{next=Next, callback=Fun,
|
||||
req=Req, resp_status=RespStatus, resp_headers=RespHeaders, ref=Ref,
|
||||
req_start=ReqStart, req_body_start=ReqBodyStart,
|
||||
req_body_end=ReqBodyEnd, resp_start=RespStart, resp_end=RespEnd,
|
||||
procs=Procs, req_body_length=ReqBodyLen, resp_body_length=RespBodyLen}) ->
|
||||
Res = cowboy_stream:terminate(StreamID, Reason, Next),
|
||||
ReqEnd = erlang:monotonic_time(),
|
||||
Metrics = #{
|
||||
ref => Ref,
|
||||
pid => self(),
|
||||
streamid => StreamID,
|
||||
reason => Reason,
|
||||
req => Req,
|
||||
resp_status => RespStatus,
|
||||
resp_headers => RespHeaders,
|
||||
req_start => ReqStart,
|
||||
req_end => ReqEnd,
|
||||
req_body_start => ReqBodyStart,
|
||||
req_body_end => ReqBodyEnd,
|
||||
resp_start => RespStart,
|
||||
resp_end => RespEnd,
|
||||
procs => Procs,
|
||||
req_body_length => ReqBodyLen,
|
||||
resp_body_length => RespBodyLen
|
||||
},
|
||||
Fun(Metrics),
|
||||
Res.
|
||||
|
||||
-spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(),
|
||||
cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp
|
||||
when Resp::cowboy_stream:resp_command().
|
||||
early_error(StreamID, Reason, PartialReq=#{ref := Ref}, Resp0, Opts=#{metrics_callback := Fun}) ->
|
||||
Time = erlang:monotonic_time(),
|
||||
Resp = {response, RespStatus, RespHeaders, RespBody}
|
||||
= cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp0, Opts),
|
||||
%% As far as metrics go we are limited in what we can provide
|
||||
%% in this case.
|
||||
Metrics = #{
|
||||
ref => Ref,
|
||||
pid => self(),
|
||||
streamid => StreamID,
|
||||
reason => Reason,
|
||||
partial_req => PartialReq,
|
||||
resp_status => RespStatus,
|
||||
resp_headers => RespHeaders,
|
||||
early_error_time => Time,
|
||||
resp_body_length => resp_body_length(RespBody)
|
||||
},
|
||||
Fun(Metrics),
|
||||
Resp.
|
||||
|
||||
resp_body_length({sendfile, _, Len, _}) ->
|
||||
Len;
|
||||
resp_body_length(Data) ->
|
||||
iolist_size(Data).
|
134
test/metrics_SUITE.erl
Normal file
134
test/metrics_SUITE.erl
Normal file
|
@ -0,0 +1,134 @@
|
|||
%% Copyright (c) 2017, Loïc Hoguin <essen@ninenines.eu>
|
||||
%%
|
||||
%% Permission to use, copy, modify, and/or distribute this software for any
|
||||
%% purpose with or without fee is hereby granted, provided that the above
|
||||
%% copyright notice and this permission notice appear in all copies.
|
||||
%%
|
||||
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
-module(metrics_SUITE).
|
||||
-compile(export_all).
|
||||
|
||||
-import(ct_helper, [config/2]).
|
||||
-import(ct_helper, [doc/1]).
|
||||
-import(cowboy_test, [gun_open/1]).
|
||||
-import(cowboy_test, [gun_down/1]).
|
||||
|
||||
%% ct.
|
||||
|
||||
all() ->
|
||||
cowboy_test:common_all().
|
||||
|
||||
groups() ->
|
||||
cowboy_test:common_groups(ct_helper:all(?MODULE)).
|
||||
|
||||
init_per_group(Name = http, Config) ->
|
||||
cowboy_test:init_http(Name, init_plain_opts(Config), Config);
|
||||
init_per_group(Name = https, Config) ->
|
||||
cowboy_test:init_http(Name, init_plain_opts(Config), Config);
|
||||
init_per_group(Name = h2, Config) ->
|
||||
cowboy_test:init_http(Name, init_plain_opts(Config), Config);
|
||||
init_per_group(Name = h2c, Config) ->
|
||||
Config1 = cowboy_test:init_http(Name, init_plain_opts(Config), Config),
|
||||
lists:keyreplace(protocol, 1, Config1, {protocol, http2});
|
||||
init_per_group(Name = http_compress, Config) ->
|
||||
cowboy_test:init_http(Name, init_compress_opts(Config), Config);
|
||||
init_per_group(Name = https_compress, Config) ->
|
||||
cowboy_test:init_http(Name, init_compress_opts(Config), Config);
|
||||
init_per_group(Name = h2_compress, Config) ->
|
||||
cowboy_test:init_http(Name, init_compress_opts(Config), Config);
|
||||
init_per_group(Name = h2c_compress, Config) ->
|
||||
Config1 = cowboy_test:init_http(Name, init_compress_opts(Config), Config),
|
||||
lists:keyreplace(protocol, 1, Config1, {protocol, http2}).
|
||||
|
||||
end_per_group(Name, _) ->
|
||||
cowboy:stop_listener(Name).
|
||||
|
||||
init_plain_opts(Config) ->
|
||||
#{
|
||||
env => #{dispatch => cowboy_router:compile(init_routes(Config))},
|
||||
metrics_callback => do_metrics_callback(),
|
||||
stream_handlers => [cowboy_metrics_h, cowboy_stream_h]
|
||||
}.
|
||||
|
||||
init_compress_opts(Config) ->
|
||||
#{
|
||||
env => #{dispatch => cowboy_router:compile(init_routes(Config))},
|
||||
metrics_callback => do_metrics_callback(),
|
||||
stream_handlers => [cowboy_metrics_h, cowboy_compress_h, cowboy_stream_h]
|
||||
}.
|
||||
|
||||
init_routes(_) -> [
|
||||
{"localhost", [
|
||||
{"/", hello_h, []}
|
||||
]}
|
||||
].
|
||||
|
||||
do_metrics_callback() ->
|
||||
fun(Metrics=#{req := #{headers := #{<<"x-test-pid">> := PidBin}}}) ->
|
||||
Pid = list_to_pid(binary_to_list(PidBin)),
|
||||
Pid ! {metrics, self(), Metrics},
|
||||
ok
|
||||
end.
|
||||
|
||||
%% Tests.
|
||||
|
||||
hello_world(Config) ->
|
||||
%% Perform a request.
|
||||
ConnPid = gun_open(Config),
|
||||
Ref = gun:get(ConnPid, "/", [{<<"x-test-pid">>, pid_to_list(self())}]),
|
||||
{response, nofin, 200, RespHeaders} = gun:await(ConnPid, Ref),
|
||||
{ok, RespBody} = gun:await_body(ConnPid, Ref),
|
||||
gun:close(ConnPid),
|
||||
%% Receive the metrics and print them.
|
||||
receive
|
||||
{metrics, From, Metrics} ->
|
||||
%% Ensure the timestamps are in the expected order.
|
||||
#{
|
||||
req_start := ReqStart, req_end := ReqEnd,
|
||||
resp_start := RespStart, resp_end := RespEnd
|
||||
} = Metrics,
|
||||
true = (ReqStart =< RespStart)
|
||||
and (RespStart =< RespEnd)
|
||||
and (RespEnd =< ReqEnd),
|
||||
%% We didn't send a body.
|
||||
#{
|
||||
req_body_start := undefined,
|
||||
req_body_end := undefined,
|
||||
req_body_length := 0
|
||||
} = Metrics,
|
||||
%% We got a 200 response with a body.
|
||||
#{
|
||||
resp_status := 200,
|
||||
resp_headers := ExpectedRespHeaders,
|
||||
resp_body_length := RespBodyLen
|
||||
} = Metrics,
|
||||
ExpectedRespHeaders = maps:from_list(RespHeaders),
|
||||
true = RespBodyLen > 0,
|
||||
%% The request process executed normally.
|
||||
#{procs := Procs} = Metrics,
|
||||
[{_, #{
|
||||
spawn := ProcSpawn,
|
||||
exit := ProcExit,
|
||||
reason := normal
|
||||
}}] = maps:to_list(Procs),
|
||||
true = ProcSpawn =< ProcExit,
|
||||
%% Confirm other metadata are as expected.
|
||||
#{
|
||||
ref := _,
|
||||
pid := From,
|
||||
streamid := 1,
|
||||
reason := normal,
|
||||
req := #{}
|
||||
} = Metrics,
|
||||
%% All good!
|
||||
ok
|
||||
after 1000 ->
|
||||
error(timeout)
|
||||
end.
|
Loading…
Add table
Add a link
Reference in a new issue