如何使数据集中的属性并行化的代码块?

2023-02-06 13:20:58 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用SUMO 1.3.1Ubuntu 18.04.4中的python traci模块模拟汽车消息传递。每辆车都有自己的消息数据帧,行是模拟中其他车辆的ID,列是时间,它们是在代码请求时创建的,并填充0(这无法更改,因为要求我这样做)

对每辆车进行单独检查,以查看其他车是否在给定范围内,从而向它们发送消息,如果它们在指定范围内,则会启动一个过程,在该过程中,程序将接收器表上的值更改为当前时间,并将发送者的ID更改为1或2,这取决于选择的机会。这段代码运行非常慢,因为它只使用了一个CPU。 我想要实现的是使用并行化来加快这个过程

到目前为止,我所做的是开始使用多处理库,问题是代码现在运行得更慢。我不知道是我误用了这个模块,还是有另一个模块更适合这种情况

以下是多处理前使用的代码:

for car in car_vector:
                dist = np.sqrt((pos_vector_x - traci.vehicle.getPosition(car)[0])**2 + (pos_vector_y - traci.vehicle.getPosition(car)[1])**2) #creates a vector with all distances from the cars in the simulation and the car being used in the moment
                for j , receiver in enumerate(dist): #generates a list of tupples, with j being an index for the distances, j is used to know which car correspond to that distance in the car_vector
                    if receiver < 30 and receiver > 0.5: #the distance defined is 30 meters and the condition of being higher than 0.5 comes to stop the car from sending messages to itself
                        dic_frames[car] = contamination(step - 2,car_vector[j],dic_frames[car])

污染功能为:

def contamination(time,receiver,data_msgs): #pass on the contaminated colors to the non-contaminated cars
    threshold_lost_message = 0.2
    is_critical = random.randint(1,100) == 1 
    if random.random() > threshold_lost_message:
        if is_critical:
            data_msgs.loc[data_msgs.IDs == receiver,str(time) + '00ms'] = 2 
        else:
            data_msgs.loc[data_msgs.IDs == receiver,str(time) + '00ms'] = 1 
    return data_msgs

下面是“使用”多处理的代码:

for car in car_vector:
                processes = []
                dist = np.sqrt((pos_vector_x - traci.vehicle.getPosition(car)[0])**2 + (pos_vector_y - traci.vehicle.getPosition(car)[1])**2) #creates a vector with all distances from the cars in the simulation and the car being used in the moment
                for j , receiver in enumerate(dist): #generates a list of tupples, with j being an index for the distances, j is used to know which car correspond to that distance in the car_vector
                    p = multiprocessing.Process(target=multiprocessing_func, args=(car,j,receiver,dic_frames,car_vector,step))
                    processes.append(p)
                    p.start()
                for process in processes:
                    process.join()

其中multiprocessing_func是:

def multiprocessing_func(car, j, receiver, dic_frames, car_vector, step):
    if receiver < 30 and receiver > 0.5:
        dic_frames[car_vector[j]] = contamination(step - 2,car,dic_frames[car_vector[j]]) 

有没有一种方法可以让这个进程使用多个CPU,即使它只做属性?可能一次为多辆车做这件事


Tags: andthetoinfordataframesiscarvectorreceivermsgsdicbeingtraci
1条回答
网友
1楼 · 发布于 2023-02-06 13:20:58

您可以尝试在外循环中进行并行化,因为多处理有相当大的开销

def func(car,pos_vector_x,pos_vector_y,dic_frames,car_vector,step):
     dist = ...

for car in car_vector:
    processes = []
    p = multiprocessing.Process(target=func, args=(car,pos_vector_x,pos_vector_y,dic_frames,car_vector,step)
    processes.append(p)
    p.start()
for process in processes:
    process.join()

相关问题 更多 >