mirror of
https://github.com/ninenines/cowboy.git
synced 2025-07-16 05:00:24 +00:00
129 lines
5.1 KiB
Erlang
129 lines
5.1 KiB
Erlang
![]() |
%% Copyright (c) 2016, Loïc Hoguin <essen@ninenines.eu>
|
||
|
%%
|
||
|
%% Permission to use, copy, modify, and/or distribute this software for any
|
||
|
%% purpose with or without fee is hereby granted, provided that the above
|
||
|
%% copyright notice and this permission notice appear in all copies.
|
||
|
%%
|
||
|
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||
|
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||
|
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||
|
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||
|
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||
|
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||
|
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||
|
|
||
|
-module(cowboy_stream_h).
|
||
|
%% @todo -behaviour(cowboy_stream).
|
||
|
|
||
|
%% @todo Maybe have a callback for the type of process this is, worker or supervisor.
|
||
|
-export([init/3]).
|
||
|
-export([data/4]).
|
||
|
-export([info/3]).
|
||
|
-export([terminate/3]).
|
||
|
|
||
|
-export([execute/3]).
|
||
|
-export([resume/5]).
|
||
|
|
||
|
%% State kept by this stream handler for one stream.
%%
%% pid              - the request process spawned by init/3.
%% read_body_ref    - reference of the read_body request that is waiting
%%                    for more data, or undefined when none is pending.
%% read_body_length - number of bytes asked for by the pending read_body.
%% read_body_is_fin - fin once the last piece of the body has been buffered.
%% read_body_buffer - body data received but not yet sent to the request
%%                    process.
%%
%% The pid and read_body_ref fields default to undefined, so undefined is
%% part of their declared type.
-record(state, {
    pid = undefined :: undefined | pid(),
    read_body_ref = undefined :: undefined | reference(),
    read_body_length = 0 :: non_neg_integer(),
    read_body_is_fin = nofin :: nofin | fin,
    read_body_buffer = <<>> :: binary()
}).
|
||
|
|
||
|
%% @todo For shutting down children we need to have a timeout before we terminate
|
||
|
%% the stream like supervisors do. So here just send a message to yourself first,
|
||
|
%% and then decide what to do when receiving this message.
|
||
|
|
||
|
%% Start the request process for a new stream.
%%
%% The env, middlewares and shutdown options are read from Opts, falling
%% back to #{}, [cowboy_router, cowboy_handler] and 5000 respectively.
%% A linked process running execute/3 is spawned, announced through a
%% spawn command, and remembered in the handler state.
%%
%% @todo proper specs
-spec init(_,_,_) -> _.
init(_StreamID, Req, Opts) ->
    Defaults = #{
        env => #{},
        middlewares => [cowboy_router, cowboy_handler],
        shutdown => 5000
    },
    %% Values present in Opts take precedence over the defaults.
    #{
        env := Env,
        middlewares := Middlewares,
        shutdown := Shutdown
    } = maps:merge(Defaults, Opts),
    RequestPid = proc_lib:spawn_link(?MODULE, execute, [Req, Env, Middlewares]),
    {[{spawn, RequestPid, Shutdown}], #state{pid=RequestPid}}.
|
||
|
|
||
|
%% Request body data received for the stream.
%%
%% When the request process is waiting for data (a read_body is pending):
%%   - if enough data has been accumulated or IsFin=fin, send it;
%%   - otherwise keep buffering.
%% When the request process is not waiting for data, buffer it and
%% remember IsFin.
%%
%% @todo proper specs
-spec data(_,_,_,_) -> _.
data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffered}) ->
    Pending = << Buffered/binary, Data/binary >>,
    {[], State#state{read_body_is_fin=IsFin, read_body_buffer=Pending}};
data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
        read_body_length=Length, read_body_buffer=Buffered}) ->
    Body = << Buffered/binary, Data/binary >>,
    case IsFin of
        %% More data is expected and the requested length is not reached.
        nofin when byte_size(Body) < Length ->
            {[], State#state{read_body_buffer=Body}};
        %% Either the body is complete or enough data was accumulated.
        _ ->
            Pid ! {request_body, Ref, IsFin, Body},
            {[], State#state{read_body_ref=undefined, read_body_buffer= <<>>}}
    end.
|
||
|
|
||
|
%% Handle an Erlang message received for this stream.
%%
%% Returns {Commands, State}. The messages understood are the exit signal
%% of the request process, read_body requests from the request process,
%% and the response/headers/data/switch_protocol messages, which are
%% turned into commands unchanged. Anything else is ignored.
%%
%% @todo proper specs
-spec info(_,_,_) -> _.
%% The request process exited normally: stop the stream.
info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
    {[stop], State};
%% The request process exited with any other reason: report the crash.
info(_StreamID, Reason = {'EXIT', Pid, _}, State=#state{pid=Pid}) ->
    {[{internal_error, Reason, 'Stream process crashed.'}], State};
%% Request body, no body buffer but IsFin=fin.
info(_StreamID, {read_body, Ref, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) ->
    Pid ! {request_body, Ref, fin, <<>>},
    {[], State};
%% Request body, body buffered large enough or complete.
%%
%% read_body_is_fin holds the atom fin or nofin, so it is compared
%% directly. Testing element(1, IsFin) always fails on an atom, which
%% let a complete body shorter than Length fall through to the clause
%% below and wait for data that would never arrive.
info(_StreamID, {read_body, Ref, Length}, State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data})
        when IsFin =:= fin; byte_size(Data) >= Length ->
    Pid ! {request_body, Ref, IsFin, Data},
    {[], State#state{read_body_buffer= <<>>}};
%% Request body, not enough to send yet. Ask for more data and remember
%% the request so that data/4 can answer it.
info(_StreamID, {read_body, Ref, Length}, State) ->
    {[{flow, Length}], State#state{read_body_ref=Ref, read_body_length=Length}};
%% Response.
info(_StreamID, Response = {response, _, _, _}, State) ->
    {[Response], State};
info(_StreamID, Headers = {headers, _, _}, State) ->
    {[Headers], State};
info(_StreamID, Data = {data, _, _}, State) ->
    {[Data], State};
info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) ->
    {[SwitchProtocol], State};
%% Stray message.
info(_StreamID, _Msg, State) ->
    %% @todo Cleanup if no reply was sent when stream ends.
    {[], State}.
|
||
|
|
||
|
%% Called when the stream terminates. All three arguments are ignored
%% and ok is returned; no cleanup is performed here.
%%
%% @todo proper specs
-spec terminate(_,_,_) -> _.
terminate(_, _, _) ->
    ok.
|
||
|
|
||
|
%% Request process.

%% Run the middlewares in order inside the request process.
%%
%% Each middleware is called as Middleware:execute(Req, Env). The chain
%% ends with ok when the list is exhausted or when a middleware returns
%% {stop, Req}. When a middleware returns {suspend, Module, Function, Args}
%% the process hibernates and continues in resume/5 once it wakes up.
%%
%% @todo Proper spec. The arguments appear to be a cowboy_req:req(), a
%% cowboy_middleware:env() and a list of middleware modules.
-spec execute(_, _, _) -> _.
execute(_, _, []) ->
    ok; %% @todo Maybe error reason should differ here and there.
execute(Req, Env, [Middleware|Tail]) ->
    handle_middleware_result(Middleware:execute(Req, Env), Env, Tail).

%% Continue the middleware chain after hibernation.
%%
%% Calls Module:Function(Args...) and treats its return value exactly like
%% the return value of a middleware. Exported because proc_lib:hibernate/3
%% needs an exported function to wake up in.
-spec resume(cowboy_middleware:env(), [module()],
    module(), atom(), [any()]) -> ok.
resume(Env, Tail, Module, Function, Args) ->
    handle_middleware_result(apply(Module, Function, Args), Env, Tail).

%% Act on the value returned by a middleware or by a resumed function.
%% Env is the environment the call was made with; it is the one carried
%% across hibernation. Tail holds the middlewares still to run.
-spec handle_middleware_result(_, _, _) -> _.
handle_middleware_result(Result, Env, Tail) ->
    case Result of
        {ok, Req2, Env2} ->
            execute(Req2, Env2, Tail);
        {suspend, Module, Function, Args} ->
            proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]);
        {stop, _Req2} ->
            ok %% @todo Maybe error reason should differ here and there.
    end.
|