Python中的multiprocessing.Array的元组
我在使用Python的多进程功能时遇到了一些困难。
我想把一个包含元组的列表放进multiprocessing.Array里,但我找不到元组的类型代码。
这是我的代码,我想知道在主函数中如何为arr写“type_of_tuple”。
from multiprocessing import Pool, Array
def thread_func(time, array):
time.sleep(time)
if len(array) > 0:
print(array.pop(0))
def main(cpu_number):
list = [("a","b"), ("c","d"), ("e","f")]
arr = Array( type_of_tuple """ how to write this?""", list)
for i in range(cpu_number):
r = pool.apply_async(thread_func, args=(1000, arr))
thread_list.append(r)
for thread in thread_list:
thread.wait()
if __name__ == "__main__":
main(3)
2 个回答
为了回答你的问题,如果不考虑逻辑的话,你应该使用ctypes结构体来代替:
from ctypes import Structure, POINTER, byref, c_ubyte, cast, create_string_buffer
from multiprocessing import Pool, Array
class SomeTuple(Structure):
_fields_ = [
('a', POINTER(c_ubyte)),
('b', POINTER(c_ubyte))
]
def thread_func(time, array_):
time.sleep(time)
if len(array_) > 0:
print(array_.pop(0))
def main(cpu_number):
thread_list = []
value_list = [
SomeTuple(cast(create_string_buffer(b"a"), POINTER(c_ubyte)), cast(create_string_buffer(b"b"), POINTER(c_ubyte))),
SomeTuple(cast(create_string_buffer(b"c"), POINTER(c_ubyte)), cast(create_string_buffer(b"d"), POINTER(c_ubyte))),
SomeTuple(cast(create_string_buffer(b"e"), POINTER(c_ubyte)), cast(create_string_buffer(b"f"), POINTER(c_ubyte)))
]
arr = Array(SomeTuple, value_list)
with Pool(processes=cpu_number) as pool:
for _ in range(cpu_number):
r = pool.apply_async(thread_func, args=(1000, arr))
thread_list.append(r)
for thread in thread_list:
thread.wait()
if __name__ == "__main__":
main(3)
你找不到的原因是因为它根本不存在。Array
的主要功能是处理简单的、相同类型的数据,这些数据可以以“未包装”的二进制形式存储。
而元组(tuple)是一种复合类型,可以存放任意数量和类型的值。所以你不能把它放进Array
里。
实际上,字符串也不能放进数组,因为字符串的字符数量是可变的;每个字符的大小都不一样。(如果你用的是Python 3,那就更复杂了,因为字符的大小可以是1、2或4个字节……)
此外,数组的长度是固定的;你也不能从中pop
(弹出)值。
所以,你需要找到其他方法来共享这些数据。
如果你对C语言足够了解,可以使用shared_ctypes
,将你的字符串元组映射到一个struct
的char*
。
或者,你可以写一个函数,把元组编码成固定大小的值(然后再把这些值切片成字符数组),在一边处理,另一边解码。
不过,我觉得如果你按照文档的建议,找到一种通过消息传递而不是共享内存的方式来编写代码,生活会简单很多。
因为你需要的唯一共享变更是让每个任务从队列的末尾pop
一个值,这样其他任务就不会看到相同的值,显而易见的解决方案就是使用Queue
,因为它正好能做到这一点。
或者,更简单一点,直接使用像map
这样的高级方法,而不是apply
,来管理队列,确保每个任务得到一个值,这样你就不用操心这些了。例如:
def thread_func(time, value):
time.sleep(time)
print(value)
def main(cpu_number):
values = [("a","b"), ("c","d"), ("e","f")]
results = pool.imap_unordered(partial(thread_func, 1000), values[:cpu_number])
for result in results:
pass
if __name__ == "__main__":
main(3)
顺便说一下,我不太明白你为什么要把任务数量限制在CPU数量上。通常,你会创建一个Pool(cpu_number)
,然后把所有任务排队。如果你只想运行3个任务,其实根本不需要池,只需在Process
上运行每个任务就可以了。