从另一个线程唤醒Python中特定线程的睡眠状态
我这几天一直在纠结这个问题,快要疯了。我是个完全的新手,对Python一窍不通,所以请多包涵我的无知。
我的主线程会不断检查数据库,看有没有新记录,然后为每一条新记录启动一个线程。
这些线程的主要工作就是去数据库里查一个值,如果没找到,就做一些事情,然后睡60秒再重新开始。
下面是一个简化的线程工作流程:
while True:
stop = _Get_a_Value_From_Database_for_Exit() #..... a call to DBMS
If stop = 0:
Do_stuff()
time.sleep(60)
else:
break
在任何时候,可能会有很多这样的线程在运行。我想要做的是,让主线程去数据库的另一个地方检查一个特定的值,然后可以打断上面提到的某个线程的睡眠。我的目标是能够让某个特定的线程退出,而不需要等它剩下的睡眠时间。所有这些线程都可以通过共享的数据库ID来引用。我看到有人提到过event.wait()
和event.set()
,我一直在尝试弄明白怎么用它们来替代time.sleep()
,但我完全不知道怎么才能让它们唤醒特定的线程,而不是所有的线程。
这就是我无知的地方:有没有办法让我根据数据库的id
来使用event.wait(比如在启动的线程中用12345.wait(60)
,在主线程中用12345.set()
,这些都是动态的,基于不断变化的数据库id
)?
谢谢你的帮助!!
2 个回答
与其不停地在数据库里转圈圈,反复查找一个无聊的值,还不如改变你的设计思路,采用分布式的进程间消息传递方式。这样可以避免无休止地重复查询数据库,检查值是否相等,然后再试图“唤醒”它。
ZeroMQ
和nanomsg
都是很聪明的消息传递工具,它们不需要中介,能很好地处理这种情况。
我认为,试图把火和水结合在一起,对现实世界的系统没有什么好处。
而一个聪明、可扩展的分布式进程间设计则是有益的。
( 图示:简单的分布式进程间消息传递/协调,感谢 imatix/ZeroMQ )
这个项目有点复杂,下面是我对它的理解。
扫描数据库文件
/tmp/db.dat
,里面预先填了两个单词。管理者:为每个单词创建一个线程;默认情况下是一个“whiskey”线程和一个“syrup”线程。
如果一个单词以
_stop
结尾,比如syrup_stop
,就告诉那个线程结束,方法是设置它的停止事件。每个线程会扫描数据库文件,如果看到单词
stop
,就会退出。如果它的停止事件被设置了,也会退出。注意,如果管理者线程设置了某个工作线程的停止事件,那个工作线程会立刻退出。每个线程会做一点事情,但大部分时间都在
stop_ev.wait()
这个调用上。因此,当事件被设置时,它就不需要再等,可以立刻退出。
这个服务器玩起来很有趣!启动它后,可以通过往数据库里添加行来发送命令。试试下面的每一个:
$ echo pie >> /tmp/db.dat # start new thread
$ echo pie_stop >> /tmp/db.dat # stop thread by event
$ echo whiskey_stop >> /tmp/db.dat # stop another thread "
$ echo stop >> /tmp/db.dat # stop all threads
源代码
import logging, sys, threading, time
STOP_VALUE = 'stop'
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)-4s %(threadName)s %(levelname)s %(message)s",
datefmt="%H:%M:%S",
stream=sys.stderr,
)
class Database(list):
PATH = '/tmp/db.dat'
def __init__(self):
super(Database,self).__init__()
self._update_lock = threading.Lock()
def update(self):
with self._update_lock:
self[:] = [ line.strip() for line in open(self.PATH) ]
db = Database()
def spawn(events, key):
events[key] = threading.Event()
th = threading.Thread(
target=search_worker,
kwargs=dict(stop_ev=events[key]),
name='thread-{}'.format(key),
)
th.daemon = True
th.start()
def search_worker(stop_ev):
"""
scan database until "stop" found, or our event is set
"""
logging.info('start')
while True:
logging.debug('scan')
db.update()
if STOP_VALUE in db:
logging.info('stopvalue: done')
return
if stop_ev.wait(timeout=10):
logging.info('event: done')
return
def manager():
"""
scan database
- word: spawn thread if none already
- word_stop: tell thread to die by setting its stop event
"""
logging.info('start')
events = dict()
while True:
db.update()
for key in db:
if key == STOP_VALUE:
continue
if key in events:
continue
if key.endswith('_stop'):
key = key.split('_')[0]
if key not in events:
logging.error('stop: missing key=%s!', key)
else:
# signal thread to stop
logging.info('stop: key=%s', key)
events[key].set()
del events[key]
else:
spawn(events, key)
logging.info('spawn: key=%s', key)
time.sleep(2)
if __name__=='__main__':
with open(Database.PATH, 'w') as dbf:
dbf.write(
'whiskey\nsyrup\n'
)
db.update()
logging.info('start: db=%s -- %s', db.PATH, db)
manager_t = threading.Thread(
target=manager,
name='manager',
)
manager_t.start()
manager_t.join()