How to work with a complex nested JSON dataset in PySpark

Posted 2024-06-08 22:27:07


I have some very complex data that is processed in Databricks using Scala. I want to convert the Scala code to Python so that it works with the data given as JSON.

Scala code:

import org.apache.spark.sql.types._                        
import org.apache.spark.sql.functions._                     

val jsonSchema = new StructType()
        .add("battery_level", LongType)
        .add("c02_level", LongType)
        .add("cca3",StringType)
        .add("cn", StringType)
        .add("device_id", LongType)
        .add("device_type", StringType)
        .add("signal", LongType)
        .add("ip", StringType)
        .add("temp", LongType)
        .add("timestamp", TimestampType)

// define a case class

case class DeviceData (id: Int, device: String)

// create some sample data

val eventsDS = Seq(
  (0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""),
  (1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""),
  (2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }"""),
  (3, """{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }"""))
  .toDF("id", "device").as[DeviceData]

display(eventsDS)

(The original post linked an image of the display(eventsDS) output here.)

Now I want to implement the above code in PySpark. I have made some progress, but I am stuck on Seq, because Python has no Seq. How can I create this sample data in PySpark?

Python code:

from pyspark.sql import SparkSession 
from pyspark.sql.types import *
from pyspark.sql.functions import *
from dataclasses import dataclass

scSpark = SparkSession.builder.appName("complex data types").getOrCreate()
# Create the JSON schema
jsonSchema = StructType([
  StructField("battery_level", LongType(), True),
  StructField("c02_level", LongType(), True),
  StructField("cca3", StringType(), True),
  StructField("cn", StringType(), True),
  StructField("device_id", LongType(), True),
  StructField("device_type", StringType(), True),
  StructField("signal", LongType(), True),
  StructField("ip", StringType(), True),
  StructField("temp", LongType(), True),
  StructField("timestamp", TimestampType(), True),
])
# Python counterpart of the Scala case class
@dataclass
class DeviceData:
  id: int
  device: str

Now I don't know what to write next. I want output like the one in the image I linked above.

My main goal is to convert all of the Scala in https://docs.databricks.com/spark/latest/dataframes-datasets/complex-nested-data.html to PySpark. That would solve my problem.


1 answer

Forum user
#1 · Posted 2024-06-08 22:27:07

Scala's Seq is most comparable to a Python list:

# `spark` is the active SparkSession (the asker's `scSpark` works the same way)
eventsDS = spark.createDataFrame(
    [(0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""),
     (1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""),
     (2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }"""),
     (3, """{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }""")],
    ['id', 'device'])

eventsDS.show()                                                                                                                                                                                           

+---+--------------------+
| id|              device|
+---+--------------------+
|  0|{"device_id": 0, ...|
|  1|{"device_id": 1, ...|
|  2|{"device_id": 2, ...|
|  3|{"device_id": 3, ...|
+---+--------------------+

In PySpark SQL, a case class is not required:

eventsDS.printSchema()                                                                                                                                                                                    

root
 |-- id: long (nullable = true)
 |-- device: string (nullable = true)
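
At this point the device column is still a raw JSON string. To get to the nested result in the linked Databricks article, parse it with the schema from the question. A minimal sketch, assuming the jsonSchema and eventsDS defined above are in scope (from_json and col come from pyspark.sql.functions):

from pyspark.sql.functions import col, from_json

# Parse the JSON string in `device` into a struct column
# using the jsonSchema defined in the question.
devicesDF = eventsDS.select(from_json(col("device"), jsonSchema).alias("devices"))

# Flatten the struct so each JSON field becomes a top-level column.
devicesDF.select("devices.*").printSchema()
devicesDF.select("devices.*").show(truncate=False)

This is the PySpark counterpart of the article's from_json($"device", jsonSchema) step; the schema alone drives the parsing, so neither a case class nor a dataclass is needed.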
