0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 12:20:24 +00:00

Fix HTTP/1.1 stopping streams too early

It is possible in some cases to move on to the next request
without waiting, but that can be done as an optimization
later on if necessary.
This commit is contained in:
Loïc Hoguin 2017-10-20 13:16:04 +01:00
parent b9526a1745
commit c602871f86
No known key found for this signature in database
GPG key ID: 71366FF21851DF03
3 changed files with 50 additions and 19 deletions

View file

@ -98,7 +98,7 @@
out_streamid = 1 :: pos_integer(), out_streamid = 1 :: pos_integer(),
%% Whether we finished writing data for the current stream. %% Whether we finished writing data for the current stream.
out_state = wait :: wait | headers | chunked | done, out_state = wait :: wait | chunked | done,
%% The connection will be closed after this stream. %% The connection will be closed after this stream.
last_streamid = undefined :: pos_integer(), last_streamid = undefined :: pos_integer(),
@ -809,12 +809,10 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea
case Body of case Body of
{sendfile, O, B, P} -> {sendfile, O, B, P} ->
Transport:send(Socket, Response), Transport:send(Socket, Response),
commands(State#state{out_state=done}, StreamID, [{sendfile, fin, O, B, P}|Tail]); commands(State, StreamID, [{sendfile, fin, O, B, P}|Tail]);
_ -> _ ->
Transport:send(Socket, [Response, Body]), Transport:send(Socket, [Response, Body]),
%% @todo If max number of requests, close connection. commands(State#state{out_state=done}, StreamID, Tail)
%% @todo If IsFin, maybe skip body of current request.
maybe_terminate(State#state{out_state=done}, StreamID, Tail, fin)
end; end;
%% Send response headers and initiate chunked encoding. %% Send response headers and initiate chunked encoding.
commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
@ -836,7 +834,7 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, Str
%% %%
%% @todo WINDOW_UPDATE stuff require us to buffer some data. %% @todo WINDOW_UPDATE stuff require us to buffer some data.
%% @todo We probably want to allow Data to be the {sendfile, ...} tuple also. %% @todo We probably want to allow Data to be the {sendfile, ...} tuple also.
commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
[{data, IsFin, Data}|Tail]) -> [{data, IsFin, Data}|Tail]) ->
%% Do not send anything when the user asks to send an empty %% Do not send anything when the user asks to send an empty
%% data frame, as that would break the protocol. %% data frame, as that would break the protocol.
@ -853,12 +851,20 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, Stre
Transport:send(Socket, Data) Transport:send(Socket, Data)
end end
end, end,
maybe_terminate(State, StreamID, Tail, IsFin); State = case IsFin of
fin -> State0#state{out_state=done};
nofin -> State0
end,
commands(State, StreamID, Tail);
%% Send a file. %% Send a file.
commands(State=#state{socket=Socket, transport=Transport}, StreamID, commands(State0=#state{socket=Socket, transport=Transport}, StreamID,
[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) -> [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
Transport:sendfile(Socket, Path, Offset, Bytes), Transport:sendfile(Socket, Path, Offset, Bytes),
maybe_terminate(State, StreamID, Tail, IsFin); State = case IsFin of
fin -> State0#state{out_state=done};
nofin -> State0
end,
commands(State, StreamID, Tail);
%% Protocol takeover. %% Protocol takeover.
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport, commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
opts=Opts, children=Children}, StreamID, opts=Opts, children=Children}, StreamID,
@ -886,11 +892,13 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
%% Stream shutdown. %% Stream shutdown.
commands(State, StreamID, [stop|Tail]) -> commands(State, StreamID, [stop|Tail]) ->
%% @todo Do we want to run the commands after a stop? %% @todo Do we want to run the commands after a stop?
% commands(stream_terminate(State, StreamID, stop), StreamID, Tail). %% @todo We currently wait for the stop command before we
%% continue with the next request/response. In theory, if
%% @todo I think that's where we need to terminate streams. %% the request body was read fully and the response body
%% was sent fully we should be able to start working on
maybe_terminate(State, StreamID, Tail, fin); %% the next request concurrently. This can be done as a
%% future optimization.
maybe_terminate(State, StreamID, Tail);
%% HTTP/1.1 does not support push; ignore. %% HTTP/1.1 does not support push; ignore.
commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Tail]) -> commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Tail]) ->
commands(State, StreamID, Tail). commands(State, StreamID, Tail).
@ -905,12 +913,10 @@ headers_to_list(Headers) ->
flush() -> flush() ->
receive _ -> flush() after 0 -> ok end. receive _ -> flush() after 0 -> ok end.
maybe_terminate(State, StreamID, Tail, nofin) ->
commands(State, StreamID, Tail);
%% @todo In these cases I'm not sure if we should continue processing commands. %% @todo In these cases I'm not sure if we should continue processing commands.
maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail, fin) -> maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) ->
terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok? terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok?
maybe_terminate(State, StreamID, _Tail, fin) -> maybe_terminate(State, StreamID, _Tail) ->
stream_terminate(State, StreamID, normal). stream_terminate(State, StreamID, normal).
stream_reset(State, StreamID, StreamError={internal_error, _, _}) -> stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->

