如何将大Pandas数据帧保存到hdfs?

2024-06-01 02:58:21 发布

您现在位置:Python中文网/ 问答频道 /正文

我与熊猫和火花数据帧工作。数据帧总是很大(>;20 GB),标准的spark函数不足以满足这些大小。目前,我正在将我的熊猫数据帧转换为如下spark数据帧:

dataframe = spark.createDataFrame(pandas_dataframe)  

我之所以进行这种转换,是因为使用spark将数据帧写入hdfs非常简单:

dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")

但是对于大于2GB的数据帧来说,转换是失败的。 如果我将spark数据帧转换为pandas,我可以使用pyarrow:

// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")

// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.Parquetdataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)

// delete temp files
hdfs.delete(path, recursive=True)

这是一个从spark到pandas的快速对话,它也适用于大于2GB的数据帧。我还找不到另一种方法。意思是有一个pandas数据框,我在pyarrow的帮助下转换成spark。问题是我真的找不到如何将pandas数据帧写入hdfs。

我的熊猫版:0.19.0


Tags: to数据pathdataframepandasmodetablehdfs
3条回答

Meaning having a pandas dataframe which I transform to spark with the help of pyarrow.

^{}是您要查找的函数:

Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True)

Convert pandas.DataFrame to an Arrow Table
import pyarrow as pa

pdf = ...  # type: pandas.core.frame.DataFrame
adf = pa.Table.from_pandas(pdf)  # type: pyarrow.lib.Table

结果可以直接写入拼花地板/HDFS,而无需通过Spark传递数据:

import pyarrow.parquet as pq

fs  = pa.hdfs.connect()

with fs.open(path, "wb") as fw
    pq.write_table(adf, fw)

另请参见

火花音符

此外,由于Spark 2.3(当前主)箭头在createDataFrameSPARK-20791 - Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame)中直接受支持。它uses ^{} to compute number of chunks使您可以轻松控制单个批次的大小。

最后,可以使用defaultParallelism来控制使用标准_convert_from_pandas生成的分区数,从而有效地将切片的大小减小到更易于管理的程度。

不幸的是,这些不太可能解决您的current memory problems。两者都依赖于parallelize,因此将所有数据存储在驱动程序节点的内存中。切换到箭头或调整配置只能加快进程或地址块大小限制。

实际上,我不认为有任何理由在这里切换到Spark,只要您使用本地PandasDataFrame作为输入。在这种情况下,最严重的瓶颈是驱动程序的网络I/O,而分发数据并不能解决这个问题。

另一种方法是将pandas数据帧转换为spark数据帧(使用pyspark),并使用save命令将其保存到hdfs。 示例

    df = pd.read_csv("data/as/foo.csv")
    df[['Col1', 'Col2']] = df[['Col2', 'Col2']].astype(str)
    sc = SparkContext(conf=conf)
    sqlCtx = SQLContext(sc)
    sdf = sqlCtx.createDataFrame(df)

这里astype将列的类型从object更改为string。这将避免您在其他情况下引发异常,因为spark无法找出pandas类型object。但要确保这些列的类型是string。

现在要在hdfs中保存df:

    sdf.write.csv('mycsv.csv')

来自https://issues.apache.org/jira/browse/SPARK-6235

Support for parallelizing R data.frame larger than 2GB

已解决。

来自https://pandas.pydata.org/pandas-docs/stable/r_interface.html

Converting DataFrames into R objects

可以将pandas数据帧转换为R data.frame

那么,也许是熊猫的转变->;R->;火花->;HDF?

相关问题 更多 >