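Please note: because this post is long, I suggest reading only the description of each function. The functions themselves run successfully without any errors; I show them only to give readers a rough idea of the code being executed. So please focus on the theory and question sections rather than the technical section
[Theory section - explaining the situation]
First, I want to point out that this is a question about execution time. Execution time is my concern; the code shown works correctly
I would like your opinion on a problem I ran into over the past few days while working with the Python threading and multiprocessing modules on a Databricks cluster with 32 CPU cores. Put simply, I created a function (shown below) that takes a Spark DataFrame as input and trains two Spark MLlib classifiers. Before training, the Spark DataFrame goes through some additional cleaning and preparation using Spark commands. The time needed to preprocess and train on each Spark DataFrame is reported. This function, which covers both preprocessing and training, is applied 15 times (that is, to 15 different Spark DataFrames). So my goal in using threading and multiprocessing is to run these 15 iterations at once rather than sequentially (one after another). Imagine that these 15 iterations will become 1500 in the near future; this is a benchmark for scaling up the data later
Before continuing, I want to state some conclusions I reached while studying threading and multiprocessing. Based on Brendan Fortuner's article, threading is mainly suited to I/O-bound tasks, because threads are limited by the GIL (which prevents two threads in the same program from executing Python code at the same time). The multiprocessing module, on the other hand, uses processes to speed up CPU-intensive Python operations, since processes benefit from multiple cores and avoid the GIL. So although I initially built a threading-based application to apply my function 15 times concurrently, I later switched to the multiprocessing approach for the reasons above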
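To make the I/O-bound claim concrete, here is a minimal sketch using only the standard library. The function `wait_for_remote_job` is a hypothetical stand-in for any call that blocks while waiting (it just sleeps); it is not part of my actual code. Because a blocked thread releases the GIL, the four waits overlap when run in threads.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def wait_for_remote_job(job_id, seconds=0.2):
    """Hypothetical stand-in for a call that blocks on I/O."""
    time.sleep(seconds)  # sleeping releases the GIL, like most blocking I/O
    return job_id


def run_sequentially(job_ids):
    """Run the jobs one after another and report the elapsed time."""
    start = time.perf_counter()
    results = [wait_for_remote_job(j) for j in job_ids]
    return results, time.perf_counter() - start


def run_in_threads(job_ids):
    """Run the jobs in one thread each and report the elapsed time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(job_ids)) as executor:
        results = list(executor.map(wait_for_remote_job, job_ids))
    return results, time.perf_counter() - start


job_ids = [1, 2, 3, 4]
sequential_results, sequential_seconds = run_sequentially(job_ids)
threaded_results, threaded_seconds = run_in_threads(job_ids)
print(f"sequential: {sequential_seconds:.2f}s, threaded: {threaded_seconds:.2f}s")
```

The same overlap would not be expected for pure-Python number crunching, which holds the GIL while it runs.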
[Technical section]
Spark DataFrame
import pandas as pd

spark_df = pd.DataFrame({
    'IMEI': ['358639059721529', '358639059721529', '358639059721529', '358639059721529', '358639059721529', '358639059721735', '358639059721735', '358639059721735', '358639059721735', '358639059721735'],
    'PoweredOn': [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
    'InnerSensorConnected': [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
    'averageInnerTemperature': [2.5083397819149877, 12.76785419845581, 2.5431994716326396, 2.5875612214150556, 2.5786447594332143, 2.6642078435610212, 12.767857551574707, 12.767857551574707, 2.6131772499486625, 2.5172743565284166],
    'OuterSensorConnected': [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
    # Only 8 values are listed here; this column needs 10 values for the snippet to run
    'OuterHumidity': [31.784826, 32.784826, 33.784826, 43.784826, 23.784826, 54.784826, 31.784826, 31.784826],
    'EnergyConsumption': [70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0],
    'DaysDeploymentDate': [10.0, 20.0, 21.0, 31.0, 41.0, 11.0, 19.0, 57.0, 44.0, 141.0],
    'label': [0, 0, 1, 1, 1, 0, 0, 1, 1, 1]
})
# Convert the pandas DataFrame into a Spark DataFrame
spark_df = spark.createDataFrame(spark_df)
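The DataFrame is shown only as a reminder of the Spark DataFrame being used. Imagine that these 10 rows are 7000 rows and that the 2 IMEIs (['358639059721529', '358639059721735']) are really 15 unique IMEIs, because, as I said, I have 15 Spark DataFrames, one per IMEI
[Applied function]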
def training_models_operation_multiprocess(asset_id, location, asset_total_number, timestamp_snap, joined_spark_dataset):
    #-------------------------------------------------------------------------------------------------------------------------
    # KEYWORDS INITIALIZATION
    #-------------------------------------------------------------------------------------------------------------------------
    device_length=int(asset_total_number)
    list_string_outputs=list()
    max_workers=16*2
    training_split_ratio=0.5
    testing_split_ratio=0.5
    cross_validation_rounds=2
    optimization_metric="ROC_AUC"
    features_column_name="features"
    disable_logging_value=1 # a value that prevents standard output to be logged at Application insights
    logger_initialization=instantiate_logger(instrumentation_key_value) #a logger instance
    # Time format
    date_format = '%Y-%m-%d %H-%M-%S'
    #-------------------------------------------------------------------------------------------------------------------------
    # KEYWORDS INITIALIZED
    #-------------------------------------------------------------------------------------------------------------------------
    try:
        print("{0}: START EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length))
        begin_time_0 = time.time()
        #1.1 Filter the rows related to the current asset
        begin_time_1 = time.time()
        filtered_dataset=joined_spark_dataset.where(joined_spark_dataset.IMEI.isin([asset_id]))
        filtered_dataset=apply_repartitioning(filtered_dataset, max_workers)
        end_time_1 = time.time() - begin_time_1
        list_string_outputs.append("{0}: FINISH Step 1.1 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_1)))
        #------------------------
        # FUNCTION: 1.2 Preprocess
        begin_time_2 = time.time()
        target_column_name=None
        target_column_name='label'
        preprocessed_spark_df=preprocess_data_pipeline(filtered_dataset, drop_columns_not_used_in_training, target_column_name, executor)
        preprocessed_spark_df=apply_repartitioning(preprocessed_spark_df, max_workers)
        end_time_2 = time.time() - begin_time_2
        list_string_outputs.append("{0}: FINISH Step 1.2 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_2)))
        #------------------------
        #FUNCTION: 1.3 Train-Test split
        begin_time_3 = time.time()
        target_column_name=None
        target_column_name='target'
        training_data, testing_data=spark_train_test_split(asset_id, preprocessed_spark_df, training_split_ratio, testing_split_ratio, target_column_name, disable_logging_value, logger_initialization)
        training_data=apply_repartitioning(training_data, max_workers)
        testing_data=apply_repartitioning(testing_data, max_workers)
        end_time_3 = time.time() - begin_time_3
        list_string_outputs.append("{0}: FINISH Step 1.3 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_3)))
        #FUNCTION: 1.4 Train the algorithms
        begin_time_4 = time.time()
        best_classifier_asset_id=spark_ml_classification(asset_id, cross_validation_rounds, training_data, testing_data, target_column_name, features_column_name, optimization_metric, disable_logging_value,
                                                         logger_initialization)
        end_time_4 = time.time() - begin_time_4
        list_string_outputs.append("{0}: FINISH Step 1.4 asset id {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_4)))
        end_time_0 = time.time() - begin_time_0
        list_string_outputs.append("{0}: END EXECUTION PLAN OF ASSET ID {1}: {2}/{3} in: {4}\n".format(datetime.utcnow().strftime(date_format), asset_id, location, device_length, format_timespan(end_time_0)))
    except Exception as e:
        custom_logging_function(logger_initialization, disable_logging_value, "ERROR", "ERROR EXCEPTION captured in asset id {0}: {1}".format(asset_id, e))
        raise
    print(" ".join(list_string_outputs))
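The begin/end bookkeeping repeated for each step above could be folded into a small helper. This is only a sketch of the pattern, not what my function does: `timed_step` is a hypothetical name, and the list comprehensions are placeholder work standing in for the Spark steps.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_step(step_name, messages):
    """Append a 'FINISH Step <name> in: <seconds>' line to `messages` when the block exits."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        messages.append("FINISH Step {0} in: {1:.3f}s".format(step_name, elapsed))


messages = []
with timed_step("1.1", messages):
    filtered = [row for row in range(1000) if row % 2 == 0]  # placeholder work
with timed_step("1.2", messages):
    prepared = [value * 2 for value in filtered]  # placeholder work
print("\n".join(messages))
```

The `finally` clause means a step's duration is recorded even when the step raises.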
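[Function 1.1]: Filter the dataset by IMEI. Description: from the full dataset containing all IMEI IDs, keep only the rows that belong to the IMEI of the current iteration
device_ids=['358639059721529','358639059721735']
filtered_dataset=spark_df.where(spark_df.IMEI.isin([device_id]))
[Function 1.2]: Preprocess the Spark DF. Description: apply a VectorAssembler to the trainable features and a StringIndexer to the label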
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

def preprocess_data_pipeline(spark_df, target_variable):
    stages = []
    # Convert label into label indices using the StringIndexer
    label_stringIdx = StringIndexer(inputCol=target_variable, outputCol="target").setHandleInvalid("keep") # target variable should be IntegerType
    stages += [label_stringIdx]
    numeric_columns=["PoweredOn", "InnerSensorConnected", "averageInnerTemperature", "OuterSensorConnected", "OuterHumidity", "EnergyConsumption", "DaysDeploymentDate"]
    # Vectorize trainable features
    assemblerInputs = numeric_columns
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features").setHandleInvalid("keep")
    stages += [assembler]
    # Fit the two-stage pipeline and apply it to the same DataFrame
    partialPipeline = Pipeline().setStages(stages)
    pipelineModel = partialPipeline.fit(spark_df)
    preppedDataDF = pipelineModel.transform(spark_df)
    # Keep relevant columns
    selectedcols = ["target", "features"]
    dataset = preppedDataDF.select(selectedcols)
    dataset=dataset.drop(target_variable)
    #dataset.printSchema()
    return dataset
[Function 1.3: Train-test split] Split the data into training and testing Spark DataFrames using a stratified approach
def spark_train_test_split(device_id, prepared_spark_df, train_split_ratio, test_split_ratio, target_variable):
    trainingData = prepared_spark_df.sampleBy(target_variable, fractions={0: train_split_ratio, 1: train_split_ratio}, seed=10)
    testData = prepared_spark_df.subtract(trainingData)
    return trainingData, testData
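To illustrate the idea behind this split without Spark, here is a plain-Python sketch. It assumes that per-label sampling draws each row independently with the fraction configured for its label, so the split sizes are approximate rather than exact. Unlike the function above, it sends unsampled rows straight to the test set instead of using `subtract`; `subtract` has set semantics, so duplicate rows would be collapsed. The rows and the `stratified_split` helper are hypothetical.

```python
import random
from collections import Counter


def stratified_split(rows, label_of, fractions, seed=10):
    """Sample each row with the fraction configured for its label.

    Rows that are not sampled form the test set, so the two sets are
    disjoint and together cover every input row.
    """
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        if rng.random() < fractions[label_of(row)]:
            train.append(row)
        else:
            test.append(row)
    return train, test


# Hypothetical rows: (row_id, label)
rows = [(i, i % 2) for i in range(1000)]
train, test = stratified_split(rows, label_of=lambda r: r[1], fractions={0: 0.5, 1: 0.5})
print(Counter(label for _, label in train), Counter(label for _, label in test))
```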
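[Function 1.4: Train the ML algorithms] Description: train two classification algorithms and then select the one with the highest ROC_AUC score. Each classifier is trained with Spark MLlib's CrossValidator class. For the first classifier (random forest) I cross-validate 4 models, and for the second (gradient-boosted trees) I cross-validate 8 models. To speed things up, I set the CrossValidator's parallelism parameter to 8 (explanation here)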
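As a rough count of how much training this implies, the sketch below assumes that "4 models" and "8 models" are the sizes of the two parameter grids, and that CrossValidator fits every grid point once per fold and then refits the best one on the full training set. The helper name is hypothetical.

```python
def cross_validator_fit_count(param_grid_size, num_folds):
    """Number of model fits for one CrossValidator.fit call.

    Each parameter combination is fitted once per fold, and the best
    combination is then refitted once on the full training set.
    """
    return param_grid_size * num_folds + 1


num_folds = 2  # cross_validation_rounds in the post
random_forest_fits = cross_validator_fit_count(param_grid_size=4, num_folds=num_folds)
gradient_boost_fits = cross_validator_fit_count(param_grid_size=8, num_folds=num_folds)
fits_per_asset = random_forest_fits + gradient_boost_fits
fits_for_all_assets = fits_per_asset * 15  # 15 IMEIs

print(fits_per_asset, fits_for_all_assets)  # 26 390
```

Under these assumptions, the 15 iterations amount to 390 separate Spark model fits, each on roughly 467 rows.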
import time
import pandas as pd
from pyspark.ml.tuning import CrossValidator
from sklearn.metrics import (accuracy_score, confusion_matrix, f1_score, hamming_loss,
                             precision_score, recall_score, zero_one_loss)

# Signature aligned with the call made in training_models_operation_multiprocess
def spark_ml_classification(device_id, cross_validation_rounds, training_dataset, testing_dataset, target_column, features_column, evaluation_metric, disable_logging_value, logger_initialization):
    try:
        dictionary_best_metric={}
        dictionary_best_estimator={}
        list_of_classifiers=["RandomForest Classifier", "GradientBoost Classifier"]
        begin_time_train=time.time()
        for i in list_of_classifiers:
            # machine_learning_estimator_initialization (not shown) returns the pipeline, parameter grid and evaluator for a classifier name
            pipeline_object, paramGrid, evaluator=machine_learning_estimator_initialization(i, target_column, features_column)
            start_time_classifier=time.time()
            # THE MOST TIME CONSUMING PART OF MY EXECUTION
            classification_object = CrossValidator(estimator=pipeline_object, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=cross_validation_rounds, parallelism=8)
            classificationModel = classification_object.fit(training_dataset)
            end_time_classifier=time.time()-start_time_classifier
            print("Time passed to complete training for classifier {0} of asset id {1}: {2}".format(i, device_id, format_timespan(end_time_classifier)))
            # Score the held-out data and compute the evaluation metrics
            predictions = classificationModel.transform(testing_dataset)
            evaluation_score_classifier=evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
            y_true = predictions.select([target_column]).collect()
            y_pred = predictions.select(['prediction']).collect()
            confusion_mat=confusion_matrix(y_true, y_pred)
            confusion_table=pd.DataFrame(confusion_mat,
                                         columns=['0','1'],
                                         index=['0','1'])
            accuracy_value=accuracy_score(y_true, y_pred)
            f1_value=f1_score(y_true, y_pred, zero_division=1)
            precision_value=precision_score(y_true, y_pred, zero_division=1)
            recall_value=recall_score(y_true, y_pred, zero_division=1)
            hamming_loss_value=hamming_loss(y_true, y_pred)
            zero_one_loss_value=zero_one_loss(y_true, y_pred, normalize=False)
            list_of_metrics=['ROC_AUC', 'accuracy', 'f1', 'precision', 'recall', 'hamming_loss', 'zero_one_loss']
            list_of_metric_values=[evaluation_score_classifier, accuracy_value, f1_value, precision_value, recall_value, hamming_loss_value, zero_one_loss_value]
            evaluation_metric_name_index=list_of_metrics.index(evaluation_metric) # With this index I can locate any value selected for the evaluation metric
            if evaluation_metric=='ROC_AUC':
                dictionary_best_metric.update({"{0}_best_score".format(i): evaluation_score_classifier}) #alternative hamming_loss_value
            else:
                dictionary_best_metric.update({"{0}_best_score".format(i): list_of_metric_values[evaluation_metric_name_index]})
            dictionary_best_estimator.update({"{0}_best_estimator".format(i): classificationModel.bestModel})
        end_time_train=time.time()-begin_time_train
        print("Total time of training execution of two MLlib algorithms for the asset {0}: {1}".format(device_id, format_timespan(end_time_train)))
        # Higher is better for the first group of metrics, lower is better for the second
        maximum_metrics=['ROC_AUC', 'accuracy', 'f1', 'precision', 'recall']
        minimum_metrics=['hamming_loss', 'zero_one_loss']
        if evaluation_metric in maximum_metrics:
            index_of_best_model_score=list(dictionary_best_metric.keys()).index(max(dictionary_best_metric, key=dictionary_best_metric.get))
        else:
            index_of_best_model_score=list(dictionary_best_metric.keys()).index(min(dictionary_best_metric, key=dictionary_best_metric.get))
        classification_model_for_scoring=list(dictionary_best_estimator.values())[index_of_best_model_score]
    except Exception as e:
        print(e)
    return classification_model_for_scoring
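The selection logic at the end of the function can be sketched on its own. The scores below are hypothetical values for illustration only, and `pick_best_classifier` is not a function from my code; it shows the same rule of taking the maximum for score-like metrics and the minimum for loss-like metrics.

```python
MAXIMIZE = {"ROC_AUC", "accuracy", "f1", "precision", "recall"}
MINIMIZE = {"hamming_loss", "zero_one_loss"}


def pick_best_classifier(scores, metric_name):
    """Return the classifier name with the best score for `metric_name`."""
    if metric_name in MAXIMIZE:
        return max(scores, key=scores.get)
    if metric_name in MINIMIZE:
        return min(scores, key=scores.get)
    raise ValueError("Unknown metric: {0}".format(metric_name))


# Hypothetical scores, for illustration only
roc_auc_scores = {"RandomForest Classifier": 0.91, "GradientBoost Classifier": 0.88}
hamming_scores = {"RandomForest Classifier": 0.12, "GradientBoost Classifier": 0.09}

print(pick_best_classifier(roc_auc_scores, "ROC_AUC"))
print(pick_best_classifier(hamming_scores, "hamming_loss"))
```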
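The four functions above are applied 15 times (15 Spark DataFrames, one per unique IMEI ID), and what I care about is the time it takes to run these 15 iterations. As mentioned, I first implemented an approach based on the threading module, as follows:
[Threading approach]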
import threading
from datetime import datetime
from pyspark.sql import functions as sql_function

#Creating a list of threads
device_ids=spark_df.select(sql_function.collect_set('IMEI').alias('unique_IMEIS')).collect()[0]['unique_IMEIS']
device_ids=device_ids[-15:]
location=range(1, len(device_ids)+1, 1)
devices_total_number=len(device_ids)
date_format = '%Y-%m-%d %H-%M-%S'
timestamp_snapshot=datetime.utcnow()
timestamp_snap=timestamp_snapshot.strftime(date_format)
thread_list = list()
# Loop over all device ids, create and start a thread for each one, and append it to thread_list
for location, i in enumerate(device_ids, 1):
    try:
        thread = threading.Thread(target=training_models_operation_multiprocess, args=(i, location, devices_total_number, timestamp_snap, spark_df,))
        thread_list.append(thread)
        thread.start()
    except Exception as e:
        print(e)
#--------------------------------------
# Wait for all threads to finish
for thread in thread_list:
    thread.join()
print("Finished executing all threads")
[Benchmark of the threading approach]
On a cluster with 32 CPU cores: ~16m
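One caveat with bare `threading.Thread` objects is that an exception raised inside the target function does not reach the code that calls `join()`; the `try/except` around `thread.start()` only catches failures to start the thread. A possible alternative, sketched below with the standard library's `concurrent.futures`, collects results and errors per asset. `train_one_asset` is a hypothetical stand-in for `training_models_operation_multiprocess`, and the asset ids are placeholders.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def train_one_asset(asset_id):
    """Hypothetical stand-in for training_models_operation_multiprocess."""
    if asset_id == "bad-asset":
        raise ValueError("no rows for asset {0}".format(asset_id))
    return "model-for-{0}".format(asset_id)


def train_all_assets(asset_ids, max_workers=4):
    """Run train_one_asset for every asset and separate successes from failures."""
    results, failures = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_asset = {executor.submit(train_one_asset, a): a for a in asset_ids}
        for future in as_completed(future_to_asset):
            asset_id = future_to_asset[future]
            try:
                results[asset_id] = future.result()  # re-raises the worker's exception
            except Exception as error:
                failures[asset_id] = error
    return results, failures


results, failures = train_all_assets(["asset-1", "asset-2", "bad-asset"])
print(sorted(results), sorted(failures))
```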
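However, as mentioned, threading was not my final approach. In the end, after reading up on multiprocessing, I chose that approach instead
[Multiprocessing approach]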
import time
import multiprocessing as mp
# Note: multiprocessing.pool.ThreadPool is a pool of threads, not of processes
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import freeze_support
from itertools import cycle

if __name__ == '__main__':
    freeze_support()
    device_ids=datalake_spark_dataframe_downsampled.select(sql_function.collect_set('IMEI').alias('unique_IMEIS')).collect()[0]['unique_IMEIS']
    device_ids=device_ids[-15:] #15 UNIQUE IMEI's
    location=range(1, len(device_ids)+1, 1)
    devices_total_number=len(device_ids)
    pool_list=list()
    with Pool(mp.cpu_count()) as pool:
        start_time = time.time()
        # One argument tuple per IMEI; cycle() repeats the values shared by every task
        tasks = [*zip(device_ids, location, cycle([str(devices_total_number)]), cycle([timestamp_snap]), cycle([datalake_spark_dataframe_downsampled]))]
        pool.starmap(training_models_operation_multiprocess,
                     iterable=(tasks),
                     chunksize=1)
        pool.close()
        pool.join()
        pool.terminate()
        end_time = time.time()
    secs_per_iteration = (end_time - start_time) / len(device_ids)
    print("Time per iteration: {0}".format(format_timespan(secs_per_iteration)))
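The way `zip` and `cycle` build the argument tuples, and how `starmap` unpacks them, can be seen in isolation with a small standard-library sketch. `describe_task`, the ids, the timestamp and the `"shared_df"` string are all placeholders, not my real function or data.

```python
from itertools import cycle
from multiprocessing.pool import ThreadPool


def describe_task(asset_id, location, total, timestamp_snap, dataset):
    """Hypothetical stand-in for the training function; returns a label for the task."""
    return "{0}: {1}/{2} at {3} on {4}".format(asset_id, location, total, timestamp_snap, dataset)


device_ids = ["imei-a", "imei-b", "imei-c"]  # placeholder ids
location = range(1, len(device_ids) + 1)
total = str(len(device_ids))

# zip stops at the shortest input, so the endless cycle(...) iterators
# simply repeat the same value once per device id
tasks = list(zip(device_ids, location, cycle([total]), cycle(["2021-01-01"]), cycle(["shared_df"])))

with ThreadPool(3) as pool:
    # starmap unpacks each tuple into positional arguments and keeps input order
    outputs = pool.starmap(describe_task, tasks, chunksize=1)

print(tasks[0])
print(outputs[-1])
```

Every task receives a reference to the same shared object in the last position, which mirrors how the full Spark DataFrame is passed to each call above.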
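[Benchmark on a cluster with 32 CPU cores]
On average, for each IMEI and its Spark DF, RandomForest and GradientBoostedTrees take 5 and 6 minutes to run, respectively.
Below you can see the time needed to run the 4 sub-functions above (1.1, 1.2, 1.3, 1.4)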
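[My question]
Having presented all the facts and results of my experiment, it is time to state my question
How is it possible for a cluster with 32 CPU cores (2 workers with 16 cores each) to produce such slow results? Can a pool execution really take nearly 12 minutes to run cross-validation on a DataFrame of about 467 rows? My Spark DF has 7000 rows in total, and dividing by the 15 IMEI IDs gives about 467 rows per ID. In my humble opinion, 32 CPU cores are a lot of computing power, yet they take 15 minutes to run this set of functions
So I would like to understand why this happens:
Is this a Spark problem? That is, can it not properly allocate the 32 CPU cores to run 4 simple functions? Combined with the multiprocessing module, I expected the 15 iterations to finish in much less time. Or perhaps there is something about multiprocessing that I have not understood yet, and this execution time is the best my setup can achieve
I would really appreciate your input on this, because maybe I am missing the point of multiprocessing. I cannot accept that, with 32 CPU cores, Spark needs 1 minute to complete each pool execution. Please disregard the fact that I am using Spark to train on DataFrames of about 500 rows, because in the near future each DataFrame will have 100000 rows or more. I am aware of the drawback of Spark compared with plain Python on so few rows. What I want to understand better is the multiprocessing approach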