mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-15 04:30:25 +00:00
Fix chunked streaming of request body and improve speed
This commit is contained in:
parent
ab0699ab29
commit
85d05fff34
3 changed files with 41 additions and 20 deletions
|
@ -804,22 +804,23 @@ qvalue(Data, Fun, Q, _M) ->
|
||||||
%% Decoding.
|
%% Decoding.
|
||||||
|
|
||||||
%% @doc Decode a stream of chunks.
|
%% @doc Decode a stream of chunks.
|
||||||
-spec te_chunked(binary(), {non_neg_integer(), non_neg_integer()})
|
-spec te_chunked(Bin, TransferState)
|
||||||
-> more | {ok, binary(), {non_neg_integer(), non_neg_integer()}}
|
-> more | {more, non_neg_integer(), Bin, TransferState}
|
||||||
| {ok, binary(), binary(), {non_neg_integer(), non_neg_integer()}}
|
| {ok, Bin, TransferState} | {ok, Bin, Bin, TransferState}
|
||||||
| {done, non_neg_integer(), binary()} | {error, badarg}.
|
| {done, non_neg_integer(), Bin} | {error, badarg}
|
||||||
te_chunked(<<>>, _) ->
|
when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}.
|
||||||
more;
|
|
||||||
te_chunked(<< "0\r\n\r\n", Rest/binary >>, {0, Streamed}) ->
|
te_chunked(<< "0\r\n\r\n", Rest/binary >>, {0, Streamed}) ->
|
||||||
{done, Streamed, Rest};
|
{done, Streamed, Rest};
|
||||||
te_chunked(Data, {0, Streamed}) ->
|
te_chunked(Data, {0, Streamed}) ->
|
||||||
%% @todo We are expecting an hex size, not a general token.
|
%% @todo We are expecting an hex size, not a general token.
|
||||||
token(Data,
|
token(Data,
|
||||||
fun (Rest, _) when byte_size(Rest) < 4 ->
|
fun (<< "\r\n", Rest/binary >>, BinLen) ->
|
||||||
more;
|
|
||||||
(<< "\r\n", Rest/binary >>, BinLen) ->
|
|
||||||
Len = list_to_integer(binary_to_list(BinLen), 16),
|
Len = list_to_integer(binary_to_list(BinLen), 16),
|
||||||
te_chunked(Rest, {Len, Streamed});
|
te_chunked(Rest, {Len, Streamed});
|
||||||
|
%% Chunk size shouldn't take too many bytes,
|
||||||
|
%% don't try to stream forever.
|
||||||
|
(Rest, _) when byte_size(Rest) < 16 ->
|
||||||
|
more;
|
||||||
(_, _) ->
|
(_, _) ->
|
||||||
{error, badarg}
|
{error, badarg}
|
||||||
end);
|
end);
|
||||||
|
@ -827,13 +828,12 @@ te_chunked(Data, {ChunkRem, Streamed}) when byte_size(Data) >= ChunkRem + 2 ->
|
||||||
<< Chunk:ChunkRem/binary, "\r\n", Rest/binary >> = Data,
|
<< Chunk:ChunkRem/binary, "\r\n", Rest/binary >> = Data,
|
||||||
{ok, Chunk, Rest, {0, Streamed + byte_size(Chunk)}};
|
{ok, Chunk, Rest, {0, Streamed + byte_size(Chunk)}};
|
||||||
te_chunked(Data, {ChunkRem, Streamed}) ->
|
te_chunked(Data, {ChunkRem, Streamed}) ->
|
||||||
Size = byte_size(Data),
|
{more, ChunkRem + 2, Data, {ChunkRem, Streamed}}.
|
||||||
{ok, Data, {ChunkRem - Size, Streamed + Size}}.
|
|
||||||
|
|
||||||
%% @doc Decode an identity stream.
|
%% @doc Decode an identity stream.
|
||||||
-spec te_identity(binary(), {non_neg_integer(), non_neg_integer()})
|
-spec te_identity(Bin, TransferState)
|
||||||
-> {ok, binary(), {non_neg_integer(), non_neg_integer()}}
|
-> {ok, Bin, TransferState} | {done, Bin, non_neg_integer(), Bin}
|
||||||
| {done, binary(), non_neg_integer(), binary()}.
|
when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}.
|
||||||
te_identity(Data, {Streamed, Total})
|
te_identity(Data, {Streamed, Total})
|
||||||
when Streamed + byte_size(Data) < Total ->
|
when Streamed + byte_size(Data) < Total ->
|
||||||
{ok, Data, {Streamed + byte_size(Data), Total}};
|
{ok, Data, {Streamed + byte_size(Data), Total}};
|
||||||
|
|
|
@ -639,17 +639,18 @@ stream_body(Req=#http_req{buffer=Buffer, body_state={stream, _, _, _}})
|
||||||
when Buffer =/= <<>> ->
|
when Buffer =/= <<>> ->
|
||||||
transfer_decode(Buffer, Req#http_req{buffer= <<>>});
|
transfer_decode(Buffer, Req#http_req{buffer= <<>>});
|
||||||
stream_body(Req=#http_req{body_state={stream, _, _, _}}) ->
|
stream_body(Req=#http_req{body_state={stream, _, _, _}}) ->
|
||||||
stream_body_recv(Req);
|
stream_body_recv(0, Req);
|
||||||
stream_body(Req=#http_req{body_state=done}) ->
|
stream_body(Req=#http_req{body_state=done}) ->
|
||||||
{done, Req}.
|
{done, Req}.
|
||||||
|
|
||||||
-spec stream_body_recv(Req)
|
-spec stream_body_recv(non_neg_integer(), Req)
|
||||||
-> {ok, binary(), Req} | {error, atom()} when Req::req().
|
-> {ok, binary(), Req} | {error, atom()} when Req::req().
|
||||||
stream_body_recv(Req=#http_req{
|
stream_body_recv(Length, Req=#http_req{
|
||||||
transport=Transport, socket=Socket, buffer=Buffer}) ->
|
transport=Transport, socket=Socket, buffer=Buffer}) ->
|
||||||
%% @todo Allow configuring the timeout.
|
%% @todo Allow configuring the timeout.
|
||||||
case Transport:recv(Socket, 0, 5000) of
|
case Transport:recv(Socket, Length, 5000) of
|
||||||
{ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>, Req);
|
{ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>,
|
||||||
|
Req#http_req{buffer= <<>>});
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -667,7 +668,10 @@ transfer_decode(Data, Req=#http_req{
|
||||||
{stream, TransferDecode, TransferState2, ContentDecode}});
|
{stream, TransferDecode, TransferState2, ContentDecode}});
|
||||||
%% @todo {header(s) for chunked
|
%% @todo {header(s) for chunked
|
||||||
more ->
|
more ->
|
||||||
stream_body_recv(Req#http_req{buffer=Data});
|
stream_body_recv(0, Req#http_req{buffer=Data});
|
||||||
|
{more, Length, Rest, TransferState2} ->
|
||||||
|
stream_body_recv(Length, Req#http_req{buffer=Rest, body_state=
|
||||||
|
{stream, TransferDecode, TransferState2, ContentDecode}});
|
||||||
{done, Length, Rest} ->
|
{done, Length, Rest} ->
|
||||||
Req2 = transfer_decode_done(Length, Rest, Req),
|
Req2 = transfer_decode_done(Length, Rest, Req),
|
||||||
{done, Req2};
|
{done, Req2};
|
||||||
|
|
|
@ -73,6 +73,7 @@
|
||||||
-export([stream_body_set_resp/1]).
|
-export([stream_body_set_resp/1]).
|
||||||
-export([stream_body_set_resp_close/1]).
|
-export([stream_body_set_resp_close/1]).
|
||||||
-export([te_chunked/1]).
|
-export([te_chunked/1]).
|
||||||
|
-export([te_chunked_chopped/1]).
|
||||||
-export([te_chunked_delayed/1]).
|
-export([te_chunked_delayed/1]).
|
||||||
-export([te_identity/1]).
|
-export([te_identity/1]).
|
||||||
|
|
||||||
|
@ -133,6 +134,7 @@ groups() ->
|
||||||
stream_body_set_resp,
|
stream_body_set_resp,
|
||||||
stream_body_set_resp_close,
|
stream_body_set_resp_close,
|
||||||
te_chunked,
|
te_chunked,
|
||||||
|
te_chunked_chopped,
|
||||||
te_chunked_delayed,
|
te_chunked_delayed,
|
||||||
te_identity
|
te_identity
|
||||||
],
|
],
|
||||||
|
@ -1037,6 +1039,21 @@ te_chunked(Config) ->
|
||||||
{ok, 200, _, Client3} = cowboy_client:response(Client2),
|
{ok, 200, _, Client3} = cowboy_client:response(Client2),
|
||||||
{ok, Body, _} = cowboy_client:response_body(Client3).
|
{ok, Body, _} = cowboy_client:response_body(Client3).
|
||||||
|
|
||||||
|
te_chunked_chopped(Config) ->
|
||||||
|
Client = ?config(client, Config),
|
||||||
|
Body = list_to_binary(io_lib:format("~p", [lists:seq(1, 100)])),
|
||||||
|
Body2 = iolist_to_binary(body_to_chunks(50, Body, [])),
|
||||||
|
{ok, Client2} = cowboy_client:request(<<"GET">>,
|
||||||
|
build_url("/echo/body", Config),
|
||||||
|
[{<<"transfer-encoding">>, <<"chunked">>}], Client),
|
||||||
|
{ok, Transport, Socket} = cowboy_client:transport(Client2),
|
||||||
|
_ = [begin
|
||||||
|
ok = Transport:send(Socket, << C >>),
|
||||||
|
ok = timer:sleep(10)
|
||||||
|
end || << C >> <= Body2],
|
||||||
|
{ok, 200, _, Client3} = cowboy_client:response(Client2),
|
||||||
|
{ok, Body, _} = cowboy_client:response_body(Client3).
|
||||||
|
|
||||||
te_chunked_delayed(Config) ->
|
te_chunked_delayed(Config) ->
|
||||||
Client = ?config(client, Config),
|
Client = ?config(client, Config),
|
||||||
Body = list_to_binary(io_lib:format("~p", [lists:seq(1, 100)])),
|
Body = list_to_binary(io_lib:format("~p", [lists:seq(1, 100)])),
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue