2012-10-25 11:52:47 -05:00
|
|
|
% @author Stephen Marsh
|
|
|
|
% @copyright 2007 Stephen Marsh freeyourmind ++ [$@|gmail.com]
|
|
|
|
% @doc plists is a drop-in replacement for module
|
|
|
|
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>,
|
|
|
|
% making most list operations parallel. It can operate on each element in
|
|
|
|
% parallel, for IO-bound operations, on sublists in parallel, for
|
|
|
|
% taking advantage of multi-core machines with CPU-bound operations, and
|
|
|
|
% across erlang nodes, for parallelizing inside a cluster. It handles
|
|
|
|
% errors and node failures. It can be configured, tuned, and tweaked to
|
|
|
|
% get optimal performance while minimizing overhead.
|
|
|
|
%
|
|
|
|
% Almost all the functions are
|
|
|
|
% identical to equivalent functions in lists, returning exactly the same
|
|
|
|
% result, and having both a form with an identical syntax that operates on
|
|
|
|
% each element in parallel and a form which takes an optional "malt",
|
|
|
|
% a specification for how to parallelize the operation.
|
|
|
|
%
|
|
|
|
% fold is the one exception, parallel fold is different from linear fold.
|
|
|
|
% This module also includes a simple mapreduce implementation, and the
|
|
|
|
% function runmany. All the other functions are implemented with runmany,
|
|
|
|
% which is a generalization of parallel list operations.
|
|
|
|
%
|
|
|
|
% == Malts ==
|
|
|
|
% A malt specifies how to break a list into sublists, and can optionally
|
|
|
|
% specify a timeout, which nodes to run on, and how many processes to start
|
|
|
|
% per node.
|
|
|
|
%
|
|
|
|
% Malt = MaltComponent | [MaltComponent]<br/>
|
|
|
|
% MaltComponent = SubListSize::integer() | {processes, integer()} |
|
|
|
|
% {processes, schedulers} |
|
|
|
|
% {timeout, Milliseconds::integer()} | {nodes, [NodeSpec]}<br/>
|
|
|
|
% NodeSpec = Node::atom() | {Node::atom(), NumProcesses::integer()} |
|
|
|
|
% {Node::atom(), schedulers}
|
|
|
|
%
|
|
|
|
% An integer can be given to specify the exact size for
|
|
|
|
% sublists. 1 is a good choice for IO-bound operations and when
|
|
|
|
% the operation on each list element is expensive. Larger numbers
|
|
|
|
% minimize overhead and are faster for cheap operations.
|
|
|
|
%
|
|
|
|
% If the integer is omitted, and
|
|
|
|
% you have specified a {processes, X}, the list is
|
|
|
|
% split into X sublists. This is only
|
|
|
|
% useful when the time to process each element is close to identical and you
|
|
|
|
% know exactly how many lines of execution are available to you.
|
|
|
|
%
|
|
|
|
% If neither of the above applies, the sublist size defaults to 1.
|
|
|
|
%
|
|
|
|
% You can use {processes, X} to have the list processed
|
|
|
|
% by X processes on the local machine. A good choice for X is the number of
|
|
|
|
% lines of execution (cores) the machine provides. This can be done
|
|
|
|
% automatically with {processes, schedulers}, which sets
|
|
|
|
% the number of processes to the number of schedulers in the erlang virtual
|
|
|
|
% machine (probably equal to the number of cores).
|
|
|
|
%
|
|
|
|
% {timeout, Milliseconds} specifies a timeout. This is a timeout for the entire
|
|
|
|
% operation, both operating on the sublists and combining the results.
|
|
|
|
% exit(timeout) is evaluated if the timeout is exceeded.
|
|
|
|
%
|
|
|
|
% {nodes, NodeList} specifies that the operation should be done across nodes.
|
|
|
|
% Every element of NodeList is of the form {NodeName, NumProcesses} or
|
|
|
|
% NodeName, which means the same as {NodeName, 1}. plists runs
|
|
|
|
% NumProcesses processes on NodeName concurrently. A good choice for
|
|
|
|
% NumProcesses is the number of lines of execution (cores) a node provides
|
|
|
|
% plus one. This ensures the node is completely busy even when
|
|
|
|
% fetching a new sublist. This can be done automatically with
|
|
|
|
% {NodeName, schedulers}, in which case
|
|
|
|
% plists uses a cached value if it has one, and otherwise finds the number of
|
|
|
|
% schedulers in the remote node and adds one. This will ensure at least one
|
|
|
|
% busy process per core (assuming the node has a scheduler for each core).
|
|
|
|
%
|
|
|
|
% plists is able to recover if a node goes down.
|
|
|
|
% If all nodes go down, exit(allnodescrashed) is evaluated.
|
|
|
|
%
|
|
|
|
% Any of the above may be used as a malt, or may be combined into a list.
|
|
|
|
% {nodes, NodeList} and {processes, X} may not be combined.
|
|
|
|
%
|
|
|
|
% === Examples ===
|
|
|
|
% % start a process for each element (1-element sublists)<br/>
|
|
|
|
% 1
|
|
|
|
%
|
|
|
|
% % start a process for each ten elements (10-element sublists)<br/>
|
|
|
|
% 10
|
|
|
|
%
|
|
|
|
% % split the list into two sublists and process in two processes<br/>
|
|
|
|
% {processes, 2}
|
|
|
|
%
|
|
|
|
% % split the list into X sublists and process in X processes,<br/>
|
|
|
|
% % where X is the number of cores in the machine<br/>
|
|
|
|
% {processes, schedulers}
|
|
|
|
%
|
|
|
|
% % split the list into 10-element sublists and process in two processes<br/>
|
|
|
|
% [10, {processes, 2}]
|
|
|
|
%
|
|
|
|
% % timeout after one second. Assumes that a process should be started<br/>
|
|
|
|
% % for each element.<br/>
|
|
|
|
% {timeout, 1000}
|
|
|
|
%
|
|
|
|
% % Runs 3 processes at a time on apple@desktop,
|
|
|
|
% and 2 on orange@laptop<br/>
|
|
|
|
% % This is the best way to utilize all the CPU-power of a dual-core<br/>
|
|
|
|
% % desktop and a single-core laptop. Assumes that the list should be<br/>
|
|
|
|
% % split into 1-element sublists.<br/>
|
|
|
|
% {nodes, [{apple@desktop, 3}, {orange@laptop, 2}]}
|
|
|
|
%
|
|
|
|
% % Like above, but makes plists figure out how many processes to use.<br/>
|
|
|
|
% {nodes, [{apple@desktop, schedulers}, {orange@laptop, schedulers}]}
|
|
|
|
%
|
|
|
|
% % Gives apple and orange three seconds to process the list as<br/>
|
|
|
|
% % 100-element sublists.<br/>
|
|
|
|
% [100, {timeout, 3000}, {nodes, [{apple@desktop, 3}, {orange@laptop, 2}]}]
|
|
|
|
%
|
|
|
|
% === Aside: Why Malt? ===
|
|
|
|
% I needed a word for this concept, so maybe my subconsciousness gave me one by
|
|
|
|
% making me misspell multiply. Maybe it is an acronym for Malt is A List
|
|
|
|
% Tearing Specification. Maybe it is a beer metaphor, suggesting that code
|
|
|
|
% only runs in parallel if bribed with spirits. It's jargon, learn it
|
|
|
|
% or you can't be part of the in-group.
|
|
|
|
%
|
|
|
|
% == Messages and Errors ==
|
|
|
|
% plists assures that no extraneous messages are left in or will later
|
|
|
|
% enter the message queue. This is guaranteed even in the event of an error.
|
|
|
|
%
|
|
|
|
% Errors in spawned processes are caught and propagated to the calling
|
|
|
|
% process. If you invoke
|
|
|
|
%
|
|
|
|
% plists:map(fun (X) -> 1/X end, [1, 2, 3, 0]).
|
|
|
|
%
|
|
|
|
% you get a badarith error, exactly like when you use lists:map.
|
|
|
|
%
|
|
|
|
% plists uses monitors to watch the processes it spawns. It is not a good idea
|
|
|
|
% to invoke plists when you are already monitoring processes. If one of them
|
|
|
|
% does a non-normal exit, plists receives the 'DOWN' message believing it to be
|
|
|
|
% from one of its own processes. The error propagation system goes into
|
|
|
|
% effect, which results in the error occurring in the calling process.
|
|
|
|
%
|
|
|
|
% == License ==
|
|
|
|
% The MIT License
|
|
|
|
%
|
|
|
|
% Copyright (c) 2007 Stephen Marsh
|
|
|
|
%
|
|
|
|
% Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
|
|
% of this software and associated documentation files (the "Software"), to deal
|
|
|
|
% in the Software without restriction, including without limitation the rights
|
|
|
|
% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
|
|
% copies of the Software, and to permit persons to whom the Software is
|
|
|
|
% furnished to do so, subject to the following conditions:
|
|
|
|
%
|
|
|
|
% The above copyright notice and this permission notice shall be included in
|
|
|
|
% all copies or substantial portions of the Software.
|
|
|
|
%
|
|
|
|
% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
|
|
% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
|
|
% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
|
|
% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
|
|
% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
|
|
% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
|
|
% THE SOFTWARE.
|
|
|
|
|
|
|
|
|
|
|
|
-module(plists).
|
|
|
|
-export([all/2, all/3, any/2, any/3, filter/2, filter/3,
|
|
|
|
fold/3, fold/4, fold/5, foreach/2, foreach/3, map/2, map/3,
|
|
|
|
partition/2, partition/3, sort/1, sort/2, sort/3,
|
|
|
|
usort/1, usort/2, usort/3, mapreduce/2, mapreduce/3, mapreduce/5,
|
|
|
|
runmany/3, runmany/4]).
|
|
|
|
|
|
|
|
% Everything here is defined in terms of runmany.
|
|
|
|
% The following functions are convenient interfaces to runmany.
|
|
|
|
|
|
|
|
% @doc Parallel counterpart of all/2 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>.
% Delegates to all/3 with a malt of 1 (one-element sublists).
% @spec (Fun, List) -> boolean()
all(Fun, List) ->
    all(Fun, List, 1).
