我有一个小应用程序,它在googlepub/Sub上调用SubscriberClient的subscribe方法。同时,在实际的subscribe调用中,我尝试在调用回调之前和之后进行一些处理
client.py
@wrapt.patch_function_wrapper('google.cloud.pubsub_v1', 'SubscriberClient.subscribe')
def subscribe_handler(wrapped, instance, *args, **kwargs):
cbk = args[1]
def callback_handler(message):
print('before')
cbk(message)
print('after')
return wrapped(*[args[0], callback_handler], **kwargs)
def callback(msg):
print(msg.data)
msg.ack()
future = SubscriberClient.subscribe(subscription_path, callback)
test.py
class TestPubSubSubscribe(unittest.TestCase):
def test_subscribe_handler(self):
def handle_message(msg):
print(msg.data)
msg.ack()
creds = mock.Mock(spec=credentials.Credentials)
sub_client = SubscriberClient(credentials=creds)
msg = self.create_message(b"foo", baz="bacon", spam="eggs") # using some helper to create a PubSubMessage message.
# call the subscribe
future = sub_client.subscribe("test-sub-path", handle_message)
在我的测试中,在最后一个地方,我想调用subscribe方法到SubscriberClient,但我希望它以某种方式使用我创建的消息msg
,而不是让它从实际的API获取它
有人知道怎么做吗?我应该模仿/修补什么
目前没有回答
相关问题 更多 >
编程相关推荐