UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead

1 vote
1 answer
3413 views
Asked 2025-04-18 04:06

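Here is what I'm doing: imagine you have several workflows to execute. Each workflow contains tasks, and those tasks target different hosts. The fastest approach is to put each workflow in its own process and run them all at the same time.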

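I'm using Python's multiprocessing module to call a remote function that is invoked through Celery. The program works when I run a single process, but as soon as I run several processes I get the error below. From what I've read, the problem is that messages are being published on the same channel at the same time. A channel should not be shared between threads, and the same applies to the processes here.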
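A simplified model of that failure mode, assuming the processes really do end up writing to one shared channel: in AMQP 0-9-1, a basic.publish method frame must be followed by a content header frame on the same channel. If another writer slips a method frame (such as the queue.declare seen in the traceback) in between, the broker rejects the stream. The function name `first_protocol_error` and the frame labels are hypothetical, and only this one rule is modelled; it is not a real AMQP parser.

```python
# AMQP 0-9-1 frame type codes.
METHOD, HEADER, BODY = 1, 2, 3


def first_protocol_error(frames):
    """Scan the frames seen on ONE channel; return an error string or None.

    Each frame is a (frame_type, label) tuple. Only one rule is modelled:
    a basic.publish method frame must be followed by a content header frame.
    """
    awaiting_header = False
    for frame_type, label in frames:
        if awaiting_header and frame_type != HEADER:
            return "UNEXPECTED_FRAME: expected content header, got %s" % label
        # Only a basic.publish method frame makes the next frame mandatory.
        awaiting_header = frame_type == METHOD and label == "basic.publish"
    return None


# One writer: publish, header, body arrive in order, then an unrelated method.
single_writer = [
    (METHOD, "basic.publish"),
    (HEADER, "content header"),
    (BODY, "content body"),
    (METHOD, "queue.declare"),
]

# Two writers on the same channel: a queue.declare lands between the
# publish method frame and its content header.
two_writers = [
    (METHOD, "basic.publish"),
    (METHOD, "queue.declare"),
    (HEADER, "content header"),
    (BODY, "content body"),
]

ok = first_protocol_error(single_writer)
broken = first_protocol_error(two_writers)
```

In this model `ok` is `None`, while `broken` carries an UNEXPECTED_FRAME message, which mirrors the 505 error the broker reports before it closes the connection.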

How do I get Celery to handle this? Do I need to pass an extra argument to the 'celeryd' command, or do I have to deal with it in my Python program?

Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "testHello.py", line 16, in test_hello_aux
    print output.get()
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
    on_interval=on_interval)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
    no_ack=no_ack, accept=self.accept) as consumer:
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
    self.revive(self.channel)
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
    self.declare()
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
    queue.declare()
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 505, in declare
    self.queue_declare(nowait, passive=False)
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 531, in queue_declare
    nowait=nowait)
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1254, in queue_declare
    self._send_method((50, 10), args)
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 221, in write_method
    write_frame(1, channel, payload)
  File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 177, in write_frame
    frame_type, channel, size, payload, 0xce,
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "testHello.py", line 16, in test_hello_aux
    print output.get()
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
    on_interval=on_interval)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
    no_ack=no_ack, accept=self.accept) as consumer:
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
Process Process-3:
    self.revive(self.channel)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
    self.declare()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
    queue.declare()
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 504, in declare
    self.run()
    self.exchange.declare(nowait)
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 166, in declare
    self._target(*self._args, **self._kwargs)
    nowait=nowait, passive=passive,
  File "testHello.py", line 16, in test_hello_aux
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 613, in exchange_declare
    print output.get()
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
    on_interval=on_interval)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
    no_ack=no_ack, accept=self.accept) as consumer:
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
    self._send_method((40, 10), args)
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 221, in write_method
    self.revive(self.channel)
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
    self.declare()
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
    write_frame(1, channel, payload)
    queue.declare()
  File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 177, in write_frame
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 504, in declare
    frame_type, channel, size, payload, 0xce,
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    self.exchange.declare(nowait)
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 166, in declare
    nowait=nowait, passive=passive,
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 620, in exchange_declare
    return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe
    (40, 11),  # Channel.exchange_declare_ok
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 67, in wait
    self.channel_id, allowed_methods)
  File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 237, in _wait_method
    self.method_reader.read_method()
  File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 189, in read_method
    raise m
error: [Errno 104] Connection reset by peer
Process Process-4:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "testHello.py", line 16, in test_hello_aux
    print output.get()
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
    on_interval=on_interval)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
    no_ack=no_ack, accept=self.accept) as consumer:
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
    self.revive(self.channel)
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
    self.declare()
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
    queue.declare()
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 505, in declare
    self.queue_declare(nowait, passive=False)
  File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 531, in queue_declare
    nowait=nowait)
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1258, in queue_declare
    (50, 11),  # Channel.queue_declare_ok
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 67, in wait
    self.channel_id, allowed_methods)
  File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 270, in _wait_method
    self.wait()
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 69, in wait
    return self.dispatch_method(method_sig, args, content)
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 87, in dispatch_method
    return amqp_method(self, args)
  File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 526, in _close
    (class_id, method_id), ConnectionError)
UnexpectedFrame: Basic.publish: (505) UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead

celery --version 3.1.11 (Cipater) amq --version 0.9.1

1 Answer

1

When you use Celery, you don't need to use Python's multiprocessing module yourself. Celery takes care of all of that for you.

You need to define your tasks in a file called tasks.py.


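from celery import Celery

# A result backend is required for .get() to return anything; 'amqp' is the
# backend that appears in the question's traceback (Celery 3.1).
app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')

@app.task
def add(x, y):
    return x + y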

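Assume that add is really whatever operation you want to run in parallel. There are two concepts to keep apart here. Parallel means running at the same time, while asynchronous (async) means not having to run synchronously. I can't guarantee that your tasks will run at the same time, but I can guarantee that they won't run synchronously. So let's use the word "asynchronous".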

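Celery has a feature called Canvas that provides primitives for asynchronous flow control. You will probably be interested in group and chord. group lets you launch a set of asynchronous tasks together and collect the results once they have all finished (which is what you are after). chord works like group, except that it fires a callback after all the tasks have finished.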
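As a rough mental model only, the difference between the two can be imitated in a single process with the standard library's concurrent.futures. This is an analogy, not Celery's API: it uses threads instead of worker processes, and `run_group` and `run_chord` are hypothetical helper names.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def run_group(calls, max_workers=4):
    """Submit every (func, args) pair at once, then collect results in order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # submit() returns a future immediately, without waiting for the call.
        futures = [pool.submit(func, *args) for func, args in calls]
        # result() blocks until that particular call has finished.
        return [future.result() for future in futures]


def run_chord(calls, callback, max_workers=4):
    """Like run_group, but pass the complete result list to a callback."""
    return callback(run_group(calls, max_workers=max_workers))


calls = [(add, (i, i)) for i in range(5)]
group_results = run_group(calls)      # list of the individual results
chord_result = run_chord(calls, sum)  # callback applied to that list
```

Here `group_results` is the list of sums and `chord_result` is the total that the callback computes from them. With Celery, the submitted calls would instead be task signatures sent to workers through the broker.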

Here is an example of the calling code, using group:



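from celery import group
from tasks import add

WAIT_TIME = 10  # however long you are willing to wait for your tasks, in seconds

# Build a group of ten add() signatures and send them to the workers together.
# xrange is Python 2, matching the question's environment; use range on Python 3.
future = group(add.s(i**i, i**i) for i in xrange(10))()
# Block until every task has finished, or raise once WAIT_TIME seconds have passed.
results = future.get(timeout=WAIT_TIME)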

Celery tasks automatically run in their own processes (the worker processes you start), so you don't need to create any other processes yourself.
