使用Sp分发scikitlearn

2024-05-16 18:59:10 发布

您现在位置:Python中文网/ 问答频道 /正文

所以我一直致力于用不同的参数组合来分布和并行执行不同的ML算法。我正在集群环境上进行实验,我希望尽可能多地利用所有可用资源。 更多信息:How are tasks distributed within a Spark cluster?

我的输入包括JSON格式的算法和参数列表。基于这种情况,我有两种方法:

  1. 对于我拥有的每个ML算法,我创建一个SparkGridSearch对象(从official integration)并将其放入一个列表中。然后,我在不同的线程上执行这些网格搜索对象,如下所示:

    ...
    for experiment in experiments:
        # experiment contains the algorithm and its list of parameters
        gridSearchCV = SparkGridSearchCV(spark_context, experiment[0], experiment[1], scoring=experiment[2], cv=experiment[3])
        experiment_thread = Thread(target=run_experiment, args=(bcast_dataframe, gridSearchCV))
        experiment_thread.start()
    

输入:

^{pr2}$
  1. 我还创建了一个列表,其中每个元素都是一个带有一个参数组合的算法。因此,这个列表的大小比前面的方法大得多,因为每个组合都是列表的一个元素。在本例中,我只需在此列表中使用spark.parallelize,并将其映射到运行每个ML算法的函数:

    def run_experiment(dataframe, experiment):
       ...
       model = estimator.fit(train_data, train_target)
    
       predicted = model.predict(test_data)
    
       score = accuracy_score(predicted, expected)
       return [model, score]
    
    
    def main(): 
      ...
      bcast_dataset = spark_context.broadcast(df)
    
      experimentsRDD = spark_context.parallelize(experiments_dict)
    
      print experimentsRDD.map(lambda experiment: run_experiment(bcast_dataset, experiment)).collect()
    
      bcast_dataset.unpersist()
    

输入:

experiments_dict = {
    'Decision Tree': {'num_folds': 2, 'evaluation': 'accuracy', 'parameters': {'criterion': ['entropy'], 'class_weight': ['balanced']}},
    'Naive Bayes': {'num_folds': 2, 'evaluation': 'accuracy', 'parameters': {}},
    'Random Forests': {'num_folds': 2, 'evaluation': 'accuracy', 'parameters': {'n_estimators': [5]}},
    'Random Forests_1_2': {'num_folds': 2, 'evaluation': 'accuracy', 'parameters': {'n_estimators': [7]}},
    'Random Forests_2': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'bootstrap': [True, False], 'criterion': ['entropy'], 'class_weight': ['balanced']}},
    'Random Forests_2_1': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'bootstrap': [False], 'criterion': ['entropy'], 'class_weight': ['balanced']}},
    'Decision Tree_2': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'splitter': ['random'], 'max_leaf_nodes': [2], 'max_depth': [None], 'class_weight': ['balanced']}},
    'Decision Tree_2_1': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'splitter': ['random'], 'max_leaf_nodes': [3], 'max_depth': [None], 'class_weight': ['balanced']}},
    'Decision Tree_2_2': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'splitter': ['random'], 'max_leaf_nodes': [None], 'max_depth': [None], 'class_weight': ['balanced']}},
    'Decision Tree_2_3': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'splitter': ['random'], 'max_leaf_nodes': [2], 'max_depth': [None], 'class_weight': ['None']}},
    'Decision Tree_2_4': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'splitter': ['random'], 'max_leaf_nodes': [3], 'max_depth': [None], 'class_weight': ['None']}},
    'Decision Tree_2_5': {'num_folds': 3, 'evaluation': 'accuracy', 'parameters': {'splitter': ['random'], 'max_leaf_nodes': [None], 'max_depth': [None], 'class_weight': ['None']}},
        ...
}

我用第二种方法做了一些实验,取得了较好的效果。但是,当改变分区的数量(等于实验的数量,或者2到3倍的可用核心数量)时,执行时间没有变化。 在多线程方法中使用Spark广播数据集时,我也遇到了一些问题,但显然这是一个已知的issue。在

我的问题:这些结果有意义吗?为什么分区的数量不影响执行时间?我是缺少了什么,还是有什么可以改进的?在

谢谢!在


Tags: nonetree列表nummaxclassexperimentparameters