Two Functions In Parallel With Multiple Arguments And Return Values
I've got two separate functions. Each of them takes quite a long time to execute.

def function1(arg):
    do_some_stuff_here
    return result1

def function2(arg1, arg2, arg3):
Solution 1:
First of all, Process, Pool and Queue each have a different use case.
Process is used to spawn a process by creating the Process object.
from multiprocessing import Process

def method1():
    print("in method1")
    print("in method1")

def method2():
    print("in method2")
    print("in method2")

p1 = Process(target=method1)  # create a process object p1
p1.start()                    # start the process p1
p2 = Process(target=method2)
p2.start()
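If you need the parent to wait until both processes have finished before moving on, add join() calls (a small addition that is not in the original snippet):

p1.join()  # block until method1's process has exited
p2.join()  # block until method2's process has exited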
Pool is used to parallelize execution of a function across multiple input values.
from multiprocessing import Pool

def method1(x):
    print(x)
    print(x**2)
    return x**2

p = Pool(3)  # a pool of 3 worker processes
result = p.map(method1, [1, 4, 9])
print(result)  # prints [1, 16, 81]
Queue is used to communicate between processes.
from multiprocessing import Process, Queue

def method1(x, l1):
    print("in method1")
    print("in method1")
    l1.put(x**2)
    return x

def method2(x, l2):
    print("in method2")
    print("in method2")
    l2.put(x**3)
    return x

l1 = Queue()
p1 = Process(target=method1, args=(4, l1))
l2 = Queue()
p2 = Process(target=method2, args=(2, l2))
p1.start()
p2.start()
print(l1.get())  # prints 16
print(l2.get())  # prints 8
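To map this pattern directly onto the question's signatures, here is a minimal sketch; function1, function2 and their parameters come from the question, but the bodies are placeholder stand-ins for the real work:

from multiprocessing import Process, Queue

def function1(arg, q):
    q.put(arg * 2)  # placeholder for do_some_stuff_here; put result1 on the queue

def function2(arg1, arg2, arg3, q):
    q.put(arg1 + arg2 + arg3)  # placeholder body; put result2 on the queue

q1, q2 = Queue(), Queue()
p1 = Process(target=function1, args=(10, q1))
p2 = Process(target=function2, args=(1, 2, 3, q2))
p1.start()
p2.start()
result1 = q1.get()  # get() blocks until function1 has put its result
result2 = q2.get()
p1.join()
p2.join()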
Now, for your case you can use Process & Queue (the third snippet above), or you can adapt the Pool approach to dispatch different functions (below):
from multiprocessing import Pool
import sys

def method1(x):
    print(x)
    print(x**2)
    return x**2

def method2(x):
    print(x)
    print(x**3)
    return x**3

def unzip_func(a, b):
    return a, b

def distributor(option_args):
    option, args = unzip_func(*option_args)  # unzip option and args
    attr_name = "method" + str(option)  # build the function name from the option argument
    value = getattr(sys.modules[__name__], attr_name)(args)  # look up the function named attr_name in this module and call it with args
    return value

option_list = [1, 2]  # selects the method number
args_list = [4, 2]    # argument for the corresponding method (argument 4 is for method1)
p = Pool(3)  # create a pool of 3 worker processes
result = p.map(distributor, zip(option_list, args_list))
# distributor is called with the pairs (1, 4) and (2, 2) produced by zip
# (Python 3's built-in zip replaces itertools.izip)
print(result)  # prints [16, 8]
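A simpler alternative not shown in the answer: Pool.apply_async submits each function call individually, which avoids the distributor indirection while still collecting both return values. A minimal sketch, reusing the same method1 and method2 as above:

from multiprocessing import Pool

if __name__ == '__main__':  # guard needed on platforms that spawn worker processes
    with Pool(2) as p:
        r1 = p.apply_async(method1, (4,))  # schedule method1(4) in one worker
        r2 = p.apply_async(method2, (2,))  # schedule method2(2) in another worker
        print(r1.get(), r2.get())          # prints: 16 8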
Hope this helps.
Solution 2:
Here is an example of:
- running a single function with multiple inputs in parallel using a Pool (the square function). An interesting side note: observe the mangled output lines for 5 and 9 ("59 81" / "25"), where two workers' prints interleaved.
- running multiple functions with different inputs (both args and kwargs) and collecting their results using a Pool (the pf1, pf2 and pf3 functions)
import datetime
import multiprocessing
import time
import random
from multiprocessing import Pool

def square(x):
    # calculate the square of the value of x
    print(x, x*x)
    return x*x

def pf1(*args, **kwargs):
    sleep_time = random.randint(3, 6)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf1", args, sleep_time, datetime.datetime.now()))
    print("Keyword Args from pf1: %s" % kwargs)
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf1 done at %s\n" % datetime.datetime.now())
    return (sum(*args), kwargs)

def pf2(*args):
    sleep_time = random.randint(7, 10)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf2", args, sleep_time, datetime.datetime.now()))
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf2 done at %s\n" % datetime.datetime.now())
    return sum(*args)

def pf3(*args):
    sleep_time = random.randint(0, 3)
    print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf3", args, sleep_time, datetime.datetime.now()))
    time.sleep(sleep_time)
    print(multiprocessing.current_process().name, "\tpf3 done at %s\n" % datetime.datetime.now())
    return sum(*args)

def smap(f, *arg):
    # dispatch helper: unpacks a (function, args) or (function, args, kwargs)
    # tuple so that a single starmap call can run different functions
    if len(arg) == 2:
        args, kwargs = arg
        return f(list(args), **kwargs)
    elif len(arg) == 1:
        args = arg
        return f(*args)

if __name__ == '__main__':
    # Define the dataset
    dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
    # Output the dataset
    print('Dataset: ' + str(dataset))
    # Run this with a pool of 5 agents having a chunksize of 3 until finished
    agents = 5
    chunksize = 3
    with Pool(processes=agents) as pool:
        result = pool.map(square, dataset, chunksize)
    print("Result of Squares : %s\n\n" % result)
    with Pool(processes=3) as pool:
        result = pool.starmap(smap, [(pf1, [1, 2, 3], {'a': 123, 'b': 456}), (pf2, [11, 22, 33]), (pf3, [111, 222, 333])])
    # Output the result
    print('Result: %s ' % result)
Output:
*******
Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
1 1
2 4
3 9
4 16
6 36
7 49
8 64
59 81
25
10 100
11 121
12 144
13 169
14 196
Result of Squares : [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]
Process : ForkPoolWorker-6 Function : pf1 Args: ([1, 2, 3],) sleeping for 3 Time : 2020-07-20 00:51:56.477299
Keyword Args from pf1: {'a': 123, 'b': 456}
Process : ForkPoolWorker-7 Function : pf2 Args: ([11, 22, 33],) sleeping for 8 Time : 2020-07-20 00:51:56.477371
Process : ForkPoolWorker-8 Function : pf3 Args: ([111, 222, 333],) sleeping for 1 Time : 2020-07-20 00:51:56.477918
ForkPoolWorker-8 pf3 done at 2020-07-20 00:51:57.478808
ForkPoolWorker-6 pf1 done at 2020-07-20 00:51:59.478877
ForkPoolWorker-7 pf2 done at 2020-07-20 00:52:04.478016
Result: [(6, {'a': 123, 'b': 456}), 66, 666]
Process finished with exit code 0
Solution 3:
Here is another example I just found; hope it helps. Nice and easy ;)
from multiprocessing import Pool

def square(x):
    return x * x

def cube(y):
    return y * y * y

pool = Pool(processes=20)
result_squares = pool.map_async(square, range(10))
result_cubes = pool.map_async(cube, range(10))

print(result_squares.get(timeout=3))  # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(result_cubes.get(timeout=3))    # prints [0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
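Note that map_async returns immediately, so both jobs are queued before either get() call blocks; that is why square and cube run concurrently instead of one after the other. If the workers might not finish within the timeout, get() raises multiprocessing.TimeoutError, which you can catch; a minimal sketch of guarding the last two lines (the message text is just illustrative):

import multiprocessing

try:
    print(result_squares.get(timeout=3))
    print(result_cubes.get(timeout=3))
except multiprocessing.TimeoutError:
    print("workers did not finish within 3 seconds")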