|
|
|
|
|
|
|
|
% @doc Parallel counterpart of all/2 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>,
% parallelized according to Malt.
%
% Every sublist is checked with lists:all/2. A worker whose sublist fails
% the check evaluates exit(notall); that exit is caught here and reported
% as false. If runmany returns at all, the answer is true.
% @spec (Fun, List, Malt) -> boolean()
all(Fun, List, Malt) ->
    CheckSublist = fun (Sublist) ->
                           case lists:all(Fun, Sublist) of
                               true -> nil;
                               false -> exit(notall)
                           end
                   end,
    % The per-sublist results carry no information, so the fuse drops them.
    DropResults = fun (_Left, _Right) -> nil end,
    try runmany(CheckSublist, DropResults, List, Malt) of
        _ -> true
    catch
        exit:notall -> false
    end.
|
|
|
|
|
|
|
|
% @doc Parallel counterpart of any/2 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>.
% Delegates to any/3 with a malt of 1 (one-element sublists).
% @spec (Fun, List) -> boolean()
any(Fun, List) ->
    any(Fun, List, 1).
|
|
|
|
|
|
|
|
% @doc Parallel counterpart of any/2 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>,
% parallelized according to Malt.
%
% Every sublist is checked with lists:any/2. A worker that finds a matching
% element evaluates exit(any); that exit is caught here and reported as
% true. If runmany returns normally, no sublist matched and the answer is
% false.
% @spec (Fun, List, Malt) -> boolean()
any(Fun, List, Malt) ->
    CheckSublist = fun (Sublist) ->
                           case lists:any(Fun, Sublist) of
                               true -> exit(any);
                               false -> nil
                           end
                   end,
    % The per-sublist results carry no information, so the fuse drops them.
    DropResults = fun (_Left, _Right) -> nil end,
    try runmany(CheckSublist, DropResults, List, Malt) of
        _ -> false
    catch
        exit:any -> true
    end.
