如何并行运行函数?

192 投票
9 回答
397390 浏览
提问于 2025-04-17 00:24

我想在Python中同时运行多个函数。

我现在的代码大概是这样的:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

我想同时调用func1和func2,让它们一起运行。这两个函数之间没有任何互动,也不操作同一个对象。目前,我必须等func1完成后才能开始func2。我该怎么做才能像下面这样:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

我希望能在差不多同一时间创建两个目录,因为每分钟我都在统计有多少文件被创建。如果目录不存在,就会影响我的计时。

9 个回答

32

这可以通过Ray来优雅地实现,Ray是一个可以让你轻松地将Python代码进行并行处理和分布式运行的系统。

要让你的例子实现并行处理,你需要用@ray.remote这个装饰器来定义你的函数,然后用.remote来调用它们。

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

如果你把同样的参数传给两个函数,而且这个参数很大,使用ray.put()会更有效率。这样可以避免这个大参数被序列化两次,也不会创建两个内存副本:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

重要 - 如果func1()func2()有返回结果,你需要按照以下方式重写代码:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

使用Ray相比于multiprocessing模块有很多好处。特别是,同一段代码可以在单台机器上运行,也可以在多台机器的集群上运行。想了解更多Ray的优点,可以查看这篇相关帖子

55

如果你的函数主要是在做输入输出工作(也就是处理数据的读写),而不是太多的计算工作,并且你使用的是Python 3.2或更高版本,你可以使用ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

如果你的函数主要是在做计算工作(也就是需要大量处理数据的工作),而不是太多的输入输出工作,并且你使用的是Python 3.2或更高版本,你可以使用ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor

def run_cpu_tasks_in_parallel(tasks):
    with ProcessPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

def task_1():
    print('CPU task 1 running!')

def task_2():
    print('CPU task 2 running!')

if __name__ == '__main__':
    run_cpu_tasks_in_parallel([
        task_1,
        task_2,
    ])

另外,如果你只使用Python 2.6或更高版本,你可以直接使用multiprocessing模块:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

def task_1():
    print('CPU task 1 running!')

def task_2():
    print('CPU task 2 running!')

if __name__ == '__main__':
    run_cpu_tasks_in_parallel([
        task_1,
        task_2,
    ])
270

你可以使用 threading 或者 multiprocessing 这两个模块。

由于 CPython的一些特殊情况,使用 threading 可能无法实现真正的并行处理。因此,通常来说,使用 multiprocessing 更加靠谱。

下面是一个完整的例子:

from multiprocessing import Process


def func1():
    print("func1: starting")
    for i in range(10000000):
        pass

    print("func1: finishing")


def func2():
    print("func2: starting")
    for i in range(10000000):
        pass

    print("func2: finishing")


if __name__ == "__main__":
    p1 = Process(target=func1)
    p1.start()
    p2 = Process(target=func2)
    p2.start()
    p1.join()
    p2.join()

启动和加入子进程的过程可以很容易地封装成一个函数,就像你的 runBothFunc 一样:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)

撰写回答