0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 12:20:24 +00:00

Add deflate options for Websocket compression

They allow the server to configure what it is willing to accept
for both the negotiated configuration (takeover and window bits)
and the other zlib options (level, mem_level and strategy).

This can be used to reduce the memory and/or CPU footprint of
the compressed data, which comes with a cost in compression ratio.
This commit is contained in:
Loïc Hoguin 2018-11-12 18:12:44 +01:00
parent fe1ee080de
commit 8164b50453
No known key found for this signature in database
GPG key ID: 8A9DF795F6FED764
4 changed files with 220 additions and 50 deletions

View file

@ -152,6 +152,7 @@ Cowboy does it automatically for you.
---- ----
opts() :: #{ opts() :: #{
compress => boolean(), compress => boolean(),
deflate_opts => cow_ws:deflate_opts()
idle_timeout => timeout(), idle_timeout => timeout(),
max_frame_size => non_neg_integer() | infinity, max_frame_size => non_neg_integer() | infinity,
req_filter => fun((cowboy_req:req()) -> map()) req_filter => fun((cowboy_req:req()) -> map())
@ -173,31 +174,44 @@ init(Req, State) ->
The default value is given next to the option name: The default value is given next to the option name:
compress (false):: compress (false)::
Whether to enable the Websocket frame compression
extension. Frames will only be compressed for the Whether to enable the Websocket frame compression
clients that support this extension. extension. Frames will only be compressed for the
clients that support this extension.
deflate_opts (#{})::
Configuration for the permessage-deflate Websocket
extension. Allows configuring both the negotiated
options and the zlib compression options. The
defaults optimize the compression at the expense
of some memory and CPU.
idle_timeout (60000):: idle_timeout (60000)::
Time in milliseconds that Cowboy will keep the
connection open without receiving anything from Time in milliseconds that Cowboy will keep the
the client. connection open without receiving anything from
the client.
max_frame_size (infinity):: max_frame_size (infinity)::
Maximum frame size allowed by this Websocket
handler. Cowboy will close the connection when Maximum frame size allowed by this Websocket
a client attempts to send a frame that goes over handler. Cowboy will close the connection when
this limit. For fragmented frames this applies a client attempts to send a frame that goes over
to the size of the reconstituted frame. this limit. For fragmented frames this applies
to the size of the reconstituted frame.
req_filter:: req_filter::
A function applied to the Req to compact it and
only keep required information. The Req is only A function applied to the Req to compact it and
given back in the `terminate/3` callback. By default only keep required information. The Req is only
it keeps the method, version, URI components and peer given back in the `terminate/3` callback. By default
information. it keeps the method, version, URI components and peer
information.
== Changelog == Changelog
* *2.6*: Deflate options can now be configured via `deflate_opts`.
* *2.0*: The Req object is no longer passed to Websocket callbacks. * *2.0*: The Req object is no longer passed to Websocket callbacks.
* *2.0*: The callback `websocket_terminate/3` was removed in favor of `terminate/3`. * *2.0*: The callback `websocket_terminate/3` was removed in favor of `terminate/3`.
* *1.0*: Protocol introduced. * *1.0*: Protocol introduced.

View file

@ -66,6 +66,7 @@
-type opts() :: #{ -type opts() :: #{
compress => boolean(), compress => boolean(),
deflate_opts => cow_ws:deflate_opts(),
idle_timeout => timeout(), idle_timeout => timeout(),
max_frame_size => non_neg_integer() | infinity, max_frame_size => non_neg_integer() | infinity,
req_filter => fun((cowboy_req:req()) -> map()) req_filter => fun((cowboy_req:req()) -> map())
@ -77,13 +78,11 @@
ref :: ranch:ref(), ref :: ranch:ref(),
socket = undefined :: inet:socket() | {pid(), cowboy_stream:streamid()} | undefined, socket = undefined :: inet:socket() | {pid(), cowboy_stream:streamid()} | undefined,
transport = undefined :: module() | undefined, transport = undefined :: module() | undefined,
opts = #{} :: opts(),
active = true :: boolean(), active = true :: boolean(),
handler :: module(), handler :: module(),
key = undefined :: undefined | binary(), key = undefined :: undefined | binary(),
timeout = infinity :: timeout(),
timeout_ref = undefined :: undefined | reference(), timeout_ref = undefined :: undefined | reference(),
compress = false :: boolean(),
max_frame_size :: non_neg_integer() | infinity,
messages = undefined :: undefined | {atom(), atom(), atom()}, messages = undefined :: undefined | {atom(), atom(), atom()},
hibernate = false :: boolean(), hibernate = false :: boolean(),
frag_state = undefined :: cow_ws:frag_state(), frag_state = undefined :: cow_ws:frag_state(),
@ -125,15 +124,11 @@ upgrade(Req, Env, Handler, HandlerState) ->
when Req::cowboy_req:req(), Env::cowboy_middleware:env(). when Req::cowboy_req:req(), Env::cowboy_middleware:env().
%% @todo Immediately crash if a response has already been sent. %% @todo Immediately crash if a response has already been sent.
upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) -> upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) ->
Timeout = maps:get(idle_timeout, Opts, 60000),
MaxFrameSize = maps:get(max_frame_size, Opts, infinity),
Compress = maps:get(compress, Opts, false),
FilteredReq = case maps:get(req_filter, Opts, undefined) of FilteredReq = case maps:get(req_filter, Opts, undefined) of
undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req0); undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req0);
FilterFun -> FilterFun(Req0) FilterFun -> FilterFun(Req0)
end, end,
State0 = #state{handler=Handler, timeout=Timeout, compress=Compress, State0 = #state{opts=Opts, handler=Handler, req=FilteredReq},
max_frame_size=MaxFrameSize, req=FilteredReq},
try websocket_upgrade(State0, Req0) of try websocket_upgrade(State0, Req0) of
{ok, State, Req} -> {ok, State, Req} ->
websocket_handshake(State, Req, HandlerState, Env); websocket_handshake(State, Req, HandlerState, Env);
@ -174,13 +169,14 @@ websocket_version(State, Req) ->
end, end,
websocket_extensions(State, Req#{websocket_version => WsVersion}). websocket_extensions(State, Req#{websocket_version => WsVersion}).
websocket_extensions(State=#state{compress=Compress}, Req) -> websocket_extensions(State=#state{opts=Opts}, Req) ->
%% @todo We want different options for this. For example %% @todo We want different options for this. For example
%% * compress everything auto %% * compress everything auto
%% * compress only text auto %% * compress only text auto
%% * compress only binary auto %% * compress only binary auto
%% * compress nothing auto (but still enabled it) %% * compress nothing auto (but still enabled it)
%% * disable compression %% * disable compression
Compress = maps:get(compress, Opts, false),
case {Compress, cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req)} of case {Compress, cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req)} of
{true, Extensions} when Extensions =/= undefined -> {true, Extensions} when Extensions =/= undefined ->
websocket_extensions(State, Req, Extensions, []); websocket_extensions(State, Req, Extensions, []);
@ -193,15 +189,15 @@ websocket_extensions(State, Req, [], []) ->
websocket_extensions(State, Req, [], [<<", ">>|RespHeader]) -> websocket_extensions(State, Req, [], [<<", ">>|RespHeader]) ->
{ok, State, cowboy_req:set_resp_header(<<"sec-websocket-extensions">>, lists:reverse(RespHeader), Req)}; {ok, State, cowboy_req:set_resp_header(<<"sec-websocket-extensions">>, lists:reverse(RespHeader), Req)};
%% For HTTP/2 we ARE on the controlling process and do NOT want to update the owner. %% For HTTP/2 we ARE on the controlling process and do NOT want to update the owner.
websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid, version := Version}, websocket_extensions(State=#state{opts=Opts, extensions=Extensions},
Req=#{pid := Pid, version := Version},
[{<<"permessage-deflate">>, Params}|Tail], RespHeader) -> [{<<"permessage-deflate">>, Params}|Tail], RespHeader) ->
%% @todo Make deflate options configurable. DeflateOpts0 = maps:get(deflate_opts, Opts, #{}),
Opts0 = #{level => best_compression, mem_level => 8, strategy => default}, DeflateOpts = case Version of
Opts = case Version of 'HTTP/1.1' -> DeflateOpts0#{owner => Pid};
'HTTP/1.1' -> Opts0#{owner => Pid}; _ -> DeflateOpts0
_ -> Opts0
end, end,
try cow_ws:negotiate_permessage_deflate(Params, Extensions, Opts) of try cow_ws:negotiate_permessage_deflate(Params, Extensions, DeflateOpts) of
{ok, RespExt, Extensions2} -> {ok, RespExt, Extensions2} ->
websocket_extensions(State#state{extensions=Extensions2}, websocket_extensions(State#state{extensions=Extensions2},
Req, Tail, [<<", ">>, RespExt|RespHeader]); Req, Tail, [<<", ">>, RespExt|RespHeader]);
@ -210,15 +206,15 @@ websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid, vers
catch exit:{error, incompatible_zlib_version, _} -> catch exit:{error, incompatible_zlib_version, _} ->
websocket_extensions(State, Req, Tail, RespHeader) websocket_extensions(State, Req, Tail, RespHeader)
end; end;
websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid, version := Version}, websocket_extensions(State=#state{opts=Opts, extensions=Extensions},
Req=#{pid := Pid, version := Version},
[{<<"x-webkit-deflate-frame">>, Params}|Tail], RespHeader) -> [{<<"x-webkit-deflate-frame">>, Params}|Tail], RespHeader) ->
%% @todo Make deflate options configurable. DeflateOpts0 = maps:get(deflate_opts, Opts, #{}),
Opts0 = #{level => best_compression, mem_level => 8, strategy => default}, DeflateOpts = case Version of
Opts = case Version of 'HTTP/1.1' -> DeflateOpts0#{owner => Pid};
'HTTP/1.1' -> Opts0#{owner => Pid}; _ -> DeflateOpts0
_ -> Opts0
end, end,
try cow_ws:negotiate_x_webkit_deflate_frame(Params, Extensions, Opts) of try cow_ws:negotiate_x_webkit_deflate_frame(Params, Extensions, DeflateOpts) of
{ok, RespExt, Extensions2} -> {ok, RespExt, Extensions2} ->
websocket_extensions(State#state{extensions=Extensions2}, websocket_extensions(State#state{extensions=Extensions2},
Req, Tail, [<<", ">>, RespExt|RespHeader]); Req, Tail, [<<", ">>, RespExt|RespHeader]);
@ -317,13 +313,18 @@ before_loop(State=#state{socket=Socket, transport=Transport},
loop(State, HandlerState, ParseState). loop(State, HandlerState, ParseState).
-spec loop_timeout(#state{}) -> #state{}. -spec loop_timeout(#state{}) -> #state{}.
loop_timeout(State=#state{timeout=infinity}) -> loop_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}) ->
State#state{timeout_ref=undefined}; _ = case PrevRef of
loop_timeout(State=#state{timeout=Timeout, timeout_ref=PrevRef}) -> undefined -> ignore;
_ = case PrevRef of undefined -> ignore; PrevRef -> PrevRef -> erlang:cancel_timer(PrevRef)
erlang:cancel_timer(PrevRef) end, end,
TRef = erlang:start_timer(Timeout, self(), ?MODULE), case maps:get(idle_timeout, Opts, 60000) of
State#state{timeout_ref=TRef}. infinity ->
State#state{timeout_ref=undefined};
Timeout ->
TRef = erlang:start_timer(Timeout, self(), ?MODULE),
State#state{timeout_ref=TRef}
end.
-spec loop(#state{}, any(), parse_state()) -> no_return(). -spec loop(#state{}, any(), parse_state()) -> no_return().
loop(State=#state{parent=Parent, socket=Socket, messages=Messages, loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
@ -377,9 +378,9 @@ parse(State, HandlerState, PS=#ps_payload{buffer=Buffer}, Data) ->
parse_payload(State, HandlerState, PS#ps_payload{buffer= <<>>}, parse_payload(State, HandlerState, PS#ps_payload{buffer= <<>>},
<<Buffer/binary, Data/binary>>). <<Buffer/binary, Data/binary>>).
parse_header(State=#state{max_frame_size=MaxFrameSize, parse_header(State=#state{opts=Opts, frag_state=FragState, extensions=Extensions},
frag_state=FragState, extensions=Extensions},
HandlerState, ParseState=#ps_header{buffer=Data}) -> HandlerState, ParseState=#ps_header{buffer=Data}) ->
MaxFrameSize = maps:get(max_frame_size, Opts, infinity),
case cow_ws:parse_header(Data, Extensions, FragState) of case cow_ws:parse_header(Data, Extensions, FragState) of
%% All frames sent from the client to the server are masked. %% All frames sent from the client to the server are masked.
{_, _, _, _, undefined, _} -> {_, _, _, _, undefined, _} ->
@ -423,10 +424,11 @@ parse_payload(State=#state{frag_state=FragState, utf8_state=Incomplete, extensio
websocket_close(State, HandlerState, Error) websocket_close(State, HandlerState, Error)
end. end.
dispatch_frame(State=#state{max_frame_size=MaxFrameSize, frag_state=FragState, dispatch_frame(State=#state{opts=Opts, frag_state=FragState,
frag_buffer=SoFar, extensions=Extensions}, HandlerState, frag_buffer=SoFar, extensions=Extensions}, HandlerState,
#ps_payload{type=Type0, unmasked=Payload0, close_code=CloseCode0}, #ps_payload{type=Type0, unmasked=Payload0, close_code=CloseCode0},
RemainingData) -> RemainingData) ->
MaxFrameSize = maps:get(max_frame_size, Opts, infinity),
case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of
%% @todo Allow receiving fragments. %% @todo Allow receiving fragments.
{fragment, _, _, Payload} when byte_size(Payload) + byte_size(SoFar) > MaxFrameSize -> {fragment, _, _, Payload} when byte_size(Payload) + byte_size(SoFar) > MaxFrameSize ->

View file

@ -0,0 +1,36 @@
%% This module enables compression and returns deflate
%% options depending on the query string.
-module(ws_deflate_opts_h).
-behavior(cowboy_websocket).
-export([init/2]).
-export([websocket_handle/2]).
-export([websocket_info/2]).
init(Req=#{qs := Qs}, State) ->
{Name, Value} = case Qs of
<<"server_context_takeover">> -> {server_context_takeover, takeover};
<<"server_no_context_takeover">> -> {server_context_takeover, no_takeover};
<<"client_context_takeover">> -> {client_context_takeover, takeover};
<<"client_no_context_takeover">> -> {client_context_takeover, no_takeover};
<<"server_max_window_bits">> -> {server_max_window_bits, 9};
<<"client_max_window_bits">> -> {client_max_window_bits, 9};
<<"level">> -> {level, best_speed};
<<"mem_level">> -> {mem_level, 1};
<<"strategy">> -> {strategy, rle}
end,
{cowboy_websocket, Req, State, #{
compress => true,
deflate_opts => #{Name => Value}
}}.
websocket_handle({text, Data}, State) ->
{reply, {text, Data}, State};
websocket_handle({binary, Data}, State) ->
{reply, {binary, Data}, State};
websocket_handle(_, State) ->
{ok, State}.
websocket_info(_, State) ->
{ok, State}.

View file

@ -83,7 +83,8 @@ init_dispatch() ->
{"/terminate", ws_terminate_h, []}, {"/terminate", ws_terminate_h, []},
{"/ws_timeout_hibernate", ws_timeout_hibernate, []}, {"/ws_timeout_hibernate", ws_timeout_hibernate, []},
{"/ws_timeout_cancel", ws_timeout_cancel, []}, {"/ws_timeout_cancel", ws_timeout_cancel, []},
{"/ws_max_frame_size", ws_max_frame_size, []} {"/ws_max_frame_size", ws_max_frame_size, []},
{"/ws_deflate_opts", ws_deflate_opts_h, []}
]} ]}
]). ]).
@ -231,6 +232,123 @@ do_ws_version(Socket) ->
{error, closed} = gen_tcp:recv(Socket, 0, 6000), {error, closed} = gen_tcp:recv(Socket, 0, 6000),
ok. ok.
ws_deflate_opts_client_context_takeover(Config) ->
doc("Handler is configured with client context takeover enabled."),
{ok, _, Headers1} = do_handshake("/ws_deflate_opts?client_context_takeover",
"Sec-WebSocket-Extensions: permessage-deflate\r\n", Config),
{_, "permessage-deflate"}
= lists:keyfind("sec-websocket-extensions", 1, Headers1),
{ok, _, Headers2} = do_handshake("/ws_deflate_opts?client_context_takeover",
"Sec-WebSocket-Extensions: permessage-deflate; client_no_context_takeover\r\n", Config),
{_, "permessage-deflate; client_no_context_takeover"}
= lists:keyfind("sec-websocket-extensions", 1, Headers2),
ok.
ws_deflate_opts_client_no_context_takeover(Config) ->
doc("Handler is configured with client context takeover disabled."),
{ok, _, Headers1} = do_handshake("/ws_deflate_opts?client_no_context_takeover",
"Sec-WebSocket-Extensions: permessage-deflate\r\n", Config),
{_, "permessage-deflate; client_no_context_takeover"}
= lists:keyfind("sec-websocket-extensions", 1, Headers1),
{ok, _, Headers2} = do_handshake("/ws_deflate_opts?client_no_context_takeover",
"Sec-WebSocket-Extensions: permessage-deflate; client_no_context_takeover\r\n", Config),
{_, "permessage-deflate; client_no_context_takeover"}
= lists:keyfind("sec-websocket-extensions", 1, Headers2),
ok.
%% We must send client_max_window_bits to indicate we support it.
ws_deflate_opts_client_max_window_bits(Config) ->
doc("Handler is configured with client max window bits."),
{ok, _, Headers} = do_handshake("/ws_deflate_opts?client_max_window_bits",
"Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits\r\n", Config),
{_, "permessage-deflate; client_max_window_bits=9"}
= lists:keyfind("sec-websocket-extensions", 1, Headers),
ok.
ws_deflate_opts_client_max_window_bits_override(Config) ->
doc("Handler is configured with client max window bits."),
{ok, _, Headers1} = do_handshake("/ws_deflate_opts?client_max_window_bits",
"Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits=8\r\n", Config),
{_, "permessage-deflate; client_max_window_bits=8"}
= lists:keyfind("sec-websocket-extensions", 1, Headers1),
{ok, _, Headers2} = do_handshake("/ws_deflate_opts?client_max_window_bits",
"Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits=12\r\n", Config),
{_, "permessage-deflate; client_max_window_bits=9"}
= lists:keyfind("sec-websocket-extensions", 1, Headers2),
ok.
ws_deflate_opts_server_context_takeover(Config) ->
doc("Handler is configured with server context takeover enabled."),
{ok, _, Headers1} = do_handshake("/ws_deflate_opts?server_context_takeover",
"Sec-WebSocket-Extensions: permessage-deflate\r\n", Config),
{_, "permessage-deflate"}
= lists:keyfind("sec-websocket-extensions", 1, Headers1),
{ok, _, Headers2} = do_handshake("/ws_deflate_opts?server_context_takeover",
"Sec-WebSocket-Extensions: permessage-deflate; server_no_context_takeover\r\n", Config),
{_, "permessage-deflate; server_no_context_takeover"}
= lists:keyfind("sec-websocket-extensions", 1, Headers2),
ok.
ws_deflate_opts_server_no_context_takeover(Config) ->
doc("Handler is configured with server context takeover disabled."),
{ok, _, Headers1} = do_handshake("/ws_deflate_opts?server_no_context_takeover",
"Sec-WebSocket-Extensions: permessage-deflate\r\n", Config),
{_, "permessage-deflate; server_no_context_takeover"}
= lists:keyfind("sec-websocket-extensions", 1, Headers1),
{ok, _, Headers2} = do_handshake("/ws_deflate_opts?server_no_context_takeover",
"Sec-WebSocket-Extensions: permessage-deflate; server_no_context_takeover\r\n", Config),
{_, "permessage-deflate; server_no_context_takeover"}
= lists:keyfind("sec-websocket-extensions", 1, Headers2),
ok.
ws_deflate_opts_server_max_window_bits(Config) ->
doc("Handler is configured with server max window bits."),
{ok, _, Headers} = do_handshake("/ws_deflate_opts?server_max_window_bits",
"Sec-WebSocket-Extensions: permessage-deflate\r\n", Config),
{_, "permessage-deflate; server_max_window_bits=9"}
= lists:keyfind("sec-websocket-extensions", 1, Headers),
ok.
ws_deflate_opts_server_max_window_bits_override(Config) ->
doc("Handler is configured with server max window bits."),
{ok, _, Headers1} = do_handshake("/ws_deflate_opts?server_max_window_bits",
"Sec-WebSocket-Extensions: permessage-deflate; server_max_window_bits=8\r\n", Config),
{_, "permessage-deflate; server_max_window_bits=8"}
= lists:keyfind("sec-websocket-extensions", 1, Headers1),
{ok, _, Headers2} = do_handshake("/ws_deflate_opts?server_max_window_bits",
"Sec-WebSocket-Extensions: permessage-deflate; server_max_window_bits=12\r\n", Config),
{_, "permessage-deflate; server_max_window_bits=9"}
= lists:keyfind("sec-websocket-extensions", 1, Headers2),
ok.
ws_deflate_opts_zlevel(Config) ->
doc("Handler is configured with zlib level."),
do_ws_deflate_opts_z("/ws_deflate_opts?level", Config).
ws_deflate_opts_zmemlevel(Config) ->
doc("Handler is configured with zlib mem_level."),
do_ws_deflate_opts_z("/ws_deflate_opts?mem_level", Config).
ws_deflate_opts_zstrategy(Config) ->
doc("Handler is configured with zlib strategy."),
do_ws_deflate_opts_z("/ws_deflate_opts?strategy", Config).
do_ws_deflate_opts_z(Path, Config) ->
{ok, Socket, Headers} = do_handshake(Path,
"Sec-WebSocket-Extensions: permessage-deflate\r\n", Config),
{_, "permessage-deflate"} = lists:keyfind("sec-websocket-extensions", 1, Headers),
%% Send and receive a compressed "Hello" frame.
Mask = 16#11223344,
CompressedHello = << 242, 72, 205, 201, 201, 7, 0 >>,
MaskedHello = do_mask(CompressedHello, Mask, <<>>),
ok = gen_tcp:send(Socket, << 1:1, 1:1, 0:2, 1:4, 1:1, 7:7, Mask:32, MaskedHello/binary >>),
{ok, << 1:1, 1:1, 0:2, 1:4, 0:1, 7:7, CompressedHello/binary >>} = gen_tcp:recv(Socket, 0, 6000),
%% Client-initiated close.
ok = gen_tcp:send(Socket, << 1:1, 0:3, 8:4, 1:1, 0:7, 0:32 >>),
{ok, << 1:1, 0:3, 8:4, 0:8 >>} = gen_tcp:recv(Socket, 0, 6000),
{error, closed} = gen_tcp:recv(Socket, 0, 6000),
ok.
ws_init_return_ok(Config) -> ws_init_return_ok(Config) ->
doc("Handler does nothing."), doc("Handler does nothing."),
{ok, Socket, _} = do_handshake("/ws_init?ok", Config), {ok, Socket, _} = do_handshake("/ws_init?ok", Config),