From 31ebca114ade860f2bf665ae6265f3de4b3f2cee Mon Sep 17 00:00:00 2001 From: Eric Merritt Date: Thu, 25 Oct 2012 11:52:47 -0500 Subject: [PATCH] replace ec_plists with Stephan's plists Signed-off-by: Jordan Wilberding --- src/ec_plists.erl | 1060 +++++++++++++++++++++++++++++++++++---------- src/plists.erl | 858 ------------------------------------ 2 files changed, 827 insertions(+), 1091 deletions(-) delete mode 100644 src/plists.erl diff --git a/src/ec_plists.erl b/src/ec_plists.erl index cd14697..688d5a5 100644 --- a/src/ec_plists.erl +++ b/src/ec_plists.erl @@ -1,264 +1,858 @@ -%%%------------------------------------------------------------------- -%%% @doc -%%% simple parrallel map. Originally provided by Joe Armstrong -%%% on the erlang questions mailing list. -%%% @end -%%%------------------------------------------------------------------- --module(ec_plists). +% @author Stephen Marsh +% @copyright 2007 Stephen Marsh freeyourmind ++ [$@|gmail.com] +% @doc plists is a drop-in replacement for module +% lists, +% making most list operations parallel. It can operate on each element in +% parallel, for IO-bound operations, on sublists in parallel, for +% taking advantage of multi-core machines with CPU-bound operations, and +% across erlang nodes, for parallizing inside a cluster. It handles +% errors and node failures. It can be configured, tuned, and tweaked to +% get optimal performance while minimizing overhead. +% +% Almost all the functions are +% identical to equivalent functions in lists, returning exactly the same +% result, and having both a form with an identical syntax that operates on +% each element in parallel and a form which takes an optional "malt", +% a specification for how to parallize the operation. +% +% fold is the one exception, parallel fold is different from linear fold. +% This module also include a simple mapreduce implementation, and the +% function runmany. 
All the other functions are implemented with runmany, +% which is a generalization of parallel list operations. +% +% == Malts == +% A malt specifies how to break a list into sublists, and can optionally +% specify a timeout, which nodes to run on, and how many processes to start +% per node. +% +% Malt = MaltComponent | [MaltComponent]
+% MaltComponent = SubListSize::integer() | {processes, integer()} | +% {processes, schedulers} | +% {timeout, Milliseconds::integer()} | {nodes, [NodeSpec]}
+% NodeSpec = Node::atom() | {Node::atom(), NumProcesses::integer()} | +% {Node::atom(), schedulers} +% +% An integer can be given to specify the exact size for +% sublists. 1 is a good choice for IO-bound operations and when +% the operation on each list element is expensive. Larger numbers +% minimize overhead and are faster for cheap operations. +% +% If the integer is omitted, and +% you have specified a {processes, X}, the list is +% split into X sublists. This is only +% useful when the time to process each element is close to identical and you +% know exactly how many lines of execution are available to you. +% +% If neither of the above applies, the sublist size defaults to 1. +% +% You can use {processes, X} to have the list processed +% by X processes on the local machine. A good choice for X is the number of +% lines of execution (cores) the machine provides. This can be done +% automatically with {processes, schedulers}, which sets +% the number of processes to the number of schedulers in the erlang virtual +% machine (probably equal to the number of cores). +% +% {timeout, Milliseconds} specifies a timeout. This is a timeout for the entire +% operation, both operating on the sublists and combining the results. +% exit(timeout) is evaluated if the timeout is exceeded. +% +% {nodes, NodeList} specifies that the operation should be done across nodes. +% Every element of NodeList is of the form {NodeName, NumProcesses} or +% NodeName, which means the same as {NodeName, 1}. plists runs +% NumProcesses processes on NodeName concurrently. A good choice for +% NumProcesses is the number of lines of execution (cores) a node provides +% plus one. This ensures the node is completely busy even when +% fetching a new sublist. This can be done automatically with +% {NodeName, schedulers}, in which case +% plists uses a cached value if it has one, and otherwise finds the number of +% schedulers in the remote node and adds one. 
This will ensure at least one +% busy process per core (assuming the node has a scheduler for each core). +% +% plists is able to recover if a node goes down. +% If all nodes go down, exit(allnodescrashed) is evaluated. +% +% Any of the above may be used as a malt, or may be combined into a list. +% {nodes, NodeList} and {processes, X} may not be combined. +% +% === Examples === +% % start a process for each element (1-element sublists)
+% 1 +% +% % start a process for each ten elements (10-element sublists)
+% 10 +% +% % split the list into two sublists and process in two processes
+% {processes, 2} +% +% % split the list into X sublists and process in X processes,
+% % where X is the number of cores in the machine
+% {processes, schedulers} +% +% % split the list into 10-element sublists and process in two processes
+% [10, {processes, 2}] +% +% % timeout after one second. Assumes that a process should be started
+% % for each element.
+% {timeout, 1000} +% +% % Runs 3 processes at a time on apple@desktop, +% % and 2 on orange@laptop
+% % This is the best way to utilize all the CPU-power of a dual-core
+% % desktop and a single-core laptop. Assumes that the list should be
+% % split into 1-element sublists.
+% {nodes, [{apple@desktop, 3}, {orange@laptop, 2}]} +% +% Like above, but makes plists figure out how many processes to use. +% {nodes, [{apple@desktop, schedulers}, {orange@laptop, schedulers}]} +% +% % Gives apple and orange three seconds to process the list as
+% % 100-element sublists.
+% [100, {timeout, 3000}, {nodes, [{apple@desktop, 3}, {orange@laptop, 2}]}] +% +% === Aside: Why Malt? === +% I needed a word for this concept, so maybe my subconsciousness gave me one by +% making me misspell multiply. Maybe it is an acronym for Malt is A List +% Tearing Specification. Maybe it is a beer metaphor, suggesting that code +% only runs in parallel if bribed with spirits. It's jargon, learn it +% or you can't be part of the in-group. +% +% == Messages and Errors == +% plists assures that no extraneous messages are left in or will later +% enter the message queue. This is guaranteed even in the event of an error. +% +% Errors in spawned processes are caught and propagated to the calling +% process. If you invoke +% +% plists:map(fun (X) -> 1/X end, [1, 2, 3, 0]). +% +% you get a badarith error, exactly like when you use lists:map. +% +% plists uses monitors to watch the processes it spawns. It is not a good idea +% to invoke plists when you are already monitoring processes. If one of them +% does a non-normal exit, plists receives the 'DOWN' message believing it to be +% from one of its own processes. The error propagation system goes into +% effect, which results in the error occuring in the calling process. +% +% == License == +% The MIT License +% +% Copyright (c) 2007 Stephen Marsh +% +% Permission is hereby granted, free of charge, to any person obtaining a copy +% of this software and associated documentation files (the "Software"), to deal +% in the Software without restriction, including without limitation the rights +% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +% copies of the Software, and to permit persons to whom the Software is +% furnished to do so, subject to the following conditions: +% +% The above copyright notice and this permission notice shall be included in +% all copies or substantial portions of the Software. 
+% +% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +% THE SOFTWARE. --export([map/2, - map/3, - ftmap/2, - ftmap/3, - filter/2, - filter/3]). --export_type([thunk/0]). +-module(plists). +-export([all/2, all/3, any/2, any/3, filter/2, filter/3, +fold/3, fold/4, fold/5, foreach/2, foreach/3, map/2, map/3, +partition/2, partition/3, sort/1, sort/2, sort/3, +usort/1, usort/2, usort/3, mapreduce/2, mapreduce/3, mapreduce/5, +runmany/3, runmany/4]). -%%============================================================================= -%% Types -%%============================================================================= --type thunk() :: fun((any()) -> any()). +% Everything here is defined in terms of runmany. +% The following methods are convient interfaces to runmany. -%%============================================================================= -%% Public API -%%============================================================================= +% @doc Same semantics as in module +% lists. +% @spec (Fun, List) -> bool() +all(Fun, List) -> + all(Fun, List, 1). -%% @doc Takes a function and produces a list of the result of the function -%% applied to each element of the argument list. A timeout is optional. -%% In the event of a timeout or an exception the entire map will fail -%% with an excption with class throw. --spec map(fun(), [any()]) -> [any()]. -map(Fun, List) -> - map(Fun, List, infinity). +% @doc Same semantics as in module +% lists. 
+% @spec (Fun, List, Malt) -> bool() +all(Fun, List, Malt) -> + try runmany(fun (L) -> + B = lists:all(Fun, L), + if B -> + nil; + true -> + exit(notall) + end + end, + fun (_A1, _A2) -> + nil + end, + List, Malt) of + _ -> + true + catch exit:notall -> + false + end. --spec map(thunk(), [any()], timeout() | infinity) -> [any()]. -map(Fun, List, Timeout) -> - run_list_fun_in_parallel(map, Fun, List, Timeout). +% @doc Same semantics as in module +% lists. +% @spec (Fun, List) -> bool() +any(Fun, List) -> + any(Fun, List, 1). -%% @doc Takes a function and produces a list of the result of the function -%% applied to each element of the argument list. A timeout is optional. -%% This function differes from regular map in that it is fault tolerant. -%% If a timeout or an exception occurs while processing an element in -%% the input list the ftmap operation will continue to function. Timeouts -%% and exceptions will be reflected in the output of this function. -%% All application level results are wrapped in a tuple with the tag -%% 'value'. Exceptions will come through as they are and timeouts will -%% return as the atom timeout. -%% This is useful when the ftmap is being used for side effects. -%%
-%% 2> ftmap(fun(N) -> factorial(N) end, [1, 2, 1000000, "not num"], 100)
-%% [{value, 1}, {value, 2}, timeout, {badmatch, ...}]
-%% 
--spec ftmap(thunk(), [any()]) -> [{value, any()} | any()]. -ftmap(Fun, List) -> - ftmap(Fun, List, infinity). +% @doc Same semantics as in module +% lists. +% @spec (Fun, List, Malt) -> bool() +any(Fun, List, Malt) -> + try runmany(fun (L) -> + B = lists:any(Fun, L), + if B -> + exit(any); + true -> + nil + end + end, + fun (_A1, _A2) -> + nil + end, + List, Malt) of + _ -> + false + catch exit:any -> + true + end. --spec ftmap(thunk(), [any()], timeout() | infinity) -> [{value, any()} | any()]. -ftmap(Fun, List, Timeout) -> - run_list_fun_in_parallel(ftmap, Fun, List, Timeout). - -%% @doc Returns a list of the elements in the supplied list which -%% the function Fun returns true. A timeout is optional. In the -%% event of a timeout the filter operation fails. --spec filter(thunk(), [any()]) -> [any()]. +% @doc Same semantics as in module +% lists. +% @spec (Fun, List) -> list() filter(Fun, List) -> - filter(Fun, List, infinity). + filter(Fun, List, 1). --spec filter(thunk(), [any()], timeout() | infinity) -> [any()]. -filter(Fun, List, Timeout) -> - run_list_fun_in_parallel(filter, Fun, List, Timeout). +% @doc Same semantics as in module +% lists. +% @spec (Fun, List, Malt) -> list() +filter(Fun, List, Malt) -> + runmany(fun (L) -> + lists:filter(Fun, L) + end, + {reverse, fun (A1, A2) -> + A1 ++ A2 + end}, + List, Malt). -%%============================================================================= -%% Internal API -%%============================================================================= --spec run_list_fun_in_parallel(atom(), thunk(), [any()], timeout() | infinity) -> [any()]. -run_list_fun_in_parallel(ListFun, Fun, List, Timeout) -> - LocalPid = self(), - Pids = - lists:map(fun(E) -> - Pid = - proc_lib:spawn(fun() -> - wait(LocalPid, Fun, - E, Timeout) - end), - {Pid, E} - end, List), - gather(ListFun, Pids). +% Note that with parallel fold there is not foldl and foldr, +% instead just one fold that can fuse Accumlators. 
--spec wait(pid(), thunk(), any(), timeout() | infinity) -> any(). -wait(Parent, Fun, E, Timeout) -> - WaitPid = self(), - Child = spawn(fun() -> - do_f(WaitPid, Fun, E) - end), +% @doc Like below, but assumes 1 as the Malt. This function is almost useless, +% and is intended only to aid converting code from using lists to plists. +% @spec (Fun, InitAcc, List) -> term() +fold(Fun, InitAcc, List) -> + fold(Fun, Fun, InitAcc, List, 1). - wait(Parent, Child, Timeout). +% @doc Like below, but uses the Fun as the Fuse by default. +% @spec (Fun, InitAcc, List, Malt) -> term() +fold(Fun, InitAcc, List, Malt) -> + fold(Fun, Fun, InitAcc, List, Malt). --spec wait(pid(), pid(), timeout() | infinity) -> any(). -wait(Parent, Child, Timeout) -> - receive - {Child, Ret} -> - Parent ! {self(), Ret} - after Timeout -> - exit(Child, timeout), - Parent ! {self(), timeout} - end. +% @doc fold is more complex when made parallel. There is no foldl and foldr, +% accumulators aren't passed in any defined order. +% The list is split into sublists which are folded together. Fun is +% identical to the function passed to lists:fold[lr], it takes +% (an element, and the accumulator) and returns -> a new accumulator. +% It is used for the initial stage of folding sublists. Fuse fuses together +% the results, it takes (Results1, Result2) and returns -> a new result. +% By default sublists are fused left to right, each result of a fuse being +% fed into the first element of the next fuse. The result of the last fuse +% is the result. +% +% Fusing may also run in parallel using a recursive algorithm, +% by specifying the fuse as {recursive, Fuse}. See +% the discussion in {@link runmany/4}. +% +% Malt is the malt for the initial folding of sublists, and for the +% possible recursive fuse. +% @spec (Fun, Fuse, InitAcc, List, Malt) -> term() +fold(Fun, Fuse, InitAcc, List, Malt) -> + Fun2 = fun (L) -> lists:foldl(Fun, InitAcc, L) end, + runmany(Fun2, Fuse, List, Malt). 
--spec gather(atom(), [any()]) -> [any()]. -gather(map, PidElementList) -> - map_gather(PidElementList); -gather(ftmap, PidElementList) -> - ftmap_gather(PidElementList); -gather(filter, PidElementList) -> - filter_gather(PidElementList). +% @doc Similiar to foreach in module +% lists +% except it makes no guarantee about the order it processes list elements. +% @spec (Fun, List) -> void() +foreach(Fun, List) -> + foreach(Fun, List, 1). --spec map_gather([pid()]) -> [any()]. -map_gather([{Pid, _E} | Rest]) -> - receive - {Pid, {value, Ret}} -> - [Ret|map_gather(Rest)]; - %% timeouts fall here too. Should timeouts be a return value - %% or an exception? I lean toward return value, but the code - %% is easier with the exception. Thoughts? - {Pid, Exception} -> - killall(Rest), - throw(Exception) - end; -map_gather([]) -> - []. +% @doc Similiar to foreach in module +% lists +% except it makes no guarantee about the order it processes list elements. +% @spec (Fun, List, Malt) -> void() +foreach(Fun, List, Malt) -> + runmany(fun (L) -> + lists:foreach(Fun, L) + end, + fun (_A1, _A2) -> + ok + end, + List, Malt). --spec ftmap_gather([pid()]) -> [any()]. -ftmap_gather([{Pid, _E} | Rest]) -> - receive - {Pid, Value} -> [Value|ftmap_gather(Rest)] - end; -ftmap_gather([]) -> - []. +% @doc Same semantics as in module +% lists. +% @spec (Fun, List) -> list() +map(Fun, List) -> + map(Fun, List, 1). --spec filter_gather([pid()]) -> [any()]. -filter_gather([{Pid, E} | Rest]) -> - receive - {Pid, {value, false}} -> - filter_gather(Rest); - {Pid, {value, true}} -> - [E|filter_gather(Rest)]; - {Pid, {value, NotBool}} -> - killall(Rest), - throw({bad_return_value, NotBool}); - {Pid, Exception} -> - killall(Rest), - throw(Exception) - end; -filter_gather([]) -> - []. +% @doc Same semantics as in module +% lists. +% @spec (Fun, List, Malt) -> list() +map(Fun, List, Malt) -> + runmany(fun (L) -> + lists:map(Fun, L) + end, + {reverse, fun (A1, A2) -> + A1 ++ A2 + end}, + List, Malt). 
--spec do_f(pid(), thunk(), any()) -> no_return(). -do_f(Parent, F, E) -> - try - Result = F(E), - Parent ! {self(), {value, Result}} - catch - _Class:Exception -> - %% Losing class info here, but since throw does not accept - %% that arg anyhow and forces a class of throw it does not - %% matter. - Parent ! {self(), Exception} - end. +% @doc Same semantics as in module +% lists. +% @spec (Fun, List) -> {list(), list()} +partition(Fun, List) -> + partition(Fun, List, 1). --spec killall([pid()]) -> ok. -killall([{Pid, _E}|T]) -> - exit(Pid, kill), - killall(T); -killall([]) -> - ok. +% @doc Same semantics as in module +% lists. +% @spec (Fun, List, Malt) -> {list(), list()} +partition(Fun, List, Malt) -> + runmany(fun (L) -> + lists:partition(Fun, L) + end, + {reverse, fun ({True1, False1}, {True2, False2}) -> + {True1 ++ True2, False1 ++ False2} + end}, + List, Malt). -%%============================================================================= -%% Tests -%%============================================================================= +% SORTMALT needs to be tuned +-define(SORTMALT, 100). --ifndef(NOTEST). --include_lib("eunit/include/eunit.hrl"). +% @doc Same semantics as in module +% lists. +% @spec (List) -> list() +sort(List) -> + sort(fun (A, B) -> + A =< B + end, + List). -map_good_test() -> - Results = map(fun(_) -> - ok - end, - lists:seq(1, 5), infinity), - ?assertMatch([ok, ok, ok, ok, ok], - Results). +% @doc Same semantics as in module +% lists. +% @spec (Fun, List) -> list() +sort(Fun, List) -> + sort(Fun, List, ?SORTMALT). -ftmap_good_test() -> - Results = ftmap(fun(_) -> - ok - end, - lists:seq(1, 3), infinity), - ?assertMatch([{value, ok}, {value, ok}, {value, ok}], - Results). +% @doc This version lets you specify your own malt for sort. +% +% sort splits the list into sublists and sorts them, and it merges the +% sorted lists together. These are done in parallel. 
Each sublist is +% sorted in a seperate process, and each merging of results is done in a +% seperate process. Malt defaults to 100, causing the list to be split into +% 100-element sublists. +% @spec (Fun, List, Malt) -> list() +sort(Fun, List, Malt) -> + Fun2 = fun (L) -> + lists:sort(Fun, L) + end, + Fuse = fun (A1, A2) -> + lists:merge(Fun, A1, A2) + end, + runmany(Fun2, {recursive, Fuse}, List, Malt). -filter_good_test() -> - Results = filter(fun(X) -> - X == show - end, - [show, show, remove], infinity), - ?assertMatch([show, show], - Results). +% @doc Same semantics as in module +% lists. +% @spec (List) -> list() +usort(List) -> + usort(fun (A, B) -> + A =< B + end, + List). -map_timeout_test() -> - Results = - try - map(fun(T) -> - timer:sleep(T), - T - end, - [1, 100], 10) - catch - C:E -> {C, E} - end, - ?assertMatch({throw, timeout}, Results). +% @doc Same semantics as in module +% lists. +% @spec (Fun, List) -> list() +usort(Fun, List) -> + usort(Fun, List, ?SORTMALT). -ftmap_timeout_test() -> - Results = ftmap(fun(X) -> - timer:sleep(X), - true - end, - [100, 1], 10), - ?assertMatch([timeout, {value, true}], Results). +% @doc This version lets you specify your own malt for usort. +% +% usort splits the list into sublists and sorts them, and it merges the +% sorted lists together. These are done in parallel. Each sublist is +% sorted in a seperate process, and each merging of results is done in a +% seperate process. Malt defaults to 100, causing the list to be split into +% 100-element sublists. +% +% usort removes duplicate elments while it sorts. +% @spec (Fun, List, Malt) -> list() +usort(Fun, List, Malt) -> + Fun2 = fun (L) -> + lists:usort(Fun, L) + end, + Fuse = fun (A1, A2) -> + lists:umerge(Fun, A1, A2) + end, + runmany(Fun2, {recursive, Fuse}, List, Malt). -filter_timeout_test() -> - Results = - try - filter(fun(T) -> - timer:sleep(T), - T == 1 +% @doc Like below, assumes default MapMalt of 1. 
+% @spec (MapFunc, List) -> Dict +% MapFunc = (term()) -> DeepListOfKeyValuePairs +% DeepListOfKeyValuePairs = [DeepListOfKeyValuePairs] | {Key, Value} +mapreduce(MapFunc, List) -> + mapreduce(MapFunc, List, 1). + +% Like below, but uses a default reducer that collects all +% {Key, Value} pairs into a +% dict, +% with values {Key, [Value1, Value2...]}. +% This dict is returned as the result. +mapreduce(MapFunc, List, MapMalt) -> + mapreduce(MapFunc, List, dict:new(), fun add_key/3, MapMalt). + +% @doc This is a very basic mapreduce. You won't write a Google-rivaling +% search engine with it. It has no equivalent in lists. Each +% element in the list is run through the MapFunc, which produces either +% a {Key, Value} pair, or a lists of key value pairs, or a list of lists of +% key value pairs...etc. A reducer process runs in parallel with the mapping +% processes, collecting the key value pairs. It starts with a state given by +% InitState, and for each {Key, Value} pair that it receives it invokes +% ReduceFunc(OldState, Key, Value) to compute its new state. mapreduce returns +% the reducer's final state. +% +% MapMalt is the malt for the mapping operation, with a default value of 1, +% meaning each element of the list is mapped by a seperate process. +% +% mapreduce requires OTP R11B, or it may leave monitoring messages in the +% message queue. +% @spec (MapFunc, List, InitState, ReduceFunc, MapMalt) -> Dict +% MapFunc = (term()) -> DeepListOfKeyValuePairs +% DeepListOfKeyValuePairs = [DeepListOfKeyValuePairs] | {Key, Value} +% ReduceFunc = (OldState::term(), Key::term(), Value::term() -> NewState::term() +mapreduce(MapFunc, List, InitState, ReduceFunc, MapMalt) -> + Parent = self(), + {Reducer, ReducerRef} = + erlang:spawn_monitor(fun () -> + reducer(Parent, 0, InitState, ReduceFunc) + end), + MapFunc2 = fun (L) -> + Reducer ! 
lists:map(MapFunc, L), + 1 + end, + SentMessages = try runmany(MapFunc2, fun (A, B) -> A+B end, List, MapMalt) + catch + exit:Reason -> + erlang:demonitor(ReducerRef, [flush]), + Reducer ! die, + exit(Reason) end, - [1, 100], 10) - catch - C:E -> {C, E} - end, - ?assertMatch({throw, timeout}, Results). - -map_bad_test() -> - Results = - try - map(fun(_) -> - throw(test_exception) - end, - lists:seq(1, 5), infinity) - catch - C:E -> {C, E} - end, - ?assertMatch({throw, test_exception}, Results). - -ftmap_bad_test() -> - Results = - ftmap(fun(2) -> - throw(test_exception); - (N) -> - N + Reducer ! {mappers, done, SentMessages}, + Results = receive + {Reducer, Results2} -> + Results2; + {'DOWN', _, _, Reducer, Reason2} -> + exit(Reason2) end, - lists:seq(1, 5), infinity), - ?assertMatch([{value, 1}, test_exception, {value, 3}, - {value, 4}, {value, 5}] , Results). + receive + {'DOWN', _, _, Reducer, normal} -> + nil + end, + Results. --endif. +reducer(Parent, NumReceived, State, Func) -> + receive + die -> + nil; + {mappers, done, NumReceived} -> + Parent ! {self (), State}; + Keys -> + reducer(Parent, NumReceived + 1, each_key(State, Func, Keys), Func) + end. + +each_key(State, Func, {Key, Value}) -> + Func(State, Key, Value); +each_key(State, Func, [List|Keys]) -> + each_key(each_key(State, Func, List), Func, Keys); +each_key(State, _, []) -> + State. + +add_key(Dict, Key, Value) -> + case dict:is_key(Key, Dict) of + true -> + dict:append(Key, Value, Dict); + false -> + dict:store(Key, [Value], Dict) + end. + +% @doc Like below, but assumes a Malt of 1, +% meaning each element of the list is processed by a seperate process. +% @spec (Fun, Fuse, List) -> term() +runmany(Fun, Fuse, List) -> + runmany(Fun, Fuse, List, 1). + +% Begin internal stuff (though runmany/4 is exported). + +% @doc All of the other functions are implemented with runmany. 
runmany +% takes a List, splits it into sublists, and starts processes to operate on +% each sublist, all done according to Malt. Each process passes its sublist +% into Fun and sends the result back. +% +% The results are then fused together to get the final result. There are two +% ways this can operate, linearly and recursively. If Fuse is a function, +% a fuse is done linearly left-to-right on the sublists, the results +% of processing the first and second sublists being passed to Fuse, then +% the result of the first fuse and processing the third sublists, and so on. If +% Fuse is {reverse, FuseFunc}, then a fuse is done right-to-left, the results +% of processing the second-to-last and last sublists being passed to FuseFunc, +% then the results of processing the third-to-last sublist and +% the results of the first fuse, and so forth. +% Both methods preserve the original order of the list's elements. +% +% To do a recursive fuse, pass Fuse as {recursive, FuseFunc}. +% The recursive fuse makes no guarantee about the order the results of +% sublists, or the results of fuses are passed to FuseFunc. It +% continues fusing pairs of results until it is down to one. +% +% Recursive fuse is done in parallel with processing the sublists, and a +% process is spawned to fuse each pair of results. It is a parallelized +% algorithm. Linear fuse is done after all results of processing sublists +% have been collected, and can only run in a single process. +% +% Even if you pass {recursive, FuseFunc}, a recursive fuse is only done if +% the malt contains {nodes, NodeList} or {processes, X}. If this is not the +% case, a linear fuse is done. +% @spec (Fun, Fuse, List, Malt) -> term() +% Fun = (list()) -> term() +% Fuse = FuseFunc | {recursive, FuseFunc} +% FuseFunc = (term(), term()) -> term() +runmany(Fun, Fuse, List, Malt) when is_list(Malt) -> + runmany(Fun, Fuse, List, local, no_split, Malt); +runmany(Fun, Fuse, List, Malt) -> + runmany(Fun, Fuse, List, [Malt]). 
+ +runmany(Fun, Fuse, List, Nodes, no_split, [MaltTerm|Malt]) when is_integer(MaltTerm) -> + runmany(Fun, Fuse, List, Nodes, MaltTerm, Malt); +% run a process for each scheduler +runmany(Fun, Fuse, List, local, Split, [{processes, schedulers}|Malt]) -> + S = erlang:system_info(schedulers), + runmany(Fun, Fuse, List, local, Split, [{processes, S}|Malt]); +% Split the list into X sublists, where X is the number of processes +runmany(Fun, Fuse, List, local, no_split, [{processes, X}|_]=Malt) -> + L = length(List), + case L rem X of + 0 -> + runmany(Fun, Fuse, List, local, L div X, Malt); + _ -> + runmany(Fun, Fuse, List, local, L div X + 1, Malt) + end; +% run X process on local machine +runmany(Fun, Fuse, List, local, Split, [{processes, X}|Malt]) -> + Nodes = lists:duplicate(X, node()), + runmany(Fun, Fuse, List, Nodes, Split, Malt); +runmany(Fun, Fuse, List, Nodes, Split, [{timeout, X}|Malt]) -> + Parent = self(), + Timer = spawn(fun () -> + receive + stoptimer -> + Parent ! {timerstopped, self()} + after X -> + Parent ! {timerrang, self()}, + receive + stoptimer -> + Parent ! {timerstopped, self()} + end + end + end), + Ans = try runmany(Fun, Fuse, List, Nodes, Split, Malt) + catch + % we really just want the after block, the syntax + % makes this catch necessary. + willneverhappen -> + nil + after + Timer ! stoptimer, + cleanup_timer(Timer) + end, + Ans; +runmany(Fun, Fuse, List, local, Split, [{nodes, NodeList}|Malt]) -> + Nodes = lists:foldl(fun ({Node, schedulers}, A) -> + X = schedulers_on_node(Node) + 1, + lists:reverse(lists:duplicate(X, Node), A); + ({Node, X}, A) -> + lists:reverse(lists:duplicate(X, Node), A); + (Node, A) -> + [Node|A] + end, + [], NodeList), + runmany(Fun, Fuse, List, Nodes, Split, Malt); +% local recursive fuse, for when we weren't invoked with {processes, X} +% or {nodes, NodeList}. Degenerates recursive fuse into linear fuse. 
+runmany(Fun, {recursive, Fuse}, List, local, Split, []) -> + runmany(Fun, Fuse, List, local, Split, []); +% by default, operate on each element seperately +runmany(Fun, Fuse, List, Nodes, no_split, []) -> + runmany(Fun, Fuse, List, Nodes, 1, []); +runmany(Fun, Fuse, List, local, Split, []) -> + List2 = splitmany(List, Split), + local_runmany(Fun, Fuse, List2); +runmany(Fun, Fuse, List, Nodes, Split, []) -> + List2 = splitmany(List, Split), + cluster_runmany(Fun, Fuse, List2, Nodes). + +cleanup_timer(Timer) -> + receive + {timerrang, Timer} -> + cleanup_timer(Timer); + {timerstopped, Timer} -> + nil + end. + +schedulers_on_node(Node) -> + case get(plists_schedulers_on_nodes) of + undefined -> + X = determine_schedulers(Node), + put(plists_schedulers_on_nodes, + dict:store(Node, X, dict:new())), + X; + Dict -> + case dict:is_key(Node, Dict) of + true -> + dict:fetch(Node, Dict); + false -> + X = determine_schedulers(Node), + put(plists_schedulers_on_nodes, + dict:store(Node, X, Dict)), + X + end + end. + +determine_schedulers(Node) -> + Parent = self(), + Child = spawn(Node, fun () -> + Parent ! {self(), erlang:system_info(schedulers)} + end), + erlang:monitor(process, Child), + receive + {Child, X} -> + receive + {'DOWN', _, _, Child, _Reason} -> + nil + end, + X; + {'DOWN', _, _, Child, Reason} when Reason =/= normal -> + 0 + end. + +% local runmany, for when we weren't invoked with {processes, X} +% or {nodes, NodeList}. Every sublist is processed in parallel. +local_runmany(Fun, Fuse, List) -> + Parent = self (), + Pids = lists:map(fun (L) -> + F = fun () -> + Parent ! + {self (), Fun(L)} + end, + {Pid, _} = erlang:spawn_monitor(F), + Pid + end, + List), + Answers = try lists:map(fun receivefrom/1, Pids) + catch throw:Message -> + {BadPid, Reason} = Message, + handle_error(BadPid, Reason, Pids) + end, + lists:foreach(fun (Pid) -> + normal_cleanup(Pid) + end, Pids), + fuse(Fuse, Answers). 
+ +receivefrom(Pid) -> + receive + {Pid, R} -> + R; + {'DOWN', _, _, BadPid, Reason} when Reason =/= normal -> + throw({BadPid, Reason}); + {timerrang, _} -> + throw({nil, timeout}) + end. + +% Convert List into [{Number, Sublist}] +cluster_runmany(Fun, Fuse, List, Nodes) -> + {List2, _} = lists:foldl(fun (X, {L, Count}) -> + {[{Count, X}|L], Count+1} + end, + {[], 0}, List), + cluster_runmany(Fun, Fuse, List2, Nodes, [], []). + +% Add a pair of results into the TaskList as a fusing task +cluster_runmany(Fun, {recursive, Fuse}, [], Nodes, Running, + [{_, R1}, {_, R2}|Results]) -> + cluster_runmany(Fun, {recursive, Fuse}, [{fuse, R1, R2}], Nodes, + Running, Results); +% recursive fuse done, return result +cluster_runmany(_, {recursive, _Fuse}, [], _Nodes, [], [{_, Result}]) -> + Result; +% edge case where we are asked to do nothing +cluster_runmany(_, {recursive, _Fuse}, [], _Nodes, [], []) -> + []; +% We're done, now we just have to [linear] fuse the results +cluster_runmany(_, Fuse, [], _Nodes, [], Results) -> + fuse(Fuse, lists:map(fun ({_, R}) -> R end, + lists:sort(fun ({A, _}, {B, _}) -> + A =< B + end, + lists:reverse(Results)))); +% We have a ready node and a sublist or fuse to be processed, so we start +% a new process +cluster_runmany(Fun, Fuse, [Task|TaskList], [N|Nodes], Running, Results) -> + Parent = self(), + case Task of + {Num, L2} -> + Fun2 = fun () -> + Parent ! {self(), Num, Fun(L2)} + end; + {fuse, R1, R2} -> + {recursive, FuseFunc} = Fuse, + Fun2 = fun () -> + Parent ! {self(), fuse, FuseFunc(R1, R2)} + end + end, + Fun3 = fun () -> + try Fun2() + catch + exit:siblingdied -> + ok; + exit:Reason -> + Parent ! {self(), error, Reason}; + error:R -> + Parent ! {self(), error, {R, erlang:get_stacktrace()}}; + throw:R -> + Parent ! 
{self(), error, {{nocatch, R}, erlang:get_stacktrace()}} + end + end, + Pid = spawn(N, Fun3), + erlang:monitor(process, Pid), + cluster_runmany(Fun, Fuse, TaskList, Nodes, [{Pid, N, Task}|Running], Results); +% We can't start a new process, but can watch over already running ones +cluster_runmany(Fun, Fuse, TaskList, Nodes, Running, Results) when length(Running) > 0 -> + receive + {_Pid, error, Reason} -> + RunningPids = lists:map(fun ({Pid, _, _}) -> + Pid + end, + Running), + handle_error(junkvalue, Reason, RunningPids); + {Pid, Num, Result} -> + % throw out the exit message, Reason should be + % normal, noproc, or noconnection + receive {'DOWN', _, _, Pid, _Reason} -> + nil + end, + {Running2, FinishedNode, _} = delete_running(Pid, Running, []), + cluster_runmany(Fun, Fuse, TaskList, + [FinishedNode|Nodes], Running2, [{Num, Result}|Results]); + {timerrang, _} -> + RunningPids = lists:map(fun ({Pid, _, _}) -> + Pid + end, + Running), + handle_error(nil, timeout, RunningPids); + % node failure + {'DOWN', _, _, Pid, noconnection} -> + {Running2, _DeadNode, Task} = delete_running(Pid, Running, []), + cluster_runmany(Fun, Fuse, [Task|TaskList], Nodes, + Running2, Results); + % could a noproc exit message come before the message from + % the process? we are assuming it can't. + % this clause is unlikely to get invoked due to cluster_runmany's + % spawned processes. It will still catch errors in mapreduce's + % reduce process, however. + {'DOWN', _, _, BadPid, Reason} when Reason =/= normal -> + RunningPids = lists:map(fun ({Pid, _, _}) -> + Pid + end, + Running), + handle_error(BadPid, Reason, RunningPids) + end; +% We have data, but no nodes either available or occupied +cluster_runmany(_, _, [_Non|_Empty], []=_Nodes, []=_Running, _) -> + exit(allnodescrashed). + +delete_running(Pid, [{Pid, Node, List}|Running], Acc) -> + {Running ++ Acc, Node, List}; +delete_running(Pid, [R|Running], Acc) -> + delete_running(Pid, Running, [R|Acc]). 
+ +handle_error(BadPid, Reason, Pids) -> + lists:foreach(fun (Pid) -> + exit(Pid, siblingdied) + end, Pids), + lists:foreach(fun (Pid) -> + error_cleanup(Pid, BadPid) + end, Pids), + exit(Reason). + +error_cleanup(BadPid, BadPid) -> + ok; +error_cleanup(Pid, BadPid) -> + receive + {Pid, _} -> + error_cleanup(Pid, BadPid); + {Pid, _, _} -> + error_cleanup(Pid, BadPid); + {'DOWN', _, _, Pid, _Reason} -> + ok + end. + +normal_cleanup(Pid) -> + receive + {'DOWN', _, _, Pid, _Reason} -> + ok + end. + +% edge case +fuse(_, []) -> + []; +fuse({reverse, _}=Fuse, Results) -> + [RL|ResultsR] = lists:reverse(Results), + fuse(Fuse, ResultsR, RL); +fuse(Fuse, [R1|Results]) -> + fuse(Fuse, Results, R1). + +fuse({reverse, FuseFunc}=Fuse, [R2|Results], R1) -> + fuse(Fuse, Results, FuseFunc(R2, R1)); +fuse(Fuse, [R2|Results], R1) -> + fuse(Fuse, Results, Fuse(R1, R2)); +fuse(_, [], R) -> + R. + +% Splits a list into a list of sublists, each of size Size, +% except for the last element which is less if the original list +% could not be evenly divided into Size-sized lists. +splitmany(List, Size) -> + splitmany(List, [], Size). + +splitmany([], Acc, _) -> + lists:reverse(Acc); +splitmany(List, Acc, Size) -> + {Top, NList} = split(Size, List), + splitmany(NList, [Top|Acc], Size). + +% Like lists:split, except it splits a list smaller than its first +% parameter +split(Size, List) -> + split(Size, List, []). + +split(0, List, Acc) -> + {lists:reverse(Acc), List}; +split(Size, [H|List], Acc) -> + split(Size - 1, List, [H|Acc]); +split(_, [], Acc) -> + {lists:reverse(Acc), []}. diff --git a/src/plists.erl b/src/plists.erl deleted file mode 100644 index 688d5a5..0000000 --- a/src/plists.erl +++ /dev/null @@ -1,858 +0,0 @@ -% @author Stephen Marsh -% @copyright 2007 Stephen Marsh freeyourmind ++ [$@|gmail.com] -% @doc plists is a drop-in replacement for module -% lists, -% making most list operations parallel. 
It can operate on each element in -% parallel, for IO-bound operations, on sublists in parallel, for -% taking advantage of multi-core machines with CPU-bound operations, and -% across erlang nodes, for parallizing inside a cluster. It handles -% errors and node failures. It can be configured, tuned, and tweaked to -% get optimal performance while minimizing overhead. -% -% Almost all the functions are -% identical to equivalent functions in lists, returning exactly the same -% result, and having both a form with an identical syntax that operates on -% each element in parallel and a form which takes an optional "malt", -% a specification for how to parallize the operation. -% -% fold is the one exception, parallel fold is different from linear fold. -% This module also include a simple mapreduce implementation, and the -% function runmany. All the other functions are implemented with runmany, -% which is as a generalization of parallel list operations. -% -% == Malts == -% A malt specifies how to break a list into sublists, and can optionally -% specify a timeout, which nodes to run on, and how many processes to start -% per node. -% -% Malt = MaltComponent | [MaltComponent]
-% MaltComponent = SubListSize::integer() | {processes, integer()} | -% {processes, schedulers} | -% {timeout, Milliseconds::integer()} | {nodes, [NodeSpec]}
-% NodeSpec = Node::atom() | {Node::atom(), NumProcesses::integer()} | -% {Node::atom(), schedulers} -% -% An integer can be given to specify the exact size for -% sublists. 1 is a good choice for IO-bound operations and when -% the operation on each list element is expensive. Larger numbers -% minimize overhead and are faster for cheap operations. -% -% If the integer is omitted, and -% you have specified a {processes, X}, the list is -% split into X sublists. This is only -% useful when the time to process each element is close to identical and you -% know exactly how many lines of execution are available to you. -% -% If neither of the above applies, the sublist size defaults to 1. -% -% You can use {processes, X} to have the list processed -% by X processes on the local machine. A good choice for X is the number of -% lines of execution (cores) the machine provides. This can be done -% automatically with {processes, schedulers}, which sets -% the number of processes to the number of schedulers in the erlang virtual -% machine (probably equal to the number of cores). -% -% {timeout, Milliseconds} specifies a timeout. This is a timeout for the entire -% operation, both operating on the sublists and combining the results. -% exit(timeout) is evaluated if the timeout is exceeded. -% -% {nodes, NodeList} specifies that the operation should be done across nodes. -% Every element of NodeList is of the form {NodeName, NumProcesses} or -% NodeName, which means the same as {NodeName, 1}. plists runs -% NumProcesses processes on NodeName concurrently. A good choice for -% NumProcesses is the number of lines of execution (cores) a node provides -% plus one. This ensures the node is completely busy even when -% fetching a new sublist. This can be done automatically with -% {NodeName, schedulers}, in which case -% plists uses a cached value if it has one, and otherwise finds the number of -% schedulers in the remote node and adds one. 
This will ensure at least one -% busy process per core (assuming the node has a scheduler for each core). -% -% plists is able to recover if a node goes down. -% If all nodes go down, exit(allnodescrashed) is evaluated. -% -% Any of the above may be used as a malt, or may be combined into a list. -% {nodes, NodeList} and {processes, X} may not be combined. -% -% === Examples === -% % start a process for each element (1-element sublists)
-% 1 -% -% % start a process for each ten elements (10-element sublists)
-% 10 -% -% % split the list into two sublists and process in two processes
-% {processes, 2} -% -% % split the list into X sublists and process in X processes,
-% % where X is the number of cores in the machine
-% {processes, schedulers} -% -% % split the list into 10-element sublists and process in two processes
-% [10, {processes, 2}] -% -% % timeout after one second. Assumes that a process should be started
-% % for each element.
-% {timeout, 1000} -% -% % Runs 3 processes at a time on apple@desktop, -% and 2 on orange@laptop
-% % This is the best way to utilize all the CPU-power of a dual-core
-% % desktop and a single-core laptop. Assumes that the list should be
-% % split into 1-element sublists.
-% {nodes, [{apple@desktop, 3}, {orange@laptop, 2}]} -% -% Like above, but makes plists figure out how many processes to use. -% {nodes, [{apple@desktop, schedulers}, {orange@laptop, schedulers}]} -% -% % Gives apple and orange three seconds to process the list as
-% % 100-element sublists.
-% [100, {timeout, 3000}, {nodes, [{apple@desktop, 3}, {orange@laptop, 2}]}] -% -% === Aside: Why Malt? === -% I needed a word for this concept, so maybe my subconsciousness gave me one by -% making me misspell multiply. Maybe it is an acronym for Malt is A List -% Tearing Specification. Maybe it is a beer metaphor, suggesting that code -% only runs in parallel if bribed with spirits. It's jargon, learn it -% or you can't be part of the in-group. -% -% == Messages and Errors == -% plists assures that no extraneous messages are left in or will later -% enter the message queue. This is guaranteed even in the event of an error. -% -% Errors in spawned processes are caught and propagated to the calling -% process. If you invoke -% -% plists:map(fun (X) -> 1/X end, [1, 2, 3, 0]). -% -% you get a badarith error, exactly like when you use lists:map. -% -% plists uses monitors to watch the processes it spawns. It is not a good idea -% to invoke plists when you are already monitoring processes. If one of them -% does a non-normal exit, plists receives the 'DOWN' message believing it to be -% from one of its own processes. The error propagation system goes into -% effect, which results in the error occuring in the calling process. -% -% == License == -% The MIT License -% -% Copyright (c) 2007 Stephen Marsh -% -% Permission is hereby granted, free of charge, to any person obtaining a copy -% of this software and associated documentation files (the "Software"), to deal -% in the Software without restriction, including without limitation the rights -% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -% copies of the Software, and to permit persons to whom the Software is -% furnished to do so, subject to the following conditions: -% -% The above copyright notice and this permission notice shall be included in -% all copies or substantial portions of the Software. 
-% -% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -% THE SOFTWARE. - - --module(plists). --export([all/2, all/3, any/2, any/3, filter/2, filter/3, -fold/3, fold/4, fold/5, foreach/2, foreach/3, map/2, map/3, -partition/2, partition/3, sort/1, sort/2, sort/3, -usort/1, usort/2, usort/3, mapreduce/2, mapreduce/3, mapreduce/5, -runmany/3, runmany/4]). - -% Everything here is defined in terms of runmany. -% The following methods are convient interfaces to runmany. - -% @doc Same semantics as in module -% lists. -% @spec (Fun, List) -> bool() -all(Fun, List) -> - all(Fun, List, 1). - -% @doc Same semantics as in module -% lists. -% @spec (Fun, List, Malt) -> bool() -all(Fun, List, Malt) -> - try runmany(fun (L) -> - B = lists:all(Fun, L), - if B -> - nil; - true -> - exit(notall) - end - end, - fun (_A1, _A2) -> - nil - end, - List, Malt) of - _ -> - true - catch exit:notall -> - false - end. - -% @doc Same semantics as in module -% lists. -% @spec (Fun, List) -> bool() -any(Fun, List) -> - any(Fun, List, 1). - -% @doc Same semantics as in module -% lists. -% @spec (Fun, List, Malt) -> bool() -any(Fun, List, Malt) -> - try runmany(fun (L) -> - B = lists:any(Fun, L), - if B -> - exit(any); - true -> - nil - end - end, - fun (_A1, _A2) -> - nil - end, - List, Malt) of - _ -> - false - catch exit:any -> - true - end. - -% @doc Same semantics as in module -% lists. -% @spec (Fun, List) -> list() -filter(Fun, List) -> - filter(Fun, List, 1). - -% @doc Same semantics as in module -% lists. 
-% @spec (Fun, List, Malt) -> list() -filter(Fun, List, Malt) -> - runmany(fun (L) -> - lists:filter(Fun, L) - end, - {reverse, fun (A1, A2) -> - A1 ++ A2 - end}, - List, Malt). - -% Note that with parallel fold there is not foldl and foldr, -% instead just one fold that can fuse Accumlators. - -% @doc Like below, but assumes 1 as the Malt. This function is almost useless, -% and is intended only to aid converting code from using lists to plists. -% @spec (Fun, InitAcc, List) -> term() -fold(Fun, InitAcc, List) -> - fold(Fun, Fun, InitAcc, List, 1). - -% @doc Like below, but uses the Fun as the Fuse by default. -% @spec (Fun, InitAcc, List, Malt) -> term() -fold(Fun, InitAcc, List, Malt) -> - fold(Fun, Fun, InitAcc, List, Malt). - -% @doc fold is more complex when made parallel. There is no foldl and foldr, -% accumulators aren't passed in any defined order. -% The list is split into sublists which are folded together. Fun is -% identical to the function passed to lists:fold[lr], it takes -% (an element, and the accumulator) and returns -> a new accumulator. -% It is used for the initial stage of folding sublists. Fuse fuses together -% the results, it takes (Results1, Result2) and returns -> a new result. -% By default sublists are fused left to right, each result of a fuse being -% fed into the first element of the next fuse. The result of the last fuse -% is the result. -% -% Fusing may also run in parallel using a recursive algorithm, -% by specifying the fuse as {recursive, Fuse}. See -% the discussion in {@link runmany/4}. -% -% Malt is the malt for the initial folding of sublists, and for the -% possible recursive fuse. -% @spec (Fun, Fuse, InitAcc, List, Malt) -> term() -fold(Fun, Fuse, InitAcc, List, Malt) -> - Fun2 = fun (L) -> lists:foldl(Fun, InitAcc, L) end, - runmany(Fun2, Fuse, List, Malt). - -% @doc Similiar to foreach in module -% lists -% except it makes no guarantee about the order it processes list elements. 
-% @spec (Fun, List) -> void() -foreach(Fun, List) -> - foreach(Fun, List, 1). - -% @doc Similiar to foreach in module -% lists -% except it makes no guarantee about the order it processes list elements. -% @spec (Fun, List, Malt) -> void() -foreach(Fun, List, Malt) -> - runmany(fun (L) -> - lists:foreach(Fun, L) - end, - fun (_A1, _A2) -> - ok - end, - List, Malt). - -% @doc Same semantics as in module -% lists. -% @spec (Fun, List) -> list() -map(Fun, List) -> - map(Fun, List, 1). - -% @doc Same semantics as in module -% lists. -% @spec (Fun, List, Malt) -> list() -map(Fun, List, Malt) -> - runmany(fun (L) -> - lists:map(Fun, L) - end, - {reverse, fun (A1, A2) -> - A1 ++ A2 - end}, - List, Malt). - -% @doc Same semantics as in module -% lists. -% @spec (Fun, List) -> {list(), list()} -partition(Fun, List) -> - partition(Fun, List, 1). - -% @doc Same semantics as in module -% lists. -% @spec (Fun, List, Malt) -> {list(), list()} -partition(Fun, List, Malt) -> - runmany(fun (L) -> - lists:partition(Fun, L) - end, - {reverse, fun ({True1, False1}, {True2, False2}) -> - {True1 ++ True2, False1 ++ False2} - end}, - List, Malt). - -% SORTMALT needs to be tuned --define(SORTMALT, 100). - -% @doc Same semantics as in module -% lists. -% @spec (List) -> list() -sort(List) -> - sort(fun (A, B) -> - A =< B - end, - List). - -% @doc Same semantics as in module -% lists. -% @spec (Fun, List) -> list() -sort(Fun, List) -> - sort(Fun, List, ?SORTMALT). - -% @doc This version lets you specify your own malt for sort. -% -% sort splits the list into sublists and sorts them, and it merges the -% sorted lists together. These are done in parallel. Each sublist is -% sorted in a seperate process, and each merging of results is done in a -% seperate process. Malt defaults to 100, causing the list to be split into -% 100-element sublists. 
-% @spec (Fun, List, Malt) -> list() -sort(Fun, List, Malt) -> - Fun2 = fun (L) -> - lists:sort(Fun, L) - end, - Fuse = fun (A1, A2) -> - lists:merge(Fun, A1, A2) - end, - runmany(Fun2, {recursive, Fuse}, List, Malt). - -% @doc Same semantics as in module -% lists. -% @spec (List) -> list() -usort(List) -> - usort(fun (A, B) -> - A =< B - end, - List). - -% @doc Same semantics as in module -% lists. -% @spec (Fun, List) -> list() -usort(Fun, List) -> - usort(Fun, List, ?SORTMALT). - -% @doc This version lets you specify your own malt for usort. -% -% usort splits the list into sublists and sorts them, and it merges the -% sorted lists together. These are done in parallel. Each sublist is -% sorted in a seperate process, and each merging of results is done in a -% seperate process. Malt defaults to 100, causing the list to be split into -% 100-element sublists. -% -% usort removes duplicate elments while it sorts. -% @spec (Fun, List, Malt) -> list() -usort(Fun, List, Malt) -> - Fun2 = fun (L) -> - lists:usort(Fun, L) - end, - Fuse = fun (A1, A2) -> - lists:umerge(Fun, A1, A2) - end, - runmany(Fun2, {recursive, Fuse}, List, Malt). - -% @doc Like below, assumes default MapMalt of 1. -% @spec (MapFunc, List) -> Dict -% MapFunc = (term()) -> DeepListOfKeyValuePairs -% DeepListOfKeyValuePairs = [DeepListOfKeyValuePairs] | {Key, Value} -mapreduce(MapFunc, List) -> - mapreduce(MapFunc, List, 1). - -% Like below, but uses a default reducer that collects all -% {Key, Value} pairs into a -% dict, -% with values {Key, [Value1, Value2...]}. -% This dict is returned as the result. -mapreduce(MapFunc, List, MapMalt) -> - mapreduce(MapFunc, List, dict:new(), fun add_key/3, MapMalt). - -% @doc This is a very basic mapreduce. You won't write a Google-rivaling -% search engine with it. It has no equivalent in lists. 
Each -% element in the list is run through the MapFunc, which produces either -% a {Key, Value} pair, or a lists of key value pairs, or a list of lists of -% key value pairs...etc. A reducer process runs in parallel with the mapping -% processes, collecting the key value pairs. It starts with a state given by -% InitState, and for each {Key, Value} pair that it receives it invokes -% ReduceFunc(OldState, Key, Value) to compute its new state. mapreduce returns -% the reducer's final state. -% -% MapMalt is the malt for the mapping operation, with a default value of 1, -% meaning each element of the list is mapped by a seperate process. -% -% mapreduce requires OTP R11B, or it may leave monitoring messages in the -% message queue. -% @spec (MapFunc, List, InitState, ReduceFunc, MapMalt) -> Dict -% MapFunc = (term()) -> DeepListOfKeyValuePairs -% DeepListOfKeyValuePairs = [DeepListOfKeyValuePairs] | {Key, Value} -% ReduceFunc = (OldState::term(), Key::term(), Value::term() -> NewState::term() -mapreduce(MapFunc, List, InitState, ReduceFunc, MapMalt) -> - Parent = self(), - {Reducer, ReducerRef} = - erlang:spawn_monitor(fun () -> - reducer(Parent, 0, InitState, ReduceFunc) - end), - MapFunc2 = fun (L) -> - Reducer ! lists:map(MapFunc, L), - 1 - end, - SentMessages = try runmany(MapFunc2, fun (A, B) -> A+B end, List, MapMalt) - catch - exit:Reason -> - erlang:demonitor(ReducerRef, [flush]), - Reducer ! die, - exit(Reason) - end, - Reducer ! {mappers, done, SentMessages}, - Results = receive - {Reducer, Results2} -> - Results2; - {'DOWN', _, _, Reducer, Reason2} -> - exit(Reason2) - end, - receive - {'DOWN', _, _, Reducer, normal} -> - nil - end, - Results. - -reducer(Parent, NumReceived, State, Func) -> - receive - die -> - nil; - {mappers, done, NumReceived} -> - Parent ! {self (), State}; - Keys -> - reducer(Parent, NumReceived + 1, each_key(State, Func, Keys), Func) - end. 
- -each_key(State, Func, {Key, Value}) -> - Func(State, Key, Value); -each_key(State, Func, [List|Keys]) -> - each_key(each_key(State, Func, List), Func, Keys); -each_key(State, _, []) -> - State. - -add_key(Dict, Key, Value) -> - case dict:is_key(Key, Dict) of - true -> - dict:append(Key, Value, Dict); - false -> - dict:store(Key, [Value], Dict) - end. - -% @doc Like below, but assumes a Malt of 1, -% meaning each element of the list is processed by a seperate process. -% @spec (Fun, Fuse, List) -> term() -runmany(Fun, Fuse, List) -> - runmany(Fun, Fuse, List, 1). - -% Begin internal stuff (though runmany/4 is exported). - -% @doc All of the other functions are implemented with runmany. runmany -% takes a List, splits it into sublists, and starts processes to operate on -% each sublist, all done according to Malt. Each process passes its sublist -% into Fun and sends the result back. -% -% The results are then fused together to get the final result. There are two -% ways this can operate, lineraly and recursively. If Fuse is a function, -% a fuse is done linearly left-to-right on the sublists, the results -% of processing the first and second sublists being passed to Fuse, then -% the result of the first fuse and processing the third sublits, and so on. If -% Fuse is {reverse, FuseFunc}, then a fuse is done right-to-left, the results -% of processing the second-to-last and last sublists being passed to FuseFunc, -% then the results of processing the third-to-last sublist and -% the results of the first fuse, and and so forth. -% Both methods preserve the original order of the lists elements. -% -% To do a recursive fuse, pass Fuse as {recursive, FuseFunc}. -% The recursive fuse makes no guarantee about the order the results of -% sublists, or the results of fuses are passed to FuseFunc. It -% continues fusing pairs of results until it is down to one. 
-% -% Recursive fuse is down in parallel with processing the sublists, and a -% process is spawned to fuse each pair of results. It is a parallized -% algorithm. Linear fuse is done after all results of processing sublists -% have been collected, and can only run in a single process. -% -% Even if you pass {recursive, FuseFunc}, a recursive fuse is only done if -% the malt contains {nodes, NodeList} or {processes, X}. If this is not the -% case, a linear fuse is done. -% @spec (Fun, Fuse, List, Malt) -> term() -% Fun = (list()) -> term() -% Fuse = FuseFunc | {recursive, FuseFunc} -% FuseFunc = (term(), term()) -> term() -runmany(Fun, Fuse, List, Malt) when is_list(Malt) -> - runmany(Fun, Fuse, List, local, no_split, Malt); -runmany(Fun, Fuse, List, Malt) -> - runmany(Fun, Fuse, List, [Malt]). - -runmany(Fun, Fuse, List, Nodes, no_split, [MaltTerm|Malt]) when is_integer(MaltTerm) -> - runmany(Fun, Fuse, List, Nodes, MaltTerm, Malt); -% run a process for each scheduler -runmany(Fun, Fuse, List, local, Split, [{processes, schedulers}|Malt]) -> - S = erlang:system_info(schedulers), - runmany(Fun, Fuse, List, local, Split, [{processes, S}|Malt]); -% Split the list into X sublists, where X is the number of processes -runmany(Fun, Fuse, List, local, no_split, [{processes, X}|_]=Malt) -> - L = length(List), - case L rem X of - 0 -> - runmany(Fun, Fuse, List, local, L div X, Malt); - _ -> - runmany(Fun, Fuse, List, local, L div X + 1, Malt) - end; -% run X process on local machine -runmany(Fun, Fuse, List, local, Split, [{processes, X}|Malt]) -> - Nodes = lists:duplicate(X, node()), - runmany(Fun, Fuse, List, Nodes, Split, Malt); -runmany(Fun, Fuse, List, Nodes, Split, [{timeout, X}|Malt]) -> - Parent = self(), - Timer = spawn(fun () -> - receive - stoptimer -> - Parent ! {timerstopped, self()} - after X -> - Parent ! {timerrang, self()}, - receive - stoptimer -> - Parent ! 
{timerstopped, self()} - end - end - end), - Ans = try runmany(Fun, Fuse, List, Nodes, Split, Malt) - catch - % we really just want the after block, the syntax - % makes this catch necessary. - willneverhappen -> - nil - after - Timer ! stoptimer, - cleanup_timer(Timer) - end, - Ans; -runmany(Fun, Fuse, List, local, Split, [{nodes, NodeList}|Malt]) -> - Nodes = lists:foldl(fun ({Node, schedulers}, A) -> - X = schedulers_on_node(Node) + 1, - lists:reverse(lists:duplicate(X, Node), A); - ({Node, X}, A) -> - lists:reverse(lists:duplicate(X, Node), A); - (Node, A) -> - [Node|A] - end, - [], NodeList), - runmany(Fun, Fuse, List, Nodes, Split, Malt); -% local recursive fuse, for when we weren't invoked with {processes, X} -% or {nodes, NodeList}. Degenerates recursive fuse into linear fuse. -runmany(Fun, {recursive, Fuse}, List, local, Split, []) -> - runmany(Fun, Fuse, List, local, Split, []); -% by default, operate on each element seperately -runmany(Fun, Fuse, List, Nodes, no_split, []) -> - runmany(Fun, Fuse, List, Nodes, 1, []); -runmany(Fun, Fuse, List, local, Split, []) -> - List2 = splitmany(List, Split), - local_runmany(Fun, Fuse, List2); -runmany(Fun, Fuse, List, Nodes, Split, []) -> - List2 = splitmany(List, Split), - cluster_runmany(Fun, Fuse, List2, Nodes). - -cleanup_timer(Timer) -> - receive - {timerrang, Timer} -> - cleanup_timer(Timer); - {timerstopped, Timer} -> - nil - end. - -schedulers_on_node(Node) -> - case get(plists_schedulers_on_nodes) of - undefined -> - X = determine_schedulers(Node), - put(plists_schedulers_on_nodes, - dict:store(Node, X, dict:new())), - X; - Dict -> - case dict:is_key(Node, Dict) of - true -> - dict:fetch(Node, Dict); - false -> - X = determine_schedulers(Node), - put(plists_schedulers_on_nodes, - dict:store(Node, X, Dict)), - X - end - end. - -determine_schedulers(Node) -> - Parent = self(), - Child = spawn(Node, fun () -> - Parent ! 
{self(), erlang:system_info(schedulers)} - end), - erlang:monitor(process, Child), - receive - {Child, X} -> - receive - {'DOWN', _, _, Child, _Reason} -> - nil - end, - X; - {'DOWN', _, _, Child, Reason} when Reason =/= normal -> - 0 - end. - -% local runmany, for when we weren't invoked with {processes, X} -% or {nodes, NodeList}. Every sublist is processed in parallel. -local_runmany(Fun, Fuse, List) -> - Parent = self (), - Pids = lists:map(fun (L) -> - F = fun () -> - Parent ! - {self (), Fun(L)} - end, - {Pid, _} = erlang:spawn_monitor(F), - Pid - end, - List), - Answers = try lists:map(fun receivefrom/1, Pids) - catch throw:Message -> - {BadPid, Reason} = Message, - handle_error(BadPid, Reason, Pids) - end, - lists:foreach(fun (Pid) -> - normal_cleanup(Pid) - end, Pids), - fuse(Fuse, Answers). - -receivefrom(Pid) -> - receive - {Pid, R} -> - R; - {'DOWN', _, _, BadPid, Reason} when Reason =/= normal -> - throw({BadPid, Reason}); - {timerrang, _} -> - throw({nil, timeout}) - end. - -% Convert List into [{Number, Sublist}] -cluster_runmany(Fun, Fuse, List, Nodes) -> - {List2, _} = lists:foldl(fun (X, {L, Count}) -> - {[{Count, X}|L], Count+1} - end, - {[], 0}, List), - cluster_runmany(Fun, Fuse, List2, Nodes, [], []). 
- -% Add a pair of results into the TaskList as a fusing task -cluster_runmany(Fun, {recursive, Fuse}, [], Nodes, Running, - [{_, R1}, {_, R2}|Results]) -> - cluster_runmany(Fun, {recursive, Fuse}, [{fuse, R1, R2}], Nodes, - Running, Results); -% recursive fuse done, return result -cluster_runmany(_, {recursive, _Fuse}, [], _Nodes, [], [{_, Result}]) -> - Result; -% edge case where we are asked to do nothing -cluster_runmany(_, {recursive, _Fuse}, [], _Nodes, [], []) -> - []; -% We're done, now we just have to [linear] fuse the results -cluster_runmany(_, Fuse, [], _Nodes, [], Results) -> - fuse(Fuse, lists:map(fun ({_, R}) -> R end, - lists:sort(fun ({A, _}, {B, _}) -> - A =< B - end, - lists:reverse(Results)))); -% We have a ready node and a sublist or fuse to be processed, so we start -% a new process -cluster_runmany(Fun, Fuse, [Task|TaskList], [N|Nodes], Running, Results) -> - Parent = self(), - case Task of - {Num, L2} -> - Fun2 = fun () -> - Parent ! {self(), Num, Fun(L2)} - end; - {fuse, R1, R2} -> - {recursive, FuseFunc} = Fuse, - Fun2 = fun () -> - Parent ! {self(), fuse, FuseFunc(R1, R2)} - end - end, - Fun3 = fun () -> - try Fun2() - catch - exit:siblingdied -> - ok; - exit:Reason -> - Parent ! {self(), error, Reason}; - error:R -> - Parent ! {self(), error, {R, erlang:get_stacktrace()}}; - throw:R -> - Parent ! 
{self(), error, {{nocatch, R}, erlang:get_stacktrace()}} - end - end, - Pid = spawn(N, Fun3), - erlang:monitor(process, Pid), - cluster_runmany(Fun, Fuse, TaskList, Nodes, [{Pid, N, Task}|Running], Results); -% We can't start a new process, but can watch over already running ones -cluster_runmany(Fun, Fuse, TaskList, Nodes, Running, Results) when length(Running) > 0 -> - receive - {_Pid, error, Reason} -> - RunningPids = lists:map(fun ({Pid, _, _}) -> - Pid - end, - Running), - handle_error(junkvalue, Reason, RunningPids); - {Pid, Num, Result} -> - % throw out the exit message, Reason should be - % normal, noproc, or noconnection - receive {'DOWN', _, _, Pid, _Reason} -> - nil - end, - {Running2, FinishedNode, _} = delete_running(Pid, Running, []), - cluster_runmany(Fun, Fuse, TaskList, - [FinishedNode|Nodes], Running2, [{Num, Result}|Results]); - {timerrang, _} -> - RunningPids = lists:map(fun ({Pid, _, _}) -> - Pid - end, - Running), - handle_error(nil, timeout, RunningPids); - % node failure - {'DOWN', _, _, Pid, noconnection} -> - {Running2, _DeadNode, Task} = delete_running(Pid, Running, []), - cluster_runmany(Fun, Fuse, [Task|TaskList], Nodes, - Running2, Results); - % could a noproc exit message come before the message from - % the process? we are assuming it can't. - % this clause is unlikely to get invoked due to cluster_runmany's - % spawned processes. It will still catch errors in mapreduce's - % reduce process, however. - {'DOWN', _, _, BadPid, Reason} when Reason =/= normal -> - RunningPids = lists:map(fun ({Pid, _, _}) -> - Pid - end, - Running), - handle_error(BadPid, Reason, RunningPids) - end; -% We have data, but no nodes either available or occupied -cluster_runmany(_, _, [_Non|_Empty], []=_Nodes, []=_Running, _) -> - exit(allnodescrashed). - -delete_running(Pid, [{Pid, Node, List}|Running], Acc) -> - {Running ++ Acc, Node, List}; -delete_running(Pid, [R|Running], Acc) -> - delete_running(Pid, Running, [R|Acc]). 
- -handle_error(BadPid, Reason, Pids) -> - lists:foreach(fun (Pid) -> - exit(Pid, siblingdied) - end, Pids), - lists:foreach(fun (Pid) -> - error_cleanup(Pid, BadPid) - end, Pids), - exit(Reason). - -error_cleanup(BadPid, BadPid) -> - ok; -error_cleanup(Pid, BadPid) -> - receive - {Pid, _} -> - error_cleanup(Pid, BadPid); - {Pid, _, _} -> - error_cleanup(Pid, BadPid); - {'DOWN', _, _, Pid, _Reason} -> - ok - end. - -normal_cleanup(Pid) -> - receive - {'DOWN', _, _, Pid, _Reason} -> - ok - end. - -% edge case -fuse(_, []) -> - []; -fuse({reverse, _}=Fuse, Results) -> - [RL|ResultsR] = lists:reverse(Results), - fuse(Fuse, ResultsR, RL); -fuse(Fuse, [R1|Results]) -> - fuse(Fuse, Results, R1). - -fuse({reverse, FuseFunc}=Fuse, [R2|Results], R1) -> - fuse(Fuse, Results, FuseFunc(R2, R1)); -fuse(Fuse, [R2|Results], R1) -> - fuse(Fuse, Results, Fuse(R1, R2)); -fuse(_, [], R) -> - R. - -% Splits a list into a list of sublists, each of size Size, -% except for the last element which is less if the original list -% could not be evenly divided into Size-sized lists. -splitmany(List, Size) -> - splitmany(List, [], Size). - -splitmany([], Acc, _) -> - lists:reverse(Acc); -splitmany(List, Acc, Size) -> - {Top, NList} = split(Size, List), - splitmany(NList, [Top|Acc], Size). - -% Like lists:split, except it splits a list smaller than its first -% parameter -split(Size, List) -> - split(Size, List, []). - -split(0, List, Acc) -> - {lists:reverse(Acc), List}; -split(Size, [H|List], Acc) -> - split(Size - 1, List, [H|Acc]); -split(_, [], Acc) -> - {lists:reverse(Acc), []}.