Asyncio Python客户端for Google Cloud发布/订阅

gcloud-aio-pubsub的Python项目详细描述


Latest PyPI VersionPython Version Support

安装

$ pip install --upgrade gcloud-aio-pubsub

用法

此发布/订阅实现基于google-cloud-pubsub >= 0.29.4

目前我们只实现了^{tt2}的异步版本$ 因为订阅模式默认情况下不适用于asyncio。官员 谷歌出版商(google publisher)返回了一个最有用的未来。

下面是订阅的大致使用模式:

fromgcloud.aio.pubsubimportSubscriberClientfromgoogle.cloud.pubsub_v1.subscriber.messageimportMessageclient=SubscriberClient()# create subscription if it doesn't already existclient.create_subscription('subscription_name','topic_name')asyncdefmessage_callback(message:Message)->None:try:# just an example: process the message however you need to here...result=handle(message)awaitupload_result(result)exceptException:message.nack()else:message.ack()# subscribe to the subscription, receiving a Future that acts as a keepalivekeep_alive=client.subscribe('subscription_name',message_callback)# have the client run forever, pulling messages from this subscription,# passing them to the specified callback function, and wrapping it in an# asyncio task.client.run_forever(keep_alive)

配置

我们的create_订阅方法是一个事物包装器,因此支持所有关键字 来自官方pubsub客户机的配置参数,您可以在 这是official Google documentation

订阅订阅时,可以选择传入FlowControl 和/或Scheduler实例。

example_flow_control=FlowControl(max_messages=1,resume_threshold=0.8,max_request_batch_size=1,max_request_batch_latency=0.1,max_lease_duration=10,)keep_alive=client.subscribe('subscription_name',message_callback,flow_control=example_flow_control)

了解修改FlowControl如何影响pubsub运行时 将操作可能会混淆,所以这里有一个方便的花花公子指南

欢迎使用@thekevjames's guide配置google pubsub订阅 政策!安顿下来,喝一杯,待一会儿。

订阅服务器由定义的流控制配置元组控制 here: 该配置对象f由订阅服务器在以下情况下使用 方式:

最大并发性

允许订阅服务器在其当前租用的任何时间租用新任务 任务x满足:

((len(x)<f.resume_threshold*f.max_messages)and(sum(x.bytes)<f.resume_threshold*f.max_bytes))

实际上,这意味着我们应该设置以下值 限制:

  • 高峰时并发租借任务的最大数量为: = (f.max_messages * f.resume_threshold) + f.max_request_batch_size
  • 在高峰时我们的租用任务的最大内存使用量是: = (f.max_bytes * f.resume_threshold) + (f.max_request_batch_size * bytes_per_task)
  • 这些价值观相互制约 给出的这些值中: max_tasks * bytes_per_task <> max_memory

旁白:Pubsub上的ocn似乎是每个1538字节

租赁请求

当租用新任务时,Subscriber使用以下算法:

deflease_more_tasks():start=time.now()yieldqueue.Queue.get(block=True)# always returns >=1for_inrange(f.max_request_batch_size-1):elapsed=time.now()-startyieldqueue.Queue.get(block=False,timeout=f.max_request_batch_latency-elapsed)ifelapsed>=f.max_request_batch_latency:break

实际上,这意味着我们应该设置f.max_request_batch_size 上述并发问题并设置f.max_request_batch_latency给定 不管我们愿意接受什么延迟率。

对于完全队列中的Queue.get(),预期的最佳大小写时间不会更糟 大于0.3ms。此队列应以GRPC请求的速度填满 给google pubsub,它应该足够快(tm)来保持它的填充,给定 这些请求被批处理。

因此,我们可以预期:

  • 平均租用延迟:~= f.max_request_batch_size * 0.0003
  • 最坏情况延迟:~= f.max_request_batch_latency

请注意,租赁是基于f.resume_threshold进行的,因此 延迟与任务执行同时发生。

任务到期

任何未确认或未确认的任务都将计入当前租用的 任务计数。我们的工作线程应该确保所有任务都已确认或已被访问,但是 FlowControlconfig允许我们处理任何其他情况。请注意 租赁工作如下:

  • 当订户租用一个任务时,Google Pubsub将不会重新租用该任务 任务直到subscription.ack_deadline_seconds = 10(可配置 每个订阅)秒已过
  • 如果客户机调用任务的ack(),它将立即从google中删除 普布苏布。
  • 如果客户机调用任务的nack(),它会立即允许google pubsub 将该任务重新出租给新客户端。客户端将任务从其 记忆。
  • 如果f.max_lease_duration在被租用和确认的消息之间传递, 客户端将发送一个nack(请参阅上面的工作流)。它不会掉下来的 任务在其内存中–例如,worker(task)进程可能仍在运行

注意:

  • 所有步骤都是尽力而为的,例如,将“任务将被删除”改为“任务将被删除” 如果分布式系统运气好,可能会被删除“
  • 在上述工作流中,“google pubsub”是指服务器端系统,例如。 由google管理,任务实际上存储在那里。

实际上,我们应该将f.max_lease_duration设置为不低于 高负载时95%的任务延迟。这个这个值越低, 在极端情况下,我们的吞吐量会更好。

混淆

f.max_requests已定义,但似乎未使用。

贡献

请看我们的contributing guide

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
Java泛型和基类   ProcessBuilder或DefaultExecutor启动的“RunAs”子进程的java读取标准输出   java应用程序在尝试登录时突然停止   Java:神秘的Java未捕获异常处理程序[带代码]   java JavaFX NumberAxis自动范围无限循环   最新版本和旧版本冲突的java双Maven依赖关系   java如何导入带有部署变量类名的静态函数?   编译器构造不同的JDK更新会产生不同的Java字节码吗?   java无法在struts 1.1中上载任何超过250 MB大小的文件   java调整jcombobox下拉菜单的宽度   java如何在某些情况下忽略@SQLDelete注释   在Eclipse for Java EE developers edition中禁用HTML警告   java HttpUrlConnection重置请求属性   java@Provider资源未在rest应用程序中注册   java TOP N使用JPA连接   java在使用反射调用方法时区分int和Integer参数