从StreamingContext和textFileStream转换RDD拼花地板

2024-04-16 06:02:26 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个流式PySpark作业,它读取新行分隔的JSON并执行一些聚合,然后将它们保存到一些文件夹中

我想将正在被流化的文件从JSON更改为parquet

以下是我如何创建上下文:

conf = pyspark.SparkConf().set("spark.driver.host", "127.0.0.1")
conf.set("spark.sql.execution.arrow.enabled", "true")
conf.set("spark.sql.parquet.compression.codec", "snappy")
sc = pyspark.SparkContext(
    master="local[5]", appName="parquet_job", conf=conf
)
DATA_PATH = "the_data_path"
BATCH_TIMEOUT = 10

ssc = StreamingContext(sc, BATCH_TIMEOUT)
sqlC = SQLContext(sc)

dstream = ssc.textFileStream(DATA_PATH)

接下来,每次Spark读取新文件时,我都会这样做

dstream.foreachRDD(parse_data)

ssc.start()
ssc.awaitTermination()
while not stopped:
    pass
ssc.stop()
sc.stop()

parse_data是一个使用不同的其他函数执行多个聚合的函数

读取JSON文件没问题。当我开始阅读拼花文件时,什么都没用

我试过了

        schema = StructType([
            StructField('a', StringType(), True),
            StructField('b', StringType(), True),
            StructField('c', StringType(), True),
            ...
        ])
        # rdd is the dstream from foreachRDD
        lines = sqlC.createDataFrame(rdd, schema)

但我总是会遇到以下错误:

AttributeError: 'RDD' object has no attribute '_get_object_id'

TypeError: StructType can not accept object 'PAR1\x15\x04\x15\x1c\x15 L\x15\x02\x15\x04\x12\x00\x00\x0e4' in type <class 'str'>


Tags: 文件jsontruedataobjectconfsparksc