如何使用celery通过mongoengine向mongodb插入数据
我正在尝试使用celery将大量数据插入到我的mongodb中,但遇到了并发的问题。如果我同时发送多个任务到celery,部分数据会被插入到mongodb中,而其他数据则不会。我认为这是因为mongodb在插入操作时会锁定数据库,但我需要一个解决方案,能够同时发送多个相同类型的任务来插入数据到数据库中。比如说,检查数据库是否被锁定,如果被锁定就等它解锁。以下是我代码的一部分:
@celery.task(name='celery_tasks.add_book_product')
def add_book_product(product_dict, store_id):
connect(DefaultConfig.MONGODB_DB, host=DefaultConfig.MONGODB_HOST)
store_obj = Store.objects.get(pk=store_id)
try:
book = Books.objects.get(pk=product_dict['RawBook'])
try:
product_obj = Product.objects.get(store=store_obj, related_book=book, kind='book')
print("Product {} found for store {}".format(product_obj.id, store_obj.id))
product_obj.count = int(product_dict['count'])
product_obj.buy_price = int(product_dict['buy_book'])
product_obj.sell_price = int(product_dict['sell_book'])
product_obj.save()
except (DoesNotExist, ValidationError):
product_obj = Product(store=store_obj,
related_book=book,
kind='book',
count=int(product_dict['count']),
buy_price=int(product_dict['buy_book']),
sell_price=int(product_dict['sell_book']),
name=book.name_fa)
product_obj.save()
print("Appending books to store obj...")
store_obj.products.append(product_obj)
store_obj.save()
print("Appending books to store obj done")
return "Product {} saved for store {}".format(product_obj.id, store_obj.id)
except (DoesNotExist, ValidationError):
traceback.print_exc()
return "Product with raw book {} does not exist.".format(product_dict['RawBook'])
1 个回答
2
默认情况下,celery使用多进程来同时执行任务。但是,有两种方法可以确保在任何时候只执行一个任务。
解决方案 1:
当你用下面的命令启动celery工作进程时
celery -A your_app worker -l info
默认的并发数是你电脑核心的数量。所以如果你像这样启动一个工作进程
celery -A your_app worker -l info -c 1
它在任何时候只会运行一个任务。如果你还有其他需要执行的任务,可以启动一个新的队列,并分配一个工作进程来处理它。
解决方案 2:
这个方法稍微复杂一点。你需要在你的任务中使用一个锁,类似于下面这样。
if acquire_lock():
try:
#do something
finally:
release_lock()
return
你可以在Celery文档中了解更多信息。