Tuning parameters of an implicit pyspark.ml ALS matrix factorization model via pyspark.ml CrossValidator


I am trying to tune the parameters of an ALS matrix factorization model that uses implicit data. To do this, I'm trying to use pyspark.ml.tuning.CrossValidator to run through a parameter grid and select the best model. I believe my problem lies with the evaluator, but I can't figure it out.

I can do this for an explicit-data model using the regression RMSE evaluator, as follows:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.sql.functions import rand


conf = SparkConf() \
  .setAppName("MovieLensALS") \
  .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

dfRatings = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
                                 ["user", "item", "rating"])
dfRatingsTest = sqlContext.createDataFrame([(0, 0), (0, 1), (1, 1), (1, 2), (2, 1), (2, 2)], ["user", "item"])

alsExplicit = ALS()
defaultModel = alsExplicit.fit(dfRatings)

paramMapExplicit = ParamGridBuilder() \
                    .addGrid(alsExplicit.rank, [8, 12]) \
                    .addGrid(alsExplicit.maxIter, [10, 15]) \
                    .addGrid(alsExplicit.regParam, [1.0, 10.0]) \
                    .build()

evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

cvExplicit = CrossValidator(estimator=alsExplicit, estimatorParamMaps=paramMapExplicit, evaluator=evaluatorR)
cvModelExplicit = cvExplicit.fit(dfRatings)

predsExplicit = cvModelExplicit.bestModel.transform(dfRatingsTest)
predsExplicit.show()

When I try to do this for implicit data (say, counts of views rather than ratings), I get an error I can't quite make sense of. Here is the code (very similar to the above):

dfCounts = sqlContext.createDataFrame([(0,0,0), (0,1,12), (0,2,3), (1,0,5), (1,1,9), (1,2,0), (2,0,0), (2,1,11), (2,2,25)],
                                 ["user", "item", "rating"])
dfCountsTest = sqlContext.createDataFrame([(0, 0), (0, 1), (1, 1), (1, 2), (2, 1), (2, 2)], ["user", "item"])

alsImplicit = ALS(implicitPrefs=True)
defaultModelImplicit = alsImplicit.fit(dfCounts)

paramMapImplicit = ParamGridBuilder() \
                    .addGrid(alsImplicit.rank, [8, 12]) \
                    .addGrid(alsImplicit.maxIter, [10, 15]) \
                    .addGrid(alsImplicit.regParam, [1.0, 10.0]) \
                    .addGrid(alsImplicit.alpha, [2.0,3.0]) \
                    .build()

evaluatorB = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="rating")
evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

cv = CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluatorR)
cvModel = cv.fit(dfCounts)

predsImplicit = cvModel.bestModel.transform(dfCountsTest)
predsImplicit.show()

I tried doing this with the RMSE evaluator and got an error. As far as I understand, I should also be able to use the AUC metric of the BinaryClassificationEvaluator, since the predictions of implicit matrix factorization are a confidence matrix c_ui for predicting the binary preference matrix p_ui, per this paper, which the documentation for pyspark's ALS cites.
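
For reference, the transformation that paper describes looks roughly like this on my count data (a quick sketch of my own, for illustration only, not part of the tuning code; the alpha value here is just an example):

import pyspark.sql.functions as F

alpha = 2.0  # example value; this is the same alpha hyperparameter tuned in the grid above

# binary preference p_ui: 1 if the user interacted with the item at all, else 0
# confidence c_ui = 1 + alpha * r_ui, per the paper
dfPrefs = dfCounts \
    .withColumn("p_ui", (F.col("rating") > 0).cast("double")) \
    .withColumn("c_ui", 1.0 + alpha * F.col("rating"))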

Using either evaluator gives me an error, and I can't find any fruitful discussion online about cross-validating implicit ALS models. I've been looking through the CrossValidator source code trying to figure out what's wrong, but I'm stuck. One of my thoughts is that after the procedure converts the implicit data matrix r_ui into the binary matrix p_ui and the confidence matrix c_ui, I'm not sure what the predicted c_ui matrix is being compared against during the evaluation stage.

Here is the error:

Traceback (most recent call last):

  File "<ipython-input-16-6c43b997005e>", line 1, in <module>
    cvModel = cv.fit(dfCounts)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\pipeline.py", line 69, in fit
    return self._fit(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\tuning.py", line 239, in _fit
    model = est.fit(train, epm[j])

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\pipeline.py", line 67, in fit
    return self.copy(params)._fit(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\wrapper.py", line 133, in _fit
    java_model = self._fit_java(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\wrapper.py", line 130, in _fit_java
    return self._java_obj.fit(dataset._jdf)

  File "C:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\sql\utils.py", line 45, in deco
    return f(*a, **kw)

  File "C:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)

etc.......

Update

I tried scaling the input so it falls in the 0-to-1 range and using the RMSE evaluator. It seems to work fine until I try to plug it into CrossValidator.

The following code works. I get predictions back and an RMSE value from the evaluator:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator


conf = SparkConf() \
  .setAppName("ALSPractice") \
  .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

# Users 0, 1, 2, 3 - Items 0, 1, 2, 3, 4, 5 - Ratings 0.0-5.0
dfCounts2 = sqlContext.createDataFrame([(0,0,5.0), (0,1,5.0),            (0,3,0.0), (0,4,0.0), 
                                        (1,0,5.0),            (1,2,4.0), (1,3,0.0), (1,4,0.0),
                                        (2,0,0.0),            (2,2,0.0), (2,3,5.0), (2,4,5.0),
                                        (3,0,0.0), (3,1,0.0),            (3,3,4.0)            ],
                                       ["user", "item", "rating"])

dfCountsTest2 = sqlContext.createDataFrame([(0,0), (0,1), (0,2), (0,3), (0,4),
                                            (1,0), (1,1), (1,2), (1,3), (1,4),
                                            (2,0), (2,1), (2,2), (2,3), (2,4),
                                            (3,0), (3,1), (3,2), (3,3), (3,4)], ["user", "item"])

# Normalize rating data to [0,1] range based on max rating
colmax = dfCounts2.select(F.max('rating')).collect()[0][0]
normalize = F.udf(lambda x: x / colmax, FloatType())
dfCountsNorm = dfCounts2.withColumn('ratingNorm', normalize(F.col('rating')))

alsImplicit = ALS(implicitPrefs=True)
defaultModelImplicit = alsImplicit.fit(dfCountsNorm)
preds = defaultModelImplicit.transform(dfCountsTest2)

evaluatorR2 = RegressionEvaluator(metricName="rmse", labelCol="ratingNorm")
evaluatorR2.evaluate(defaultModelImplicit.transform(dfCountsNorm))

preds = defaultModelImplicit.transform(dfCountsTest2)

What I don't understand is why the following doesn't work. I'm using the same estimator and the same evaluator, and fitting the same data. Why would these work above but not inside CrossValidator:

paramMapImplicit = ParamGridBuilder() \
                    .addGrid(alsImplicit.rank, [8, 12]) \
                    .addGrid(alsImplicit.maxIter, [10, 15]) \
                    .addGrid(alsImplicit.regParam, [1.0, 10.0]) \
                    .addGrid(alsImplicit.alpha, [2.0,3.0]) \
                    .build()

cv = CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluatorR2)
cvModel = cv.fit(dfCountsNorm)

2 Answers

Technical issues aside, strictly speaking neither method is correct for the input generated by ALS with implicit feedback:

  • You cannot use RegressionEvaluator because, as you already know, the prediction can be interpreted as a confidence value and is represented as a floating-point number in the range [0, 1], while the label column is just an unbounded integer. These values are clearly not comparable.
  • You cannot use BinaryClassificationEvaluator because, even if the prediction can be interpreted as a probability, the label does not represent a binary decision. Moreover, the prediction column has an invalid type and cannot be used directly with BinaryClassificationEvaluator (see the quick check below).
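
As a concrete illustration of that type mismatch, the following check (my own addition, not part of the original answer) shows that the default implicit model's output carries an integer "rating" label and a single float "prediction" column, while in this Spark version BinaryClassificationEvaluator looks for a vector-typed rawPrediction column, which is exactly what the subclass below constructs:

    # Quick schema check on the default implicit model fitted in the question:
    # there is no vector-typed rawPrediction column for the evaluator to use.
    defaultModelImplicit.transform(dfCounts).printSchema()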

You can try to transform one of the columns so that the input matches these requirements, but from a theoretical point of view this is not a really justified approach, and it introduces additional parameters that are hard to tune:

  • Map the label column to the [0, 1] range and use RMSE.

  • Convert the label column to a binary indicator with a fixed threshold and extend ALS / ALSModel to return the expected column type. Assuming a threshold value of 1, it could look something like this:

    from pyspark.ml.recommendation import *
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.sql.functions import udf, col
    from pyspark.mllib.linalg import DenseVector, VectorUDT
    
    class BinaryALS(ALS):
        def fit(self, df):
            assert self.getImplicitPrefs()
            model = super(BinaryALS, self).fit(df)
            return ALSBinaryModel(model._java_obj)
    
    class ALSBinaryModel(ALSModel):
        def transform(self, df):
            transformed = super(ALSBinaryModel, self).transform(df)
            # Wrap the scalar prediction into the two-element raw-prediction
            # vector type that BinaryClassificationEvaluator expects
            as_vector = udf(lambda x: DenseVector([1 - x, x]), VectorUDT())
            return transformed.withColumn(
                "rawPrediction", as_vector(col("prediction")))
    
    # Add binary label column
    with_binary = dfCounts.withColumn(
        "label_binary", (col("rating") > 0).cast("double"))
    
    als_binary_model = BinaryALS(implicitPrefs=True).fit(with_binary)
    
    evaluatorB = BinaryClassificationEvaluator(
        metricName="areaUnderROC", labelCol="label_binary")
    
    evaluatorB.evaluate(als_binary_model.transform(with_binary))
    ## 1.0
    

Generally speaking, material about evaluating recommender systems with implicit feedback is somewhat missing from textbooks; I suggest you read eliasah's answer on evaluating these kinds of recommenders.

With implicit feedback we don't have the users' reactions to our recommendations. Therefore, we cannot use precision-based metrics.

In the already cited paper, the expected percentile ranking metric is used instead.

You can try to implement an evaluator based on a similar metric in the Spark ML library and use it in your cross-validation pipeline.
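
A minimal sketch of what such an evaluator could look like (my own illustration, untested; it assumes a predictions DataFrame with user, item, rating and prediction columns and approximates the paper's rank_ui with percent_rank over each user's ranked predictions):

    from pyspark.sql import Window
    import pyspark.sql.functions as F
    from pyspark.ml.evaluation import Evaluator

    def expected_percentile_ranking(preds):
        # rank_ui in [0, 1]: 0 means item i sits at the top of user u's
        # recommendation list, 1 means it sits at the bottom
        w = Window.partitionBy("user").orderBy(F.col("prediction").desc())
        ranked = preds.withColumn("rank_ui", F.percent_rank().over(w))
        # Average of rank_ui weighted by the observed counts r_ui;
        # lower is better, and ~0.5 is no better than random
        agg = ranked.agg(
            F.sum(F.col("rating") * F.col("rank_ui")).alias("num"),
            F.sum("rating").alias("den")).first()
        return float(agg["num"]) / float(agg["den"])

    class PercentileRankingEvaluator(Evaluator):
        # Hypothetical evaluator wrapping the metric so it can be passed
        # to CrossValidator(..., evaluator=PercentileRankingEvaluator())
        def _evaluate(self, dataset):
            return expected_percentile_ranking(dataset)

        def isLargerBetter(self):
            return False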
