mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-15 12:40:25 +00:00
Make streamed chunk size configurable
Defaults to a maximum of 1000000 bytes. Also standardize the te_identity and te_chunked decoding functions. Now they both try to read as much as possible (up to the limit), making body reading much faster when not using chunked encoding.
This commit is contained in:
parent
55e98f4f61
commit
233cf43ab9
3 changed files with 47 additions and 33 deletions
|
@ -94,7 +94,8 @@ If you know the request contains a body, and that it is
|
||||||
of appropriate size, then you can read it directly with
|
of appropriate size, then you can read it directly with
|
||||||
either `body/1` or `body_qs/1`. Otherwise, you will want
|
either `body/1` or `body_qs/1`. Otherwise, you will want
|
||||||
to stream it with `stream_body/1` and `skip_body/1`, with
|
to stream it with `stream_body/1` and `skip_body/1`, with
|
||||||
the streaming process optionally initialized using `init_stream/4`.
|
the streaming process optionally initialized using `init_stream/4`
|
||||||
|
or `init_stream/5`.
|
||||||
|
|
||||||
Multipart request body
|
Multipart request body
|
||||||
----------------------
|
----------------------
|
||||||
|
|
|
@ -853,7 +853,7 @@ authorization_basic_password(<<C, Rest/binary>>, Fun, Acc) ->
|
||||||
%% @doc Decode a stream of chunks.
|
%% @doc Decode a stream of chunks.
|
||||||
-spec te_chunked(Bin, TransferState)
|
-spec te_chunked(Bin, TransferState)
|
||||||
-> more | {more, non_neg_integer(), Bin, TransferState}
|
-> more | {more, non_neg_integer(), Bin, TransferState}
|
||||||
| {ok, Bin, TransferState} | {ok, Bin, Bin, TransferState}
|
| {ok, Bin, Bin, TransferState}
|
||||||
| {done, non_neg_integer(), Bin} | {error, badarg}
|
| {done, non_neg_integer(), Bin} | {error, badarg}
|
||||||
when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}.
|
when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}.
|
||||||
te_chunked(<< "0\r\n\r\n", Rest/binary >>, {0, Streamed}) ->
|
te_chunked(<< "0\r\n\r\n", Rest/binary >>, {0, Streamed}) ->
|
||||||
|
@ -879,11 +879,13 @@ te_chunked(Data, {ChunkRem, Streamed}) ->
|
||||||
|
|
||||||
%% @doc Decode an identity stream.
|
%% @doc Decode an identity stream.
|
||||||
-spec te_identity(Bin, TransferState)
|
-spec te_identity(Bin, TransferState)
|
||||||
-> {ok, Bin, TransferState} | {done, Bin, non_neg_integer(), Bin}
|
-> {more, non_neg_integer(), Bin, TransferState}
|
||||||
|
| {done, Bin, non_neg_integer(), Bin}
|
||||||
when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}.
|
when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}.
|
||||||
te_identity(Data, {Streamed, Total})
|
te_identity(Data, {Streamed, Total})
|
||||||
when Streamed + byte_size(Data) < Total ->
|
when Streamed + byte_size(Data) < Total ->
|
||||||
{ok, Data, {Streamed + byte_size(Data), Total}};
|
Streamed2 = Streamed + byte_size(Data),
|
||||||
|
{more, Total - Streamed2, Data, {Streamed2, Total}};
|
||||||
te_identity(Data, {Streamed, Total}) ->
|
te_identity(Data, {Streamed, Total}) ->
|
||||||
Size = Total - Streamed,
|
Size = Total - Streamed,
|
||||||
<< Data2:Size/binary, Rest/binary >> = Data,
|
<< Data2:Size/binary, Rest/binary >> = Data,
|
||||||
|
|
|
@ -78,6 +78,7 @@
|
||||||
-export([has_body/1]).
|
-export([has_body/1]).
|
||||||
-export([body_length/1]).
|
-export([body_length/1]).
|
||||||
-export([init_stream/4]).
|
-export([init_stream/4]).
|
||||||
|
-export([init_stream/5]).
|
||||||
-export([stream_body/1]).
|
-export([stream_body/1]).
|
||||||
-export([skip_body/1]).
|
-export([skip_body/1]).
|
||||||
-export([body/1]).
|
-export([body/1]).
|
||||||
|
@ -152,7 +153,8 @@
|
||||||
meta = [] :: [{atom(), any()}],
|
meta = [] :: [{atom(), any()}],
|
||||||
|
|
||||||
%% Request body.
|
%% Request body.
|
||||||
body_state = waiting :: waiting | done | {stream, fun(), any(), fun()},
|
body_state = waiting :: waiting | done | {stream,
|
||||||
|
non_neg_integer(), non_neg_integer(), fun(), any(), fun()},
|
||||||
multipart = undefined :: undefined | {non_neg_integer(), fun()},
|
multipart = undefined :: undefined | {non_neg_integer(), fun()},
|
||||||
buffer = <<>> :: binary(),
|
buffer = <<>> :: binary(),
|
||||||
|
|
||||||
|
@ -587,6 +589,12 @@ body_length(Req) ->
|
||||||
{undefined, Req2}
|
{undefined, Req2}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @equiv init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req)
|
||||||
|
-spec init_stream(fun(), any(), fun(), Req)
|
||||||
|
-> {ok, Req} when Req::req().
|
||||||
|
init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
|
||||||
|
init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req).
|
||||||
|
|
||||||
%% @doc Initialize body streaming and set custom decoding functions.
|
%% @doc Initialize body streaming and set custom decoding functions.
|
||||||
%%
|
%%
|
||||||
%% Calling this function is optional. It should only be used if you
|
%% Calling this function is optional. It should only be used if you
|
||||||
|
@ -603,10 +611,11 @@ body_length(Req) ->
|
||||||
%% Content encoding is generally used for compression.
|
%% Content encoding is generally used for compression.
|
||||||
%%
|
%%
|
||||||
%% Standard encodings can be found in cowboy_http.
|
%% Standard encodings can be found in cowboy_http.
|
||||||
-spec init_stream(fun(), any(), fun(), Req) -> {ok, Req} when Req::req().
|
-spec init_stream(non_neg_integer(), fun(), any(), fun(), Req)
|
||||||
init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
|
-> {ok, Req} when Req::req().
|
||||||
|
init_stream(MaxLength, TransferDecode, TransferState, ContentDecode, Req) ->
|
||||||
{ok, Req#http_req{body_state=
|
{ok, Req#http_req{body_state=
|
||||||
{stream, TransferDecode, TransferState, ContentDecode}}}.
|
{stream, 0, MaxLength, TransferDecode, TransferState, ContentDecode}}}.
|
||||||
|
|
||||||
%% @doc Stream the request's body.
|
%% @doc Stream the request's body.
|
||||||
%%
|
%%
|
||||||
|
@ -635,8 +644,9 @@ stream_body(Req=#http_req{body_state=waiting,
|
||||||
case parse_header(<<"transfer-encoding">>, Req1) of
|
case parse_header(<<"transfer-encoding">>, Req1) of
|
||||||
{ok, [<<"chunked">>], Req2} ->
|
{ok, [<<"chunked">>], Req2} ->
|
||||||
stream_body(Req2#http_req{body_state=
|
stream_body(Req2#http_req{body_state=
|
||||||
{stream, fun cowboy_http:te_chunked/2, {0, 0},
|
{stream, 0, 1000000,
|
||||||
fun cowboy_http:ce_identity/1}});
|
fun cowboy_http:te_chunked/2, {0, 0},
|
||||||
|
fun cowboy_http:ce_identity/1}});
|
||||||
{ok, [<<"identity">>], Req2} ->
|
{ok, [<<"identity">>], Req2} ->
|
||||||
{Length, Req3} = body_length(Req2),
|
{Length, Req3} = body_length(Req2),
|
||||||
case Length of
|
case Length of
|
||||||
|
@ -644,24 +654,26 @@ stream_body(Req=#http_req{body_state=waiting,
|
||||||
{done, Req3#http_req{body_state=done}};
|
{done, Req3#http_req{body_state=done}};
|
||||||
Length ->
|
Length ->
|
||||||
stream_body(Req3#http_req{body_state=
|
stream_body(Req3#http_req{body_state=
|
||||||
{stream, fun cowboy_http:te_identity/2, {0, Length},
|
{stream, Length, 1000000,
|
||||||
fun cowboy_http:ce_identity/1}})
|
fun cowboy_http:te_identity/2, {0, Length},
|
||||||
|
fun cowboy_http:ce_identity/1}})
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
stream_body(Req=#http_req{buffer=Buffer, body_state={stream, _, _, _}})
|
stream_body(Req=#http_req{body_state=done}) ->
|
||||||
|
{done, Req};
|
||||||
|
stream_body(Req=#http_req{buffer=Buffer})
|
||||||
when Buffer =/= <<>> ->
|
when Buffer =/= <<>> ->
|
||||||
transfer_decode(Buffer, Req#http_req{buffer= <<>>});
|
transfer_decode(Buffer, Req#http_req{buffer= <<>>});
|
||||||
stream_body(Req=#http_req{body_state={stream, _, _, _}}) ->
|
stream_body(Req) ->
|
||||||
stream_body_recv(0, Req);
|
stream_body_recv(Req).
|
||||||
stream_body(Req=#http_req{body_state=done}) ->
|
|
||||||
{done, Req}.
|
|
||||||
|
|
||||||
-spec stream_body_recv(non_neg_integer(), Req)
|
-spec stream_body_recv(Req)
|
||||||
-> {ok, binary(), Req} | {error, atom()} when Req::req().
|
-> {ok, binary(), Req} | {error, atom()} when Req::req().
|
||||||
stream_body_recv(Length, Req=#http_req{
|
stream_body_recv(Req=#http_req{
|
||||||
transport=Transport, socket=Socket, buffer=Buffer}) ->
|
transport=Transport, socket=Socket, buffer=Buffer,
|
||||||
|
body_state={stream, Length, MaxLength, _, _, _}}) ->
|
||||||
%% @todo Allow configuring the timeout.
|
%% @todo Allow configuring the timeout.
|
||||||
case Transport:recv(Socket, Length, 5000) of
|
case Transport:recv(Socket, min(Length, MaxLength), 5000) of
|
||||||
{ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>,
|
{ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>,
|
||||||
Req#http_req{buffer= <<>>});
|
Req#http_req{buffer= <<>>});
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
|
@ -669,22 +681,21 @@ stream_body_recv(Length, Req=#http_req{
|
||||||
|
|
||||||
-spec transfer_decode(binary(), Req)
|
-spec transfer_decode(binary(), Req)
|
||||||
-> {ok, binary(), Req} | {error, atom()} when Req::req().
|
-> {ok, binary(), Req} | {error, atom()} when Req::req().
|
||||||
transfer_decode(Data, Req=#http_req{
|
transfer_decode(Data, Req=#http_req{body_state={stream, _, MaxLength,
|
||||||
body_state={stream, TransferDecode, TransferState, ContentDecode}}) ->
|
TransferDecode, TransferState, ContentDecode}}) ->
|
||||||
case TransferDecode(Data, TransferState) of
|
case TransferDecode(Data, TransferState) of
|
||||||
{ok, Data2, TransferState2} ->
|
|
||||||
content_decode(ContentDecode, Data2, Req#http_req{body_state=
|
|
||||||
{stream, TransferDecode, TransferState2, ContentDecode}});
|
|
||||||
{ok, Data2, Rest, TransferState2} ->
|
{ok, Data2, Rest, TransferState2} ->
|
||||||
content_decode(ContentDecode, Data2, Req#http_req{
|
content_decode(ContentDecode, Data2,
|
||||||
buffer=Rest, body_state=
|
Req#http_req{buffer=Rest, body_state={stream, 0, MaxLength,
|
||||||
{stream, TransferDecode, TransferState2, ContentDecode}});
|
TransferDecode, TransferState2, ContentDecode}});
|
||||||
%% @todo {header(s) for chunked
|
%% @todo {header(s) for chunked
|
||||||
more ->
|
more ->
|
||||||
stream_body_recv(0, Req#http_req{buffer=Data});
|
stream_body_recv(Req#http_req{buffer=Data, body_state={stream,
|
||||||
{more, Length, Rest, TransferState2} ->
|
0, MaxLength, TransferDecode, TransferState, ContentDecode}});
|
||||||
stream_body_recv(Length, Req#http_req{buffer=Rest, body_state=
|
{more, Length, Data2, TransferState2} ->
|
||||||
{stream, TransferDecode, TransferState2, ContentDecode}});
|
content_decode(ContentDecode, Data2,
|
||||||
|
Req#http_req{body_state={stream, Length, MaxLength,
|
||||||
|
TransferDecode, TransferState2, ContentDecode}});
|
||||||
{done, Length, Rest} ->
|
{done, Length, Rest} ->
|
||||||
Req2 = transfer_decode_done(Length, Rest, Req),
|
Req2 = transfer_decode_done(Length, Rest, Req),
|
||||||
{done, Req2};
|
{done, Req2};
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue