我可以将数据从Kafka推送到Memsql。在
我正在尝试使用转换来推动。我用Python创建了Kafka Consumer,它使用Kafka主题中的数据并转换为Json格式。在
我不知道如何在Memsql中使用它作为转换。在
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
import sys
c = AvroConsumer({
'bootstrap.servers': 'X.Y.Z.W:9092',
'group.id': 'groupid1112',
'schema.registry.url': 'http://X.Y.Z.W:8081',
'default.topic.config': {
'auto.offset.reset': 'smallest'
}
})
c.subscribe(['test_topic'])
count =0
while True:
try:
msg = c.poll(10)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(msg.error())
break
valueList = list(msg.value().values())
print(valueList)
c.close()
是春天
^{pr2}$
检查这些文件 https://docs.memsql.com/memsql-pipelines/v6.0/transforms/
在即将发布的MemSQL版本中,请继续关注本机avro支持。在
你会想做一些类似下面的事情,但由于我不知道avro库在我的头顶上,我对avro的具体细节做了草图。在
```
```
使用schemaregistry应该可以,但是您不必担心在转换脚本中读取kafka的细节。我可以在星期一给你一个更详细的脚本,但这是如何组织代码。在
相关问题 更多 >
编程相关推荐