View file

@ -43,6 +43,8 @@ init_commands(_, _, State=#state{test=shutdown_timeout_on_stream_stop}) ->
init_commands(_, _, State=#state{test=shutdown_timeout_on_socket_close}) -> init_commands(_, _, State=#state{test=shutdown_timeout_on_socket_close}) ->
Spawn = init_process(true, State), Spawn = init_process(true, State),
[{headers, 200, #{}}, {spawn, Spawn, 2000}]; [{headers, 200, #{}}, {spawn, Spawn, 2000}];
init_commands(_, _, State=#state{test=terminate_on_stop}) ->
[{response, 204, #{}, <<>>}];
init_commands(_, _, _) -> init_commands(_, _, _) ->
[{headers, 200, #{}}]. [{headers, 200, #{}}].
@ -72,7 +74,10 @@ info(_, crash, #state{test=crash_in_info}) ->
error(crash); error(crash);
info(StreamID, Info, State=#state{pid=Pid}) -> info(StreamID, Info, State=#state{pid=Pid}) ->
Pid ! {Pid, self(), info, StreamID, Info, State}, Pid ! {Pid, self(), info, StreamID, Info, State},
{[], State}. case Info of
please_stop -> {[stop], State};
_ -> {[], State}
end.
terminate(StreamID, Reason, State=#state{pid=Pid, test=crash_in_terminate}) -> terminate(StreamID, Reason, State=#state{pid=Pid, test=crash_in_terminate}) ->
Pid ! {Pid, self(), terminate, StreamID, Reason, State}, Pid ! {Pid, self(), terminate, StreamID, Reason, State},

View file

@ -341,3 +341,23 @@ terminate_on_socket_close(Config) ->
%% Confirm terminate/3 is called. %% Confirm terminate/3 is called.
receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end, receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end,
ok. ok.
terminate_on_stop(Config) ->
doc("Confirm terminate/3 is called after stop is returned."),
Self = self(),
ConnPid = gun_open(Config),
Ref = gun:get(ConnPid, "/long_polling", [
{<<"accept-encoding">>, <<"gzip">>},
{<<"x-test-case">>, <<"terminate_on_stop">>},
{<<"x-test-pid">>, pid_to_list(Self)}
]),
%% Confirm init/3 is called and receive the response.
Pid = receive {Self, P, init, _, _, _} -> P after 1000 -> error(timeout) end,
{response, fin, 204, _} = gun:await(ConnPid, Ref),
%% Confirm the stream is still alive even though we
%% received the response fully, and tell it to stop.
Pid ! {{Pid, 1}, please_stop},
receive {Self, Pid, info, _, please_stop, _} -> ok after 1000 -> error(timeout) end,
%% Confirm terminate/3 is called.
receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end,
ok.