使用pydantic.BaseModel对象的Python多进程处理

0 投票
1 回答
65 浏览
提问于 2025-04-14 15:49

我正在尝试在Python中使用多进程工具箱来处理一个pydantic对象。

我的任务是:我需要下载很多文件。这些文件的链接和一些附加信息,比如一个布尔值“file_downloaded”,都存储在一个数据对象中。我是用pydantic创建这个数据对象的。现在我想一次下载多个文件,所以我想创建一个包含多个数据对象的列表,并在一个有5个进程的池中处理它们,我会用map函数来实现。

这里有一个简单的例子(有错误):

import pydantic
from typing import Optional
import multiprocessing
from multiprocessing.managers import BaseManager


class data_object(pydantic.BaseModel):
    url: str
    downloaded: Optional[bool] = False


class CustomManager(BaseManager):
    pass


def downloader(single_data: data_object):
    single_data.downloaded = True


if __name__ == '__main__':
    # Simple single process test for data_object and worker (no errors)
    just_one_object = data_object(url='url1')
    print(just_one_object.downloaded)
    downloader(just_one_object)
    print(just_one_object.downloaded)

    # Multiprocesses with shared data_object
    CustomManager.register('data_object', data_object)
    CustomManager.register('list', list)
    with CustomManager() as manager:
        shared_single_object = manager.data_object(url='url2')  # Error occurs
        print(shared_single_object.downloaded)
        downloader(shared_single_object)
        print(shared_single_object.downloaded)

        managed_list = manager.list([manager.data_object(url='url'+str(v)) for v in range(5)])

        pool = multiprocessing.Pool(processes=5)
        pool.map(downloader, managed_list)
        pool.close()
        pool.join()
        print(managed_list)

当我运行这个例子时,在定义shared_single_object的那一行,我遇到了以下错误:

AttributeError: '__signature__' attribute of 'data_object' is class-only

不幸的是,我不知道从哪里开始解决这个错误。接下来,我创建了多个不同链接的数据对象实例,并将它们列在一个管理列表中。然后它们应该被下载。也许还有其他问题,我无法运行这部分。

我在网上搜索了如何使用multiprocessing.manager来处理pydantic对象,但没有找到任何信息。我尝试使用一个共享复杂类的例子来实现上面的代码。

仅用字典我能够一次下载多个文件,但我想使用pydantic。

提前谢谢你。

1 个回答

暂无回答

撰写回答