Spark 创建 NumPy 数组 RDD 的最快方法

3 投票
3 回答
11124 浏览
提问于 2025-05-10 15:26

我的Spark应用程序正在使用包含numpy数组的RDD。
目前,我是从AWS S3读取数据,这些数据以简单的文本文件形式存储,每一行都是一个向量,每个元素之间用空格分开,比如:

1 2 3
5.1 3.6 2.1
3 0.24 1.333

我正在使用numpy的loadtxt()函数来从中创建一个numpy数组。
不过,这种方法似乎非常慢,我觉得我的应用在将数据集转换为numpy数组时花费了太多时间。

你能给我推荐一个更好的方法吗?比如,我应该把数据集保留为二进制文件吗?或者我应该用其他方式创建RDD?

这是我创建RDD的一些代码:

data = sc.textFile("s3_url", initial_num_of_partitions).mapPartitions(readData)

readData函数:

 def readPointBatch(iterator):
     return [(np.loadtxt(iterator,dtype=np.float64)]

相关文章:

  • 暂无相关问题
暂无标签

3 个回答

2

在使用Spark时,不建议使用numpy。因为Spark有自己处理数据的方法,它能确保你那些可能很大的文件不会一次性全部加载到内存中,这样就不会超过内存的限制。你应该用Spark这样加载你的文件:

data = sc.textFile("s3_url", initial_num_of_partitions) \
    .map(lambda row: map(lambda x: float(x), row.split(' ')))

这样做之后,你会得到一个RDD,它的格式会像这样,具体取决于你的例子:

>>> print(data.collect())
[[1.0, 2.0, 3.0], [5.1, 3.6, 2.1], [3.0, 0.24, 1.333]]

@编辑 关于文件格式和numpy使用的一些建议:

文本文件和CSV、TSV、Parquet等格式一样好,选择你觉得舒服的格式就行。根据Spark的文档,二进制文件不是首选:

binaryFiles(路径, 最小分区数=None)

注意:实验性

从HDFS、本地文件系统(在所有节点上可用)或任何Hadoop支持的文件系统URI读取一组二进制文件,作为字节数组。每个文件被读取为一个单独的记录,并以键值对的形式返回,其中键是每个文件的路径,值是每个文件的内容。

注意:小文件更受欢迎,大文件也是可以的,但可能会导致性能下降。

至于numpy的使用,如果我是你,我一定会尝试用Spark的原生功能替代任何外部包,比如用pyspark.mlib.random来进行随机化:http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#module-pyspark.mllib.random

4

numpy.fromstring 来处理会更符合常规做法,而且速度也会稍微快一些,像这样:

import numpy as np.

path = ...
initial_num_of_partitions = ...

data = (sc.textFile(path, initial_num_of_partitions)
   .map(lambda s: np.fromstring(s, dtype=np.float64, sep=" ")))

不过,忽略这一点,你的方法其实没有什么特别大的问题。根据我的观察,使用基本配置时,你的方法大约慢了两倍,读取数据的速度要快一些,而创建虚拟的 numpy 数组则稍微慢一点。

所以看起来问题可能出在别的地方。可能是集群配置不当,或者从 S3 获取数据的成本太高,甚至可能是期望值不切实际。

0

在这种情况下,最好的办法是使用pandas库来处理输入输出。
请参考这个问题:pandas的read_csv()和Python迭代器作为输入
在那里你会看到如何替换np.loadtxt()这个函数,这样创建numpy数组的RDD会快得多。

撰写回答