我无法在 Python 中使用 spavro 库解析以下 Avro 模式。该模式目前用于一个 Java 项目,而新的 Python 项目需要用它来读取 Kafka 消息。是否有其他库可以解析该模式?
{
"namespace": "com.group.flexi.avro",
"name": "Metric",
"type": "record",
"fields": [{
"name": "groupId",
"type": ["string", "null"]
},{
"name": "product",
"type": ["string", "null"]
},{
"name": "productVersion",
"type": ["string", "null"]
},{
"name": "version",
"type": ["string", "null"]
},{
"name": "documentId",
"type": ["string", "null"]
},{
"name": "MetricSource",
"type": ["com.group.flexi.avro.MetricSource", "null"]
},{
"name": "GreenHighway",
"type": ["com.group.flexi.avro.GreenHighway", "null"]
}
]
}
展开后的模式(将引用的 MetricSource 和 GreenHighway 记录内联定义):
{
"namespace": "com.group.flexi.avro",
"name": "Metric",
"type": "record",
"fields": [{
"name": "groupId",
"type": ["string", "null"]
}, {
"name": "product",
"type": ["string", "null"]
}, {
"name": "productVersion",
"type": ["string", "null"]
}, {
"name": "version",
"type": ["string", "null"]
}, {
"name": "documentId",
"type": ["string", "null"]
}, {
"name": "MetricSource",
"type": [{
"name": "MetricSource",
"type": "record",
"fields": [{
"name": "policy",
"type": ["string", "null"]
}, {
"name": "system",
"type": ["string", "null"]
}, {
"name": "entity",
"type": ["string", "null"]
}, {
"name": "metric",
"type": ["string", "null"]
}, {
"name": "metricValue",
"type": ["string", "null"]
}, {
"name": "time",
"type": "long"
}, {
"name": "timeOffset",
"type": "double"
}, {
"name": "metricExtension",
"type": ["string", "null"]
}, {
"name": "metricType",
"type": ["string", "null"]
}, {
"name": "metricInterval",
"type": ["string", "null"]
}, {
"name": "encodedMetric",
"type": ["string", "null"]
}, {
"name": "entityMap",
"type": {
"type": "map",
"values": "string"
}
}, {
"name": "algorithm",
"type": ["string", "null"]
}, {
"name": "metricClass",
"order": "ignore",
"type": ["null", "string"],
"default": null
}]
}, "null"]
}, {
"name": "GreenHighway",
"type": [{
"name": "GreenHighway",
"type": "record",
"fields": [{
"name": "percentile",
"type": ["string", "null"]
}, {
"name": "zoneAHigh",
"type": ["string", "null"]
}, {
"name": "zoneBHigh",
"type": ["string", "null"]
}, {
"name": "zoneCHigh",
"type": ["string", "null"]
}, {
"name": "center",
"type": ["string", "null"]
}, {
"name": "zoneCLow",
"type": ["string", "null"]
}, {
"name": "zoneBLow",
"type": ["string", "null"]
}, {
"name": "zoneALow",
"type": ["string", "null"]
}, {
"name": "alertCode",
"type": ["string", "null"]
}, {
"name": "errorCode",
"type": ["string", "null"]
}, {
"name": "errorMessage",
"type": ["string", "null"]
}, {
"name": "anomalyScore",
"type": "int",
"default": 0
}, {
"name": "originalAlertCode",
"type": ["null", "string"],
"default": null
}]
}, "null"]
}
]
}
我用来读取消息的代码如下。解析根本没有完成——程序直接卡在了解析函数中:
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import threading
import logging
import io
import os
import spavro.datafile
import spavro.schema
import spavro.io
from retry import retry
# Single-byte codec: ISO-8859-1 maps every byte value 0-255 to exactly one
# code point, so decoding and re-encoding with it is lossless.
MSG_ENCODING = "ISO-8859-1"

# Offset mode string; an ``offset`` equal to this means "start of partition".
EARLIEST = "earliest"

# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)
class KafkaReader(threading.Thread):
    """Thread-based helper that builds a Kafka consumer and decodes Avro messages.

    NOTE(review): ``ConfigParams`` and ``ReadinessChecker`` are neither defined
    nor imported in the visible part of this file; they must be provided
    elsewhere or class creation / ``create_consumer`` will fail.
    """

    def __init__(self, latch):
        """Read the broker address from the environment.

        Args:
            latch: object exposing ``count_down()``; it is counted down once a
                consumer has been assigned and positioned by
                ``retrying_consumer``.
        """
        threading.Thread.__init__(self)
        # Number of leading bytes stripped from each message before Avro
        # decoding.  Presumably the 5-byte Confluent wire-format header
        # (magic byte + 4-byte schema id) -- TODO confirm against the producer.
        self.substridx = ConfigParams.IGNORE_CHARS
        self.reconnect_backoff_ms = 300
        self.reconnect_backoff_max_ms = 600000
        kafka_host = os.getenv('KAFKA_HOST', 'kafka_kafka_1')
        kafka_port = os.getenv('KAFKA_PORT', '9092')
        self.broker = kafka_host + ":" + kafka_port
        self.latch = latch

    def run(self):
        """Thread entry point; does nothing in this code."""
        pass

    @retry(Exception, tries=ConfigParams.KAFKA_CONNECT_RETRY_COUNT, delay=ConfigParams.KAFKA_CONNECT_RETRY_DELAY)
    def retrying_consumer(self, topic, partition_number, offset):
        """Create a consumer manually assigned to one partition and seek it.

        Any exception triggers another attempt via the ``retry`` decorator.

        Args:
            topic: topic name.
            partition_number: partition to assign.
            offset: ``EARLIEST`` seeks to the beginning of the partition; any
                other value (including ``None``) seeks to the end.  The value
                is also passed to ``KafkaConsumer`` as ``auto_offset_reset``.

        Returns:
            The assigned and positioned ``KafkaConsumer``.
        """
        logger.info("Configuring Kafka consumer at broker %s for topic %s from offset %s ", self.broker, topic, offset)
        partition = TopicPartition(topic, partition_number)
        # KafkaConsumer either returns an instance or raises, so no None check
        # is needed afterwards.
        consumer = KafkaConsumer(bootstrap_servers=[self.broker],
                                 auto_offset_reset=offset,
                                 reconnect_backoff_ms=self.reconnect_backoff_ms,
                                 reconnect_backoff_max_ms=self.reconnect_backoff_max_ms)
        try:
            consumer.assign([partition])
            if offset == EARLIEST:
                logger.info("Seeking to %s for partition %d", offset, partition_number)
                consumer.seek_to_beginning(partition)
            else:
                consumer.seek_to_end(partition)
            position = consumer.position(partition)
        except Exception:
            # Close the half-configured client so that every failed retry
            # attempt does not leak a consumer (and its connections).
            consumer.close()
            raise
        # Count down only after every step that can fail (and be retried) has
        # succeeded, so a retry can never count the latch down twice.
        self.latch.count_down()
        logger.info("Ready to consume data from topic %s from offset %d", topic, position)
        return consumer

    def create_consumer(self, topic, partition_number, offset=None):
        """Build a consumer via ``retrying_consumer``, swallowing failures.

        Args:
            topic: topic name.
            partition_number: partition to assign.
            offset: see ``retrying_consumer``.

        Returns:
            The consumer, or ``None`` if all retries failed or the user
            interrupted; in both cases the readiness marker is deleted first.
        """
        consumer = None
        try:
            consumer = self.retrying_consumer(topic, partition_number, offset)
        except Exception as e:
            ReadinessChecker.delete_ready_marker()
            logger.error("Failed to connect to kafka %s", e, exc_info=True)
        except KeyboardInterrupt:
            # NOTE(review): Ctrl-C is swallowed here and None is returned;
            # confirm the caller really wants to keep running.
            ReadinessChecker.delete_ready_marker()
            logger.error('%% Aborted by user\n')
        return consumer

    @staticmethod
    def parse_avro_schema(schema_path):
        """Load and parse an Avro schema file from ``<project root>/avro/``.

        The project root is the parent of the directory containing this
        module.  The file is parsed as a single JSON document, so any named
        type it references (e.g. ``com.group.flexi.avro.MetricSource``) must
        be defined inline in that same document before it is used.

        Args:
            schema_path: file name (or relative path) under the ``avro`` dir.

        Returns:
            The parsed spavro schema object.
        """
        root = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
        # The original called ``avro.schema.parse`` although only spavro is
        # imported (NameError).  ``io.open`` takes an explicit encoding on both
        # Python 2 and 3, and ``with`` closes the handle.
        with io.open(os.path.join(root, "avro", schema_path), "r", encoding="utf-8") as schema_file:
            return spavro.schema.parse(schema_file.read())

    def read_message(self, msg, avro_schema):
        """Decode the Avro binary payload of a Kafka message.

        Args:
            msg: Kafka record whose ``value`` holds the raw bytes.
            avro_schema: parsed schema; passed as the DatumReader's first
                (writer's) schema, so it must match the schema the producer
                wrote with.

        Returns:
            The decoded datum, or ``None`` for a record with a null value.
        """
        if msg.value is None:
            # Null-value (tombstone) record: nothing to decode.
            return None
        # Strip the leading header bytes directly.  The previous ISO-8859-1
        # decode -> slice -> encode round trip produced exactly these bytes
        # (that codec maps one byte to one character), via two extra copies.
        payload = msg.value[self.substridx:]
        decoder = spavro.io.BinaryDecoder(io.BytesIO(payload))
        reader = spavro.io.DatumReader(avro_schema)
        return reader.read(decoder)
目前没有回答
相关问题 更多 >
编程相关推荐