如何并行运行函数?
我想在Python中同时运行多个函数。
我现在的代码大概是这样的:
files.py
import common #common is a util class that handles all the IO stuff
dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]
def func1():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir1)
c.getFiles(dir1)
time.sleep(10)
c.removeFiles(addFiles[i], dir1)
c.getFiles(dir1)
def func2():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir2)
c.getFiles(dir2)
time.sleep(10)
c.removeFiles(addFiles[i], dir2)
c.getFiles(dir2)
我想同时调用func1和func2,让它们一起运行。这两个函数之间没有任何互动,也不操作同一个对象。目前,我必须等func1完成后才能开始func2。我该怎么做才能像下面这样:
process.py
from files import func1, func2
runBothFunc(func1(), func2())
我希望能在差不多同一时间创建两个目录,因为每分钟我都在统计有多少文件被创建。如果目录不存在,就会影响我的计时。
9 个回答
这可以通过Ray来优雅地实现,Ray是一个可以让你轻松地将Python代码进行并行处理和分布式运行的系统。
要让你的例子实现并行处理,你需要用@ray.remote
这个装饰器来定义你的函数,然后用.remote
来调用它们。
import ray
ray.init()
dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]
# Define the functions.
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
# func1() code here...
@ray.remote
def func2(filename, addFiles, dir):
# func2() code here...
# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)])
如果你把同样的参数传给两个函数,而且这个参数很大,使用ray.put()
会更有效率。这样可以避免这个大参数被序列化两次,也不会创建两个内存副本:
largeData_id = ray.put(largeData)
ray.get([func1(largeData_id), func2(largeData_id)])
重要 - 如果func1()
和func2()
有返回结果,你需要按照以下方式重写代码:
ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])
使用Ray相比于multiprocessing模块有很多好处。特别是,同一段代码可以在单台机器上运行,也可以在多台机器的集群上运行。想了解更多Ray的优点,可以查看这篇相关帖子。
如果你的函数主要是在做输入输出工作(也就是处理数据的读写),而不是太多的计算工作,并且你使用的是Python 3.2或更高版本,你可以使用ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor
def run_io_tasks_in_parallel(tasks):
with ThreadPoolExecutor() as executor:
running_tasks = [executor.submit(task) for task in tasks]
for running_task in running_tasks:
running_task.result()
run_io_tasks_in_parallel([
lambda: print('IO task 1 running!'),
lambda: print('IO task 2 running!'),
])
如果你的函数主要是在做计算工作(也就是需要大量处理数据的工作),而不是太多的输入输出工作,并且你使用的是Python 3.2或更高版本,你可以使用ProcessPoolExecutor:
from concurrent.futures import ProcessPoolExecutor
def run_cpu_tasks_in_parallel(tasks):
with ProcessPoolExecutor() as executor:
running_tasks = [executor.submit(task) for task in tasks]
for running_task in running_tasks:
running_task.result()
def task_1():
print('CPU task 1 running!')
def task_2():
print('CPU task 2 running!')
if __name__ == '__main__':
run_cpu_tasks_in_parallel([
task_1,
task_2,
])
另外,如果你只使用Python 2.6或更高版本,你可以直接使用multiprocessing模块:
from multiprocessing import Process
def run_cpu_tasks_in_parallel(tasks):
running_tasks = [Process(target=task) for task in tasks]
for running_task in running_tasks:
running_task.start()
for running_task in running_tasks:
running_task.join()
def task_1():
print('CPU task 1 running!')
def task_2():
print('CPU task 2 running!')
if __name__ == '__main__':
run_cpu_tasks_in_parallel([
task_1,
task_2,
])
你可以使用 threading
或者 multiprocessing
这两个模块。
由于 CPython的一些特殊情况,使用 threading
可能无法实现真正的并行处理。因此,通常来说,使用 multiprocessing
更加靠谱。
下面是一个完整的例子:
from multiprocessing import Process
def func1():
print("func1: starting")
for i in range(10000000):
pass
print("func1: finishing")
def func2():
print("func2: starting")
for i in range(10000000):
pass
print("func2: finishing")
if __name__ == "__main__":
p1 = Process(target=func1)
p1.start()
p2 = Process(target=func2)
p2.start()
p1.join()
p2.join()
启动和加入子进程的过程可以很容易地封装成一个函数,就像你的 runBothFunc
一样:
def runInParallel(*fns):
proc = []
for fn in fns:
p = Process(target=fn)
p.start()
proc.append(p)
for p in proc:
p.join()
runInParallel(func1, func2)