防止在多进程Python应用中线程重复

1 投票
1 回答
2131 浏览
提问于 2025-04-17 22:59

我有一个用Python写的应用程序,它会通过multiprocessing.Process来同时运行多个任务。这个主程序还会启动一个线程,把进度报告到数据库里。不过,我发现如果某个任务自己再启动子进程,就会导致这个线程被复制,从而造成数据库里的数据混乱。比如,当一个孙子进程完成时,它的线程会把父任务标记为完成,因为它误以为自己就是父进程,尽管父进程其实还在运行。

我该如何使用multiprocessing.Process,让它不复制任何正在运行的线程呢?最简单的办法是记录下我线程的原始进程ID(PID),如果“当前”的PID和这个值不匹配,就立刻退出吗?

我看到去年有一个类似的问题,但似乎没有人回应。

1 个回答

2

你描述的问题表明,父进程中的一个后台线程在子进程中仍然存在并执行。这在POSIX系统上是不可能的。你遇到的情况是其他原因造成的。下面我会对这个问题进行一些推测,并给出避免这个问题的建议。我们逐一来看这些要点……

1. 只有一个线程在分叉后存活。

在调用fork()之后,只有调用这个函数的线程会继续存活。这里有一个小例子可以演示其他线程在子进程中不会继续执行:

def output():
    time.sleep(3)
    print "Thread executing in process: %d" % os.getpid()

thread = threading.Thread(target=output)
thread.start()
os.fork()
print "Pid: %d" % os.getpid()

你会看到父进程和子进程都打印它们的进程ID,但第二个线程只会在父进程中产生输出。

所以让监控线程检查它的进程ID或者以其他方式判断它在哪个进程中运行是没有意义的;这个线程只会在一个进程中执行。

2. 分叉可能导致的问题。

分叉可能会以多种方式导致程序状态的损坏。例如:

  • 在一个因分叉而终止的线程中引用的对象可能会超出作用域,从而触发它们的终结器。如果这个对象代表一个网络资源,调用它的del方法可能会导致连接的一端意外关闭,这就会造成问题。
  • 任何缓冲的输入输出都会造成问题,因为缓冲区在子进程中会被复制。

注意,第二点甚至不需要线程。考虑以下情况:

f = open("testfile", "w", 1024)
f.write("a")
os.fork()

我们在testfile中写了一个字符,并且是在父进程中写的,写完后我们进行了分叉。但我们在内容未刷新时就进行了分叉,因此:

alp:~ $ wc -c testfile
      2 testfile

文件中包含两个字符,因为输出缓冲区被复制到了子进程中,父进程和子进程最终都刷新了它们的缓冲区。

我怀疑你的问题是由类似于第二个问题引起的(虽然我承认这只是纯粹的推测)。

3. 重新设计以避免这些问题。

你在评论中提到,由于需要反复创建新的工作线程,所以你不能在生成工作线程后启动监控线程。其实,重新组织你的工作流程可能比你想象的要简单。与其为每个新的工作单元生成一个进程,不如创建一组由控制进程管理的长期工作线程:控制器将需要处理的工作任务放入一个队列中,随时处理。每个工作线程会无限循环,当有新任务到达时从队列中取出并执行。(multiprocessing中的队列实现会保证每个任务描述只会被一个工作线程提取。)这样,你只需要在一开始生成工作线程,并且可以在所有分叉完成后再创建监控线程。

下面是这种组织方式的示意例子:

from multiprocessing import Process, Queue

def work(q):
    while True:
        job = q.get()
        if job is None:
            # We've been signaled to stop.
            break
        do_something_with(job)

queue = Queue()
NUM_WORKERS = 3
NUM_JOBS = 20

# Start workers.
for _ in range(NUM_WORKERS):
    p = Process(target=work, args=(queue,))
    p.start()

# Create your monitor thread here.

# Put work in the queue.  This continues as long as you want.
for i in range(NUM_JOBS):
    queue.put(i)

# When there's no more work, put sentinel values in the queue so workers
# know to gracefully exit.
for _ in range(NUM_WORKERS):
    queue.put(None)

撰写回答