从pysp中的数据帧构建结构类型

2024-04-24 17:03:16 发布

您现在位置:Python中文网/ 问答频道 /正文

我是新的spark和python,面临着从一个可以应用到我的数据文件的元数据文件构建模式的困难。 场景:数据文件的元数据文件(csv格式),包含列及其类型:例如:

id,int,10,"","",id,"","",TRUE,"",0
created_at,timestamp,"","","",created_at,"","",FALSE,"",0

我已成功将其转换为如下数据帧:

+--------------------+---------------+
|                name|           type|
+--------------------+---------------+
|                  id|  IntegerType()|
|          created_at|TimestampType()|
|          updated_at|   StringType()|

但是当我试图用这个转换成StructField格式时

fields = schemaLoansNew.map(lambda l:([StructField(l.name, l.type, 'true')]))

或者

schemaList = schemaLoansNew.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).collect()

然后使用

schemaFinal = StructType(schemaList)

我得到以下错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/mapr/spark/spark-1.4.1/python/pyspark/sql/types.py", line 372, in __init__
assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType"
AssertionError: fields should be a list of DataType

由于我对数据帧的知识不足,我在这个问题上陷入了困境,你能告诉我,如何继续这个问题吗。一旦我准备好了模式,我想使用createDataFrame来应用到我的数据文件。这个过程必须对许多表执行,所以我不想硬编码类型,而是使用元数据文件来构建模式,然后应用到RDD。

提前谢谢。


Tags: 数据nameinid类型fields数据文件格式
3条回答

字段的参数必须是DataType对象的列表。这:

.map(lambda l:([StructField(l.name, l.type, 'true')]))

collect后生成tuplesRows)的listslist[list[tuple[DataType]]])的list,更不用说nullable参数应该是布尔值而不是字符串。

你的第二次尝试:

.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).

str对象的collecta list之后生成。

显示的记录的正确架构应该大致如下所示:

from pyspark.sql.types import *

StructType([
    StructField("id", IntegerType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("updated_at", StringType(), True)
])

尽管对这样的任务使用分布式数据结构是一种严重的过度破坏,更不用说效率低下了,但是您可以尝试按如下方式调整第一个解决方案:

StructType([
    StructField(name, eval(type), True) for (name, type) in  df.rdd.collect()
])

但它并不特别安全(eval)。从JSON/dictionary构建模式可能更容易。假设您有一个从类型描述映射到规范类型名的函数:

def get_type_name(s: str) -> str:
    """
    >>> get_type_name("int")
    'integer'
    """
    _map = {
        'int': IntegerType().typeName(),
        'timestamp': TimestampType().typeName(),
        # ...
    } 
    return _map.get(s, StringType().typeName())

您可以建立下列形状的字典:

schema_dict = {'fields': [
    {'metadata': {}, 'name': 'id', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'created_at', 'nullable': True, 'type': 'timestamp'}
], 'type': 'struct'}

把它送到StructType.fromJson

StructType.fromJson(schema_dict)
val columns: Array[String] = df1.columns
val reorderedColumnNames: Array[String] = df2.columns //or do the reordering you want
val result: DataFrame = dataFrame.select(reorderedColumnNames.head, reorderedColumnNames.tail: _*)

可以按照以下步骤更改数据类型对象

data_schema=[
    StructField("age", IntegerType(), True),
    StructField("name", StringType(), True)
]



final_struct=StructType(fields=data_schema)

df=spark.read.json('/home/abcde/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json', schema=final_struct)



df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

相关问题 更多 >