|
|
|
|
|
|
|
|
% @doc Parallel counterpart of filter/2 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>.
% Delegates to filter/3 with a malt of 1 (one-element sublists).
% @spec (Fun, List) -> list()
filter(Fun, List) ->
    filter(Fun, List, 1).
|
|
|
|
|
|
|
|
% @doc Parallel counterpart of filter/2 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>,
% parallelized according to Malt.
%
% Each sublist is filtered with lists:filter/2 and the filtered sublists
% are concatenated with a {reverse, ...} (right-to-left) fuse.
% @spec (Fun, List, Malt) -> list()
filter(Fun, List, Malt) ->
    FilterSublist = fun (Sublist) -> lists:filter(Fun, Sublist) end,
    Concat = fun (Left, Right) -> Left ++ Right end,
    runmany(FilterSublist, {reverse, Concat}, List, Malt).
|
|
|
|
|
|
|
|
% Note that with parallel fold there is not foldl and foldr,
|
|
|
|
% instead just one fold that can fuse accumulators.
|
|
|
|
|
|
|
|
% @doc Shorthand for fold(Fun, Fun, InitAcc, List, 1): Fun is used both to
% fold each sublist and to fuse the results, with a malt of 1. This function
% is almost useless, and is intended only to aid converting code from using
% lists to plists.
% @spec (Fun, InitAcc, List) -> term()
fold(Fun, InitAcc, List) ->
    fold(Fun, Fun, InitAcc, List, 1).
|
2011-04-03 21:23:12 -05:00
|
|
|
|
2012-10-25 11:52:47 -05:00
|
|
|
% @doc Shorthand for fold(Fun, Fun, InitAcc, List, Malt): Fun is used both
% to fold each sublist and as the Fuse.
% @spec (Fun, InitAcc, List, Malt) -> term()
fold(Fun, InitAcc, List, Malt) ->
    fold(Fun, Fun, InitAcc, List, Malt).
|
2011-04-03 21:23:12 -05:00
|
|
|
|
2012-10-25 11:52:47 -05:00
|
|
|
% @doc fold is more complex when made parallel. There is no foldl and foldr,
% accumulators aren't passed in any defined order.
% The list is split into sublists which are folded together. Fun is
% identical to the function passed to lists:fold[lr], it takes
% (an element, and the accumulator) and returns -> a new accumulator.
% It is used for the initial stage of folding sublists; every sublist is
% folded with lists:foldl/3 starting from InitAcc. Fuse fuses together
% the results, it takes (Result1, Result2) and returns -> a new result.
% By default sublists are fused left to right, each result of a fuse being
% fed into the first element of the next fuse. The result of the last fuse
% is the result.
%
% Fusing may also run in parallel using a recursive algorithm,
% by specifying the fuse as {recursive, Fuse}. See
% the discussion in {@link runmany/4}.
%
% Malt is the malt for the initial folding of sublists, and for the
% possible recursive fuse.
%
% An empty List yields InitAcc, as lists:foldl/3 does.
% @spec (Fun, Fuse, InitAcc, List, Malt) -> term()
fold(_Fun, _Fuse, InitAcc, [], _Malt) ->
    % With no sublists there is nothing to fold or fuse. Without this clause
    % runmany would hand back its "no results" placeholder [] instead of the
    % initial accumulator.
    InitAcc;
fold(Fun, Fuse, InitAcc, List, Malt) ->
    FoldSublist = fun (Sublist) -> lists:foldl(Fun, InitAcc, Sublist) end,
    runmany(FoldSublist, Fuse, List, Malt).
|
|
|
|
|
|
|
|
% @doc Similar to foreach/2 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>
% except it makes no guarantee about the order it processes list elements.
% Delegates to foreach/3 with a malt of 1 (one-element sublists).
% @spec (Fun, List) -> void()
foreach(Fun, List) ->
    foreach(Fun, List, 1).
|
|
|
|
|
|
|
|
% @doc Similar to foreach/2 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>
% except it makes no guarantee about the order it processes list elements.
% Each sublist is run through lists:foreach/2 according to Malt.
% Always returns ok, including for an empty List.
% @spec (Fun, List, Malt) -> ok
foreach(Fun, List, Malt) ->
    RunSublist = fun (Sublist) -> lists:foreach(Fun, Sublist) end,
    % The fused value carries no information, and for an empty List runmany
    % yields [] rather than ok, so it is discarded and ok is returned
    % unconditionally, matching lists:foreach/2.
    _ = runmany(RunSublist, fun (_Left, _Right) -> ok end, List, Malt),
    ok.
|
|
|
|
|
|
|
|
% @doc Parallel counterpart of map/2 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>.
% Delegates to map/3 with a malt of 1 (one-element sublists).
% @spec (Fun, List) -> list()
map(Fun, List) ->
    map(Fun, List, 1).
|
|
|
|
|
|
|
|
% @doc Parallel counterpart of map/2 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>,
% parallelized according to Malt.
%
% Each sublist is mapped with lists:map/2 and the mapped sublists are
% concatenated with a {reverse, ...} (right-to-left) fuse.
% @spec (Fun, List, Malt) -> list()
map(Fun, List, Malt) ->
    MapSublist = fun (Sublist) -> lists:map(Fun, Sublist) end,
    Concat = fun (Left, Right) -> Left ++ Right end,
    runmany(MapSublist, {reverse, Concat}, List, Malt).
|
|
|
|
|
|
|
|
% @doc Parallel counterpart of partition/2 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>.
% Delegates to partition/3 with a malt of 1 (one-element sublists).
% @spec (Fun, List) -> {list(), list()}
partition(Fun, List) ->
    partition(Fun, List, 1).
|
|
|
|
|
|
|
|
% @doc Parallel counterpart of partition/2 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>,
% parallelized according to Malt.
%
% Each sublist is partitioned with lists:partition/2; the per-sublist
% {Satisfying, NotSatisfying} pairs are concatenated component-wise with a
% {reverse, ...} (right-to-left) fuse. An empty List yields {[], []}.
% @spec (Fun, List, Malt) -> {list(), list()}
partition(_Fun, [], _Malt) ->
    % With no sublists runmany would return its "no results" placeholder []
    % instead of a pair, so the empty case is answered directly.
    {[], []};
partition(Fun, List, Malt) ->
    PartitionSublist = fun (Sublist) -> lists:partition(Fun, Sublist) end,
    Combine = fun ({SatL, UnsatL}, {SatR, UnsatR}) ->
                      {SatL ++ SatR, UnsatL ++ UnsatR}
              end,
    runmany(PartitionSublist, {reverse, Combine}, List, Malt).
|
|
|
|
|
|
|
|
% SORTMALT needs to be tuned
|
|
|
|
% Malt that sort/2 and usort/2 pass on to sort/3 and usort/3; as an integer
% malt it is the sublist size.
-define(SORTMALT, 100).
|
|
|
|
|
|
|
|
% @doc Parallel counterpart of sort/1 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>.
% Delegates to sort/2 with =&lt; as the ordering function.
% @spec (List) -> list()
sort(List) ->
    sort(fun erlang:'=<'/2, List).
|
|
|
|
|
|
|
|
% @doc Parallel counterpart of sort/2 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>.
% Delegates to sort/3 with the default malt ?SORTMALT.
% @spec (Fun, List) -> list()
sort(Fun, List) ->
    sort(Fun, List, ?SORTMALT).
|
|
|
|
|
|
|
|
% @doc This version lets you specify your own malt for sort.
%
% sort splits the list into sublists, sorts each sublist with lists:sort/2,
% and merges the sorted sublists with lists:merge/3. The merge is requested
% as a {recursive, ...} fuse; see {@link runmany/4} for when that fuse
% actually runs in parallel.
% @spec (Fun, List, Malt) -> list()
sort(Fun, List, Malt) ->
    SortSublist = fun (Sublist) -> lists:sort(Fun, Sublist) end,
    Merge = fun (Left, Right) -> lists:merge(Fun, Left, Right) end,
    runmany(SortSublist, {recursive, Merge}, List, Malt).
|
|
|
|
|
|
|
|
% @doc Parallel counterpart of usort/1 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>.
% Delegates to usort/2 with =&lt; as the ordering function.
% @spec (List) -> list()
usort(List) ->
    usort(fun erlang:'=<'/2, List).
|
|
|
|
|
|
|
|
% @doc Parallel counterpart of usort/2 in module
% <a href="http://www.erlang.org/doc/man/lists.html">lists</a>.
% Delegates to usort/3 with the default malt ?SORTMALT.
% @spec (Fun, List) -> list()
usort(Fun, List) ->
    usort(Fun, List, ?SORTMALT).
|
|
|
|
|
|
|
|
% @doc This version lets you specify your own malt for usort.
%
% usort splits the list into sublists, sorts each sublist with
% lists:usort/2, and merges the sorted sublists with lists:umerge/3, so
% duplicate elements are removed while sorting. The merge is requested as a
% {recursive, ...} fuse; see {@link runmany/4} for when that fuse actually
% runs in parallel.
% @spec (Fun, List, Malt) -> list()
usort(Fun, List, Malt) ->
    SortSublist = fun (Sublist) -> lists:usort(Fun, Sublist) end,
    Merge = fun (Left, Right) -> lists:umerge(Fun, Left, Right) end,
    runmany(SortSublist, {recursive, Merge}, List, Malt).
|
|
|
|
|
|
|
|
% @doc Shorthand for mapreduce(MapFunc, List, 1), i.e. the default MapMalt
% of 1 is used.
% @spec (MapFunc, List) -> Dict
%     MapFunc = (term()) -> DeepListOfKeyValuePairs
%     DeepListOfKeyValuePairs = [DeepListOfKeyValuePairs] | {Key, Value}
mapreduce(MapFunc, List) ->
    mapreduce(MapFunc, List, 1).
|
|
|
|
|
|
|
|
% @doc Like mapreduce/5, but uses a default reducer that collects all
% {Key, Value} pairs into a
% <a href="http://www.erlang.org/doc/man/dict.html">dict</a>,
% with values {Key, [Value1, Value2...]}.
% The reducer starts from an empty dict and adds each pair with add_key/3;
% the resulting dict is returned.
% @spec (MapFunc, List, MapMalt) -> Dict
mapreduce(MapFunc, List, MapMalt) ->
    EmptyDict = dict:new(),
    mapreduce(MapFunc, List, EmptyDict, fun add_key/3, MapMalt).
|
|
|
|
|
|
|
|
% @doc This is a very basic mapreduce. You won't write a Google-rivaling
% search engine with it. It has no equivalent in lists. Each
% element in the list is run through the MapFunc, which produces either
% a {Key, Value} pair, or a list of key value pairs, or a list of lists of
% key value pairs...etc. A reducer process runs in parallel with the mapping
% processes, collecting the key value pairs. It starts with a state given by
% InitState, and for each {Key, Value} pair that it receives it invokes
% ReduceFunc(OldState, Key, Value) to compute its new state. mapreduce returns
% the reducer's final state. For an empty List that is InitState.
%
% MapMalt is the malt for the mapping operation, with a default value of 1,
% meaning each element of the list is mapped by a separate process.
%
% mapreduce requires OTP R11B, or it may leave monitoring messages in the
% message queue.
% @spec (MapFunc, List, InitState, ReduceFunc, MapMalt) -> term()
%     MapFunc = (term()) -> DeepListOfKeyValuePairs
%     DeepListOfKeyValuePairs = [DeepListOfKeyValuePairs] | {Key, Value}
%     ReduceFunc = (OldState::term(), Key::term(), Value::term()) -> NewState::term()
mapreduce(MapFunc, List, InitState, ReduceFunc, MapMalt) ->
    Parent = self(),
    {Reducer, ReducerRef} =
        erlang:spawn_monitor(fun () ->
                                     reducer(Parent, 0, InitState, ReduceFunc)
                             end),
    % Every mapper posts its mapped sublist to the reducer as one message and
    % reports 1, so the fused sum is the number of messages the reducer has
    % to consume before it may answer.
    MapFunc2 = fun (L) ->
                       Reducer ! lists:map(MapFunc, L),
                       1
               end,
    SentMessages =
        try runmany(MapFunc2, fun (A, B) -> A + B end, List, MapMalt) of
            % runmany yields [] when there were no sublists at all. The
            % reducer counts from 0, so it must be told 0; otherwise the
            % done-message never matches its counter and it crashes.
            [] -> 0;
            Count -> Count
        catch
            exit:Reason ->
                % Mapping failed: stop watching the reducer, tell it to
                % quit, and re-raise the original exit.
                erlang:demonitor(ReducerRef, [flush]),
                Reducer ! die,
                exit(Reason)
        end,
    Reducer ! {mappers, done, SentMessages},
    Results = receive
                  {Reducer, Results2} ->
                      Results2;
                  {'DOWN', _, _, Reducer, Reason2} ->
                      exit(Reason2)
              end,
    % Consume the reducer's normal 'DOWN' message so it is not left in the
    % caller's message queue.
    receive
        {'DOWN', _, _, Reducer, normal} ->
            nil
    end,
    Results.
|
|
|
|
|
|
|
|
% Reducer process loop used by mapreduce/5.
%
% NumReceived counts the mapper messages folded into State so far.
%  - die: stop without replying.
%  - {mappers, done, N}: matched only when N equals NumReceived; the final
%    State is then sent to Parent as {self(), State} and the loop ends.
%  - any other message is treated as a (possibly deep) list of {Key, Value}
%    pairs and folded into State with each_key/3.
reducer(Parent, NumReceived, State, Func) ->
    receive
        die ->
            nil;
        {mappers, done, NumReceived} ->
            Parent ! {self(), State};
        Pairs ->
            NewState = each_key(State, Func, Pairs),
            reducer(Parent, NumReceived + 1, NewState, Func)
    end.
|
|
|
|
|
2012-10-25 11:52:47 -05:00
|
|
|
% Folds a deep list of {Key, Value} pairs into State, left to right, by
% calling Func(State, Key, Value) for every pair encountered.
each_key(State, _Func, []) ->
    State;
each_key(State, Func, [Head|Tail]) ->
    AfterHead = each_key(State, Func, Head),
    each_key(AfterHead, Func, Tail);
each_key(State, Func, {Key, Value}) ->
    Func(State, Key, Value).
|
2011-04-03 21:23:12 -05:00
|
|
|
|
2012-10-25 11:52:47 -05:00
|
|
|
% Default reducer step for mapreduce/3: appends Value to the list stored
% under Key in Dict, starting a new one-element list when Key is absent
% (dict:append/3 covers both cases).
add_key(Dict, Key, Value) ->
    dict:append(Key, Value, Dict).
|
|
|
|
|
|
|
|
% @doc Shorthand for runmany(Fun, Fuse, List, 1): a malt of 1,
% meaning each element of the list is processed by a separate process.
% @spec (Fun, Fuse, List) -> term()
runmany(Fun, Fuse, List) ->
    runmany(Fun, Fuse, List, 1).
|
|
|
|
|
|
|
|
% Begin internal stuff (though runmany/4 is exported).
|
|
|
|
|
|
|
|
% @doc All of the other functions are implemented with runmany. runmany
% takes a List, splits it into sublists, and starts processes to operate on
% each sublist, all done according to Malt. Each process passes its sublist
% into Fun and sends the result back.
%
% The results are then fused together to get the final result. There are two
% ways this can operate, linearly and recursively. If Fuse is a function,
% a fuse is done linearly left-to-right on the sublists, the results
% of processing the first and second sublists being passed to Fuse, then
% the result of the first fuse and processing the third sublist, and so on. If
% Fuse is {reverse, FuseFunc}, then a fuse is done right-to-left, the results
% of processing the second-to-last and last sublists being passed to FuseFunc,
% then the results of processing the third-to-last sublist and
% the results of the first fuse, and so forth.
% Both methods preserve the original order of the list's elements.
%
% To do a recursive fuse, pass Fuse as {recursive, FuseFunc}.
% The recursive fuse makes no guarantee about the order the results of
% sublists, or the results of fuses are passed to FuseFunc. It
% continues fusing pairs of results until it is down to one.
%
% Recursive fuse is done in parallel with processing the sublists, and a
% process is spawned to fuse each pair of results. It is a parallelized
% algorithm. Linear fuse is done after all results of processing sublists
% have been collected, and can only run in a single process.
%
% Even if you pass {recursive, FuseFunc}, a recursive fuse is only done if
% the malt contains {nodes, NodeList} or {processes, X}. If this is not the
% case, a linear fuse is done.
%
% Malt is a single malt component or a list of components. An integer
% sublist size may appear anywhere in the list.
% @spec (Fun, Fuse, List, Malt) -> term()
%     Fun = (list()) -> term()
%     Fuse = FuseFunc | {reverse, FuseFunc} | {recursive, FuseFunc}
%     FuseFunc = (term(), term()) -> term()
runmany(Fun, Fuse, List, Malt) when is_list(Malt) ->
    % runmany/6 only accepts an integer sublist size while no size has been
    % chosen yet, and {processes, X} chooses one as soon as it is seen.
    % A malt such as [{processes, 2}, 10] therefore used to fail with
    % function_clause. Moving integers to the front (relative order of all
    % components is otherwise preserved) makes the component order
    % irrelevant.
    {Sizes, Others} = lists:partition(fun (Term) -> is_integer(Term) end,
                                      Malt),
    runmany(Fun, Fuse, List, local, no_split, Sizes ++ Others);
runmany(Fun, Fuse, List, Malt) ->
    % A bare component is shorthand for a one-element malt list.
    runmany(Fun, Fuse, List, [Malt]).
|
|
|
|
|
|
|
|
% Internal worker behind runmany/4:
%   runmany(Fun, Fuse, List, Nodes, Split, Malt)
% Consumes Malt one component at a time while accumulating
%   Nodes - the atom local, or a list with one node name per process slot;
%   Split - the atom no_split, or the sublist size;
% and, once Malt is exhausted, splits List and hands over to local_runmany/3
% or cluster_runmany/4. Clause order is significant.

% An integer malt component is the sublist size.
runmany(Fun, Fuse, List, Nodes, no_split, [MaltTerm|Malt]) when is_integer(MaltTerm) ->
    runmany(Fun, Fuse, List, Nodes, MaltTerm, Malt);
% {processes, schedulers}: run a process for each scheduler of this VM.
runmany(Fun, Fuse, List, local, Split, [{processes, schedulers}|Malt]) ->
    S = erlang:system_info(schedulers),
    runmany(Fun, Fuse, List, local, Split, [{processes, S}|Malt]);
% No sublist size given: split the list into X sublists, where X is the
% number of processes (size rounded up). {processes, X} stays in the malt
% so that the next clause still sees it.
runmany(Fun, Fuse, List, local, no_split, [{processes, X}|_]=Malt) ->
    L = length(List),
    Split = case L rem X of
                0 -> L div X;
                _ -> L div X + 1
            end,
    runmany(Fun, Fuse, List, local, Split, Malt);
% Run X processes on the local machine: X slots on node().
runmany(Fun, Fuse, List, local, Split, [{processes, X}|Malt]) ->
    Nodes = lists:duplicate(X, node()),
    runmany(Fun, Fuse, List, Nodes, Split, Malt);
% {timeout, X}: a helper process sends {timerrang, Timer} to the caller if
% X milliseconds pass before it is told to stop. In every case it answers
% stoptimer with {timerstopped, Timer}, which cleanup_timer/1 waits for.
runmany(Fun, Fuse, List, Nodes, Split, [{timeout, X}|Malt]) ->
    Parent = self(),
    Timer = spawn(fun () ->
                          receive
                              stoptimer ->
                                  Parent ! {timerstopped, self()}
                          after X ->
                                  Parent ! {timerrang, self()},
                                  receive
                                      stoptimer ->
                                          Parent ! {timerstopped, self()}
                                  end
                          end
                  end),
    % try ... after needs no catch section. The timer is stopped and its
    % messages are drained whether the call returns or raises.
    try
        runmany(Fun, Fuse, List, Nodes, Split, Malt)
    after
        Timer ! stoptimer,
        cleanup_timer(Timer)
    end;
% {nodes, NodeList}: expand every node spec into one list entry per process
% slot. {Node, schedulers} gets the node's scheduler count plus one.
runmany(Fun, Fuse, List, local, Split, [{nodes, NodeList}|Malt]) ->
    Nodes = lists:foldl(fun ({Node, schedulers}, A) ->
                                X = schedulers_on_node(Node) + 1,
                                lists:reverse(lists:duplicate(X, Node), A);
                            ({Node, X}, A) ->
                                lists:reverse(lists:duplicate(X, Node), A);
                            (Node, A) ->
                                [Node|A]
                        end,
                        [], NodeList),
    runmany(Fun, Fuse, List, Nodes, Split, Malt);
% local recursive fuse, for when we weren't invoked with {processes, X}
% or {nodes, NodeList}. Degenerates recursive fuse into linear fuse.
runmany(Fun, {recursive, Fuse}, List, local, Split, []) ->
    runmany(Fun, Fuse, List, local, Split, []);
% by default, operate on each element separately
runmany(Fun, Fuse, List, Nodes, no_split, []) ->
    runmany(Fun, Fuse, List, Nodes, 1, []);
% Malt exhausted, no node list: one local process per sublist.
runmany(Fun, Fuse, List, local, Split, []) ->
    List2 = splitmany(List, Split),
    local_runmany(Fun, Fuse, List2);
% Malt exhausted with a node list: schedule sublists onto the node slots.
runmany(Fun, Fuse, List, Nodes, Split, []) ->
    List2 = splitmany(List, Split),
    cluster_runmany(Fun, Fuse, List2, Nodes).
|
2011-04-03 21:23:12 -05:00
|
|
|
|
2012-10-25 11:52:47 -05:00
|
|
|
% Waits until Timer has confirmed that it stopped, discarding any
% {timerrang, Timer} message that is still queued along the way, so that no
% timer message is left in the caller's mailbox. Returns nil.
cleanup_timer(Timer) ->
    receive
        {timerstopped, Timer} ->
            nil;
        {timerrang, Timer} ->
            cleanup_timer(Timer)
    end.
|
|
|
|
|
|
|
|
% Returns the number of schedulers on Node, consulting a per-process cache
% first. The cache is a dict stored in the process dictionary under the key
% plists_schedulers_on_nodes; on a miss the value obtained from
% determine_schedulers/1 is stored there before it is returned.
schedulers_on_node(Node) ->
    Cache = case get(plists_schedulers_on_nodes) of
                undefined -> dict:new();
                Existing -> Existing
            end,
    case dict:find(Node, Cache) of
        {ok, Cached} ->
            Cached;
        error ->
            Fresh = determine_schedulers(Node),
            put(plists_schedulers_on_nodes, dict:store(Node, Fresh, Cache)),
            Fresh
    end.
|
2011-04-03 21:23:12 -05:00
|
|
|
|
2012-10-25 11:52:47 -05:00
|
|
|
% Asks Node how many schedulers it runs by spawning a short-lived process
% there that reports erlang:system_info(schedulers) back to the caller.
% After the answer arrives, the probe's 'DOWN' message is consumed as well.
% Returns 0 if the probe goes down with a non-normal reason before
% answering.
determine_schedulers(Node) ->
    Parent = self(),
    Probe = fun () ->
                    Parent ! {self(), erlang:system_info(schedulers)}
            end,
    Child = spawn(Node, Probe),
    erlang:monitor(process, Child),
    receive
        {Child, Count} ->
            receive
                {'DOWN', _, _, Child, _Reason} ->
                    nil
            end,
            Count;
        {'DOWN', _, _, Child, Reason} when Reason =/= normal ->
            0
    end.
|
|
|
|
|
2012-10-25 11:52:47 -05:00
|
|
|
% local runmany, for when we weren't invoked with {processes, X}
% or {nodes, NodeList}. Every sublist is processed in parallel:
% one monitored process is spawned per sublist, the answers are collected in
% sublist order, the workers' 'DOWN' messages are consumed, and the answers
% are fused. A failure or timeout reported by receivefrom/1 is handed to
% handle_error/3, which exits.
local_runmany(Fun, Fuse, List) ->
    Parent = self(),
    StartWorker = fun (Sublist) ->
                          Work = fun () ->
                                         Parent ! {self(), Fun(Sublist)}
                                 end,
                          {Pid, _Ref} = erlang:spawn_monitor(Work),
                          Pid
                  end,
    Pids = [StartWorker(Sublist) || Sublist <- List],
    Answers = try
                  [receivefrom(Pid) || Pid <- Pids]
              catch
                  throw:Message ->
                      {BadPid, Reason} = Message,
                      handle_error(BadPid, Reason, Pids)
              end,
    lists:foreach(fun normal_cleanup/1, Pids),
    fuse(Fuse, Answers).
|
|
|
|
|
|
|
|
% Waits for the answer {Pid, Result} of one worker and returns Result.
% Throws {BadPid, Reason} when a 'DOWN' message with a non-normal reason is
% received first, and {nil, timeout} when a {timerrang, _} message is.
receivefrom(Pid) ->
    receive
        {Pid, Result} ->
            Result;
        {'DOWN', _, _, DeadPid, Why} when Why =/= normal ->
            throw({DeadPid, Why});
        {timerrang, _} ->
            throw({nil, timeout})
    end.
|
|
|
|
|
|
|
|
% Entry point for running sublists on a list of node slots. Converts List
% (a list of sublists) into tasks [{Number, Sublist}], numbered from 0 in
% list order and stored in reverse (highest number first), then starts the
% scheduling loop with nothing running and no results.
cluster_runmany(Fun, Fuse, List, Nodes) ->
    Numbers = lists:seq(0, length(List) - 1),
    Tasks = lists:reverse(lists:zip(Numbers, List)),
    cluster_runmany(Fun, Fuse, Tasks, Nodes, [], []).
|
|
|
|
|
|
|
|
% Scheduling loop:
%   cluster_runmany(Fun, Fuse, TaskList, Nodes, Running, Results)
% TaskList - pending tasks, {Number, Sublist} or {fuse, R1, R2};
% Nodes    - free process slots (node names);
% Running  - [{Pid, Node, Task}] for the processes currently at work;
% Results  - [{Number | fuse, Result}] collected so far.
% Clause order is significant.

% Recursive fuse with nothing pending and at least two results: turn a pair
% of results into a fusing task.
cluster_runmany(Fun, {recursive, Fuse}, [], Nodes, Running,
                [{_, R1}, {_, R2}|Results]) ->
    cluster_runmany(Fun, {recursive, Fuse}, [{fuse, R1, R2}], Nodes,
                    Running, Results);
% Recursive fuse finished: a single result is left and nothing is running.
cluster_runmany(_, {recursive, _Fuse}, [], _Nodes, [], [{_, Result}]) ->
    Result;
% Edge case where we are asked to do nothing.
cluster_runmany(_, {recursive, _Fuse}, [], _Nodes, [], []) ->
    [];
% Everything has been processed; put the results back into sublist order
% and fuse them linearly.
cluster_runmany(_, Fuse, [], _Nodes, [], Results) ->
    ByNumber = fun ({A, _}, {B, _}) -> A =< B end,
    Ordered = lists:sort(ByNumber, lists:reverse(Results)),
    fuse(Fuse, [R || {_, R} <- Ordered]);
% A slot is free and a task is pending: start a process for it on that node.
cluster_runmany(Fun, Fuse, [Task|TaskList], [N|Nodes], Running, Results) ->
    Parent = self(),
    Work = case Task of
               {Num, L2} ->
                   fun () ->
                           Parent ! {self(), Num, Fun(L2)}
                   end;
               {fuse, R1, R2} ->
                   {recursive, FuseFunc} = Fuse,
                   fun () ->
                           Parent ! {self(), fuse, FuseFunc(R1, R2)}
                   end
           end,
    % Failures are reported to the parent as {Pid, error, Reason} messages.
    % NOTE(review): erlang:get_stacktrace/0 is deprecated since OTP 21 and,
    % as far as I know, removed in OTP 24. The replacement is the
    % Class:Reason:Stacktrace catch syntax - confirm the target OTP release
    % before changing it.
    Guarded = fun () ->
                      try Work()
                      catch
                          exit:siblingdied ->
                              ok;
                          exit:Reason ->
                              Parent ! {self(), error, Reason};
                          error:R ->
                              Parent ! {self(), error,
                                        {R, erlang:get_stacktrace()}};
                          throw:R ->
                              Parent ! {self(), error,
                                        {{nocatch, R}, erlang:get_stacktrace()}}
                      end
              end,
    Pid = spawn(N, Guarded),
    erlang:monitor(process, Pid),
    cluster_runmany(Fun, Fuse, TaskList, Nodes, [{Pid, N, Task}|Running],
                    Results);
