Asyncio Python客户端for Google Cloud发布/订阅
gcloud-aio-pubsub的Python项目详细描述
安装
$ pip install --upgrade gcloud-aio-pubsub
用法
此发布/订阅实现基于google-cloud-pubsub >= 0.29.4
目前我们只实现了^{tt2}的异步版本$ 因为订阅模式默认情况下不适用于asyncio。官员 谷歌出版商(google publisher)返回了一个最有用的未来。
下面是订阅的大致使用模式:
fromgcloud.aio.pubsubimportSubscriberClientfromgoogle.cloud.pubsub_v1.subscriber.messageimportMessageclient=SubscriberClient()# create subscription if it doesn't already existclient.create_subscription('subscription_name','topic_name')asyncdefmessage_callback(message:Message)->None:try:# just an example: process the message however you need to here...result=handle(message)awaitupload_result(result)exceptException:message.nack()else:message.ack()# subscribe to the subscription, receiving a Future that acts as a keepalivekeep_alive=client.subscribe('subscription_name',message_callback)# have the client run forever, pulling messages from this subscription,# passing them to the specified callback function, and wrapping it in an# asyncio task.client.run_forever(keep_alive)
配置
我们的create_订阅方法是一个事物包装器,因此支持所有关键字 来自官方pubsub客户机的配置参数,您可以在 这是official Google documentation。
订阅订阅时,可以选择传入FlowControl 和/或Scheduler实例。
example_flow_control=FlowControl(max_messages=1,resume_threshold=0.8,max_request_batch_size=1,max_request_batch_latency=0.1,max_lease_duration=10,)keep_alive=client.subscribe('subscription_name',message_callback,flow_control=example_flow_control)
了解修改FlowControl如何影响pubsub运行时 将操作可能会混淆,所以这里有一个方便的花花公子指南
欢迎使用@thekevjames's guide配置google pubsub订阅 政策!安顿下来,喝一杯,待一会儿。
订阅服务器由定义的流控制配置元组控制 here: 该配置对象f由订阅服务器在以下情况下使用 方式:
最大并发性
允许订阅服务器在其当前租用的任何时间租用新任务 任务x满足:
((len(x)<f.resume_threshold*f.max_messages)and(sum(x.bytes)<f.resume_threshold*f.max_bytes))
实际上,这意味着我们应该设置以下值 限制:
- 高峰时并发租借任务的最大数量为: = (f.max_messages * f.resume_threshold) + f.max_request_batch_size
- 在高峰时我们的租用任务的最大内存使用量是: = (f.max_bytes * f.resume_threshold) + (f.max_request_batch_size * bytes_per_task)
- 这些价值观相互制约 给出的这些值中: max_tasks * bytes_per_task <> max_memory
旁白:Pubsub上的ocn似乎是每个1538字节
租赁请求
当租用新任务时,Subscriber使用以下算法:
deflease_more_tasks():start=time.now()yieldqueue.Queue.get(block=True)# always returns >=1for_inrange(f.max_request_batch_size-1):elapsed=time.now()-startyieldqueue.Queue.get(block=False,timeout=f.max_request_batch_latency-elapsed)ifelapsed>=f.max_request_batch_latency:break
实际上,这意味着我们应该设置f.max_request_batch_size 上述并发问题并设置f.max_request_batch_latency给定 不管我们愿意接受什么延迟率。
对于完全队列中的Queue.get(),预期的最佳大小写时间不会更糟 大于0.3ms。此队列应以GRPC请求的速度填满 给google pubsub,它应该足够快(tm)来保持它的填充,给定 这些请求被批处理。
因此,我们可以预期:
- 平均租用延迟:~= f.max_request_batch_size * 0.0003
- 最坏情况延迟:~= f.max_request_batch_latency
请注意,租赁是基于f.resume_threshold进行的,因此 延迟与任务执行同时发生。
任务到期
任何未确认或未确认的任务都将计入当前租用的 任务计数。我们的工作线程应该确保所有任务都已确认或已被访问,但是 FlowControlconfig允许我们处理任何其他情况。请注意 租赁工作如下:
- 当订户租用一个任务时,Google Pubsub将不会重新租用该任务 任务直到subscription.ack_deadline_seconds = 10(可配置 每个订阅)秒已过
- 如果客户机调用任务的ack(),它将立即从google中删除 普布苏布。
- 如果客户机调用任务的nack(),它会立即允许google pubsub 将该任务重新出租给新客户端。客户端将任务从其 记忆。
- 如果f.max_lease_duration在被租用和确认的消息之间传递, 客户端将发送一个nack(请参阅上面的工作流)。它不会掉下来的 任务在其内存中–例如,worker(task)进程可能仍在运行
注意:
- 所有步骤都是尽力而为的,例如,将“任务将被删除”改为“任务将被删除” 如果分布式系统运气好,可能会被删除“
- 在上述工作流中,“google pubsub”是指服务器端系统,例如。 由google管理,任务实际上存储在那里。
实际上,我们应该将f.max_lease_duration设置为不低于 高负载时95%的任务延迟。这个这个值越低, 在极端情况下,我们的吞吐量会更好。
混淆
f.max_requests已定义,但似乎未使用。
贡献
请看我们的contributing guide。