python多处理池如果值为

2024-06-07 06:20:03 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在开发一个脚本,我可以随机创建对象,但我不想重复。它们被存储起来,每次我创建一个新的,我都会对照已经存在的进行检查。由于我想对大量的对象执行此操作,我现在正在尝试将其并行化,但迄今为止没有成功。我尝试了一些在网上找到的解决方案(事实上主要是在这里),但仍然没有奏效。在

我的想法是启动一个池并将我的函数映射到它。当进程找到匹配项时,它将值设置为1。这个值对所有进程都是可读的,他们可以用锁来写它,我需要它在最后返回。因此,我生成了一个Lock和一个Value,这样所有进程都可以读取该值(因此是lock=False),并检查是否在另一个进程中找到了匹配项。然后我用Event尝试了一些不同的方法,并检查它是否已设置,但这仍然不起作用。。。然后我试图提出一个特殊的Exception,但仍然没有成功地使代码成功。在

拜托,我更喜欢编程OOP,所以在我最后一个资源之前,我会避免定义一个global变量,因为我认为它们是不确定的(个人观点)。在

这是一个MWE,我用int替换了复杂的对象,用range(10000)替换了存储的对象,以帮助您理解。在

#!/usr/bin/env python3

import multiprocessing as muproc


def ParallelCheck(me):
    print(" Testing {}...".format(me))
    #manager = muproc.Manager()
    #lock = manager.Lock()
    lock = muproc.Lock()
    back = muproc.Value("i", 0, lock=False)
    ParChild = ParallelChild(me, lock, back)
    with muproc.Pool() as pool:
        try:
            pool.map(ParChild.run, range(10000))
        except AbortPool:
            pool.terminate()
            print("pool")
    return back.value


def Computation(me, neighbour):
    return me == neighbour



class ParallelChild(object):
    def __init__(self, me, lock, back):
        self.abort = muproc.Event()
        self.lock = lock
        self.me = me
        self.back = back

    def run(self, neighbour):
        print("run")
        if self.abort.is_set():
            print("Aborting")
            pass
        else:
            if Computation(self.me, neighbour):
                self.lock.acquire()
                self.abort.set()
                self.back.value = 1
                print("GOTCHA")
                self.lock.release()
                raise AbortPool
            else:
                print("...")


class AbortPool(Exception):
    #pass
    def __init__(self):
        ## Just to check
        print("AbortPool raised!")



if __name__ == "__main__":
    values = [12000, 13, 7]
    for v in values:
        print("value={} match={}".format(v, ParallelCheck(v)))

现在它产生一个RunTimeError

^{pr2}$

我想这和Lock(虽然注释了Manager,但这并没有更好地工作)或者{}有关,但是现在想知道如何摆脱它了。。。在

编辑

当我继续尝试改变我的代码以我想要的方式工作时,我意识到我没有提到我的主要问题是什么。我真正的困难是,如果找到匹配项,池中的所有进程都将停止。这就是我需要的,这样并行运行比串行运行好。现在,我可以使用一个事件来告诉子进程是否已经找到匹配项,但是它会在数据中循环,即使我引发了一个异常。。。在

编辑2

简单地说,我有以下几点。。。在

for o in objects:
    if too_close(o, existing_objects):
        return 1
return 0

…我想在CPU之间分配类似。。。在

for o in objects:
    if too_close(o, some_existing_objects):
        return 1 and abort other processes
return 0

Tags: selflockreturnif进程defbackme
1条回答
网友
1楼 · 发布于 2024-06-07 06:20:03

为了寻找答案,我把剧本弄得太复杂了。 我试着从接近原始医生的东西开始 多处理模块。 但没有成功,我想办法解决它,并添加了一些东西。在

我不是python多处理方面的专家,但在尝试了一段时间后, 我发现在第一次匹配时中止pool.map的唯一方法是使用事件 所有进程都知道它已经发生了,然后它们都抛出一个特殊的异常 使自己流产。 我可以把值和锁去掉,它们对我的案子没用。在

但我做事的方式可能效率不高。 产生这些过程需要大量的计算时间, 每个进程都会将运行所需的数据复制到自己的内存中。在

我尝试生成较少的进程,但每个进程都具有较少的数据和 他们将自己迭代的数据集(不让池处理此部分)。 所以我可以选择哪些数据将被传送到哪个进程。 在我的例子中,我将range(10000)分成4个进程 每个都有2500个范围。在

我只想知道是否有匹配,这样我可以进一步简化。 我可以设置,当找到匹配项时,事件被设置,函数返回以便停止。 另一个进程测试事件的状态,一旦设置好,它们就会返回并停止自己的操作。在

现在回到主流程,最后我只看了一下事件 (当然,一开始也别忘了清理)。 如果设置了它,就会找到一个匹配项,就这么简单。在

缺点是我必须声明全局的。。。 否则,当进程被派生时,每个子进程将生成一个副本 他们将无法在他们之间以及与主进程进行通信。在

但正如bj0已经提到的,将这个问题并行化可能不会更好。。。在

在实现了这两种方法后,我将它们与串行问题进行了比较,下面是我的结果 对于同一台机器的给定案例:

  • 序列号:7s
  • 带游泳池:910s
  • 有3个进程,每个进程都有自己的数据集:97s

所以这里没有更好的。。。我将坚持我的串行实现,并寻找其他方法来加速事情,像其他方法,而不是完全随机。。。在

以下是我的MWE的最后一个工作版本:

#!/usr/bin/env python3
import multiprocessing as muproc

def ParallelCheck(me):
    print(" Testing {}...".format(me))
    global abort
    abort.clear()
    ParChild = ParallelChild(me)
    jobs = []
    N = 4
    for i in range(N):
        jobs.append(muproc.Process(target = ParChild.run, args=(range(i * 2500, (i+1) * 2500),)))
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()
    if abort.is_set():
        print("MATCH FOUND")
        return 1
    else:
        print(" no match...")
        return 0


def Computation(me, neighbour):
    return me == neighbour


class ParallelChild(object):
    def __init__(self, me):
        self.me = me

    def run(self, neighbours):
        global abort
        for neighbour in neighbours:
            print("{} vs {} by {}".format(self.me, neighbour, self.CurProc()))
            if abort.is_set():
                print("Aborting {}".format(self.CurProc()))
                return 0
            else:
                if Computation(self.me, neighbour):
                    abort.set()
                    print("GOTCHA {}".format(self.CurProc()))
                    return 1

    def CurProc(self):
        return muproc.current_process()._identity[0]



if __name__ == "__main__":
    abort = muproc.Event()
    values = [12000, 130, 7]
    for v in values:
        print("value={} match={}".format(v, ParallelCheck(v)))

相关问题 更多 >