限制Python线程的并发和速率

5 投票
2 回答
4835 浏览
提问于 2025-04-17 03:17

我有一个线程数量的限制,我想把调用工作函数的速度限制在每秒一次。

我的想法是记录所有线程上最后一次调用的时间,然后在每个线程中将这个时间和当前时间进行比较。如果current_time - last_time < rate,我就让线程稍微休眠一下。不过,我觉得我的实现有问题,可能是对锁的工作原理理解错了。

我的代码:

from Queue import Queue
from threading import Thread, Lock, RLock
import time

num_worker_threads = 2
rate = 1
q = Queue()
lock = Lock()
last_time = [time.time()]

def do_work(i, idx):
    # Do work here, print is just a dummy.
    print('Thread: {0}, Item: {1}, Time: {2}'.format(i, idx, time.time()))

def worker(i):
    while True:
        lock.acquire()
        current_time = time.time()
        interval = current_time - last_time[0]
        last_time[0] = current_time
        if interval < rate:
            time.sleep(rate - interval)
        lock.release()
        item = q.get()
        do_work(i, item)
        q.task_done()

for i in range(num_worker_threads):
     t = Thread(target=worker, args=[i])
     t.daemon = True
     t.start()

for item in xrange(10):
    q.put(item)

q.join()

我原本期待每秒只调用一次do_work,但实际上我大多数时候看到的是两个线程几乎同时调用(每个线程各一次),然后再等一秒。这是怎么回事呢?


好的,稍微修改一下。建议我简单地限制放入队列的速度是个好主意,但我想起来还得处理工作线程重新将项目放回队列的情况。一个经典的例子是:分页或者在网络任务中重试。于是我想出了以下方案。我想对于实际的网络任务,使用eventlet/gevent库可能会更节省资源,但这只是一个示例。这个方案基本上是使用优先队列来堆积请求,并用一个额外的线程以均匀的速度将这些请求从堆中转移到实际的任务队列。我模拟了工作线程将项目重新插入堆中的情况,重新插入的项目会优先处理。

import sys
import os
import time
import random

from Queue import Queue, PriorityQueue
from threading import Thread

rate = 0.1

def worker(q, q_pile, idx):
    while True:
        item = q.get()
        print("Thread: {0} processed: {1}".format(item[1], idx))
        if random.random() > 0.3:
            print("Thread: {1} reinserting item: {0}".format(item[1], idx))
            q_pile.put((-1 * time.time(), item[1]))
        q.task_done()

def schedule(q_pile, q):
    while True:
        if not q_pile.empty():
            print("Items on pile: {0}".format(q_pile.qsize()))
            q.put(q_pile.get())
            q_pile.task_done()
        time.sleep(rate)

def main():

    q_pile = PriorityQueue()
    q = Queue()

    for i in range(5):
        t = Thread(target=worker, args=[q, q_pile, i])
        t.daemon = True
        t.start()

    t_schedule = Thread(target=schedule, args=[q_pile, q])
    t_schedule.daemon = True
    t_schedule.start()

    [q_pile.put((-1 * time.time(), i)) for i in range(10)]
    q_pile.join()
    q.join()

if __name__ == '__main__':
    main()

2 个回答

1

我觉得在多个线程之间限制速度有点奇怪。如果你对每个线程单独限制速度,就可以避免那些复杂的锁定问题。

这只是我的猜测,但我觉得你应该在 sleep 之后把 last_time[0] 设置为 time.time()(而不是 current_time)。

1

我同时收到大约两个调用(每个线程一个),然后停了一秒钟。这是怎么回事?

这正是你应该从你的实现中预期的结果。假设时间 t 从0开始,速率是1:

线程1执行这个:

    lock.acquire() # both threads wait here, one gets the lock
    current_time = time.time() # we start at t=0
    interval = current_time - last_time[0] # so interval = 0
    last_time[0] = current_time # last_time = t = 0
    if interval < rate: # rate = 1 so we sleep
        time.sleep(rate - interval) # to t=1
    lock.release() # now the other thread wakes up
    # it's t=1 and we do the job

线程2执行这个:

    lock.acquire() # we get the lock at t=1 
    current_time = time.time() # still t=1
    interval = current_time - last_time[0] # interval = 1
    last_time[0] = current_time
    if interval < rate: # interval = rate = 1 so we don't sleep
        time.sleep(rate - interval)
    lock.release() 
    # both threads start the work around t=1

我的建议是限制将项目 放入队列 的速度。

撰写回答