python 3:使用asyncio任务时如何在异步函数中等待回调

2024-05-23 13:48:44 发布

您现在位置:Python中文网/ 问答频道 /正文

AWS IoT python Sdk有一个函数publishAsync
函数的签名是publishAsync(主题、负载、QoS、ackCallback=None)

我想将其扭曲为一个协同程序并使用asyncio

我想做的是:

  1. 连接到AWS
  2. 异步发布10条消息
  3. 断开

我不知道该如何将该函数包装为异步函数

async def asyncPublish(self,
                     msg:str,
                     topic:str,
                     QoS=1):

        # the publishAckFn
        def internalPubAckHandler(mid):
            print(json.dumps({
                'messageID':mid,
                'acknowledged':True
            }))
            return True
        pass

        # publish to the topic asynchronously
        messageID = self.awsIoTClient.publishAsync(topic,msg,QoS,ackCallback=internalPubAckHandler)
        print(json.dumps({
           'messageID':messageID,
           'topic':topic,
           'payload':msg,
           'QoS':QoS,
           'async':True
        }))

    pass


---- in my main file
tasks = []
for i in range(10):
        tasks += asyncPublish('test','test')
pass

loop = asyncio.get_event_loop()
loop.set_debug(True)  # debug mode
gw1.connect()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
gw1.disconnect()  

但当我这样做时,断开连接将比internalPubAckHandler更快地被调用 我不会得到任何承认


Tags: 函数awsloopasynciotrueasynctopicmsg
1条回答
网友
1楼 · 发布于 2024-05-23 13:48:44

最后,我找到了解决办法。我们可以使用期货

Future objects are used to bridge low-level callback-based code with high-level async/await code. Ref

如果函数的签名如下所示:

def functionWithCallback(args,callback)

我们需要将其包装到一个等待回调的异步函数中,我们可以执行如下操作:

async def asyncVersion(args):

    # one way of creating a future
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def futureCallback():
        # do something here and then signal the future
        ...
        loop.call_soon_threadsafe(future .set_result,time.time())
    pass

    # call your function
    functionWithCallback(args,futureCallback)

    # wait for the callback
    await future


    # do other stuff after the callback is done
    .....

pass

相关问题 更多 >