mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 12:20:24 +00:00
Add support for fragmented websocket messages
This commit is contained in:
parent
4fb2a6face
commit
0b15ed914d
2 changed files with 172 additions and 29 deletions
|
@ -46,6 +46,14 @@
|
||||||
-type opcode() :: 0 | 1 | 2 | 8 | 9 | 10.
|
-type opcode() :: 0 | 1 | 2 | 8 | 9 | 10.
|
||||||
-type mask_key() :: 0..16#ffffffff.
|
-type mask_key() :: 0..16#ffffffff.
|
||||||
|
|
||||||
|
%% The websocket_data/4 function may be called multiple times for a message.
|
||||||
|
%% The websocket_dispatch/4 function is only called once for each message.
|
||||||
|
-type frag_state() ::
|
||||||
|
undefined | %% no fragmentation has been seen.
|
||||||
|
{nofin, opcode()} | %% first fragment has been seen.
|
||||||
|
{nofin, opcode(), binary()} | %% first fragment has been unmasked.
|
||||||
|
{fin, opcode(), binary()}. %% last fragment has been seen.
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
version :: 0 | 7 | 8 | 13,
|
version :: 0 | 7 | 8 | 13,
|
||||||
handler :: module(),
|
handler :: module(),
|
||||||
|
@ -56,7 +64,8 @@
|
||||||
messages = undefined :: undefined | {atom(), atom(), atom()},
|
messages = undefined :: undefined | {atom(), atom(), atom()},
|
||||||
hibernate = false :: boolean(),
|
hibernate = false :: boolean(),
|
||||||
eop :: undefined | tuple(), %% hixie-76 specific.
|
eop :: undefined | tuple(), %% hixie-76 specific.
|
||||||
origin = undefined :: undefined | binary() %% hixie-76 specific.
|
origin = undefined :: undefined | binary(), %% hixie-76 specific.
|
||||||
|
frag_state = undefined :: frag_state()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%% @doc Upgrade a HTTP request to the WebSocket protocol.
|
%% @doc Upgrade a HTTP request to the WebSocket protocol.
|
||||||
|
@ -266,31 +275,94 @@ websocket_data(State=#state{version=0, eop=EOP}, Req, HandlerState,
|
||||||
websocket_data(State=#state{version=Version}, Req, HandlerState, Data)
|
websocket_data(State=#state{version=Version}, Req, HandlerState, Data)
|
||||||
when Version =/= 0, byte_size(Data) =:= 1 ->
|
when Version =/= 0, byte_size(Data) =:= 1 ->
|
||||||
handler_before_loop(State, Req, HandlerState, Data);
|
handler_before_loop(State, Req, HandlerState, Data);
|
||||||
%% hybi data frame.
|
%% 7 bit payload length prefix exists
|
||||||
%% @todo Handle Fin.
|
websocket_data(State, Req, HandlerState,
|
||||||
websocket_data(State=#state{version=Version}, Req, HandlerState, Data)
|
<< Fin:1, Rsv:3, Opcode:4, Mask:1, PayloadLen:7, Rest/bits >>
|
||||||
when Version =/= 0 ->
|
= Data) when PayloadLen < 126 ->
|
||||||
<< 1:1, 0:3, Opcode:4, Mask:1, PayloadLen:7, Rest/bits >> = Data,
|
websocket_data(State, Req, HandlerState,
|
||||||
case {PayloadLen, Rest} of
|
Fin, Rsv, Opcode, Mask, PayloadLen, Rest, Data);
|
||||||
{126, _} when Opcode >= 8 -> websocket_close(
|
%% 7+16 bits payload length prefix exists
|
||||||
State, Req, HandlerState, {error, protocol});
|
websocket_data(State, Req, HandlerState,
|
||||||
{127, _} when Opcode >= 8 -> websocket_close(
|
<< Fin:1, Rsv:3, Opcode:4, Mask:1, 126:7, PayloadLen:16, Rest/bits >>
|
||||||
State, Req, HandlerState, {error, protocol});
|
= Data) when PayloadLen > 125 ->
|
||||||
{126, << L:16, R/bits >>} -> websocket_before_unmask(
|
websocket_data(State, Req, HandlerState,
|
||||||
State, Req, HandlerState, Data, R, Opcode, Mask, L);
|
Fin, Rsv, Opcode, Mask, PayloadLen, Rest, Data);
|
||||||
{126, Rest} -> websocket_before_unmask(
|
%% 7+16 bits payload length prefix missing
|
||||||
State, Req, HandlerState, Data, Rest, Opcode, Mask, undefined);
|
websocket_data(State, Req, HandlerState,
|
||||||
{127, << 0:1, L:63, R/bits >>} -> websocket_before_unmask(
|
<< _Fin:1, _Rsv:3, _Opcode:4, _Mask:1, 126:7, Rest/bits >>
|
||||||
State, Req, HandlerState, Data, R, Opcode, Mask, L);
|
= Data) when byte_size(Rest) < 2 ->
|
||||||
{127, Rest} -> websocket_before_unmask(
|
handler_before_loop(State, Req, HandlerState, Data);
|
||||||
State, Req, HandlerState, Data, Rest, Opcode, Mask, undefined);
|
%% 7+64 bits payload length prefix exists
|
||||||
{PayloadLen, Rest} -> websocket_before_unmask(
|
websocket_data(State, Req, HandlerState,
|
||||||
State, Req, HandlerState, Data, Rest, Opcode, Mask, PayloadLen)
|
<< Fin:1, Rsv:3, Opcode:4, Mask:1, 127:7, 0:1, PayloadLen:63,
|
||||||
end;
|
Rest/bits >> = Data) when PayloadLen > 16#FFFF ->
|
||||||
%% Something was wrong with the frame. Close the connection.
|
websocket_data(State, Req, HandlerState,
|
||||||
websocket_data(State, Req, HandlerState, _Bad) ->
|
Fin, Rsv, Opcode, Mask, PayloadLen, Rest, Data);
|
||||||
|
%% 7+64 bits payload length prefix missing
|
||||||
|
websocket_data(State, Req, HandlerState,
|
||||||
|
<< _Fin:1, _Rsv:3, _Opcode:4, _Mask:1, 127:7, Rest/bits >>
|
||||||
|
= Data) when byte_size(Rest) < 8 ->
|
||||||
|
handler_before_loop(State, Req, HandlerState, Data);
|
||||||
|
%% invalid payload length prefix.
|
||||||
|
websocket_data(State, Req, HandlerState, _Data) ->
|
||||||
websocket_close(State, Req, HandlerState, {error, badframe}).
|
websocket_close(State, Req, HandlerState, {error, badframe}).
|
||||||
|
|
||||||
|
|
||||||
|
-spec websocket_data(#state{}, #http_req{}, any(), non_neg_integer(),
|
||||||
|
non_neg_integer(), non_neg_integer(), non_neg_integer(),
|
||||||
|
non_neg_integer(), binary(), binary()) -> closed.
|
||||||
|
%% A fragmented message MUST start a non-zero opcode.
|
||||||
|
websocket_data(State=#state{frag_state=undefined}, Req, HandlerState,
|
||||||
|
_Fin=0, _Rsv=0, _Opcode=0, _Mask, _PayloadLen, _Rest, _Buffer) ->
|
||||||
|
websocket_close(State, Req, HandlerState, {error, badframe});
|
||||||
|
%% A control message MUST NOT be fragmented.
|
||||||
|
websocket_data(State, Req, HandlerState, _Fin=0, _Rsv=0, Opcode, _Mask,
|
||||||
|
_PayloadLen, _Rest, _Buffer) when Opcode >= 8 ->
|
||||||
|
websocket_close(State, Req, HandlerState, {error, badframe});
|
||||||
|
%% The opcode is only included in the first message fragment.
|
||||||
|
websocket_data(State=#state{frag_state=undefined}, Req, HandlerState,
|
||||||
|
_Fin=0, _Rsv=0, Opcode, Mask, PayloadLen, Rest, Data) ->
|
||||||
|
websocket_before_unmask(
|
||||||
|
State#state{frag_state={nofin, Opcode}}, Req, HandlerState,
|
||||||
|
Data, Rest, 0, Mask, PayloadLen);
|
||||||
|
%% non-control opcode when expecting control message or next fragment.
|
||||||
|
websocket_data(State=#state{frag_state={nofin, _, _}}, Req, HandlerState, _Fin,
|
||||||
|
_Rsv=0, Opcode, _Mask, _Ln, _Rest, _Data) when Opcode > 0, Opcode < 8 ->
|
||||||
|
websocket_close(State, Req, HandlerState, {error, badframe});
|
||||||
|
%% If the first message fragment was incomplete, retry unmasking.
|
||||||
|
websocket_data(State=#state{frag_state={nofin, Opcode}}, Req, HandlerState,
|
||||||
|
_Fin=0, _Rsv=0, Opcode, Mask, PayloadLen, Rest, Data) ->
|
||||||
|
websocket_before_unmask(
|
||||||
|
State#state{frag_state={nofin, Opcode}}, Req, HandlerState,
|
||||||
|
Data, Rest, 0, Mask, PayloadLen);
|
||||||
|
%% if the opcode is zero and the fin flag is zero, unmask and await next.
|
||||||
|
websocket_data(State=#state{frag_state={nofin, _Opcode, _Payloads}}, Req,
|
||||||
|
HandlerState, _Fin=0, _Rsv=0, _Opcode2=0, Mask, PayloadLen, Rest,
|
||||||
|
Data) ->
|
||||||
|
websocket_before_unmask(
|
||||||
|
State, Req, HandlerState, Data, Rest, 0, Mask, PayloadLen);
|
||||||
|
%% when the last fragment is seen. Update the fragmentation status.
|
||||||
|
websocket_data(State=#state{frag_state={nofin, Opcode, Payloads}}, Req,
|
||||||
|
HandlerState, _Fin=1, _Rsv=0, _Opcode=0, Mask, PayloadLen, Rest,
|
||||||
|
Data) ->
|
||||||
|
websocket_before_unmask(
|
||||||
|
State#state{frag_state={fin, Opcode, Payloads}},
|
||||||
|
Req, HandlerState, Data, Rest, 0, Mask, PayloadLen);
|
||||||
|
%% control messages MUST NOT use 7+16 bits or 7+64 bits payload length prefixes
|
||||||
|
websocket_data(State, Req, HandlerState, _Fin, _Rsv, Opcode, _Mask, PayloadLen,
|
||||||
|
_Rest, _Data) when Opcode >= 8, PayloadLen > 125 ->
|
||||||
|
websocket_close(State, Req, HandlerState, {error, protocol});
|
||||||
|
%% unfragmented message. unmask and dispatch the message.
|
||||||
|
websocket_data(State=#state{version=Version}, Req, HandlerState, _Fin=1, _Rsv=0,
|
||||||
|
Opcode, Mask, PayloadLen, Rest, Data) when Version =/= 0 ->
|
||||||
|
websocket_before_unmask(
|
||||||
|
State, Req, HandlerState, Data, Rest, Opcode, Mask, PayloadLen);
|
||||||
|
%% Something was wrong with the frame. Close the connection.
|
||||||
|
websocket_data(State, Req, HandlerState, _Fin, _Rsv, _Opcode, _Mask,
|
||||||
|
_PayloadLen, _Rest, _Data) ->
|
||||||
|
websocket_close(State, Req, HandlerState, {error, badframe}).
|
||||||
|
|
||||||
|
|
||||||
%% hybi routing depending on whether unmasking is needed.
|
%% hybi routing depending on whether unmasking is needed.
|
||||||
-spec websocket_before_unmask(#state{}, #http_req{}, any(), binary(),
|
-spec websocket_before_unmask(#state{}, #http_req{}, any(), binary(),
|
||||||
binary(), opcode(), 0 | 1, non_neg_integer() | undefined) -> closed.
|
binary(), opcode(), 0 | 1, non_neg_integer() | undefined) -> closed.
|
||||||
|
@ -349,8 +421,22 @@ websocket_unmask(State, Req, HandlerState, RemainingData,
|
||||||
%% hybi dispatching.
|
%% hybi dispatching.
|
||||||
-spec websocket_dispatch(#state{}, #http_req{}, any(), binary(),
|
-spec websocket_dispatch(#state{}, #http_req{}, any(), binary(),
|
||||||
opcode(), binary()) -> closed.
|
opcode(), binary()) -> closed.
|
||||||
%% @todo Fragmentation.
|
%% First frame of a fragmented message unmasked. Expect intermediate or last.
|
||||||
%~ websocket_dispatch(State, Req, HandlerState, RemainingData, 0, Payload) ->
|
websocket_dispatch(State=#state{frag_state={nofin, Opcode}}, Req, HandlerState,
|
||||||
|
RemainingData, 0, Payload) ->
|
||||||
|
websocket_data(State#state{frag_state={nofin, Opcode, Payload}},
|
||||||
|
Req, HandlerState, RemainingData);
|
||||||
|
%% Intermediate frame of a fragmented message unmasked. Add payload to buffer.
|
||||||
|
websocket_dispatch(State=#state{frag_state={nofin, Opcode, Payloads}}, Req,
|
||||||
|
HandlerState, RemainingData, 0, Payload) ->
|
||||||
|
websocket_data(State#state{frag_state={nofin, Opcode,
|
||||||
|
<<Payloads/binary, Payload/binary>>}}, Req, HandlerState,
|
||||||
|
RemainingData);
|
||||||
|
%% Last frame of a fragmented message unmasked. Dispatch to handler.
|
||||||
|
websocket_dispatch(State=#state{frag_state={fin, Opcode, Payloads}}, Req,
|
||||||
|
HandlerState, RemainingData, 0, Payload) ->
|
||||||
|
websocket_dispatch(State#state{frag_state=undefined}, Req, HandlerState,
|
||||||
|
RemainingData, Opcode, <<Payloads/binary, Payload/binary>>);
|
||||||
%% Text frame.
|
%% Text frame.
|
||||||
websocket_dispatch(State, Req, HandlerState, RemainingData, 1, Payload) ->
|
websocket_dispatch(State, Req, HandlerState, RemainingData, 1, Payload) ->
|
||||||
handler_call(State, Req, HandlerState, RemainingData,
|
handler_call(State, Req, HandlerState, RemainingData,
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
-export([all/0, groups/0, init_per_suite/1, end_per_suite/1,
|
-export([all/0, groups/0, init_per_suite/1, end_per_suite/1,
|
||||||
init_per_group/2, end_per_group/2]). %% ct.
|
init_per_group/2, end_per_group/2]). %% ct.
|
||||||
-export([ws0/1, ws8/1, ws8_single_bytes/1, ws8_init_shutdown/1,
|
-export([ws0/1, ws8/1, ws8_single_bytes/1, ws8_init_shutdown/1,
|
||||||
ws13/1, ws_timeout_hibernate/1]). %% ws.
|
ws13/1, ws_timeout_hibernate/1, ws_text_fragments/1]). %% ws.
|
||||||
|
|
||||||
%% ct.
|
%% ct.
|
||||||
|
|
||||||
|
@ -28,7 +28,7 @@ all() ->
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
BaseTests = [ws0, ws8, ws8_single_bytes, ws8_init_shutdown, ws13,
|
BaseTests = [ws0, ws8, ws8_single_bytes, ws8_init_shutdown, ws13,
|
||||||
ws_timeout_hibernate],
|
ws_timeout_hibernate, ws_text_fragments],
|
||||||
[{ws, [], BaseTests}].
|
[{ws, [], BaseTests}].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
@ -60,7 +60,8 @@ init_dispatch() ->
|
||||||
{[<<"localhost">>], [
|
{[<<"localhost">>], [
|
||||||
{[<<"websocket">>], websocket_handler, []},
|
{[<<"websocket">>], websocket_handler, []},
|
||||||
{[<<"ws_timeout_hibernate">>], ws_timeout_hibernate_handler, []},
|
{[<<"ws_timeout_hibernate">>], ws_timeout_hibernate_handler, []},
|
||||||
{[<<"ws_init_shutdown">>], websocket_handler_init_shutdown, []}
|
{[<<"ws_init_shutdown">>], websocket_handler_init_shutdown, []},
|
||||||
|
{[<<"ws_echo_handler">>], websocket_echo_handler, []}
|
||||||
]}
|
]}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
@ -310,6 +311,62 @@ ws13(Config) ->
|
||||||
{error, closed} = gen_tcp:recv(Socket, 0, 6000),
|
{error, closed} = gen_tcp:recv(Socket, 0, 6000),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
ws_text_fragments(Config) ->
|
||||||
|
{port, Port} = lists:keyfind(port, 1, Config),
|
||||||
|
{ok, Socket} = gen_tcp:connect("localhost", Port,
|
||||||
|
[binary, {active, false}, {packet, raw}]),
|
||||||
|
ok = gen_tcp:send(Socket, [
|
||||||
|
"GET /ws_echo_handler HTTP/1.1\r\n"
|
||||||
|
"Host: localhost\r\n"
|
||||||
|
"Connection: Upgrade\r\n"
|
||||||
|
"Upgrade: websocket\r\n"
|
||||||
|
"Sec-WebSocket-Origin: http://localhost\r\n"
|
||||||
|
"Sec-WebSocket-Version: 8\r\n"
|
||||||
|
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
|
||||||
|
"\r\n"]),
|
||||||
|
{ok, Handshake} = gen_tcp:recv(Socket, 0, 6000),
|
||||||
|
{ok, {http_response, {1, 1}, 101, "Switching Protocols"}, Rest}
|
||||||
|
= erlang:decode_packet(http, Handshake, []),
|
||||||
|
[Headers, <<>>] = websocket_headers(
|
||||||
|
erlang:decode_packet(httph, Rest, []), []),
|
||||||
|
{'Connection', "Upgrade"} = lists:keyfind('Connection', 1, Headers),
|
||||||
|
{'Upgrade', "websocket"} = lists:keyfind('Upgrade', 1, Headers),
|
||||||
|
{"sec-websocket-accept", "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="}
|
||||||
|
= lists:keyfind("sec-websocket-accept", 1, Headers),
|
||||||
|
|
||||||
|
ok = gen_tcp:send(Socket, [
|
||||||
|
<< 0:1, 0:3, 1:4, 1:1, 5:7 >>,
|
||||||
|
<< 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>,
|
||||||
|
<< 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>]),
|
||||||
|
ok = gen_tcp:send(Socket, [
|
||||||
|
<< 1:1, 0:3, 0:4, 1:1, 5:7 >>,
|
||||||
|
<< 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>,
|
||||||
|
<< 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>]),
|
||||||
|
{ok, << 1:1, 0:3, 1:4, 0:1, 10:7, "HelloHello" >>}
|
||||||
|
= gen_tcp:recv(Socket, 0, 6000),
|
||||||
|
|
||||||
|
ok = gen_tcp:send(Socket, [
|
||||||
|
%% #1
|
||||||
|
<< 0:1, 0:3, 1:4, 1:1, 5:7 >>,
|
||||||
|
<< 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>,
|
||||||
|
<< 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>,
|
||||||
|
%% #2
|
||||||
|
<< 0:1, 0:3, 0:4, 1:1, 5:7 >>,
|
||||||
|
<< 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>,
|
||||||
|
<< 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>,
|
||||||
|
%% #3
|
||||||
|
<< 1:1, 0:3, 0:4, 1:1, 5:7 >>,
|
||||||
|
<< 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>,
|
||||||
|
<< 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>]),
|
||||||
|
{ok, << 1:1, 0:3, 1:4, 0:1, 15:7, "HelloHelloHello" >>}
|
||||||
|
= gen_tcp:recv(Socket, 0, 6000),
|
||||||
|
|
||||||
|
ok = gen_tcp:send(Socket, << 1:1, 0:3, 8:4, 0:8 >>), %% close
|
||||||
|
{ok, << 1:1, 0:3, 8:4, 0:8 >>} = gen_tcp:recv(Socket, 0, 6000),
|
||||||
|
{error, closed} = gen_tcp:recv(Socket, 0, 6000),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
|
||||||
websocket_headers({ok, http_eoh, Rest}, Acc) ->
|
websocket_headers({ok, http_eoh, Rest}, Acc) ->
|
||||||
[Acc, Rest];
|
[Acc, Rest];
|
||||||
websocket_headers({ok, {http_header, _I, Key, _R, Value}, Rest}, Acc) ->
|
websocket_headers({ok, {http_header, _I, Key, _R, Value}, Rest}, Acc) ->
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue