我是阿帕奇卡夫卡科技公司的新员工。我试图使用Python2.7将消息作为JSON对象发送到kafka主题,但出现了“AssertionError:Value must be bytes”错误。我可以成功地将消息作为字符串发送,我可以用kafka-console-consumer.sh查看我的消息。我使用的是apache kafka 2.10-0.8.2.1版本。下面是我的代码。
from kafka import KafkaProducer
import yaml
producer = KafkaProducer(bootstap_servers="localhost:9092")
msg = yaml.safe_load('{"id":1, "name":"oguz"}')
producer.send("my-topic", msg)
谢谢你的帮助。
yaml.safe_load()
返回一个dict,因此将其转换为字节需要两件事——通过JSON将其序列化为字符串,然后将其编码为UTF-8。从示例in the kafka-python docs中获取,在实例化
KafkaProducer
时可以使用value_serializer
关键字参数:或者,您可以在调用
send()
时手动序列化它:相关问题 更多 >
编程相关推荐