Spark 创建 NumPy 数组 RDD 的最快方法
我的Spark应用程序正在使用包含numpy数组的RDD。
目前,我是从AWS S3读取数据,这些数据以简单的文本文件形式存储,每一行都是一个向量,每个元素之间用空格分开,比如:
1 2 3
5.1 3.6 2.1
3 0.24 1.333
我正在使用numpy的loadtxt()
函数来从中创建一个numpy数组。
不过,这种方法似乎非常慢,我觉得我的应用在将数据集转换为numpy数组时花费了太多时间。
你能给我推荐一个更好的方法吗?比如,我应该把数据集保留为二进制文件吗?或者我应该用其他方式创建RDD?
这是我创建RDD的一些代码:
data = sc.textFile("s3_url", initial_num_of_partitions).mapPartitions(readData)
readData函数:
def readPointBatch(iterator):
return [(np.loadtxt(iterator,dtype=np.float64)]
相关文章:
- 暂无相关问题
3 个回答
在使用Spark时,不建议使用numpy
。因为Spark有自己处理数据的方法,它能确保你那些可能很大的文件不会一次性全部加载到内存中,这样就不会超过内存的限制。你应该用Spark这样加载你的文件:
data = sc.textFile("s3_url", initial_num_of_partitions) \
.map(lambda row: map(lambda x: float(x), row.split(' ')))
这样做之后,你会得到一个RDD
,它的格式会像这样,具体取决于你的例子:
>>> print(data.collect())
[[1.0, 2.0, 3.0], [5.1, 3.6, 2.1], [3.0, 0.24, 1.333]]
@编辑 关于文件格式和numpy
使用的一些建议:
文本文件和CSV、TSV、Parquet等格式一样好,选择你觉得舒服的格式就行。根据Spark的文档,二进制文件不是首选:
binaryFiles(路径, 最小分区数=None)
注意:实验性
从HDFS、本地文件系统(在所有节点上可用)或任何Hadoop支持的文件系统URI读取一组二进制文件,作为字节数组。每个文件被读取为一个单独的记录,并以键值对的形式返回,其中键是每个文件的路径,值是每个文件的内容。
注意:小文件更受欢迎,大文件也是可以的,但可能会导致性能下降。
至于numpy
的使用,如果我是你,我一定会尝试用Spark的原生功能替代任何外部包,比如用pyspark.mlib.random
来进行随机化:http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#module-pyspark.mllib.random
用 numpy.fromstring
来处理会更符合常规做法,而且速度也会稍微快一些,像这样:
import numpy as np.
path = ...
initial_num_of_partitions = ...
data = (sc.textFile(path, initial_num_of_partitions)
.map(lambda s: np.fromstring(s, dtype=np.float64, sep=" ")))
不过,忽略这一点,你的方法其实没有什么特别大的问题。根据我的观察,使用基本配置时,你的方法大约慢了两倍,读取数据的速度要快一些,而创建虚拟的 numpy 数组则稍微慢一点。
所以看起来问题可能出在别的地方。可能是集群配置不当,或者从 S3 获取数据的成本太高,甚至可能是期望值不切实际。
在这种情况下,最好的办法是使用pandas库来处理输入输出。
请参考这个问题:pandas的read_csv()和Python迭代器作为输入。
在那里你会看到如何替换np.loadtxt()
这个函数,这样创建numpy数组的RDD会快得多。