Python: How can I run Python functions in parallel?

Posted 2024-04-24 10:27:23


I researched this first and couldn't find an answer to my question. I am trying to run multiple functions in parallel in Python.

I have something like this:

files.py

import time

import common  # common is a util class that handles all the IO stuff

# Raw strings: in a plain literal, '\f' would be read as a form feed character.
dir1 = r'C:\folder1'
dir2 = r'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

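I want to call func1 and func2 and have them run at the same time. The functions do not interact with each other or operate on the same object. Right now I have to wait for func1 to finish before func2 starts. How can I do something like the following: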

process.py

from files import func1, func2

runBothFunc(func1(), func2())

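I want to be able to create both directories at nearly the same time, because every minute I count how many files have been created. If a directory isn't there yet, my timing will be off.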


3 answers

This can be done with Ray, a system that lets you easily parallelize and distribute your Python code.

To parallelize your example, define the functions with the @ray.remote decorator and then invoke them with .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...
    pass  # placeholder so the stub is valid Python

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...
    pass  # placeholder so the stub is valid Python

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

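If you pass the same argument to both functions and the argument is large, a more efficient approach is to use ray.put(). This avoids serializing the large argument twice and creating two copies of it in memory: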

largeData_id = ray.put(largeData)

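# Remote functions must be invoked with .remote(); calling them directly raises an error.
ray.get([func1.remote(largeData_id), func2.remote(largeData_id)])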

If func1() and func2() return results, the code needs to be rewritten as follows:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

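Ray has a number of advantages over the multiprocessing module. In particular, the same code runs on a single machine as well as on a cluster of machines. For more advantages of Ray, see this related post.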

You can use threading or multiprocessing.

Due to peculiarities of CPython, threading is unlikely to achieve true parallelism for CPU-bound work. For this reason, multiprocessing is generally the better choice.

Here is a complete example:

from multiprocessing import Process

# Python 3 syntax: print() is a function and range() replaces xrange().
def func1():
  print('func1: starting')
  for i in range(10000000): pass
  print('func1: finishing')

def func2():
  print('func2: starting')
  for i in range(10000000): pass
  print('func2: finishing')

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

The mechanics of starting and joining child processes can easily be encapsulated into a function along the lines of your runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)
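
The remark about CPython's threading applies mainly to CPU-bound work. The functions in the question spend most of their time in time.sleep and file I/O, during which a thread gives up the interpreter lock, so a thread-based variant of the same wrapper may be enough there. The following is only a minimal sketch under that assumption; run_in_threads and make_worker are hypothetical names, and the sleep stands in for the real file operations.

```python
import threading
import time


def run_in_threads(*fns):
    """Start each callable in its own thread, then wait for all of them."""
    threads = [threading.Thread(target=fn) for fn in fns]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


def make_worker(name, delay, log):
    """Build a hypothetical worker that waits `delay` seconds, then records its name."""
    def worker():
        time.sleep(delay)  # stands in for waiting on file I/O
        log.append(name)
    return worker


if __name__ == "__main__":
    log = []
    start = time.perf_counter()
    run_in_threads(make_worker("func1", 0.5, log), make_worker("func2", 0.5, log))
    elapsed = time.perf_counter() - start
    # Both waits overlap, so the total should be close to one delay, not two.
    print(f"finished {sorted(log)} in {elapsed:.2f}s")
```

If the real work turns out to be CPU-heavy, the process-based runInParallel above is likely the safer default.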

If your functions mainly do I/O work (and less CPU work) and you have Python 3.2+, you can use a ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])
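
The tasks above take no arguments and discard their results. If the functions need arguments (such as the directory in the question) or return values, executor.submit accepts the arguments directly and Future.result() hands back the return value. A small sketch of that variation; run_io_calls_in_parallel and count_chars are hypothetical names used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def run_io_calls_in_parallel(calls):
    """Run (function, args) pairs in threads and return results in input order."""
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(fn, *args) for fn, args in calls]
        # result() blocks until the task is done and re-raises any exception it raised.
        return [future.result() for future in futures]


def count_chars(text, repeat):
    """Hypothetical stand-in for an I/O task that returns a value."""
    return len(text) * repeat


if __name__ == "__main__":
    results = run_io_calls_in_parallel([
        (count_chars, ("abc", 2)),
        (count_chars, ("hello", 1)),
    ])
    print(results)  # [6, 5]
```

Collecting results through the futures in submission order keeps the output aligned with the input list, regardless of which task finishes first.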

If your functions mainly do CPU work (and less I/O work) and you have Python 2.6+, you can use the multiprocessing module:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])
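
One caveat with passing lambdas as Process targets: when multiprocessing uses the spawn start method (the default on Windows and macOS), the target has to be pickled to reach the child process, and lambdas generally cannot be pickled. A module-level function wrapped in functools.partial is a common substitute. The sketch below only checks picklability and does not start any processes; cpu_task and can_pickle are hypothetical helper names.

```python
import pickle
from functools import partial


def cpu_task(label, n):
    """Hypothetical CPU-bound task, defined at module level so it can be pickled."""
    total = sum(i * i for i in range(n))
    return f"{label}: {total}"


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, which spawn-based Process needs."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


if __name__ == "__main__":
    # A lambda has no importable name, so pickling it is expected to fail.
    print("lambda picklable: ", can_pickle(lambda: cpu_task("a", 10)))
    # partial of a module-level function pickles by reference plus its arguments.
    print("partial picklable:", can_pickle(partial(cpu_task, "a", 10)))
```

With this in mind, a call such as run_cpu_tasks_in_parallel([partial(cpu_task, "a", 10), partial(cpu_task, "b", 10)]), placed under an if __name__ == '__main__': guard, should be more portable than the lambda version.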
