Celery工作进程变量共享问题
我在一个项目中使用Python和celery。在这个项目里,我有两个文件:
celeryconfig.py
BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("example",)
CELERYD_CONCURRENCY = 2
和example.py
from celery.task import task
import hashlib
md5 = hashlib.md5()
@task
def getDigest(text):
print 'Using md5 - ',md5
md5.update(text)
return md5.digest()
在celeryconfig.py中,我把CELERYD_CONCURRENCY设置为2,这意味着它会把任务队列中的任务分配给2个不同的进程来处理。
然后我在Python控制台中运行:
from example import getDigest
getDigest.delay('foo');getDigest.delay('bar')
这会创建两个任务,让这两个工作进程同时执行。问题是,当这两个工作进程运行它们的任务函数getDigest()时,它们似乎在使用同一个哈希对象(md5)。从celeryd的输出可以确认这一点,具体如下。
[PoolWorker-2] Using md5 -
[PoolWorker-2] <md5 HASH object @ 0x23e6870>
[PoolWorker-1] Using md5 -
[PoolWorker-1] <md5 HASH object @ 0x23e6870>
为了简单起见,我使用的是hashlib的md5对象,但在我的实际项目中,我使用的是一个不能被多个进程访问和修改的对象。这就导致工作进程崩溃了。
这就引出了一个问题:我该如何修改我的代码,让工作进程各自初始化并使用自己的(md5)对象?现在它们共享同一个对象,这导致我的应用崩溃。这可能吗?
1 个回答
7
他们使用的是同一个对象,因为你在代码里明确告诉他们这样做。你在任务的外面创建了这个对象,然后在任务里使用它,这样就让所有的工作者都能访问这个共享的对象。这是一个并发问题,不一定是Celery的问题。如果这个对象比较小,你可以使用它的一个副本,或者使用你自己的锁定策略。不过一般来说,如果一个对象会被多个进程同时更新,就需要使用某种同步机制,而这超出了Celery的范围。