将Python生成器重写为异步

2024-05-23 16:48:39 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个Python脚本,它生成[str, float]元组,然后使用一个最终调用helper.streaming_bulk()的自定义函数将这些元组索引到ElasticSearch中

以下是生成器的实现方式:

doc_ids: List[str] = [...]
docs = ((doc_id, get_value(doc_id) for doc_id in doc_ids)

get_value()调用一个远程服务,该服务计算每个文档id的浮点值。 接下来,这些元组被传递到update_page_quality_bulk()

for success, item in update_page_quality_bulk(
        islice(doc_qualities, size)
    ):
        total_success += success
        if not success:
            logging.error(item)

在内部,update_page_quality_bulk()创建ElasticSearch请求

这里使用生成器的优点之一是,第一个size元素可以通过islice()馈入update_page_quality_bulk()

为了使整个过程更快,我想并行化get_value()调用。如前所述,这些都是远程调用,因此本地计算成本可以忽略不计,但持续时间很长

元组的顺序无关紧要,传递到update_page_quality_bulk()的元素也无关紧要。在较高的层次上,我想对任何n元组进行get_value()调用(最多并行x),并传递最先完成的元组

我天真的尝试是将get_value()定义为异步:

async def get_value():
  ...

并在发电机中等待呼叫:

docs = ((doc_id, await get_value(doc_id) for doc_id in doc_ids)

但是,这会在随后的islice()调用中引发错误:

TypeError: 'async_generator' object is not iterable

删除islice调用并将未修改的docs生成器传递给update_page_quality_bulk()会导致在元组上循环以将元组转换为ElasticSearch请求时引发相同的错误

我知道ElasticSearch客户端提供了asynchronous helpers,但它们在这里似乎不适用,因为我需要首先生成操作

根据this answer,似乎我必须将实现更改为使用队列

This answer意味着由于Python-GIL的原因,如果不使用multiprocessing就无法完成此操作,但是这个答案没有标记为正确,而且也非常陈旧

一般来说,我正在寻找一种在并行化get_value()调用时尽可能少地更改当前逻辑的方法


Tags: ididsdocsforgetdocvaluepage
1条回答
网友
1楼 · 发布于 2024-05-23 16:48:39

因此,您希望将一个“同步外观”生成器传递给一个调用,该调用需要一个普通的惰性生成器(如islice),并继续并行地获取结果

这听起来像是asyncio.as_completed的工作:您使用普通生成器来创建任务-这些任务由asyncio机器并行运行,并且在任务完成时可以获得结果(哦!)

但是,由于update_page_quality_bulk不支持异步,它永远不会将控制权交给异步IO循环,因此它可以完成得到结果的任务。这很可能会造成阻塞

在另一个线程中调用update_page_quality_bulk可能也不起作用。我没有在这里尝试过,但是我想说,你不能只是在不同的线程中迭代doc,而不是在它(及其任务)创建的线程中迭代

因此,首先要做的事情是,“生成器表达式”语法在您希望异步计算生成器的某些术语时不起作用,正如您所发现的那样-我们对其进行重构,以便在一个协程函数中创建元组-并且我们包装任务中对这些术语的所有调用(某些asyncio函数会自动在任务中进行包装)

然后我们可以使用asyncio机制来安排所有调用,并在这些结果到达时调用update_page_quality_bulk,如上所述,无法直接传递给非异步函数:asyncio循环将永远无法获得控制权。相反,我们在主线程中不断拾取任务的结果,并在另一个线程中调用sync函数-使用队列传递获取的结果。最后,使结果可以在内部可用时使用update_page_quality_bulk,我们为threading.Queue创建了一个小的包装类,这样就可以像在迭代器中一样使用它——这对于使用迭代器的代码是透明的


# example code: untested

async def get_doc_values(doc_id):
    loop = asyncio.get_running_loop()
    # Run_in_executor runs the synchronous function in parallel in a thread-pool
    # check the docs - you might want to pass a custom executor with more than
    # the default number of workers, instead of None:
    return doc_id, await asyncio.run_in_executor(None, get_value, doc_id)


def update_es(iterator):
    # this function runs in a separate thread - 
    for success, item in update_page_quality_bulk(iterator):
            total_success += success
            if not success:
                logging.error(item)
                
sentinel = Ellipsis  # ... : python ellipsis - a nice sentinel that also worker for multiprocessing

class Iterator:
    """This allows the queue, fed in the main thread by the tasks as they are as they are completed
    to behave like an ordinary iterator, which can be consumed by "update_page_quality_bulk" in another thread
    """
    def __init__(self, source_queue):
        self.source = source_queue
        
        
    def __next__(self):
        value= self.source.get()
        if value is sentinel:
            raise StopIteration()
        return value


queue = threading.Queue()
iterator = Iterator(queue)
es_worker = threading.Thread(target=update_es, args=(iterator,))
es_worker.start()
for doc_value_task in asyncio.as_completed(get_doc_values(doc_id) for doc_id in doc_ids):
    doc_value = await doc_value_task
    queue.put(doc_value)
    
es_worker.join()

相关问题 更多 >