<p>I am trying to create a class that can launch a separate process to go do some work that takes a long time, kick off a bunch of these from a main module, and then wait for them all to finish. I want to launch the processes once and then keep feeding them things to do rather than creating and destroying processes. For example, maybe I have 10 servers running the dd command, then I want them all to scp a file, etc.</p>
<p>My ultimate goal is to create a class for each system that keeps track of the information for the system in which it is tied to, like IP address, logs, runtime, etc. But that class must be able to launch a system command and then return execution back to the caller while that system command runs, to follow up with the result of the system command later.</p>
<p>My attempt is failing because I cannot send an instance method of a class over the pipe to the subprocess via pickle. Those are not picklable. I therefore tried to fix it various ways but I can't figure it out. How can my code be patched to do this? What good is multiprocessing if you can't send over anything useful?</p>
<p>Is there any good documentation of multiprocessing being used with class instances? The only way I can get the multiprocessing module to work is on simple functions. Every attempt to use it within a class instance has failed. Maybe I should pass events instead? I don't understand how to do that yet.</p>
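<p>For reference, whether an instance method pickles depends on the Python version. Under Python 2 (which the code below targets) bound methods are never picklable; under Python 3 a bound method pickles only if its instance does, and an instance that holds a <code>multiprocessing.Queue</code> does not. A quick Python 3 check, using hypothetical <code>Plain</code> and <code>HoldsQueue</code> classes, not code from this post:</p>

```python
import pickle
import multiprocessing

class Plain(object):
    def greet(self):
        return "hi"

class HoldsQueue(object):
    """Like the Worker below: the instance carries a multiprocessing Queue."""
    def __init__(self):
        self.q = multiprocessing.Queue()
    def greet(self):
        return "hi"

def check(obj):
    """Report whether obj's bound method survives a pickle round trip."""
    try:
        clone = pickle.loads(pickle.dumps(obj.greet))
        return clone()
    except (RuntimeError, pickle.PicklingError, TypeError) as exc:
        return "unpicklable: %s" % type(exc).__name__

if __name__ == "__main__":
    print(check(Plain()))       # bound method pickles along with its instance
    print(check(HoldsQueue()))  # fails: the instance drags a Queue into the pickle
```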
<pre><code>import multiprocessing
import sys
import re


class ProcessWorker(multiprocessing.Process):
    """
    This class runs as a separate process to execute worker's commands in parallel
    Once launched, it remains running, monitoring the task queue, until "None" is sent
    """

    def __init__(self, task_q, result_q):
        multiprocessing.Process.__init__(self)
        self.task_q = task_q
        self.result_q = result_q
        return

    def run(self):
        """
        Overloaded function provided by multiprocessing.Process.  Called upon start() signal
        """
        proc_name = self.name
        print '%s: Launched' % (proc_name)
        while True:
            next_task_list = self.task_q.get()
            if next_task_list is None:
                # Poison pill means shutdown
                print '%s: Exiting' % (proc_name)
                self.task_q.task_done()
                break
            next_task = next_task_list[0]
            print '%s: %s' % (proc_name, next_task)
            args = next_task_list[1]
            kwargs = next_task_list[2]
            answer = next_task(*args, **kwargs)
            self.result_q.put(answer)
            self.task_q.task_done()
        return
# End of ProcessWorker class


class Worker(object):
    """
    Launches a child process to run commands from derived classes in separate processes,
    which sit and listen for something to do
    This base class is called by each derived worker
    """
    def __init__(self, config, index=None):
        self.config = config
        self.index = index

        # Launch the ProcessWorker for anything that has an index value
        if self.index is not None:
            self.task_q = multiprocessing.JoinableQueue()
            self.result_q = multiprocessing.Queue()

            self.process_worker = ProcessWorker(self.task_q, self.result_q)
            self.process_worker.start()
            print "Got here"
            # Process should be running and listening for functions to execute
        return

    def enqueue_process(target):  # No self, since it is a decorator
        """
        Used to place a command target from this class object into the task_q
        NOTE: Any function decorated with this must use fetch_results() to get the
        target task's result value
        """
        def wrapper(self, *args, **kwargs):
            self.task_q.put([target, args, kwargs])  # FAIL: target is a class instance method and can't be pickled!
        return wrapper

    def fetch_results(self):
        """
        After all processes have been spawned by multiple modules, this command
        is called on each one to retrieve the results of the call.
        This blocks until the execution of the item in the queue is complete
        """
        self.task_q.join()          # Wait for it to finish
        return self.result_q.get()  # Return the result

    @enqueue_process
    def run_long_command(self, command):
        print "I am running command: %s" % command
        # In here, I will launch a subprocess to run a long-running system command
        # p = Popen(command), etc
        # p.wait(), etc
        return

    def close(self):
        self.task_q.put(None)
        self.task_q.join()


if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(5):
        worker = Worker(config, index)
        worker.run_long_command("ls /")
        workers.append(worker)
    for worker in workers:
        worker.fetch_results()

    # Do more work... (this would actually be done in a distributor in another class)
    for worker in workers:
        worker.close()
</code></pre>
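<p>The usual workaround for the failure flagged in <code>enqueue_process</code> is to keep the methods inside the child process and send only picklable data through the queue: a method <em>name</em> plus its arguments, dispatched with <code>getattr</code> on the worker side. A minimal Python 3 sketch of that pattern (the <code>CommandWorker</code> class and <code>run_command</code> method are illustrative names, not from the code above):</p>

```python
import multiprocessing

class CommandWorker(multiprocessing.Process):
    """Long-lived worker: executes queued (method_name, args, kwargs) tasks."""

    def __init__(self, task_q, result_q):
        super().__init__()
        self.task_q = task_q
        self.result_q = result_q

    # The method lives in the child process; only its *name* crosses the pipe.
    def run_command(self, command):
        return "ran: " + command

    def run(self):
        while True:
            task = self.task_q.get()
            if task is None:                 # poison pill means shutdown
                self.task_q.task_done()
                break
            name, args, kwargs = task        # all three are plain, picklable data
            result = getattr(self, name)(*args, **kwargs)
            self.result_q.put(result)
            self.task_q.task_done()

def demo():
    task_q = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    worker = CommandWorker(task_q, result_q)
    worker.start()
    task_q.put(("run_command", ("ls /",), {}))  # name + args, never a bound method
    task_q.join()                               # block until the task is done
    result = result_q.get()
    task_q.put(None)                            # shut the worker down
    worker.join()
    return result

if __name__ == "__main__":
    print(demo())
```

The parent-side wrapper then only needs to enqueue <code>(target.__name__, args, kwargs)</code> instead of the bound method itself.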
<p>EDIT: I tried to move the creation of the <code>ProcessWorker</code> class and of the multiprocessing queues outside of the <code>Worker</code> class and then tried to manually pickle the worker instance. Even that doesn't work and I get an error:</p>
<blockquote>
<p>RuntimeError: Queue objects should only be shared between processes
through inheritance</p>
</blockquote>
<p>Right. But I am only passing references to those queues into the worker instance?? I am missing something fundamental. Here is the modified code from the main section:</p>
<pre><code>if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(1):
        task_q = multiprocessing.JoinableQueue()
        result_q = multiprocessing.Queue()
        process_worker = ProcessWorker(task_q, result_q)
        worker = Worker(config, index, process_worker, task_q, result_q)
        something_to_look_at = pickle.dumps(worker)  # FAIL: Doesn't like queues??
        process_worker.start()
        worker.run_long_command("ls /")
</code></pre>
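<p>The "inheritance" that the RuntimeError refers to is the hand-off that happens when a queue is given to <code>multiprocessing.Process</code> at construction time: the child receives it as part of the fork/spawn. Calling <code>pickle.dumps</code> on anything that holds the queue outside of that hand-off raises exactly this error. A minimal Python 3 sketch of the supported path (function names are illustrative):</p>

```python
import multiprocessing

def child(q):
    # The queue arrived here via Process(args=...), i.e. by inheritance.
    q.put("hello from child")

def demo():
    q = multiprocessing.Queue()
    # Passing q through the Process constructor is the only supported way
    # to share it; pickling it yourself raises the RuntimeError quoted above.
    p = multiprocessing.Process(target=child, args=(q,))
    p.start()
    msg = q.get()
    p.join()
    return msg

if __name__ == "__main__":
    print(demo())
```

So the pickle failure is not about the references in the <code>Worker</code> instance; the queue objects themselves refuse to be pickled anywhere except inside that constructor hand-off.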