从另一个线程唤醒Python中特定线程的睡眠状态

3 投票
2 回答
1195 浏览
提问于 2025-04-18 18:39

我这几天一直在纠结这个问题,快要疯了。我是个完全的新手,对Python一窍不通,所以请多包涵我的无知。

我的主线程会不断检查数据库,看有没有新记录,然后为每一条新记录启动一个线程。

这些线程的主要工作就是去数据库里查一个值,如果没找到,就做一些事情,然后睡60秒再重新开始。

下面是一个简化的线程工作流程:

while True:
    stop = _Get_a_Value_From_Database_for_Exit() #..... a call to DBMS 
    If stop = 0:
        Do_stuff()
        time.sleep(60)
    else:
        break

在任何时候,可能会有很多这样的线程在运行。我想要做的是,让主线程去数据库的另一个地方检查一个特定的值,然后可以打断上面提到的某个线程的睡眠。我的目标是能够让某个特定的线程退出,而不需要等它剩下的睡眠时间。所有这些线程都可以通过共享的数据库ID来引用。我看到有人提到过event.wait()event.set(),我一直在尝试弄明白怎么用它们来替代time.sleep(),但我完全不知道怎么才能让它们唤醒特定的线程,而不是所有的线程。

这就是我无知的地方:有没有办法让我根据数据库的id来使用event.wait(比如在启动的线程中用12345.wait(60),在主线程中用12345.set(),这些都是动态的,基于不断变化的数据库id)?

谢谢你的帮助!!

2 个回答

0

与其不停地在数据库里转圈圈,反复查找一个无聊的值,还不如改变你的设计思路,采用分布式的进程间消息传递方式。这样可以避免无休止地重复查询数据库,检查值是否相等,然后再试图“唤醒”它。

ZeroMQnanomsg都是很聪明的消息传递工具,它们不需要中介,能很好地处理这种情况。

我认为,试图把火和水结合在一起,对现实世界的系统没有什么好处。

而一个聪明、可扩展的分布式进程间设计则是有益的。

( 图示:简单的分布式进程间消息传递,感谢 imatix/ZeroMQ )

( 图示:简单的分布式进程间消息传递/协调,感谢 imatix/ZeroMQ )

2

这个项目有点复杂,下面是我对它的理解。

  • 扫描数据库文件 /tmp/db.dat,里面预先填了两个单词。

  • 管理者:为每个单词创建一个线程;默认情况下是一个“whiskey”线程和一个“syrup”线程。

  • 如果一个单词以 _stop 结尾,比如 syrup_stop,就告诉那个线程结束,方法是设置它的停止事件。

  • 每个线程会扫描数据库文件,如果看到单词 stop,就会退出。如果它的停止事件被设置了,也会退出。

  • 注意,如果管理者线程设置了某个工作线程的停止事件,那个工作线程会立刻退出。每个线程会做一点事情,但大部分时间都在 stop_ev.wait() 这个调用上。因此,当事件被设置时,它就不需要再等,可以立刻退出。

这个服务器玩起来很有趣!启动它后,可以通过往数据库里添加行来发送命令。试试下面的每一个:

$ echo pie >> /tmp/db.dat  # start new thread

$ echo pie_stop >> /tmp/db.dat # stop thread by event

$ echo whiskey_stop >> /tmp/db.dat # stop another thread "

$ echo stop >> /tmp/db.dat # stop all threads

源代码

import logging, sys, threading, time

STOP_VALUE = 'stop'

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)-4s %(threadName)s %(levelname)s %(message)s", 
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)


class Database(list):
    PATH = '/tmp/db.dat'
    def __init__(self):
        super(Database,self).__init__()
        self._update_lock = threading.Lock()

    def update(self):
        with self._update_lock:
            self[:] = [ line.strip() for line in open(self.PATH) ]

db = Database()

def spawn(events, key):
    events[key] = threading.Event()
    th = threading.Thread(
        target=search_worker,
        kwargs=dict(stop_ev=events[key]),
        name='thread-{}'.format(key),
    )
    th.daemon = True
    th.start()

def search_worker(stop_ev):
    """
    scan database until "stop" found, or our event is set
    """
    logging.info('start')
    while True:
        logging.debug('scan')
        db.update()
        if STOP_VALUE in db:
            logging.info('stopvalue: done')
            return
        if stop_ev.wait(timeout=10):
            logging.info('event: done')
            return 

def manager():
    """
    scan database
    - word: spawn thread if none already
    - word_stop: tell thread to die by setting its stop event
    """
    logging.info('start')
    events = dict()
    while True:
        db.update()
        for key in db:
            if key == STOP_VALUE:
                continue
            if key in events:
                continue
            if key.endswith('_stop'):
                key = key.split('_')[0]
                if key not in events:
                    logging.error('stop: missing key=%s!', key)
                else:
                    # signal thread to stop
                    logging.info('stop: key=%s', key)
                    events[key].set()
                    del events[key]
            else:
                spawn(events, key)
                logging.info('spawn: key=%s', key)
        time.sleep(2)


if __name__=='__main__':

    with open(Database.PATH, 'w') as dbf:
        dbf.write(
        'whiskey\nsyrup\n'
        )

    db.update()
    logging.info('start: db=%s -- %s', db.PATH, db)

    manager_t = threading.Thread(
        target=manager,
        name='manager',
    )
    manager_t.start()
    manager_t.join()

撰写回答