Converting CSV to Avro with Python: Avro schema problem

Posted 2024-04-25 03:47:31


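I am trying to serialize my CSV file to Avro, then iterate over each row and send it to a Kafka consumer. At the moment the data I send does not match my schema, and I am not sure why.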

Below is the code that reads the CSV, serializes its rows, and writes them out as an Avro file:

import os, csv, avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from kafka import KafkaProducer
from collections import namedtuple

output_loc = '{}/avro.avro'.format(os.path.dirname(__file__))
CSV = '{}/oscar_age_male.csv'.format(os.path.dirname(__file__))
fields = ("Index","Year", "Age", "Name", "Movie")
csv_record = namedtuple('csv_record', fields)

def read_csv(path):
    with open(path, 'rU') as data:
        data.readline()
        reader = csv.reader(data, delimiter=",")
        for row in map(csv_record._make, reader):
            print(row)
            yield row

def parse_schema(path='{}/schema.avsc'.format(os.path.dirname(__file__))):
    with open(path, 'r') as data:
        return avro.schema.parse(data.read())

def serilialise_records(records, outpath=output_loc):
    schema = parse_schema()
    with open(outpath, 'w') as out:
        writer = DataFileWriter(out, DatumWriter(), schema)
        for record in records:
            record = dict((f, getattr(record, f)) for f in record._fields)
            writer.append(record)

serilialise_records(read_csv(CSV))
         

This is the error I get:

 raise AvroTypeException(self.writers_schema, datum)
avro.io.AvroTypeException: The datum {'Index': '1', 'Year': '1928', 'Age': '44', 'Name': ' "Emil Jannings"', 'Movie': ' "The Last Command The Way of All Flesh"'} is not an example of the schema {
  "type": "record",
  "name": "Test",
  "namespace": "avro_schema_test",
  "fields": [
    {
      "type": "int",
      "name": "Index"
    },
    {
      "type": "int",
      "name": "Year"
    },
    {
      "type": "int",
      "name": "Age"
    },
    {
      "type": "string",
      "name": "Name"
    },
    {
      "type": "string",
      "name": "Movie"
    }
  ]
}
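
One way to read this error: every value in the datum is a Python `str` (note the quotes around `'1'`, `'1928'`, `'44'`), while the schema declares `Index`, `Year`, and `Age` as `int`. The sketch below is only an illustration of that comparison, not how the avro library validates data internally. `find_mismatches` and `PYTHON_TYPES` are hypothetical names, and the mapping covers only the two primitive types that appear in this schema.

```python
import json

# The schema from the question, inlined so the sketch is self-contained.
SCHEMA_JSON = """
{
    "type": "record",
    "namespace": "avro_schema_test",
    "name": "Test",
    "fields": [
        {"name": "Index", "type": "int"},
        {"name": "Year", "type": "int"},
        {"name": "Age", "type": "int"},
        {"name": "Name", "type": "string"},
        {"name": "Movie", "type": "string"}
    ]
}
"""

# Hypothetical mapping: only the primitive types used in this schema.
PYTHON_TYPES = {"int": int, "string": str}


def find_mismatches(datum, schema):
    """List (field name, Avro type expected, Python type found) per bad field."""
    problems = []
    for field in schema["fields"]:
        expected = PYTHON_TYPES[field["type"]]
        value = datum.get(field["name"])
        if not isinstance(value, expected):
            problems.append((field["name"], field["type"], type(value).__name__))
    return problems


# The datum exactly as it appears in the error message.
datum = {
    "Index": "1",
    "Year": "1928",
    "Age": "44",
    "Name": ' "Emil Jannings"',
    "Movie": ' "The Last Command The Way of All Flesh"',
}

mismatches = find_mismatches(datum, json.loads(SCHEMA_JSON))
for name, expected, actual in mismatches:
    print(f"{name}: schema expects {expected}, got {actual}")
```

The three `int` fields are reported because `csv.reader` yields every column as a string; the two `string` fields pass the type check, although their values still carry a leading space and literal double quotes.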

My Avro schema is:

{
    "type": "record",
    "namespace": "avro_schema_test",
    "name": "Test",
    "fields": [
        {"name": "Index", "type": "int"},
        {"name": "Year", "type": "int"},
        {"name": "Age", "type": "int"},
        {"name": "Name", "type": "string"},
        {"name": "Movie", "type": "string"}
    ]
}
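
Since `csv.reader` returns every column as a string, a likely fix is to convert the three `int` fields before appending each record. The following is a minimal sketch under that assumption. `to_avro_dict` and `INT_FIELDS` are hypothetical names, and stripping the spaces and quotes from `Name` and `Movie` is a guess at the desired output rather than something the schema requires.

```python
from collections import namedtuple

fields = ("Index", "Year", "Age", "Name", "Movie")
csv_record = namedtuple("csv_record", fields)

# Fields the schema declares as "int" (hypothetical helper constant).
INT_FIELDS = ("Index", "Year", "Age")


def to_avro_dict(record):
    """Turn a csv_record of strings into a dict typed to match the schema."""
    datum = dict(record._asdict())
    for name in INT_FIELDS:
        # int() tolerates surrounding whitespace, e.g. int(" 1928") == 1928
        datum[name] = int(datum[name])
    for name in ("Name", "Movie"):
        # Assumption: the leading space and literal quotes are unwanted.
        datum[name] = datum[name].strip().strip('"')
    return datum


# The row from the error message, as csv.reader handed it over.
record = csv_record(
    "1", "1928", "44",
    ' "Emil Jannings"',
    ' "The Last Command The Way of All Flesh"',
)
datum = to_avro_dict(record)
print(datum)
```

In the code from the question, `record = to_avro_dict(record)` would take the place of the line that builds the dict before `writer.append(record)`. Passing `skipinitialspace=True` to `csv.reader` is another way to handle the quoted text columns. Separately, and depending on the Python and avro versions in use, the output file probably needs to be opened in binary mode (`'wb'`) and the `DataFileWriter` closed once the loop finishes so that buffered records are flushed.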

Once this is solved, I will iterate over my Avro file and send the records to Kafka.

