diff --git a/src/ec_plists.erl b/src/ec_plists.erl
index 688d5a5..a021d02 100644
--- a/src/ec_plists.erl
+++ b/src/ec_plists.erl
@@ -1,214 +1,264 @@
-% @author Stephen Marsh
-% @copyright 2007 Stephen Marsh freeyourmind ++ [$@|gmail.com]
-% @doc plists is a drop-in replacement for module
-% lists,
-% making most list operations parallel. It can operate on each element in
-% parallel, for IO-bound operations, on sublists in parallel, for
-% taking advantage of multi-core machines with CPU-bound operations, and
-% across erlang nodes, for parallizing inside a cluster. It handles
-% errors and node failures. It can be configured, tuned, and tweaked to
-% get optimal performance while minimizing overhead.
-%
-% Almost all the functions are
-% identical to equivalent functions in lists, returning exactly the same
-% result, and having both a form with an identical syntax that operates on
-% each element in parallel and a form which takes an optional "malt",
-% a specification for how to parallize the operation.
-%
-% fold is the one exception, parallel fold is different from linear fold.
-% This module also include a simple mapreduce implementation, and the
-% function runmany. All the other functions are implemented with runmany,
-% which is as a generalization of parallel list operations.
-%
-% == Malts ==
-% A malt specifies how to break a list into sublists, and can optionally
-% specify a timeout, which nodes to run on, and how many processes to start
-% per node.
-%
-% Malt = MaltComponent | [MaltComponent]
-% MaltComponent = SubListSize::integer() | {processes, integer()} |
-% {processes, schedulers} |
-% {timeout, Milliseconds::integer()} | {nodes, [NodeSpec]}
-% NodeSpec = Node::atom() | {Node::atom(), NumProcesses::integer()} |
-% {Node::atom(), schedulers}
-%
-% An integer can be given to specify the exact size for
-% sublists. 1 is a good choice for IO-bound operations and when
-% the operation on each list element is expensive. Larger numbers
-% minimize overhead and are faster for cheap operations.
-%
-% If the integer is omitted, and
-% you have specified a {processes, X}, the list is
-% split into X sublists. This is only
-% useful when the time to process each element is close to identical and you
-% know exactly how many lines of execution are available to you.
-%
-% If neither of the above applies, the sublist size defaults to 1.
-%
-% You can use {processes, X} to have the list processed
-% by X processes on the local machine. A good choice for X is the number of
-% lines of execution (cores) the machine provides. This can be done
-% automatically with {processes, schedulers}, which sets
-% the number of processes to the number of schedulers in the erlang virtual
-% machine (probably equal to the number of cores).
-%
-% {timeout, Milliseconds} specifies a timeout. This is a timeout for the entire
-% operation, both operating on the sublists and combining the results.
-% exit(timeout) is evaluated if the timeout is exceeded.
-%
-% {nodes, NodeList} specifies that the operation should be done across nodes.
-% Every element of NodeList is of the form {NodeName, NumProcesses} or
-% NodeName, which means the same as {NodeName, 1}. plists runs
-% NumProcesses processes on NodeName concurrently. A good choice for
-% NumProcesses is the number of lines of execution (cores) a node provides
-% plus one. This ensures the node is completely busy even when
-% fetching a new sublist. This can be done automatically with
-% {NodeName, schedulers}, in which case
-% plists uses a cached value if it has one, and otherwise finds the number of
-% schedulers in the remote node and adds one. This will ensure at least one
-% busy process per core (assuming the node has a scheduler for each core).
-%
-% plists is able to recover if a node goes down.
-% If all nodes go down, exit(allnodescrashed) is evaluated.
-%
-% Any of the above may be used as a malt, or may be combined into a list.
-% {nodes, NodeList} and {processes, X} may not be combined.
-%
-% === Examples ===
-% % start a process for each element (1-element sublists)
-% 1
-%
-% % start a process for each ten elements (10-element sublists)
-% 10
-%
-% % split the list into two sublists and process in two processes
-% {processes, 2}
-%
-% % split the list into X sublists and process in X processes,
-% % where X is the number of cores in the machine
-% {processes, schedulers}
-%
-% % split the list into 10-element sublists and process in two processes
-% [10, {processes, 2}]
-%
-% % timeout after one second. Assumes that a process should be started
-% % for each element.
-% {timeout, 1000}
-%
-% % Runs 3 processes at a time on apple@desktop,
-% and 2 on orange@laptop
-% % This is the best way to utilize all the CPU-power of a dual-core
-% % desktop and a single-core laptop. Assumes that the list should be
-% % split into 1-element sublists.
-% {nodes, [{apple@desktop, 3}, {orange@laptop, 2}]}
-%
-% Like above, but makes plists figure out how many processes to use.
-% {nodes, [{apple@desktop, schedulers}, {orange@laptop, schedulers}]}
-%
-% % Gives apple and orange three seconds to process the list as
-% % 100-element sublists.
-% [100, {timeout, 3000}, {nodes, [{apple@desktop, 3}, {orange@laptop, 2}]}]
-%
-% === Aside: Why Malt? ===
-% I needed a word for this concept, so maybe my subconsciousness gave me one by
-% making me misspell multiply. Maybe it is an acronym for Malt is A List
-% Tearing Specification. Maybe it is a beer metaphor, suggesting that code
-% only runs in parallel if bribed with spirits. It's jargon, learn it
-% or you can't be part of the in-group.
-%
-% == Messages and Errors ==
-% plists assures that no extraneous messages are left in or will later
-% enter the message queue. This is guaranteed even in the event of an error.
-%
-% Errors in spawned processes are caught and propagated to the calling
-% process. If you invoke
-%
-% plists:map(fun (X) -> 1/X end, [1, 2, 3, 0]).
-%
-% you get a badarith error, exactly like when you use lists:map.
-%
-% plists uses monitors to watch the processes it spawns. It is not a good idea
-% to invoke plists when you are already monitoring processes. If one of them
-% does a non-normal exit, plists receives the 'DOWN' message believing it to be
-% from one of its own processes. The error propagation system goes into
-% effect, which results in the error occuring in the calling process.
-%
-% == License ==
-% The MIT License
-%
-% Copyright (c) 2007 Stephen Marsh
-%
-% Permission is hereby granted, free of charge, to any person obtaining a copy
-% of this software and associated documentation files (the "Software"), to deal
-% in the Software without restriction, including without limitation the rights
-% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-% copies of the Software, and to permit persons to whom the Software is
-% furnished to do so, subject to the following conditions:
-%
-% The above copyright notice and this permission notice shall be included in
-% all copies or substantial portions of the Software.
-%
-% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-% THE SOFTWARE.
+%%% -*- mode: Erlang; fill-column: 80; comment-column: 75; -*-
+%%% The MIT License
+%%%
+%%% Copyright (c) 2007 Stephen Marsh
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in
+%%% all copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+%%% THE SOFTWARE.
+%%%---------------------------------------------------------------------------
+%%% @author Stephen Marsh
+%%% @copyright 2007 Stephen Marsh freeyourmind ++ [$@|gmail.com]
+%%% @doc
+%%% plists is a drop-in replacement for module lists, making
+%%% most list operations parallel. It can operate on each element in
+%%% parallel, for IO-bound operations, on sublists in parallel, for
+%%% taking advantage of multi-core machines with CPU-bound operations,
+%%% and across erlang nodes, for parallelizing inside a cluster. It
+%%% handles errors and node failures. It can be configured, tuned, and
+%%% tweaked to get optimal performance while minimizing overhead.
+%%%
+%%% Almost all the functions are identical to equivalent functions in
+%%% lists, returning exactly the same result, and having both a form
+%%% with an identical syntax that operates on each element in parallel
+%%% and a form which takes an optional "malt", a specification for how
+%%% to parallelize the operation.
+%%%
+%%% fold is the one exception: parallel fold is different from linear
+%%% fold. This module also includes a simple mapreduce implementation,
+%%% and the function runmany. All the other functions are implemented
+%%% with runmany, which is a generalization of parallel list
+%%% operations.
+%%%
+%%% Malts
+%%% =====
+%%%
+%%% A malt specifies how to break a list into sublists, and can optionally
+%%% specify a timeout, which nodes to run on, and how many processes to start
+%%% per node.
+%%%
+%%% Malt = MaltComponent | [MaltComponent]
+%%% MaltComponent = SubListSize::integer() | {processes, integer()} |
+%%% {processes, schedulers} |
+%%% {timeout, Milliseconds::integer()} | {nodes, [NodeSpec]}
+%%%
+%%% NodeSpec = Node::atom() | {Node::atom(), NumProcesses::integer()} |
+%%% {Node::atom(), schedulers}
+%%%
+%%% An integer can be given to specify the exact size for sublists. 1
+%%% is a good choice for IO-bound operations and when the operation on
+%%% each list element is expensive. Larger numbers minimize overhead
+%%% and are faster for cheap operations.
+%%%
+%%% If the integer is omitted, and you have specified a `{processes,
+%%% X}`, the list is split into X sublists. This is only useful when
+%%% the time to process each element is close to identical and you
+%%% know exactly how many lines of execution are available to you.
+%%%
+%%% If neither of the above applies, the sublist size defaults to 1.
+%%%
+%%% You can use `{processes, X}` to have the list processed by `X`
+%%% processes on the local machine. A good choice for `X` is the
+%%% number of lines of execution (cores) the machine provides. This
+%%% can be done automatically with {processes, schedulers}, which sets
+%%% the number of processes to the number of schedulers in the erlang
+%%% virtual machine (probably equal to the number of cores).
+%%%
+%%% `{timeout, Milliseconds}` specifies a timeout. This is a timeout
+%%% for the entire operation, both operating on the sublists and
+%%% combining the results. exit(timeout) is evaluated if the timeout
+%%% is exceeded.
+%%%
+%%% `{nodes, NodeList}` specifies that the operation should be done
+%%% across nodes. Every element of NodeList is of the form
+%%% `{NodeName, NumProcesses}` or NodeName, which means the same as
+%%% `{NodeName, 1}`. plists runs NumProcesses processes on NodeName
+%%% concurrently. A good choice for NumProcesses is the number of
+%%% lines of execution (cores) a node provides plus one. This ensures
+%%% the node is completely busy even when fetching a new sublist. This
+%%% can be done automatically with `{NodeName, schedulers}`, in which
+%%% case plists uses a cached value if it has one, and otherwise finds
+%%% the number of schedulers in the remote node and adds one. This
+%%% will ensure at least one busy process per core (assuming the node
+%%% has a scheduler for each core).
+%%%
+%%% plists is able to recover if a node goes down. If all nodes go
+%%% down, exit(allnodescrashed) is evaluated.
+%%%
+%%% Any of the above may be used as a malt, or may be combined into a
+%%% list. `{nodes, NodeList}` and {processes, X} may not be combined.
+%%%
+%%% Examples
+%%% ========
+%%%
+%%% %% start a process for each element (1-element sublists)
+%%% 1
+%%%
+%%% %% start a process for each ten elements (10-element sublists)
+%%% 10
+%%%
+%%% %% split the list into two sublists and process in two processes
+%%% {processes, 2}
+%%%
+%%% %% split the list into X sublists and process in X processes,
+%%% %% where X is the number of cores in the machine
+%%% {processes, schedulers}
+%%%
+%%% %% split the list into 10-element sublists and process in two processes
+%%% [10, {processes, 2}]
+%%%
+%%% %% timeout after one second. Assumes that a process should be started
+%%% %% for each element.
+%%% {timeout, 1000}
+%%%
+%%% %% Runs 3 processes at a time on apple@desktop, and 2 on orange@laptop
+%%% %% This is the best way to utilize all the CPU-power of a dual-core
+%%% %% desktop and a single-core laptop. Assumes that the list should be
+%%% %% split into 1-element sublists.
+%%% {nodes, [{apple@desktop, 3}, {orange@laptop, 2}]}
+%%%
+%%% %% Like above, but makes plists figure out how many processes to use.
+%%% {nodes, [{apple@desktop, schedulers}, {orange@laptop, schedulers}]}
+%%%
+%%% %% Gives apple and orange three seconds to process the list as
+%%% %% 100-element sublists.
+%%% [100, {timeout, 3000}, {nodes, [{apple@desktop, 3}, {orange@laptop, 2}]}]
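+%%%
+%%% As a final illustrative sketch (not part of the original
+%%% documentation), a complete call that combines a malt with one of
+%%% the list functions:
+%%%
+%%% %% map over 1..1000 in 100-element sublists, one process per scheduler
+%%% ec_plists:map(fun math:sqrt/1, lists:seq(1, 1000),
+%%%               [100, {processes, schedulers}])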
+%%%
+%%% Aside: Why Malt?
+%%% ================
+%%%
+%%% I needed a word for this concept, so maybe my subconsciousness
+%%% gave me one by making me misspell multiply. Maybe it is an acronym
+%%% for Malt is A List Tearing Specification. Maybe it is a beer
+%%% metaphor, suggesting that code only runs in parallel if bribed
+%%% with spirits. It's jargon, learn it or you can't be part of the
+%%% in-group.
+%%%
+%%% Messages and Errors
+%%% ===================
+%%%
+%%% plists ensures that no extraneous messages are left in or will
+%%% later enter the message queue. This is guaranteed even in the
+%%% event of an error.
+%%%
+%%% Errors in spawned processes are caught and propagated to the
+%%% calling process. If you invoke
+%%%
+%%%   ec_plists:map(fun (X) -> 1/X end, [1, 2, 3, 0]).
+%%%
+%%% you get a badarith error, exactly like when you use lists:map.
+%%%
+%%% plists uses monitors to watch the processes it spawns. It is not a
+%%% good idea to invoke plists when you are already monitoring
+%%% processes. If one of them does a non-normal exit, plists receives
+%%% the 'DOWN' message believing it to be from one of its own
+%%% processes. The error propagation system goes into effect, which
+%%% results in the error occurring in the calling process.
+%%%
+-module(ec_plists).
+-export([all/2, all/3,
+ any/2, any/3,
+ filter/2, filter/3,
+ fold/3, fold/4, fold/5,
+ foreach/2, foreach/3,
+ map/2, map/3,
+ ftmap/2, ftmap/3,
+ partition/2, partition/3,
+ sort/1, sort/2, sort/3,
+ usort/1, usort/2, usort/3,
+ mapreduce/2, mapreduce/3, mapreduce/5,
+ runmany/3, runmany/4]).
--module(plists).
--export([all/2, all/3, any/2, any/3, filter/2, filter/3,
-fold/3, fold/4, fold/5, foreach/2, foreach/3, map/2, map/3,
-partition/2, partition/3, sort/1, sort/2, sort/3,
-usort/1, usort/2, usort/3, mapreduce/2, mapreduce/3, mapreduce/5,
-runmany/3, runmany/4]).
+-export_type([malt/0, malt_component/0, node_spec/0, fuse/0, fuse_fun/0]).
-% Everything here is defined in terms of runmany.
-% The following methods are convient interfaces to runmany.
+%%============================================================================
+%% types
+%%============================================================================
-% @doc Same semantics as in module
-% lists.
-% @spec (Fun, List) -> bool()
+-type malt() :: malt_component() | [malt_component()].
+
+-type malt_component() :: SubListSize::integer()
+ | {processes, integer()}
+ | {processes, schedulers}
+ | {timeout, Milliseconds::integer()}
+ | {nodes, [node_spec()]}.
+
+-type node_spec() :: Node::atom()
+ | {Node::atom(), NumProcesses::integer()}
+ | {Node::atom(), schedulers}.
+
+-type fuse_fun() :: fun((term(), term()) -> term()).
+-type fuse() :: fuse_fun() | {recursive, fuse_fun()} | {reverse, fuse_fun()}.
+-type el_fun() :: fun((term()) -> term()).
+
+%%============================================================================
+%% API
+%%============================================================================
+
+%% Everything here is defined in terms of runmany.
+%% The following functions are convenient interfaces to runmany.
+
+%% @doc Same semantics as in module
+%% lists.
+-spec all/2 :: (el_fun(), list()) -> boolean().
all(Fun, List) ->
all(Fun, List, 1).
-% @doc Same semantics as in module
-% lists.
-% @spec (Fun, List, Malt) -> bool()
+%% @doc Same semantics as in module
+%% lists.
+-spec all/3 :: (el_fun(), list(), malt()) -> boolean().
all(Fun, List, Malt) ->
- try runmany(fun (L) ->
+ try
+ runmany(fun (L) ->
B = lists:all(Fun, L),
- if B ->
+ if
+ B ->
nil;
- true ->
- exit(notall)
+ true ->
+                                erlang:exit(notall)
end
end,
fun (_A1, _A2) ->
nil
end,
- List, Malt) of
- _ ->
- true
- catch exit:notall ->
- false
+ List, Malt),
+ true
+ catch
+        exit:notall ->
+ false
end.
-% @doc Same semantics as in module
-% lists.
-% @spec (Fun, List) -> bool()
+%% @doc Same semantics as in module
+%% lists.
+-spec any/2 :: (fun(), list()) -> boolean().
any(Fun, List) ->
any(Fun, List, 1).
-% @doc Same semantics as in module
-% lists.
-% @spec (Fun, List, Malt) -> bool()
+%% @doc Same semantics as in module
+%% lists.
+-spec any/3 :: (fun(), list(), malt()) -> boolean().
any(Fun, List, Malt) ->
- try runmany(fun (L) ->
+ try
+ runmany(fun (L) ->
B = lists:any(Fun, L),
if B ->
- exit(any);
+                                    erlang:exit(any);
true ->
nil
end
@@ -219,19 +269,19 @@ any(Fun, List, Malt) ->
List, Malt) of
_ ->
false
- catch exit:any ->
+    catch
+        exit:any ->
true
end.
-% @doc Same semantics as in module
-% lists.
-% @spec (Fun, List) -> list()
+%% @doc Same semantics as in module
+%% lists.
+-spec filter/2 :: (fun(), list()) -> list().
filter(Fun, List) ->
filter(Fun, List, 1).
-% @doc Same semantics as in module
-% lists.
-% @spec (Fun, List, Malt) -> list()
+%% @doc Same semantics as in module
+%% lists.
+-spec filter/3 :: (fun(), list(), malt()) -> list().
filter(Fun, List, Malt) ->
runmany(fun (L) ->
lists:filter(Fun, L)
@@ -241,53 +291,55 @@ filter(Fun, List, Malt) ->
end},
List, Malt).
-% Note that with parallel fold there is not foldl and foldr,
-% instead just one fold that can fuse Accumlators.
+%% Note that with parallel fold there is no foldl and foldr,
+%% instead just one fold that can fuse accumulators.
-% @doc Like below, but assumes 1 as the Malt. This function is almost useless,
-% and is intended only to aid converting code from using lists to plists.
-% @spec (Fun, InitAcc, List) -> term()
+%% @doc Like below, but assumes 1 as the Malt. This function is almost useless,
+%% and is intended only to aid converting code from using lists to plists.
+-spec fold/3 :: (fun(), InitAcc::term(), list()) -> term().
fold(Fun, InitAcc, List) ->
fold(Fun, Fun, InitAcc, List, 1).
-% @doc Like below, but uses the Fun as the Fuse by default.
-% @spec (Fun, InitAcc, List, Malt) -> term()
+%% @doc Like below, but uses the Fun as the Fuse by default.
+-spec fold/4 :: (fun(), InitAcc::term(), list(), malt()) -> term().
fold(Fun, InitAcc, List, Malt) ->
fold(Fun, Fun, InitAcc, List, Malt).
-% @doc fold is more complex when made parallel. There is no foldl and foldr,
-% accumulators aren't passed in any defined order.
-% The list is split into sublists which are folded together. Fun is
-% identical to the function passed to lists:fold[lr], it takes
-% (an element, and the accumulator) and returns -> a new accumulator.
-% It is used for the initial stage of folding sublists. Fuse fuses together
-% the results, it takes (Results1, Result2) and returns -> a new result.
-% By default sublists are fused left to right, each result of a fuse being
-% fed into the first element of the next fuse. The result of the last fuse
-% is the result.
-%
-% Fusing may also run in parallel using a recursive algorithm,
-% by specifying the fuse as {recursive, Fuse}. See
-% the discussion in {@link runmany/4}.
-%
-% Malt is the malt for the initial folding of sublists, and for the
-% possible recursive fuse.
-% @spec (Fun, Fuse, InitAcc, List, Malt) -> term()
+%% @doc fold is more complex when made parallel. There is no foldl and
+%% foldr; accumulators aren't passed in any defined order. The list
+%% is split into sublists which are folded together. Fun is identical
+%% to the function passed to lists:fold[lr]: it takes an element and
+%% the accumulator and returns a new accumulator. It is used for
+%% the initial stage of folding sublists. Fuse fuses together the
+%% results: it takes (Result1, Result2) and returns a new result.
+%% By default sublists are fused left to right, each result of a fuse
+%% being fed into the first element of the next fuse. The result of
+%% the last fuse is the result.
+%%
+%% Fusing may also run in parallel using a recursive algorithm,
+%% by specifying the fuse as {recursive, Fuse}. See
+%% the discussion in {@link runmany/4}.
+%%
+%% Malt is the malt for the initial folding of sublists, and for the
+%% possible recursive fuse.
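+%%
+%% A small usage sketch (illustrative only, not part of the original
+%% documentation): summing 1..1000 by folding 10-element sublists and
+%% fusing the partial sums with '+' as well:
+%%
+%%   500500 = ec_plists:fold(fun erlang:'+'/2, fun erlang:'+'/2, 0,
+%%                           lists:seq(1, 1000), 10)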
+-spec fold/5 :: (fun(), fuse(), InitAcc::term(), list(), malt()) -> term().
fold(Fun, Fuse, InitAcc, List, Malt) ->
- Fun2 = fun (L) -> lists:foldl(Fun, InitAcc, L) end,
+ Fun2 = fun (L) ->
+ lists:foldl(Fun, InitAcc, L)
+ end,
runmany(Fun2, Fuse, List, Malt).
-% @doc Similiar to foreach in module
-% lists
-% except it makes no guarantee about the order it processes list elements.
-% @spec (Fun, List) -> void()
+%% @doc Similar to foreach in module lists, except it makes no
+%% guarantee about the order it processes list elements.
+-spec foreach/2 :: (fun(), list()) -> ok.
foreach(Fun, List) ->
foreach(Fun, List, 1).
-% @doc Similiar to foreach in module
-% lists
-% except it makes no guarantee about the order it processes list elements.
-% @spec (Fun, List, Malt) -> void()
+%% @doc Similar to foreach in module lists, except it makes no
+%% guarantee about the order it processes list elements.
+-spec foreach/3 :: (fun(), list(), malt()) -> ok.
foreach(Fun, List, Malt) ->
runmany(fun (L) ->
lists:foreach(Fun, L)
@@ -297,33 +349,57 @@ foreach(Fun, List, Malt) ->
end,
List, Malt).
-% @doc Same semantics as in module
-% lists.
-% @spec (Fun, List) -> list()
+%% @doc Same semantics as in module
+%% lists.
+-spec map/2 :: (fun(), list()) -> list().
map(Fun, List) ->
map(Fun, List, 1).
-% @doc Same semantics as in module
-% lists.
-% @spec (Fun, List, Malt) -> list()
+%% @doc Same semantics as in module
+%% lists.
+-spec map/3 :: (fun(), list(), malt()) -> list().
map(Fun, List, Malt) ->
runmany(fun (L) ->
lists:map(Fun, L)
end,
{reverse, fun (A1, A2) ->
- A1 ++ A2
- end},
+ A1 ++ A2
+ end},
List, Malt).
-% @doc Same semantics as in module
-% lists.
-% @spec (Fun, List) -> {list(), list()}
+%% @doc Fault-tolerant version of map: each result is returned as
+%% {value, term()}, and exceptions raised by Fun are caught and
+%% returned as {error, {Class, Reason}}.
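+%%
+%% For example (illustrative sketch):
+%%   [{value, 4}, {error, {error, badarith}}] =
+%%       ec_plists:ftmap(fun(X) -> 4 div X end, [1, 0])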
+-spec ftmap/2 :: (fun(), list()) -> list().
+ftmap(Fun, List) ->
+ map(fun(L) ->
+ try
+ {value, Fun(L)}
+ catch
+ Class:Type ->
+ {error, {Class, Type}}
+ end
+ end, List).
+
+%% @doc Same as ftmap/2, but takes a malt specifying how to
+%% parallelize the operation.
+-spec ftmap/3 :: (fun(), list(), malt()) -> list().
+ftmap(Fun, List, Malt) ->
+ map(fun(L) ->
+ try
+ {value, Fun(L)}
+ catch
+ Class:Type ->
+ {error, {Class, Type}}
+ end
+ end, List, Malt).
+
+%% @doc Same semantics as in module
+%% lists.
+-spec partition/2 :: (fun(), list()) -> {list(), list()}.
partition(Fun, List) ->
partition(Fun, List, 1).
-% @doc Same semantics as in module
-% lists.
-% @spec (Fun, List, Malt) -> {list(), list()}
+%% @doc Same semantics as in module
+%% lists.
+-spec partition/3 :: (fun(), list(), malt()) -> {list(), list()}.
partition(Fun, List, Malt) ->
runmany(fun (L) ->
lists:partition(Fun, L)
@@ -333,109 +409,110 @@ partition(Fun, List, Malt) ->
end},
List, Malt).
-% SORTMALT needs to be tuned
+%% SORTMALT needs to be tuned
-define(SORTMALT, 100).
-% @doc Same semantics as in module
-% lists.
-% @spec (List) -> list()
+%% @doc Same semantics as in module
+%% lists.
+-spec sort/1 :: (list()) -> list().
sort(List) ->
sort(fun (A, B) ->
A =< B
end,
List).
-% @doc Same semantics as in module
-% lists.
-% @spec (Fun, List) -> list()
+%% @doc Same semantics as in module
+%% lists.
+-spec sort/2 :: (fun(), list()) -> list().
sort(Fun, List) ->
sort(Fun, List, ?SORTMALT).
-% @doc This version lets you specify your own malt for sort.
-%
-% sort splits the list into sublists and sorts them, and it merges the
-% sorted lists together. These are done in parallel. Each sublist is
-% sorted in a seperate process, and each merging of results is done in a
-% seperate process. Malt defaults to 100, causing the list to be split into
-% 100-element sublists.
-% @spec (Fun, List, Malt) -> list()
+%% @doc This version lets you specify your own malt for sort.
+%%
+%% sort splits the list into sublists and sorts them, and it merges the
+%% sorted lists together. These are done in parallel. Each sublist is
+%% sorted in a separate process, and each merging of results is done in a
+%% separate process. Malt defaults to 100, causing the list to be split into
+%% 100-element sublists.
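+%%
+%% An illustrative sketch (not from the original docs), sorting with
+%% 1000-element sublists instead of the default (BigList is a
+%% hypothetical bound variable):
+%%   Sorted = ec_plists:sort(fun erlang:'=<'/2, BigList, 1000)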
+-spec sort/3 :: (fun(), list(), malt()) -> list().
sort(Fun, List, Malt) ->
Fun2 = fun (L) ->
- lists:sort(Fun, L)
- end,
+ lists:sort(Fun, L)
+ end,
Fuse = fun (A1, A2) ->
- lists:merge(Fun, A1, A2)
- end,
+ lists:merge(Fun, A1, A2)
+ end,
runmany(Fun2, {recursive, Fuse}, List, Malt).
-% @doc Same semantics as in module
-% lists.
-% @spec (List) -> list()
+%% @doc Same semantics as in module
+%% lists.
+-spec usort/1 :: (list()) -> list().
usort(List) ->
usort(fun (A, B) ->
- A =< B
- end,
+ A =< B
+ end,
List).
-% @doc Same semantics as in module
-% lists.
-% @spec (Fun, List) -> list()
+%% @doc Same semantics as in module
+%% lists.
+-spec usort/2 :: (fun(), list()) -> list().
usort(Fun, List) ->
usort(Fun, List, ?SORTMALT).
-% @doc This version lets you specify your own malt for usort.
-%
-% usort splits the list into sublists and sorts them, and it merges the
-% sorted lists together. These are done in parallel. Each sublist is
-% sorted in a seperate process, and each merging of results is done in a
-% seperate process. Malt defaults to 100, causing the list to be split into
-% 100-element sublists.
-%
-% usort removes duplicate elments while it sorts.
-% @spec (Fun, List, Malt) -> list()
+%% @doc This version lets you specify your own malt for usort.
+%%
+%% usort splits the list into sublists and sorts them, and it merges the
+%% sorted lists together. These are done in parallel. Each sublist is
+%% sorted in a seperate process, and each merging of results is done in a
+%% seperate process. Malt defaults to 100, causing the list to be split into
+%% 100-element sublists.
+%%
+%% usort removes duplicate elements while it sorts.
+-spec usort/3 :: (fun(), list(), malt()) -> list().
usort(Fun, List, Malt) ->
Fun2 = fun (L) ->
- lists:usort(Fun, L)
- end,
+ lists:usort(Fun, L)
+ end,
Fuse = fun (A1, A2) ->
- lists:umerge(Fun, A1, A2)
- end,
+ lists:umerge(Fun, A1, A2)
+ end,
runmany(Fun2, {recursive, Fuse}, List, Malt).
-% @doc Like below, assumes default MapMalt of 1.
-% @spec (MapFunc, List) -> Dict
-% MapFunc = (term()) -> DeepListOfKeyValuePairs
-% DeepListOfKeyValuePairs = [DeepListOfKeyValuePairs] | {Key, Value}
+%% @doc Like below, assumes default MapMalt of 1.
+-spec mapreduce/2 :: (MapFunc, list()) -> dict() when
+ MapFunc :: fun((term()) -> DeepListOfKeyValuePairs),
+ DeepListOfKeyValuePairs :: [DeepListOfKeyValuePairs] | {Key::term(), Value::term()}.
mapreduce(MapFunc, List) ->
mapreduce(MapFunc, List, 1).
-% Like below, but uses a default reducer that collects all
-% {Key, Value} pairs into a
-% dict,
-% with values {Key, [Value1, Value2...]}.
-% This dict is returned as the result.
+%% @doc Like below, but uses a default reducer that collects all
+%% {Key, Value} pairs into a dict, with values {Key, [Value1, Value2...]}.
+%% This dict is returned as the result.
mapreduce(MapFunc, List, MapMalt) ->
mapreduce(MapFunc, List, dict:new(), fun add_key/3, MapMalt).
-% @doc This is a very basic mapreduce. You won't write a Google-rivaling
-% search engine with it. It has no equivalent in lists. Each
-% element in the list is run through the MapFunc, which produces either
-% a {Key, Value} pair, or a lists of key value pairs, or a list of lists of
-% key value pairs...etc. A reducer process runs in parallel with the mapping
-% processes, collecting the key value pairs. It starts with a state given by
-% InitState, and for each {Key, Value} pair that it receives it invokes
-% ReduceFunc(OldState, Key, Value) to compute its new state. mapreduce returns
-% the reducer's final state.
-%
-% MapMalt is the malt for the mapping operation, with a default value of 1,
-% meaning each element of the list is mapped by a seperate process.
-%
-% mapreduce requires OTP R11B, or it may leave monitoring messages in the
-% message queue.
-% @spec (MapFunc, List, InitState, ReduceFunc, MapMalt) -> Dict
-% MapFunc = (term()) -> DeepListOfKeyValuePairs
-% DeepListOfKeyValuePairs = [DeepListOfKeyValuePairs] | {Key, Value}
-% ReduceFunc = (OldState::term(), Key::term(), Value::term() -> NewState::term()
+%% @doc This is a very basic mapreduce. You won't write a
+%% Google-rivaling search engine with it. It has no equivalent in
+%% lists. Each element in the list is run through the MapFunc, which
+%% produces either a {Key, Value} pair, or a list of key value pairs,
+%% or a list of lists of key value pairs...etc. A reducer process runs
+%% in parallel with the mapping processes, collecting the key value
+%% pairs. It starts with a state given by InitState, and for each
+%% {Key, Value} pair that it receives it invokes ReduceFunc(OldState,
+%% Key, Value) to compute its new state. mapreduce returns the
+%% reducer's final state.
+%%
+%% MapMalt is the malt for the mapping operation, with a default value of 1,
+%% meaning each element of the list is mapped by a separate process.
+%%
+%% mapreduce requires OTP R11B, or it may leave monitoring messages in the
+%% message queue.
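+%%
+%% A small illustrative sketch (not from the original docs): counting
+%% word occurrences across a list of lines with the default reducer of
+%% mapreduce/2, which collects values into a dict of Key -> [Value, ...]:
+%%   Dict = ec_plists:mapreduce(
+%%            fun(Line) -> [{W, 1} || W <- string:tokens(Line, " ")] end,
+%%            ["a b", "b c"]),
+%%   %% lists:sum(dict:fetch("b", Dict)) =:= 2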
+-spec mapreduce/5 :: (MapFunc, list(), InitState::term(), ReduceFunc, malt()) -> dict() when
+ MapFunc :: fun((term()) -> DeepListOfKeyValuePairs),
+ DeepListOfKeyValuePairs :: [DeepListOfKeyValuePairs] | {Key::term(), Value::term()},
+ ReduceFunc :: fun((OldState::term(), Key::term(), Value::term()) -> NewState::term()).
mapreduce(MapFunc, List, InitState, ReduceFunc, MapMalt) ->
Parent = self(),
{Reducer, ReducerRef} =
@@ -446,7 +523,8 @@ mapreduce(MapFunc, List, InitState, ReduceFunc, MapMalt) ->
Reducer ! lists:map(MapFunc, L),
1
end,
- SentMessages = try runmany(MapFunc2, fun (A, B) -> A+B end, List, MapMalt)
+ SentMessages = try
+ runmany(MapFunc2, fun (A, B) -> A+B end, List, MapMalt)
catch
exit:Reason ->
erlang:demonitor(ReducerRef, [flush]),
@@ -491,94 +569,94 @@ add_key(Dict, Key, Value) ->
dict:store(Key, [Value], Dict)
end.
-% @doc Like below, but assumes a Malt of 1,
-% meaning each element of the list is processed by a seperate process.
-% @spec (Fun, Fuse, List) -> term()
+%% @doc Like below, but assumes a Malt of 1,
+%% meaning each element of the list is processed by a separate process.
+-spec runmany/3 :: (fun(), fuse(), list()) -> term().
runmany(Fun, Fuse, List) ->
runmany(Fun, Fuse, List, 1).
-% Begin internal stuff (though runmany/4 is exported).
+%% Begin internal stuff (though runmany/4 is exported).
-% @doc All of the other functions are implemented with runmany. runmany
-% takes a List, splits it into sublists, and starts processes to operate on
-% each sublist, all done according to Malt. Each process passes its sublist
-% into Fun and sends the result back.
-%
-% The results are then fused together to get the final result. There are two
-% ways this can operate, lineraly and recursively. If Fuse is a function,
-% a fuse is done linearly left-to-right on the sublists, the results
-% of processing the first and second sublists being passed to Fuse, then
-% the result of the first fuse and processing the third sublits, and so on. If
-% Fuse is {reverse, FuseFunc}, then a fuse is done right-to-left, the results
-% of processing the second-to-last and last sublists being passed to FuseFunc,
-% then the results of processing the third-to-last sublist and
-% the results of the first fuse, and and so forth.
-% Both methods preserve the original order of the lists elements.
-%
-% To do a recursive fuse, pass Fuse as {recursive, FuseFunc}.
-% The recursive fuse makes no guarantee about the order the results of
-% sublists, or the results of fuses are passed to FuseFunc. It
-% continues fusing pairs of results until it is down to one.
-%
-% Recursive fuse is down in parallel with processing the sublists, and a
-% process is spawned to fuse each pair of results. It is a parallized
-% algorithm. Linear fuse is done after all results of processing sublists
-% have been collected, and can only run in a single process.
-%
-% Even if you pass {recursive, FuseFunc}, a recursive fuse is only done if
-% the malt contains {nodes, NodeList} or {processes, X}. If this is not the
-% case, a linear fuse is done.
-% @spec (Fun, Fuse, List, Malt) -> term()
-% Fun = (list()) -> term()
-% Fuse = FuseFunc | {recursive, FuseFunc}
-% FuseFunc = (term(), term()) -> term()
-runmany(Fun, Fuse, List, Malt) when is_list(Malt) ->
+%% @doc All of the other functions are implemented with runmany. runmany
+%% takes a List, splits it into sublists, and starts processes to operate on
+%% each sublist, all done according to Malt. Each process passes its sublist
+%% into Fun and sends the result back.
+%%
+%% The results are then fused together to get the final result. There are two
+%% ways this can operate, linearly and recursively. If Fuse is a function,
+%% a fuse is done linearly left-to-right on the sublists, the results
+%% of processing the first and second sublists being passed to Fuse, then
+%% the result of the first fuse and processing the third sublist, and so on. If
+%% Fuse is {reverse, FuseFunc}, then a fuse is done right-to-left, the results
+%% of processing the second-to-last and last sublists being passed to FuseFunc,
+%% then the results of processing the third-to-last sublist and
+%% the results of the first fuse, and so forth.
+%% Both methods preserve the original order of the list's elements.
+%%
+%% To do a recursive fuse, pass Fuse as {recursive, FuseFunc}.
+%% The recursive fuse makes no guarantee about the order in which the
+%% results of sublists, or the results of fuses, are passed to FuseFunc. It
+%% continues fusing pairs of results until it is down to one.
+%%
+%% Recursive fuse is done in parallel with processing the sublists, and a
+%% process is spawned to fuse each pair of results. It is a parallelized
+%% algorithm. Linear fuse is done after all results of processing sublists
+%% have been collected, and can only run in a single process.
+%%
+%% Even if you pass {recursive, FuseFunc}, a recursive fuse is only done if
+%% the malt contains {nodes, NodeList} or {processes, X}. If this is not the
+%% case, a linear fuse is done.
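+%%
+%% An illustrative sketch (not part of the original docs): computing
+%% the total length of a list of lists, 50 lists per sublist, fusing
+%% partial sums recursively (ListOfLists is a hypothetical bound
+%% variable):
+%%   Total = ec_plists:runmany(
+%%             fun(Ls) -> lists:sum([length(L) || L <- Ls]) end,
+%%             {recursive, fun erlang:'+'/2},
+%%             ListOfLists,
+%%             [50, {processes, schedulers}])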
+-spec runmany/4 :: (fun(([term()]) -> term()), fuse(), list(), malt()) -> term().
+runmany(Fun, Fuse, List, Malt)
+ when erlang:is_list(Malt) ->
runmany(Fun, Fuse, List, local, no_split, Malt);
runmany(Fun, Fuse, List, Malt) ->
runmany(Fun, Fuse, List, [Malt]).
-runmany(Fun, Fuse, List, Nodes, no_split, [MaltTerm|Malt]) when is_integer(MaltTerm) ->
+runmany(Fun, Fuse, List, Nodes, no_split, [MaltTerm|Malt])
+ when erlang:is_integer(MaltTerm) ->
runmany(Fun, Fuse, List, Nodes, MaltTerm, Malt);
-% run a process for each scheduler
runmany(Fun, Fuse, List, local, Split, [{processes, schedulers}|Malt]) ->
+ %% run a process for each scheduler
S = erlang:system_info(schedulers),
runmany(Fun, Fuse, List, local, Split, [{processes, S}|Malt]);
-% Split the list into X sublists, where X is the number of processes
runmany(Fun, Fuse, List, local, no_split, [{processes, X}|_]=Malt) ->
- L = length(List),
- case L rem X of
+ %% Split the list into X sublists, where X is the number of processes
+ L = erlang:length(List),
+ case (L rem X) of
0 ->
- runmany(Fun, Fuse, List, local, L div X, Malt);
+            runmany(Fun, Fuse, List, local, L div X, Malt);
_ ->
- runmany(Fun, Fuse, List, local, L div X + 1, Malt)
+            runmany(Fun, Fuse, List, local, (L div X) + 1, Malt)
end;
-% run X process on local machine
runmany(Fun, Fuse, List, local, Split, [{processes, X}|Malt]) ->
+ %% run X process on local machine
Nodes = lists:duplicate(X, node()),
runmany(Fun, Fuse, List, Nodes, Split, Malt);
runmany(Fun, Fuse, List, Nodes, Split, [{timeout, X}|Malt]) ->
- Parent = self(),
- Timer = spawn(fun () ->
- receive
- stoptimer ->
- Parent ! {timerstopped, self()}
- after X ->
- Parent ! {timerrang, self()},
- receive
- stoptimer ->
- Parent ! {timerstopped, self()}
- end
- end
- end),
- Ans = try runmany(Fun, Fuse, List, Nodes, Split, Malt)
+ Parent = erlang:self(),
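+    %% Helper process that rings after X milliseconds unless it is
+    %% stopped first; the 'timerrang' message is picked up by the
+    %% receive loops and converted into exit(timeout).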
+ Timer = proc_lib:spawn(fun () ->
+ receive
+ stoptimer ->
+ Parent ! {timerstopped, erlang:self()}
+ after X ->
+ Parent ! {timerrang, erlang:self()},
+ receive
+ stoptimer ->
+ Parent ! {timerstopped, erlang:self()}
+ end
+ end
+ end),
+ Ans = try
+ runmany(Fun, Fuse, List, Nodes, Split, Malt)
catch
- % we really just want the after block, the syntax
- % makes this catch necessary.
+ %% we really just want the after block, the syntax
+ %% makes this catch necessary.
willneverhappen ->
nil
after
- Timer ! stoptimer,
- cleanup_timer(Timer)
+ Timer ! stoptimer,
+ cleanup_timer(Timer)
end,
Ans;
runmany(Fun, Fuse, List, local, Split, [{nodes, NodeList}|Malt]) ->
@@ -592,12 +670,12 @@ runmany(Fun, Fuse, List, local, Split, [{nodes, NodeList}|Malt]) ->
end,
[], NodeList),
runmany(Fun, Fuse, List, Nodes, Split, Malt);
-% local recursive fuse, for when we weren't invoked with {processes, X}
-% or {nodes, NodeList}. Degenerates recursive fuse into linear fuse.
runmany(Fun, {recursive, Fuse}, List, local, Split, []) ->
+ %% local recursive fuse, for when we weren't invoked with {processes, X}
+ %% or {nodes, NodeList}. Degenerates recursive fuse into linear fuse.
runmany(Fun, Fuse, List, local, Split, []);
-% by default, operate on each element seperately
runmany(Fun, Fuse, List, Nodes, no_split, []) ->
+ %% by default, operate on each element seperately
runmany(Fun, Fuse, List, Nodes, 1, []);
runmany(Fun, Fuse, List, local, Split, []) ->
List2 = splitmany(List, Split),
@@ -615,10 +693,10 @@ cleanup_timer(Timer) ->
end.
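+%% Caches the scheduler count per node in the process dictionary so
+%% repeated {NodeName, schedulers} lookups do not query the remote
+%% node again.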
schedulers_on_node(Node) ->
- case get(plists_schedulers_on_nodes) of
+ case erlang:get(ec_plists_schedulers_on_nodes) of
undefined ->
X = determine_schedulers(Node),
- put(plists_schedulers_on_nodes,
+ erlang:put(ec_plists_schedulers_on_nodes,
dict:store(Node, X, dict:new())),
X;
Dict ->
@@ -627,17 +705,17 @@ schedulers_on_node(Node) ->
dict:fetch(Node, Dict);
false ->
X = determine_schedulers(Node),
- put(plists_schedulers_on_nodes,
+ erlang:put(ec_plists_schedulers_on_nodes,
dict:store(Node, X, Dict)),
X
end
end.
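+%% Determines the number of schedulers on a remote node by spawning a
+%% probe process there; falls back to 0 if the node cannot be reached.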
determine_schedulers(Node) ->
- Parent = self(),
- Child = spawn(Node, fun () ->
- Parent ! {self(), erlang:system_info(schedulers)}
- end),
+ Parent = erlang:self(),
+ Child = proc_lib:spawn(Node, fun () ->
+ Parent ! {self(), erlang:system_info(schedulers)}
+ end),
erlang:monitor(process, Child),
receive
{Child, X} ->
@@ -650,21 +728,22 @@ determine_schedulers(Node) ->
0
end.
-% local runmany, for when we weren't invoked with {processes, X}
-% or {nodes, NodeList}. Every sublist is processed in parallel.
+%% @doc local runmany, for when we weren't invoked with {processes, X}
+%% or {nodes, NodeList}. Every sublist is processed in parallel.
local_runmany(Fun, Fuse, List) ->
Parent = self (),
Pids = lists:map(fun (L) ->
F = fun () ->
- Parent !
- {self (), Fun(L)}
+                                  Parent ! {self(), Fun(L)}
end,
{Pid, _} = erlang:spawn_monitor(F),
Pid
end,
List),
- Answers = try lists:map(fun receivefrom/1, Pids)
- catch throw:Message ->
+ Answers = try
+ lists:map(fun receivefrom/1, Pids)
+ catch
+ throw:Message ->
{BadPid, Reason} = Message,
handle_error(BadPid, Reason, Pids)
end,
@@ -678,70 +757,74 @@ receivefrom(Pid) ->
{Pid, R} ->
R;
{'DOWN', _, _, BadPid, Reason} when Reason =/= normal ->
- throw({BadPid, Reason});
+ erlang:throw({BadPid, Reason});
{timerrang, _} ->
- throw({nil, timeout})
+ erlang:throw({nil, timeout})
end.
-% Convert List into [{Number, Sublist}]
+%% Convert List into [{Number, Sublist}]
cluster_runmany(Fun, Fuse, List, Nodes) ->
{List2, _} = lists:foldl(fun (X, {L, Count}) ->
- {[{Count, X}|L], Count+1}
- end,
- {[], 0}, List),
+ {[{Count, X}|L], Count+1}
+ end,
+ {[], 0}, List),
cluster_runmany(Fun, Fuse, List2, Nodes, [], []).
-% Add a pair of results into the TaskList as a fusing task
+%% @doc Add a pair of results into the TaskList as a fusing task
cluster_runmany(Fun, {recursive, Fuse}, [], Nodes, Running,
[{_, R1}, {_, R2}|Results]) ->
cluster_runmany(Fun, {recursive, Fuse}, [{fuse, R1, R2}], Nodes,
Running, Results);
-% recursive fuse done, return result
cluster_runmany(_, {recursive, _Fuse}, [], _Nodes, [], [{_, Result}]) ->
+ %% recursive fuse done, return result
Result;
-% edge case where we are asked to do nothing
cluster_runmany(_, {recursive, _Fuse}, [], _Nodes, [], []) ->
+ %% edge case where we are asked to do nothing
[];
-% We're done, now we just have to [linear] fuse the results
cluster_runmany(_, Fuse, [], _Nodes, [], Results) ->
- fuse(Fuse, lists:map(fun ({_, R}) -> R end,
+ %% We're done, now we just have to [linear] fuse the results
+ fuse(Fuse, lists:map(fun ({_, R}) ->
+ R
+ end,
lists:sort(fun ({A, _}, {B, _}) ->
A =< B
end,
lists:reverse(Results))));
-% We have a ready node and a sublist or fuse to be processed, so we start
-% a new process
cluster_runmany(Fun, Fuse, [Task|TaskList], [N|Nodes], Running, Results) ->
- Parent = self(),
+    %% We have a ready node and a sublist or fuse to be processed, so
+    %% we start a new process.
+    Parent = erlang:self(),
case Task of
{Num, L2} ->
Fun2 = fun () ->
- Parent ! {self(), Num, Fun(L2)}
+ Parent ! {erlang:self(), Num, Fun(L2)}
end;
{fuse, R1, R2} ->
{recursive, FuseFunc} = Fuse,
Fun2 = fun () ->
- Parent ! {self(), fuse, FuseFunc(R1, R2)}
+ Parent ! {erlang:self(), fuse, FuseFunc(R1, R2)}
end
end,
Fun3 = fun () ->
- try Fun2()
+ try
+ Fun2()
catch
- exit:siblingdied ->
+ exit:siblingdied ->
ok;
- exit:Reason ->
- Parent ! {self(), error, Reason};
- error:R ->
- Parent ! {self(), error, {R, erlang:get_stacktrace()}};
- throw:R ->
- Parent ! {self(), error, {{nocatch, R}, erlang:get_stacktrace()}}
- end
+ exit:Reason ->
+ Parent ! {erlang:self(), error, Reason};
+ error:R ->
+ Parent ! {erlang:self(), error, {R, erlang:get_stacktrace()}};
+ throw:R ->
+ Parent ! {erlang:self(), error, {{nocatch, R}, erlang:get_stacktrace()}}
+ end
end,
- Pid = spawn(N, Fun3),
+ Pid = proc_lib:spawn(N, Fun3),
erlang:monitor(process, Pid),
cluster_runmany(Fun, Fuse, TaskList, Nodes, [{Pid, N, Task}|Running], Results);
-% We can't start a new process, but can watch over already running ones
cluster_runmany(Fun, Fuse, TaskList, Nodes, Running, Results) when length(Running) > 0 ->
+ %% We can't start a new process, but can watch over already running ones
receive
{_Pid, error, Reason} ->
RunningPids = lists:map(fun ({Pid, _, _}) ->
@@ -750,9 +833,10 @@ cluster_runmany(Fun, Fuse, TaskList, Nodes, Running, Results) when length(Runnin
Running),
handle_error(junkvalue, Reason, RunningPids);
{Pid, Num, Result} ->
- % throw out the exit message, Reason should be
- % normal, noproc, or noconnection
- receive {'DOWN', _, _, Pid, _Reason} ->
+ %% throw out the exit message, Reason should be
+ %% normal, noproc, or noconnection
+ receive
+ {'DOWN', _, _, Pid, _Reason} ->
nil
end,
{Running2, FinishedNode, _} = delete_running(Pid, Running, []),
@@ -764,16 +848,16 @@ cluster_runmany(Fun, Fuse, TaskList, Nodes, Running, Results) when length(Runnin
end,
Running),
handle_error(nil, timeout, RunningPids);
- % node failure
+ %% node failure
{'DOWN', _, _, Pid, noconnection} ->
{Running2, _DeadNode, Task} = delete_running(Pid, Running, []),
cluster_runmany(Fun, Fuse, [Task|TaskList], Nodes,
Running2, Results);
- % could a noproc exit message come before the message from
- % the process? we are assuming it can't.
- % this clause is unlikely to get invoked due to cluster_runmany's
- % spawned processes. It will still catch errors in mapreduce's
- % reduce process, however.
+ %% could a noproc exit message come before the message from
+ %% the process? we are assuming it can't.
+ %% this clause is unlikely to get invoked due to cluster_runmany's
+ %% spawned processes. It will still catch errors in mapreduce's
+ %% reduce process, however.
{'DOWN', _, _, BadPid, Reason} when Reason =/= normal ->
RunningPids = lists:map(fun ({Pid, _, _}) ->
Pid
@@ -781,9 +865,9 @@ cluster_runmany(Fun, Fuse, TaskList, Nodes, Running, Results) when length(Runnin
Running),
handle_error(BadPid, Reason, RunningPids)
end;
-% We have data, but no nodes either available or occupied
cluster_runmany(_, _, [_Non|_Empty], []=_Nodes, []=_Running, _) ->
- exit(allnodescrashed).
+    %% We have data, but no nodes either available or occupied
+ erlang:exit(allnodescrashed).
delete_running(Pid, [{Pid, Node, List}|Running], Acc) ->
{Running ++ Acc, Node, List};
@@ -792,12 +876,12 @@ delete_running(Pid, [R|Running], Acc) ->
handle_error(BadPid, Reason, Pids) ->
lists:foreach(fun (Pid) ->
- exit(Pid, siblingdied)
+ erlang:exit(Pid, siblingdied)
end, Pids),
lists:foreach(fun (Pid) ->
error_cleanup(Pid, BadPid)
end, Pids),
- exit(Reason).
+ erlang:exit(Reason).
error_cleanup(BadPid, BadPid) ->
ok;
@@ -817,7 +901,7 @@ normal_cleanup(Pid) ->
ok
end.
-% edge case
+%% edge case
fuse(_, []) ->
[];
fuse({reverse, _}=Fuse, Results) ->
@@ -833,9 +917,9 @@ fuse(Fuse, [R2|Results], R1) ->
fuse(_, [], R) ->
R.
-% Splits a list into a list of sublists, each of size Size,
-% except for the last element which is less if the original list
-% could not be evenly divided into Size-sized lists.
+%% @doc Splits a list into a list of sublists, each of size Size,
+%% except for the last sublist, which is shorter if the original list
+%% could not be evenly divided into Size-sized lists.
splitmany(List, Size) ->
splitmany(List, [], Size).
@@ -845,8 +929,8 @@ splitmany(List, Acc, Size) ->
{Top, NList} = split(Size, List),
splitmany(NList, [Top|Acc], Size).
-% Like lists:split, except it splits a list smaller than its first
-% parameter
+%% @doc Like lists:split, except that it also accepts a list shorter
+%% than its first parameter.
split(Size, List) ->
split(Size, List, []).
diff --git a/test/ec_plists_tests.erl b/test/ec_plists_tests.erl
new file mode 100644
index 0000000..7acefe6
--- /dev/null
+++ b/test/ec_plists_tests.erl
@@ -0,0 +1,75 @@
+%%% @copyright Erlware, LLC.
+-module(ec_plists_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%%%===================================================================
+%%% Tests
+%%%===================================================================
+
+map_good_test() ->
+ Results = ec_plists:map(fun(_) ->
+ ok
+ end,
+ lists:seq(1, 5)),
+ ?assertMatch([ok, ok, ok, ok, ok],
+ Results).
+
+ftmap_good_test() ->
+ Results = ec_plists:ftmap(fun(_) ->
+ ok
+ end,
+ lists:seq(1, 3)),
+ ?assertMatch([{value, ok}, {value, ok}, {value, ok}],
+ Results).
+
+filter_good_test() ->
+ Results = ec_plists:filter(fun(X) ->
+ X == show
+ end,
+ [show, show, remove]),
+ ?assertMatch([show, show],
+ Results).
+
+map_timeout_test() ->
+ ?assertExit(timeout,
+ ec_plists:map(fun(T) ->
+ timer:sleep(T),
+ T
+ end,
+ [1, 100], {timeout, 10})).
+
+ftmap_timeout_test() ->
+ ?assertExit(timeout,
+ ec_plists:ftmap(fun(X) ->
+ timer:sleep(X),
+ true
+ end,
+ [100, 1], {timeout, 10})).
+
+filter_timeout_test() ->
+ ?assertExit(timeout,
+ ec_plists:filter(fun(T) ->
+ timer:sleep(T),
+ T == 1
+ end,
+ [1, 100], {timeout, 10})).
+
+map_bad_test() ->
+ ?assertExit({{nocatch,test_exception}, _},
+ ec_plists:map(fun(_) ->
+ erlang:throw(test_exception)
+ end,
+ lists:seq(1, 5))).
+
+ftmap_bad_test() ->
+ Results =
+ ec_plists:ftmap(fun(2) ->
+ erlang:throw(test_exception);
+ (N) ->
+ N
+ end,
+ lists:seq(1, 5)),
+    ?assertMatch([{value, 1}, {error, {throw, test_exception}}, {value, 3},
+                  {value, 4}, {value, 5}], Results).
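+
+%% Additional sanity check, added as an illustrative sketch: fold/4
+%% with 10-element sublists and '+' as both the fold and the fuse
+%% function should produce the full sum.
+fold_good_test() ->
+    Total = ec_plists:fold(fun erlang:'+'/2, 0, lists:seq(1, 1000), 10),
+    ?assertMatch(500500, Total).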