在协作多任务上下文中帮助编写同步代码流的一组小实用程序。
gevent_async的Python项目详细描述
在协作多任务上下文中帮助编写同步代码流的一组小实用程序。 它是围绕gevent(http://www.gevent.org)的特性集设计的
延迟呼叫
async.DeferredCallHandler是异步处理的函数调用的包装器。 这允许控制在哪个上下文中执行这些函数,这是必需的 在多任务协作中。
- 有两种可用的呼叫类型:
- sync(同步):此类调用等待延迟的调用句柄处理该调用 回来。从用户的角度来看,它的行为类似于常规函数调用。
- oneway(单向):这种类型的调用立即返回。由于它的性质,没有办法知道 一旦处理完毕,它是成功还是失败。
示例
例如,假设我们有一个manager实体,它必须以原子方式处理一些资源:
fromasyncimportDeferredCallHandlerclassManager(DeferredCallHandler):defmanage(self):# do things with resources# with the assurance that the resources won't# be modified during processself.process()# process pending calls# do more thingsdefaccess_resources(self):#returns the resources the manager has properly managed.defupdate_resource(self,data):#updates a resource infodefrun(self):whileTrue:self.manage()
我们可以启动Manager并从多个greenlet调用其函数:
manager=Manager()gevent.spawn(manager.run)# At that point, the manager entity is will be doing resource managementresources=...# we have an array of resourcesdefmonitor(target):foreventintarget.events():# we could apply some transformation to the event, and then# forward it to the manager.manager.oneway.update_resource(event)forresourceinresources:gevent.spawn(monitor,resource)defconsumer():whileTrue:resources=manager.access_resources()# at that point, we have the guarantee that the resources# are properly managed and will not become stale or corrupted during process.consumer()
deferredcallhandler api文档
def process(forever=False, whitelist=None):
处理所有挂起的延迟调用。
如果forever设置为True,进程将一直等待新调用,直到 调用stop_processing()。
如果whitelist设置为字符串列表,则只有名称与元素匹配的函数 将执行白名单。
def stop_processing():
通过deferredcallhandler调用的传入调用中断迭代 process(forever=True)。
异常
同步调用将像常规函数一样转发异常:
fromasyncimportDeferredCallHandlerclassLemming(DeferredCallHandler):defkaboom(self):raiseException("#high pitched# oh no!")lemming=Lemming()spawn(lemming.process,forever=True)try:lemming.sync.kaboom()exceptException:pass# We should hit that# This should trigger the exception but produce an exception log entry.lemming.oneway.kaboom()
常规函数调用
DeferredCallHandler对象不阻止直接函数调用。风险自负:
fromasyncimportDeferredCallHandlerclassManager(DeferredCallHandler):defmanage(self):# do things with resources# with the assurance that the resources won't# be modified during processself.process()# process pending calls# do more thingsdefaccess_resources(self):#returns the resources the manager has properly managed.defupdate_resource(self,data):#updates a resource infodefrun(self):whileTrue:self.manage()manager=Manager()gevent.spawn(manager.run)resources=manager.access_resources()# !!! The resources may be in the middle of a management process and their state# may be incoherentresources=manager.sync.access_resources()# In that case, we're guaranteed the management process is not running.
超时
sync调用可以指定可选超时,以确保执行操作 在给定的时间范围内:
fromasyncimportDeferredCallHandlerclassABitSlow(DeferredCallHandler):deftaking_my_time(self):gevent.sleep(10)slow=ABitSlow()spawn(slow.process,forever=True)try:slow.sync(timeout=1).taking_my_time()exceptgevent.Timeout:pass# We should hit that
多任务状态处理
部分受尾部递归机制的启发,我们提供了一种包含和处理代码的方法 管理国家机器的行为。
@statedecorator将函数方法转换为状态greenlet。当另一个状态函数 调用时,它将创建一个新的state greenlet,替换当前state greenlet,有效地复制 尾部递归的行为。
例如:
@state(transitions_to="growing")defsprouting()# germination process heregrowing()# the sprouting greenlet terminates and leaves way to the growing one@state(transitions_to="flowering")defgrowing()# transform CO2 and sunlight to biomassflowering()# the growing greenlet terminates and leaves way to the flowering one@state(transitions_to=["dead","withering"])defflowering()# Grow flowersifis_eaten:# parameters can be given to state changes.dead(is_eaten=True)# the flowering greenlet terminates and leaves way to the dead oneelse:withering()# the flowering greenlet terminates and leaves way to the withering one@state(transitions_to="dead")defwithering()# Dry updead()# the withering greenlet terminates and leaves way to the dead one@state# terminal state, no transitionsdefdead(is_eaten=False)ifnotis_eaten:# clean up phasesprouting()# spawns the initial state
@statedecorator也可以用于方法:
classFlower(object):@state(transitions_to="growing")defsprouting(self)# germination process heregrowing()# the sprouting greenlet terminates and leaves way to the growing one# ...
必须由transitions_to参数或任何不正确的转换指定正确的转换 将引发ValidationError异常。
回拨
可以在转换时定义回调。通过将on_start参数设置为状态,给定的回调将 当状态启动时激活。
预期的回调签名是def on_start(state, *args, **kwargs),其中state是 (此时,仍然没有启动)async.state.Statestate greenlet,它将处理状态和 *args和**kwargs是给状态调用的参数。
例如:
defon_transition(new_state,target,*args,**kwargs):if"store"inkwargsandkwargs["store"]:target.state=new_stateclassObject(object):def__init__(self):self.state=None@state(on_start=on_transition)defa_state(self,store=False):passobj=Object()obj.a_state(store=True)sleep()obj.state# => is now storing the current state object.