I have some fairly complex data that is processed in Scala on Databricks. I want to convert the Scala code to Python so that it works with the data given as JSON.
Scala code:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val jsonSchema = new StructType()
.add("battery_level", LongType)
.add("c02_level", LongType)
.add("cca3",StringType)
.add("cn", StringType)
.add("device_id", LongType)
.add("device_type", StringType)
.add("signal", LongType)
.add("ip", StringType)
.add("temp", LongType)
.add("timestamp", TimestampType)
// define a case class
case class DeviceData (id: Int, device: String)
// create some sample data
val eventsDS = Seq (
(0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""),
(1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""),
(2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }"""),
(3, """{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }""")).toDF("id", "device").as[DeviceData]
display(eventsDS)
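Now I want to implement the code above in PySpark. I have made some progress, but I am stuck on Seq, because Python has no Seq. How do I handle this sample data in PySpark?
Python code: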
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from dataclasses import dataclass
scSpark = SparkSession.builder.appName("complex data types").getOrCreate()
# Create the JSON schema (each field name appears once; c02_level is a long, as in the Scala version)
jsonSchema = StructType([
    StructField("battery_level", LongType(), True),
    StructField("c02_level", LongType(), True),
    StructField("cca3", StringType(), True),
    StructField("cn", StringType(), True),
    StructField("device_id", LongType(), True),
    StructField("device_type", StringType(), True),
    StructField("signal", LongType(), True),
    StructField("ip", StringType(), True),
    StructField("temp", LongType(), True),
    StructField("timestamp", TimestampType(), True),
])
# Create a Dataset from the above schema
@dataclass
class DeviceData:
    id: int
    device: str
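Now I don't know what to write next. I want output like the image I provided.
My main goal is to convert all of the Scala code at https://docs.databricks.com/spark/latest/dataframes-datasets/complex-nested-data.html to PySpark. That would solve my problem.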
The closest Python equivalent of a Scala Seq is a Python list. In PySpark SQL, a case class is not required.
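As a rough sketch of that idea, the sample data can be written as a plain Python list of (id, json_string) tuples and passed to createDataFrame with the column names, which stands in for Seq(...).toDF("id", "device"). The names events, make_events_df and parse_device are made up for this illustration, spark is assumed to be an existing SparkSession (scSpark in the question), and json_schema is assumed to be a StructType like the jsonSchema in the question. The dataclass is not needed for any of this.

```python
# Sample data: a Python list of (id, json_string) tuples plays the role
# of the Scala Seq of tuples.
events = [
    (0, '{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp": 1475600496}'),
    (1, '{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp": 1475600498}'),
    (2, '{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp": 1475600500}'),
    (3, '{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp": 1475600502}'),
]


def make_events_df(spark):
    """Build a two-column DataFrame (id, device) from the list above.

    `spark` is assumed to be an existing SparkSession.
    """
    # The second argument supplies the column names, like .toDF("id", "device").
    return spark.createDataFrame(events, ["id", "device"])


def parse_device(events_df, json_schema):
    """Parse the JSON string column `device` into a struct column.

    `json_schema` is assumed to be a StructType describing the JSON payload.
    """
    # Imported inside the function so the sample data above can be
    # inspected even where PySpark is not installed.
    from pyspark.sql.functions import col, from_json

    return events_df.select(
        "id",
        from_json(col("device"), json_schema).alias("device"),
    )
```

With the session from the question this would be called as events_df = make_events_df(scSpark), followed by display(events_df) on Databricks or events_df.show(truncate=False) elsewhere. parse_device(events_df, jsonSchema) would then be the step that turns the JSON string into nested columns.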