Django频道的分布式任务。
django-cq的Python项目详细描述
Django CQ
说明
尝试实现用于django通道的分布式任务队列。 以RQ和芹菜为模型,复杂的任务工作流是可能的,所有这些都利用了 渠道机械。
为什么
有三个原因:
旨在实现更多的容错任务。在很多情况下 关于测试是如何进行的,需要持久地存储。为了 重要的任务,即使在redis故障的情况下,或者 工人倒下了。
更喜欢利用与渠道相同的机制。
希望在没有 似乎可以通过芹菜或RQ。
限制
有两个限制:
redis必须用作django缓存。
asgi_redis
必须用作通道后端。
目前正在努力消除这些限制。
安装
如果可以,请使用pip:
pip install django-cq
或生活在边缘:
pip install -e https://github.com/furious-luke/django-cq.git#egg=django-cq
将软件包添加到设置文件:
INSTALLED_APPS=['cq',...]
并将路由信息包含在频道路由列表中:
channel_routing=[include('cq.routing.channel_routing'),...]
您需要迁移以包含模型:
./manage.py migrate
你很可能想要为你的cq创建一个新的频道层。 任务。默认层在频道消息上的生存时间很短, 这会导致运行时间稍长的任务杀死任何排队的消息。 更新设置文件以包含以下内容:
CHANNEL_LAYERS={'default':{...},'long':{'BACKEND':'asgi_redis.RedisChannelLayer','CONFIG':{'hosts':[REDIS_URL],'expiry':1800,'channel_capacity':{'cq-tasks':1000}},'ROUTING':'path.to.your.channels.channel_routing',},}CQ_CHANNEL_LAYER='long'
为了处理在“cq任务”通道上发送的消息,一个工人 需要启动进程:
./manage.py cq_runworker
任务
基本任务的使用是直接的:
@taskdefsend_email(cqt,addr):...return'OK'task=send_emails.delay('dummy@dummy.org')task.wait()print(task.result)# "OK"
这里,cqt
是send_email
任务的任务表示。这个
可用于启动子任务、链接后续任务等
东西。
任务也可以通过调用串行方式运行:
result=send_email('dummy@dummy.org')print(result)# "OK"
子任务
对于更复杂的工作流,可以从内部启动子任务 父任务:
@taskdefsend_emails(cqt):...foraddrinemail_addresses:cqt.subtask(send_email,addr)...return'OK'task=send_emails.delay()task.wait()print(task.result)# "OK"
子任务和使用delay
启动的另一个任务之间的区别
在任务中,子任务的父任务将不会标记为已完成
直到所有子任务也完成。
fromcq.modelsimportTask@taskdefparent(cqt):task_a.delay()# not a subtaskcqt.subtask(task_b)# subtaskparent.delay()parent.status==Task.STATUS_WAITING# True# once task_b completesparent.wait()parent.status==Task.STATUS_COMPLETE# True
链式任务
待办事项
@taskdefcalculate_something(cqt):returncalc_a.delay(3).chain(add_a_to_4,(4,))
非原子任务
默认情况下,每个cq任务都是原子的;对数据库的任何更改都不会持久。
除非任务毫无例外地完成。如果需要更改
即使发生错误,数据库也会使用atomic
标志:
@task(atomic=False)defunsafe_task(cqt):pass
日志记录
对于长时间运行的任务,能够访问正在进行的日志是很有用的
任务的进度。cq任务有一个log
方法来发送日志记录
发送到标准django日志流的消息,并将它们缓存在
正在运行的任务。
@taskdeflong_task(cqt):cqt.log('standard old log')cqt.log('debugging log',logging.DEBUG)
如果当前任务是子任务,则日志将转到父任务。 这样就有了一个可以使用的中心任务(顶级任务) 监视子任务和链接任务网络的进度和状态。
性能
由于处理日志的方式,可能会出现性能问题 有很多频繁的日志消息。有两种方法可以防止这种情况。
通过将publish
设置为False
,减少日志的频率
尽可能记录通话。这将在本地缓存日志并存储它们
在下一个publish=True
调用时。
@taskdeflong_task(cqt):foriiinrange(100):cqt.log('iteration %d'%ii,publish=False)cqt.log('done')# publish=True
其次,可以通过限制
保留的日志行数。limit
选项指定了这一点。这个
下面只保留10个记录的迭代:
@taskdeflong_task(cqt):foriiinrange(100):cqt.log('iteration %d'%ii,publish=False)cqt.log('done',limit=10)
生存时间
待办事项
重复任务
cq附带健壮的重复任务。有两种方法可以创建 重复任务:
从Djang来的o管理员。
使用数据迁移。
从管理中,单击进入cq
和repeating tasks
。从那里开始你
可以创建新的重复任务,指定要调用的后台任务,
还有一个重复的时间。
要从迁移创建重复任务,请使用helper函数
schedule_task
。
fromdjango.dbimportmigrationsfromcq.modelsimportschedule_taskfrommyapp.tasksimporta_taskdefadd_repeating(apps,scema_editor):RepeatingTask=apps.get_model('cq.RepeatingTask')schedule_task(RepeatingTask,'* * * * *',a_task)classMigration(migrations.Migration):operations=[migrations.RunPython(add_repeating,reverse_code=migrations.RunPython.noop)]
凝聚
合并任务的挂起或排队实例将阻止该任务的其他实例运行。