Python:使用 spavro 解析由多个 Avro 模式联合(union)组成的 Avro 模式

2024-04-24 09:54:30 发布

您现在位置:Python中文网/ 问答频道 /正文

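无法在 Python 中使用 spavro 库解析以下模式。该模式已在一个 Java 项目中使用,新的 Python 项目需要用它来读取 Kafka 消息。该模式通过全名引用了另外定义的类型(com.group.flexi.avro.MetricSource 和 com.group.flexi.avro.GreenHighway)。是否有其他库可以解析这样的模式?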

{
 "namespace": "com.group.flexi.avro",
 "name": "Metric",
 "type": "record",
 "fields": [{
    "name": "groupId",
    "type": ["string", "null"]
    },{
    "name": "product",
    "type": ["string", "null"]
    },{
    "name": "productVersion",
    "type": ["string", "null"]
    },{
    "name": "version",
    "type": ["string", "null"]
    },{
    "name": "documentId",
    "type": ["string", "null"]
    },{
    "name": "MetricSource",
    "type": ["com.group.flexi.avro.MetricSource", "null"]
    },{
    "name": "GreenHighway",
    "type": ["com.group.flexi.avro.GreenHighway", "null"]
    }
  ]
}

展开(内联所有引用类型)后的完整模式:

{
    "namespace": "com.group.flexi.avro",
    "name": "Metric",
    "type": "record",
    "fields": [{
            "name": "groupId",
            "type": ["string", "null"]
        }, {
            "name": "product",
            "type": ["string", "null"]
        }, {
            "name": "productVersion",
            "type": ["string", "null"]
        }, {
            "name": "version",
            "type": ["string", "null"]
        }, {
            "name": "documentId",
            "type": ["string", "null"]
        }, {
            "name": "MetricSource",
            "type": {
                "name": "MetricSource",
                "type": "record",
                "fields": [{
                    "name": "policy",
                    "type": ["string", "null"]
                }, {
                    "name": "system",
                    "type": ["string", "null"]
                }, {
                    "name": "entity",
                    "type": ["string", "null"]
                }, {
                    "name": "metric",
                    "type": ["string", "null"]
                }, {
                    "name": "metricValue",
                    "type": ["string", "null"]
                }, {
                    "name": "time",
                    "type": "long"
                }, {
                    "name": "timeOffset",
                    "type": "double"
                }, {
                    "name": "metricExtension",
                    "type": ["string", "null"]
                }, {
                    "name": "metricType",
                    "type": ["string", "null"]
                }, {
                    "name": "metricInterval",
                    "type": ["string", "null"]
                }, {
                    "name": "encodedMetric",
                    "type": ["string", "null"]
                }, {
                    "name": "entityMap",
                    "type": {
                        "type": "map",
                        "values": "string"
                    }
                }, {
                    "name": "algorithm",
                    "type": ["string", "null"]
                }, {
                    "name": "metricClass",
                    "order": "ignore",
                    "type": ["null", "string"],
                    "default": null
                }]
            }
        }, {
            "name": "GreenHighway",
            "type": {
                "name": "GreenHighway",
                "type": "record",
                "fields": [{
                    "name": "percentile",
                    "type": ["string", "null"]
                }, {
                    "name": "zoneAHigh",
                    "type": ["string", "null"]
                }, {
                    "name": "zoneBHigh",
                    "type": ["string", "null"]
                }, {
                    "name": "zoneCHigh",
                    "type": ["string", "null"]
                }, {
                    "name": "center",
                    "type": ["string", "null"]
                }, {
                    "name": "zoneCLow",
                    "type": ["string", "null"]
                }, {
                    "name": "zoneBLow",
                    "type": ["string", "null"]
                }, {
                    "name": "zoneALow",
                    "type": ["string", "null"]
                }, {
                    "name": "alertCode",
                    "type": ["string", "null"]
                }, {
                    "name": "errorCode",
                    "type": ["string", "null"]
                }, {
                    "name": "errorMessage",
                    "type": ["string", "null"]
                }, {
                    "name": "anomalyScore",
                    "type": "int",
                    "default": 0
                }, {
                    "name": "originalAlertCode",
                    "type": ["null", "string"],
                    "default": null
                }]
            }
        }
    ]
}

我之前用于读取消息的代码如下。解析根本没有完成,程序一直卡在解析函数中:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import threading
import logging
import io
import os
import spavro.datafile
import spavro.schema
import spavro.io
from retry import retry

# Latin-1 maps every byte value 0-255 to exactly one character, so a
# decode/encode round-trip with it reproduces the original bytes.
MSG_ENCODING = "ISO-8859-1"
# Offset mode for which KafkaReader.retrying_consumer seeks to the beginning of
# the partition; any other value seeks to the end.
EARLIEST = 'earliest'

# Module-level logger.
logger = logging.getLogger(__name__)


class KafkaReader(threading.Thread):
    """Thread that creates Kafka consumers and decodes Avro-encoded messages with spavro.

    NOTE(review): ``ConfigParams`` and ``ReadinessChecker`` are used below but are
    not imported in this file's visible import block -- confirm they are in scope.
    """

    def __init__(self, latch):
        """Configure the broker address and the readiness latch.

        :param latch: object exposing ``count_down()``; it is counted down once,
            after a consumer has been assigned and positioned (see
            ``retrying_consumer``).
        """
        threading.Thread.__init__(self)
        # Number of leading bytes stripped from each message before Avro decoding.
        self.substridx = ConfigParams.IGNORE_CHARS
        self.reconnect_backoff_ms = 300
        self.reconnect_backoff_max_ms = 600000

        kafka_host = os.getenv('KAFKA_HOST', 'kafka_kafka_1')
        kafka_port = os.getenv('KAFKA_PORT', '9092')
        self.broker = kafka_host + ":" + kafka_port
        self.latch = latch

    def run(self):
        """Thread entry point; currently does nothing."""
        pass

    @retry(Exception, tries=ConfigParams.KAFKA_CONNECT_RETRY_COUNT, delay=ConfigParams.KAFKA_CONNECT_RETRY_DELAY)
    def retrying_consumer(self, topic, partition_number, offset):
        """Create a consumer assigned to one partition and positioned per ``offset``.

        Any ``Exception`` makes ``@retry`` call this method again (attempt count
        and delay come from ``ConfigParams``).

        :param topic: Kafka topic name.
        :param partition_number: partition index within ``topic``.
        :param offset: passed as ``auto_offset_reset``; ``EARLIEST`` seeks to the
            beginning of the partition, anything else seeks to the end.
        :return: the configured ``KafkaConsumer``.
        """
        logger.info("Configuring Kafka consumer at broker %s for topic %s from offset %s ", self.broker, topic, offset)

        partition = TopicPartition(topic, partition_number)
        consumer = KafkaConsumer(bootstrap_servers=[self.broker],
                                 auto_offset_reset=offset,
                                 reconnect_backoff_ms=self.reconnect_backoff_ms,
                                 reconnect_backoff_max_ms=self.reconnect_backoff_max_ms)
        try:
            consumer.assign([partition])
            if offset == EARLIEST:
                logger.info("Seeking to %s for partition %d", offset, partition_number)
                consumer.seek_to_beginning(partition)
            else:
                consumer.seek_to_end(partition)
            # Resolve the position before signalling readiness: if this raises,
            # @retry re-runs the method and the latch must not be counted twice.
            position = consumer.position(partition)
        except Exception:
            # Close the half-configured consumer so retries do not leak connections.
            consumer.close()
            raise

        self.latch.count_down()
        logger.info("Ready to consume data from topic %s from offset %d", topic, position)
        return consumer

    def create_consumer(self, topic, partition_number, offset=None):
        """Return a positioned consumer, or ``None`` if it could not be created.

        On failure (after ``retrying_consumer`` exhausted its retries) or on
        Ctrl-C, the readiness marker is deleted and the problem is logged; the
        error is not propagated.
        """
        consumer = None
        try:
            consumer = self.retrying_consumer(topic, partition_number, offset)
        except Exception as e:
            ReadinessChecker.delete_ready_marker()
            logger.error("Failed to connect to kafka %s", e, exc_info=True)

        except KeyboardInterrupt:
            ReadinessChecker.delete_ready_marker()
            logger.error('%% Aborted by user\n')

        return consumer

    @staticmethod
    def parse_avro_schema(schema_path, dependency_paths=()):
        """Parse an Avro schema file from the project's ``avro`` directory.

        Files are resolved relative to ``<parent of this file's directory>/avro``.

        :param schema_path: relative path of the top-level schema file.
        :param dependency_paths: optional relative paths of schema files that
            define named types referenced by ``schema_path`` (for example
            ``com.group.flexi.avro.MetricSource``). They are parsed first, in
            order, into one shared name registry, so each may refer to types
            defined by the ones before it. A referenced type must declare the
            same full name (namespace + name) that the referencing schema uses.
        :return: the parsed spavro schema for ``schema_path``.
        :raises OSError: if a schema file cannot be read.
        :raises ValueError: if a dependency file is not valid JSON.

        spavro itself raises its own exception type for an invalid schema or
        an undefined type reference.
        """
        import json

        avro_dir = os.path.join(
            os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)), "avro")

        def _read(relative_path):
            # Read one schema file, closing the handle afterwards.
            with open(os.path.join(avro_dir, relative_path), "r") as schema_file:
                return schema_file.read()

        if not dependency_paths:
            return spavro.schema.parse(_read(schema_path))

        # A single Names registry lets later schemas reference earlier named types.
        names = spavro.schema.Names()
        for dependency in dependency_paths:
            spavro.schema.make_avsc_object(json.loads(_read(dependency)), names)
        return spavro.schema.make_avsc_object(json.loads(_read(schema_path)), names)

    def read_message(self, msg, avro_schema):
        """Decode the Avro binary payload of one Kafka message.

        The first ``self.substridx`` bytes of ``msg.value`` are skipped.
        NOTE(review): presumably a framing header (e.g. a Confluent magic byte
        plus schema id) -- confirm.

        :param msg: Kafka record whose ``value`` holds the raw message bytes.
        :param avro_schema: writer schema, e.g. from ``parse_avro_schema``.
        :return: the decoded datum.
        """
        # The payload is binary Avro, so slice the bytes directly. No text
        # decoding is needed, and slicing gives the same bytes a Latin-1
        # decode/slice/encode would.
        payload = msg.value[self.substridx:]
        decoder = spavro.io.BinaryDecoder(io.BytesIO(payload))
        return spavro.io.DatumReader(avro_schema).read(decoder)


Tags: kafka, path, name, import, self, string, topic, consumer