在pyspark中创建大型字典
我正在尝试用pyspark解决一个问题。
我在hdfs上有一个文件,这个文件是一个查找表的转储。
key1, value1
key2, value2
...
我想把这个文件加载到pyspark中的Python字典里,然后用它来做其他事情。所以我试着这样做:
table = {}
def populateDict(line):
(k,v) = line.split(",", 1)
table[k] = v
kvfile = sc.textFile("pathtofile")
kvfile.foreach(populateDict)
但我发现table这个变量没有被修改。那么,有没有办法在spark中创建一个大的内存哈希表呢?
2 个回答
1
为了提高效率,可以参考:sortByKey() 和 lookup()
lookup(key):
这个功能会返回在 RDD 中与指定 key 相关的值的列表。如果 RDD 有已知的分区方式,这个操作会很高效,因为它只会在与这个 key 对应的分区中进行查找。
使用 sortByKey() 会重新分区 RDD(参考:OrderedRDD),在调用 lookup()
时也会高效地进行搜索。在代码中,像这样:
kvfile = sc.textFile("pathtofile")
sorted_kv = kvfile.flatMap(lambda x: x.split("," , 1)).sortByKey()
sorted_kv.lookup('key1').take(10)
就能同时作为 RDD 使用,并且效率很高。
6
foreach
是一种分布式计算方式,所以你不能指望它能修改只在驱动程序中可见的数据结构。你想要的是。
kv.map(line => { line.split(" ") match {
case Array(k,v) => (k,v)
case _ => ("","")
}.collectAsMap()
这段代码是用 Scala 写的,但你可以理解它的意思,关键的函数是 collectAsMap()
,它会把结果返回给驱动程序。
如果你的数据量非常大,你可以使用 PairRDD 来作为一个映射。首先把数据映射成成对的形式。
kv.map(line => { line.split(" ") match {
case Array(k,v) => (k,v)
case _ => ("","")
}
然后你可以用 rdd.lookup("key")
来访问,这个方法会返回与这个键相关联的一系列值。不过,这样的效率肯定比其他分布式键值存储要低,因为 Spark 并不是专门为这个设计的。