如何用ML在Spark中进行叠加(stacking)和混合(blending)

2024-05-28 22:42:50 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试使用Spark ML来训练我的网页分类器。单一模型的F1分数不够高。我尝试将以下Python代码转换为Spark:

skf = list(StratifiedKFold(y, n_folds))

clfs = [RandomForestClassifier(n_estimators=100, n_jobs=-1, criterion='gini'),
        RandomForestClassifier(n_estimators=100, n_jobs=-1, criterion='entropy'),
        ExtraTreesClassifier(n_estimators=100, n_jobs=-1, criterion='gini'),
        ExtraTreesClassifier(n_estimators=100, n_jobs=-1, criterion='entropy'),
        GradientBoostingClassifier(learning_rate=0.05, subsample=0.5, max_depth=6, n_estimators=50)]

for j, clf in enumerate(clfs):
        print j, clf
        dataset_blend_test_j = np.zeros((X_submission.shape[0], len(skf)))
        for i, (train, test) in enumerate(skf):
            print "Fold", i
            X_train = X[train]
            y_train = y[train]
            X_test = X[test]
            y_test = y[test]
            clf.fit(X_train, y_train)
            y_submission = clf.predict_proba(X_test)[:, 1]
            dataset_blend_train[test, j] = y_submission
            dataset_blend_test_j[:, i] = clf.predict_proba(X_submission)[:, 1] dataset_blend_test[:, j] = dataset_blend_test_j.mean(1)

转换后的Spark代码:

// 70/30 split: level-0 models are trained and blended on trainData; testData
// is the held-out set whose meta-features are produced by every fold's models.
val Array(trainData, testData) = idf.randomSplit(Array(0.7, 0.3), seed = 1234L)

  // Number of cross-validation folds used for stacking.
  val numFolds = 10

  // Split the training data into equal folds once, then build each fold's
  // training set as the union of every other fold.  This replaces the ten
  // hand-written test0..test9 / train0..train9 definitions with equivalent
  // logic that scales to any fold count.
  val folds = trainData.randomSplit(Array.fill(numFolds)(1.0 / numFolds), seed = 111L)

  // (training set -> held-out fold), the same shape the downstream code
  // iterates over.
  val dataMap = folds.indices.map { i =>
    val train = folds.indices.filter(_ != i).map(folds).reduce(_ union _)
    train -> folds(i)
  }.toMap

  // For every fold: fit three level-0 models (NaiveBayes, OneVsRest over
  // LogisticRegression, DecisionTree) on that fold's training set, then chain
  // their transforms so each model appends its own prediction column to
  // (a) the held-out fold (-> out-of-fold training meta-features) and
  // (b) the global testData (-> test meta-features, one copy per fold).
  // The final reduce unions all folds' outputs into single DataFrames.
  val (trainRes,testRes)=dataMap.map(x=>{
    val (train,test)=x
    // Each model writes a distinct prediction column so the outputs can be
    // stacked side by side on the same rows.
    val NBmodel=new NaiveBayes().setLabelCol("label").setFeaturesCol("features").setPredictionCol("nb_prediction").fit(train)
    val onevsrestmodle=(new OneVsRest()).setFeaturesCol("features").setLabelCol("label").setPredictionCol("lr_prediction").setClassifier(new LogisticRegression()).fit(train)
    import org.apache.spark.ml.classification.DecisionTreeClassifier
    val dt1 = new DecisionTreeClassifier().
      setLabelCol("label").
      setFeaturesCol("features").
      setPredictionCol("dt_prediction").
      setImpurity("entropy").fit(train)
    // Chain transforms: each step receives the previous step's output so all
    // three prediction columns end up on the same DataFrame.
    // NOTE(review): NaiveBayes also emits rawPrediction/probability columns;
    // confirm the later transforms do not collide on those default column
    // names at runtime.
    val train_nb = NBmodel.transform(test)
    val test_nb = NBmodel.transform(testData)
    val train_lr = onevsrestmodle.transform(train_nb)
    val test_lr = onevsrestmodle.transform(test_nb)
    val train_dt = dt1.transform(train_lr)
    val test_dt = dt1.transform(test_lr)
    (train_dt,test_dt)
  }).reduce((a,b)=>{
    // Union the per-fold results: trainRes covers every training row exactly
    // once (out-of-fold); testRes contains one prediction per fold per test
    // row, averaged later.
    val (train1,test1)=a
    val (train2,test2)=b
    (train1.union(train2),test1.union(test2))
  })

  val trainInput=trainRes.select("url","label","nb_prediction","lr_prediction","dt_prediction").map(x=>{
    val Row(url,label,nb_prediction,lr_prediction,dt_prediction)=x
    (url,label,Array(nb_prediction,lr_prediction,dt_prediction))
  }).toDF("url","label","features")

  val testInput=testRes.select("url","label","nb_prediction","lr_prediction","dt_prediction").map(x=>{
    val Row(url,label,nb_prediction,lr_prediction,dt_prediction)=x
    ((url,label),Array(nb_prediction.toString.toDouble,lr_prediction.toString.toDouble,dt_prediction.toString.toDouble))
  }).rdd.reduceByKey((a,b)=>{
    Array(a.apply(0)+b.apply(0),a.apply(1)+b.apply(1),a.apply(2)+b.apply(2))
  }).map(x=>{
    val ((url,label),list)=x
    val array=list.map(_/10)
    (url,label,array)
  })

这段代码的效果很差。我只用了10个类别,但实际类别数远不止这些。如果增加类别数,一个任务要花很长时间。另外,我目前只用了3个模型,将来想用5个或更多的模型。有没有更好的办法?


Tags: testtrainvallabelunionpredictiontest1test2

热门问题