0
Fork 0
mirror of https://github.com/ninenines/cowboy.git synced 2025-07-14 20:30:23 +00:00

Implement recv timeout for SPDY

This commit is contained in:
Loïc Hoguin 2013-09-07 14:01:19 +02:00
parent c7f7e4456e
commit 299c93f661

View file

@ -51,7 +51,7 @@
input = nofin :: fin | nofin, input = nofin :: fin | nofin,
in_buffer = <<>> :: binary(), in_buffer = <<>> :: binary(),
is_recv = false :: {true, {non_neg_integer(), pid()}, is_recv = false :: {true, {non_neg_integer(), pid()},
pid(), non_neg_integer()} | false, pid(), non_neg_integer(), reference()} | false,
output = nofin :: fin | nofin output = nofin :: fin | nofin
}). }).
@ -127,8 +127,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
terminate(State); terminate(State);
{Error, Socket, _Reason} -> {Error, Socket, _Reason} ->
terminate(State); terminate(State);
%% @todo Timeout (send a message to self). {recv, FromSocket = {Pid, StreamID}, FromPid, Length, Timeout}
{recv, FromSocket = {Pid, StreamID}, FromPid, Length, _Timeout}
when Pid =:= self() -> when Pid =:= self() ->
Child = #child{in_buffer=InBuffer, is_recv=false} Child = #child{in_buffer=InBuffer, is_recv=false}
= get_child(StreamID, State), = get_child(StreamID, State),
@ -141,9 +140,18 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
FromPid ! {recv, FromSocket, {ok, Data}}, FromPid ! {recv, FromSocket, {ok, Data}},
loop(replace_child(Child#child{in_buffer=Rest}, State)); loop(replace_child(Child#child{in_buffer=Rest}, State));
true -> true ->
TRef = erlang:send_after(Timeout, self(),
{recv_timeout, FromSocket}),
loop(replace_child(Child#child{ loop(replace_child(Child#child{
is_recv={true, FromSocket, FromPid, Length}}, State)) is_recv={true, FromSocket, FromPid, Length, TRef}},
State))
end; end;
{recv_timeout, {Pid, StreamID}}
when Pid =:= self() ->
Child = #child{is_recv={true, FromSocket, FromPid, _, _}}
= get_child(StreamID, State),
FromPid ! {recv, FromSocket, {error, timeout}},
loop(replace_child(Child#child{is_recv=false}, State));
{reply, {Pid, StreamID}, Status, Headers} {reply, {Pid, StreamID}, Status, Headers}
when Pid =:= self() -> when Pid =:= self() ->
Child = #child{output=nofin} = get_child(StreamID, State), Child = #child{output=nofin} = get_child(StreamID, State),
@ -257,12 +265,15 @@ handle_frame(State, {data, StreamID, IsFin, Data}) ->
Data2 = << Buffer/binary, Data/binary >>, Data2 = << Buffer/binary, Data/binary >>,
IsFin2 = if IsFin -> fin; true -> nofin end, IsFin2 = if IsFin -> fin; true -> nofin end,
Child2 = case IsRecv of Child2 = case IsRecv of
{true, FromSocket, FromPid, 0} -> {true, FromSocket, FromPid, 0, TRef} ->
FromPid ! {recv, FromSocket, {ok, Data2}}, FromPid ! {recv, FromSocket, {ok, Data2}},
cancel_recv_timeout(StreamID, TRef),
Child#child{input=IsFin2, in_buffer= <<>>, is_recv=false}; Child#child{input=IsFin2, in_buffer= <<>>, is_recv=false};
{true, FromSocket, FromPid, Length} when byte_size(Data2) >= Length -> {true, FromSocket, FromPid, Length, TRef}
when byte_size(Data2) >= Length ->
<< Data3:Length/binary, Rest/binary >> = Data2, << Data3:Length/binary, Rest/binary >> = Data2,
FromPid ! {recv, FromSocket, {ok, Data3}}, FromPid ! {recv, FromSocket, {ok, Data3}},
cancel_recv_timeout(StreamID, TRef),
Child#child{input=IsFin2, in_buffer=Rest, is_recv=false}; Child#child{input=IsFin2, in_buffer=Rest, is_recv=false};
_ -> _ ->
Child#child{input=IsFin2, in_buffer=Data2} Child#child{input=IsFin2, in_buffer=Data2}
@ -277,6 +288,16 @@ handle_frame(State, Frame) ->
error_logger:error_msg("Ignored frame ~p", [Frame]), error_logger:error_msg("Ignored frame ~p", [Frame]),
loop(State). loop(State).
cancel_recv_timeout(StreamID, TRef) ->
_ = erlang:cancel_timer(TRef),
receive
{recv_timeout, {Pid, StreamID}}
when Pid =:= self() ->
ok
after 0 ->
ok
end.
%% @todo We must wait for the children to finish here, %% @todo We must wait for the children to finish here,
%% but only up to N milliseconds. Then we shutdown. %% but only up to N milliseconds. Then we shutdown.
terminate(_State) -> terminate(_State) ->