获取从python KafkaProdu发送的消息

2024-06-06 20:29:13 发布

您现在位置:Python中文网/ 问答频道 /正文

我的目标是从非文件源(即在程序中生成或通过API发送)获取数据并将其发送到spark流。为此,我通过python-basedKafkaProducer发送数据:

$ bin/zookeeper-server-start.sh config/zookeeper.properties &
$ bin/kafka-server-start.sh config/server.properties &
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
$ python 
Python 3.6.1| Anaconda custom (64-bit)
> from kafka import KafkaProducer
> import time
> producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
> producer.send(topic = 'my-topic', value = 'MESSAGE ACKNOWLEDGED', timestamp_ms = time.time())
> producer.close()
> exit()

我的问题是,从consumer shell脚本中检查主题时,不会出现任何内容:

^{pr2}$

这里有什么遗漏或错误吗?我是spark/kafka/messaging系统的新手,所以任何东西都会有帮助。Kafka的版本是0.11.0.0(scala2.11),并且不会对配置文件进行任何更改。在


Tags: producerkafkaimportconfiglocalhosttopicbinserver
2条回答

我发现了这个问题,value_serializer静默地崩溃了,因为我没有将json模块导入解释器。有两种解决方案,一种是简单地导入模块,然后返回"MESSAGE ACKNOWLEDGED"(带引号)。或者您可以完全删除value_serializer,并将下一行发送的value字符串转换为字节字符串(例如,对于python3,b'MESSAGE ACKNOWLEDGED'),这样就可以不带引号地返回消息。在

我还将Kafka切换到0.10.2.1(scala2.11),因为Kafka python文档中没有确认它与版本0.11.0.0兼容

如果在将消息发送到主题后启动使用者,则使用者可能会跳过该消息,因为它将为主题的结尾设置一个主题偏移量(可以将其视为读取的“起点”)。要更改该行为,请尝试添加 from-beginning选项:

$ bin/kafka-console-consumer.sh  bootstrap-server localhost:9092  topic my-topic  from-beginning

您还可以尝试kafkacat,这比Kafka的控制台消费者和生产者(imho)更方便。使用kafkacat读取来自Kafka的消息可以使用以下命令执行:

^{pr2}$

希望能有所帮助。在

相关问题 更多 >