python多处理池

2024-04-25 14:11:34 发布

您现在位置:Python中文网/ 问答频道 /正文

在使用python多处理池时,提交了多少个作业

如何决定? 我们能控制它吗? 比如队列中最多10个作业,以减少内存使用

假设我的主干代码写在下面: 对于每个色度和模拟,我读取数据作为数据帧

(我认为在提交作业之前读取数据会更好,以减少工作进程中的I/O限制)

然后我将熊猫数据帧发送给每个工人来处理它

但似乎提交的作业比最终确定的作业数量多,这导致了内存错误

numofProcesses = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=numofProcesses)
jobs=[]


all_result1={}
all_result2={}

def accumulate(result):
 result1=result[0]
 result2=result[1]
 accumulate(resulst1,all_result1)
 accumulate(resulst2,all_result2)
 print('ACCUMULATE')

for each chr:
 for each sim:
     chrBased_simBased_df= readData(chr,sim)
     jobs.append(pool.apply_async(func, args=(chrBased_simBased_df,too,many,),callback=accumulate))
     print('Submitted job:%d' %(len(jobs)))

pool.close()
pool.join()

有办法摆脱它吗


Tags: 数据内存for作业jobsresult读取数据all
1条回答
网友
1楼 · 发布于 2024-04-25 14:11:34

multiprocessing.Poolconcurrent.futures.ProcessPoolExecutor都不允许限制您提交给工作人员的任务量

然而,这是一个非常简单的扩展,您可以通过使用信号量自行构建

您可以在此gist中检查一个示例。它使用concurrent.futures模块,但将其移植到multiprocessing.Pool也应该很简单

from threading import BoundedSemaphore
from concurrent.futures import ProcessPoolExecutor


class MaxQueuePool:
    """This Class wraps a concurrent.futures.Executor
    limiting the size of its task queue.
    If `max_queue_size` tasks are submitted, the next call to submit will block
    until a previously submitted one is completed.
    """
    def __init__(self, executor, max_queue_size, max_workers=None):
        self.pool = executor(max_workers=max_workers)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        """Submits a new task to the pool, blocks if Pool queue is full."""
        self.pool_queue.acquire()

        future = self.pool.submit(function, *args, **kwargs)
        future.add_done_callback(self.pool_queue_callback)

        return future

    def pool_queue_callback(self, _):
        """Called once task is done, releases one queue slot."""
        self.pool_queue.release()


if __name__ == '__main__':
    pool = MaxQueuePool(ProcessPoolExecutor, 8)
    f = pool.submit(print, "Hello World!")
    f.result()

相关问题 更多 >

    热门问题