我正在尝试将我的CSV文件序列化为Avro,然后遍历每一行并发送给卡夫卡消费者。目前,我遇到一个问题,即发送的数据与我的模式不匹配,但我不确定原因
下面是读取csv并序列化其中的行并输出为Avro格式文件的代码
import os, csv, avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from kafka import KafkaProducer
from collections import namedtuple
output_loc = '{}/avro.avro'.format(os.path.dirname(__file__))
CSV = '{}/oscar_age_male.csv'.format(os.path.dirname(__file__))
fields = ("Index","Year", "Age", "Name", "Movie")
csv_record = namedtuple('csv_record', fields)
def read_csv(path):
with open(path, 'rU') as data:
data.readline()
reader = csv.reader(data, delimiter=",")
for row in map(csv_record._make, reader):
print(row)
yield row
def parse_schema(path='{}/schema.avsc'.format(os.path.dirname(__file__))):
with open(path, 'r') as data:
return avro.schema.parse(data.read())
def serilialise_records(records, outpath=output_loc):
schema = parse_schema()
with open(outpath, 'w') as out:
writer = DataFileWriter(out, DatumWriter(), schema)
for record in records:
record = dict((f, getattr(record, f)) for f in record._fields)
writer.append(record)
serilialise_records(read_csv(CSV))
下面是接收错误:
raise AvroTypeException(self.writers_schema, datum)
avro.io.AvroTypeException: The datum {'Index': '1', 'Year': '1928', 'Age': '44', 'Name': ' "Emil Jannings"', 'Movie': ' "The Last Command The Way of All Flesh"'} is not an example of the schema {
"type": "record",
"name": "Test",
"namespace": "avro_schema_test",
"fields": [
{
"type": "int",
"name": "Index"
},
{
"type": "int",
"name": "Year"
},
{
"type": "int",
"name": "Age"
},
{
"type": "string",
"name": "Name"
},
{
"type": "string",
"name": "Movie"
}
]
}
我的Avro模式是:
{
"type": "record",
"namespace": "avro_schema_test",
"name": "Test",
"fields": [
{"name": "Index", "type": "int"},
{"name": "Year", "type": "int"},
{"name": "Age", "type": "int"},
{"name": "Name", "type": "string"},
{"name": "Movie", "type": "string"}
]
}
问题解决后,我将遍历我的avro文件,并将记录发送给卡夫卡
目前没有回答
相关问题 更多 >
编程相关推荐