Google PubSub python客户端返回状态码不可用

2024-05-16 01:05:28 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试建立一个长期运行的请求订阅Google云PubSub主题。 我使用的代码与文档here中给出的示例非常相似,即:

def receive_messages(project, subscription_name):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)

问题是我有时会收到以下回溯:

^{pr2}$

我看到在another question中引用了这一点,但这里我要问的是如何在Python中正确地处理它。我试图将请求包装在一个异常中,但它似乎是在后台运行的,如果出现该错误,我将无法重试。在


Tags: thetopathnamefromprojectformatmessage
1条回答
网友
1楼 · 发布于 2024-05-16 01:05:28

对我来说,一种有点老套的方法是自定义policy_class。默认的函数有一个忽略DEADLINE_EXCEEDEDon_exception函数。您可以创建一个继承默认值并忽略UNAVAILABLE的类。我的看起来像这样:

from google.cloud import pubsub
from google.cloud.pubsub_v1.subscriber.policy import thread
import grpc

class AvailablePolicy(thread.Policy):
    def on_exception(self, exception):
        """The parent ignores DEADLINE_EXCEEDED. Let's also ignore UNAVAILABLE.

        I'm not sure what triggers that error, but if you ignore it, your
        subscriber seems to work just fine. It's probably an intermittent
        thing and it reconnects later if you just give it a chance.
        """
        # If this is UNAVAILABLE, then we want to retry.
        # That entails just returning None.
        unavailable = grpc.StatusCode.UNAVAILABLE
        if getattr(exception, 'code', lambda: None)() == unavailable:
            return
        # For anything else, fallback on super.
        super(AvailablePolicy, self).on_exception(exception)

subscriber = pubsub.SubscriberClient(policy_class=AvailablePolicy)
# Continue to set up as normal.

看起来很像originalon_exception只是忽略了一个不同的错误。如果需要,您可以在抛出异常时添加一些日志记录,并验证一切仍然正常。未来的信息还是会传来的。在

相关问题 更多 >