Celery在task_success处理程序中关闭工作进程无效

2 投票
1 回答
1191 浏览
提问于 2025-04-17 15:47

我正在尝试让一个工作程序一次只执行一个任务,然后再关闭它。我已经把关闭的部分做得很好了(这里有一些背景信息:celery尝试通过在任务后运行信号中引发SystemExit来关闭工作程序,但总是挂起,主进程从不退出),但是在关闭时,我遇到了一个错误:

[2013-02-13 12:19:05,689: CRITICAL/MainProcess] Couldn't ack 1, reason:AttributeError("'NoneType' object has no attribute 'method_writer'",)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/kombu/transport/base.py", line 104, in ack_log_error
    self.ack()
  File "/usr/local/lib/python2.7/site-packages/kombu/transport/base.py", line 99, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "/usr/local/lib/python2.7/site-packages/amqplib/client_0_8/channel.py", line 1742, in basic_ack
    self._send_method((60, 80), args)
  File "/usr/local/lib/python2.7/site-packages/amqplib/client_0_8/abstract_channel.py", line 75, in _send_method
    self.connection.method_writer.write_method(self.channel_id,
AttributeError: 'NoneType' object has no attribute 'method_writer'

为什么会这样?不仅没有确认任务完成,而且还清空了队列中剩下的其他任务(这可是个大问题)。

我该如何解决这个问题?





更新

下面是更新后的堆栈跟踪信息(pip install -U kombu amqp amqplib celery):

[2013-02-13 11:58:05,357: CRITICAL/MainProcess] Internal error: AttributeError("'NoneType' object has no attribute 'method_writer'",)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/__init__.py", line 372, in process_task
    req.execute_using_pool(self.pool)
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/job.py", line 219, in execute_using_pool
    timeout=task.time_limit)
  File "/usr/local/lib/python2.7/dist-packages/celery/concurrency/base.py", line 137, in apply_async
    **options)
  File "/usr/local/lib/python2.7/dist-packages/celery/concurrency/base.py", line 27, in apply_target
    callback(target(*args, **kwargs))
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/job.py", line 333, in on_success
    self.acknowledge()
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/job.py", line 439, in acknowledge
    self.on_ack(logger, self.connection_errors)
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/base.py", line 98, in ack_log_error
    self.ack()
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/base.py", line 93, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1562, in basic_ack
    self._send_method((60, 80), args)
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 57, in _send_method
    self.connection.method_writer.write_method(
AttributeError: 'NoneType' object has no attribute 'method_writer'

1 个回答

0

在任务后处理时退出是不推荐的,因为任务后处理是在“任务主体”错误处理之外执行的。

当一个任务调用 sys.exit 时,具体会发生什么并没有明确的定义,这实际上取决于使用的池类型。

在多进程的情况下,子进程会被一个新的进程替代。而在其他类型的池中,工作进程会关闭,但这可能会改变,以便与多进程的行为保持一致。

在任务主体之外调用退出被视为内部错误(崩溃)。

“任务主体”是指在 task.__call__() 执行时的内容。

我觉得一个更好的解决方案可能是使用自定义的执行策略:

from celery.worker import strategy
from functools import wraps

@staticmethod
def shutdown_after_strategy(task, app, consumer):

    default_handler = strategy.default(task, app, consumer)

    def _shutdown_to_exit_after(fun):
        @wraps(fun)
        def _inner(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            finally:
                raise SystemExit()
       return _inner
    return _decorate_to_exit_after(default_handler)

@celery.task(Strategy=shutdown_after_strategy)
def shutdown_after():
    print('will shutdown after this')

这并不是特别优雅,但执行策略的目的是为了优化任务执行,而不是为了方便扩展(工作进程通过缓存 Task.Strategy 来“预编译”每种任务类型的执行路径)。

在 Celery 3.1 中,你可以通过“启动步骤”扩展工作进程和消费者,所以很可能会有一个更好的解决方案。

撰写回答