我与熊猫和火花数据帧工作。数据帧总是很大(>;20 GB),标准的spark函数不足以满足这些大小。目前,我正在将我的熊猫数据帧转换为如下spark数据帧:
dataframe = spark.createDataFrame(pandas_dataframe)
我之所以进行这种转换,是因为使用spark将数据帧写入hdfs非常简单:
dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")
但是对于大于2GB的数据帧来说,转换是失败的。 如果我将spark数据帧转换为pandas,我可以使用pyarrow:
// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")
// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.Parquetdataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)
// delete temp files
hdfs.delete(path, recursive=True)
这是一个从spark到pandas的快速对话,它也适用于大于2GB的数据帧。我还找不到另一种方法。意思是有一个pandas数据框,我在pyarrow的帮助下转换成spark。问题是我真的找不到如何将pandas数据帧写入hdfs。
我的熊猫版:0.19.0
^{} 是您要查找的函数:
结果可以直接写入拼花地板/HDFS,而无需通过Spark传递数据:
另请参见
火花音符:
此外,由于Spark 2.3(当前主)箭头在} to compute number of chunks 使您可以轻松控制单个批次的大小。
createDataFrame
(SPARK-20791 - Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame)中直接受支持。它uses ^{最后,可以使用
defaultParallelism
来控制使用标准_convert_from_pandas
生成的分区数,从而有效地将切片的大小减小到更易于管理的程度。不幸的是,这些不太可能解决您的current memory problems。两者都依赖于
parallelize
,因此将所有数据存储在驱动程序节点的内存中。切换到箭头或调整配置只能加快进程或地址块大小限制。实际上,我不认为有任何理由在这里切换到Spark,只要您使用本地Pandas
DataFrame
作为输入。在这种情况下,最严重的瓶颈是驱动程序的网络I/O,而分发数据并不能解决这个问题。另一种方法是将pandas数据帧转换为spark数据帧(使用pyspark),并使用save命令将其保存到hdfs。 示例
这里
astype
将列的类型从object
更改为string
。这将避免您在其他情况下引发异常,因为spark无法找出pandas类型object
。但要确保这些列的类型是string。现在要在hdfs中保存df:
来自https://issues.apache.org/jira/browse/SPARK-6235
已解决。
来自https://pandas.pydata.org/pandas-docs/stable/r_interface.html
可以将pandas数据帧转换为R data.frame
那么,也许是熊猫的转变->;R->;火花->;HDF?
相关问题 更多 >
编程相关推荐