Pyarrow模式定义

1 投票
1 回答
47 浏览
提问于 2025-04-14 16:34

我正在尝试从MongoDB的记录创建一个parquet文件。为此,我首先创建了一个模式,像这样:

import pyarrow as pa
import pyarrow.parquet as pq

USER = pa.schema([
    pa.field("_id", pa.string(), nullable=True),
    pa.field("appID", pa.string(), nullable=True),
    pa.field("group", pa.string(), nullable=True),
    pa.field("_created", pa.int64(), nullable=True),
    pa.field("_touched", pa.int64(), nullable=True),
    pa.field("_updated", pa.int64(), nullable=True)
])

writer = pq.ParquetWriter('output.parquet', USER)

然后我试着在循环MongoDB文档后,使用以下代码将数据添加到parquet文件中:

batch = pa.RecordBatch.from_pylist(chunk)
    
writer.write_batch(batch)

但是我遇到了这个错误:表的模式与创建文件时使用的模式不匹配。这个错误是因为并不是所有的MongoDB记录都有group字段,我该怎么解决这个问题呢?

1 个回答

0

要解决在将MongoDB记录转换成Parquet文件时出现的“表结构与创建文件时使用的结构不匹配”错误,首先要确保你定义的表结构和你要写入的记录的数据结构是一致的。

下面是一个示例,展示你如何修改代码来填补缺失的部分:

import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd


USER = pa.schema([
pa.field("_id", pa.string(), nullable=True),
pa.field("appID", pa.string(), nullable=True),
pa.field("group", pa.string(), nullable=True),
pa.field("_created", pa.int64(), nullable=True),
pa.field("_touched", pa.int64(), nullable=True),
pa.field("_updated", pa.int64(), nullable=True)
])

writer = pq.ParquetWriter('output.parquet', USER)

for doc in mongo_docs:
filled_doc = {field.name: doc.get(field.name, None) for field in USER}

batch = pa.RecordBatch.from_pandas(pd.DataFrame([filled_doc]), 
schema=USER)

writer.write_batch(batch)

writer.close()

在写入所有数据批次后,别忘了用writer.close()来关闭写入器,这样才能确保Parquet文件正确完成。

撰写回答