How do I get a Dramatiq worker to run inside a script?

Posted 2024-05-13 05:07:28


I have been experimenting with Dramatiq for a few days, and I cannot get a worker to process messages from inside a (simple) script.

test.py


import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results.backends import RedisBackend
from dramatiq.results import Results
from dramatiq.worker import Worker

redis_broker = RedisBroker(host="127.0.0.1", port=6379)

results_backend = RedisBackend(url="redis://127.0.0.1:6379")
redis_broker.add_middleware(Results(backend=results_backend))

dramatiq.set_broker(redis_broker)

worker = Worker(
    broker=redis_broker
)
worker.start()

@dramatiq.actor(queue_name="default", max_retries=1, store_results=True)
def print_hello_world():
    print("Hello World!")

print_hello_world.send()

Result (Redis):

127.0.0.1:6379> keys *
1) "dramatiq:default.msgs"
2) "dramatiq:default"
3) "dramatiq:__heartbeats__"

But when I start the dramatiq daemon from the folder that contains test.py:

$ dramatiq test

the result is exactly what I expect.

Result (Redis):

127.0.0.1:6379> keys *
1) "3f11649148820d957b2945da46b3c2b7"
2) "dramatiq:__heartbeats__"

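When the worker is started from the script itself, it does not seem to receive the message: it stays in the queue. Examples of setting up a worker inside a script are hard to find online.

Can anyone help me get this working?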


1 answer
User
#1 · Posted 2024-05-13 05:07:28

You can use this code as a reference: https://gitlab.com/bersace/flask-dramatiq/-/blob/master/flask_dramatiq.py#L311

"""
Adapted from https://gitlab.com/bersace/flask-dramatiq/-/blob/master/flask_dramatiq.py#L311
"""
import os
import sys

from dramatiq.cli import (
    CPUS,
    HAS_WATCHDOG,
    main as dramatiq_worker,
    make_argument_parser as dramatiq_argument_parser,
)

from app.core.config import get_settings
from app.tasks import rabbitmq_broker
from app.tasks.manual_selenium import demo
from app.utils.logger import logger


def list_managed_actors(broker, queues):
    queues = set(queues)
    all_actors = broker.actors.values()
    if not queues:
        return all_actors
    else:
        return [a for a in all_actors if a.queue_name in queues]


def format_actor(actor):
    return "%s@%s" % (actor.actor_name, actor.queue_name)


def guess_code_directory(broker):
    actor = next(iter(broker.actors.values()))
    modname, *_ = actor.fn.__module__.partition('.')
    mod = sys.modules[modname]
    return os.path.dirname(mod.__file__)


def worker(verbose=0, processes=CPUS, threads=8, queues=None, broker=rabbitmq_broker):
    """Run dramatiq workers.

    Setup Dramatiq with broker and task modules from Flask app.

    \b
    examples:
      # Run dramatiq with 1 thread per process.
      $ flask worker --threads 1

    \b
      # Listen only to the "foo" and "bar" queues.
      $ flask worker -Q foo,bar

    \b
      # Consuming from a specific broker
      $ flask worker mybroker
    """
    # Plugin for flask.commands entrypoint.
    #
    # Wraps dramatiq worker CLI in a Flask command. This is private API of
    # dramatiq.
    # TODO Plugin for fastapi

    parser = dramatiq_argument_parser()

    command = [
        "--processes", str(processes),
        "--threads", str(threads),
        # This module does not define a local broker, so dramatiq falls back
        # to the global broker.
        __name__,
    ]

    if get_settings().DEBUG:
        verbose = max(1, verbose)
        if HAS_WATCHDOG:
            command += ["--watch", guess_code_directory(broker)]

    queues = queues.split(",") if queues else []
    if queues:
        command += ["--queues"] + queues
    command += verbose * ['-v']
    args = parser.parse_args(command)

    logger.info("Able to execute the following actors:")
    for actor in list_managed_actors(broker, queues):
        logger.info("\t%s." % format_actor(actor))

    dramatiq_worker(args)


if '__main__' == __name__:
    worker()
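
The part of worker() that assembles the CLI arguments can be looked at on its own. Below is a minimal sketch of that step, assuming a hypothetical helper named build_worker_command (not part of dramatiq or flask-dramatiq) and the dramatiq CLI flags --processes, --threads, --queues and -v. It keeps the module name ahead of --queues, as the answer does, because --queues accepts several values.

```python
def build_worker_command(module, processes=1, threads=8, queues=None, verbose=0):
    """Return the argument list for the dramatiq CLI (hypothetical helper)."""
    command = [
        "--processes", str(processes),
        "--threads", str(threads),
        module,  # positional argument: the module that defines broker and actors
    ]
    # Accept a comma-separated string such as "foo,bar", as the answer does.
    queue_list = [q for q in (queues or "").split(",") if q]
    if queue_list:
        command += ["--queues", *queue_list]
    # One -v per verbosity level.
    command += ["-v"] * verbose
    return command


if __name__ == "__main__":
    print(build_worker_command("test", processes=2, threads=4, queues="foo,bar", verbose=1))
```

The resulting list can be handed to the parser returned by make_argument_parser(), as in the answer, or appended to a `dramatiq` command line.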

