工作人员。

sqs-workers的Python项目详细描述


SQS工人

我怎么用?

除非您是doist开发团队的一部分, 你很可能不需要它。这是一种固执己见的东西,建立在我们自己的内在需要之上 可能对外部开发人员没有什么价值。

队列处理器非常丰富(参见http://queues.io/" rel="nofollow">http://queues.io/示例),并且 上不缺少SQS队列处理器 pypi,请不要寄予厚望 关于这个特殊的实现

明白了,但我怎么开始用呢?

使用安装软件包

pip install sqs-workers

配置BOTO3库以提供安装所需的访问请求 使用类似这样的

aws configure

不要忘记设置您首选的AWS区域。

然后,您将开始管理两个系统(很可能来自同一个代码库): 其中一个将消息添加到队列,另一个将执行这些消息。

fromsqs_workersimportSQSEnv# This environment will use AWS requisites from ~/.aws/ directorysqs=SQSEnv()# Create a new queue.# Note that you can use AWS web interface for the same action as well, the# web interface provides more options. You only need to do it once.sqs.create_standard_queue('emails')# Register a queue processor@sqs.processor('emails','send_email')defsend_email(to,subject,body):print(f"Sending email {subject} to {to}")

然后有两种方法可以将任务添加到队列中。经典(也称为"显式"):

sqs.add_job('emails','send_email',to='user@examile.com',subject='Hello world',body='hello world')

还有"芹菜方式"(我们在某种程度上模仿了芹菜api)

send_email.delay(to='user@examile.com',subject='Hello world',body='hello world')

要处理队列,必须手动运行工作进程。创建一个新文件 将包含sqs对象的定义并注册所有处理器(很可能, 从项目导入必要的模块),然后运行sqs

fromsqs_workersimportSQSEnvsqs=SQSEnv()...sqs.process_queue('emails')

在生产中,我们通常不会在同一个过程中处理多个队列, 但是对于开发环境来说,处理所有队列更容易 立即使用

sqs.process_queues()

序列化

有两个序列化程序:json和pickle。

先进先出队列

fifo队列可以用创建fifo队列创建,并且必须有名称 以"fifo"结尾。死信队列必须有名字 有什么东西死了。fifo

fromsqs_workersimportSQSEnvsqs=SQSEnv()sqs.create_fifo_queue('emails_dead.fifo')sqs.create_fifo_queue('emails.fifo',redrive_policy=sqs.redrive_policy('emails_dead.fifo',3))

除非设置了"基于内容的重复数据消除"标志,否则每条消息都必须 发送时带有属性\u deduplication\u id。默认情况下,所有消息都具有 相同的消息组默认值,但您可以使用\u group\u id更改它

sqs.add_job('emails.fifo','send_email',_deduplication_id=subject,_group_id=email,**kwargs)

有关AWS上FIfo队列的更多信息

异常处理

如果任务处理最终出现异常,则会记录错误并 任务将在一段时间后返回队列。精确的行为被定义 按队列设置。

批处理

您可以使用sqs.processordecorator而不是sqs.batch\u processor。 在这种情况下,函数必须接受包含 听写列表。

自定义处理器

如果需要执行以下操作,可以定义自己的处理器或批处理器 在执行特定任务之前或之后的一些特定操作。

自定义处理器示例

pip install sqs-workers
0

处理上下文

通过作业消息隐式传递给工作进程的上下文。罐头 用于日志记录或分析目的,例如。

使用示例。

pip install sqs-workers
1

或者,您可以这样设置上下文。

pip install sqs-workers
2

然后,当需要清除上下文时:

pip install sqs-workers
3

在web应用程序中,通常在处理结束时调用它 网络请求的。

为所有任务自动应用上下文

与其处理每个处理函数中的上下文,不如 可以在处理器中通过子类化来执行此操作。

pip install sqs-workers
4

死信队列和重拨

在创建队列时,可以设置回退死信队列并重新驱动 政策,可以是这样的

pip install sqs-workers
5

意思是"在四(3+1)后将邮件移动到电子邮件死信队列" 发送给收件人的尝试失败"

退避政策

您可以为整个环境或特定环境定义退避策略 处理器。

pip install sqs-workers
6

默认策略是指数退避。建议始终设置 同时使用退避策略和死信队列来限制最大数量 执行尝试的次数。

或者,您可以将backoff设置为立即返回以重新执行 任务立即失败。

pip install sqs-workers
7

关闭策略

使用process_queue()启动队列处理器时,可能 可选择何时停止。

支持以下关闭策略:

  • idleshutdown(空闲秒):在没有新任务时从函数返回 具体时间段见

  • maxtasksshutdown(max_tasks):在处理之后从函数返回 最少的最大任务作业。有助于防止内存泄漏

默认策略为nevershutdown。也可以将前两个 使用或关闭或和关闭策略的策略,或创建 特定行为的自定义类。

不活动5分钟后停止处理队列的示例:

pip install sqs-workers
8

通过回推失败的消息来处理死信队列

处理死信队列的最常见方法是修复主错误 使消息首先出现在那里,然后重新处理 再次发送这些信息。

有了sqs,工作人员可以通过将来自 死信排队回到主信。它可以用特殊的 后备处理器称为死信处理器。

死信处理程序对队列的组织方式有自己的见解,并使用了一些 硬编码选项。

它应该处理队列"死的东西",应该是 为"某物"配置的死信队列。在处理队列时, 处理器接收每条消息并将其推回到队列"something" 硬编码延迟1秒。

如果队列名称不以"\u dead"结尾,则deadletterprocessor将执行以下操作 类似于一般的fallbackprocessor:显示错误消息并将消息保留在 排队。它是为了防止在 来自死信队列的消息被推回到同一队列,然后 立即由同一处理器再次处理,等等。

用法示例:

pip install sqs-workers
9

此代码接受foo_u dead queue中的所有消息,并将它们推回到 foo队列。然后等待10秒以确保没有新消息出现, 然后退出。

使用主队列中的处理器处理死信

您可以复制处理器,而不是将任务推回到主队列 从主队列到死信,并在适当的位置处理所有任务。

用法示例:

aws configure
0

与memoryenv一起使用单元内测试

有一种特殊的存储器nv,可作为快速"n"脏的替代品 对于单元测试中的实际队列。如果您有一个函数create_task 对于队列的一些任务,如果要测试它的工作方式,可以从技术上 像这样编写测试:

aws configure
1

问题是,您的测试开始依赖于aws(或localstack) 基础设施,你并不总是需要。你能做的就是 可以用memoryenv()替换sqsenv,并像这样重写测试。

aws configure
2

请注意,memoryenv有一些严重的限制,可能不太适合 你的用例。也就是说,当你使用memoryenv:

  • 重新驱动策略不起作用
  • 标准队列和先进先出队列之间没有区别
  • fifo队列不支持基于内容的重复数据消除
  • 无效执行的延迟任务:任务是从队列中获取的, 如果时间还没到,任务就要放回去了。
  • api可以返回稍有不同的结果

检验使用AWS

确保安装了所有依赖项,并配置了boto3客户端 (ref) 然后运行

aws configure
3

或者,要测试所有支持的版本,请运行

aws configure
4

使用localstack进行测试

localstack测试的执行速度应该比aws测试快,而且, 它们在脱机状态下工作良好。

运行localstack并确保 sqs端点在其默认地址http://localhost:4576 rel="nofollow">http://localhost:4576下可用

然后运行

aws configure
5

aws configure
6

为什么要靠Werkzeug?< < /H2>

唯一的原因是 我们喜欢它,而且我们懒得把它移到这个代码库中。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java编辑并重新运行spring引导单元测试,无需重新加载上下文即可加快测试速度   为什么我不能做演员?   java为什么是线程。join通常用于停止安卓中的线程   java从weblogic服务器调用JSON POST REST服务时收到400:错误请求   java在DeviceAdmin模式禁用时设置身份验证?   java SortedMap的keySet()能否始终安全地强制转换到SortedSet?   安卓 java。lang.NoSuchMethodException可包裹类   java JOGL库安装   javatomcat内存管理   java使用getString()中的变量   java将最小星号设置为评级栏   Java中字符串相等的println()方法。。。它到底是如何工作的?   java如何从文本中输出的数组中放入随机图像