Scala中哪个数据结构类似于Python的嵌套字典或CSV?
我正在Spark的Scala环境中工作。我有一些数据,已经处理成了一个叫做byHour: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[47] at reduceByKey at <console>:16
的RDD,或者如果把它收集成数组的话,就是byHour: Array[(String, Int)]
,它的样子是这样的:
Array((6497+2006-03-19 20:00,13), (7511+2006-03-17 02:00,1), (13508+2006-03-26 10:00,4), (217+2006-05-16 16:00,1), (12404+2006-03-27 15:00,1), (9777+2006-05-14 09:00,1), (10291+2006-03-03 17:00,2), (4781+2006-05-10 14:00,2), (10291+2006-04-26 17:00,1), (15198+2006-04-26 12:00,1))
我想把这些数据存储得像Python中的嵌套字典或者CSV文件那样。
在Python中,我会这样创建:
{"6497": {"2006-03-19 20:00": 13, "2006-03-19 22:00": 1}, "7511": {"2006-03-17 02:00": 1}...}
最后我想要的结果是:
userid, 2006-03-17 01:00, 2006-03-17 02:00, ... , 2006-03-19 20:00, 2006-03-19 21:00, 2006-03-19 22:00
6497,0,0, ..., 13,0,1
7511,0,1, ..., 0,0,0
我不太确定在Scala中该怎么做到这一点。我觉得我需要一个列表或者一组哈希映射,或者一个哈希映射结构,像是hashMap[String => hashMap]。
更新:
byHour是一个RDD[(String, Int)]
val byUserHour = byHour.map(x => (x._1.split("\\+")(0),(x._1.split("\\+")(1),x._2)))
val byUser = byUserHour.groupByKey
val times = byHour.map(x => x._1.split("\\+")(1)).distinct.collect.sortWith(_ < _)
val broadcastTimes = sc.broadcast(times)
val userMaps = byUser.mapValues {
x => x.map{
case(time,cnt) => time -> cnt
}.toMap
}
val result = userMaps.map {
case(u,ut) => (u +: broadcastTimes.value.map(ut.getOrElse(_,0).toString))}
val lines = result.map(_.mkString(","))
val header = List("userid") ::: times.toList
1 个回答
首先,你需要把用户ID分离出来,这样你就得到了一个 data: Seq[(String, String, Int)]
的数据结构。接着,按照用户ID进行分组:
val byUser: Map[String, Seq[(String, String, Int)]] = data.groupBy(_._1)
现在我们可以为每个用户创建一个映射(map):
val userMaps: Map[String, Map[String, Int]] = byUser.mapValues {
s => s.map {
case (user, time, n) => time -> n
}.toMap
}
在最后的格式化过程中,你首先需要获取不同的时间戳,然后在每个用户的映射中查找这些时间戳:
val times: Seq[String] = data.map(_._2).toSet.toList
val result: Seq[Seq[String]] = userMaps.toSeq.map {
case (u, ut) => (u +: times.map(ut.getOrElse(_, 0).toString))
}
val lines: Seq[String] = result.map(_.mkString(","))
希望这些信息能帮助你入门。你可以在 http://twitter.github.io/scala_school/collections.html(以及很多其他地方)了解更多关于Scala集合的内容。
以上所有操作都是在本地进行的——完全没有分布式处理。如果想要以分布式的方式做同样的事情,你需要在一开始把数据读入一个RDD(弹性分布式数据集),使用 sc.textFile()
,然后大致执行相同的操作序列。
一个小的区别是,你需要使用 groupByKey
而不是 groupBy
,它的行为稍微有点不同。从 RDD[A, B]
你会得到 RDD[A, Iterable[B]]
,而不是 Map[A, Seq[(A, B)]]
。
一个主要的区别是,你需要把 times
从集群收集到应用程序中,然后再广播到所有节点:
val times: Seq[String] = data.map(_._2).distinct.collect
val broadcast = sc.broadcast(times)
val result: RDD[Seq[String]] = userMaps.map {
val times = broadcast.value
case (u, ut) => (u +: times.map(ut.getOrElse(_, 0).toString))
}