UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead
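Here is what I am trying to do: imagine you have several workflows to execute. Each workflow consists of tasks, and those tasks target different hosts. The fastest approach is to put each workflow in its own process and run them all at the same time.
I am trying to use Python's multiprocessing module to execute a remote function that is invoked through Celery. My program works fine with a single process, but as soon as I run several processes I get the error below. From what I have read, the problem is publishing messages on the same channel at the same time; channels should not be shared between threads.
How can I get Celery to handle this? Do I need to pass an argument to the 'celeryd' command, or do I have to deal with it in my Python program?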
Process Process-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "testHello.py", line 16, in test_hello_aux
print output.get()
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
on_interval=on_interval)
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
no_ack=no_ack, accept=self.accept) as consumer:
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
self.revive(self.channel)
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
self.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
queue.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 505, in declare
self.queue_declare(nowait, passive=False)
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 531, in queue_declare
nowait=nowait)
File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1254, in queue_declare
self._send_method((50, 10), args)
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 56, in _send_method
self.channel_id, method_sig, args, content,
File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 221, in write_method
write_frame(1, channel, payload)
File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 177, in write_frame
frame_type, channel, size, payload, 0xce,
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe
Process Process-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "testHello.py", line 16, in test_hello_aux
print output.get()
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
on_interval=on_interval)
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
no_ack=no_ack, accept=self.accept) as consumer:
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
Process Process-3:
self.revive(self.channel)
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
self.declare()
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
queue.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 504, in declare
self.run()
self.exchange.declare(nowait)
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 166, in declare
self._target(*self._args, **self._kwargs)
nowait=nowait, passive=passive,
File "testHello.py", line 16, in test_hello_aux
File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 613, in exchange_declare
print output.get()
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
on_interval=on_interval)
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
no_ack=no_ack, accept=self.accept) as consumer:
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
self._send_method((40, 10), args)
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 56, in _send_method
self.channel_id, method_sig, args, content,
File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 221, in write_method
self.revive(self.channel)
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
self.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
write_frame(1, channel, payload)
queue.declare()
File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 177, in write_frame
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 504, in declare
frame_type, channel, size, payload, 0xce,
File "/usr/lib/python2.7/socket.py", line 224, in meth
self.exchange.declare(nowait)
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 166, in declare
nowait=nowait, passive=passive,
File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 620, in exchange_declare
return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe
(40, 11), # Channel.exchange_declare_ok
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 67, in wait
self.channel_id, allowed_methods)
File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 237, in _wait_method
self.method_reader.read_method()
File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 189, in read_method
raise m
error: [Errno 104] Connection reset by peer
Process Process-4:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "testHello.py", line 16, in test_hello_aux
print output.get()
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
on_interval=on_interval)
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
no_ack=no_ack, accept=self.accept) as consumer:
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
self.revive(self.channel)
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
self.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
queue.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 505, in declare
self.queue_declare(nowait, passive=False)
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 531, in queue_declare
nowait=nowait)
File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1258, in queue_declare
(50, 11), # Channel.queue_declare_ok
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 67, in wait
self.channel_id, allowed_methods)
File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 270, in _wait_method
self.wait()
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 69, in wait
return self.dispatch_method(method_sig, args, content)
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 87, in dispatch_method
return amqp_method(self, args)
File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 526, in _close
(class_id, method_id), ConnectionError)
UnexpectedFrame: Basic.publish: (505) UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead
celery --version 3.1.11 (Cipater) amq --version 0.9.1
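To make the suspected cause concrete, here is a toy model of the rule the broker appears to be enforcing. It is only a sketch and does not use the real amqp library: the frame type codes (1 = method, 2 = content header, 3 = body) and the class/method ids come from AMQP 0-9-1, while `check_channel`, the tuple layout, and the payload bytes are made up for illustration. The idea is that once a Basic.publish method frame arrives on a channel, the very next frame on that channel has to be its content header, so a second writer slipping in a Queue.declare breaks the sequence.

```python
# Toy model, not the real amqp library: frames are (frame_type, payload) tuples.
METHOD, HEADER, BODY = 1, 2, 3   # AMQP 0-9-1 frame type codes
BASIC_PUBLISH = (60, 40)         # class 60 (basic), method 40 (publish)
QUEUE_DECLARE = (50, 10)         # the method sent by queue_declare in the traceback


class UnexpectedFrame(Exception):
    """Hypothetical stand-in for the broker's 505 UNEXPECTED_FRAME error."""


def check_channel(frames):
    """Raise UnexpectedFrame if a Basic.publish is not followed by its header."""
    awaiting_header_for = None
    for frame_type, payload in frames:
        if awaiting_header_for is not None:
            if frame_type != HEADER:
                raise UnexpectedFrame(
                    "expected content header for class %d, "
                    "got non content header frame instead" % awaiting_header_for)
            awaiting_header_for = None
        elif frame_type == METHOD and payload == BASIC_PUBLISH:
            awaiting_header_for = payload[0]
    return True


# One writer: method, header, and body frames arrive back to back.
single_writer = [(METHOD, BASIC_PUBLISH), (HEADER, b"props"), (BODY, b"task")]

# Two writers sharing the channel: a Queue.declare lands between the
# publish method frame and its content header.
shared_channel = [(METHOD, BASIC_PUBLISH), (METHOD, QUEUE_DECLARE),
                  (HEADER, b"props"), (BODY, b"task")]
```

Calling `check_channel(single_writer)` passes, while `check_channel(shared_channel)` raises with the same wording as the broker's message, which is consistent with several forked processes writing to one inherited connection.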
1 Answer
When you use Celery, you do not need to reach for Python's multiprocessing module yourself. Celery handles all of that for you.
Define your tasks in a file called tasks.py.
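from celery import Celery

# Calling .get() on a result later requires a result backend; 'amqp' matches
# the celery/backends/amqp.py seen in the question's traceback (Celery 3.1).
app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')

@app.task
def add(x, y):
    return x + y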
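Assume add stands in for whatever operation you want to run in parallel. There are two concepts to distinguish here. Parallel means happening at the same time, while asynchronous (async) means not having to happen synchronously. I cannot guarantee that your tasks will run at the same time, but I can guarantee that they will not run synchronously. So let's use the word "asynchronous".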
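Celery has a feature called Canvas, which provides primitives for asynchronous flow control. You will probably be interested in group and chord. group lets you launch a set of asynchronous tasks together and collect the results once they have all finished (which is what you are after). chord works like group, except that it triggers a callback once all the tasks have finished.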
Here is an example of the calling code:
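from celery import group
from tasks import add

WAIT_TIME = 10  # however long you are willing to wait for your tasks

# Build a group of ten add tasks and send them all to the workers at once.
future = group(add.s(i**i, i**i) for i in range(10))()
# Block until every task has finished, or raise once the timeout expires.
results = future.get(timeout=WAIT_TIME)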
Celery tasks automatically run in their own processes (the worker processes you started), so you do not need to spawn any extra processes yourself.
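If you want to get a feel for how group and chord behave without a broker or a worker running, the sketch below mimics them with the standard library's concurrent.futures. This is an analogy only, not Celery's implementation: `run_group` and `run_chord` are hypothetical helper names, and a thread pool stands in for the workers. It assumes Python 3.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def run_group(pool, func, arg_list):
    """Submit every call up front, then collect results in submission order."""
    futures = [pool.submit(func, *args) for args in arg_list]
    return [f.result() for f in futures]


def run_chord(pool, func, arg_list, callback):
    """Run the group, then hand the full list of results to a callback."""
    return callback(run_group(pool, func, arg_list))


with ThreadPoolExecutor(max_workers=4) as pool:
    args = [(i, i) for i in range(5)]
    group_results = run_group(pool, add, args)      # [0, 2, 4, 6, 8]
    chord_result = run_chord(pool, add, args, sum)  # 20
```

The shape is the same as with Celery: the group hands back one result per task, and the chord feeds that list into a single follow-up step.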