仅使用一个进程的Python多处理

2024-04-23 22:09:47 发布

您现在位置:Python中文网/ 问答频道 /正文

编辑:

谢谢你的回答。我理解这个问题。 我相信通过改变imap_unordered行已经解决了这个问题,但是现在我遇到了一个不同的问题。你知道吗

with multiprocessing.Pool(processes=proc) as p:
    for i in p.imap_unordered(run_task, iterable, chunksize=chunksize):
        print(i)

我故意不包括所有其他的指纹来更好地理解为什么这不起作用。 它输出:

File ".\test.py", line 23, in test_run
    for i in p.imap(run_task, iterable, chunksize=chunksize):
TypeError: 'int' object is not iterable

我用不同的函数尝试了同样的方法,而不是run_task

def f(x):
    return x*x

with multiprocessing.Pool(processes=proc) as p:
        for i in p.imap_unordered(f, iterable, chunksize=chunksize):
            print(i)

输出正确!所以有人会认为问题出在run\u任务函数中。所以我改变了run\u任务函数:

def run_task(iterable):
    return type(iterable)

正如预期的那样,结果是:

<class 'int'>
<class 'int'>
<class 'int'>
...

问题是,imap_unordered不是应该传递iterable的块吗?你知道吗

为什么我在函数中得到单个的int,而不是它可以处理的块的子列表?你知道吗


原件

我有一个非常简单的多处理函数,可以运行虚拟工作负载。 我用imap_unordered的不同块大小和multiprocessing.Pool的不同进程数运行了多次。你知道吗

问题是,输出告诉我,无论我传递了多少processeschunksizesimap_unordered只是将整个列表传递给1进程。你知道吗

预期的结果是,它将把列表分成块,将每个块分配给每个进程,我将能够看到进程正在接收不同大小的列表。你知道吗

我是不是漏了什么?你知道吗

我有以下代码和输出:

import multiprocessing


def run_task(iterable):
    # Task to be executed, simulating dummy work
    work_simul = 0
    for number in iterable:
        work_simul += number * number
    return (len(iterable))


def test_run(proc, chunksize):
    # runs the function "run_task" with multiprocessing

    # defining our dummy iterable of length 10000
    iterable = [i**i for i in range(5000)]
    original_size = len(iterable)  # Size of the iterable for comparison
    results = {}
    with multiprocessing.Pool(processes=proc) as p:
        for process, r_value in \
                enumerate(p.imap_unordered(run_task, (iterable,),
                                           chunksize=chunksize)):
            # Add our process number and its return value into results so that we can compare performance here.
            results[process + 1] = r_value

    print(
        f"""Original size: {original_size}
Total process # {proc}\nChunksize # {chunksize}""")

    for key in results.keys():
        print(f"Process # {key}: has list length {results[key]}\n\n")


if __name__ == "__main__":
    test_run(1, 10)
    test_run(5, 10)
    test_run(10, 10)

    test_run(1, 100)
    test_run(5, 100)
    test_run(10, 100)

输出:

Original size: 5000
Total process # 1
Chunksize # 10
Process # 1: has list length 5000


Original size: 5000
Total process # 5
Chunksize # 10
Process # 1: has list length 5000


Original size: 5000
Total process # 10
Chunksize # 10
Process # 1: has list length 5000


Original size: 5000
Total process # 1
Chunksize # 100
Process # 1: has list length 5000


Original size: 5000
Total process # 5
Chunksize # 100
Process # 1: has list length 5000


Original size: 5000
Total process # 10
Chunksize # 100
Process # 1: has list length 5000

Tags: runintestfortasksizeiterableprocess
1条回答
网友
1楼 · 发布于 2024-04-23 22:09:47

指的是multiprocessing documentation。你知道吗

imap_unordered与来自itertoolsimap等价,imapmap相似。你知道吗

简单地说,它们都对iterable的每个元素应用一个函数。你知道吗

呼叫

for process, r_value in \
        enumerate(p.imap_unordered(run_task, (iterable,),
                                   chunksize=chunksize)):
    # Add our process number and its return value into results so that we can compare performance here.
    results[process + 1] = r_value

run_task应用于单个元素的列表。所以它只能做一个过程。你知道吗

枚举它—得到元组,其中第一个元素不是进程号,而是原始列表中的元素数。你知道吗

相关问题 更多 >