如何在Python中对类实例使用多处理?

2024-03-29 14:41:17 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图创建一个类,这样就可以运行一个单独的进程去做一些需要很长时间的工作,从一个主模块中启动一堆,然后等待它们全部完成。我只想启动一次流程,然后继续为它们提供要做的事情,而不是创建和销毁流程。例如,我可能有10个服务器运行dd命令,然后我希望它们都scp一个文件,等等

我的最终目标是为每个系统创建一个类,该类跟踪与之绑定的系统的信息,如IP地址、日志、运行时等,但该类必须能够启动一个系统命令,然后在该系统命令运行时将执行返回给调用方,以便稍后跟踪系统命令的结果。

我的尝试失败,因为我无法通过pickle通过管道将类的实例方法发送到子进程。那些是不可腌制的。因此我试着用各种方法来解决它,但我想不通。如何修补我的代码才能做到这一点?如果不能发送有用的信息,多处理有什么好处?

有没有关于类实例使用多处理的好文档?让多处理模块工作的唯一方法是使用简单的函数。每次尝试在类实例中使用它都失败了。也许我应该通过事件来代替?我还不知道怎么做。

import multiprocessing
import sys
import re

class ProcessWorker(multiprocessing.Process):
    """
    This class runs as a separate process to execute worker's commands in parallel
    Once launched, it remains running, monitoring the task queue, until "None" is sent
    """

    def __init__(self, task_q, result_q):
        multiprocessing.Process.__init__(self)
        self.task_q = task_q
        self.result_q = result_q
        return

    def run(self):
        """
        Overloaded function provided by multiprocessing.Process.  Called upon start() signal
        """
        proc_name = self.name
        print '%s: Launched' % (proc_name)
        while True:
            next_task_list = self.task_q.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % (proc_name)
                self.task_q.task_done()
                break
            next_task = next_task_list[0]
            print '%s: %s' % (proc_name, next_task)
            args = next_task_list[1]
            kwargs = next_task_list[2]
            answer = next_task(*args, **kwargs)
            self.task_q.task_done()
            self.result_q.put(answer)
        return
# End of ProcessWorker class

class Worker(object):
    """
    Launches a child process to run commands from derived classes in separate processes,
    which sit and listen for something to do
    This base class is called by each derived worker
    """
    def __init__(self, config, index=None):
        self.config = config
        self.index = index

        # Launce the ProcessWorker for anything that has an index value
        if self.index is not None:
            self.task_q = multiprocessing.JoinableQueue()
            self.result_q = multiprocessing.Queue()

            self.process_worker = ProcessWorker(self.task_q, self.result_q)
            self.process_worker.start()
            print "Got here"
            # Process should be running and listening for functions to execute
        return

    def enqueue_process(target):  # No self, since it is a decorator
        """
        Used to place an command target from this class object into the task_q
        NOTE: Any function decorated with this must use fetch_results() to get the
        target task's result value
        """
        def wrapper(self, *args, **kwargs):
            self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled!
        return wrapper

    def fetch_results(self):
        """
        After all processes have been spawned by multiple modules, this command
        is called on each one to retreive the results of the call.
        This blocks until the execution of the item in the queue is complete
        """
        self.task_q.join()                          # Wait for it to to finish
        return self.result_q.get()                  # Return the result

    @enqueue_process
    def run_long_command(self, command):
        print "I am running number % as process "%number, self.name

        # In here, I will launch a subprocess to run a  long-running system command
        # p = Popen(command), etc
        # p.wait(), etc
        return 

    def close(self):
        self.task_q.put(None)
        self.task_q.join()

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(5):
        worker = Worker(config, index)
        worker.run_long_command("ls /")
        workers.append(worker)
    for worker in workers:
        worker.fetch_results()

    # Do more work... (this would actually be done in a distributor in another class)

    for worker in workers:
        worker.close() 

编辑:我试图将ProcessWorker类和多处理队列的创建移到Worker类之外,然后尝试手动pickle工作实例。即使那不起作用,我也会出错

RuntimeError: Queue objects should only be shared between processes through inheritance

是的。但我只是将这些队列的引用传递到worker实例??我错过了一些基本的东西。以下是主要部分中修改后的代码:

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(1):
        task_q = multiprocessing.JoinableQueue()
        result_q = multiprocessing.Queue()
        process_worker = ProcessWorker(task_q, result_q)
        worker = Worker(config, index, process_worker, task_q, result_q)
        something_to_look_at = pickle.dumps(worker) # FAIL:  Doesn't like queues??
        process_worker.start()
        worker.run_long_command("ls /")

Tags: thetonameinselftaskindexis
2条回答
< >问题是,我假设Python在做某种魔术,这与C++ +Frk()的工作方式有些不同。不知怎的,我认为Python只复制了类,而不是将整个程序复制到单独的进程中。我真的浪费了很多时间试图让它工作,因为所有关于pickle序列化的讨论都让我认为它实际上把所有东西都发送到了管道上。我知道有些东西不能通过管道传送,但我想我的问题是我没有把东西包装好。

如果Python文档给了我一个10000英尺的视图,让我了解使用这个模块时会发生什么,这一切都可以避免。当然,它告诉我多处理模块的方法,并给我一些基本的例子,但我想知道的是什么是幕后的“操作理论”!这是我可以使用的信息。如果我的答案是否定的,请插嘴。它将帮助我学习。

使用此模块运行启动进程时,整个程序将复制到另一个进程中。但由于它不是“__main__”进程,而且我的代码正在检查它,所以它不会无限地触发另一个进程。它只是停下来,坐在那里等着做点什么,就像僵尸一样。调用multiprocess.Process()时在父进程中初始化的所有内容都已设置好并准备就绪。一旦您将某个东西放入多进程、队列、共享内存或管道等(不管您是如何通信的),那么单独的进程就会接收它并开始工作。它可以在所有导入的模块上绘制并设置,就像它是父模块一样。但是,一旦父进程或单独进程中的某些内部状态变量发生更改,这些更改将被隔离。一旦生成了进程,现在您的工作就是在必要时通过队列、管道、共享内存等使它们保持同步

我抛出了代码并重新开始,但是现在我只在ProcessWorker中抛出了一个额外的函数,这是一个运行命令行的“execute”方法。很简单。我不必担心启动,然后关闭这样的过程,这给我造成了各种不稳定和性能问题在过去的C++。当我在开始时切换到启动进程,然后将消息传递给那些等待的进程时,我的性能提高了,而且非常稳定。

顺便说一句,我查看了这个链接以获取帮助,这让我很失望,因为这个示例使我认为方法是跨队列传输的:http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html 第一部分的第二个示例使用了“next_task()”(对我来说)似乎是在执行通过队列接收的任务。

与其尝试发送方法本身(这是不实际的),不如尝试发送要执行的方法的名称

如果每个工作进程运行相同的代码,这只是一个简单的问题getattr(self, task_name)

我将传递元组(task_name, task_args),其中task_args是直接馈送给task方法的dict:

next_task_name, next_task_args = self.task_q.get()
if next_task_name:
  task = getattr(self, next_task_name)
  answer = task(**next_task_args)
  ...
else:
  # poison pill, shut down
  break

相关问题 更多 >