0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-15 20:50:24 +00:00

Add experimental and incomplete SPDY support

The SPDY connection processes are also supervisors.

Missing:
 *  sendfile support
 *  request body reading support
This commit is contained in:
Loïc Hoguin 2013-05-30 20:21:01 +02:00
parent c7f0834dc3
commit 9a2d35c2e8
7 changed files with 980 additions and 35 deletions

View file

@ -130,13 +130,13 @@
| {done, binary(), non_neg_integer(), binary()}
| {error, atom()}).
-type resp_body_fun() :: fun((inet:socket(), module()) -> ok).
-type resp_body_fun() :: fun((any(), module()) -> ok).
-type send_chunk_fun() :: fun((iodata()) -> ok | {error, atom()}).
-type resp_chunked_fun() :: fun((send_chunk_fun()) -> ok).
-record(http_req, {
%% Transport.
socket = undefined :: undefined | inet:socket(),
socket = undefined :: any(),
transport = undefined :: undefined | module(),
connection = keepalive :: keepalive | close,
@ -189,7 +189,7 @@
%%
%% Since we always need to parse the Connection header, we do it
%% in an optimized way and add the parsed value to p_headers' cache.
-spec new(inet:socket(), module(),
-spec new(any(), module(),
undefined | {inet:ip_address(), inet:port_number()},
binary(), binary(), binary(),
cowboy:http_version(), cowboy:http_headers(), binary(),
@ -917,7 +917,7 @@ has_resp_body(#http_req{resp_body={Length, _}}) ->
has_resp_body(#http_req{resp_body=RespBody}) ->
iolist_size(RespBody) > 0.
%% Remove a header previously set for the response.
%% @doc Remove a header previously set for the response.
-spec delete_resp_header(binary(), Req)
-> Req when Req::req().
delete_resp_header(Name, Req=#http_req{resp_headers=RespHeaders}) ->
@ -944,20 +944,30 @@ reply(Status, Headers, Body, Req=#http_req{
version=Version, connection=Connection,
method=Method, resp_compress=Compress,
resp_state=waiting, resp_headers=RespHeaders}) ->
HTTP11Headers = case Version of
'HTTP/1.1' -> [{<<"connection">>, atom_to_connection(Connection)}];
_ -> []
HTTP11Headers = if
Transport =/= cowboy_spdy, Version =:= 'HTTP/1.1' ->
[{<<"connection">>, atom_to_connection(Connection)}];
true ->
[]
end,
Req3 = case Body of
BodyFun when is_function(BodyFun) ->
%% We stream the response body until we close the connection.
RespConn = close,
{RespType, Req2} = response(Status, Headers, RespHeaders, [
{<<"connection">>, <<"close">>},
{<<"date">>, cowboy_clock:rfc1123()},
{<<"server">>, <<"Cowboy">>},
{<<"transfer-encoding">>, <<"identity">>}
], <<>>, Req),
{RespType, Req2} = if
Transport =:= cowboy_spdy ->
response(Status, Headers, RespHeaders, [
{<<"date">>, cowboy_clock:rfc1123()},
{<<"server">>, <<"Cowboy">>}
], stream, Req);
true ->
response(Status, Headers, RespHeaders, [
{<<"connection">>, <<"close">>},
{<<"date">>, cowboy_clock:rfc1123()},
{<<"server">>, <<"Cowboy">>},
{<<"transfer-encoding">>, <<"identity">>}
], <<>>, Req)
end,
if RespType =/= hook, Method =/= <<"HEAD">> ->
BodyFun(Socket, Transport);
true -> ok
@ -970,13 +980,12 @@ reply(Status, Headers, Body, Req=#http_req{
ChunkFun = fun(IoData) -> chunk(IoData, Req2) end,
BodyFun(ChunkFun),
%% Terminate the chunked body for HTTP/1.1 only.
_ = case Version of
'HTTP/1.0' -> ok;
_ -> Transport:send(Socket, <<"0\r\n\r\n">>)
case Version of
'HTTP/1.0' -> Req2;
_ -> last_chunk(Req2)
end;
true -> ok
end,
Req2;
true -> Req2
end;
{ContentLength, BodyFun} ->
%% We stream the response body for ContentLength bytes.
RespConn = response_connection(Headers, Connection),
@ -984,7 +993,7 @@ reply(Status, Headers, Body, Req=#http_req{
{<<"content-length">>, integer_to_list(ContentLength)},
{<<"date">>, cowboy_clock:rfc1123()},
{<<"server">>, <<"Cowboy">>}
|HTTP11Headers], <<>>, Req),
|HTTP11Headers], stream, Req),
if RespType =/= hook, Method =/= <<"HEAD">> ->
BodyFun(Socket, Transport);
true -> ok
@ -1001,7 +1010,7 @@ reply(Status, Headers, Body, Req=#http_req{
RespHeaders, HTTP11Headers, Method, iolist_size(Body)),
Req2#http_req{connection=RespConn}
end,
{ok, Req3#http_req{resp_state=done,resp_headers=[], resp_body= <<>>}}.
{ok, Req3#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}}.
reply_may_compress(Status, Headers, Body, Req,
RespHeaders, HTTP11Headers, Method) ->
@ -1065,18 +1074,34 @@ chunked_reply(Status, Headers, Req) ->
-spec chunk(iodata(), req()) -> ok | {error, atom()}.
chunk(_Data, #http_req{method= <<"HEAD">>}) ->
ok;
chunk(Data, #http_req{socket=Socket, transport=Transport, version='HTTP/1.0'}) ->
chunk(Data, #http_req{socket=Socket, transport=cowboy_spdy,
resp_state=chunks}) ->
cowboy_spdy:stream_data(Socket, Data);
chunk(Data, #http_req{socket=Socket, transport=Transport,
resp_state=chunks, version='HTTP/1.0'}) ->
Transport:send(Socket, Data);
chunk(Data, #http_req{socket=Socket, transport=Transport, resp_state=chunks}) ->
chunk(Data, #http_req{socket=Socket, transport=Transport,
resp_state=chunks}) ->
Transport:send(Socket, [integer_to_list(iolist_size(Data), 16),
<<"\r\n">>, Data, <<"\r\n">>]).
%% @doc Finish the chunked reply.
%% @todo If ever made public, need to send nothing if HEAD.
-spec last_chunk(Req) -> Req when Req::req().
last_chunk(Req=#http_req{socket=Socket, transport=cowboy_spdy}) ->
_ = cowboy_spdy:stream_close(Socket),
Req#http_req{resp_state=done};
last_chunk(Req=#http_req{socket=Socket, transport=Transport}) ->
_ = Transport:send(Socket, <<"0\r\n\r\n">>),
Req#http_req{resp_state=done}.
%% @doc Send an upgrade reply.
%% @private
-spec upgrade_reply(cowboy:http_status(), cowboy:http_headers(), Req)
-> {ok, Req} when Req::req().
upgrade_reply(Status, Headers, Req=#http_req{
resp_state=waiting, resp_headers=RespHeaders}) ->
upgrade_reply(Status, Headers, Req=#http_req{transport=Transport,
resp_state=waiting, resp_headers=RespHeaders})
when Transport =/= cowboy_spdy ->
{_, Req2} = response(Status, Headers, RespHeaders, [
{<<"connection">>, <<"Upgrade">>}
], <<>>, Req),
@ -1098,9 +1123,8 @@ ensure_response(#http_req{method= <<"HEAD">>, resp_state=chunks}, _) ->
ok;
ensure_response(#http_req{version='HTTP/1.0', resp_state=chunks}, _) ->
ok;
ensure_response(#http_req{socket=Socket, transport=Transport,
resp_state=chunks}, _) ->
Transport:send(Socket, <<"0\r\n\r\n">>),
ensure_response(Req=#http_req{resp_state=chunks}, _) ->
_ = last_chunk(Req),
ok.
%% Private setter/getter API.
@ -1212,6 +1236,15 @@ to_list(Req) ->
-spec chunked_response(cowboy:http_status(), cowboy:http_headers(), Req) ->
{normal | hook, Req} when Req::req().
chunked_response(Status, Headers, Req=#http_req{
transport=cowboy_spdy, resp_state=waiting,
resp_headers=RespHeaders}) ->
{RespType, Req2} = response(Status, Headers, RespHeaders, [
{<<"date">>, cowboy_clock:rfc1123()},
{<<"server">>, <<"Cowboy">>}
], stream, Req),
{RespType, Req2#http_req{resp_state=chunks,
resp_headers=[], resp_body= <<>>}};
chunked_response(Status, Headers, Req=#http_req{
version=Version, connection=Connection,
resp_state=waiting, resp_headers=RespHeaders}) ->
@ -1230,7 +1263,7 @@ chunked_response(Status, Headers, Req=#http_req{
resp_headers=[], resp_body= <<>>}}.
-spec response(cowboy:http_status(), cowboy:http_headers(),
cowboy:http_headers(), cowboy:http_headers(), iodata(), Req)
cowboy:http_headers(), cowboy:http_headers(), stream | iodata(), Req)
-> {normal | hook, Req} when Req::req().
response(Status, Headers, RespHeaders, DefaultHeaders, Body, Req=#http_req{
socket=Socket, transport=Transport, version=Version,
@ -1239,22 +1272,32 @@ response(Status, Headers, RespHeaders, DefaultHeaders, Body, Req=#http_req{
already_called -> Headers;
_ -> response_merge_headers(Headers, RespHeaders, DefaultHeaders)
end,
Body2 = case Body of stream -> <<>>; _ -> Body end,
Req2 = case OnResponse of
already_called -> Req;
undefined -> Req;
OnResponse -> OnResponse(Status, FullHeaders, Body,
%% Don't call 'onresponse' from the hook itself.
Req#http_req{resp_headers=[], resp_body= <<>>,
onresponse=already_called})
OnResponse ->
OnResponse(Status, FullHeaders, Body2,
%% Don't call 'onresponse' from the hook itself.
Req#http_req{resp_headers=[], resp_body= <<>>,
onresponse=already_called})
end,
ReplyType = case Req2#http_req.resp_state of
waiting when Transport =:= cowboy_spdy, Body =:= stream ->
cowboy_spdy:stream_reply(Socket, status(Status), FullHeaders),
ReqPid ! {?MODULE, resp_sent},
normal;
waiting when Transport =:= cowboy_spdy ->
cowboy_spdy:reply(Socket, status(Status), FullHeaders, Body),
ReqPid ! {?MODULE, resp_sent},
normal;
waiting ->
HTTPVer = atom_to_binary(Version, latin1),
StatusLine = << HTTPVer/binary, " ",
(status(Status))/binary, "\r\n" >>,
HeaderLines = [[Key, <<": ">>, Value, <<"\r\n">>]
|| {Key, Value} <- FullHeaders],
Transport:send(Socket, [StatusLine, HeaderLines, <<"\r\n">>, Body]),
Transport:send(Socket, [StatusLine, HeaderLines, <<"\r\n">>, Body2]),
ReqPid ! {?MODULE, resp_sent},
normal;
_ ->