mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 12:20:24 +00:00
Implement dynamic socket buffer sizes
Cowboy will set the socket's buffer size dynamically to better fit the current workload. When the incoming data is small, a low buffer size reduces the memory footprint and improves responsiveness and therefore performance. When the incoming data is large, such as large HTTP request bodies, a larger buffer size helps us avoid doing too many binary appends and related allocations. Setting a large buffer size for all use cases is sub-optimal because allocating more than needed necessarily results in a performance hit (not just increased memory usage). By default Cowboy starts with a buffer size of 8192 bytes. It then doubles or halves the buffer size depending on the size of the data it receives from the socket. It stops decreasing at 8192 and increasing at 131072 by default. To keep track of the size of the incoming data Cowboy maintains a moving average. It allows Cowboy to avoid changing the buffer too often but still react quickly when necessary. Cowboy will increase the buffer size when the moving average is above 90% of the current buffer size, and decrease when the moving average is below 40% of the current buffer size. The current buffer size and moving average are propagated when switching protocols. The dynamic buffer is implemented in HTTP/1, HTTP/2 and HTTP/1 Websocket. HTTP/2 Websocket has it disabled because it doesn't interact directly with the socket; in that case it is HTTP/2 that has a dynamic buffer. The dynamic buffer provides a very large performance improvement in many scenarios, at minimal cost for others. Because it largely depend on the underlying protocol the improvements are no all equal. TLS and compression also impact the results. The improvement when reading a large request body, with the requests repeated in a fast loop are: * HTTP: 6x to 20x faster * HTTPS: 2x to 6x faster * H2: 4x to 5x faster * H2C: 20x to 40x faster I am not sure why H2C's performance was so bad, especially compared to H2, when using default buffer sizes. Dynamic buffers make H2C a lot more viable with default settings. The performance impact on "hello world" type requests is minimal, it goes from -5% to +5% roughly. Websocket improvements vary again depending on the protocol, but also depending on whether compression is enabled: * HTTP echo: roughly 2x faster * HTTP send: roughly 4x faster * H2C echo: roughly 2x faster * H2C send: 3x to 4x faster In the echo test we reply back, and Gun doesn't have the dynamic buffer optimisation, so that probably explains the x2 difference. With compression however there isn't much improvement. The results are roughly within -10% to +10% of each other. Zlib compression seems to be a bottleneck, or at least to modify the performance profile to such an extent that the size of the buffer does not matter. This happens to randomly generated binary data as well so it is probably not caused by the test data.
This commit is contained in:
parent
fcab905eca
commit
49be0f57cf
12 changed files with 451 additions and 85 deletions
|
@ -20,6 +20,7 @@ opts() :: #{
|
||||||
active_n => pos_integer(),
|
active_n => pos_integer(),
|
||||||
chunked => boolean(),
|
chunked => boolean(),
|
||||||
connection_type => worker | supervisor,
|
connection_type => worker | supervisor,
|
||||||
|
dynamic_buffer => false | {pos_integer(), pos_integer()},
|
||||||
http10_keepalive => boolean(),
|
http10_keepalive => boolean(),
|
||||||
idle_timeout => timeout(),
|
idle_timeout => timeout(),
|
||||||
inactivity_timeout => timeout(),
|
inactivity_timeout => timeout(),
|
||||||
|
@ -53,7 +54,7 @@ Ranch functions `ranch:get_protocol_options/1` and
|
||||||
|
|
||||||
The default value is given next to the option name:
|
The default value is given next to the option name:
|
||||||
|
|
||||||
active_n (100)::
|
active_n (1)::
|
||||||
|
|
||||||
The number of packets Cowboy will request from the socket at once.
|
The number of packets Cowboy will request from the socket at once.
|
||||||
This can be used to tweak the performance of the server. Higher
|
This can be used to tweak the performance of the server. Higher
|
||||||
|
@ -75,6 +76,17 @@ connection_type (supervisor)::
|
||||||
|
|
||||||
Whether the connection process also acts as a supervisor.
|
Whether the connection process also acts as a supervisor.
|
||||||
|
|
||||||
|
dynamic_buffer ({8192, 131072})::
|
||||||
|
|
||||||
|
Cowboy will dynamically change the socket's `buffer` size
|
||||||
|
depending on the size of the data it receives from the socket.
|
||||||
|
This lets Cowboy use the optimal buffer size for the current
|
||||||
|
workload.
|
||||||
|
+
|
||||||
|
The dynamic buffer size functionality can be disabled by
|
||||||
|
setting this option to `false`. Cowboy will also disable
|
||||||
|
it by default when the `buffer` transport option is configured.
|
||||||
|
|
||||||
http10_keepalive (true)::
|
http10_keepalive (true)::
|
||||||
|
|
||||||
Whether keep-alive is enabled for HTTP/1.0 connections.
|
Whether keep-alive is enabled for HTTP/1.0 connections.
|
||||||
|
@ -166,6 +178,8 @@ Ordered list of stream handlers that will handle all stream events.
|
||||||
|
|
||||||
== Changelog
|
== Changelog
|
||||||
|
|
||||||
|
* *2.13*: The `active_n` default value was changed to `1`.
|
||||||
|
* *2.13*: The `dynamic_buffer` option was added.
|
||||||
* *2.11*: The `reset_idle_timeout_on_send` option was added.
|
* *2.11*: The `reset_idle_timeout_on_send` option was added.
|
||||||
* *2.8*: The `active_n` option was added.
|
* *2.8*: The `active_n` option was added.
|
||||||
* *2.7*: The `initial_stream_flow_size` and `logger` options were added.
|
* *2.7*: The `initial_stream_flow_size` and `logger` options were added.
|
||||||
|
|
|
@ -21,6 +21,7 @@ opts() :: #{
|
||||||
connection_type => worker | supervisor,
|
connection_type => worker | supervisor,
|
||||||
connection_window_margin_size => 0..16#7fffffff,
|
connection_window_margin_size => 0..16#7fffffff,
|
||||||
connection_window_update_threshold => 0..16#7fffffff,
|
connection_window_update_threshold => 0..16#7fffffff,
|
||||||
|
dynamic_buffer => false | {pos_integer(), pos_integer()},
|
||||||
enable_connect_protocol => boolean(),
|
enable_connect_protocol => boolean(),
|
||||||
goaway_initial_timeout => timeout(),
|
goaway_initial_timeout => timeout(),
|
||||||
goaway_complete_timeout => timeout(),
|
goaway_complete_timeout => timeout(),
|
||||||
|
@ -66,7 +67,7 @@ Ranch functions `ranch:get_protocol_options/1` and
|
||||||
|
|
||||||
The default value is given next to the option name:
|
The default value is given next to the option name:
|
||||||
|
|
||||||
active_n (100)::
|
active_n (1)::
|
||||||
|
|
||||||
The number of packets Cowboy will request from the socket at once.
|
The number of packets Cowboy will request from the socket at once.
|
||||||
This can be used to tweak the performance of the server. Higher
|
This can be used to tweak the performance of the server. Higher
|
||||||
|
@ -91,6 +92,17 @@ The connection window will only get updated when its size
|
||||||
becomes lower than this threshold, in bytes. This is to
|
becomes lower than this threshold, in bytes. This is to
|
||||||
avoid sending too many `WINDOW_UPDATE` frames.
|
avoid sending too many `WINDOW_UPDATE` frames.
|
||||||
|
|
||||||
|
dynamic_buffer ({8192, 131072})::
|
||||||
|
|
||||||
|
Cowboy will dynamically change the socket's `buffer` size
|
||||||
|
depending on the size of the data it receives from the socket.
|
||||||
|
This lets Cowboy use the optimal buffer size for the current
|
||||||
|
workload.
|
||||||
|
+
|
||||||
|
The dynamic buffer size functionality can be disabled by
|
||||||
|
setting this option to `false`. Cowboy will also disable
|
||||||
|
it by default when the `buffer` transport option is configured.
|
||||||
|
|
||||||
enable_connect_protocol (false)::
|
enable_connect_protocol (false)::
|
||||||
|
|
||||||
Whether to enable the extended CONNECT method to allow
|
Whether to enable the extended CONNECT method to allow
|
||||||
|
@ -289,6 +301,8 @@ too many `WINDOW_UPDATE` frames.
|
||||||
|
|
||||||
== Changelog
|
== Changelog
|
||||||
|
|
||||||
|
* *2.13*: The `active_n` default value was changed to `1`.
|
||||||
|
* *2.13*: The `dynamic_buffer` option was added.
|
||||||
* *2.11*: Websocket over HTTP/2 is now considered stable.
|
* *2.11*: Websocket over HTTP/2 is now considered stable.
|
||||||
* *2.11*: The `reset_idle_timeout_on_send` option was added.
|
* *2.11*: The `reset_idle_timeout_on_send` option was added.
|
||||||
* *2.11*: Add the option `max_cancel_stream_rate` to protect
|
* *2.11*: Add the option `max_cancel_stream_rate` to protect
|
||||||
|
|
|
@ -203,6 +203,7 @@ opts() :: #{
|
||||||
active_n => pos_integer(),
|
active_n => pos_integer(),
|
||||||
compress => boolean(),
|
compress => boolean(),
|
||||||
deflate_opts => cow_ws:deflate_opts()
|
deflate_opts => cow_ws:deflate_opts()
|
||||||
|
dynamic_buffer => false | {pos_integer(), pos_integer()},
|
||||||
idle_timeout => timeout(),
|
idle_timeout => timeout(),
|
||||||
max_frame_size => non_neg_integer() | infinity,
|
max_frame_size => non_neg_integer() | infinity,
|
||||||
req_filter => fun((cowboy_req:req()) -> map()),
|
req_filter => fun((cowboy_req:req()) -> map()),
|
||||||
|
@ -224,7 +225,7 @@ init(Req, State) ->
|
||||||
|
|
||||||
The default value is given next to the option name:
|
The default value is given next to the option name:
|
||||||
|
|
||||||
active_n (100)::
|
active_n (1)::
|
||||||
|
|
||||||
The number of packets Cowboy will request from the socket at once.
|
The number of packets Cowboy will request from the socket at once.
|
||||||
This can be used to tweak the performance of the server. Higher
|
This can be used to tweak the performance of the server. Higher
|
||||||
|
@ -248,6 +249,17 @@ options and the zlib compression options. The
|
||||||
defaults optimize the compression at the expense
|
defaults optimize the compression at the expense
|
||||||
of some memory and CPU.
|
of some memory and CPU.
|
||||||
|
|
||||||
|
dynamic_buffer ({8192, 131072})::
|
||||||
|
|
||||||
|
Cowboy will dynamically change the socket's `buffer` size
|
||||||
|
depending on the size of the data it receives from the socket.
|
||||||
|
This lets Cowboy use the optimal buffer size for the current
|
||||||
|
workload.
|
||||||
|
+
|
||||||
|
The dynamic buffer size functionality can be disabled by
|
||||||
|
setting this option to `false`. Cowboy will also disable
|
||||||
|
it by default when the `buffer` transport option is configured.
|
||||||
|
|
||||||
idle_timeout (60000)::
|
idle_timeout (60000)::
|
||||||
|
|
||||||
Time in milliseconds that Cowboy will keep the
|
Time in milliseconds that Cowboy will keep the
|
||||||
|
@ -287,6 +299,8 @@ normal circumstances if necessary.
|
||||||
|
|
||||||
== Changelog
|
== Changelog
|
||||||
|
|
||||||
|
* *2.13*: The `active_n` default value was changed to `1`.
|
||||||
|
* *2.13*: The `dynamic_buffer` option was added.
|
||||||
* *2.13*: The `max_frame_size` option can now be set dynamically.
|
* *2.13*: The `max_frame_size` option can now be set dynamically.
|
||||||
* *2.11*: Websocket over HTTP/2 is now considered stable.
|
* *2.11*: Websocket over HTTP/2 is now considered stable.
|
||||||
* *2.11*: HTTP/1.1 Websocket no longer traps exits by default.
|
* *2.11*: HTTP/1.1 Websocket no longer traps exits by default.
|
||||||
|
|
|
@ -51,8 +51,12 @@
|
||||||
|
|
||||||
start_clear(Ref, TransOpts0, ProtoOpts0) ->
|
start_clear(Ref, TransOpts0, ProtoOpts0) ->
|
||||||
TransOpts1 = ranch:normalize_opts(TransOpts0),
|
TransOpts1 = ranch:normalize_opts(TransOpts0),
|
||||||
{TransOpts, ConnectionType} = ensure_connection_type(TransOpts1),
|
{TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0),
|
||||||
ProtoOpts = ProtoOpts0#{connection_type => ConnectionType},
|
{TransOpts, ConnectionType} = ensure_connection_type(TransOpts2),
|
||||||
|
ProtoOpts = ProtoOpts0#{
|
||||||
|
connection_type => ConnectionType,
|
||||||
|
dynamic_buffer => DynamicBuffer
|
||||||
|
},
|
||||||
ranch:start_listener(Ref, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts).
|
ranch:start_listener(Ref, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts).
|
||||||
|
|
||||||
-spec start_tls(ranch:ref(), ranch:opts(), opts())
|
-spec start_tls(ranch:ref(), ranch:opts(), opts())
|
||||||
|
@ -60,12 +64,13 @@ start_clear(Ref, TransOpts0, ProtoOpts0) ->
|
||||||
|
|
||||||
start_tls(Ref, TransOpts0, ProtoOpts0) ->
|
start_tls(Ref, TransOpts0, ProtoOpts0) ->
|
||||||
TransOpts1 = ranch:normalize_opts(TransOpts0),
|
TransOpts1 = ranch:normalize_opts(TransOpts0),
|
||||||
SocketOpts = maps:get(socket_opts, TransOpts1, []),
|
{TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0),
|
||||||
TransOpts2 = TransOpts1#{socket_opts => [
|
TransOpts3 = ensure_alpn(TransOpts2),
|
||||||
{alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}
|
{TransOpts, ConnectionType} = ensure_connection_type(TransOpts3),
|
||||||
|SocketOpts]},
|
ProtoOpts = ProtoOpts0#{
|
||||||
{TransOpts, ConnectionType} = ensure_connection_type(TransOpts2),
|
connection_type => ConnectionType,
|
||||||
ProtoOpts = ProtoOpts0#{connection_type => ConnectionType},
|
dynamic_buffer => DynamicBuffer
|
||||||
|
},
|
||||||
ranch:start_listener(Ref, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts).
|
ranch:start_listener(Ref, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts).
|
||||||
|
|
||||||
%% @todo Experimental function to start a barebone QUIC listener.
|
%% @todo Experimental function to start a barebone QUIC listener.
|
||||||
|
@ -77,6 +82,7 @@ start_tls(Ref, TransOpts0, ProtoOpts0) ->
|
||||||
-spec start_quic(ranch:ref(), #{socket_opts => [{atom(), _}]}, cowboy_http3:opts())
|
-spec start_quic(ranch:ref(), #{socket_opts => [{atom(), _}]}, cowboy_http3:opts())
|
||||||
-> {ok, pid()}.
|
-> {ok, pid()}.
|
||||||
|
|
||||||
|
%% @todo Implement dynamic_buffer for HTTP/3 if/when it applies.
|
||||||
start_quic(Ref, TransOpts, ProtoOpts) ->
|
start_quic(Ref, TransOpts, ProtoOpts) ->
|
||||||
{ok, _} = application:ensure_all_started(quicer),
|
{ok, _} = application:ensure_all_started(quicer),
|
||||||
Parent = self(),
|
Parent = self(),
|
||||||
|
@ -139,11 +145,32 @@ port_0() ->
|
||||||
end,
|
end,
|
||||||
Port.
|
Port.
|
||||||
|
|
||||||
|
ensure_alpn(TransOpts) ->
|
||||||
|
SocketOpts = maps:get(socket_opts, TransOpts, []),
|
||||||
|
TransOpts#{socket_opts => [
|
||||||
|
{alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}
|
||||||
|
|SocketOpts]}.
|
||||||
|
|
||||||
ensure_connection_type(TransOpts=#{connection_type := ConnectionType}) ->
|
ensure_connection_type(TransOpts=#{connection_type := ConnectionType}) ->
|
||||||
{TransOpts, ConnectionType};
|
{TransOpts, ConnectionType};
|
||||||
ensure_connection_type(TransOpts) ->
|
ensure_connection_type(TransOpts) ->
|
||||||
{TransOpts#{connection_type => supervisor}, supervisor}.
|
{TransOpts#{connection_type => supervisor}, supervisor}.
|
||||||
|
|
||||||
|
%% Dynamic buffer was set; accept transport options as-is.
|
||||||
|
%% Note that initial 'buffer' size may be lower than dynamic buffer allows.
|
||||||
|
ensure_dynamic_buffer(TransOpts, #{dynamic_buffer := DynamicBuffer}) ->
|
||||||
|
{TransOpts, DynamicBuffer};
|
||||||
|
%% Dynamic buffer was not set; define default dynamic buffer
|
||||||
|
%% only if 'buffer' size was not configured. In that case we
|
||||||
|
%% set the 'buffer' size to the lowest value.
|
||||||
|
ensure_dynamic_buffer(TransOpts=#{socket_opts := SocketOpts}, _) ->
|
||||||
|
case proplists:get_value(buffer, SocketOpts, undefined) of
|
||||||
|
undefined ->
|
||||||
|
{TransOpts#{socket_opts => [{buffer, 8192}|SocketOpts]}, {8192, 131072}};
|
||||||
|
_ ->
|
||||||
|
{TransOpts, false}
|
||||||
|
end.
|
||||||
|
|
||||||
-spec stop_listener(ranch:ref()) -> ok | {error, not_found}.
|
-spec stop_listener(ranch:ref()) -> ok | {error, not_found}.
|
||||||
|
|
||||||
stop_listener(Ref) ->
|
stop_listener(Ref) ->
|
||||||
|
|
80
src/cowboy_dynamic_buffer.hrl
Normal file
80
src/cowboy_dynamic_buffer.hrl
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
%% Copyright (c) 2025, Loïc Hoguin <essen@ninenines.eu>
|
||||||
|
%%
|
||||||
|
%% Permission to use, copy, modify, and/or distribute this software for any
|
||||||
|
%% purpose with or without fee is hereby granted, provided that the above
|
||||||
|
%% copyright notice and this permission notice appear in all copies.
|
||||||
|
%%
|
||||||
|
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||||
|
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||||
|
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||||
|
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||||
|
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||||
|
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||||
|
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||||
|
|
||||||
|
%% These functions are common to cowboy_http, cowboy_http2 and
|
||||||
|
%% cowboy_websocket. It requires the options and the state
|
||||||
|
%% to use the same field names.
|
||||||
|
|
||||||
|
%% Experiments have shown that the size of the 'buffer' can greatly
|
||||||
|
%% impact performance: a buffer too small leads to more messages
|
||||||
|
%% being handled and typically more binary appends; and a buffer
|
||||||
|
%% too large results in inefficient use of memory which in turn
|
||||||
|
%% reduces the throughput, presumably because large binary appends
|
||||||
|
%% are not as efficient as smaller ones, and because while the
|
||||||
|
%% buffer gets allocated only when there is data, the allocated
|
||||||
|
%% size remains until the binary is GC and so under-use hurts.
|
||||||
|
%%
|
||||||
|
%% The performance of a given 'buffer' size will also depend on
|
||||||
|
%% how the client is sending data, and on the protocol. For example,
|
||||||
|
%% HTTP/1.1 doesn't need a very large 'buffer' size for reading
|
||||||
|
%% request headers, but it does need one for reading large request
|
||||||
|
%% bodies. At the same time, HTTP/2 performs best reading large
|
||||||
|
%% request bodies when the 'buffer' size is about half that of
|
||||||
|
%% HTTP/1.1.
|
||||||
|
%%
|
||||||
|
%% It therefore becomes important to resize the buffer dynamically
|
||||||
|
%% depending on what is currently going on. We do this based on
|
||||||
|
%% the size of data packets we received from the transport. We
|
||||||
|
%% maintain a moving average and when that moving average is
|
||||||
|
%% 90% of the current 'buffer' size, we double the 'buffer' size.
|
||||||
|
%% When things slow down and the moving average falls below
|
||||||
|
%% 40% of the current 'buffer' size, we halve the 'buffer' size.
|
||||||
|
%%
|
||||||
|
%% To calculate the moving average we do (MovAvg + DataLen) div 2.
|
||||||
|
%% This means that the moving average will change very quickly when
|
||||||
|
%% DataLen increases or decreases rapidly. That's OK, we want to
|
||||||
|
%% be reactive, but also setting the buffer size is a pretty fast
|
||||||
|
%% operation. The formula could be changed to the following if it
|
||||||
|
%% became a problem: (MovAvg * N + DataLen) div (N + 1).
|
||||||
|
%%
|
||||||
|
%% Note that this works best when active,N uses low values of N.
|
||||||
|
%% We don't want to accumulate too much data because we resize
|
||||||
|
%% the buffer.
|
||||||
|
|
||||||
|
init_dynamic_buffer_size(#{dynamic_buffer_initial_size := DynamicBuffer}) ->
|
||||||
|
DynamicBuffer;
|
||||||
|
init_dynamic_buffer_size(#{dynamic_buffer := {LowDynamicBuffer, _}}) ->
|
||||||
|
LowDynamicBuffer;
|
||||||
|
init_dynamic_buffer_size(_) ->
|
||||||
|
false.
|
||||||
|
|
||||||
|
maybe_resize_buffer(State=#state{dynamic_buffer_size=false}, _) ->
|
||||||
|
State;
|
||||||
|
maybe_resize_buffer(State=#state{transport=Transport, socket=Socket,
|
||||||
|
opts=#{dynamic_buffer := {LowDynamicBuffer, HighDynamicBuffer}},
|
||||||
|
dynamic_buffer_size=BufferSize0, dynamic_buffer_moving_average=MovingAvg0}, Data) ->
|
||||||
|
DataLen = byte_size(Data),
|
||||||
|
MovingAvg = (MovingAvg0 + DataLen) div 2,
|
||||||
|
if
|
||||||
|
BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
|
||||||
|
BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
|
||||||
|
ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])),
|
||||||
|
State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize};
|
||||||
|
BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
|
||||||
|
BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
|
||||||
|
ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])),
|
||||||
|
State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize};
|
||||||
|
true ->
|
||||||
|
State#state{dynamic_buffer_moving_average=MovingAvg}
|
||||||
|
end.
|
|
@ -28,6 +28,9 @@
|
||||||
compress_buffering => boolean(),
|
compress_buffering => boolean(),
|
||||||
compress_threshold => non_neg_integer(),
|
compress_threshold => non_neg_integer(),
|
||||||
connection_type => worker | supervisor,
|
connection_type => worker | supervisor,
|
||||||
|
dynamic_buffer => false | {pos_integer(), pos_integer()},
|
||||||
|
dynamic_buffer_initial_average => non_neg_integer(),
|
||||||
|
dynamic_buffer_initial_size => pos_integer(),
|
||||||
env => cowboy_middleware:env(),
|
env => cowboy_middleware:env(),
|
||||||
http10_keepalive => boolean(),
|
http10_keepalive => boolean(),
|
||||||
idle_timeout => timeout(),
|
idle_timeout => timeout(),
|
||||||
|
@ -137,6 +140,10 @@
|
||||||
%% Flow requested for the current stream.
|
%% Flow requested for the current stream.
|
||||||
flow = infinity :: non_neg_integer() | infinity,
|
flow = infinity :: non_neg_integer() | infinity,
|
||||||
|
|
||||||
|
%% Dynamic buffer moving average and current buffer size.
|
||||||
|
dynamic_buffer_size :: pos_integer() | false,
|
||||||
|
dynamic_buffer_moving_average :: non_neg_integer(),
|
||||||
|
|
||||||
%% Identifier for the stream currently being written.
|
%% Identifier for the stream currently being written.
|
||||||
%% Note that out_streamid =< in_streamid.
|
%% Note that out_streamid =< in_streamid.
|
||||||
out_streamid = 1 :: pos_integer(),
|
out_streamid = 1 :: pos_integer(),
|
||||||
|
@ -181,12 +188,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
|
||||||
parent=Parent, ref=Ref, socket=Socket,
|
parent=Parent, ref=Ref, socket=Socket,
|
||||||
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
|
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
|
||||||
peer=Peer, sock=Sock, cert=Cert,
|
peer=Peer, sock=Sock, cert=Cert,
|
||||||
|
dynamic_buffer_size=init_dynamic_buffer_size(Opts),
|
||||||
|
dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
|
||||||
last_streamid=maps:get(max_keepalive, Opts, 1000)},
|
last_streamid=maps:get(max_keepalive, Opts, 1000)},
|
||||||
safe_setopts_active(State),
|
safe_setopts_active(State),
|
||||||
loop(set_timeout(State, request_timeout)).
|
loop(set_timeout(State, request_timeout)).
|
||||||
|
|
||||||
|
-include("cowboy_dynamic_buffer.hrl").
|
||||||
|
|
||||||
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
|
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
|
||||||
N = maps:get(active_n, Opts, 100),
|
N = maps:get(active_n, Opts, 1),
|
||||||
Transport:setopts(Socket, [{active, N}]).
|
Transport:setopts(Socket, [{active, N}]).
|
||||||
|
|
||||||
safe_setopts_active(State) ->
|
safe_setopts_active(State) ->
|
||||||
|
@ -220,11 +231,13 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
|
||||||
receive
|
receive
|
||||||
%% Discard data coming in after the last request
|
%% Discard data coming in after the last request
|
||||||
%% we want to process was received fully.
|
%% we want to process was received fully.
|
||||||
{OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
|
{OK, Socket, Data} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
|
||||||
loop(State);
|
State1 = maybe_resize_buffer(State, Data),
|
||||||
|
loop(State1);
|
||||||
%% Socket messages.
|
%% Socket messages.
|
||||||
{OK, Socket, Data} when OK =:= element(1, Messages) ->
|
{OK, Socket, Data} when OK =:= element(1, Messages) ->
|
||||||
parse(<< Buffer/binary, Data/binary >>, State);
|
State1 = maybe_resize_buffer(State, Data),
|
||||||
|
parse(<< Buffer/binary, Data/binary >>, State1);
|
||||||
{Closed, Socket} when Closed =:= element(2, Messages) ->
|
{Closed, Socket} when Closed =:= element(2, Messages) ->
|
||||||
terminate(State, {socket_error, closed, 'The socket has been closed.'});
|
terminate(State, {socket_error, closed, 'The socket has been closed.'});
|
||||||
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
|
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
|
||||||
|
@ -885,12 +898,12 @@ is_http2_upgrade(_, _) ->
|
||||||
|
|
||||||
%% Prior knowledge upgrade, without an HTTP/1.1 request.
|
%% Prior knowledge upgrade, without an HTTP/1.1 request.
|
||||||
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
|
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
|
||||||
proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
|
proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
|
||||||
case Transport:secure() of
|
case Transport:secure() of
|
||||||
false ->
|
false ->
|
||||||
_ = cancel_timeout(State),
|
_ = cancel_timeout(State),
|
||||||
cowboy_http2:init(Parent, Ref, Socket, Transport,
|
cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader,
|
||||||
ProxyHeader, Opts, Peer, Sock, Cert, Buffer);
|
opts_for_upgrade(State), Peer, Sock, Cert, Buffer);
|
||||||
true ->
|
true ->
|
||||||
error_terminate(400, State, {connection_error, protocol_error,
|
error_terminate(400, State, {connection_error, protocol_error,
|
||||||
'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
|
'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
|
||||||
|
@ -898,7 +911,7 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
|
||||||
|
|
||||||
%% Upgrade via an HTTP/1.1 request.
|
%% Upgrade via an HTTP/1.1 request.
|
||||||
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
|
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
|
||||||
proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert},
|
proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert},
|
||||||
Buffer, HTTP2Settings, Req) ->
|
Buffer, HTTP2Settings, Req) ->
|
||||||
%% @todo
|
%% @todo
|
||||||
%% However if the client sent a body, we need to read the body in full
|
%% However if the client sent a body, we need to read the body in full
|
||||||
|
@ -907,13 +920,22 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
|
||||||
try cow_http_hd:parse_http2_settings(HTTP2Settings) of
|
try cow_http_hd:parse_http2_settings(HTTP2Settings) of
|
||||||
Settings ->
|
Settings ->
|
||||||
_ = cancel_timeout(State),
|
_ = cancel_timeout(State),
|
||||||
cowboy_http2:init(Parent, Ref, Socket, Transport,
|
cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader,
|
||||||
ProxyHeader, Opts, Peer, Sock, Cert, Buffer, Settings, Req)
|
opts_for_upgrade(State), Peer, Sock, Cert, Buffer, Settings, Req)
|
||||||
catch _:_ ->
|
catch _:_ ->
|
||||||
error_terminate(400, State, {connection_error, protocol_error,
|
error_terminate(400, State, {connection_error, protocol_error,
|
||||||
'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
|
'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=false}) ->
|
||||||
|
Opts;
|
||||||
|
opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=Size,
|
||||||
|
dynamic_buffer_moving_average=MovingAvg}) ->
|
||||||
|
Opts#{
|
||||||
|
dynamic_buffer_initial_average => MovingAvg,
|
||||||
|
dynamic_buffer_initial_size => Size
|
||||||
|
}.
|
||||||
|
|
||||||
%% Request body parsing.
|
%% Request body parsing.
|
||||||
|
|
||||||
parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
|
parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
|
||||||
|
@ -1210,7 +1232,7 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams, out_
|
||||||
commands(State, StreamID, Tail);
|
commands(State, StreamID, Tail);
|
||||||
%% Protocol takeover.
|
%% Protocol takeover.
|
||||||
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
|
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
|
||||||
out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
|
out_state=OutState, buffer=Buffer, children=Children}, StreamID,
|
||||||
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
|
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
|
||||||
%% @todo If there's streams opened after this one, fail instead of 101.
|
%% @todo If there's streams opened after this one, fail instead of 101.
|
||||||
State1 = cancel_timeout(State0),
|
State1 = cancel_timeout(State0),
|
||||||
|
@ -1234,7 +1256,8 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
|
||||||
%% Turn off the trap_exit process flag
|
%% Turn off the trap_exit process flag
|
||||||
%% since this process will no longer be a supervisor.
|
%% since this process will no longer be a supervisor.
|
||||||
process_flag(trap_exit, false),
|
process_flag(trap_exit, false),
|
||||||
Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState);
|
Protocol:takeover(Parent, Ref, Socket, Transport,
|
||||||
|
opts_for_upgrade(State), Buffer, InitialState);
|
||||||
%% Set options dynamically.
|
%% Set options dynamically.
|
||||||
commands(State0=#state{overriden_opts=Opts},
|
commands(State0=#state{overriden_opts=Opts},
|
||||||
StreamID, [{set_options, SetOpts}|Tail]) ->
|
StreamID, [{set_options, SetOpts}|Tail]) ->
|
||||||
|
|
|
@ -29,6 +29,9 @@
|
||||||
connection_type => worker | supervisor,
|
connection_type => worker | supervisor,
|
||||||
connection_window_margin_size => 0..16#7fffffff,
|
connection_window_margin_size => 0..16#7fffffff,
|
||||||
connection_window_update_threshold => 0..16#7fffffff,
|
connection_window_update_threshold => 0..16#7fffffff,
|
||||||
|
dynamic_buffer => false | {pos_integer(), pos_integer()},
|
||||||
|
dynamic_buffer_initial_average => non_neg_integer(),
|
||||||
|
dynamic_buffer_initial_size => pos_integer(),
|
||||||
enable_connect_protocol => boolean(),
|
enable_connect_protocol => boolean(),
|
||||||
env => cowboy_middleware:env(),
|
env => cowboy_middleware:env(),
|
||||||
goaway_initial_timeout => timeout(),
|
goaway_initial_timeout => timeout(),
|
||||||
|
@ -133,6 +136,10 @@
|
||||||
%% Flow requested for all streams.
|
%% Flow requested for all streams.
|
||||||
flow = 0 :: non_neg_integer(),
|
flow = 0 :: non_neg_integer(),
|
||||||
|
|
||||||
|
%% Dynamic buffer moving average and current buffer size.
|
||||||
|
dynamic_buffer_size :: pos_integer() | false,
|
||||||
|
dynamic_buffer_moving_average :: non_neg_integer(),
|
||||||
|
|
||||||
%% Currently active HTTP/2 streams. Streams may be initiated either
|
%% Currently active HTTP/2 streams. Streams may be initiated either
|
||||||
%% by the client or by the server through PUSH_PROMISE frames.
|
%% by the client or by the server through PUSH_PROMISE frames.
|
||||||
streams = #{} :: #{cow_http2:streamid() => #stream{}},
|
streams = #{} :: #{cow_http2:streamid() => #stream{}},
|
||||||
|
@ -169,12 +176,15 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
|
||||||
{inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
|
{inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
|
||||||
binary() | undefined, binary()) -> ok.
|
binary() | undefined, binary()) -> ok.
|
||||||
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) ->
|
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) ->
|
||||||
|
DynamicBuffer = init_dynamic_buffer_size(Opts),
|
||||||
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts),
|
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts),
|
||||||
%% Send the preface before doing all the init in case we get a socket error.
|
%% Send the preface before doing all the init in case we get a socket error.
|
||||||
ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)),
|
ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)),
|
||||||
State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket,
|
State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket,
|
||||||
transport=Transport, proxy_header=ProxyHeader,
|
transport=Transport, proxy_header=ProxyHeader,
|
||||||
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
||||||
|
dynamic_buffer_size=DynamicBuffer,
|
||||||
|
dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
|
||||||
http2_status=sequence, http2_machine=HTTP2Machine}), 0),
|
http2_status=sequence, http2_machine=HTTP2Machine}), 0),
|
||||||
safe_setopts_active(State),
|
safe_setopts_active(State),
|
||||||
case Buffer of
|
case Buffer of
|
||||||
|
@ -216,12 +226,15 @@ add_period(Time, Period) -> Time + Period.
|
||||||
binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok.
|
binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok.
|
||||||
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer,
|
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer,
|
||||||
_Settings, Req=#{method := Method}) ->
|
_Settings, Req=#{method := Method}) ->
|
||||||
|
DynamicBuffer = init_dynamic_buffer_size(Opts),
|
||||||
{ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts),
|
{ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts),
|
||||||
{ok, StreamID, HTTP2Machine}
|
{ok, StreamID, HTTP2Machine}
|
||||||
= cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0),
|
= cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0),
|
||||||
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
|
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
|
||||||
transport=Transport, proxy_header=ProxyHeader,
|
transport=Transport, proxy_header=ProxyHeader,
|
||||||
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
||||||
|
dynamic_buffer_size=DynamicBuffer,
|
||||||
|
dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
|
||||||
http2_status=upgrade, http2_machine=HTTP2Machine},
|
http2_status=upgrade, http2_machine=HTTP2Machine},
|
||||||
State1 = headers_frame(State0#state{
|
State1 = headers_frame(State0#state{
|
||||||
http2_machine=HTTP2Machine}, StreamID, Req),
|
http2_machine=HTTP2Machine}, StreamID, Req),
|
||||||
|
@ -241,11 +254,14 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
|
||||||
_ -> parse(State, Buffer)
|
_ -> parse(State, Buffer)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-include("cowboy_dynamic_buffer.hrl").
|
||||||
|
|
||||||
%% Because HTTP/2 has flow control and Cowboy has other rate limiting
|
%% Because HTTP/2 has flow control and Cowboy has other rate limiting
|
||||||
%% mechanisms implemented, a very large active_n value should be fine,
|
%% mechanisms implemented, a very large active_n value should be fine,
|
||||||
%% as long as the stream handlers do their work in a timely manner.
|
%% as long as the stream handlers do their work in a timely manner.
|
||||||
|
%% However large active_n values reduce the impact of dynamic_buffer.
|
||||||
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
|
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
|
||||||
N = maps:get(active_n, Opts, 100),
|
N = maps:get(active_n, Opts, 1),
|
||||||
Transport:setopts(Socket, [{active, N}]).
|
Transport:setopts(Socket, [{active, N}]).
|
||||||
|
|
||||||
safe_setopts_active(State) ->
|
safe_setopts_active(State) ->
|
||||||
|
@ -258,7 +274,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
|
||||||
receive
|
receive
|
||||||
%% Socket messages.
|
%% Socket messages.
|
||||||
{OK, Socket, Data} when OK =:= element(1, Messages) ->
|
{OK, Socket, Data} when OK =:= element(1, Messages) ->
|
||||||
parse(State#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>);
|
State1 = maybe_resize_buffer(State, Data),
|
||||||
|
parse(State1#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>);
|
||||||
{Closed, Socket} when Closed =:= element(2, Messages) ->
|
{Closed, Socket} when Closed =:= element(2, Messages) ->
|
||||||
Reason = case State#state.http2_status of
|
Reason = case State#state.http2_status of
|
||||||
closing -> {stop, closed, 'The client is going away.'};
|
closing -> {stop, closed, 'The client is going away.'};
|
||||||
|
|
|
@ -69,6 +69,9 @@
|
||||||
active_n => pos_integer(),
|
active_n => pos_integer(),
|
||||||
compress => boolean(),
|
compress => boolean(),
|
||||||
deflate_opts => cow_ws:deflate_opts(),
|
deflate_opts => cow_ws:deflate_opts(),
|
||||||
|
dynamic_buffer => false | {pos_integer(), pos_integer()},
|
||||||
|
dynamic_buffer_initial_average => non_neg_integer(),
|
||||||
|
dynamic_buffer_initial_size => pos_integer(),
|
||||||
idle_timeout => timeout(),
|
idle_timeout => timeout(),
|
||||||
max_frame_size => non_neg_integer() | infinity,
|
max_frame_size => non_neg_integer() | infinity,
|
||||||
req_filter => fun((cowboy_req:req()) -> map()),
|
req_filter => fun((cowboy_req:req()) -> map()),
|
||||||
|
@ -97,6 +100,11 @@
|
||||||
timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS,
|
timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS,
|
||||||
messages = undefined :: undefined | {atom(), atom(), atom()}
|
messages = undefined :: undefined | {atom(), atom(), atom()}
|
||||||
| {atom(), atom(), atom(), atom()},
|
| {atom(), atom(), atom(), atom()},
|
||||||
|
|
||||||
|
%% Dynamic buffer moving average and current buffer size.
|
||||||
|
dynamic_buffer_size = false :: pos_integer() | false,
|
||||||
|
dynamic_buffer_moving_average = 0 :: non_neg_integer(),
|
||||||
|
|
||||||
hibernate = false :: boolean(),
|
hibernate = false :: boolean(),
|
||||||
frag_state = undefined :: cow_ws:frag_state(),
|
frag_state = undefined :: cow_ws:frag_state(),
|
||||||
frag_buffer = <<>> :: binary(),
|
frag_buffer = <<>> :: binary(),
|
||||||
|
@ -270,7 +278,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
|
||||||
%% @todo We don't want date and server headers.
|
%% @todo We don't want date and server headers.
|
||||||
Headers = cowboy_req:response_headers(#{}, Req),
|
Headers = cowboy_req:response_headers(#{}, Req),
|
||||||
Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
|
Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
|
||||||
takeover(Pid, Ref, {Pid, StreamID}, undefined, undefined, <<>>,
|
takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>,
|
||||||
{State, HandlerState}).
|
{State, HandlerState}).
|
||||||
|
|
||||||
%% Connection process.
|
%% Connection process.
|
||||||
|
@ -295,8 +303,8 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
|
||||||
-spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()},
|
-spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()},
|
||||||
module() | undefined, any(), binary(),
|
module() | undefined, any(), binary(),
|
||||||
{#state{}, any()}) -> no_return().
|
{#state{}, any()}) -> no_return().
|
||||||
takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
|
takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
|
||||||
{State0=#state{handler=Handler, req=Req}, HandlerState}) ->
|
{State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) ->
|
||||||
case Req of
|
case Req of
|
||||||
#{version := 'HTTP/3'} -> ok;
|
#{version := 'HTTP/3'} -> ok;
|
||||||
%% @todo We should have an option to disable this behavior.
|
%% @todo We should have an option to disable this behavior.
|
||||||
|
@ -308,7 +316,11 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
|
||||||
end,
|
end,
|
||||||
State = set_idle_timeout(State0#state{parent=Parent,
|
State = set_idle_timeout(State0#state{parent=Parent,
|
||||||
ref=Ref, socket=Socket, transport=Transport,
|
ref=Ref, socket=Socket, transport=Transport,
|
||||||
key=undefined, messages=Messages}, 0),
|
opts=WsOpts#{dynamic_buffer => maps:get(dynamic_buffer, Opts, false)},
|
||||||
|
key=undefined, messages=Messages,
|
||||||
|
%% Dynamic buffer only applies to HTTP/1.1 Websocket.
|
||||||
|
dynamic_buffer_size=init_dynamic_buffer_size(Opts),
|
||||||
|
dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0)}, 0),
|
||||||
%% We call parse_header/3 immediately because there might be
|
%% We call parse_header/3 immediately because there might be
|
||||||
%% some data in the buffer that was sent along with the handshake.
|
%% some data in the buffer that was sent along with the handshake.
|
||||||
%% While it is not allowed by the protocol to send frames immediately,
|
%% While it is not allowed by the protocol to send frames immediately,
|
||||||
|
@ -319,6 +331,12 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
|
||||||
false -> after_init(State, HandlerState, #ps_header{buffer=Buffer})
|
false -> after_init(State, HandlerState, #ps_header{buffer=Buffer})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-include("cowboy_dynamic_buffer.hrl").
|
||||||
|
|
||||||
|
%% @todo Implement early socket error detection.
|
||||||
|
maybe_socket_error(_, _) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
after_init(State=#state{active=true}, HandlerState, ParseState) ->
|
after_init(State=#state{active=true}, HandlerState, ParseState) ->
|
||||||
%% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2.
|
%% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2.
|
||||||
%% We must do this only after calling websocket_init/1 (if any)
|
%% We must do this only after calling websocket_init/1 (if any)
|
||||||
|
@ -340,7 +358,7 @@ after_init(State, HandlerState, ParseState) ->
|
||||||
setopts_active(#state{transport=undefined}) ->
|
setopts_active(#state{transport=undefined}) ->
|
||||||
ok;
|
ok;
|
||||||
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
|
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
|
||||||
N = maps:get(active_n, Opts, 100),
|
N = maps:get(active_n, Opts, 1),
|
||||||
Transport:setopts(Socket, [{active, N}]).
|
Transport:setopts(Socket, [{active, N}]).
|
||||||
|
|
||||||
maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
|
maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
|
||||||
|
@ -414,7 +432,8 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
|
||||||
receive
|
receive
|
||||||
%% Socket messages. (HTTP/1.1)
|
%% Socket messages. (HTTP/1.1)
|
||||||
{OK, Socket, Data} when OK =:= element(1, Messages) ->
|
{OK, Socket, Data} when OK =:= element(1, Messages) ->
|
||||||
parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
|
State1 = maybe_resize_buffer(State, Data),
|
||||||
|
parse(?reset_idle_timeout(State1), HandlerState, ParseState, Data);
|
||||||
{Closed, Socket} when Closed =:= element(2, Messages) ->
|
{Closed, Socket} when Closed =:= element(2, Messages) ->
|
||||||
terminate(State, HandlerState, {error, closed});
|
terminate(State, HandlerState, {error, closed});
|
||||||
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
|
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
|
||||||
|
|
15
test/handlers/read_body_h.erl
Normal file
15
test/handlers/read_body_h.erl
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
%% This module reads the request body fully and send a 204 response.
|
||||||
|
|
||||||
|
-module(read_body_h).
|
||||||
|
|
||||||
|
-export([init/2]).
|
||||||
|
|
||||||
|
init(Req0, Opts) ->
|
||||||
|
{ok, Req} = read_body(Req0),
|
||||||
|
{ok, cowboy_req:reply(200, #{}, Req), Opts}.
|
||||||
|
|
||||||
|
read_body(Req0) ->
|
||||||
|
case cowboy_req:read_body(Req0) of
|
||||||
|
{ok, _, Req} -> {ok, Req};
|
||||||
|
{more, _, Req} -> read_body(Req)
|
||||||
|
end.
|
20
test/handlers/ws_ignore.erl
Normal file
20
test/handlers/ws_ignore.erl
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
%% Feel free to use, reuse and abuse the code in this file.
|
||||||
|
|
||||||
|
-module(ws_ignore).
|
||||||
|
|
||||||
|
-export([init/2]).
|
||||||
|
-export([websocket_handle/2]).
|
||||||
|
-export([websocket_info/2]).
|
||||||
|
|
||||||
|
init(Req, _) ->
|
||||||
|
{cowboy_websocket, Req, undefined, #{
|
||||||
|
compress => true
|
||||||
|
}}.
|
||||||
|
|
||||||
|
websocket_handle({text, <<"CHECK">>}, State) ->
|
||||||
|
{[{text, <<"CHECK">>}], State};
|
||||||
|
websocket_handle(_Frame, State) ->
|
||||||
|
{[], State}.
|
||||||
|
|
||||||
|
websocket_info(_Info, State) ->
|
||||||
|
{[], State}.
|
|
@ -32,7 +32,7 @@ groups() ->
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
do_log("", []),
|
do_log("", []),
|
||||||
%% Optionally enable `perf` for the current node.
|
%% Optionally enable `perf` for the current node.
|
||||||
% spawn(fun() -> ct:pal(os:cmd("perf record -g -F 9999 -o /tmp/ws_perf.data -p " ++ os:getpid() ++ " -- sleep 60")) end),
|
% spawn(fun() -> ct:pal(os:cmd("perf record -g -F 9999 -o /tmp/http_perf.data -p " ++ os:getpid() ++ " -- sleep 60")) end),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_) ->
|
end_per_suite(_) ->
|
||||||
|
@ -43,7 +43,16 @@ init_per_group(Name, Config) ->
|
||||||
%% HTTP/1.1
|
%% HTTP/1.1
|
||||||
max_keepalive => infinity,
|
max_keepalive => infinity,
|
||||||
%% HTTP/2
|
%% HTTP/2
|
||||||
max_received_frame_rate => {10_000_000, 1}
|
%% @todo Must configure Gun for performance too.
|
||||||
|
connection_window_margin_size => 64*1024,
|
||||||
|
enable_connect_protocol => true,
|
||||||
|
env => #{dispatch => init_dispatch(Config)},
|
||||||
|
max_frame_size_sent => 64*1024,
|
||||||
|
max_frame_size_received => 16384 * 1024 - 1,
|
||||||
|
max_received_frame_rate => {10_000_000, 1},
|
||||||
|
stream_window_data_threshold => 1024,
|
||||||
|
stream_window_margin_size => 64*1024
|
||||||
|
|
||||||
})].
|
})].
|
||||||
|
|
||||||
end_per_group(Name, _) ->
|
end_per_group(Name, _) ->
|
||||||
|
@ -54,10 +63,19 @@ end_per_group(Name, _) ->
|
||||||
|
|
||||||
init_dispatch(_) ->
|
init_dispatch(_) ->
|
||||||
cowboy_router:compile([{'_', [
|
cowboy_router:compile([{'_', [
|
||||||
{"/", hello_h, []}
|
{"/", hello_h, []},
|
||||||
|
{"/read_body", read_body_h, []}
|
||||||
]}]).
|
]}]).
|
||||||
|
|
||||||
%% Tests.
|
%% Tests: Hello world.
|
||||||
|
|
||||||
|
plain_h_hello_1(Config) ->
|
||||||
|
doc("Plain HTTP handler Hello World; 10K requests per 1 client."),
|
||||||
|
do_bench_get(?FUNCTION_NAME, "/", #{}, 1, 10000, Config).
|
||||||
|
|
||||||
|
plain_h_hello_10(Config) ->
|
||||||
|
doc("Plain HTTP handler Hello World; 10K requests per 10 clients."),
|
||||||
|
do_bench_get(?FUNCTION_NAME, "/", #{}, 10, 10000, Config).
|
||||||
|
|
||||||
stream_h_hello_1(Config) ->
|
stream_h_hello_1(Config) ->
|
||||||
doc("Stream handler Hello World; 10K requests per 1 client."),
|
doc("Stream handler Hello World; 10K requests per 1 client."),
|
||||||
|
@ -81,51 +99,86 @@ do_stream_h_hello(Config, NumClients) ->
|
||||||
do_bench_get(?FUNCTION_NAME, "/", #{}, NumClients, 10000, Config),
|
do_bench_get(?FUNCTION_NAME, "/", #{}, NumClients, 10000, Config),
|
||||||
ranch:set_protocol_options(Ref, ProtoOpts).
|
ranch:set_protocol_options(Ref, ProtoOpts).
|
||||||
|
|
||||||
plain_h_hello_1(Config) ->
|
%% Tests: Large body upload.
|
||||||
doc("Plain HTTP handler Hello World; 10K requests per 1 client."),
|
|
||||||
do_bench_get(?FUNCTION_NAME, "/", #{}, 1, 10000, Config).
|
|
||||||
|
|
||||||
plain_h_hello_10(Config) ->
|
plain_h_1M_post_1(Config) ->
|
||||||
doc("Plain HTTP handler Hello World; 10K requests per 10 clients."),
|
doc("Plain HTTP handler body reading; 100 requests per 1 client."),
|
||||||
do_bench_get(?FUNCTION_NAME, "/", #{}, 10, 10000, Config).
|
do_bench_post(?FUNCTION_NAME, "/read_body", #{}, <<0:8_000_000>>, 1, 10000, Config).
|
||||||
|
|
||||||
|
plain_h_1M_post_10(Config) ->
|
||||||
|
doc("Plain HTTP handler body reading; 100 requests per 10 clients."),
|
||||||
|
do_bench_post(?FUNCTION_NAME, "/read_body", #{}, <<0:8_000_000>>, 10, 10000, Config).
|
||||||
|
|
||||||
%% Internal.
|
%% Internal.
|
||||||
|
|
||||||
do_bench_get(What, Path, Headers, NumClients, NumRuns, Config) ->
|
do_bench_get(What, Path, Headers, NumClients, NumRuns, Config) ->
|
||||||
Clients = [spawn_link(?MODULE, do_bench_proc, [self(), What, Path, Headers, NumRuns, Config])
|
Clients = [spawn_link(?MODULE, do_bench_get_proc,
|
||||||
|
[self(), What, Path, Headers, NumRuns, Config])
|
||||||
|| _ <- lists:seq(1, NumClients)],
|
|| _ <- lists:seq(1, NumClients)],
|
||||||
_ = [receive {What, ready} -> ok end || _ <- Clients],
|
_ = [receive {What, ready} -> ok end || _ <- Clients],
|
||||||
{Time, _} = timer:tc(?MODULE, do_bench_get1, [What, Clients]),
|
{Time, _} = timer:tc(?MODULE, do_bench_wait, [What, Clients]),
|
||||||
do_log("~32s: ~8bµs ~8.1freqs/s", [
|
do_log("~32s: ~8bµs ~8.1freqs/s", [
|
||||||
[atom_to_list(config(group, Config)), $., atom_to_list(What)],
|
[atom_to_list(config(group, Config)), $., atom_to_list(What)],
|
||||||
Time,
|
Time,
|
||||||
(NumClients * NumRuns) / Time * 1_000_000]),
|
(NumClients * NumRuns) / Time * 1_000_000]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
do_bench_get1(What, Clients) ->
|
do_bench_get_proc(Parent, What, Path, Headers0, NumRuns, Config) ->
|
||||||
_ = [ClientPid ! {What, go} || ClientPid <- Clients],
|
|
||||||
_ = [receive {What, done} -> ok end || _ <- Clients],
|
|
||||||
ok.
|
|
||||||
|
|
||||||
do_bench_proc(Parent, What, Path, Headers0, NumRuns, Config) ->
|
|
||||||
ConnPid = gun_open(Config),
|
ConnPid = gun_open(Config),
|
||||||
Headers = Headers0#{<<"accept-encoding">> => <<"gzip">>},
|
Headers = Headers0#{<<"accept-encoding">> => <<"gzip">>},
|
||||||
Parent ! {What, ready},
|
Parent ! {What, ready},
|
||||||
receive {What, go} -> ok end,
|
receive {What, go} -> ok end,
|
||||||
do_bench_run(ConnPid, Path, Headers, NumRuns),
|
do_bench_get_run(ConnPid, Path, Headers, NumRuns),
|
||||||
Parent ! {What, done},
|
Parent ! {What, done},
|
||||||
gun:close(ConnPid).
|
gun:close(ConnPid).
|
||||||
|
|
||||||
do_bench_run(_, _, _, 0) ->
|
do_bench_get_run(_, _, _, 0) ->
|
||||||
ok;
|
ok;
|
||||||
do_bench_run(ConnPid, Path, Headers, Num) ->
|
do_bench_get_run(ConnPid, Path, Headers, Num) ->
|
||||||
Ref = gun:request(ConnPid, <<"GET">>, Path, Headers, <<>>),
|
Ref = gun:request(ConnPid, <<"GET">>, Path, Headers, <<>>),
|
||||||
{response, IsFin, 200, _RespHeaders} = gun:await(ConnPid, Ref, infinity),
|
{response, IsFin, 200, _RespHeaders} = gun:await(ConnPid, Ref, infinity),
|
||||||
{ok, _} = case IsFin of
|
{ok, _} = case IsFin of
|
||||||
nofin -> gun:await_body(ConnPid, Ref, infinity);
|
nofin -> gun:await_body(ConnPid, Ref, infinity);
|
||||||
fin -> {ok, <<>>}
|
fin -> {ok, <<>>}
|
||||||
end,
|
end,
|
||||||
do_bench_run(ConnPid, Path, Headers, Num - 1).
|
do_bench_get_run(ConnPid, Path, Headers, Num - 1).
|
||||||
|
|
||||||
|
do_bench_post(What, Path, Headers, Body, NumClients, NumRuns, Config) ->
|
||||||
|
Clients = [spawn_link(?MODULE, do_bench_post_proc,
|
||||||
|
[self(), What, Path, Headers, Body, NumRuns, Config])
|
||||||
|
|| _ <- lists:seq(1, NumClients)],
|
||||||
|
_ = [receive {What, ready} -> ok end || _ <- Clients],
|
||||||
|
{Time, _} = timer:tc(?MODULE, do_bench_wait, [What, Clients]),
|
||||||
|
do_log("~32s: ~8bµs ~8.1freqs/s", [
|
||||||
|
[atom_to_list(config(group, Config)), $., atom_to_list(What)],
|
||||||
|
Time,
|
||||||
|
(NumClients * NumRuns) / Time * 1_000_000]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
do_bench_post_proc(Parent, What, Path, Headers0, Body, NumRuns, Config) ->
|
||||||
|
ConnPid = gun_open(Config),
|
||||||
|
Headers = Headers0#{<<"accept-encoding">> => <<"gzip">>},
|
||||||
|
Parent ! {What, ready},
|
||||||
|
receive {What, go} -> ok end,
|
||||||
|
do_bench_post_run(ConnPid, Path, Headers, Body, NumRuns),
|
||||||
|
Parent ! {What, done},
|
||||||
|
gun:close(ConnPid).
|
||||||
|
|
||||||
|
do_bench_post_run(_, _, _, _, 0) ->
|
||||||
|
ok;
|
||||||
|
do_bench_post_run(ConnPid, Path, Headers, Body, Num) ->
|
||||||
|
Ref = gun:request(ConnPid, <<"POST">>, Path, Headers, Body),
|
||||||
|
{response, IsFin, 200, _RespHeaders} = gun:await(ConnPid, Ref, infinity),
|
||||||
|
{ok, _} = case IsFin of
|
||||||
|
nofin -> gun:await_body(ConnPid, Ref, infinity);
|
||||||
|
fin -> {ok, <<>>}
|
||||||
|
end,
|
||||||
|
do_bench_post_run(ConnPid, Path, Headers, Body, Num - 1).
|
||||||
|
|
||||||
|
do_bench_wait(What, Clients) ->
|
||||||
|
_ = [ClientPid ! {What, go} || ClientPid <- Clients],
|
||||||
|
_ = [receive {What, done} -> ok end || _ <- Clients],
|
||||||
|
ok.
|
||||||
|
|
||||||
do_log(Str, Args) ->
|
do_log(Str, Args) ->
|
||||||
ct:log(Str, Args),
|
ct:log(Str, Args),
|
||||||
|
|
|
@ -60,6 +60,7 @@ init_per_group(Name, Config) when Name =:= h2c; Name =:= h2c_compress ->
|
||||||
env => #{dispatch => init_dispatch(Config)},
|
env => #{dispatch => init_dispatch(Config)},
|
||||||
max_frame_size_sent => 64*1024,
|
max_frame_size_sent => 64*1024,
|
||||||
max_frame_size_received => 16384 * 1024 - 1,
|
max_frame_size_received => 16384 * 1024 - 1,
|
||||||
|
max_received_frame_rate => {10_000_000, 1},
|
||||||
stream_window_data_threshold => 1024,
|
stream_window_data_threshold => 1024,
|
||||||
stream_window_margin_size => 64*1024
|
stream_window_margin_size => 64*1024
|
||||||
}, [{flavor, Flavor}|Config]),
|
}, [{flavor, Flavor}|Config]),
|
||||||
|
@ -102,13 +103,14 @@ end_per_group(Name, _Config) ->
|
||||||
init_dispatch(_Config) ->
|
init_dispatch(_Config) ->
|
||||||
cowboy_router:compile([
|
cowboy_router:compile([
|
||||||
{"localhost", [
|
{"localhost", [
|
||||||
{"/ws_echo", ws_echo, []}
|
{"/ws_echo", ws_echo, []},
|
||||||
|
{"/ws_ignore", ws_ignore, []}
|
||||||
]}
|
]}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Support functions for testing using Gun.
|
%% Support functions for testing using Gun.
|
||||||
|
|
||||||
do_gun_open_ws(Config) ->
|
do_gun_open_ws(Path, Config) ->
|
||||||
ConnPid = gun_open(Config, #{
|
ConnPid = gun_open(Config, #{
|
||||||
http2_opts => #{
|
http2_opts => #{
|
||||||
connection_window_margin_size => 64*1024,
|
connection_window_margin_size => 64*1024,
|
||||||
|
@ -127,7 +129,7 @@ do_gun_open_ws(Config) ->
|
||||||
{notify, settings_changed, #{enable_connect_protocol := true}}
|
{notify, settings_changed, #{enable_connect_protocol := true}}
|
||||||
= gun:await(ConnPid, undefined) %% @todo Maybe have a gun:await/1?
|
= gun:await(ConnPid, undefined) %% @todo Maybe have a gun:await/1?
|
||||||
end,
|
end,
|
||||||
StreamRef = gun:ws_upgrade(ConnPid, "/ws_echo"),
|
StreamRef = gun:ws_upgrade(ConnPid, Path),
|
||||||
receive
|
receive
|
||||||
{gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} ->
|
{gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} ->
|
||||||
{ok, ConnPid, StreamRef};
|
{ok, ConnPid, StreamRef};
|
||||||
|
@ -149,72 +151,140 @@ receive_ws(ConnPid, StreamRef) ->
|
||||||
|
|
||||||
%% Tests.
|
%% Tests.
|
||||||
|
|
||||||
one_00064KiB(Config) ->
|
echo_1_00064KiB(Config) ->
|
||||||
doc("Send and receive a 64KiB frame."),
|
doc("Send and receive a 64KiB frame."),
|
||||||
do_full(Config, one, 1, 64 * 1024).
|
do_echo(Config, echo_1, 1, 64 * 1024).
|
||||||
|
|
||||||
one_00256KiB(Config) ->
|
echo_1_00256KiB(Config) ->
|
||||||
doc("Send and receive a 256KiB frame."),
|
doc("Send and receive a 256KiB frame."),
|
||||||
do_full(Config, one, 1, 256 * 1024).
|
do_echo(Config, echo_1, 1, 256 * 1024).
|
||||||
|
|
||||||
one_01024KiB(Config) ->
|
echo_1_01024KiB(Config) ->
|
||||||
doc("Send and receive a 1024KiB frame."),
|
doc("Send and receive a 1024KiB frame."),
|
||||||
do_full(Config, one, 1, 1024 * 1024).
|
do_echo(Config, echo_1, 1, 1024 * 1024).
|
||||||
|
|
||||||
one_04096KiB(Config) ->
|
echo_1_04096KiB(Config) ->
|
||||||
doc("Send and receive a 4096KiB frame."),
|
doc("Send and receive a 4096KiB frame."),
|
||||||
do_full(Config, one, 1, 4096 * 1024).
|
do_echo(Config, echo_1, 1, 4096 * 1024).
|
||||||
|
|
||||||
%% Minus one because frames can only get so big.
|
%% Minus one because frames can only get so big.
|
||||||
one_16384KiB(Config) ->
|
echo_1_16384KiB(Config) ->
|
||||||
doc("Send and receive a 16384KiB - 1 frame."),
|
doc("Send and receive a 16384KiB - 1 frame."),
|
||||||
do_full(Config, one, 1, 16384 * 1024 - 1).
|
do_echo(Config, echo_1, 1, 16384 * 1024 - 1).
|
||||||
|
|
||||||
repeat_00000B(Config) ->
|
echo_N_00000B(Config) ->
|
||||||
doc("Send and receive a 0B frame 1000 times."),
|
doc("Send and receive a 0B frame 1000 times."),
|
||||||
do_full(Config, repeat, 1000, 0).
|
do_echo(Config, echo_N, 1000, 0).
|
||||||
|
|
||||||
repeat_00256B(Config) ->
|
echo_N_00256B(Config) ->
|
||||||
doc("Send and receive a 256B frame 1000 times."),
|
doc("Send and receive a 256B frame 1000 times."),
|
||||||
do_full(Config, repeat, 1000, 256).
|
do_echo(Config, echo_N, 1000, 256).
|
||||||
|
|
||||||
repeat_01024B(Config) ->
|
echo_N_01024B(Config) ->
|
||||||
doc("Send and receive a 1024B frame 1000 times."),
|
doc("Send and receive a 1024B frame 1000 times."),
|
||||||
do_full(Config, repeat, 1000, 1024).
|
do_echo(Config, echo_N, 1000, 1024).
|
||||||
|
|
||||||
repeat_04096B(Config) ->
|
echo_N_04096B(Config) ->
|
||||||
doc("Send and receive a 4096B frame 1000 times."),
|
doc("Send and receive a 4096B frame 1000 times."),
|
||||||
do_full(Config, repeat, 1000, 4096).
|
do_echo(Config, echo_N, 1000, 4096).
|
||||||
|
|
||||||
repeat_16384B(Config) ->
|
echo_N_16384B(Config) ->
|
||||||
doc("Send and receive a 16384B frame 1000 times."),
|
doc("Send and receive a 16384B frame 1000 times."),
|
||||||
do_full(Config, repeat, 1000, 16384).
|
do_echo(Config, echo_N, 1000, 16384).
|
||||||
|
|
||||||
%repeat_16384B_10K(Config) ->
|
%echo_N_16384B_10K(Config) ->
|
||||||
% doc("Send and receive a 16384B frame 10000 times."),
|
% doc("Send and receive a 16384B frame 10000 times."),
|
||||||
% do_full(Config, repeat, 10000, 16384).
|
% do_echo(Config, echo_N, 10000, 16384).
|
||||||
|
|
||||||
do_full(Config, What, Num, FrameSize) ->
|
do_echo(Config, What, Num, FrameSize) ->
|
||||||
{ok, ConnPid, StreamRef} = do_gun_open_ws(Config),
|
{ok, ConnPid, StreamRef} = do_gun_open_ws("/ws_echo", Config),
|
||||||
FrameType = config(frame_type, Config),
|
FrameType = config(frame_type, Config),
|
||||||
FrameData = case FrameType of
|
FrameData = case FrameType of
|
||||||
text -> do_text_data(Config, FrameSize);
|
text -> do_text_data(Config, FrameSize);
|
||||||
binary -> rand:bytes(FrameSize)
|
binary -> rand:bytes(FrameSize)
|
||||||
end,
|
end,
|
||||||
%% Heat up the processes before doing the real run.
|
%% Heat up the processes before doing the real run.
|
||||||
% do_full1(ConnPid, StreamRef, Num, FrameType, FrameData),
|
% do_echo_loop(ConnPid, StreamRef, Num, FrameType, FrameData),
|
||||||
{Time, _} = timer:tc(?MODULE, do_full1, [ConnPid, StreamRef, Num, FrameType, FrameData]),
|
{Time, _} = timer:tc(?MODULE, do_echo_loop, [ConnPid, StreamRef, Num, FrameType, FrameData]),
|
||||||
do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]),
|
do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]),
|
||||||
gun:ws_send(ConnPid, StreamRef, close),
|
gun:ws_send(ConnPid, StreamRef, close),
|
||||||
{ok, close} = receive_ws(ConnPid, StreamRef),
|
{ok, close} = receive_ws(ConnPid, StreamRef),
|
||||||
gun_down(ConnPid).
|
gun_down(ConnPid).
|
||||||
|
|
||||||
do_full1(_, _, 0, _, _) ->
|
do_echo_loop(_, _, 0, _, _) ->
|
||||||
ok;
|
ok;
|
||||||
do_full1(ConnPid, StreamRef, Num, FrameType, FrameData) ->
|
do_echo_loop(ConnPid, StreamRef, Num, FrameType, FrameData) ->
|
||||||
gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}),
|
gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}),
|
||||||
{ok, {FrameType, FrameData}} = receive_ws(ConnPid, StreamRef),
|
{ok, {FrameType, FrameData}} = receive_ws(ConnPid, StreamRef),
|
||||||
do_full1(ConnPid, StreamRef, Num - 1, FrameType, FrameData).
|
do_echo_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData).
|
||||||
|
|
||||||
|
send_1_00064KiB(Config) ->
|
||||||
|
doc("Send a 64KiB frame."),
|
||||||
|
do_send(Config, send_1, 1, 64 * 1024).
|
||||||
|
|
||||||
|
send_1_00256KiB(Config) ->
|
||||||
|
doc("Send a 256KiB frame."),
|
||||||
|
do_send(Config, send_1, 1, 256 * 1024).
|
||||||
|
|
||||||
|
send_1_01024KiB(Config) ->
|
||||||
|
doc("Send a 1024KiB frame."),
|
||||||
|
do_send(Config, send_1, 1, 1024 * 1024).
|
||||||
|
|
||||||
|
send_1_04096KiB(Config) ->
|
||||||
|
doc("Send a 4096KiB frame."),
|
||||||
|
do_send(Config, send_1, 1, 4096 * 1024).
|
||||||
|
|
||||||
|
%% Minus one because frames can only get so big.
|
||||||
|
send_1_16384KiB(Config) ->
|
||||||
|
doc("Send a 16384KiB - 1 frame."),
|
||||||
|
do_send(Config, send_1, 1, 16384 * 1024 - 1).
|
||||||
|
|
||||||
|
send_N_00000B(Config) ->
|
||||||
|
doc("Send a 0B frame 10000 times."),
|
||||||
|
do_send(Config, send_N, 10000, 0).
|
||||||
|
|
||||||
|
send_N_00256B(Config) ->
|
||||||
|
doc("Send a 256B frame 10000 times."),
|
||||||
|
do_send(Config, send_N, 10000, 256).
|
||||||
|
|
||||||
|
send_N_01024B(Config) ->
|
||||||
|
doc("Send a 1024B frame 10000 times."),
|
||||||
|
do_send(Config, send_N, 10000, 1024).
|
||||||
|
|
||||||
|
send_N_04096B(Config) ->
|
||||||
|
doc("Send a 4096B frame 10000 times."),
|
||||||
|
do_send(Config, send_N, 10000, 4096).
|
||||||
|
|
||||||
|
send_N_16384B(Config) ->
|
||||||
|
doc("Send a 16384B frame 10000 times."),
|
||||||
|
do_send(Config, send_N, 10000, 16384).
|
||||||
|
|
||||||
|
%send_N_16384B_10K(Config) ->
|
||||||
|
% doc("Send and receive a 16384B frame 10000 times."),
|
||||||
|
% do_send(Config, send_N, 10000, 16384).
|
||||||
|
|
||||||
|
do_send(Config, What, Num, FrameSize) ->
|
||||||
|
{ok, ConnPid, StreamRef} = do_gun_open_ws("/ws_ignore", Config),
|
||||||
|
FrameType = config(frame_type, Config),
|
||||||
|
FrameData = case FrameType of
|
||||||
|
text -> do_text_data(Config, FrameSize);
|
||||||
|
binary -> rand:bytes(FrameSize)
|
||||||
|
end,
|
||||||
|
%% Heat up the processes before doing the real run.
|
||||||
|
% do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData),
|
||||||
|
{Time, _} = timer:tc(?MODULE, do_send_loop, [ConnPid, StreamRef, Num, FrameType, FrameData]),
|
||||||
|
do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]),
|
||||||
|
gun:ws_send(ConnPid, StreamRef, close),
|
||||||
|
{ok, close} = receive_ws(ConnPid, StreamRef),
|
||||||
|
gun_down(ConnPid).
|
||||||
|
|
||||||
|
do_send_loop(ConnPid, StreamRef, 0, _, _) ->
|
||||||
|
gun:ws_send(ConnPid, StreamRef, {text, <<"CHECK">>}),
|
||||||
|
{ok, {text, <<"CHECK">>}} = receive_ws(ConnPid, StreamRef),
|
||||||
|
ok;
|
||||||
|
do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData) ->
|
||||||
|
gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}),
|
||||||
|
do_send_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData).
|
||||||
|
|
||||||
%% Internal.
|
%% Internal.
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue