Python中的multiprocessing.Array的元组

1 投票
2 回答
3729 浏览
提问于 2025-04-18 16:44

我在使用Python的多进程功能时遇到了一些困难。
我想把一个包含元组的列表放进multiprocessing.Array里,但我找不到元组的类型代码。

这是我的代码,我想知道在主函数中如何为arr写“type_of_tuple”。

from multiprocessing import Pool, Array

def thread_func(time, array):
    time.sleep(time)
    if len(array) > 0:
        print(array.pop(0))

def main(cpu_number):
    list = [("a","b"), ("c","d"), ("e","f")]
    arr = Array( type_of_tuple """ how to write this?""", list)

    for i in range(cpu_number):
        r = pool.apply_async(thread_func, args=(1000, arr))
        thread_list.append(r)

    for thread in thread_list:
        thread.wait()

if __name__ == "__main__":
    main(3)

2 个回答

0

为了回答你的问题,如果不考虑逻辑的话,你应该使用ctypes结构体来代替:

from ctypes import Structure, POINTER, byref, c_ubyte, cast, create_string_buffer
from multiprocessing import Pool, Array


class SomeTuple(Structure):
    _fields_ = [
        ('a', POINTER(c_ubyte)),
        ('b', POINTER(c_ubyte))
    ]


def thread_func(time, array_):
    time.sleep(time)
    if len(array_) > 0:
        print(array_.pop(0))

def main(cpu_number):
    thread_list = []
    value_list = [
        SomeTuple(cast(create_string_buffer(b"a"), POINTER(c_ubyte)), cast(create_string_buffer(b"b"), POINTER(c_ubyte))),
        SomeTuple(cast(create_string_buffer(b"c"), POINTER(c_ubyte)), cast(create_string_buffer(b"d"), POINTER(c_ubyte))),
        SomeTuple(cast(create_string_buffer(b"e"), POINTER(c_ubyte)), cast(create_string_buffer(b"f"), POINTER(c_ubyte)))
    ]
    arr = Array(SomeTuple, value_list)

    with Pool(processes=cpu_number) as pool:
        for _ in range(cpu_number):
            r = pool.apply_async(thread_func, args=(1000, arr))
            thread_list.append(r)

    for thread in thread_list:
        thread.wait()

if __name__ == "__main__":
    main(3)
2

你找不到的原因是因为它根本不存在。Array的主要功能是处理简单的、相同类型的数据,这些数据可以以“未包装”的二进制形式存储。

而元组(tuple)是一种复合类型,可以存放任意数量和类型的值。所以你不能把它放进Array里。

实际上,字符串也不能放进数组,因为字符串的字符数量是可变的;每个字符的大小都不一样。(如果你用的是Python 3,那就更复杂了,因为字符的大小可以是1、2或4个字节……)

此外,数组的长度是固定的;你也不能从中pop(弹出)值。

所以,你需要找到其他方法来共享这些数据。

如果你对C语言足够了解,可以使用shared_ctypes,将你的字符串元组映射到一个structchar*

或者,你可以写一个函数,把元组编码成固定大小的值(然后再把这些值切片成字符数组),在一边处理,另一边解码。

不过,我觉得如果你按照文档的建议,找到一种通过消息传递而不是共享内存的方式来编写代码,生活会简单很多。

因为你需要的唯一共享变更是让每个任务从队列的末尾pop一个值,这样其他任务就不会看到相同的值,显而易见的解决方案就是使用Queue,因为它正好能做到这一点。

或者,更简单一点,直接使用像map这样的高级方法,而不是apply,来管理队列,确保每个任务得到一个值,这样你就不用操心这些了。例如:

def thread_func(time, value):
    time.sleep(time)
    print(value)

def main(cpu_number):
    values = [("a","b"), ("c","d"), ("e","f")]
    results = pool.imap_unordered(partial(thread_func, 1000), values[:cpu_number])
    for result in results:
        pass

if __name__ == "__main__":
    main(3)

顺便说一下,我不太明白你为什么要把任务数量限制在CPU数量上。通常,你会创建一个Pool(cpu_number),然后把所有任务排队。如果你只想运行3个任务,其实根本不需要池,只需在Process上运行每个任务就可以了。

撰写回答