% We can't start a new process, but can watch over already running ones.
cluster_runmany(Fun, Fuse, TaskList, Nodes, [_|_] = Running, Results) ->
    receive
        % a worker reported a failure
        {_Pid, error, Reason} ->
            handle_error(junkvalue, Reason, [P || {P, _, _} <- Running]);
        % a worker finished: free its slot and record the result
        {Pid, Num, Result} ->
            % throw out the exit message, Reason should be
            % normal, noproc, or noconnection
            receive
                {'DOWN', _, _, Pid, _Reason} ->
                    nil
            end,
            {Running2, FinishedNode, _} = delete_running(Pid, Running, []),
            cluster_runmany(Fun, Fuse, TaskList, [FinishedNode|Nodes],
                            Running2, [{Num, Result}|Results]);
        {timerrang, _} ->
            handle_error(nil, timeout, [P || {P, _, _} <- Running]);
        % node failure: requeue the task; the dead node's slot is not
        % returned to Nodes
        {'DOWN', _, _, Pid, noconnection} ->
            {Running2, _DeadNode, Task} = delete_running(Pid, Running, []),
            cluster_runmany(Fun, Fuse, [Task|TaskList], Nodes,
                            Running2, Results);
        % could a noproc exit message come before the message from
        % the process? we are assuming it can't.
        % this clause is unlikely to get invoked due to cluster_runmany's
        % spawned processes. It will still catch errors in mapreduce's
        % reduce process, however.
        {'DOWN', _, _, BadPid, Reason} when Reason =/= normal ->
            handle_error(BadPid, Reason, [P || {P, _, _} <- Running])
    end;
% We have data, but no nodes either available or occupied.
cluster_runmany(_, _, [_Non|_Empty], []=_Nodes, []=_Running, _) ->
    exit(allnodescrashed).
|
|
|
|
|
|
|
|
% Removes the entry for Pid from a Running list of {Pid, Node, Task}.
% Returns {RemainingEntries, Node, Task}, where RemainingEntries is the
% entries after the match followed by the already-skipped entries (Acc, in
% reverse order of traversal). There is no clause for an empty list, so a
% Pid that is not present fails with function_clause.
delete_running(Pid, [{Pid, Node, Task}|Rest], Skipped) ->
    {Rest ++ Skipped, Node, Task};
delete_running(Pid, [Other|Rest], Skipped) ->
    delete_running(Pid, Rest, [Other|Skipped]).
|
|
|
|
|
|
|
|
% Aborts a run: sends every process in Pids an exit signal with reason
% siblingdied, drains the messages belonging to each of them with
% error_cleanup/2 (BadPid is skipped there), and finally exits the calling
% process with Reason. Never returns.
handle_error(BadPid, Reason, Pids) ->
    Kill = fun (Pid) -> exit(Pid, siblingdied) end,
    Drain = fun (Pid) -> error_cleanup(Pid, BadPid) end,
    lists:foreach(Kill, Pids),
    lists:foreach(Drain, Pids),
    exit(Reason).
|
|
|
|
|
|
|
|
% Drains the caller's mailbox of everything that belongs to Pid after an
% error: {Pid, _} and {Pid, _, _} messages are discarded until Pid's 'DOWN'
% message has been received. Nothing is awaited when Pid is BadPid itself.
% Returns ok.
error_cleanup(Pid, BadPid) when Pid =:= BadPid ->
    ok;
error_cleanup(Pid, BadPid) ->
    receive
        {'DOWN', _, _, Pid, _Why} ->
            ok;
        {Pid, _} ->
            error_cleanup(Pid, BadPid);
        {Pid, _, _} ->
            error_cleanup(Pid, BadPid)
    end.
|
|
|
|
|
|
|
|
% Blocks until the 'DOWN' message for Pid arrives and consumes it, whatever
% its reason, so the monitor message is not left in the caller's mailbox.
% Returns ok.
normal_cleanup(Pid) ->
    receive
        {'DOWN', _, _, Pid, _Why} ->
            ok
    end.
|
|
|
|
|
|
|
|
% fuse(Fuse, Results): combines a list of results into one value.
%  - no results: [] (edge case);
%  - {reverse, FuseFunc}: right-to-left, i.e. FuseFunc(Earlier, Acc) where
%    Acc starts as the last result;
%  - otherwise Fuse is the fuse function and is applied left-to-right, i.e.
%    Fuse(Acc, Later) where Acc starts as the first result.
fuse(_Fuse, []) ->
    [];
fuse({reverse, _} = Fuse, Results) ->
    [Last|Earlier] = lists:reverse(Results),
    fuse(Fuse, Earlier, Last);
fuse(Fuse, [First|Rest]) ->
    fuse(Fuse, Rest, First).

% fuse(Fuse, Results, Acc): folds the remaining Results into Acc, taking
% them in list order. For {reverse, FuseFunc} the accumulator is the second
% argument of FuseFunc, otherwise the first.
fuse({reverse, FuseFunc}, Results, Seed) ->
    lists:foldl(fun (Next, Acc) -> FuseFunc(Next, Acc) end, Seed, Results);
fuse(FuseFunc, Results, Seed) ->
    lists:foldl(fun (Next, Acc) -> FuseFunc(Acc, Next) end, Seed, Results).
|
|
|
|
|
|
|
|
% splitmany(List, Size): splits List into a list of sublists, each of size
% Size, except for the last one, which is shorter if List could not be
% evenly divided into Size-sized sublists. An empty List yields [] for any
% Size. A Size of 0 with a non-empty List raises badarg: no progress could
% ever be made, and the function used to loop forever while accumulating
% empty sublists.
splitmany([], _Size) ->
    [];
splitmany(_List, 0) ->
    erlang:error(badarg);
splitmany(List, Size) ->
    splitmany(List, [], Size).

% splitmany(List, Acc, Size): Acc holds the sublists cut off so far, most
% recent first.
splitmany([], Acc, _Size) ->
    lists:reverse(Acc);
splitmany(List, Acc, Size) ->
    {Top, Rest} = split(Size, List),
    splitmany(Rest, [Top|Acc], Size).

% Like lists:split, except it also accepts a list shorter than its first
% parameter, in which case the whole list is returned as the first part.
split(Size, List) ->
    split(Size, List, []).

% split(Remaining, List, Acc): Acc holds the elements taken so far in
% reverse order.
split(0, List, Acc) ->
    {lists:reverse(Acc), List};
split(Size, [H|List], Acc) ->
    split(Size - 1, List, [H|Acc]);
split(_, [], Acc) ->
    {lists:reverse(Acc), []}.
|