我正在尝试创建一个基于Python的CLI,它通过websockets与web服务通信。我遇到的一个问题是,CLI向web服务发出的请求间歇性地无法得到处理。查看web服务的日志,我可以看到问题是由以下事实引起的:这些请求通常是在套接字关闭的同时(甚至是之后)发出的:
2016-09-13 13:28:10,930 [22 ] INFO DeviceBridge - Device bridge has opened
2016-09-13 13:28:11,936 [21 ] DEBUG DeviceBridge - Device bridge has received message
2016-09-13 13:28:11,937 [21 ] DEBUG DeviceBridge - Device bridge has received valid message
2016-09-13 13:28:11,937 [21 ] WARN DeviceBridge - Unable to process request: {"value": false, "path": "testcube.pwms[0].enabled", "op": "replace"}
2016-09-13 13:28:11,936 [5 ] DEBUG DeviceBridge - Device bridge has closed
在我的CLI中,我定义了一个类CommunicationService
,该类负责处理与web服务的所有直接通信。在内部,它使用^{asyncio
之上。
CommunicationService
包含以下发送请求的方法:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
asyncio.ensure_future(self._ws.send(request))
…其中ws
是先前在另一个方法中打开的websocket:
self._ws = await websockets.connect(websocket_address)
我想要的是能够等待asyncio.ensure_future
返回的未来,如果有必要,在之后的一段时间内睡眠,以便在关闭websocket之前给web服务时间来处理请求。
然而,由于send_request
是同步方法,它不能简单地await
这些未来。使它异步将是毫无意义的,因为没有什么可以等待它返回的协程对象。我也不能使用loop.run_until_complete
,因为在调用循环时循环已经在运行。
我发现有人描述的问题与我在mail.python.org遇到的问题非常相似。在该线程中发布的解决方案是,在循环已经运行的情况下,使函数返回coroutine对象:
def aio_map(coro, iterable, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
coroutines = map(coro, iterable)
coros = asyncio.gather(*coroutines, return_exceptions=True, loop=loop)
if loop.is_running():
return coros
else:
return loop.run_until_complete(coros)
这对我来说是不可能的,因为我正在使用PyRx(反应性框架的Python实现)并且send_request
只作为Rx observate的订户调用,这意味着返回值被丢弃,并且对我的代码不可用:
class AnonymousObserver(ObserverBase):
...
def _on_next_core(self, value):
self._next(value)
另一方面,我不确定这是否是通常遇到的asyncio
问题,或者我只是没有得到它,但我发现使用它相当令人沮丧。例如,在C#(C#)中,我需要做的可能是如下操作:
void SendRequest(string request)
{
this.ws.Send(request).Wait();
// Task.Delay(500).Wait(); // Uncomment If necessary
}
与此同时,asyncio
版本的“wait”无用地返回了另一个我被迫放弃的协同程序。
更新
我找到了一个解决这个问题的方法,似乎是可行的。我有一个异步回调,它在命令执行之后和CLI终止之前执行,所以我只是把它改成。。。
async def after_command():
await comms.stop()
…对此:
async def after_command():
await asyncio.sleep(0.25) # Allow time for communication
await comms.stop()
不过,我还是很乐意得到这个问题的任何答案,以供将来参考。在其他情况下,我可能无法依赖这样的解决方案,我仍然认为最好在send_request
内部执行延迟,这样CommunicationService
的客户就不必担心时间问题。
关于文森特的问题:
Does your loop run in a different thread, or is send_request called by some callback?
所有东西都在同一个线程中运行-它由回调调用。发生的情况是,我定义了所有使用异步回调的命令,当执行时,其中一些命令将尝试向web服务发送请求。因为它们是异步的,所以只有通过在CLI的顶层调用loop.run_until_complete
来执行它们,这意味着循环在它们执行和发出此请求的中途运行(通过对send_request
的间接调用)。
更新2
这是一个基于Vincent建议添加“完成”回调的解决方案。
将一个新的布尔字段_busy
添加到CommunicationService
中,以表示是否发生了comms活动。
在发送请求之前,将CommunicationService.send_request
修改为将_busy
设置为true,然后在完成后向_ws.send
提供回调以重置_busy
:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
def callback(_):
self._busy = False
self._busy = True
asyncio.ensure_future(self._ws.send(request)).add_done_callback(callback)
CommunicationService.stop
现在实现为等待此标志设置为false,然后再继续:
async def stop(self) -> None:
"""
Terminate communications with TestCube Web Service.
"""
if self._listen_task is None or self._ws is None:
return
# Wait for comms activity to stop.
while self._busy:
await asyncio.sleep(0.1)
# Allow short delay after final request is processed.
await asyncio.sleep(0.1)
self._listen_task.cancel()
await asyncio.wait([self._listen_task, self._ws.close()])
self._listen_task = None
self._ws = None
logger.info('Terminated connection to TestCube Web Service')
这似乎也行得通,而且至少是这样nication计时逻辑应该被封装在CommunicationService
类中。
更新3
更好的解决方案基于文森特的建议。
我们有self._busy
而不是self._send_request_tasks = []
。
新的send_request
实现:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
task = asyncio.ensure_future(self._ws.send(request))
self._send_request_tasks.append(task)
新的stop
实现:
async def stop(self) -> None:
if self._listen_task is None or self._ws is None:
return
# Wait for comms activity to stop.
if self._send_request_tasks:
await asyncio.wait(self._send_request_tasks)
...
您可以使用任务的
set
:使用
ensure_future
安排任务,并使用add_done_callback
清理:并等待任务的
set
完成:如果您不在异步函数中,那么可以使用
yield from
关键字自己有效地实现await
。以下代码将被阻止,直到将来返回:相关问题 更多 >
编程相关推荐