测试PubSub订阅回调

2024-03-28 14:34:20 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个小应用程序,它在googlepub/Sub上调用SubscriberClient的subscribe方法。同时,在实际的subscribe调用中,我尝试在调用回调之前和之后进行一些处理

client.py

@wrapt.patch_function_wrapper('google.cloud.pubsub_v1', 'SubscriberClient.subscribe')
def subscribe_handler(wrapped, instance, *args, **kwargs):

    cbk = args[1]
    def callback_handler(message):
       print('before')
       cbk(message)
       print('after')
    return wrapped(*[args[0], callback_handler], **kwargs)

def callback(msg):
    print(msg.data)
    msg.ack()

future = SubscriberClient.subscribe(subscription_path, callback)

test.py

class TestPubSubSubscribe(unittest.TestCase):
    
    def test_subscribe_handler(self):

        def handle_message(msg):
            print(msg.data)
            msg.ack()

        creds = mock.Mock(spec=credentials.Credentials)
        sub_client = SubscriberClient(credentials=creds)
        msg = self.create_message(b"foo", baz="bacon", spam="eggs") # using some helper to create a PubSubMessage message.

        # call the subscribe 
        future = sub_client.subscribe("test-sub-path", handle_message)

在我的测试中,在最后一个地方,我想调用subscribe方法到SubscriberClient,但我希望它以某种方式使用我创建的消息msg,而不是让它从实际的API获取它

有人知道怎么做吗?我应该模仿/修补什么


Tags: 方法pytestclientmessagedefcallbackargs