分析中的多行记录

2024-06-09 14:54:00 发布

您现在位置:Python中文网/ 问答频道 /正文

这是我的RDD[字符串]

M1 module1
PIP a Z A
PIP b Z B
PIP c Y n4

M2 module2
PIP a I n4
PIP b O D
PIP c O n5

等等。 基本上,我需要key的RDD(包含第1行的第二个单词)和可以迭代的后续PIP行的值。在

我试过以下方法

^{pr2}$

但这给出了以下输出

(,)
(M1 module1,M1 module1)
(PIP a Z A,PIP a Z A)
(PIP b Z B,PIP b Z B)
(PIP c Y n4,PIP c Y n4)
(,)
(M2 module2,M2 module2)
(PIP a I n4,PIP a I n4)
(PIP b O D,PIP b O D)
(PIP c O n5,PIP c O n5)

相反,我希望输出是

module1, (PIP a Z A, PIP b Z B, PIP b Z B)
module2, (PIP a I n4,PIP b O D, PIP c O n5)

我做错什么了?我对Spark API还很陌生。 谢谢

你好@zero323

usgRDD.take(10).foreach(x => println(x + "%%%%%%%%%"))

收益率。。。在

%%%%%%%%%
M1 module1%%%%%%%%%
PIP a Z A%%%%%%%%%
PIP b Z B%%%%%%%%%
PIP c Y n4%%%%%%%%%
%%%%%%%%%
M2 module2%%%%%%%%%
PIP a I n4%%%%%%%%%
PIP b O D%%%%%%%%%
PIP c O n5%%%%%%%%%

等等

你好,@zero323和@Daniel Darabos 我的输入是非常大的一组许多文件(跨越TBs)。这是样品。。在

BIN n4
BIN n5
BIN D
BIN E
PIT A I A
PIT B I B 
PIT C I C
PIT D O D
PIT E O E
DEF M1 module1
   PIP a Z A
   PIP b Z B
   PIP c Y n4
DEF M2 module2
   PIP a I n4
   PIP b O D
   PIP c O n5

我需要所有的箱子,坑和定义(包括下面的管道线)在3个不同的RDD。以下是我目前的做法(从讨论中,我感觉到下面的usgRDD计算错误)

val binRDD = levelfileRDD.filter(line => line.contains("BIN"))
val pitRDD = levelfileRDD.filter(line => line.contains("PIT"))
val usgRDD = levelfileRDD.filter(line => !line.contains("BIN") && !line.contains("PIT")).flatMap(s=>s.split("DEF").map(_.trim))

我需要3种类型的RDD(目前),因为我需要稍后执行验证。例如,“DEF M2 module2”下的“n4”只能存在于n4是BIN元素的情况下。我希望从rdd中使用graphxapi来派生关系(显然我还没有到这一步)。如果每个usgPairRDD(根据usgRDD或其他方式计算)打印以下内容是理想的

module1, (a Z A, b Z B, c Y n4) %%%%%%%
module2, (a I n4, b O D, c O n5) %%%%%%%

我希望我说得通。向星火之神道歉,如果我没有。在


Tags: pipbindeflinevalmodule1rddcontains
1条回答
网友
1楼 · 发布于 2024-06-09 14:54:00

默认情况下,Spark每行创建一个元素。这意味着,在您的例子中,每个记录都分布在多个元素上,正如注释中的Daniel Darabos所述,这些元素可以由不同的工人处理。在

由于您的数据看起来比较规则,并且由一个空行分隔,因此您应该能够将newAPIHadoopFile与自定义分隔符一起使用:

import org.apache.spark.rdd.RDD
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val path: String = ???

val conf = new org.apache.hadoop.mapreduce.Job().getConfiguration
conf.set("textinputformat.record.delimiter", "\n\n")

val usgRDD = sc.newAPIHadoopFile(
    path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
  .map{ case (_, v) => v.toString }

val usgPairRDD: RDD[(String, Seq[String])] = usgRDD.map(_.split("\n") match {
  case Array(x, xs @ _*) => (x, xs)
})

在Spark 2.4或更高版本中,也可以使用DatasetAPI实现数据加载部分:

^{pr2}$

相关问题 更多 >