在pyspark中创建大型字典

13 投票
2 回答
11262 浏览
提问于 2025-04-18 11:45

我正在尝试用pyspark解决一个问题。
我在hdfs上有一个文件,这个文件是一个查找表的转储。

key1, value1
key2, value2
...

我想把这个文件加载到pyspark中的Python字典里,然后用它来做其他事情。所以我试着这样做:

table = {}
def populateDict(line):
    (k,v) = line.split(",", 1)
    table[k] = v

kvfile = sc.textFile("pathtofile")
kvfile.foreach(populateDict)

但我发现table这个变量没有被修改。那么,有没有办法在spark中创建一个大的内存哈希表呢?

2 个回答

1

为了提高效率,可以参考:sortByKey() 和 lookup()

lookup(key):

这个功能会返回在 RDD 中与指定 key 相关的值的列表。如果 RDD 有已知的分区方式,这个操作会很高效,因为它只会在与这个 key 对应的分区中进行查找。

使用 sortByKey() 会重新分区 RDD(参考:OrderedRDD),在调用 lookup() 时也会高效地进行搜索。在代码中,像这样:

kvfile = sc.textFile("pathtofile")
sorted_kv = kvfile.flatMap(lambda x: x.split("," , 1)).sortByKey()

sorted_kv.lookup('key1').take(10)

就能同时作为 RDD 使用,并且效率很高。

6

foreach 是一种分布式计算方式,所以你不能指望它能修改只在驱动程序中可见的数据结构。你想要的是。

kv.map(line => { line.split(" ") match { 
    case Array(k,v) => (k,v)
    case _ => ("","")
}.collectAsMap()

这段代码是用 Scala 写的,但你可以理解它的意思,关键的函数是 collectAsMap(),它会把结果返回给驱动程序。

如果你的数据量非常大,你可以使用 PairRDD 来作为一个映射。首先把数据映射成成对的形式。

    kv.map(line => { line.split(" ") match { 
        case Array(k,v) => (k,v)
        case _ => ("","")
    }

然后你可以用 rdd.lookup("key") 来访问,这个方法会返回与这个键相关联的一系列值。不过,这样的效率肯定比其他分布式键值存储要低,因为 Spark 并不是专门为这个设计的。

撰写回答