Spark:如何在pyspark中将字节字符串写入hdfs hadoop以进行sparkxml转换?

2024-04-25 03:43:16 发布

您现在位置:Python中文网/ 问答频道 /正文

在python中,字节字符串可以简单地保存到单个xml文件中:

with open('/home/user/file.xml' ,'wb') as f:
    f.write(b'<Value>1</Value>') 
   

当前输出:/home/user/file.xml(文件保存在本地文件中)

问题:如何将字符串保存到pyspark中hdfs上的xml文件:

预期输出:'hdfs://hostname:9000/file.xml“

背景:大量xml文件由第三方web API提供。我在pyspark建造了通往三角洲湖的ETL管道。数据由aiohttp异步提取,接下来我想在将spark数据帧保存到delta-lake之前使用spark-xml进行转换(需要pyspark)。我在寻找建造管道的最有效方法

github上的spark xml开发人员也提出了类似的问题。 https://github.com/databricks/spark-xml/issues/515

最新研究:

  1. spark xml用作直接作为文本存储在磁盘或spark数据框上的输入xml文件

  2. 因此,我只能使用以下两个选项之一:

a)一些hdfs客户端(pyarrow、hdfs、aiohdfs)将文件保存到hdfs(hdfs上的文本文件格式不是很有效)

b)将数据加载到spark数据框以进行spark xml转换(delta lake的本机格式)

如果你有其他想法,请告诉我


Tags: 文件数据字符串githubhome管道valuehdfs
2条回答

不要被databricks spark xml文档误导,因为它们会导致使用未压缩的xml文件作为输入。这是非常低效的,直接下载XML到spark dataframe要快得多。Databricks xml pyspark版本不包括它,但有一个workaround

from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string

def ext_from_xml(xml_column, schema, options={}):
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options={}):
    assert len(df.columns) == 1

    scala_options = spark._jvm.PythonUtils.toScalaMap(options)
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())

下载到列表的XML

xml = [('url',"""<Level_0 Id0="Id0_value_file1">
    <Level_1 Id1_1 ="Id3_value" Id_2="Id2_value">
      <Level_2_A>A</Level_2_A>
      <Level_2>
        <Level_3>
          <Level_4>
            <Date>2021-01-01</Date>
            <Value>4_1</Value>
          </Level_4>
          <Level_4>
            <Date>2021-01-02</Date>
            <Value>4_2</Value>
          </Level_4>
        </Level_3>
      </Level_2>
    </Level_1>
  </Level_0>"""),

  ('url',"""<Level_0 I"d0="Id0_value_file2">
    <Level_1 Id1_1 ="Id3_value" Id_2="Id2_value">
      <Level_2_A>A</Level_2_A>
      <Level_2>
        <Level_3>
          <Level_4>
            <Date>2021-01-01</Date>
            <Value>4_1</Value>
          </Level_4>
          <Level_4>
            <Date>2021-01-02</Date>
            <Value>4_2</Value>
          </Level_4>
        </Level_3>
      </Level_2>
    </Level_1>
  </Level_0>""")]

XML字符串的Spark数据帧转换:

#create df with XML strings  
 rdd = sc.parallelize(xml)
 df = spark.createDataFrame(rdd,"url string, content string")

# XML schema
 payloadSchema = ext_schema_of_xml_df(df.select("content"))

 # parse xml
 parsed = df.withColumn("parsed", ext_from_xml(df.content, payloadSchema, {"rowTag":"Level_0"}))

# select required data
  df2 = parsed.select(
    'parsed._Id0',
    F.explode_outer('parsed.Level_1.Level_2.Level_3.Level_4').alias('Level_4')
  ).select(
      '`parsed._Id0`',
      'Level_4.*'
  )

要解码字节:b'string'。解码('utf-8')

@mck answer了解有关XMLs的更多信息: How to transform to spark Data Frame data from multiple nested XML files with attributes

相关问题 更多 >