0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-15 12:40:25 +00:00

Replace init_stream/5 with stream_body/2

This allows us to change the max chunk length on a per chunk basis
instead of for the whole stream. It's also much easier to use this
way even if we don't want to change the chunk size.
This commit is contained in:
Loïc Hoguin 2013-04-02 18:43:37 +02:00
parent 66f7c15c4d
commit ce1d8862c0
2 changed files with 39 additions and 34 deletions

View file

@ -104,9 +104,9 @@ and `body_qs/2` will return `{error, badlength}`. If the request
contains chunked body, `body/1`, `body/2`, `body_qs/1` contains chunked body, `body/1`, `body/2`, `body_qs/1`
and `body_qs/2` will return `{error, chunked}`. and `body_qs/2` will return `{error, chunked}`.
If you get either of the above two errors, you will want to If you get either of the above two errors, you will want to
handle the body of the request using `stream_body/1` and handle the body of the request using `stream_body/1`,
`skip_body/1`, with the streaming process optionally `stream_body/2` and `skip_body/1`, with the streaming process
initialized using `init_stream/4` or `init_stream/5`. optionally initialized using `init_stream/4`.
Multipart request body Multipart request body
---------------------- ----------------------

View file

@ -78,8 +78,8 @@
-export([has_body/1]). -export([has_body/1]).
-export([body_length/1]). -export([body_length/1]).
-export([init_stream/4]). -export([init_stream/4]).
-export([init_stream/5]).
-export([stream_body/1]). -export([stream_body/1]).
-export([stream_body/2]).
-export([skip_body/1]). -export([skip_body/1]).
-export([body/1]). -export([body/1]).
-export([body/2]). -export([body/2]).
@ -155,8 +155,8 @@
meta = [] :: [{atom(), any()}], meta = [] :: [{atom(), any()}],
%% Request body. %% Request body.
body_state = waiting :: waiting | done | {stream, body_state = waiting :: waiting | done
non_neg_integer(), non_neg_integer(), fun(), any(), fun()}, | {stream, non_neg_integer(), fun(), any(), fun()},
multipart = undefined :: undefined | {non_neg_integer(), fun()}, multipart = undefined :: undefined | {non_neg_integer(), fun()},
buffer = <<>> :: binary(), buffer = <<>> :: binary(),
@ -591,17 +591,11 @@ body_length(Req) ->
{undefined, Req2} {undefined, Req2}
end. end.
%% @equiv init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req)
-spec init_stream(fun(), any(), fun(), Req)
-> {ok, Req} when Req::req().
init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req).
%% @doc Initialize body streaming and set custom decoding functions. %% @doc Initialize body streaming and set custom decoding functions.
%% %%
%% Calling this function is optional. It should only be used if you %% Calling this function is optional. It should only be used if you
%% need to override the default behavior of Cowboy. Otherwise you %% need to override the default behavior of Cowboy. Otherwise you
%% should call stream_body/1 directly. %% should call stream_body/{1,2} directly.
%% %%
%% Two decodings happen. First a decoding function is applied to the %% Two decodings happen. First a decoding function is applied to the
%% transferred data, and then another is applied to the actual content. %% transferred data, and then another is applied to the actual content.
@ -613,27 +607,36 @@ init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
%% Content encoding is generally used for compression. %% Content encoding is generally used for compression.
%% %%
%% Standard encodings can be found in cowboy_http. %% Standard encodings can be found in cowboy_http.
-spec init_stream(non_neg_integer(), fun(), any(), fun(), Req) -spec init_stream(fun(), any(), fun(), Req)
-> {ok, Req} when Req::req(). -> {ok, Req} when Req::req().
init_stream(MaxLength, TransferDecode, TransferState, ContentDecode, Req) -> init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
{ok, Req#http_req{body_state= {ok, Req#http_req{body_state=
{stream, 0, MaxLength, TransferDecode, TransferState, ContentDecode}}}. {stream, 0, TransferDecode, TransferState, ContentDecode}}}.
%% @equiv stream_body(Req, 1000000)
-spec stream_body(Req) -> {ok, binary(), Req}
| {done, Req} | {error, atom()} when Req::req().
stream_body(Req) ->
stream_body(Req, 1000000).
%% @doc Stream the request's body. %% @doc Stream the request's body.
%% %%
%% This is the most low level function to read the request body. %% This is the most low level function to read the request body.
%% %%
%% In most cases, if they weren't defined before using stream_body/4, %% In most cases, if they weren't defined before using init_stream/4,
%% this function will guess which transfer and content encodings were %% this function will guess which transfer and content encodings were
%% used for building the request body, and configure the decoding %% used for building the request body, and configure the decoding
%% functions that will be used when streaming. %% functions that will be used when streaming.
%% %%
%% It then starts streaming the body, returning {ok, Data, Req} %% It then starts streaming the body, returning {ok, Data, Req}
%% for each streamed part, and {done, Req} when it's finished streaming. %% for each streamed part, and {done, Req} when it's finished streaming.
-spec stream_body(Req) -> {ok, binary(), Req} %%
%% You can limit the size of the chunks being returned by using the
%% second argument which is the size in bytes. It defaults to 1000000 bytes.
-spec stream_body(Req, non_neg_integer()) -> {ok, binary(), Req}
| {done, Req} | {error, atom()} when Req::req(). | {done, Req} | {error, atom()} when Req::req().
stream_body(Req=#http_req{body_state=waiting, stream_body(Req=#http_req{body_state=waiting, version=Version,
version=Version, transport=Transport, socket=Socket}) -> transport=Transport, socket=Socket}, MaxLength) ->
{ok, ExpectHeader, Req1} = parse_header(<<"expect">>, Req), {ok, ExpectHeader, Req1} = parse_header(<<"expect">>, Req),
case ExpectHeader of case ExpectHeader of
[<<"100-continue">>] -> [<<"100-continue">>] ->
@ -646,9 +649,10 @@ stream_body(Req=#http_req{body_state=waiting,
case parse_header(<<"transfer-encoding">>, Req1) of case parse_header(<<"transfer-encoding">>, Req1) of
{ok, [<<"chunked">>], Req2} -> {ok, [<<"chunked">>], Req2} ->
stream_body(Req2#http_req{body_state= stream_body(Req2#http_req{body_state=
{stream, 0, 1000000, {stream, 0,
fun cowboy_http:te_chunked/2, {0, 0}, fun cowboy_http:te_chunked/2, {0, 0},
fun cowboy_http:ce_identity/1}}); fun cowboy_http:ce_identity/1}},
MaxLength);
{ok, [<<"identity">>], Req2} -> {ok, [<<"identity">>], Req2} ->
{Length, Req3} = body_length(Req2), {Length, Req3} = body_length(Req2),
case Length of case Length of
@ -656,24 +660,25 @@ stream_body(Req=#http_req{body_state=waiting,
{done, Req3#http_req{body_state=done}}; {done, Req3#http_req{body_state=done}};
Length -> Length ->
stream_body(Req3#http_req{body_state= stream_body(Req3#http_req{body_state=
{stream, Length, 1000000, {stream, Length,
fun cowboy_http:te_identity/2, {0, Length}, fun cowboy_http:te_identity/2, {0, Length},
fun cowboy_http:ce_identity/1}}) fun cowboy_http:ce_identity/1}},
MaxLength)
end end
end; end;
stream_body(Req=#http_req{body_state=done}) -> stream_body(Req=#http_req{body_state=done}, _) ->
{done, Req}; {done, Req};
stream_body(Req=#http_req{buffer=Buffer}) stream_body(Req=#http_req{buffer=Buffer}, _)
when Buffer =/= <<>> -> when Buffer =/= <<>> ->
transfer_decode(Buffer, Req#http_req{buffer= <<>>}); transfer_decode(Buffer, Req#http_req{buffer= <<>>});
stream_body(Req) -> stream_body(Req, MaxLength) ->
stream_body_recv(Req). stream_body_recv(Req, MaxLength).
-spec stream_body_recv(Req) -spec stream_body_recv(Req, non_neg_integer())
-> {ok, binary(), Req} | {error, atom()} when Req::req(). -> {ok, binary(), Req} | {error, atom()} when Req::req().
stream_body_recv(Req=#http_req{ stream_body_recv(Req=#http_req{
transport=Transport, socket=Socket, buffer=Buffer, transport=Transport, socket=Socket, buffer=Buffer,
body_state={stream, Length, MaxLength, _, _, _}}) -> body_state={stream, Length, _, _, _}}, MaxLength) ->
%% @todo Allow configuring the timeout. %% @todo Allow configuring the timeout.
case Transport:recv(Socket, min(Length, MaxLength), 5000) of case Transport:recv(Socket, min(Length, MaxLength), 5000) of
{ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>, {ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>,
@ -683,20 +688,20 @@ stream_body_recv(Req=#http_req{
-spec transfer_decode(binary(), Req) -spec transfer_decode(binary(), Req)
-> {ok, binary(), Req} | {error, atom()} when Req::req(). -> {ok, binary(), Req} | {error, atom()} when Req::req().
transfer_decode(Data, Req=#http_req{body_state={stream, _, MaxLength, transfer_decode(Data, Req=#http_req{body_state={stream, _,
TransferDecode, TransferState, ContentDecode}}) -> TransferDecode, TransferState, ContentDecode}}) ->
case TransferDecode(Data, TransferState) of case TransferDecode(Data, TransferState) of
{ok, Data2, Rest, TransferState2} -> {ok, Data2, Rest, TransferState2} ->
content_decode(ContentDecode, Data2, content_decode(ContentDecode, Data2,
Req#http_req{buffer=Rest, body_state={stream, 0, MaxLength, Req#http_req{buffer=Rest, body_state={stream, 0,
TransferDecode, TransferState2, ContentDecode}}); TransferDecode, TransferState2, ContentDecode}});
%% @todo {header(s) for chunked %% @todo {header(s) for chunked
more -> more ->
stream_body_recv(Req#http_req{buffer=Data, body_state={stream, stream_body_recv(Req#http_req{buffer=Data, body_state={stream,
0, MaxLength, TransferDecode, TransferState, ContentDecode}}); 0, TransferDecode, TransferState, ContentDecode}}, 0);
{more, Length, Data2, TransferState2} -> {more, Length, Data2, TransferState2} ->
content_decode(ContentDecode, Data2, content_decode(ContentDecode, Data2,
Req#http_req{body_state={stream, Length, MaxLength, Req#http_req{body_state={stream, Length,
TransferDecode, TransferState2, ContentDecode}}); TransferDecode, TransferState2, ContentDecode}});
{done, Length, Rest} -> {done, Length, Rest} ->
Req2 = transfer_decode_done(Length, Rest, Req), Req2 = transfer_decode_done(Length, Rest, Req),