Django频道的分布式任务。

django-cq的Python项目详细描述


Django CQ

PyPI version

说明

尝试实现用于django通道的分布式任务队列。 以RQ和芹菜为模型,复杂的任务工作流是可能的,所有这些都利用了 渠道机械。

为什么

有三个原因:

  1. 旨在实现更多的容错任务。在很多情况下 关于测试是如何进行的,需要持久地存储。为了 重要的任务,即使在redis故障的情况下,或者 工人倒下了。

  2. 更喜欢利用与渠道相同的机制。

  3. 希望在没有 似乎可以通过芹菜或RQ。

限制

有两个限制:

  • redis必须用作django缓存。

  • asgi_redis必须用作通道后端。

目前正在努力消除这些限制。

安装

如果可以,请使用pip:

pip install django-cq

或生活在边缘:

pip install -e https://github.com/furious-luke/django-cq.git#egg=django-cq

将软件包添加到设置文件:

INSTALLED_APPS=['cq',...]

并将路由信息包含在频道路由列表中:

channel_routing=[include('cq.routing.channel_routing'),...]

您需要迁移以包含模型:

./manage.py migrate

你很可能想要为你的cq创建一个新的频道层。 任务。默认层在频道消息上的生存时间很短, 这会导致运行时间稍长的任务杀死任何排队的消息。 更新设置文件以包含以下内容:

CHANNEL_LAYERS={'default':{...},'long':{'BACKEND':'asgi_redis.RedisChannelLayer','CONFIG':{'hosts':[REDIS_URL],'expiry':1800,'channel_capacity':{'cq-tasks':1000}},'ROUTING':'path.to.your.channels.channel_routing',},}CQ_CHANNEL_LAYER='long'

为了处理在“cq任务”通道上发送的消息,一个工人 需要启动进程:

./manage.py cq_runworker

任务

基本任务的使用是直接的:

@taskdefsend_email(cqt,addr):...return'OK'task=send_emails.delay('dummy@dummy.org')task.wait()print(task.result)# "OK"

这里,cqtsend_email任务的任务表示。这个 可用于启动子任务、链接后续任务等 东西。

任务也可以通过调用串行方式运行:

result=send_email('dummy@dummy.org')print(result)# "OK"

子任务

对于更复杂的工作流,可以从内部启动子任务 父任务:

@taskdefsend_emails(cqt):...foraddrinemail_addresses:cqt.subtask(send_email,addr)...return'OK'task=send_emails.delay()task.wait()print(task.result)# "OK"

子任务和使用delay启动的另一个任务之间的区别 在任务中,子任务的父任务将不会标记为已完成 直到所有子任务也完成。

fromcq.modelsimportTask@taskdefparent(cqt):task_a.delay()# not a subtaskcqt.subtask(task_b)# subtaskparent.delay()parent.status==Task.STATUS_WAITING# True# once task_b completesparent.wait()parent.status==Task.STATUS_COMPLETE# True

链式任务

待办事项

@taskdefcalculate_something(cqt):returncalc_a.delay(3).chain(add_a_to_4,(4,))

非原子任务

默认情况下,每个cq任务都是原子的;对数据库的任何更改都不会持久。 除非任务毫无例外地完成。如果需要更改 即使发生错误,数据库也会使用atomic标志:

@task(atomic=False)defunsafe_task(cqt):pass

日志记录

对于长时间运行的任务,能够访问正在进行的日志是很有用的 任务的进度。cq任务有一个log方法来发送日志记录 发送到标准django日志流的消息,并将它们缓存在 正在运行的任务。

@taskdeflong_task(cqt):cqt.log('standard old log')cqt.log('debugging log',logging.DEBUG)

如果当前任务是子任务,则日志将转到父任务。 这样就有了一个可以使用的中心任务(顶级任务) 监视子任务和链接任务网络的进度和状态。

性能

由于处理日志的方式,可能会出现性能问题 有很多频繁的日志消息。有两种方法可以防止这种情况。

通过将publish设置为False,减少日志的频率 尽可能记录通话。这将在本地缓存日志并存储它们 在下一个publish=True调用时。

@taskdeflong_task(cqt):foriiinrange(100):cqt.log('iteration %d'%ii,publish=False)cqt.log('done')# publish=True

其次,可以通过限制 保留的日志行数。limit选项指定了这一点。这个 下面只保留10个记录的迭代:

@taskdeflong_task(cqt):foriiinrange(100):cqt.log('iteration %d'%ii,publish=False)cqt.log('done',limit=10)

生存时间

待办事项

重复任务

cq附带健壮的重复任务。有两种方法可以创建 重复任务:

  1. 从Djang来的o管理员。

  2. 使用数据迁移。

从管理中,单击进入cqrepeating tasks。从那里开始你 可以创建新的重复任务,指定要调用的后台任务, 还有一个重复的时间。

要从迁移创建重复任务,请使用helper函数 schedule_task

fromdjango.dbimportmigrationsfromcq.modelsimportschedule_taskfrommyapp.tasksimporta_taskdefadd_repeating(apps,scema_editor):RepeatingTask=apps.get_model('cq.RepeatingTask')schedule_task(RepeatingTask,'* * * * *',a_task)classMigration(migrations.Migration):operations=[migrations.RunPython(add_repeating,reverse_code=migrations.RunPython.noop)]

凝聚

合并任务的挂起或排队实例将阻止该任务的其他实例运行。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
具有较旧spring启动版本的java Maven依赖项   java如何在安卓中获取移动网络活动计划使用历史记录   java CXF抛出了所有策略替代方案都无法满足的问题   java如何创建类似ApachePOI的程序   Java Hashmap如何处理单词网格中的键冲突   java如何在多个下拉列表中搜索下一个元素   如何将css文件导入我的JavaSpringWebApp?   如何在Java中将字符串[]转换为字符串[]?   排序如何在Java中根据列的组合对spark dataframe进行排序?   java错误:无法访问com的zzbej类文件。谷歌。安卓gms。内部的没有找到zzbej   illegalargumentexception Java Comparator引发非法参数异常   java删除ShaperRenderer偏移量   安卓中的java出生日期问题   设置MediaBrowserService和MediaSession时遇到java问题   java对抛出声明的澄清   java在IntelliJ IDEA的Gradle项目的“提供”范围内添加依赖项   带2个变量的java For循环?   java是一个更平坦的问题   使用JACOB保存Word文档(Java)