暂停两个Python线程,同时让第三个线程执行操作(使用锁?)

4 投票
5 回答
3728 浏览
提问于 2025-04-17 06:10

我刚接触并发编程。

我想要重复执行三个任务。前两个任务应该一直运行,而第三个任务大约每小时运行一次。前两个任务可以同时进行,但我希望在第三个任务运行时暂停它们。

这是我尝试的代码框架:

import threading
import time

flock = threading.Lock()
glock = threading.Lock()

def f():
    while True:
        with flock:
            print 'f'
            time.sleep(1)

def g():
    while True:
        with glock:
            print 'g'
            time.sleep(1)

def h():
    while True:
        with flock:
            with glock:
                print 'h'
        time.sleep(5)

threading.Thread(target=f).start()
threading.Thread(target=g).start()
threading.Thread(target=h).start()

我本以为这段代码每秒会打印一个f和一个g,并且每五秒打印一个h。然而,当我运行它时,前面大约会打印12个f和12个g,才开始看到一些h。看起来前两个线程总是在释放和重新获取它们的锁,而第三个线程却被排除在外。

  1. 这是为什么呢?当第三个线程尝试获取一个已经被占用的锁时,如果这个锁被释放,难道不应该立即成功获取,而不是第一个或第二个线程又立刻获取到它吗?我可能理解错了什么。
  2. 有什么好的方法可以实现我想要的效果吗?

注意:把time.sleep(1)的调用移出with flock/glock块在这个简单的例子中是有效的,但在我的实际应用中就不行了,因为线程大部分时间都在执行实际操作。当前两个线程在每次执行循环体后睡一秒,并释放锁时,第三个任务仍然没有被执行。

5 个回答

1

使用通信来实现同步:

#!/usr/bin/env python
import threading
import time
from Queue import Empty, Queue

def f(q, c):
    while True:
        try: q.get_nowait(); q.get() # get PAUSE signal      
        except Empty: pass  # no signal, do our thing
        else: q.get()       # block until RESUME signal
        print c,
        time.sleep(1)

def h(queues):
    while True:
        for q in queues:
            q.put_nowait(1); q.put(1) # block until PAUSE received
        print 'h'
        for q in queues:
            q.put(1) # put RESUME
        time.sleep(5)

queues = [Queue(1) for _ in range(2)]
threading.Thread(target=f, args=(queues[0], 'f')).start()
threading.Thread(target=f, args=(queues[1], 'g')).start()
threading.Thread(target=h, args=(queues,)).start()

从性能的角度来看,这可能不是最优的选择,但我觉得这样更容易理解。

输出

f g
f g h
f g f g g f f g g f g f f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
1

最简单的方法是用三个Python进程。如果你在Linux系统上操作,那个每小时运行的进程可以发信号让其他任务暂停,或者你甚至可以把它们杀掉,然后在每小时的任务完成后再重启它们。这样就不需要用线程了。

不过,如果你一定要用线程的话,尽量不要在线程之间共享任何数据,只是互相发送消息(这也叫做数据复制,而不是数据共享)。使用线程是比较难掌握的。

而且,多个进程会强制你不共享任何东西,所以这样做起来会简单很多。如果你使用像0MQ这样的库来进行消息传递,那么从线程模型切换到多进程模型就会变得很简单。你可以在这里找到它:http://www.zeromq.org

5

可以试试用 threading.Events 来实现:

import threading
import time
import logging

logger=logging.getLogger(__name__)

def f(resume,is_waiting,name):
    while True:
        if not resume.is_set():
            is_waiting.set()
            logger.debug('{n} pausing...'.format(n=name))
            resume.wait()
            is_waiting.clear()
        logger.info(name)
        time.sleep(1)

def h(resume,waiters):
    while True:
        logger.debug('halt') 
        resume.clear()
        for i,w in enumerate(waiters):
            logger.debug('{i}: wait for worker to pause'.format(i=i))
            w.wait()
        logger.info('h begin')
        time.sleep(2)
        logger.info('h end')        
        logger.debug('resume')
        resume.set()
        time.sleep(5)

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

# set means resume; clear means halt
resume = threading.Event()
resume.set()

waiters=[]
for name in 'fg':
    is_waiting=threading.Event()
    waiters.append(is_waiting)
    threading.Thread(target=f,args=(resume,is_waiting,name)).start()    
threading.Thread(target=h,args=(resume,waiters)).start()

这样做会得到

[07:28:55 Thread-1] f
[07:28:55 Thread-2] g
[07:28:55 Thread-3] halt
[07:28:55 Thread-3] 0: wait for worker to pause
[07:28:56 Thread-1] f pausing...
[07:28:56 Thread-2] g pausing...
[07:28:56 Thread-3] 1: wait for worker to pause
[07:28:56 Thread-3] h begin
[07:28:58 Thread-3] h end
[07:28:58 Thread-3] resume
[07:28:58 Thread-1] f
[07:28:58 Thread-2] g
[07:28:59 Thread-1] f
[07:28:59 Thread-2] g
[07:29:00 Thread-1] f
[07:29:00 Thread-2] g
[07:29:01 Thread-1] f
[07:29:01 Thread-2] g
[07:29:02 Thread-1] f
[07:29:02 Thread-2] g
[07:29:03 Thread-3] halt

(这是对评论中问题的回应)这段代码试图测量 h 线程从其他工作线程获取每个锁所需的时间。

结果显示,即使 h 正在等待获取一个锁,其他工作线程也有很高的概率会释放并重新获取这个锁。
并不是因为 h 等待的时间更长,就会优先获得锁。

David Beazley 在 PyCon 上讲过关于线程和全局解释器锁(GIL)相关的问题。这里有一份 幻灯片的 PDF。这是一篇很有趣的读物,可能会帮助你理解这个问题。

import threading
import time
import logging

logger=logging.getLogger(__name__)

def f(lock,n):
    while True:
        with lock:
            logger.info(n)
            time.sleep(1)

def h(locks):
    while True:
        t=time.time()
        for n,lock in enumerate(locks):
            lock.acquire()
            t2=time.time()
            logger.info('h acquired {n}: {d}'.format(n=n,d=t2-t))
            t=t2
        t2=time.time()
        logger.info('h {d}'.format(d=t2-t))
        t=t2
        for lock in locks:
            lock.release()
        time.sleep(5)

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

locks=[]
N=5
for n in range(N):
    lock=threading.Lock()
    locks.append(lock)
    t=threading.Thread(target=f,args=(lock,n))
    t.start()

threading.Thread(target=h,args=(locks,)).start()

撰写回答