pydispatcher是否在后台线程中运行处理程序函数?

2024-05-16 06:44:35 发布

您现在位置:Python中文网/ 问答频道 /正文

在查找事件处理程序模块时,我遇到了pydispatcher,它似乎对初学者很友好。我对这个库的用例是,如果我的队列大小超过一个阈值,我想发送一个信号。然后,handler函数就可以开始处理并从队列中删除项(然后向数据库中进行大容量插入)。你知道吗

我希望处理函数在后台运行。我知道我可以简单地覆盖队列.append()方法检查队列大小并异步调用处理程序函数,但我想实现listener-dispatcher模型以保持逻辑的干净和分离。你知道吗

派发员是开箱即用吗?如果没有,是否有其他模块可以帮助我做到这一点?我是否需要管理对队列的访问,因为可能有多个线程同时处理和附加到队列?你知道吗

注意,在我的用例中只有一个调度器和事件处理程序。你知道吗


Tags: 函数数据库处理程序信号队列阈值用例后台
1条回答
网友
1楼 · 发布于 2024-05-16 06:44:35

我最近发布了Akuanduba模块,它可能会帮助您完成这项任务。存储库中只有一个示例可以帮助您了解它的工作原理,而且它似乎与您想要的类似。你知道吗

不管怎样,我将在这里解释一种用Akuanduba实现代码的方法:

  • 首先,您可以创建一个数据帧,以容纳您的队列:
# Mandatory imports
from Akuanduba.core.messenger.macros import *
from Akuanduba.core.constants import *
from Akuanduba.core import NotSet, AkuandubaDataframe
# Your imports go here:
from queue import Queue

class MyQueue (AkuandubaDataframe):

  def __init__(self, name):

    # Mandatory stuff
    AkuandubaDataframe.__init__(self, name)

    self.__queue = Queue ()

  def getQueue (self):
    return self.__queue

  def putQueue (self, val):
    self.__queue.put(val)

  def getQueueSize (self):
    return self.__queue.qsize()

  #
  # "toRawObj" method is a mandatory method that delivers a dict with the desired data
  # for file saving
  #
  def toRawObj(self):
    d = {
          "Queue" : self.getQueue(),
          }
    return d
  • 然后您可以创建一个触发器条件来检查队列大小:
from Akuanduba.core import StatusCode, NotSet, StatusTrigger
from Akuanduba.core.messenger.macros import *
from Akuanduba.core import TriggerCondition
import time

class CheckQueueSize (TriggerCondition):

  def __init__(self, name, maxSize):

    TriggerCondition.__init__(self, name)
    self._name = name
    self._maxSize = maxSize

  def initialize(self):

    return StatusCode.SUCCESS

  def execute (self):

    size = self.getContext().getHandler("MyQueue").getQueueSize()
    if (size > SIZE_THRESHOLD):
      return StatusTrigger.TRIGGERED
    else:
      return StatusTrigger.NOT_TRIGGERED

  def finalize(self):

    return StatusCode.SUCCESS
  • 制作一个工具作为处理函数:
# Mandatory imports
from Akuanduba.core import AkuandubaTool, StatusCode, NotSet, retrieve_kw
# Your imports go here:

class SampleTool(AkuandubaTool):

  def __init__(self, name, **kw):

    # Mandatory stuff
    AkuandubaTool.__init__(self, name)


  def initialize(self):

    # Lock the initialization. After that, this tool can not be initialized once again
    self.init_lock()
    return StatusCode.SUCCESS


  def execute(self,context):

    #
    # DO SOMETHING HERE
    #

    # Always return SUCCESS
    return StatusCode.SUCCESS

  def finalize(self):
    self.fina_lock()
    return StatusCode.SUCCESS
  • 最后,制作一个主脚本,使其能够协同工作:
# Akuanduba imports
from Akuanduba.core import Akuanduba, LoggingLevel, AkuandubaTrigger
from Akuanduba import ServiceManager, ToolManager, DataframeManager

# This sample's imports
import MyQueue, CheckQueueSize, SampleTool

# Creating your handler
your_handler = SampleTool ("Your Handler's name")

# Creating dataframes
queue = MyQueue ("MyQueue")

# Creating trigger
trigger  = AkuandubaTrigger("Sample Trigger Name", triggerType = 'or')

# Append conditions and tools to trigger just adding them
# Tools appended to the trigger will only run when trigger is StatusTrigger.TRIGGERED,
# and will run in the order they've been appended
trigger += CheckQueueSize( "CheckQueueSize condition", MAX_QUEUE_SIZE )
trigger += your_handler

# Creating Akuanduba
manager = Akuanduba("Akuanduba", level=LoggingLevel.INFO)

# Appending tools
#
# ToolManager += TOOL_1
# ToolManager += TOOL_2
#
ToolManager += trigger

# Apprending dataframes
DataframeManager += sampleDataframe

# Initializing 
manager.initialize()
manager.execute()
manager.finalize()

这样,你就有了干净和独立的代码。你知道吗

相关问题 更多 >