运行Python延迟循环的最简单方法

2 投票
5 回答
2093 浏览
提问于 2025-04-17 04:21

我有一个基于事件的聊天机器人,想要实现防止垃圾信息的功能。我希望能让一个表现不好的用户暂时静音,但又不影响其他用户的使用。

以下是我尝试过但不奏效的代码:

if user_behaving_badly():
  ban( user )
  time.sleep( penalty_duration )  # Bad! Blocks the entire application!
  unban( user )

理想情况下,如果用户表现不佳,我想启动一个新的线程,这个线程只负责禁言这个用户,然后等待一段时间,再解除禁言,最后这个线程就结束了。

根据这个链接,我可以用以下方法来实现我的目标:

if user_behaving_badly():
  thread.start_new_thread( banSleepUnban, ( user, penalty ) )

通常“简单”意味着“好”,这个方法确实很简单,但我听说线程有时会带来意想不到的问题。我的问题是:有没有比这个更好的方法来实现简单的延迟循环,而不阻塞其他应用程序的运行?

5 个回答

3

为什么要使用线程呢?

do_something(user):
  if(good_user(user)):
    # do it
  else
    # don't

good_user():
  if(is_user_baned(user)):
    if(past_time_since_ban(user)):
      user_good_user(user)
  elif(is_user_bad()):
    ban_user()

ban_user(user):
  # add a user/start time to a hash

is_user_banned()
  # check hash
  # could check if expired now too, or do it seperately if you care about it

is_user_bad()
  # check params or set more values in a hash
5

与其为每个禁令都启动一个新线程,不如把这些禁令放在一个优先队列里,然后用一个线程来处理休眠和解除禁令的工作。

这段代码维护了两个结构:一个是堆(heapq),可以快速找到最早到期的禁令;另一个是字典(dict),可以快速检查某个用户是否被禁。

import time
import threading
import heapq

class Bans():
    def __init__(self):
        self.lock = threading.Lock()
        self.event = threading.Event()
        self.heap = []
        self.dict = {}
        self.thread = threading.thread(target=self.expiration_thread)
        self.thread.setDaemon(True)
        self.thread.start()

    def ban_user(self, name, duration):
        with self.lock:
            now = time.time()
            expiration = (now+duration) 
            heapq.heappush(self.heap, (expiration, user))
            self.dict[user] = expiration
            self.event.set()

    def is_user_banned(self, user):
        with self.lock:
            now = time.time()
            return self.dict.get(user, None) > now

    def expiration_thread(self):
        while True:
            self.event.wait()
            with self.lock:
                next, user = self.heap[0]
                now = time.time()
                duration = next-now
            if duration > 0:
                time.sleep(duration)
            with self.lock:
                if self.heap[0][0] = next:
                    heapq.heappop(self.heap)
                    del self.dict(user)
                if not self.heap:
                    self.event.clear()

它的使用方法如下:

B = Bans()
B.ban_user("phil", 30.0)
B.is_user_banned("phil")
3

使用一个线程的 定时器对象,像这样:

t = threading.Timer(30.0, unban)
t.start() # after 30 seconds, unban will be run

这样的话,只有解除封禁的操作会在这个线程中运行。

撰写回答