我有一个python多线程应用程序。我想在一个线程中运行一个异步循环,并从另一个线程向它发送回调和协程。应该很容易,但是我不能把我的头放在那些东西上。
我提出了以下解决方案,其中一半是我想要的,可以随意评论任何事情:
import asyncio
from threading import Thread
class B(Thread):
def __init__(self):
Thread.__init__(self)
self.loop = None
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop) #why do I need that??
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
f = functools.partial(self.loop.create_task, coro)
return self.loop.call_soon_threadsafe(f)
def cancel_task(self, xx):
#no idea
@asyncio.coroutine
def test():
while True:
print("running")
yield from asyncio.sleep(1)
b.start()
time.sleep(1) #need to wait for loop to start
t = b.add_task(test())
time.sleep(10)
#here the program runs fine but how can I cancel the task?
b.stop()
所以开始和停止循环都很好。我考虑过使用create_task创建任务,但该方法不是线程安全的,所以我将其包装在call_soon_threadsafe中。但我希望能够获取任务对象以便能够取消任务。我可以用未来和条件做一件复杂的事情,但必须有一个更简单的方法,不是吗?
只是作为参考,我最终实现的代码基于我在这个网站上得到的帮助,它更简单,因为我不需要所有的功能。再次感谢!
你什么都做对了。 用于任务停止生成方法
顺便说一下,可以通过
因为
asyncio
只为主线程创建隐式事件循环。我认为您可能需要让您的
add_task
方法知道它是否从事件循环以外的线程调用。这样,如果它是从同一线程调用的,您可以直接调用asyncio.async
,否则,它可以做一些额外的工作将任务从循环线程传递到调用线程。下面是一个例子:首先,我们将事件循环的线程id保存在
run
方法中,这样我们就可以确定稍后对add_task
的调用是否来自其他线程。如果add_task
是从非事件循环线程调用的,我们使用call_soon_threadsafe
调用一个同时调度协程的函数,然后使用concurrent.futures.Future
将任务传递回调用线程,该线程等待Future
的结果。关于取消任务的说明:当您在
Task
上调用cancel
时,下次运行事件循环时,将在协程中引发CancelledError
。这意味着,任务正在包装的协程将在下次到达屈服点时由于异常而中止,除非协程捕获CancelledError
,并阻止其自身中止。还要注意,这只在被包装的函数实际上是一个可中断的协程的情况下才有效;例如,由BaseEventLoop.run_in_executor
返回的asyncio.Future
实际上不能被取消,因为它实际上被包装在concurrent.futures.Future
上,并且一旦它们的底层函数真正开始执行,这些函数就不能被取消。在这些情况下,asyncio.Future
会说它被取消了,但是实际在执行器中运行的函数将继续运行。编辑:根据Andrew Svetlov的建议,更新了第一个示例以使用
concurrent.futures.Future
,而不是queue.Queue
。注意:由于版本3.4.4使用^{} ,因此不推荐使用^{} 。
相关问题 更多 >
编程相关推荐