Python中使用'multiprocessing'模块的最佳实践
我正在尝试使用 multiprocessing
模块来进行 python
编程。下面这段示例代码在 ipython notebook 中运行时没有任何错误。但是我发现每次执行代码块时,后台会产生额外的 python 进程。
import multiprocessing as mp
def f(x):
print "Hello World ", mp.current_process()
return 1
pool = mp.Pool(3)
data = range(0,10)
pool.map(f, data)
而当我把相同的代码保存为普通的 .py 文件并执行时,就会遇到错误,甚至需要强制关闭终端才能停止程序的运行。
我通过在代码中添加 if __name__ == '__main__':
来解决这个问题,并在这个条件下创建进程池,同时使用 pool.close()
来关闭进程池。
我很好奇,在使用 multiprocessing
以及相关的函数,比如 map
、apply
、apply_async
等时,应该遵循哪些最佳实践?我打算用这个模块来并行读取文件,并希望能应用于一些机器学习算法,以加快处理速度。
4 个回答
官方的Python文档里有很多使用示例。这可能是学习最佳实践的最好方法:http://docs.python.org/2/library/multiprocessing.html
提到的Python文档很好,可以看看 使用Python的multiprocessing.Process类。那个问题里有一些类似的想法。我还推荐你去看看 https://www.ibm.com/developerworks/aix/library/au-multiprocessing/。这个也是关于Python的,介绍了一些很不错的Python多进程处理的方法。
你需要把代码放在 if __name__
里面的原因是,当 Python 创建一个新进程时,它实际上会导入这个模块,这样就会不断尝试运行那些不在 if __name__
块里的代码。
最佳做法是把代码放在一些命名合理、小而易测的函数里。可以有一个 'main()' 函数,然后在 if __name__
块里调用它。
尽量避免使用全局状态(和模块级变量),这样会让事情变得复杂。相反,考虑把数据在进程之间传递。虽然这样可能会比较慢,但先想好怎么尽量少传数据是很有帮助的。比如,如果你有一个很大的配置对象,不要把整个对象都传给每个进程,而是把你的函数拆分成只需要用到一两个属性的形式,只传那些必要的部分。
当事情是顺序发生时,测试起来会容易很多,所以把代码写成容易顺序执行的样子,而不是使用 map
或其他方法,这样会更简单。
对代码进行性能分析是个好主意,因为有时候创建新进程的速度可能比在一个线程里完成所有事情要慢。还有,gevent 模块也很不错——如果你的程序是网络密集型的,使用 gevent 有时能比多进程处理更快。
概述、架构及一些实用建议
根据我自己(也有限的)经验,我可以分享一些关于多进程工作原理和使用方法的见解。我觉得python.org的手册描述得不够清楚,所以我直接看了代码。对于有同样感觉的朋友们……这是我目前总结的内容:
一般的好习惯/最佳实践建议
- 一般实现方法:
- 测试驱动,数据量要小:你不想花几分钟去猜测程序是崩溃了还是在计算中
- 逐步进行并进行时间分析:
- 首先,先实现并调试不使用多进程的代码
- 接下来,单进程实现并调试,分析时间并比较没有多进程时的开销
- 然后,增加进程数量并分析时间,以识别任何GIL(全局解释器锁)问题和等待时间。
- 简单的
Process
或它们的列表可以用来逐个运行少量函数。 Pool
用于处理可批量处理的工作负载(高层次的任务/命令),在一定数量的Process
之间进行分配。- 对于处理器密集型任务(高处理器负载且输入/输出可批量处理),使用
Pool
;对于IO密集型任务(低处理器负载且输入/输出分开),使用pool.ThreadPool
。 - 在
Process
、Pool
、Thread
和ThreadPool
之间传输数据时,使用queues.Queue
及其子类(如果结果顺序很重要)或Pipe
,并确保PipeConnection
与进程或线程一一对应。 - 共享不同类型的变量(如
BaseProxy
、Namespace
、Queue
、Pool
,或者用于设置不同进程之间的同步对象如Barrier
/Lock
/RLock
/Sempaphore
/Condition
)时,使用Manager
类。 - 如果无法避免
GIL
,使用Manager
来处理它们,并尝试将密集计算的进程与GIL
相关的计算分开(例如,解析复杂数据结构等),然后通过Pipe
或共享Queue
连接它们。 - 使用多个
Pool
可以将不同数量的进程分配给不同的任务。否则,只需实现一个Pool
并进行多重映射或方法调用。 - 依赖于彼此中间结果的顺序并行计算任务,可以使用一个
Pool()
,通过多次调用Pool.(star)map_async()
或Pool.(star)map()
来实现。为了同步任务,可以使用映射函数返回的ApplyResult()
实例及其方法ApplyResult().ready()/.wait()/.get()/.successful()
。
架构和流程
- 当运行
import multiprocessing
时,_current_process = MainProcess()
会被初始化,它是BaseProcess
的一个子类,但没有target
、args
、kwargs
和_parent_pid
,基本上是一个处理对象,用于管理已经在运行的python内核中导入multiprocessing
的所有其他Process
。 pool.ThreadPool
是与Pool
类似的API,可能也有类似的架构。Pool
基于三个守护线程Pool._task_handler
、Pool._worker_handler
和Pool._result_handler
,它们与一个内部的queue.Queue()
Pool._taskqueue
和两个内部的SimpleQueue
Pool._inqueue
和Pool._outqueue
连接。Pool._cache
是一个字典,保存所有Pool.apply_async()/_map_async()
及其子方法调用的ApplyResults
及其子类实例,使用全局ApplyResults._job
作为key
。- 一个
Pool
的ApplyResult
及其子类可以在Pool._cache
中找到,或者作为Pool.apply_async()/._map_async()
及其子方法的返回值。 Pool.map()
和Pool.map_async()
的区别在于,Pool.map() == Pool.map_async().get()
,这会强制/锁定主进程等待所有结果计算完成并存储在返回对象ApplyResult()
中。Queue
/SimpleQueues
在Pool
中的作用:Pool.taskqueue
:将Pool.apply_async()/.map_async()
等的高层次任务分割成任务批次,传递给Pool._task_handler
。Pool._inqueue
:将任务作为批量迭代器从Pool._task_handler
传递给Pool._pool.Process(target=worker, ...)
Pool._outqueue
:将结果从Pool._pool.Process(target=worker, ...)
(由Pool._worker_handler
初始化)传递给Pool._result_handler
,后者再将结果存储在ApplyResult
中,缓存于Pool._cache[self._job]
。
ApplyResult
将作为列表保存结果,如果目标func
有返回对象。否则,ApplyResult()
只是用于同步方法的句柄,即结果状态调用方法。- 为了连接进程和线程,提供了四个类,从高到低功能依次为:
queues.JoinableQueue
、queues.Queue
、SimpleQueue
、Pipe
/PipeConnection
。Pipe
只是一个返回两个实际PipeConnection
类实例的方法。
一些代码示例
import logging
import multiprocessing as mp
import random
import time
import numpy as np
from copy import deepcopy
MODEL_INPUTS = ["input_ids", "mc_token_ids", "lm_labels", "mc_labels", "token_type_ids"]
mp.log_to_stderr(level=logging.INFO) # mp.log_to_strerr(level=logging.DEBUG)
logger = mp.get_logger()
logger.setLevel(level=logging.INFO) # mp.setLevel(level=logging.DEBUG)
def secs2hms(seconds, num_decimals=4):
hms_time = [*(*divmod(divmod(int(seconds), 60)[0], 60), divmod(int(seconds), 60)[1])]
if hasattr(seconds, '__round__'):
hms_time[-1] += seconds.__round__(num_decimals) - int(seconds)
return hms_time
class Timer():
def __init__(self, time_name, log_method=print, time_format='hms', hms_decimals=4):
self.time_name = time_name
self.output_method = get_log_method(method_name=log_method_name)
self.time_format = time_format
self.hms_decimals = hms_decimals
self.start_time = time.time()
def start(self):
raise RuntimeError('Timer was already started at initialization.')
def stop(self, *args):
seconds_time = time.time() - self.start_time
time_name = self.time_name.format(*args)
if self.time_format == 'hms':
hms_time = secs2hms(seconds=seconds_time, num_decimals=self.hms_decimals)
hms_time = ' '.join([text.format(dt) for dt, text in zip(hms_time, ['{}h', '{}min', '{}sec']) if dt > 0])
self.output_method('{} = {}'.format(time_name, hms_time))
else:
self.output_method('{} = {}sec'.format(time_name, seconds_time))
self._delete_timer()
def _delete_timer(self):
del self
def get_log_method(method_name):
if method_name == 'debug':
log_method = logger.debug
elif method_name == 'info':
log_method = logger.info
else:
log_method = print
return log_method
def _generate_random_array(shape):
return np.array([[[random.randint(0, 1000)
for _ in range(shape[2])]
for _ in range(shape[1])]
for _ in range(shape[0])])
def random_piped_array(shape, pipe_in, log_method_name='print', log_name='RANDOM'):
log_method = get_log_method(method_name=log_method_name)
array = _generate_random_array(shape=shape)
log_method('{}: sending `array through `pipe_in`'.format(log_name))
pipe_in.send(array)
def random_array(shape, log_method_name='print', log_name='RANDOM'):
log_method = get_log_method(method_name=log_method_name)
assert len(shape) == 3
array = _generate_random_array(shape=shape)
log_method('{}: append `array` to `shared_array`'.format(log_name))
# for dataset_name in ['train', 'valid']:
# shared_arrays[dataset_name].append(array)
return array
def random_shared_array(shape, shared_arrays, log_method_name='print', log_name='SHARED_RANDOM'):
log_method = get_log_method(method_name=log_method_name)
assert len(shape) == 3
array = _generate_random_array(shape=shape)
log_method('{}: append `array` to `shared_array`'.format(log_name))
shared_arrays.append(array)
def random_nested_array(shape, nested_shared_arrays, dataset_name, log_method_name='print', log_name='NESTED_RANDOM'):
log_method = get_log_method(method_name=log_method_name)
log_method('{}: appending array to shared_arrays[\'{}\']'.format(log_name, dataset_name))
assert len(shape) == 3
array = _generate_random_array(shape=shape)
log_method('{}: appendind `array` to `shared_array` with currently len(nested_shared_array[\'{}\']) = {}'.format(
log_name, dataset_name, len(nested_shared_arrays[dataset_name])))
nested_shared_arrays[dataset_name].append(array)
def nested_dict_list_deepcopy(nested_shared_arrays):
"""No hierachical switching between mp.manager.BaseProxy and unshared elements"""
nested_unshared_arrays = dict()
for key, shared_list in nested_shared_arrays.items():
nested_unshared_arrays[key] = deepcopy(shared_list)
return nested_unshared_arrays
def log_arrays_state(arrays, log_method_name='print', log_name='ARRAY_STATE'):
log_method = get_log_method(method_name=log_method_name)
log_method('ARRAY_STATE: type(arrays) = {}'.format(type(arrays)))
try:
if hasattr(arrays, '__len__'):
log_method('{}: len(arrays) = {}'.format(log_name, len(arrays)))
if len(arrays) < 20:
for idx, array in enumerate(arrays):
log_method('{}: type(arrays[{}]) = {}'.format(log_name, idx, type(array)))
if hasattr(array, 'shape'):
log_method('{}: arrays[{}].shape = {}'.format(log_name, idx, array.shape))
else:
log_method('{}: arrays[{}] has not `shape` attribute'.format(log_name, idx))
else:
log_method('{}: array has no `__len__` method'.format(log_name))
except BrokenPipeError as error_msg:
log_method('{}: BrokenPipeError: {}'.format(log_name, error_msg))
def log_nested_arrays_state(nested_arrays, log_method_name='print', log_name='NESTED_ARRAY_STATE'):
log_method = get_log_method(method_name=log_method_name)
log_method('{}: type(arrays) = {}'.format(log_name, type(nested_arrays)))
for key, arrays in nested_arrays.items():
log_arrays_state(arrays=arrays, log_name=log_name + '_' + key.upper(), log_method_name=log_method_name)
if __name__ == '__main__':
log_method = logger.info
# log_method cannot be pickled in map_async, therefore an extra log_method_name string is implemented to hand
# through
log_method_name = 'info'
num_samples = 100
num_processes = 1 # len(MODEL_INPUTS) #
array_shapes = [(num_samples, random.randint(2, 5), random.randint(100, 300)) for _ in range(len(MODEL_INPUTS))]
def stdout_some_newlines(num_lines=2, sleep_time=1):
print(''.join(num_lines * ['\n']))
time.sleep(sleep_time)
# Pool with results from `func` with `return` received from `AsyncResult`(=`ApplyResult`)
# `AsyncResult` also used for process synchronization, e.g. waiting for processes to finish
log_method('MAIN: setting up `Pool.map_async` with `return`ing `func`')
async_return_timer = Timer(time_name='TIMER_POOL: time for random array with {} processes'.format(num_processes),
log_method=log_method)
# Pool with variable return
setup_pool_timer = Timer(time_name='TIMER_SETUP: time to set up pool with {} processes'.format(num_processes),
log_method=log_method)
with mp.Pool(processes=num_processes) as pool:
setup_pool_timer.stop()
arrays = pool.starmap_async(func=random_array, iterable=[(shape, log_method_name) for shape in array_shapes])
getted_arrays = arrays.get()
async_return_timer.stop()
# Logging array state inside the `pool` context manager
log_method('MAIN: arrays from `pool.map_async() return` with in the `pool`\'s context manager:')
log_arrays_state(arrays=arrays, log_method_name=log_method_name)
log_method('MAIN: arrays.get() from `pool.map_async() return` with in the `pool`\'s context manager:')
log_arrays_state(arrays=getted_arrays, log_method_name=log_method_name)
# Logging array state outside the `pool` context manager
log_method('MAIN: arrays from `pool.map_async() return` outside the `pool`\'s context manager:')
log_arrays_state(arrays=arrays, log_method_name=log_method_name)
log_method('MAIN: arrays.get() from `pool.map_async() return` outside the `pool`\'s context manager:')
log_arrays_state(arrays=getted_arrays, log_method_name=log_method_name)
del pool, arrays, getted_arrays
stdout_some_newlines()
# Functionality of `np.Process().is_alive()
log_method('IS_ALIVE: testing funcktionality of flag `mp.Process().is_alive()` w.r.t. process status')
p = mp.Process(target=lambda x: x ** 2, args=(10,))
log_method('IS_ALIVE: after intializing, before starting: {}'.format(p.is_alive()))
p.start()
log_method('IS_ALIVE: after starting, before joining: p.is_alive() = {}'.format(p.is_alive()))
time.sleep(5)
log_method('IS_ALIVE: after sleeping 5sec, before joining: p.is_alive() = {}'.format(p.is_alive()))
p.join()
log_method('IS_ALIVE: after joining: p.is_alive() = {}'.format(p.is_alive()))
p.terminate()
del p
stdout_some_newlines()
# Pool with `func` `return`ing results directly to the reuslt handler from `mp.Pool().starmap_async()` of type
# `AsyncResults()`
log_method(
'MAIN: Pool.map() is not tested explicitly because is equivalent to `Pool.map() == Pool.map_async().get()')
stdout_some_newlines()
# Pool with results assigned to shared variable & `AsyncResult` only used for process synchronization but
# not for result receiving
log_method(
'MAIN: setting up Manager(), Manager.list() as shared variable and Pool.starmap_async with results from shared '
'variable')
async_shared_timer = Timer(
time_name='TIMER_POOL_SHARED: time for random array with {} processes'.format(num_processes),
log_method=log_method)
setup_shared_variable_timer = Timer(time_name='TIMEE_INIT: time to set up shared variable', log_method=log_method)
with mp.Manager() as sync_manager:
shared_arrays = sync_manager.list()
setup_shared_variable_timer.stop()
async_return_timer = Timer(
time_name='TIMER_POOL: time for random array with {} processes'.format(num_processes),
log_method=log_method)
setup_pool_timer = Timer(
time_name='TIMER_POOL_INIT: time to set up pool with {} processes'.format(num_processes),
log_method=log_method)
with mp.Pool(processes=num_processes) as pool:
setup_pool_timer.stop()
async_result = pool.starmap_async(
func=random_shared_array,
iterable=[(shape, shared_arrays, log_method_name) for shape in array_shapes])
log_method('MAIN: async_result.ready() befor async.wait() = {}'.format(async_result.ready()))
async_result.wait()
log_method('MAIN: async_result.ready() after async.wait() = {}'.format(async_result.ready()))
log_method('MAIN: asyn_result.sucessful() after async.wait() = {}'.format(async_result.successful()))
async_return_timer.stop()
copy_timer = Timer('TIMER_COPY: time to copy shared_arrays to standard arrays', log_method=log_method)
unshared_arrays = deepcopy(shared_arrays)
copy_timer.stop()
async_shared_timer.stop()
log_method('MAIN: shared_arrays from `pool.map_async()` within `sync_manager` context manager:')
log_arrays_state(arrays=shared_arrays, log_method_name=log_method_name)
log_method(
'MAIN: unshared_arrays = deepcopy(shared_arrays) from `pool.map_async()` within `sync_manager`\'s '
'context manager:')
log_arrays_state(arrays=unshared_arrays, log_method_name=log_method_name)
log_method('MAIN: shared_arrays from `pool.map_async()` outside `sync_manager`\'s context manager:')
log_arrays_state(arrays=shared_arrays, log_method_name=log_method_name)
log_method('MAIN: unshared_arrays from `pool.map_async()` outside `sync_manager`\'s context manager:')
log_arrays_state(arrays=unshared_arrays, log_method_name=log_method_name)
del sync_manager, shared_arrays, async_result, pool, unshared_arrays
stdout_some_newlines()
# Same as above just with pipe instead of `shared_arrays`
log_method('MAIN: separate process outputting to `mp.Pipe()`')
process_pipe_timer = Timer(time_name='TIMER_PIPE: time for `random_pipe_array` outputting through a `mp.Pipe()')
arrays = list()
pipe_in, pipe_out = mp.Pipe()
# initialize processes
processes = [mp.Process(target=random_piped_array, args=(shape, pipe_in, log_method_name)) for shape in
array_shapes]
# Start processes
for process in processes:
process.start()
# Collect piped arrays form pipe and append them to `arrays`
while (any([process.is_alive() for process in processes]) or pipe_out.poll()) and len(arrays) < len(MODEL_INPUTS):
log_method(
'RANDOM: receiving arrays through pipe and appending to arrays with currently len(arrays) = {}'.format(
len(arrays)))
arrays.append(pipe_out.recv())
# join processes
for process in processes:
process.join()
process_pipe_timer.stop()
log_arrays_state(arrays=arrays, log_method_name=log_method_name)
pipe_in.close()
pipe_out.close()
del arrays, pipe_in, pipe_out, processes, process
stdout_some_newlines()
# Nested shared dict/list/arrays
log_method('MAIN: `random_nested_arrays` with nested shared `mp.Manager().dict()` and `mp.Manager().list()`s')
nested_timer = Timer(time_name='TIMER_NESTED: time for `random_nested_arrays()`')
with mp.Manager() as sync_manager:
nested_shared_arrays = sync_manager.dict()
nested_shared_arrays['train'] = sync_manager.list()
nested_shared_arrays['valid'] = sync_manager.list()
with mp.Pool(processes=num_processes) as pool:
nested_results = pool.starmap_async(func=random_nested_array,
iterable=[(shape, nested_shared_arrays, dataset_name, log_method_name)
for dataset_name in nested_shared_arrays.keys()
for shape in array_shapes])
nested_results.wait()
unshared_nested_arrays = nested_dict_list_deepcopy(nested_shared_arrays)
nested_timer.stop()
log_nested_arrays_state(nested_arrays=unshared_nested_arrays, log_method_name=log_method_name)
del sync_manager, nested_shared_arrays, pool, nested_results, unshared_nested_arrays
stdout_some_newlines()
# List of processes targeted directly to their functions one by one
log_method(
'MAIN: separate process outputting to shared `mp.Manager.list()` with process handles maintained in list()')
log_method('MAIN: separate process implementations are only preferred over pools for 1-to-1=processes-to-tasks'
' relations or asynchronous single tasks calculations.')
processes_timer = Timer(
time_name='TIMER_PROCESS: time for `random_shared_arrays` with separate {} processes'.format(num_processes),
log_method=log_method)
with mp.Manager() as sync_manager:
shared_arrays = sync_manager.list()
# Initialize processes
processes = [mp.Process(target=random_shared_array, args=(shape, shared_arrays, log_method_name))
for shape in array_shapes]
# Start processes
for process in processes:
process.start()
processes_timer.stop()
# Join processes = wait for processes to finish
for process in processes:
process.join()
unshared_process_arrays = deepcopy(shared_arrays)
processes_timer.stop()
log_arrays_state(arrays=unshared_process_arrays, log_method_name=log_method_name)
del sync_manager, shared_arrays, unshared_process_arrays, processes, process
stdout_some_newlines()