使用pydantic.BaseModel对象的Python多进程处理
我正在尝试在Python中使用多进程工具箱来处理一个pydantic对象。
我的任务是:我需要下载很多文件。这些文件的链接和一些附加信息,比如一个布尔值“file_downloaded”,都存储在一个数据对象中。我是用pydantic创建这个数据对象的。现在我想一次下载多个文件,所以我想创建一个包含多个数据对象的列表,并在一个有5个进程的池中处理它们,我会用map函数来实现。
这里有一个简单的例子(有错误):
import pydantic
from typing import Optional
import multiprocessing
from multiprocessing.managers import BaseManager
class data_object(pydantic.BaseModel):
url: str
downloaded: Optional[bool] = False
class CustomManager(BaseManager):
pass
def downloader(single_data: data_object):
single_data.downloaded = True
if __name__ == '__main__':
# Simple single process test for data_object and worker (no errors)
just_one_object = data_object(url='url1')
print(just_one_object.downloaded)
downloader(just_one_object)
print(just_one_object.downloaded)
# Multiprocesses with shared data_object
CustomManager.register('data_object', data_object)
CustomManager.register('list', list)
with CustomManager() as manager:
shared_single_object = manager.data_object(url='url2') # Error occurs
print(shared_single_object.downloaded)
downloader(shared_single_object)
print(shared_single_object.downloaded)
managed_list = manager.list([manager.data_object(url='url'+str(v)) for v in range(5)])
pool = multiprocessing.Pool(processes=5)
pool.map(downloader, managed_list)
pool.close()
pool.join()
print(managed_list)
当我运行这个例子时,在定义shared_single_object的那一行,我遇到了以下错误:
AttributeError: '__signature__' attribute of 'data_object' is class-only
不幸的是,我不知道从哪里开始解决这个错误。接下来,我创建了多个不同链接的数据对象实例,并将它们列在一个管理列表中。然后它们应该被下载。也许还有其他问题,我无法运行这部分。
我在网上搜索了如何使用multiprocessing.manager来处理pydantic对象,但没有找到任何信息。我尝试使用一个共享复杂类的例子来实现上面的代码。
仅用字典我能够一次下载多个文件,但我想使用pydantic。
提前谢谢你。
1 个回答
暂无回答