通过代理服务器拉取PubSub消息 - Python

1 投票
1 回答
34 浏览
提问于 2025-04-14 18:14

我正在使用下面这个脚本来获取消息,按照文档的说明开发的,但遇到了错误。

import json
from googleapiclient.discovery import build
from httplib2 import Http
import httplib2
from oauth2client.service_account import ServiceAccountCredentials

# Replace placeholders with your project ID, topic name, subscription name, and proxy details
project_id = 'v-acp'  # Replace with your GCP project ID
topic_name = 'd_ack'   # Replace with your Pub/Sub topic name
subscription_name = 'dpull'  # Replace with your desired subscription name
proxy_host = '192.173.10.2'  # Replace with your proxy server address
proxy_port = 8095  # Replace with your proxy server port

# Create credentials (replace with your own authentication method)
credentials = ServiceAccountCredentials.from_json_keyfile_name(
    'key.json',
    scopes=['https://www.googleapis.com/auth/pubsub']
)

# Configure HTTP connection with proxy
proxy_info = httplib2.ProxyInfo(proxy_type=httplib2.socks.PROXY_TYPE_HTTP_NO_TUNNEL,
                                proxy_host=proxy_host,
                                proxy_port=proxy_port)
http = Http(proxy_info=proxy_info)

# Build the Pub/Sub API client
service = build('pubsub', 'v1', http=credentials.authorize(http))

# Pull messages from the subscription
def pull_messages():
    request = service.projects().subscriptions().pull(
        subscription=f'projects/{project_id}/subscriptions/{subscription_name}'
    )
    response = request.execute()
    if 'receivedMessages' in response:
        for message in response['receivedMessages']:
            # Process message data (message['message']['data']) and acknowledgment ID (message['ackId'])
            print(f"Received message: {message['message']['data']}")
            # Acknowledge message using service.projects().subscriptions().acknowledge()

# Call the pull_messages function to retrieve messages
pull_messages()

我收到以下错误信息:

json返回了“You have passed an invalid argument to the service (argument=max_messages).”(你传递给服务的参数无效,参数是max_messages)。详细信息是:“You have passed an invalid argument to the service (argument=max_messages).”

1 个回答

0

我在一个主体里面添加了最大消息限制,它运行得很好。

request = service.projects().subscriptions().pull(
        subscription=f'projects/{project_id}/subscriptions/{subscription_name}',
        body={'maxMessages': 10}
    )

撰写回答