Spark:加载多个文件、单独分析、合并结果和

2024-06-17 08:12:50 发布

您现在位置:Python中文网/ 问答频道 /正文

我刚接触Spark,不太清楚如何问这个问题(使用哪些术语,等等),下面是我在概念上试图实现的目标:

Conceptual need diagram

我有很多小的,单独的.txt“ledger”文件(例如,有时间戳和属性值的行分隔文件)。在

我想:

  1. 将每个“账本”文件读入单独的数据帧(读取:不合并成一个大数据帧);

  2. 对每个单独的数据帧执行一些基本计算,从而生成一行新的数据值;然后

  3. 将所有单独的结果行合并到一个最终对象中,并将其保存在以行分隔的文件中到磁盘。

似乎我找到的几乎每一个答案(当谷歌搜索相关术语时)都是关于将多个文件加载到单个RDD或数据帧中,但我确实找到了以下Scala代码:

val data = sc.wholeTextFiles("HDFS_PATH")
val files = data.map { case (filename, content) => filename}
def doSomething(file: String) = { 
println (file);

 // your logic of processing a single file comes here

 val logData = sc.textFile(file);
 val numAs = logData.filter(line => line.contains("a")).count();
 println("Lines with a: %s".format(numAs));

 // save rdd of single file processed data to hdfs comes here
}

files.collect.foreach( filename => {
    doSomething(filename)
})

。。。但是:

A.我不知道这是否与读取/分析操作并行,并且

我不认为它能把结果合并成一个单一的对象。在

任何方向或建议都非常感谢!在

更新

似乎我要做的(在多个文件上并行运行一个脚本,然后合并结果)可能需要类似thread pools(?)的东西。在

为了清楚起见,下面是一个我想对通过读取“ledger”文件创建的数据帧执行的计算示例:

^{pr2}$

因此,像这样的分类账:

+---------+------+-------------------+-----+
| location|status|          timestamp|wh_id|
+---------+------+-------------------+-----+
|  PUTAWAY|     I|2019-04-01 03:14:00|   20|
|PICKABLE1|     X|2019-04-01 04:24:00|   20|
|PICKABLE2|     X|2019-04-01 05:33:00|   20|
|PICKABLE2|     A|2019-04-01 06:42:00|   20|
|  HOTPICK|     A|2019-04-10 05:51:00|   20|
| ICEXCEPT|     A|2019-04-10 07:04:00|   20|
| ICEXCEPT|     X|2019-04-11 09:28:00|   20|
+---------+------+-------------------+-----+

将减少至(假设计算于2019-04-14运行):

{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }

Tags: 文件数据对象iddatastatuslocationval
2条回答

不建议使用wholeTextFiles,因为它会立即将整个文件加载到内存中。如果您真的想为每个文件创建一个单独的数据帧,您可以简单地使用完整路径而不是目录。但是,不建议这样做,而且很可能导致资源利用率低下。相反,考虑使用input_file_pathhttps://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/functions.html#input_file_name

例如:

spark
.read
  .textFile("path/to/files")
  .withColumn("file", input_file_name())
  .filter($"value" like "%a%")
  .groupBy($"file")
  .agg(count($"value"))
  .show(10, false)
^{pr2}$

所以这些文件可以单独处理,然后再合并。在

您可以在hdfs中获取文件路径

import  org.apache.hadoop.fs.{FileSystem,Path}

val files=FileSystem.get( sc.hadoopConfiguration ).listStatus( new Path(your_path)).map( x => x.getPath ).map(x=> "hdfs://"+x.toUri().getRawPath())

为每个数据帧创建唯一的数据路径

^{pr2}$

在联合到一个数据帧之前应用过滤器或任何转换

val df= arr_df.map(x=> x.where(your_filter)).reduce(_ union _)

相关问题 更多 >