暂停两个Python线程,同时让第三个线程执行操作(使用锁?)
我刚接触并发编程。
我想要重复执行三个任务。前两个任务应该一直运行,而第三个任务大约每小时运行一次。前两个任务可以同时进行,但我希望在第三个任务运行时暂停它们。
这是我尝试的代码框架:
import threading
import time
flock = threading.Lock()
glock = threading.Lock()
def f():
while True:
with flock:
print 'f'
time.sleep(1)
def g():
while True:
with glock:
print 'g'
time.sleep(1)
def h():
while True:
with flock:
with glock:
print 'h'
time.sleep(5)
threading.Thread(target=f).start()
threading.Thread(target=g).start()
threading.Thread(target=h).start()
我本以为这段代码每秒会打印一个f和一个g,并且每五秒打印一个h。然而,当我运行它时,前面大约会打印12个f和12个g,才开始看到一些h。看起来前两个线程总是在释放和重新获取它们的锁,而第三个线程却被排除在外。
- 这是为什么呢?当第三个线程尝试获取一个已经被占用的锁时,如果这个锁被释放,难道不应该立即成功获取,而不是第一个或第二个线程又立刻获取到它吗?我可能理解错了什么。
- 有什么好的方法可以实现我想要的效果吗?
注意:把time.sleep(1)
的调用移出with flock/glock块在这个简单的例子中是有效的,但在我的实际应用中就不行了,因为线程大部分时间都在执行实际操作。当前两个线程在每次执行循环体后睡一秒,并释放锁时,第三个任务仍然没有被执行。
5 个回答
使用通信来实现同步:
#!/usr/bin/env python
import threading
import time
from Queue import Empty, Queue
def f(q, c):
while True:
try: q.get_nowait(); q.get() # get PAUSE signal
except Empty: pass # no signal, do our thing
else: q.get() # block until RESUME signal
print c,
time.sleep(1)
def h(queues):
while True:
for q in queues:
q.put_nowait(1); q.put(1) # block until PAUSE received
print 'h'
for q in queues:
q.put(1) # put RESUME
time.sleep(5)
queues = [Queue(1) for _ in range(2)]
threading.Thread(target=f, args=(queues[0], 'f')).start()
threading.Thread(target=f, args=(queues[1], 'g')).start()
threading.Thread(target=h, args=(queues,)).start()
从性能的角度来看,这可能不是最优的选择,但我觉得这样更容易理解。
输出
f g
f g h
f g f g g f f g g f g f f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
最简单的方法是用三个Python进程。如果你在Linux系统上操作,那个每小时运行的进程可以发信号让其他任务暂停,或者你甚至可以把它们杀掉,然后在每小时的任务完成后再重启它们。这样就不需要用线程了。
不过,如果你一定要用线程的话,尽量不要在线程之间共享任何数据,只是互相发送消息(这也叫做数据复制,而不是数据共享)。使用线程是比较难掌握的。
而且,多个进程会强制你不共享任何东西,所以这样做起来会简单很多。如果你使用像0MQ这样的库来进行消息传递,那么从线程模型切换到多进程模型就会变得很简单。你可以在这里找到它:http://www.zeromq.org。
可以试试用 threading.Events 来实现:
import threading
import time
import logging
logger=logging.getLogger(__name__)
def f(resume,is_waiting,name):
while True:
if not resume.is_set():
is_waiting.set()
logger.debug('{n} pausing...'.format(n=name))
resume.wait()
is_waiting.clear()
logger.info(name)
time.sleep(1)
def h(resume,waiters):
while True:
logger.debug('halt')
resume.clear()
for i,w in enumerate(waiters):
logger.debug('{i}: wait for worker to pause'.format(i=i))
w.wait()
logger.info('h begin')
time.sleep(2)
logger.info('h end')
logger.debug('resume')
resume.set()
time.sleep(5)
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s %(threadName)s] %(message)s',
datefmt='%H:%M:%S')
# set means resume; clear means halt
resume = threading.Event()
resume.set()
waiters=[]
for name in 'fg':
is_waiting=threading.Event()
waiters.append(is_waiting)
threading.Thread(target=f,args=(resume,is_waiting,name)).start()
threading.Thread(target=h,args=(resume,waiters)).start()
这样做会得到
[07:28:55 Thread-1] f
[07:28:55 Thread-2] g
[07:28:55 Thread-3] halt
[07:28:55 Thread-3] 0: wait for worker to pause
[07:28:56 Thread-1] f pausing...
[07:28:56 Thread-2] g pausing...
[07:28:56 Thread-3] 1: wait for worker to pause
[07:28:56 Thread-3] h begin
[07:28:58 Thread-3] h end
[07:28:58 Thread-3] resume
[07:28:58 Thread-1] f
[07:28:58 Thread-2] g
[07:28:59 Thread-1] f
[07:28:59 Thread-2] g
[07:29:00 Thread-1] f
[07:29:00 Thread-2] g
[07:29:01 Thread-1] f
[07:29:01 Thread-2] g
[07:29:02 Thread-1] f
[07:29:02 Thread-2] g
[07:29:03 Thread-3] halt
(这是对评论中问题的回应)这段代码试图测量 h
线程从其他工作线程获取每个锁所需的时间。
结果显示,即使 h
正在等待获取一个锁,其他工作线程也有很高的概率会释放并重新获取这个锁。
并不是因为 h
等待的时间更长,就会优先获得锁。
David Beazley 在 PyCon 上讲过关于线程和全局解释器锁(GIL)相关的问题。这里有一份 幻灯片的 PDF。这是一篇很有趣的读物,可能会帮助你理解这个问题。
import threading
import time
import logging
logger=logging.getLogger(__name__)
def f(lock,n):
while True:
with lock:
logger.info(n)
time.sleep(1)
def h(locks):
while True:
t=time.time()
for n,lock in enumerate(locks):
lock.acquire()
t2=time.time()
logger.info('h acquired {n}: {d}'.format(n=n,d=t2-t))
t=t2
t2=time.time()
logger.info('h {d}'.format(d=t2-t))
t=t2
for lock in locks:
lock.release()
time.sleep(5)
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s %(threadName)s] %(message)s',
datefmt='%H:%M:%S')
locks=[]
N=5
for n in range(N):
lock=threading.Lock()
locks.append(lock)
t=threading.Thread(target=f,args=(lock,n))
t.start()
threading.Thread(target=h,args=(locks,)).start()