Posted 2024-05-28 19:10:22
User
I'm new to Python programming. I want to load the data according to the table in this article, but I don't know how to do classification training and test the NSL-KDD dataset against the five categories ('normal', 'dos', 'r2l', 'probe', 'u2r').
I have looked at a lot of code on GitHub for preprocessing the NSL-KDD dataset into the five groups ('normal', 'dos', 'r2l', 'probe', 'u2r'), but I still haven't found code that works. Can anyone help me? I really need help.
This code loads the dataset; in the full notebook you can also find how the PCA algorithm is used for visualization purposes and, later, as preprocessing for Gaussian Mixture clustering.
import os
import math
import itertools
import multiprocessing

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from time import time
from collections import OrderedDict

%matplotlib inline

gt0 = time()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row

conf = SparkConf()\
    .setMaster(f"local[{multiprocessing.cpu_count()}]")\
    .setAppName("PySpark NSL-KDD")\
    .setAll([("spark.driver.memory", "8g"),
             ("spark.default.parallelism", f"{multiprocessing.cpu_count()}")])

# Creating local SparkContext with specified SparkConf and creating SQLContext based on it
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel('INFO')
sqlContext = SQLContext(sc)

from pyspark.sql.types import *
from pyspark.sql.functions import udf, split, col
import pyspark.sql.functions as sql

train20_nsl_kdd_dataset_path = os.path.join("NSL_KDD_Dataset", "KDDTrain+_20Percent.txt")
train_nsl_kdd_dataset_path = os.path.join("NSL_KDD_Dataset", "KDDTrain+.txt")
test_nsl_kdd_dataset_path = os.path.join("NSL_KDD_Dataset", "KDDTest+.txt")

# The 41 feature names plus the 'labels' column
col_names = np.array(["duration", "protocol_type", "service", "flag", "src_bytes",
                      "dst_bytes", "land", "wrong_fragment", "urgent", "hot", "num_failed_logins",
                      "logged_in", "num_compromised", "root_shell", "su_attempted", "num_root",
                      "num_file_creations", "num_shells", "num_access_files", "num_outbound_cmds",
                      "is_host_login", "is_guest_login", "count", "srv_count", "serror_rate",
                      "srv_serror_rate", "rerror_rate", "srv_rerror_rate", "same_srv_rate",
                      "diff_srv_rate", "srv_diff_host_rate", "dst_host_count", "dst_host_srv_count",
                      "dst_host_same_srv_rate", "dst_host_diff_srv_rate", "dst_host_same_src_port_rate",
                      "dst_host_srv_diff_host_rate", "dst_host_serror_rate", "dst_host_srv_serror_rate",
                      "dst_host_rerror_rate", "dst_host_srv_rerror_rate", "labels"])

# Indexes of the nominal, binary, and numeric feature columns
nominal_inx = [1, 2, 3]
binary_inx = [6, 11, 13, 14, 20, 21]
numeric_inx = list(set(range(41)).difference(nominal_inx).difference(binary_inx))

nominal_cols = col_names[nominal_inx].tolist()
binary_cols = col_names[binary_inx].tolist()
numeric_cols = col_names[numeric_inx].tolist()

# Function to load a dataset file and divide it into 8 partitions
def load_dataset(path):
    dataset_rdd = sc.textFile(path, 8).map(lambda line: line.split(','))
    dataset_df = (dataset_rdd.toDF(col_names.tolist()).select(
        col('duration').cast(DoubleType()),
        col('protocol_type').cast(StringType()),
        col('service').cast(StringType()),
        col('flag').cast(StringType()),
        col('src_bytes').cast(DoubleType()),
        col('dst_bytes').cast(DoubleType()),
        col('land').cast(DoubleType()),
        col('wrong_fragment').cast(DoubleType()),
        col('urgent').cast(DoubleType()),
        col('hot').cast(DoubleType()),
        col('num_failed_logins').cast(DoubleType()),
        col('logged_in').cast(DoubleType()),
        col('num_compromised').cast(DoubleType()),
        col('root_shell').cast(DoubleType()),
        col('su_attempted').cast(DoubleType()),
        col('num_root').cast(DoubleType()),
        col('num_file_creations').cast(DoubleType()),
        col('num_shells').cast(DoubleType()),
        col('num_access_files').cast(DoubleType()),
        col('num_outbound_cmds').cast(DoubleType()),
        col('is_host_login').cast(DoubleType()),
        col('is_guest_login').cast(DoubleType()),
        col('count').cast(DoubleType()),
        col('srv_count').cast(DoubleType()),
        col('serror_rate').cast(DoubleType()),
        col('srv_serror_rate').cast(DoubleType()),
        col('rerror_rate').cast(DoubleType()),
        col('srv_rerror_rate').cast(DoubleType()),
        col('same_srv_rate').cast(DoubleType()),
        col('diff_srv_rate').cast(DoubleType()),
        col('srv_diff_host_rate').cast(DoubleType()),
        col('dst_host_count').cast(DoubleType()),
        col('dst_host_srv_count').cast(DoubleType()),
        col('dst_host_same_srv_rate').cast(DoubleType()),
        col('dst_host_diff_srv_rate').cast(DoubleType()),
        col('dst_host_same_src_port_rate').cast(DoubleType()),
        col('dst_host_srv_diff_host_rate').cast(DoubleType()),
        col('dst_host_serror_rate').cast(DoubleType()),
        col('dst_host_srv_serror_rate').cast(DoubleType()),
        col('dst_host_rerror_rate').cast(DoubleType()),
        col('dst_host_srv_rerror_rate').cast(DoubleType()),
        col('labels').cast(StringType())))
    return dataset_df

from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import StringIndexer
from pyspark import keyword_only
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param

# Dictionary that maps each specific attack to one of the four main categories
attack_dict = {
    'normal': 'normal',

    'back': 'DoS',
    'land': 'DoS',
    'neptune': 'DoS',
    'pod': 'DoS',
    'smurf': 'DoS',
    'teardrop': 'DoS',
    'mailbomb': 'DoS',
    'apache2': 'DoS',
    'processtable': 'DoS',
    'udpstorm': 'DoS',

    'ipsweep': 'Probe',
    'nmap': 'Probe',
    'portsweep': 'Probe',
    'satan': 'Probe',
    'mscan': 'Probe',
    'saint': 'Probe',

    'ftp_write': 'R2L',
    'guess_passwd': 'R2L',
    'imap': 'R2L',
    'multihop': 'R2L',
    'phf': 'R2L',
    'spy': 'R2L',
    'warezclient': 'R2L',
    'warezmaster': 'R2L',
    'sendmail': 'R2L',
    'named': 'R2L',
    'snmpgetattack': 'R2L',
    'snmpguess': 'R2L',
    'xlock': 'R2L',
    'xsnoop': 'R2L',
    'worm': 'R2L',

    'buffer_overflow': 'U2R',
    'loadmodule': 'U2R',
    'perl': 'U2R',
    'rootkit': 'U2R',
    'httptunnel': 'U2R',
    'ps': 'U2R',
    'sqlattack': 'U2R',
    'xterm': 'U2R'
}

attack_mapping_udf = udf(lambda v: attack_dict[v])

# Custom transformer that adds a binary 'labels2' column ('normal' vs. 'attack')
class Labels2Converter(Transformer):
    @keyword_only
    def __init__(self):
        super(Labels2Converter, self).__init__()

    def _transform(self, dataset):
        return dataset.withColumn('labels2', sql.regexp_replace(col('labels'), '^(?!normal).*$', 'attack'))

# Custom transformer that adds a 5-class 'labels5' column via attack_dict
class Labels5Converter(Transformer):
    @keyword_only
    def __init__(self):
        super(Labels5Converter, self).__init__()

    def _transform(self, dataset):
        return dataset.withColumn('labels5', attack_mapping_udf(col('labels')))

labels2_indexer = StringIndexer(inputCol="labels2", outputCol="labels2_index")
labels5_indexer = StringIndexer(inputCol="labels5", outputCol="labels5_index")

labels_mapping_pipeline = Pipeline(stages=[Labels2Converter(), Labels5Converter(),
                                           labels2_indexer, labels5_indexer])

labels2 = ['normal', 'attack']
labels5 = ['normal', 'DoS', 'Probe', 'R2L', 'U2R']
labels_col = 'labels2_index'

# Loading train data
t0 = time()
train_df = load_dataset(train_nsl_kdd_dataset_path)

# Fitting preparation pipeline
labels_mapping_model = labels_mapping_pipeline.fit(train_df)

# Transforming labels column and adding id column
train_df = labels_mapping_model.transform(train_df).withColumn('id', sql.monotonically_increasing_id())
train_df = train_df.cache()
print(f"Number of examples in train set: {train_df.count()}")
print(f"Time: {time() - t0:.2f}s")

# Loading test data
t0 = time()
test_df = load_dataset(test_nsl_kdd_dataset_path)

# Transforming labels column and adding id column
test_df = labels_mapping_model.transform(test_df).withColumn('id', sql.monotonically_increasing_id())
test_df = test_df.cache()
print(f"Number of examples in test set: {test_df.count()}")
print(f"Time: {time() - t0:.2f}s")
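The code above only loads the data and attaches the 2-class ('labels2') and 5-class ('labels5') label columns; it does not train anything. To address the actual question (training and testing a classifier over the five categories), here is a minimal sketch that builds on the train_df and test_df produced above. The encoding stages and the RandomForestClassifier settings (numTrees=100, handleInvalid="keep", etc.) are illustrative choices of mine, not part of the original notebook; on Spark 2.x you would use OneHotEncoderEstimator instead of OneHotEncoder.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Index and one-hot encode the nominal features (protocol_type, service, flag).
# handleInvalid='keep' guards against 'service' values that appear only in the test set.
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index", handleInvalid="keep")
            for c in nominal_cols]
encoder = OneHotEncoder(inputCols=[c + "_index" for c in nominal_cols],
                        outputCols=[c + "_vec" for c in nominal_cols])

# Assemble numeric, binary, and encoded nominal columns into one feature vector
assembler = VectorAssembler(
    inputCols=numeric_cols + binary_cols + [c + "_vec" for c in nominal_cols],
    outputCol="features")

# Random forest trained against the 5-class label index created earlier
rf = RandomForestClassifier(featuresCol="features", labelCol="labels5_index",
                            numTrees=100, seed=42)

clf_pipeline = Pipeline(stages=indexers + [encoder, assembler, rf])
clf_model = clf_pipeline.fit(train_df)

# Evaluate on KDDTest+ with a multiclass F1 score
predictions = clf_model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(labelCol="labels5_index",
                                              predictionCol="prediction",
                                              metricName="f1")
print(f"Test F1: {evaluator.evaluate(predictions):.3f}")

To inspect how each of the five categories is classified, you can cross-tabulate the results with predictions.groupBy('labels5', 'prediction').count().show().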
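As for the PCA mentioned above: the notebook uses it for 2-D visualization and as preprocessing for Gaussian Mixture clustering. Below is a minimal sketch of that idea, reusing the indexers, encoder, and assembler stages from the previous block; the component count k=2, the cluster count k=5, and the plotting details are my assumptions, not the notebook's exact setup.

from pyspark.ml.feature import StandardScaler, PCA
from pyspark.ml.clustering import GaussianMixture

# Standardize the assembled features, then project onto 2 principal components
scaler = StandardScaler(inputCol="features", outputCol="scaled",
                        withMean=True, withStd=True)
pca = PCA(k=2, inputCol="scaled", outputCol="pca_features")

prep_model = Pipeline(stages=indexers + [encoder, assembler, scaler, pca]).fit(train_df)
train_pca = prep_model.transform(train_df)

# Scatter plot of the two components, colored by the 5-class label
pdf = train_pca.select("pca_features", "labels5").toPandas()
xy = np.array([v.toArray() for v in pdf["pca_features"]])
for label in labels5:
    mask = (pdf["labels5"] == label).values
    plt.scatter(xy[mask, 0], xy[mask, 1], s=2, label=label)
plt.legend()
plt.show()

# The same projection can feed Gaussian Mixture clustering
gmm_model = GaussianMixture(k=5, featuresCol="pca_features", seed=42).fit(train_pca)
clusters = gmm_model.transform(train_pca)  # adds a 'prediction' cluster column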