如何使用“触发器一次”触发器控制Spark结构化流媒体中每个触发器处理的文件量?

2024-04-27 16:58:45 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试使用Spark Structured Streaming的特性Trigger once来模拟类似批处理的设置。但是,我在运行初始批处理时遇到了一些问题,因为我有很多历史数据,因此我还使用选项.option(“cloudFiles.includeExistingFiles”,“true”)来处理现有文件

因此,我的初始批处理变得非常大,因为我无法控制该批处理的文件量

我还尝试使用选项cloudFiles.maxBytesPerTrigger,但是,当您使用触发器一次时,会忽略此选项-->https://docs.databricks.com/spark/latest/structured-streaming/auto-loader-gen2.html

当我指定maxFilesPerTrigger选项时,它也会被忽略。它只需要所有可用的文件

我的代码如下所示:

df = (
  spark.readStream.format("cloudFiles")
    .schema(schemaAsStruct)
    .option("cloudFiles.format", sourceFormat)
    .option("delimiter", delimiter)
    .option("header", sourceFirstRowIsHeader)
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.includeExistingFiles", "true")
    .option("badRecordsPath", badRecordsPath)
    .option("maxFilesPerTrigger", 1)
    .option("cloudFiles.resourceGroup", omitted)
    .option("cloudFiles.region", omitted)
    .option("cloudFiles.connectionString", omitted)
    .option("cloudFiles.subscriptionId", omitted)
    .option("cloudFiles.tenantId", omitted)
    .option("cloudFiles.clientId", omitted)
    .option("cloudFiles.clientSecret", omitted)
    .load(sourceBasePath)
)

# Traceability columns
df = (
  df.withColumn(sourceFilenameColumnName, input_file_name()) 
    .withColumn(processedTimestampColumnName, lit(processedTimestamp))
    .withColumn(batchIdColumnName, lit(batchId))
)

def process_batch(batchDF, id):
  batchDF.persist()
  
  (batchDF
     .write
     .format(destinationFormat)
     .mode("append")
     .save(destinationBasePath + processedTimestampColumnName + "=" +  processedTimestamp)
  )
    
  (batchDF
   .groupBy(sourceFilenameColumnName, processedTimestampColumnName)
   .count()
   .write
   .format(destinationFormat)
   .mode("append")
   .save(batchSourceFilenamesTmpDir))
  
  batchDF.unpersist()

stream = (
  df.writeStream
    .foreachBatch(process_batch)
    .trigger(once=True)
    .option("checkpointLocation", checkpointPath)
    .start()
)

如您所见,我使用的是cloudfiles格式,这是Databricks Autoloader的格式-->https://docs.databricks.com/spark/latest/structured-streaming/auto-loader-gen2.html

自动加载程序在新数据文件到达云存储时以增量方式高效地处理它们。

自动加载程序提供一个名为cloudFiles的结构化流媒体源。给定云文件存储上的输入目录路径,cloudFiles源会在新文件到达时自动处理这些文件,还可以选择处理该目录中的现有文件

如果我以一种令人困惑的方式提出我的问题,或者它缺乏信息,请这样说


Tags: 文件trueformatdf选项cloudfilessparkoption
2条回答

您在初始批处理中遇到的具体问题是什么,您能否提供更多详细信息或错误消息

为什么你的第一批货很大是个问题?如果你有大量的历史数据,这是意料之中的

要考虑的是子文件夹——您的文件是否位于子文件夹中,或者仅在根^ ^ }路径中?如果在子文件夹中,请尝试对readStream使用此选项:

option("recursiveFileLookup", "true")

我发现这解决了我的自动加载问题,因为我有数据文件在子文件夹/分区中登录

不幸的是,Spark 3.x(DBR>;=7.x)完全忽略了maxFilesPerTrigger等选项,这些选项限制了为处理而提取的数据量——在这种情况下,它将尝试一次性处理所有数据,有时可能会导致性能问题

要解决此问题,您可以定期执行以下hack检查stream.get('numInputRows')的值,如果它在一段时间内等于0,则发出stream.stop()

更新,2021年10月:通过引入新的触发器类型-Trigger.AvailableNow(请参见SPARK-36533),Spark 3.3中似乎将对其进行修复

相关问题 更多 >