I can't send or receive Kafka messages from a Python client
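I'm trying to create a setup that runs Kafka and ZooKeeper and to connect to it from Python. With my current configuration, however, I can't send or receive messages from the Python client, even though I can successfully connect to the port that Kafka exposes.
I have defined the topic and checked in Lens which port the Kafka service exposes. I can confirm that the client connects to Kafka, but when I send a message from the producer and then look inside the container, the message has not arrived. Likewise, the consumer receives nothing. I have ruled out port-related and topic-related mistakes, because I have double-checked both.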
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-zookeeper
spec:
  serviceName: kafka-zookeeper-service
  replicas: 1
  selector:
    matchLabels:
      app: kafka-zookeeper
  template:
    metadata:
      labels:
        app: kafka-zookeeper
    spec:
      containers:
        - name: zookeeper
          image: docker.io/bitnami/zookeeper:3.9
          env:
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "yes"
            - name: ZOO_VOLUME_DIR
              value: /bitnami/zookeeper
          ports:
            - containerPort: 2181
              name: zookeeper
          volumeMounts:
            - name: zookeeper-data
              mountPath: /bitnami/zookeeper
        - name: kafka
          image: docker.io/bitnami/kafka:3.4
          env:
            - name: KAFKA_CFG_ZOOKEEPER_CONNECT
              value: kafka-zookeeper-service:2181
            - name: KAFKA_VOLUME_DIR
              value: /bitnami/kafka
            - name: KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE
              value: "true"
            - name: KAFKA_CFG_PROCESS_ROLES
              value: controller,broker
            - name: KAFKA_CFG_LISTENERS
              value: "PLAINTEXT://:9092,CONTROLLER://:9093"
            - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
              value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
            - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
              value: "1001@localhost:9093"
            - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"
            - name: KAFKA_CFG_NODE_ID
              value: "1001"
            - name: KAFKA_CFG_ZOOKEEPER_METADATA_MIGRATION_ENABLE
              value: "true"
          ports:
            - containerPort: 9092
              name: kafka
            - containerPort: 9093
              name: kafka-plaintext
            - containerPort: 9094
              name: kafka-tls
          volumeMounts:
            - name: kafka-data
              mountPath: /bitnami/kafka
      volumes:
        - name: zookeeper-data
          persistentVolumeClaim:
            claimName: zookeeper-data-pvc
        - name: kafka-data
          persistentVolumeClaim:
            claimName: kafka-data-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: zookeeper-data-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 2Gi
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-data-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 2Gi
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-zookeeper-service
spec:
  selector:
    app: kafka-zookeeper
  ports:
    - protocol: TCP
      port: 2181
      targetPort: 2181
      name: zookeeper
    - protocol: TCP
      port: 9092
      targetPort: 9092
      name: kafka
    - protocol: TCP
      port: 9093
      targetPort: 9093
      name: kafka-plaintext
    - protocol: TCP
      port: 9094
      targetPort: 9094
      name: kafka-tls
Producer
from kafka import KafkaProducer
import json

def main():
    bootstrap_servers = 'localhost:34715'
    topic = 'dev'
    message = {'message': 'Hello from the producer'}
    try:
        producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        print("Successful connection to Kafka server at:", bootstrap_servers)
        producer.send(topic, value=message)
        print("Message sent successfully to topic:", topic)
    except Exception as e:
        print("Error sending message:", e)

if __name__ == '__main__':
    main()
Consumer
from kafka import KafkaConsumer

def main():
    consumer = KafkaConsumer('dev', bootstrap_servers='localhost:34715')
    try:
        for message in consumer:
            print(f'Message received: {message.value.decode("utf-8")}')
    finally:
        consumer.close()

if __name__ == '__main__':
    main()
1 Answer
Could you try calling flush to make sure the message is sent immediately, and also the close function?
import json
from kafka import KafkaProducer

def main():
    bootstrap_servers = 'localhost:34715'
    topic = 'dev'
    message = {'message': 'Hello from the producer'}
    # Start as None so the finally block is safe if the constructor raises
    producer = None
    try:
        producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        print("Successful connection to Kafka server at:", bootstrap_servers)
        producer.send(topic, value=message)
        # Flush the producer so the buffered message is transmitted before exit
        producer.flush()
        print("Message sent successfully to topic:", topic)
    except Exception as e:
        print("Error sending message:", e)
    finally:
        # Close the producer, but only if it was actually created
        if producer is not None:
            producer.close()

if __name__ == '__main__':
    main()
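If flush alone does not make the problem visible, it may also help to wait on the result of each send. In kafka-python, send() is asynchronous and returns a future; calling get(timeout=...) on that future either returns the record metadata (topic, partition, offset) or raises the delivery error. The sketch below is only an illustration under assumptions: send_and_confirm is a hypothetical helper name, the 10-second timeout is arbitrary, and the address and topic are copied from the question.

```python
def send_and_confirm(producer, topic, message, timeout=10):
    """Send one message and block until the broker acknowledges it.

    `producer` is expected to behave like kafka-python's KafkaProducer:
    send() returns a future whose get(timeout=...) returns record metadata
    or raises if delivery failed or timed out.
    (send_and_confirm is a hypothetical helper, not part of kafka-python.)
    """
    future = producer.send(topic, value=message)
    metadata = future.get(timeout=timeout)  # raises on failure or timeout
    return metadata.topic, metadata.partition, metadata.offset


def main():
    # Imported here so the helper above can be exercised without a broker.
    import json
    from kafka import KafkaProducer
    from kafka.errors import KafkaError

    producer = None
    try:
        producer = KafkaProducer(
            bootstrap_servers='localhost:34715',  # address from the question
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        )
        topic, partition, offset = send_and_confirm(
            producer, 'dev', {'message': 'Hello from the producer'}
        )
        print(f"Delivered to {topic}, partition {partition}, offset {offset}")
    except KafkaError as e:
        # A timeout here means the broker never acknowledged the message.
        print("Delivery failed:", e)
    finally:
        if producer is not None:
            producer.close()
```

Calling main() against the broker should then either print the partition and offset the message was written to, or print the error that the plain send call was hiding.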