工作人员。
sqs-workers的Python项目详细描述
SQS工人
我怎么用?
除非您是doist开发团队的一部分, 你很可能不需要它。这是一种固执己见的东西,建立在我们自己的内在需要之上 可能对外部开发人员没有什么价值。
队列处理器非常丰富(参见http://queues.io/" rel="nofollow">http://queues.io/示例),并且 上不缺少SQS队列处理器 pypi,请不要寄予厚望 关于这个特殊的实现
明白了,但我怎么开始用呢?
使用安装软件包
pip install sqs-workers
配置BOTO3库以提供安装所需的访问请求
使用类似这样的 不要忘记设置您首选的AWS区域。 然后,您将开始管理两个系统(很可能来自同一个代码库):
其中一个将消息添加到队列,另一个将执行这些消息。 然后有两种方法可以将任务添加到队列中。经典(也称为"显式"): 还有"芹菜方式"(我们在某种程度上模仿了芹菜api) 要处理队列,必须手动运行工作进程。创建一个新文件
将包含sqs对象的定义并注册所有处理器(很可能,
从项目导入必要的模块),然后运行sqs 在生产中,我们通常不会在同一个过程中处理多个队列,
但是对于开发环境来说,处理所有队列更容易
立即使用 有两个序列化程序:json和pickle。 fifo队列可以用 除非设置了"基于内容的重复数据消除"标志,否则每条消息都必须
发送时带有属性 如果任务处理最终出现异常,则会记录错误并
任务将在一段时间后返回队列。精确的行为被定义
按队列设置。 您可以使用 如果需要执行以下操作,可以定义自己的处理器或批处理器
在执行特定任务之前或之后的一些特定操作。 自定义处理器示例 通过作业消息隐式传递给工作进程的上下文。罐头
用于日志记录或分析目的,例如。 使用示例。 或者,您可以这样设置上下文。 然后,当需要清除上下文时: 在web应用程序中,通常在处理结束时调用它
网络请求的。 与其处理每个处理函数中的上下文,不如
可以在处理器中通过子类化来执行此操作。 在创建队列时,可以设置回退死信队列并重新驱动
政策,可以是这样的 意思是"在四(3+1)后将邮件移动到电子邮件死信队列"
发送给收件人的尝试失败" 您可以为整个环境或特定环境定义退避策略
处理器。 默认策略是指数退避。建议始终设置
同时使用退避策略和死信队列来限制最大数量
执行尝试的次数。 或者,您可以将backoff设置为立即返回以重新执行
任务立即失败。 使用process_queue()启动队列处理器时,可能
可选择何时停止。 支持以下关闭策略: idleshutdown(空闲秒):在没有新任务时从函数返回
具体时间段见
maxtasksshutdown(max_tasks):在处理之后从函数返回
最少的最大任务作业。有助于防止内存泄漏 默认策略为nevershutdown。也可以将前两个
使用或关闭或和关闭策略的策略,或创建
特定行为的自定义类。 不活动5分钟后停止处理队列的示例: 处理死信队列的最常见方法是修复主错误
使消息首先出现在那里,然后重新处理
再次发送这些信息。 有了sqs,工作人员可以通过将来自
死信排队回到主信。它可以用特殊的
后备处理器称为死信处理器。 死信处理程序对队列的组织方式有自己的见解,并使用了一些
硬编码选项。 它应该处理队列"死的东西",应该是
为"某物"配置的死信队列。在处理队列时,
处理器接收每条消息并将其推回到队列"something"
硬编码延迟1秒。 如果队列名称不以"\u dead"结尾,则deadletterprocessor将执行以下操作
类似于一般的fallbackprocessor:显示错误消息并将消息保留在
排队。它是为了防止在
来自死信队列的消息被推回到同一队列,然后
立即由同一处理器再次处理,等等。 用法示例: 此代码接受foo_u dead queue中的所有消息,并将它们推回到
foo队列。然后等待10秒以确保没有新消息出现,
然后退出。 您可以复制处理器,而不是将任务推回到主队列
从主队列到死信,并在适当的位置处理所有任务。 用法示例: 有一种特殊的存储器nv,可作为快速"n"脏的替代品
对于单元测试中的实际队列。如果您有一个函数 问题是,您的测试开始依赖于aws(或localstack)
基础设施,你并不总是需要。你能做的就是
可以用memoryenv()替换sqsenv,并像这样重写测试。 请注意,memoryenv有一些严重的限制,可能不太适合
你的用例。也就是说,当你使用memoryenv: 确保安装了所有依赖项,并配置了boto3客户端
(ref)
然后运行 或者,要测试所有支持的版本,请运行 localstack测试的执行速度应该比aws测试快,而且,
它们在脱机状态下工作良好。 运行localstack并确保
sqs端点在其默认地址http://localhost:4576 rel="nofollow">http://localhost:4576下可用
然后运行 或 唯一的原因是
我们喜欢它,而且我们懒得把它移到这个代码库中。aws configure
fromsqs_workersimportSQSEnv# This environment will use AWS requisites from ~/.aws/ directorysqs=SQSEnv()# Create a new queue.# Note that you can use AWS web interface for the same action as well, the# web interface provides more options. You only need to do it once.sqs.create_standard_queue('emails')# Register a queue processor@sqs.processor('emails','send_email')defsend_email(to,subject,body):print(f"Sending email {subject} to {to}")
sqs.add_job('emails','send_email',to='user@examile.com',subject='Hello world',body='hello world')
send_email.delay(to='user@examile.com',subject='Hello world',body='hello world')
fromsqs_workersimportSQSEnvsqs=SQSEnv()...sqs.process_queue('emails')
sqs.process_queues()
序列化
先进先出队列
创建fifo队列
创建,并且必须有名称
以"fifo"结尾。死信队列必须有名字
有什么东西死了。fifo
fromsqs_workersimportSQSEnvsqs=SQSEnv()sqs.create_fifo_queue('emails_dead.fifo')sqs.create_fifo_queue('emails.fifo',redrive_policy=sqs.redrive_policy('emails_dead.fifo',3))
\u deduplication\u id
。默认情况下,所有消息都具有
相同的消息组默认值
,但您可以使用\u group\u id
更改它sqs.add_job('emails.fifo','send_email',_deduplication_id=subject,_group_id=email,**kwargs)
异常处理
批处理
sqs.processor
decorator而不是sqs.batch\u processor
。
在这种情况下,函数必须接受包含
听写列表。自定义处理器
pip install sqs-workers
0
处理上下文
pip install sqs-workers
1
pip install sqs-workers
2
pip install sqs-workers
3
为所有任务自动应用上下文
pip install sqs-workers
4
死信队列和重拨
pip install sqs-workers
5
退避政策
pip install sqs-workers
6
pip install sqs-workers
7
关闭策略
pip install sqs-workers
8
通过回推失败的消息来处理死信队列
pip install sqs-workers
9
使用主队列中的处理器处理死信
aws configure
0
与memoryenv一起使用单元内测试
create_task
对于队列的一些任务,如果要测试它的工作方式,可以从技术上
像这样编写测试:aws configure
1
aws configure
2
检验使用AWS
aws configure
3
aws configure
4
使用localstack进行测试
aws configure
5
aws configure
6
为什么要靠Werkzeug?< < /H2>
推荐PyPI第三方库