我有一个Python脚本,它生成[str, float]
元组,然后使用一个最终调用helper.streaming_bulk()的自定义函数将这些元组索引到ElasticSearch中
以下是生成器的实现方式:
doc_ids: List[str] = [...]
docs = ((doc_id, get_value(doc_id) for doc_id in doc_ids)
get_value()
调用一个远程服务,该服务计算每个文档id的浮点值。
接下来,这些元组被传递到update_page_quality_bulk()
:
for success, item in update_page_quality_bulk(
islice(doc_qualities, size)
):
total_success += success
if not success:
logging.error(item)
在内部,update_page_quality_bulk()创建ElasticSearch请求
这里使用生成器的优点之一是,第一个size
元素可以通过islice()馈入update_page_quality_bulk()
为了使整个过程更快,我想并行化get_value()
调用。如前所述,这些都是远程调用,因此本地计算成本可以忽略不计,但持续时间很长
元组的顺序无关紧要,传递到update_page_quality_bulk()
的元素也无关紧要。在较高的层次上,我想对任何n
元组进行get_value()
调用(最多并行x
),并传递最先完成的元组
我天真的尝试是将get_value()
定义为异步:
async def get_value():
...
并在发电机中等待呼叫:
docs = ((doc_id, await get_value(doc_id) for doc_id in doc_ids)
但是,这会在随后的islice()
调用中引发错误:
TypeError: 'async_generator' object is not iterable
删除islice
调用并将未修改的docs
生成器传递给update_page_quality_bulk()
会导致在元组上循环以将元组转换为ElasticSearch请求时引发相同的错误
我知道ElasticSearch客户端提供了asynchronous helpers,但它们在这里似乎不适用,因为我需要首先生成操作
根据this answer,似乎我必须将实现更改为使用队列
This answer意味着由于Python-GIL的原因,如果不使用multiprocessing就无法完成此操作,但是这个答案没有标记为正确,而且也非常陈旧
一般来说,我正在寻找一种在并行化get_value()
调用时尽可能少地更改当前逻辑的方法
因此,您希望将一个“同步外观”生成器传递给一个调用,该调用需要一个普通的惰性生成器(如islice),并继续并行地获取结果
这听起来像是
asyncio.as_completed
的工作:您使用普通生成器来创建任务-这些任务由asyncio机器并行运行,并且在任务完成时可以获得结果(哦!)但是,由于
update_page_quality_bulk
不支持异步,它永远不会将控制权交给异步IO循环,因此它可以完成得到结果的任务。这很可能会造成阻塞在另一个线程中调用
update_page_quality_bulk
可能也不起作用。我没有在这里尝试过,但是我想说,你不能只是在不同的线程中迭代doc,而不是在它(及其任务)创建的线程中迭代因此,首先要做的事情是,“生成器表达式”语法在您希望异步计算生成器的某些术语时不起作用,正如您所发现的那样-我们对其进行重构,以便在一个协程函数中创建元组-并且我们包装任务中对这些术语的所有调用(某些asyncio函数会自动在任务中进行包装)
然后我们可以使用asyncio机制来安排所有调用,并在这些结果到达时调用
update_page_quality_bulk
,如上所述,无法直接传递给非异步函数:asyncio循环将永远无法获得控制权。相反,我们在主线程中不断拾取任务的结果,并在另一个线程中调用sync函数-使用队列传递获取的结果。最后,使结果可以在内部可用时使用update_page_quality_bulk
,我们为threading.Queue创建了一个小的包装类,这样就可以像在迭代器中一样使用它——这对于使用迭代器的代码是透明的相关问题 更多 >
编程相关推荐