如何在pyspark中将密集向量的RDD转换成数据帧?

2024-04-25 13:48:38 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个这样的DenseVectorRDD

>>> frequencyDenseVectors.collect()
[DenseVector([1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0]), DenseVector([1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0])]

我想把它转换成Dataframe。我这样试过

>>> spark.createDataFrame(frequencyDenseVectors, ['rawfeatures']).collect()

它会产生这样的错误

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 520, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 360, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 340, in _inferSchema
    schema = _infer_schema(first)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 991, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 968, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.ndarray'>

旧解决方案

frequencyVectors.map(lambda vector: DenseVector(vector.toArray()))

编辑1-代码可复制

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import split

from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.mllib.linalg import SparseVector, DenseVector

sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
sc.setLogLevel('ERROR')

sentenceData = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["label", "sentence"])
sentenceData = sentenceData.withColumn("sentence", split("sentence", "\s+"))
sentenceData.show()

vectorizer = CountVectorizer(inputCol="sentence", outputCol="rawfeatures").fit(sentenceData)
countVectors = vectorizer.transform(sentenceData).select("label", "rawfeatures")

idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(countVectors)
tfidf = idfModel.transform(countVectors).select("label", "features")
frequencyDenseVectors = tfidf.rdd.map(lambda vector: [vector[0],DenseVector(vector[1].toArray())])
frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])

Tags: infromimportsqldatatypelinespark
2条回答

我认为这里的问题是createDataframe没有使用denseVactor作为参数,请尝试将denseVector转换为相应的集合[即数组或列表]。在scala和java中

toArray()

方法可用。您可以在数组或列表中转换DensEventer,然后尝试创建dataFrame。

不能直接转换RDD[Vector]。它应该映射到可以解释为structs的对象的RDD,例如RDD[Tuple[Vector]]

frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])

否则Spark将尝试转换对象__dict__,并创建使用不支持的NumPy数组作为字段。

from pyspark.ml.linalg import DenseVector  
from pyspark.sql.types import _infer_schema

v = DenseVector([1, 2, 3])
_infer_schema(v)
TypeError                                 Traceback (most recent call last)
... 
TypeError: not supported type: <class 'numpy.ndarray'>

_infer_schema((v, ))
StructType(List(StructField(_1,VectorUDT,true)))

注意:

  • 在Spark 2.0中,必须使用正确的本地类型:

    • pyspark.ml.linalg使用基于DataFramepyspark.mlAPI时。
    • pyspark.mllib.linalg使用基于RDDpyspark.mllibAPI时。

    这两个命名空间不再兼容,需要显式转换(例如How to convert from org.apache.spark.mllib.linalg.VectorUDT to ml.linalg.VectorUDT)。

  • 编辑中提供的代码与原始问题中的代码不同。你应该知道tuplelist没有相同的语义。如果将向量映射到对,请使用tuple,并直接转换为DataFrame

    tfidf.rdd.map(
        lambda row: (row[0], DenseVector(row[1].toArray()))
    ).toDF()
    

    使用tuple(产品类型)也适用于嵌套结构,但我怀疑这是您想要的:

    (tfidf.rdd
        .map(lambda row: (row[0], DenseVector(row[1].toArray())))
        .map(lambda x: (x, ))
        .toDF())
    

    list在顶层以外的任何其他地方row被解释为ArrayType

  • 使用UDF进行转换(Spark Python: Standard scaler error "Do not support ... SparseVector")要干净得多。

相关问题 更多 >