mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-14 20:30:23 +00:00
Queue HTTP/2 trailers when there's still data in the buffer
This commit is contained in:
parent
9969684035
commit
92672b49af
3 changed files with 33 additions and 6 deletions
|
@ -46,6 +46,7 @@
|
||||||
{cowboy_stream:fin(), non_neg_integer(), iolist()
|
{cowboy_stream:fin(), non_neg_integer(), iolist()
|
||||||
| {sendfile, non_neg_integer(), pos_integer(), file:name_all()}}),
|
| {sendfile, non_neg_integer(), pos_integer(), file:name_all()}}),
|
||||||
local_buffer_size = 0 :: non_neg_integer(),
|
local_buffer_size = 0 :: non_neg_integer(),
|
||||||
|
local_trailers = undefined :: undefined | cowboy:http_headers(),
|
||||||
%% Whether we finished receiving data.
|
%% Whether we finished receiving data.
|
||||||
remote = nofin :: cowboy_stream:fin(),
|
remote = nofin :: cowboy_stream:fin(),
|
||||||
%% Remote flow control window (how much we accept to receive).
|
%% Remote flow control window (how much we accept to receive).
|
||||||
|
@ -663,6 +664,9 @@ resume_streams(State0, [Stream0|Tail], Acc) ->
|
||||||
resume_streams(State1, Tail, [Stream|Acc])
|
resume_streams(State1, Tail, [Stream|Acc])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
send_data(State, Stream=#stream{local=Local, local_buffer_size=0, local_trailers=Trailers})
|
||||||
|
when (Trailers =/= undefined) andalso ((Local =:= idle) orelse (Local =:= nofin)) ->
|
||||||
|
send_trailers(State, Stream#stream{local_trailers=undefined}, Trailers);
|
||||||
%% @todo We might want to print an error if local=fin.
|
%% @todo We might want to print an error if local=fin.
|
||||||
%%
|
%%
|
||||||
%% @todo It's possible that the stream terminates. We must remove it.
|
%% @todo It's possible that the stream terminates. We must remove it.
|
||||||
|
@ -681,12 +685,12 @@ send_data(State0, Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize}
|
||||||
send_data(State, Stream, IsFin, Data) ->
|
send_data(State, Stream, IsFin, Data) ->
|
||||||
send_data(State, Stream, IsFin, Data, in).
|
send_data(State, Stream, IsFin, Data, in).
|
||||||
|
|
||||||
%% Always send trailer frames even if the window is empty.
|
%% We can send trailers immediately if the queue is empty, otherwise we queue.
|
||||||
send_data(State=#state{socket=Socket, transport=Transport, encode_state=EncodeState0},
|
%% We always send trailer frames even if the window is empty.
|
||||||
Stream=#stream{id=StreamID}, fin, {trailers, Trailers}, _) ->
|
send_data(State, Stream=#stream{local_buffer_size=0}, fin, {trailers, Trailers}, _) ->
|
||||||
{HeaderBlock, EncodeState} = headers_encode(Trailers, EncodeState0),
|
send_trailers(State, Stream, Trailers);
|
||||||
Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
|
send_data(State, Stream, fin, {trailers, Trailers}, _) ->
|
||||||
{State#state{encode_state=EncodeState}, Stream#stream{local=fin}};
|
{State, Stream#stream{local_trailers=Trailers}};
|
||||||
%% Send data immediately if we can, buffer otherwise.
|
%% Send data immediately if we can, buffer otherwise.
|
||||||
%% @todo We might want to print an error if local=fin.
|
%% @todo We might want to print an error if local=fin.
|
||||||
send_data(State=#state{local_window=ConnWindow},
|
send_data(State=#state{local_window=ConnWindow},
|
||||||
|
@ -725,6 +729,12 @@ send_data(State=#state{socket=Socket, transport=Transport, local_window=ConnWind
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
send_trailers(State=#state{socket=Socket, transport=Transport, encode_state=EncodeState0},
|
||||||
|
Stream=#stream{id=StreamID}, Trailers) ->
|
||||||
|
{HeaderBlock, EncodeState} = headers_encode(Trailers, EncodeState0),
|
||||||
|
Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
|
||||||
|
{State#state{encode_state=EncodeState}, Stream#stream{local=fin}}.
|
||||||
|
|
||||||
queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) ->
|
queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) ->
|
||||||
DataSize = case Data of
|
DataSize = case Data of
|
||||||
{sendfile, _, Bytes, _} -> Bytes;
|
{sendfile, _, Bytes, _} -> Bytes;
|
||||||
|
|
|
@ -212,6 +212,15 @@ do(<<"stream_body">>, Req0, Opts) ->
|
||||||
end;
|
end;
|
||||||
do(<<"stream_trailers">>, Req0, Opts) ->
|
do(<<"stream_trailers">>, Req0, Opts) ->
|
||||||
case cowboy_req:binding(arg, Req0) of
|
case cowboy_req:binding(arg, Req0) of
|
||||||
|
<<"large">> ->
|
||||||
|
Req = cowboy_req:stream_reply(200, #{
|
||||||
|
<<"trailer">> => <<"grpc-status">>
|
||||||
|
}, Req0),
|
||||||
|
cowboy_req:stream_body(<<0:800000>>, nofin, Req),
|
||||||
|
cowboy_req:stream_trailers(#{
|
||||||
|
<<"grpc-status">> => <<"0">>
|
||||||
|
}, Req),
|
||||||
|
{ok, Req, Opts};
|
||||||
_ ->
|
_ ->
|
||||||
Req = cowboy_req:stream_reply(200, #{
|
Req = cowboy_req:stream_reply(200, #{
|
||||||
<<"trailer">> => <<"grpc-status">>
|
<<"trailer">> => <<"grpc-status">>
|
||||||
|
|
|
@ -854,6 +854,14 @@ stream_trailers(Config) ->
|
||||||
{_, <<"grpc-status">>} = lists:keyfind(<<"trailer">>, 1, RespHeaders),
|
{_, <<"grpc-status">>} = lists:keyfind(<<"trailer">>, 1, RespHeaders),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
stream_trailers_large(Config) ->
|
||||||
|
doc("Stream large body followed by trailer headers."),
|
||||||
|
{200, RespHeaders, <<0:800000>>, [
|
||||||
|
{<<"grpc-status">>, <<"0">>}
|
||||||
|
]} = do_trailers("/resp/stream_trailers/large", Config),
|
||||||
|
{_, <<"grpc-status">>} = lists:keyfind(<<"trailer">>, 1, RespHeaders),
|
||||||
|
ok.
|
||||||
|
|
||||||
stream_trailers_no_te(Config) ->
|
stream_trailers_no_te(Config) ->
|
||||||
doc("Stream body followed by trailer headers without a te header in the request."),
|
doc("Stream body followed by trailer headers without a te header in the request."),
|
||||||
ConnPid = gun_open(Config),
|
ConnPid = gun_open(Config),
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue