Celery 入门 - result.get() 的超时错误
我正在按照这个教程学习Celery的基础知识:http://celery.readthedocs.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results
我在使用RabbitMQ,完全按照教程来操作。
当我执行result.get(timeout=1)时,出现了超时错误,尽管这只是一个简单的加法操作,而且我可以在另一个窗口看到工作进程正在运行并且正确地计算出结果(8)。
(venv) C:\Volt\celerytest>ipython
Python 2.7.6 (default, Nov 10 2013, 19:24:18) [MSC v.1500 32 bit (Intel)]
Type "copyright", "credits" or "license" for more information.
IPython 2.1.0 -- An enhanced Interactive Python.
? -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help -> Python's own help system.
object? -> Details about 'object', use 'object??' for extra details.
In [1]: from tasks import add
In [2]: a = add(1,3)
In [3]: a
Out[3]: 4
In [4]: a = add.delay(1,3)
In [5]: a.ready()
Out[5]: False
In [6]: a = add.delay(4,4)
In [7]: a.get(timeout=0.5)
---------------------------------------------------------------------------
TimeoutError Traceback (most recent call last)
<ipython-input-7-2c407a92720e> in <module>()
----> 1 a.get(timeout=0.5)
C:\Users\Som\Envs\venv\lib\site-packages\celery\result.pyc in get(self, timeout,
propagate, interval, no_ack, follow_parents)
167 interval=interval,
168 on_interval=on_interval,
--> 169 no_ack=no_ack,
170 )
171 finally:
C:\Users\Som\Envs\venv\lib\site-packages\celery\backends\amqp.pyc in wait_for(se
lf, task_id, timeout, cache, propagate, no_ack, on_interval, READY_STATES, PROPA
GATE_STATES, **kwargs)
155 on_interval=on_interval)
156 except socket.timeout:
--> 157 raise TimeoutError('The operation timed out.')
158
159 if meta['status'] in PROPAGATE_STATES and propagate:
TimeoutError: The operation timed out.
In [8]:
tasks.py 文件
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
@app.task
def add(x, y):
return x + y
工作进程日志
[tasks]
. tasks.add
[2014-07-17 13:00:33,196: INFO/MainProcess] Connected to amqp://guest:**@127.0.0
.1:5672//
[2014-07-17 13:00:33,211: INFO/MainProcess] mingle: searching for neighbors
[2014-07-17 13:00:34,220: INFO/MainProcess] mingle: all alone
[2014-07-17 13:00:34,240: WARNING/MainProcess] celery@SomsPC ready.
[2014-07-17 13:00:34,242: INFO/MainProcess] Received task: tasks.add[85ff75d8-38
b5-442a-a574-c8b976a33739]
[2014-07-17 13:00:34,243: INFO/MainProcess] Task tasks.add[85ff75d8-38b5-442a-a5
74-c8b976a33739] succeeded in 0.000999927520752s: 4
[2014-07-17 13:00:46,582: INFO/MainProcess] Received task: tasks.add[49de7c6b-96
72-485d-926e-a4e564ccc89a]
[2014-07-17 13:00:46,588: INFO/MainProcess] Task tasks.add[49de7c6b-9672-485d-92
6e-a4e564ccc89a] succeeded in 0.00600004196167s: 8
4 个回答
我知道我回答得有点晚,但希望能帮到某些人。
你只需要在配置好后端之后,重启已经在运行的工作进程。关于这点,你可以在“第一步”页面找到相关信息,不过这部分内容在文章的最后。
确保没有旧的工作进程还在运行。
很容易不小心启动多个工作进程,所以在启动新的之前,确保之前的工作进程已经正确关闭。
如果有一个旧的工作进程没有配置好预期的结果后端,它可能会在运行并干扰任务。
有时候我在使用redis的时候也会遇到TimeoutError
这个错误,所以我写了一个辅助函数来处理这个问题:
celery_app.update(
redis_socket_timeout=5,
redis_socket_connect_timeout=5,
)
def run_task(task, *args, **kwargs):
timeout = 2 * 60
future = task.apply_async(args, kwargs)
time_end = time.time() + timeout
while True:
try:
return future.get(timeout=timeout)
except redis.TimeoutError:
if time.time() < time_end:
continue
raise
如果你查看这个讨论帖,你会发现设置 --pool=solo
也能解决这个问题。这对我来说是有效的。
我在学习“Celery入门”时遇到了完全相同的问题。
我觉得问题的原因可能是因为使用了 backend='amqp'
。
对我有效的设置如下:
app = Celery('tasks', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
根据文档,当使用AMQP作为结果后端时,每个结果只能被获取一次(实际上它在查询中就是一条消息)。
我想,你的工作进程是为了把结果打印到控制台而去获取这个结果的:
Task tasks.add[49de7c6b-9672-485d-926e-a4e564ccc89a] succeeded in 0.00600004196167s: 8
所以你就无法再次获取同样的结果了。