Multiprocessing Python module does not seem to work as expected with Spark MLlib when running on a Databricks cluster with 32 CPU cores

Posted 2024-05-29 03:39:09


Please note: because this post is long, I suggest reading the description of each function. The functions all execute successfully, without any errors; I present them only so the reader gets an overall picture of the code being run. So please pay more attention to the theory and question sections than to the technical part.

[THEORY PART - EXPLAINING THE SITUATION]

First of all, I want to point out that this is a question related to execution time. Although execution time is my concern, the code demonstrated works perfectly well.

I would like your opinion on a problem I have been wrestling with for the past few days while working with the threading and multiprocessing Python modules on a Databricks cluster with 32 CPU cores. Very simply, I created a function (shown below) that takes a Spark DataFrame as input and trains two Spark MLlib classifiers. Before training, some additional cleaning and preparation is applied to the Spark DataFrame with Spark-like commands. The time needed to train and preprocess each Spark DataFrame is printed. The function, including the training and preprocessing steps, is applied 15 times (that is, to 15 different Spark DataFrames). So you can understand that my goal with threading and multiprocessing is to execute these 15 iterations at once instead of sequentially (one after another). Just imagine that in the near future these 15 iterations will become 1,500. This is therefore a benchmark for the future scaling of the data.

Before continuing, let me state some conclusions I reached while researching threading and multiprocessing. Based on Brendan Fortuner's article, threads are mainly for I/O-bound tasks limited by the GIL (which prevents two threads from executing Python bytecode simultaneously in the same program). The multiprocessing module, on the other hand, uses processes to speed up CPU-intensive Python operations, because processes benefit from multiple cores and avoid the GIL. So, although I initially created a threading-like application that applies my function 15 times concurrently, for the reasons stated above I later switched to a multiprocessing approach.
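To make that distinction concrete, here is a minimal, self-contained sketch of the behavior described above (illustrative only, assuming plain CPython; the function and sizes are arbitrary) contrasting a thread pool and a process pool on a CPU-bound function:

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound(n: int) -> int:
    # Pure-Python loop: holds the GIL for its whole duration
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    work = [10_000_000] * 4

    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as ex:
        list(ex.map(cpu_bound, work))
    print("threads:   {:.1f}s".format(time.time() - start))

    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as ex:
        list(ex.map(cpu_bound, work))
    print("processes: {:.1f}s".format(time.time() - start))

On a multi-core machine the process pool typically finishes close to 4x faster on such pure-Python work, while the thread pool runs it almost serially because of the GIL.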

[TECHNICAL PART]

Spark DataFrame

import pandas as pd

pandas_df = pd.DataFrame({  'IMEI' : ['358639059721529', '358639059721529', '358639059721529', '358639059721529', '358639059721529', '358639059721735', '358639059721735', '358639059721735', '358639059721735', '358639059721735'],
                            'PoweredOn': [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
                            'InnerSensorConnected': [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
                            'averageInnerTemperature': [2.5083397819149877, 12.76785419845581, 2.5431994716326396, 2.5875612214150556, 2.5786447594332143, 2.6642078435610212, 12.767857551574707, 12.767857551574707, 2.6131772499486625, 2.5172743565284166],
                            'OuterSensorConnected': [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
                            # assumption: the posted snippet listed only 8 humidity values; the last two are repeated so all columns have length 10
                            'OuterHumidity': [31.784826, 32.784826, 33.784826, 43.784826, 23.784826, 54.784826, 31.784826, 31.784826, 31.784826, 31.784826],
                            'EnergyConsumption': [70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0],
                            'DaysDeploymentDate': [10.0, 20.0, 21.0, 31.0, 41.0, 11.0, 19.0, 57.0, 44.0, 141.0],
                            'label': [0, 0, 1, 1, 1, 0, 0, 1, 1, 1]
                          }
                        )
spark_df = spark.createDataFrame(pandas_df)

The DataFrame appears here only as a reminder of the Spark DataFrame being used. Assume these 10 rows are really 7,000 rows, and the 2 IMEIs (['358639059721529', '358639059721735']) are really 15 unique IMEIs, because, as I said, I have 15 Spark DataFrames, 1 per IMEI.
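For the real 7,000-row dataset, a quick sanity check along these lines (illustrative, not part of the original pipeline) confirms the per-IMEI row counts that each of the 15 iterations operates on:

# Roughly 7000 rows / 15 unique IMEIs ≈ 467 rows per iteration
spark_df.groupBy('IMEI').count().show()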

[THE FUNCTION APPLIED]

import time
from datetime import datetime
from humanfriendly import format_timespan  # assumption: format_timespan comes from the humanfriendly package

def training_models_operation_multiprocess(asset_id, location, asset_total_number, timestamp_snap, joined_spark_dataset):
    #-------------------------------------------------------------------------------------------------------------------------
    # KEYWORDS INITIALIZATION
    #-------------------------------------------------------------------------------------------------------------------------
    device_length=int(asset_total_number)
    list_string_outputs=list()
    max_workers=16*2
    training_split_ratio=0.5
    testing_split_ratio=0.5
    cross_validation_rounds=2
    optimization_metric="ROC_AUC"
    features_column_name="features"
    disable_logging_value=1 # a value that prevents standard output from being logged to Application Insights
    logger_initialization=instantiate_logger(instrumentation_key_value) # a logger instance (instantiate_logger is a helper defined elsewhere)

    # Time format
    date_format = '%Y-%m-%d %H-%M-%S'
    #-------------------------------------------------------------------------------------------------------------------------
    # KEYWORDS INITIALIZED
    #-------------------------------------------------------------------------------------------------------------------------

    try:

        print("{0}: START EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length))
        begin_time_0 = time.time()

        #1.1 Filter the rows related to the current asset
        begin_time_1 = time.time()

        filtered_dataset=joined_spark_dataset.where(joined_spark_dataset.IMEI.isin([asset_id]))
        filtered_dataset=apply_repartitioning(filtered_dataset, max_workers)

        end_time_1 = time.time() - begin_time_1
        list_string_outputs.append("{0}: FINISH Step 1.1 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_1)))

        #------------------------
        # FUNCTION: 1.2 Preprocess
        begin_time_2 = time.time()

        target_column_name=None
        target_column_name='label'
        preprocessed_spark_df=preprocess_data_pipeline(filtered_dataset, drop_columns_not_used_in_training, target_column_name, executor)
        preprocessed_spark_df=apply_repartitioning(preprocessed_spark_df, max_workers)

        end_time_2 = time.time() - begin_time_2
        list_string_outputs.append("{0}: FINISH Step 1.2 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_2)))

        #------------------------
        #FUNCTION: 1.3 Train-Test split
        begin_time_3 = time.time()

        target_column_name=None
        target_column_name='target'
        training_data, testing_data=spark_train_test_split(asset_id, preprocessed_spark_df, training_split_ratio, testing_split_ratio, target_column_name, disable_logging_value, logger_initialization)
        training_data=apply_repartitioning(training_data, max_workers)
        testing_data=apply_repartitioning(testing_data, max_workers)

        end_time_3 = time.time() - begin_time_3
        list_string_outputs.append("{0}: FINISH Step 1.3 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_3)))

        #FUNCTION: 1.4 Train the algorithms
        begin_time_4 = time.time()

        best_classifier_asset_id=spark_ml_classification(asset_id, cross_validation_rounds, training_data, testing_data, target_column_name, features_column_name, optimization_metric, disable_logging_value, 
                                                         logger_initialization)

        end_time_4 = time.time() - begin_time_4
        list_string_outputs.append("{0}: FINISH Step 1.4 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_4)))

        end_time_0 = time.time() - begin_time_0
        list_string_outputs.append("{0}: END EXECUTION PLAN OF ASSET ID {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_0)))

    except Exception as e:
        custom_logging_function(logger_initialization, disable_logging_value, "ERROR", "ERROR EXCEPTION captured in asset id {0}: {1}".format(asset_id, e))
        raise
    print(" ".join(list_string_outputs))

[FUNCTION 1.1]: Filter the dataset by IMEI. Description: from the whole dataset containing all IMEI ids, keep only the rows belonging to the IMEI of the current iteration.

device_ids=['358639059721529', '358639059721735']
filtered_dataset=spark_df.where(spark_df.IMEI.isin([device_id]))

[FUNCTION 1.2]: Preprocess the Spark DF. Description: apply a VectorAssembler to the trainable features and a StringIndexer to the label.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

def preprocess_data_pipeline(spark_df, target_variable):
    stages = []

    # Convert label into label indices using the StringIndexer
    label_stringIdx = StringIndexer(inputCol=target_variable, outputCol="target").setHandleInvalid("keep") # target variable should be IntegerType
    stages += [label_stringIdx]

    numeric_columns=["PoweredOn", "InnerSensorConnected", "averageInnerTemperature", "OuterSensorConnected", "OuterHumidity", "EnergyConsumption", "DaysDeploymentDate"]

    # Vectorize trainable features
    assemblerInputs = numeric_columns
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features").setHandleInvalid("keep")
    stages += [assembler]

    partialPipeline = Pipeline().setStages(stages)
    pipelineModel = partialPipeline.fit(spark_df)
    preppedDataDF = pipelineModel.transform(spark_df)

    # Keep relevant columns
    selectedcols = ["target", "features"]
    dataset = preppedDataDF.select(selectedcols)
    dataset=dataset.drop(target_variable)

    #dataset.printSchema()
    return dataset
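For reference, a usage sketch of this function with the two-argument signature shown here (the main function above passes four arguments, so the posted snippet is evidently a simplification):

preprocessed_df = preprocess_data_pipeline(spark_df, 'label')
preprocessed_df.show(5, truncate=False)  # two columns: target (indexed label) and features (assembled vector)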

[FUNCTION 1.3: Train-Test split] Split the data into training and testing Spark DFs using a stratified approach.

def spark_train_test_split(device_id, prepared_spark_df, train_split_ratio, test_split_ratio, target_variable):

    trainingData = prepared_spark_df.sampleBy(target_variable, fractions={0: train_split_ratio, 1: train_split_ratio}, seed=10)
    testData = prepared_spark_df.subtract(trainingData)

    return trainingData, testData
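A usage sketch under the five-argument signature shown here (again, the main function passes more arguments). Note that subtract() performs an EXCEPT DISTINCT, which triggers a shuffle over the whole dataframe; randomSplit() is a cheaper alternative when exact stratification is not required:

training_data, testing_data = spark_train_test_split('358639059721529', preprocessed_df, 0.5, 0.5, 'target')
print(training_data.count(), testing_data.count())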

[FUNCTION 1.4: Train the ML algorithms] Description: train two classification algorithms and then select the one with the highest ROC_AUC score. Each classifier is trained with Spark MLlib's CrossValidator class... For the first classifier (Random Forest) I cross-validate 4 models, while for the second (Gradient-Boosted Trees) I cross-validate 8 models. To speed things up, I set the parallelism parameter of the CrossValidator class to 8 (explanation here).

import pandas as pd
from pyspark.ml.tuning import CrossValidator
from sklearn.metrics import (confusion_matrix, accuracy_score, f1_score, precision_score,
                             recall_score, hamming_loss, zero_one_loss)

def spark_ml_classification(device_id, cross_validation_rounds, training_dataset, testing_dataset, target_column, features_column, evaluation_metric, disable_logging_value, logger_initialization):

    try:
        dictionary_best_metric={}
        dictionary_best_estimator={}
        list_of_classifiers=["RandomForest Classifier", "GradientBoost Classifier"]

        begin_time_train=time.time()
        for i in list_of_classifiers:

            pipeline_object, paramGrid, evaluator=machine_learning_estimator_initialization(i, target_column, features_column)

            start_time_classifier=time.time()

            # THE MOST TIME CONSUMING PART OF MY EXECUTION
            classification_object = CrossValidator(estimator=pipeline_object, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=cross_validation_rounds, parallelism=8)

            classificationModel = classification_object.fit(training_dataset)
            end_time_classifier=time.time()-start_time_classifier
            print("Time passed to complete training for classifier {0} of asset id {1}: {2}".format(i, device_id, format_timespan(end_time_classifier)))

            predictions = classificationModel.transform(testing_dataset)
            evaluation_score_classifier=evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
            y_true = predictions.select([target_column]).collect()
            y_pred = predictions.select(['prediction']).collect()

            confusion_mat=confusion_matrix(y_true, y_pred)
            confusion_table=pd.DataFrame(confusion_mat,
                                         columns=['0','1'],
                                         index=['0','1'])

            accuracy_value=accuracy_score(y_true, y_pred)
            f1_value=f1_score(y_true, y_pred, zero_division=1)
            precision_value=precision_score(y_true, y_pred, zero_division=1)
            recall_value=recall_score(y_true, y_pred, zero_division=1)
            hamming_loss_value=hamming_loss(y_true, y_pred)
            zero_one_loss_value=zero_one_loss(y_true, y_pred, normalize=False)

            list_of_metrics=['ROC_AUC', 'accuracy', 'f1', 'precision', 'recall', 'hamming_loss', 'zero_one_loss']
            list_of_metric_values=[evaluation_score_classifier, accuracy_value, f1_value, precision_value, recall_value, hamming_loss_value, zero_one_loss_value]

            evaluation_metric_name_index=list_of_metrics.index(evaluation_metric) # With this index I can locate any value selected for the evaluation metric

            if evaluation_metric=='ROC_AUC':
                dictionary_best_metric.update({"{0}_best_score".format(i): evaluation_score_classifier}) #alternative hamming_loss_value

            else:
                dictionary_best_metric.update({"{0}_best_score".format(i): list_of_metric_values[evaluation_metric_name_index]})

            dictionary_best_estimator.update({"{0}_best_estimator".format(i): classificationModel.bestModel})

        end_time_train=time.time()-begin_time_train
        print("Total time of training execution of two MLlib algorithms for the asset {0}: {1}".format(device_id, format_timespan(end_time_train)))

        maximum_metrics=['ROC_AUC', 'accuracy', 'f1', 'precision', 'recall']
        minimum_metrics=['hamming_loss', 'zero_one_loss']

        if evaluation_metric in maximum_metrics:
            index_of_best_model_score=list(dictionary_best_metric.keys()).index(max(dictionary_best_metric, key=dictionary_best_metric.get))

        else:
            index_of_best_model_score=list(dictionary_best_metric.keys()).index(min(dictionary_best_metric, key=dictionary_best_metric.get))

        classification_model_for_scoring=list(dictionary_best_estimator.values())[index_of_best_model_score]

    except Exception as e:
        print(e)
        raise  # re-raise; otherwise the return below would reference an unassigned variable

    return classification_model_for_scoring
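The helper machine_learning_estimator_initialization called inside the loop is not shown in the post. A plausible sketch, based only on the description above (a Random Forest grid of 4 candidate models and a GBT grid of 8); the specific hyperparameters are my assumptions:

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder

def machine_learning_estimator_initialization(model_name, target_variable, features_variable):
    evaluator = BinaryClassificationEvaluator(labelCol=target_variable)
    if model_name == "RandomForest Classifier":
        clf = RandomForestClassifier(labelCol=target_variable, featuresCol=features_variable)
        paramGrid = (ParamGridBuilder()
                     .addGrid(clf.numTrees, [20, 50])
                     .addGrid(clf.maxDepth, [5, 10])
                     .build())   # 2 x 2 = 4 candidate models
    else:  # "GradientBoost Classifier"
        clf = GBTClassifier(labelCol=target_variable, featuresCol=features_variable)
        paramGrid = (ParamGridBuilder()
                     .addGrid(clf.maxIter, [10, 20])
                     .addGrid(clf.maxDepth, [3, 5])
                     .addGrid(clf.stepSize, [0.1, 0.05])
                     .build())   # 2 x 2 x 2 = 8 candidate models
    pipeline_object = Pipeline(stages=[clf])
    return pipeline_object, paramGrid, evaluator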

The four functions above are the ones applied 15 times (15 Spark DataFrames, one per unique IMEI id), since what I care about is the time it takes to execute these 15 iterations of the function. As mentioned, I first implemented an approach following the threading module. The approach is shown below:

[THREADING APPROACH]

import threading
from datetime import datetime
from pyspark.sql import functions as sql_function

# Creating a list of threads
device_ids=spark_df.select(sql_function.collect_set('IMEI').alias('unique_IMEIS')).collect()[0]['unique_IMEIS']
device_ids=device_ids[-15:]
location=range(1, len(device_ids)+1, 1)
devices_total_number=len(device_ids)

date_format = '%Y-%m-%d %H-%M-%S'
timestamp_snapshot=datetime.utcnow()
timestamp_snap=timestamp_snapshot.strftime(date_format)

thread_list = list()

# Loop over all device ids, create a thread for each element, start it, and append it to thread_list
for location, i in enumerate(device_ids, 1):

    try:
        thread = threading.Thread(target=training_models_operation_multiprocess, args=(i, location, devices_total_number, timestamp_snap, spark_df,))
        thread_list.append(thread)
        thread.start()

    except Exception as e:
        print(e)

#--------------------------------------
# Wait for all threads to finish
for thread in thread_list:
    thread.join()

print("Finished executing all threads")

Benchmark: on a cluster with 32 CPU cores: ~16 min

However, as mentioned earlier, threading was not my final approach. In the end, after reading a bit about multiprocessing, I opted for that approach instead:

[MULTIPROCESSING APPROACH]

import time
import multiprocessing as mp
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import freeze_support
from itertools import cycle

if __name__ == '__main__':
    freeze_support()

    device_ids=datalake_spark_dataframe_downsampled.select(sql_function.collect_set('IMEI').alias('unique_IMEIS')).collect()[0]['unique_IMEIS']
    device_ids=device_ids[-15:] #15 UNIQUE IMEI's 
    location=range(1, len(device_ids)+1, 1)
    devices_total_number=len(device_ids)

    pool_list=list()

    with Pool(mp.cpu_count()) as pool:
        start_time = time.time()
        tasks = [*zip(device_ids, location, cycle([str(devices_total_number)]), cycle([timestamp_snap]), cycle([datalake_spark_dataframe_downsampled]))]

        pool.starmap(training_models_operation_multiprocess,
                     iterable=(tasks),
                     chunksize=1)
        pool.close()
        pool.join()
        pool.terminate()
        end_time = time.time()
        secs_per_iteration = (end_time - start_time) / len(device_ids)
        print("Time per iteration: {0}".format(format_timespan(secs_per_iteration)))

[BENCHMARK ON A CLUSTER WITH 32 CPU CORES]

On average, for each IMEI and its associated Spark DF, RandomForest takes ~5 minutes and GradientBoostedTrees ~6 minutes to execute. [screenshot: per-classifier training times]

Below you will notice the time needed to execute the 4 sub-functions above (1.1, 1.2, 1.3, 1.4). [screenshot: per-step execution times]

[MY QUESTION]

Having presented all the facts and results of my experiment, it is time to write my question.

How is it possible that a cluster of 32 CPU cores (2 workers with 16 cores each) produces such time-consuming results? Is it plausible that a pool execution needs almost 12 minutes to run cross-validation on a DataFrame of about 467 rows? My Spark DF has 7,000 rows in total, and divided across 15 IMEI ids that gives ~467 rows per IMEI. In my humble mind, 32 CPU cores are serious computing power, yet they take ~15 minutes to execute this series of functions.

So I would like to understand why this happens:

Is this a Spark issue, i.e. is it failing to properly distribute the 32 CPU cores across the execution of these 4 simple functions? Combined with the multiprocessing module, I expected to finish the 15 iterations in much less time. Or maybe there is something about multiprocessing I have not yet understood, and my execution simply cannot go faster than this.

I would really appreciate your input on this, because maybe I am missing the point of multiprocessing. I cannot accept the fact that I have 32 CPU cores and each pool execution takes ~1 minute for Spark to complete. Please set aside the fact that I am using Spark to train on a ~500-row DataFrame, because in the near future this DF will have 100,000 rows or even more; so I am aware of the drawbacks of Spark over plain Python on such small row counts. What I want to understand better is the multiprocessing approach.
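A small diagnostic sketch that may help investigate this (my addition, not from the post): check how much task-level parallelism Spark itself reports, since the 15 concurrent fits, each with CrossValidator parallelism=8, all compete for the same 32 task slots:

# Up to 15 * 8 = 120 Spark jobs can be queued at once against ~32 task slots
print(spark.sparkContext.defaultParallelism)                              # default number of task slots
print(spark.sparkContext.getConf().get('spark.executor.cores', 'unset')) # cores per executor, if set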


