mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-15 20:50:24 +00:00
Add initial implementation of Websocket over HTTP/2
Using the current draft: https://tools.ietf.org/html/draft-ietf-httpbis-h2-websockets-01
This commit is contained in:
parent
a7b06f2e13
commit
bbfc1569cc
6 changed files with 622 additions and 79 deletions
|
@ -165,9 +165,10 @@ init(Parent, Ref, Socket, Transport, Opts) ->
|
|||
{inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
|
||||
binary() | undefined, binary()) -> ok.
|
||||
init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer) ->
|
||||
State = #state{parent=Parent, ref=Ref, socket=Socket,
|
||||
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
|
||||
transport=Transport, opts=Opts, peer=Peer, sock=Sock, cert=Cert,
|
||||
parse_state={preface, sequence, preface_timeout(Opts)}},
|
||||
State = settings_init(State0, Opts),
|
||||
preface(State),
|
||||
case Buffer of
|
||||
<<>> -> before_loop(State, Buffer);
|
||||
|
@ -188,16 +189,21 @@ init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer, _Settings,
|
|||
State1 = stream_handler_init(State0, 1, fin, upgrade, Req),
|
||||
%% We assume that the upgrade will be applied. A stream handler
|
||||
%% must not prevent the normal operations of the server.
|
||||
State = info(State1, 1, {switch_protocol, #{
|
||||
State2 = info(State1, 1, {switch_protocol, #{
|
||||
<<"connection">> => <<"Upgrade">>,
|
||||
<<"upgrade">> => <<"h2c">>
|
||||
}, ?MODULE, undefined}), %% @todo undefined or #{}?
|
||||
State = settings_init(State2, Opts),
|
||||
preface(State),
|
||||
case Buffer of
|
||||
<<>> -> before_loop(State, Buffer);
|
||||
_ -> parse(State, Buffer)
|
||||
end.
|
||||
|
||||
settings_init(State=#state{next_settings=Settings}, Opts) ->
|
||||
EnableConnectProtocol = maps:get(enable_connect_protocol, Opts, false),
|
||||
State#state{next_settings=Settings#{enable_connect_protocol => EnableConnectProtocol}}.
|
||||
|
||||
preface(#state{socket=Socket, transport=Transport, next_settings=Settings}) ->
|
||||
%% We send next_settings and use defaults until we get a ack.
|
||||
Transport:send(Socket, cow_http2:settings(Settings)).
|
||||
|
@ -413,9 +419,9 @@ frame(State0=#state{socket=Socket, transport=Transport, remote_settings=Settings
|
|||
State
|
||||
end;
|
||||
%% Ack for a previously sent SETTINGS frame.
|
||||
frame(State=#state{next_settings=_NextSettings}, settings_ack) ->
|
||||
%% @todo Apply SETTINGS that require synchronization.
|
||||
State;
|
||||
frame(State=#state{local_settings=Local0, next_settings=Next}, settings_ack) ->
|
||||
Local = maps:merge(Local0, Next),
|
||||
State#state{local_settings=Local, next_settings=#{}};
|
||||
%% Unexpected PUSH_PROMISE frame.
|
||||
frame(State, {push_promise, _, _, _, _}) ->
|
||||
terminate(State, {connection_error, protocol_error,
|
||||
|
@ -637,9 +643,11 @@ commands(State=#state{socket=Socket, transport=Transport},
|
|||
Stream=#stream{local=upgrade}, [{switch_protocol, Headers, ?MODULE, _}|Tail]) ->
|
||||
Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))),
|
||||
commands(State, Stream#stream{local=idle}, Tail);
|
||||
%% HTTP/2 has no support for the Upgrade mechanism.
|
||||
commands(State, Stream, [{switch_protocol, _Headers, _Mod, _ModState}|Tail]) ->
|
||||
%% @todo This is an error. Not sure what to do here yet.
|
||||
%% Use a different protocol within the stream (CONNECT :protocol).
|
||||
%% @todo Make sure we error out when the feature is disabled.
|
||||
commands(State0, #stream{id=StreamID}, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
|
||||
State = #state{streams=Streams} = info(State0, StreamID, {headers, 200, Headers}),
|
||||
Stream = lists:keyfind(StreamID, #stream.id, Streams),
|
||||
commands(State, Stream, Tail);
|
||||
commands(State, Stream=#stream{id=StreamID}, [stop|_Tail]) ->
|
||||
%% @todo Do we want to run the commands after a stop?
|
||||
|
@ -840,8 +848,22 @@ stream_decode_init(State=#state{decode_state=DecodeState0}, StreamID, IsFin, Hea
|
|||
'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
|
||||
end.
|
||||
|
||||
stream_pseudo_headers_init(State, StreamID, IsFin, Headers0) ->
|
||||
stream_pseudo_headers_init(State=#state{local_settings=LocalSettings},
|
||||
StreamID, IsFin, Headers0) ->
|
||||
IsExtendedConnectEnabled = maps:get(enable_connect_protocol, LocalSettings, false),
|
||||
case pseudo_headers(Headers0, #{}) of
|
||||
{ok, PseudoHeaders=#{method := <<"CONNECT">>, scheme := _,
|
||||
authority := _, path := _, protocol := _}, Headers}
|
||||
when IsExtendedConnectEnabled ->
|
||||
stream_regular_headers_init(State, StreamID, IsFin, Headers, PseudoHeaders);
|
||||
{ok, #{method := <<"CONNECT">>, scheme := _,
|
||||
authority := _, path := _}, _}
|
||||
when IsExtendedConnectEnabled ->
|
||||
stream_malformed(State, StreamID,
|
||||
'The :protocol pseudo-header MUST be sent with an extended CONNECT. (draft_h2_websockets 4)');
|
||||
{ok, #{protocol := _}, _} ->
|
||||
stream_malformed(State, StreamID,
|
||||
'The :protocol pseudo-header is only defined for the extended CONNECT. (draft_h2_websockets 4)');
|
||||
%% @todo Add clause for CONNECT requests (no scheme/path).
|
||||
{ok, PseudoHeaders=#{method := <<"CONNECT">>}, _} ->
|
||||
stream_early_error(State, StreamID, IsFin, 501, PseudoHeaders,
|
||||
|
@ -869,13 +891,15 @@ pseudo_headers([{<<":scheme">>, Scheme}|Tail], PseudoHeaders) ->
|
|||
pseudo_headers([{<<":authority">>, _}|_], #{authority := _}) ->
|
||||
{error, 'Multiple :authority pseudo-headers were found. (RFC7540 8.1.2.3)'};
|
||||
pseudo_headers([{<<":authority">>, Authority}|Tail], PseudoHeaders) ->
|
||||
%% @todo Probably parse the authority here.
|
||||
pseudo_headers(Tail, PseudoHeaders#{authority => Authority});
|
||||
pseudo_headers([{<<":path">>, _}|_], #{path := _}) ->
|
||||
{error, 'Multiple :path pseudo-headers were found. (RFC7540 8.1.2.3)'};
|
||||
pseudo_headers([{<<":path">>, Path}|Tail], PseudoHeaders) ->
|
||||
%% @todo Probably parse the path here.
|
||||
pseudo_headers(Tail, PseudoHeaders#{path => Path});
|
||||
pseudo_headers([{<<":protocol">>, _}|_], #{protocol := _}) ->
|
||||
{error, 'Multiple :protocol pseudo-headers were found. (RFC7540 8.1.2.3)'};
|
||||
pseudo_headers([{<<":protocol">>, Protocol}|Tail], PseudoHeaders) ->
|
||||
pseudo_headers(Tail, PseudoHeaders#{protocol => Protocol});
|
||||
pseudo_headers([{<<":", _/bits>>, _}|_], _) ->
|
||||
{error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'};
|
||||
pseudo_headers(Headers, PseudoHeaders) ->
|
||||
|
@ -946,7 +970,7 @@ stream_req_init(State, StreamID, IsFin, Headers, PseudoHeaders) ->
|
|||
end.
|
||||
|
||||
stream_req_init(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert},
|
||||
StreamID, IsFin, Headers, #{method := Method, scheme := Scheme,
|
||||
StreamID, IsFin, Headers, PseudoHeaders=#{method := Method, scheme := Scheme,
|
||||
authority := Authority, path := PathWithQs}, BodyLength) ->
|
||||
try cow_http_hd:parse_host(Authority) of
|
||||
{Host, Port} ->
|
||||
|
@ -955,7 +979,7 @@ stream_req_init(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert},
|
|||
stream_malformed(State, StreamID,
|
||||
'The path component must not be empty. (RFC7540 8.1.2.3)');
|
||||
{Path, Qs} ->
|
||||
Req = #{
|
||||
Req0 = #{
|
||||
ref => Ref,
|
||||
pid => self(),
|
||||
streamid => StreamID,
|
||||
|
@ -973,6 +997,13 @@ stream_req_init(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert},
|
|||
has_body => IsFin =:= nofin,
|
||||
body_length => BodyLength
|
||||
},
|
||||
%% We add the protocol information for extended CONNECTs.
|
||||
Req = case PseudoHeaders of
|
||||
#{protocol := Protocol} ->
|
||||
Req0#{protocol => Protocol};
|
||||
_ ->
|
||||
Req0
|
||||
end,
|
||||
stream_handler_init(State, StreamID, IsFin, idle, Req)
|
||||
catch _:_ ->
|
||||
stream_malformed(State, StreamID,
|
||||
|
|
|
@ -413,6 +413,7 @@ parse_header_fun(<<"if-unmodified-since">>) -> fun cow_http_hd:parse_if_unmodifi
|
|||
parse_header_fun(<<"range">>) -> fun cow_http_hd:parse_range/1;
|
||||
parse_header_fun(<<"sec-websocket-extensions">>) -> fun cow_http_hd:parse_sec_websocket_extensions/1;
|
||||
parse_header_fun(<<"sec-websocket-protocol">>) -> fun cow_http_hd:parse_sec_websocket_protocol_req/1;
|
||||
parse_header_fun(<<"sec-websocket-version">>) -> fun cow_http_hd:parse_sec_websocket_version_req/1;
|
||||
parse_header_fun(<<"upgrade">>) -> fun cow_http_hd:parse_upgrade/1;
|
||||
parse_header_fun(<<"x-forwarded-for">>) -> fun cow_http_hd:parse_x_forwarded_for/1.
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
expect = undefined :: undefined | continue,
|
||||
read_body_ref = undefined :: reference() | undefined,
|
||||
read_body_timer_ref = undefined :: reference() | undefined,
|
||||
read_body_length = 0 :: non_neg_integer() | infinity,
|
||||
read_body_length = 0 :: non_neg_integer() | infinity | auto,
|
||||
read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
|
||||
read_body_buffer = <<>> :: binary(),
|
||||
body_length = 0 :: non_neg_integer()
|
||||
|
@ -65,8 +65,9 @@ expect(Req) ->
|
|||
end.
|
||||
|
||||
%% If we receive data and stream is waiting for data:
|
||||
%% If we accumulated enough data or IsFin=fin, send it.
|
||||
%% If not, buffer it.
|
||||
%% If we accumulated enough data or IsFin=fin, send it.
|
||||
%% If we are in auto mode, send it and update flow control.
|
||||
%% If not, buffer it.
|
||||
%% If not, buffer it.
|
||||
%%
|
||||
%% We always reset the expect field when we receive data,
|
||||
|
@ -75,6 +76,7 @@ expect(Req) ->
|
|||
|
||||
-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
|
||||
-> {cowboy_stream:commands(), State} when State::#state{}.
|
||||
%% Stream isn't waiting for data.
|
||||
data(_StreamID, IsFin, Data, State=#state{
|
||||
read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) ->
|
||||
{[], State#state{
|
||||
|
@ -82,6 +84,16 @@ data(_StreamID, IsFin, Data, State=#state{
|
|||
read_body_is_fin=IsFin,
|
||||
read_body_buffer= << Buffer/binary, Data/binary >>,
|
||||
body_length=BodyLen + byte_size(Data)}};
|
||||
%% Stream is waiting for data using auto mode.
|
||||
%%
|
||||
%% There is no buffering done in auto mode.
|
||||
data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
|
||||
read_body_length=auto, body_length=BodyLen}) ->
|
||||
send_request_body(Pid, Ref, IsFin, BodyLen, Data),
|
||||
{[{flow, byte_size(Data)}], State#state{
|
||||
read_body_ref=undefined,
|
||||
body_length=BodyLen}};
|
||||
%% Stream is waiting for data but we didn't receive enough to send yet.
|
||||
data(_StreamID, nofin, Data, State=#state{
|
||||
read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen})
|
||||
when byte_size(Data) + byte_size(Buffer) < ReadLen ->
|
||||
|
@ -89,9 +101,11 @@ data(_StreamID, nofin, Data, State=#state{
|
|||
expect=undefined,
|
||||
read_body_buffer= << Buffer/binary, Data/binary >>,
|
||||
body_length=BodyLen + byte_size(Data)}};
|
||||
%% Stream is waiting for data and we received enough to send.
|
||||
data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
|
||||
read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) ->
|
||||
BodyLen = BodyLen0 + byte_size(Data),
|
||||
%% @todo Handle the infinity case where no TRef was defined.
|
||||
ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
|
||||
send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>),
|
||||
{[], State#state{
|
||||
|
@ -121,6 +135,16 @@ info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref,
|
|||
{error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>},
|
||||
{internal_error, Exit, 'Stream process crashed.'}
|
||||
], State};
|
||||
%% Request body, auto mode, no body buffered.
|
||||
info(_StreamID, {read_body, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) ->
|
||||
{[], State#state{
|
||||
read_body_ref=Ref,
|
||||
read_body_length=auto}};
|
||||
%% Request body, auto mode, body buffered or complete.
|
||||
info(_StreamID, {read_body, Ref, auto, infinity}, State=#state{pid=Pid,
|
||||
read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
|
||||
send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
|
||||
{[{flow, byte_size(Buffer)}], State#state{read_body_buffer= <<>>}};
|
||||
%% Request body, body buffered large enough or complete.
|
||||
%%
|
||||
%% We do not send a 100 continue response if the client
|
||||
|
@ -136,6 +160,7 @@ info(StreamID, {read_body, Ref, Length, Period}, State=#state{expect=Expect}) ->
|
|||
continue -> [{inform, 100, #{}}, {flow, Length}];
|
||||
undefined -> [{flow, Length}]
|
||||
end,
|
||||
%% @todo Handle the case where Period =:= infinity.
|
||||
TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}),
|
||||
{Commands, State#state{
|
||||
read_body_ref=Ref,
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-module(cowboy_websocket).
|
||||
-behaviour(cowboy_sub_protocol).
|
||||
|
||||
-export([is_upgrade_request/1]).
|
||||
-export([upgrade/4]).
|
||||
-export([upgrade/5]).
|
||||
-export([takeover/7]).
|
||||
|
@ -82,6 +83,25 @@
|
|||
req = #{} :: map()
|
||||
}).
|
||||
|
||||
%% Because the HTTP/1.1 and HTTP/2 handshakes are so different,
|
||||
%% this function is necessary to figure out whether a request
|
||||
%% is trying to upgrade to the Websocket protocol.
|
||||
|
||||
-spec is_upgrade_request(cowboy_req:req()) -> boolean().
|
||||
is_upgrade_request(#{version := 'HTTP/2', method := <<"CONNECT">>, protocol := Protocol}) ->
|
||||
<<"websocket">> =:= cowboy_bstr:to_lower(Protocol);
|
||||
is_upgrade_request(Req=#{version := 'HTTP/1.1', method := <<"GET">>}) ->
|
||||
ConnTokens = cowboy_req:parse_header(<<"connection">>, Req, []),
|
||||
case lists:member(<<"upgrade">>, ConnTokens) of
|
||||
false ->
|
||||
false;
|
||||
true ->
|
||||
UpgradeTokens = cowboy_req:parse_header(<<"upgrade">>, Req),
|
||||
lists:member(<<"websocket">>, UpgradeTokens)
|
||||
end;
|
||||
is_upgrade_request(_) ->
|
||||
false.
|
||||
|
||||
%% Stream process.
|
||||
|
||||
-spec upgrade(Req, Env, module(), any())
|
||||
|
@ -94,8 +114,7 @@ upgrade(Req, Env, Handler, HandlerState) ->
|
|||
-> {ok, Req, Env}
|
||||
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||
%% @todo Immediately crash if a response has already been sent.
|
||||
%% @todo Error out if HTTP/2.
|
||||
upgrade(Req0, Env, Handler, HandlerState, Opts) ->
|
||||
upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) ->
|
||||
Timeout = maps:get(idle_timeout, Opts, 60000),
|
||||
MaxFrameSize = maps:get(max_frame_size, Opts, infinity),
|
||||
Compress = maps:get(compress, Opts, false),
|
||||
|
@ -108,11 +127,15 @@ upgrade(Req0, Env, Handler, HandlerState, Opts) ->
|
|||
try websocket_upgrade(State0, Req0) of
|
||||
{ok, State, Req} ->
|
||||
websocket_handshake(State, Req, HandlerState, Env);
|
||||
{error, upgrade_required} ->
|
||||
%% The status code 426 is specific to HTTP/1.1 connections.
|
||||
{error, upgrade_required} when Version =:= 'HTTP/1.1' ->
|
||||
{ok, cowboy_req:reply(426, #{
|
||||
<<"connection">> => <<"upgrade">>,
|
||||
<<"upgrade">> => <<"websocket">>
|
||||
}, Req0), Env}
|
||||
}, Req0), Env};
|
||||
%% Use a generic 400 error for HTTP/2.
|
||||
{error, upgrade_required} ->
|
||||
{ok, cowboy_req:reply(400, Req0), Env}
|
||||
catch _:_ ->
|
||||
%% @todo Probably log something here?
|
||||
%% @todo Test that we can have 2 /ws 400 status code in a row on the same connection.
|
||||
|
@ -120,27 +143,27 @@ upgrade(Req0, Env, Handler, HandlerState, Opts) ->
|
|||
{ok, cowboy_req:reply(400, Req0), Env}
|
||||
end.
|
||||
|
||||
websocket_upgrade(State, Req) ->
|
||||
ConnTokens = cowboy_req:parse_header(<<"connection">>, Req, []),
|
||||
case lists:member(<<"upgrade">>, ConnTokens) of
|
||||
websocket_upgrade(State, Req=#{version := Version}) ->
|
||||
case is_upgrade_request(Req) of
|
||||
false ->
|
||||
{error, upgrade_required};
|
||||
true when Version =:= 'HTTP/1.1' ->
|
||||
Key = cowboy_req:header(<<"sec-websocket-key">>, Req),
|
||||
false = Key =:= undefined,
|
||||
websocket_version(State#state{key=Key}, Req);
|
||||
true ->
|
||||
UpgradeTokens = cowboy_req:parse_header(<<"upgrade">>, Req, []),
|
||||
case lists:member(<<"websocket">>, UpgradeTokens) of
|
||||
false ->
|
||||
{error, upgrade_required};
|
||||
true ->
|
||||
Version = cowboy_req:header(<<"sec-websocket-version">>, Req),
|
||||
IntVersion = binary_to_integer(Version),
|
||||
true = (IntVersion =:= 7) orelse (IntVersion =:= 8)
|
||||
orelse (IntVersion =:= 13),
|
||||
Key = cowboy_req:header(<<"sec-websocket-key">>, Req),
|
||||
false = Key =:= undefined,
|
||||
websocket_extensions(State#state{key=Key}, Req#{websocket_version => IntVersion})
|
||||
end
|
||||
websocket_version(State, Req)
|
||||
end.
|
||||
|
||||
websocket_version(State, Req) ->
|
||||
WsVersion = cowboy_req:parse_header(<<"sec-websocket-version">>, Req),
|
||||
case WsVersion of
|
||||
7 -> ok;
|
||||
8 -> ok;
|
||||
13 -> ok
|
||||
end,
|
||||
websocket_extensions(State, Req#{websocket_version => WsVersion}).
|
||||
|
||||
websocket_extensions(State=#state{compress=Compress}, Req) ->
|
||||
%% @todo We want different options for this. For example
|
||||
%% * compress everything auto
|
||||
|
@ -159,11 +182,16 @@ websocket_extensions(State, Req, [], []) ->
|
|||
{ok, State, Req};
|
||||
websocket_extensions(State, Req, [], [<<", ">>|RespHeader]) ->
|
||||
{ok, State, cowboy_req:set_resp_header(<<"sec-websocket-extensions">>, lists:reverse(RespHeader), Req)};
|
||||
websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid},
|
||||
%% For HTTP/2 we ARE on the controlling process and do NOT want to update the owner.
|
||||
websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid, version := Version},
|
||||
[{<<"permessage-deflate">>, Params}|Tail], RespHeader) ->
|
||||
%% @todo Make deflate options configurable.
|
||||
Opts = #{level => best_compression, mem_level => 8, strategy => default},
|
||||
try cow_ws:negotiate_permessage_deflate(Params, Extensions, Opts#{owner => Pid}) of
|
||||
Opts0 = #{level => best_compression, mem_level => 8, strategy => default},
|
||||
Opts = case Version of
|
||||
'HTTP/1.1' -> Opts0#{owner => Pid};
|
||||
_ -> Opts0
|
||||
end,
|
||||
try cow_ws:negotiate_permessage_deflate(Params, Extensions, Opts) of
|
||||
{ok, RespExt, Extensions2} ->
|
||||
websocket_extensions(State#state{extensions=Extensions2},
|
||||
Req, Tail, [<<", ">>, RespExt|RespHeader]);
|
||||
|
@ -172,11 +200,15 @@ websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid},
|
|||
catch exit:{error, incompatible_zlib_version, _} ->
|
||||
websocket_extensions(State, Req, Tail, RespHeader)
|
||||
end;
|
||||
websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid},
|
||||
websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid, version := Version},
|
||||
[{<<"x-webkit-deflate-frame">>, Params}|Tail], RespHeader) ->
|
||||
%% @todo Make deflate options configurable.
|
||||
Opts = #{level => best_compression, mem_level => 8, strategy => default},
|
||||
try cow_ws:negotiate_x_webkit_deflate_frame(Params, Extensions, Opts#{owner => Pid}) of
|
||||
Opts0 = #{level => best_compression, mem_level => 8, strategy => default},
|
||||
Opts = case Version of
|
||||
'HTTP/1.1' -> Opts0#{owner => Pid};
|
||||
_ -> Opts0
|
||||
end,
|
||||
try cow_ws:negotiate_x_webkit_deflate_frame(Params, Extensions, Opts) of
|
||||
{ok, RespExt, Extensions2} ->
|
||||
websocket_extensions(State#state{extensions=Extensions2},
|
||||
Req, Tail, [<<", ">>, RespExt|RespHeader]);
|
||||
|
@ -192,7 +224,8 @@ websocket_extensions(State, Req, [_|Tail], RespHeader) ->
|
|||
-> {ok, Req, Env}
|
||||
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
|
||||
websocket_handshake(State=#state{key=Key},
|
||||
Req=#{pid := Pid, streamid := StreamID}, HandlerState, Env) ->
|
||||
Req=#{version := 'HTTP/1.1', pid := Pid, streamid := StreamID},
|
||||
HandlerState, Env) ->
|
||||
Challenge = base64:encode(crypto:hash(sha,
|
||||
<< Key/binary, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" >>)),
|
||||
%% @todo We don't want date and server headers.
|
||||
|
@ -202,7 +235,17 @@ websocket_handshake(State=#state{key=Key},
|
|||
<<"sec-websocket-accept">> => Challenge
|
||||
}, Req),
|
||||
Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
|
||||
{ok, Req, Env}.
|
||||
{ok, Req, Env};
|
||||
%% For HTTP/2 we do not let the process die, we instead keep it
|
||||
%% for the Websocket stream. This is because in HTTP/2 we only
|
||||
%% have a stream, it doesn't take over the whole connection.
|
||||
websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
|
||||
HandlerState, _Env) ->
|
||||
%% @todo We don't want date and server headers.
|
||||
Headers = cowboy_req:response_headers(#{}, Req),
|
||||
Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
|
||||
takeover(Pid, Ref, {Pid, StreamID}, undefined, undefined, <<>>,
|
||||
{State, HandlerState}).
|
||||
|
||||
%% Connection process.
|
||||
|
||||
|
@ -223,21 +266,34 @@ websocket_handshake(State=#state{key=Key},
|
|||
|
||||
-type parse_state() :: #ps_header{} | #ps_payload{}.
|
||||
|
||||
-spec takeover(pid(), ranch:ref(), inet:socket(), module(), any(), binary(),
|
||||
-spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()},
|
||||
module() | undefined, any(), binary(),
|
||||
{#state{}, any()}) -> no_return().
|
||||
takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
|
||||
{State0=#state{handler=Handler}, HandlerState}) ->
|
||||
%% @todo We should have an option to disable this behavior.
|
||||
ranch:remove_connection(Ref),
|
||||
Messages = case Transport of
|
||||
undefined -> undefined;
|
||||
_ -> Transport:messages()
|
||||
end,
|
||||
State = loop_timeout(State0#state{parent=Parent,
|
||||
ref=Ref, socket=Socket, transport=Transport,
|
||||
key=undefined, messages=Transport:messages()}),
|
||||
key=undefined, messages=Messages}),
|
||||
case erlang:function_exported(Handler, websocket_init, 1) of
|
||||
true -> handler_call(State, HandlerState, #ps_header{buffer=Buffer},
|
||||
websocket_init, undefined, fun before_loop/3);
|
||||
false -> before_loop(State, HandlerState, #ps_header{buffer=Buffer})
|
||||
end.
|
||||
|
||||
%% @todo We probably shouldn't do the setopts if we have not received a socket message.
|
||||
%% @todo We need to hibernate when HTTP/2 is used too.
|
||||
before_loop(State=#state{socket=Stream={Pid, _}, transport=undefined},
|
||||
HandlerState, ParseState) ->
|
||||
%% @todo Keep Ref around.
|
||||
ReadBodyRef = make_ref(),
|
||||
Pid ! {Stream, {read_body, ReadBodyRef, auto, infinity}},
|
||||
loop(State, HandlerState, ParseState);
|
||||
before_loop(State=#state{socket=Socket, transport=Transport, hibernate=true},
|
||||
HandlerState, ParseState) ->
|
||||
Transport:setopts(Socket, [{active, once}]),
|
||||
|
@ -258,19 +314,32 @@ loop_timeout(State=#state{timeout=Timeout, timeout_ref=PrevRef}) ->
|
|||
State#state{timeout_ref=TRef}.
|
||||
|
||||
-spec loop(#state{}, any(), parse_state()) -> no_return().
|
||||
loop(State=#state{parent=Parent, socket=Socket, messages={OK, Closed, Error},
|
||||
loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
|
||||
timeout_ref=TRef}, HandlerState, ParseState) ->
|
||||
receive
|
||||
{OK, Socket, Data} ->
|
||||
%% Socket messages. (HTTP/1.1)
|
||||
{OK, Socket, Data} when OK =:= element(1, Messages) ->
|
||||
State2 = loop_timeout(State),
|
||||
parse(State2, HandlerState, ParseState, Data);
|
||||
{Closed, Socket} ->
|
||||
{Closed, Socket} when Closed =:= element(2, Messages) ->
|
||||
terminate(State, HandlerState, {error, closed});
|
||||
{Error, Socket, Reason} ->
|
||||
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
|
||||
terminate(State, HandlerState, {error, Reason});
|
||||
%% Body reading messages. (HTTP/2)
|
||||
{request_body, _Ref, nofin, Data} ->
|
||||
State2 = loop_timeout(State),
|
||||
parse(State2, HandlerState, ParseState, Data);
|
||||
%% @todo We need to handle this case as if it was an {error, closed}
|
||||
%% but not before we finish processing frames. We probably should have
|
||||
%% a check in before_loop to let us stop looping if a flag is set.
|
||||
{request_body, _Ref, fin, _, Data} ->
|
||||
State2 = loop_timeout(State),
|
||||
parse(State2, HandlerState, ParseState, Data);
|
||||
%% Timeouts.
|
||||
{timeout, TRef, ?MODULE} ->
|
||||
websocket_close(State, HandlerState, timeout);
|
||||
{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
|
||||
%% @todo This should call before_loop.
|
||||
loop(State, HandlerState, ParseState);
|
||||
%% System messages.
|
||||
{'EXIT', Parent, Reason} ->
|
||||
|
@ -282,6 +351,7 @@ loop(State=#state{parent=Parent, socket=Socket, messages={OK, Closed, Error},
|
|||
%% Calls from supervisor module.
|
||||
{'$gen_call', From, Call} ->
|
||||
cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
|
||||
%% @todo This should call before_loop.
|
||||
loop(State, HandlerState, ParseState);
|
||||
Message ->
|
||||
handler_call(State, HandlerState, ParseState,
|
||||
|
@ -341,8 +411,7 @@ parse_payload(State=#state{frag_state=FragState, utf8_state=Incomplete, extensio
|
|||
websocket_close(State, HandlerState, Error)
|
||||
end.
|
||||
|
||||
dispatch_frame(State=#state{socket=Socket, transport=Transport,
|
||||
max_frame_size=MaxFrameSize, frag_state=FragState,
|
||||
dispatch_frame(State=#state{max_frame_size=MaxFrameSize, frag_state=FragState,
|
||||
frag_buffer=SoFar, extensions=Extensions}, HandlerState,
|
||||
#ps_payload{type=Type0, unmasked=Payload0, close_code=CloseCode0},
|
||||
RemainingData) ->
|
||||
|
@ -363,12 +432,12 @@ dispatch_frame(State=#state{socket=Socket, transport=Transport,
|
|||
{close, CloseCode, Payload} ->
|
||||
websocket_close(State, HandlerState, {remote, CloseCode, Payload});
|
||||
Frame = ping ->
|
||||
Transport:send(Socket, cow_ws:frame(pong, Extensions)),
|
||||
transport_send(State, nofin, cow_ws:frame(pong, Extensions)),
|
||||
handler_call(State, HandlerState,
|
||||
#ps_header{buffer=RemainingData},
|
||||
websocket_handle, Frame, fun parse_header/3);
|
||||
Frame = {ping, Payload} ->
|
||||
Transport:send(Socket, cow_ws:frame({pong, Payload}, Extensions)),
|
||||
transport_send(State, nofin, cow_ws:frame({pong, Payload}, Extensions)),
|
||||
handler_call(State, HandlerState,
|
||||
#ps_header{buffer=RemainingData},
|
||||
websocket_handle, Frame, fun parse_header/3);
|
||||
|
@ -415,24 +484,32 @@ handler_call(State=#state{handler=Handler}, HandlerState,
|
|||
erlang:raise(Class, Reason, erlang:get_stacktrace())
|
||||
end.
|
||||
|
||||
transport_send(#state{socket=Stream={Pid, _}, transport=undefined}, IsFin, Data) ->
|
||||
Pid ! {Stream, {data, IsFin, Data}},
|
||||
ok;
|
||||
transport_send(#state{socket=Socket, transport=Transport}, _, Data) ->
|
||||
Transport:send(Socket, Data).
|
||||
|
||||
-spec websocket_send(cow_ws:frame(), #state{}) -> ok | stop | {error, atom()}.
|
||||
websocket_send(Frames, State) when is_list(Frames) ->
|
||||
websocket_send_many(Frames, State, []);
|
||||
websocket_send(Frame, #state{socket=Socket, transport=Transport, extensions=Extensions}) ->
|
||||
Res = Transport:send(Socket, cow_ws:frame(Frame, Extensions)),
|
||||
websocket_send(Frame, State=#state{extensions=Extensions}) ->
|
||||
Data = cow_ws:frame(Frame, Extensions),
|
||||
case is_close_frame(Frame) of
|
||||
true -> stop;
|
||||
false -> Res
|
||||
true ->
|
||||
_ = transport_send(State, fin, Data),
|
||||
stop;
|
||||
false ->
|
||||
transport_send(State, nofin, Data)
|
||||
end.
|
||||
|
||||
websocket_send_many([], #state{socket=Socket, transport=Transport}, Acc) ->
|
||||
Transport:send(Socket, lists:reverse(Acc));
|
||||
websocket_send_many([Frame|Tail], State=#state{socket=Socket, transport=Transport,
|
||||
extensions=Extensions}, Acc0) ->
|
||||
websocket_send_many([], State, Acc) ->
|
||||
transport_send(State, nofin, lists:reverse(Acc));
|
||||
websocket_send_many([Frame|Tail], State=#state{extensions=Extensions}, Acc0) ->
|
||||
Acc = [cow_ws:frame(Frame, Extensions)|Acc0],
|
||||
case is_close_frame(Frame) of
|
||||
true ->
|
||||
_ = Transport:send(Socket, lists:reverse(Acc)),
|
||||
_ = transport_send(State, fin, lists:reverse(Acc)),
|
||||
stop;
|
||||
false ->
|
||||
websocket_send_many(Tail, State, Acc)
|
||||
|
@ -448,23 +525,22 @@ websocket_close(State, HandlerState, Reason) ->
|
|||
websocket_send_close(State, Reason),
|
||||
terminate(State, HandlerState, Reason).
|
||||
|
||||
websocket_send_close(#state{socket=Socket, transport=Transport,
|
||||
extensions=Extensions}, Reason) ->
|
||||
websocket_send_close(State=#state{extensions=Extensions}, Reason) ->
|
||||
_ = case Reason of
|
||||
Normal when Normal =:= stop; Normal =:= timeout ->
|
||||
Transport:send(Socket, cow_ws:frame({close, 1000, <<>>}, Extensions));
|
||||
transport_send(State, fin, cow_ws:frame({close, 1000, <<>>}, Extensions));
|
||||
{error, badframe} ->
|
||||
Transport:send(Socket, cow_ws:frame({close, 1002, <<>>}, Extensions));
|
||||
transport_send(State, fin, cow_ws:frame({close, 1002, <<>>}, Extensions));
|
||||
{error, badencoding} ->
|
||||
Transport:send(Socket, cow_ws:frame({close, 1007, <<>>}, Extensions));
|
||||
transport_send(State, fin, cow_ws:frame({close, 1007, <<>>}, Extensions));
|
||||
{error, badsize} ->
|
||||
Transport:send(Socket, cow_ws:frame({close, 1009, <<>>}, Extensions));
|
||||
transport_send(State, fin, cow_ws:frame({close, 1009, <<>>}, Extensions));
|
||||
{crash, _, _} ->
|
||||
Transport:send(Socket, cow_ws:frame({close, 1011, <<>>}, Extensions));
|
||||
transport_send(State, fin, cow_ws:frame({close, 1011, <<>>}, Extensions));
|
||||
remote ->
|
||||
Transport:send(Socket, cow_ws:frame(close, Extensions));
|
||||
transport_send(State, fin, cow_ws:frame(close, Extensions));
|
||||
{remote, Code, _} ->
|
||||
Transport:send(Socket, cow_ws:frame({close, Code, <<>>}, Extensions))
|
||||
transport_send(State, fin, cow_ws:frame({close, Code, <<>>}, Extensions))
|
||||
end,
|
||||
ok.
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue