Custom Python thread pool synchronization

Asked 2025-04-18 13:08

I'm trying to write a thread pool in Python. At first everything seemed fine, but when I ran some tests I ran into problems.

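In my tests I record how long it takes to complete n tasks with x threads (n = 30 in the code below). I then plotted the data to see whether the results matched what I expected. Unfortunately, they didn't: for some tests the time delta t falls well off the trend I expected, while the other data points lie on the line. I suspect this is a thread synchronization problem in my code, specifically something to do with threading.Event(). I'm fairly new to Python, so I may be overlooking something.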

What is causing my thread pool to give unexpected results in these tests? Any help is greatly appreciated!


Thread counts:

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40]

Time deltas (seconds):

[15.005759, 0.002, 5.003255, 1.995844, 0.99826, 0.006006, 0.997074, 0.994626, 0.002004, 0.988823, 0.005081, 0.993242, 0.990138, 0.002986, 0.995473, 0.000999, 0.986356, 0.002002, 0.975053, 0.021287]
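For reference, here is the trend one would expect. Each PoolTask sleeps for 1 s and 30 tasks are queued per test, so with k worker threads the work takes about ceil(30 / k) full rounds. The sketch below (the helper name `expected_seconds` is illustrative, not from the question) computes that idealised time for each thread count listed above, ignoring thread start-up and scheduling overhead.

```python
import math

NUM_TASKS = 30       # tasks queued per test in Tester.run()
TASK_SECONDS = 1.0   # PoolTask.execute() sleeps for 1 second


def expected_seconds(num_threads: int) -> float:
    """Idealised wall-clock time when tasks run in full rounds of num_threads."""
    rounds = math.ceil(NUM_TASKS / num_threads)
    return rounds * TASK_SECONDS


thread_counts = list(range(2, 41, 2))   # 2, 4, ..., 40 as in the tests
expected = [expected_seconds(k) for k in thread_counts]
print(expected)
```

Measured values such as 0.002 s with 4 threads are far below this floor (30 one-second tasks cannot finish in 2 ms on 4 threads), so in those tests wait() must have returned before the tasks had finished.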

Here is my code:

from threading import Thread
from threading import Event
from queue import Queue
from time import sleep
import matplotlib.pyplot as plt
import datetime
import random  # only used by the commented-out random sleep in PoolTask


class Tester:

    def __init__(self):
        pass

    def run(self):
        numThreads = 2
        x = []  # thread count
        y = []  # delta time

        for t in range(20):  # Run 20 tests
            threadPool = ThreadPool(numThreads)
            startTime = datetime.datetime.now()
            x.append(numThreads)

            print("Starting test %d" % (t + 1))

            # Add n tasks
            for n in range(30):
                threadPool.addTask(PoolTask(n))

            threadPool.start()  # wait until all tasks are added before starting

            # Wait for all tasks in queue to complete
            threadPool.wait()

            timeDelta = datetime.datetime.now() - startTime
            print("Test %d complete (t=%f s, n=%d)" % ((t + 1), timeDelta.total_seconds(), numThreads))
            y.append(timeDelta.total_seconds())

            numThreads += 2

        # After the tests plot the resulting data
        print(x)
        print(y)
        plt.plot(x, y, 'ro')
        plt.xlabel('thread count')
        plt.ylabel('delta t (time)')
        plt.show()


class ThreadPool:
    __poolEvent = Event()
    __runEvent = Event()

    def __init__(self, size=1, start=False):
        self.__size = size
        self.__queue = Queue()
        self.__pool = []
        self.__destroyed = False

        if start:
            self.__runEvent.set()

        # Create the thread pool
        for i in range(self.__size):
            thread = Thread(target=self.__worker, args=[i])
            self.__pool.append(thread)
            thread.daemon = True
            thread.start()

    def __worker(self, workerNumber):
        # Worker will run until thread pool is terminated
        while True:
            if self.__destroyed:
                break

            self.__runEvent.wait()  # Wait until threadpool is started

            task = self.__queue.get()  # Blocking

            try:
                task.execute()
            except (AttributeError, TypeError):
                raise Exception('Task does not have execute() defined.')

            self.__queue.task_done()

            if self.__queue.empty():
                self.__poolEvent.set()  # Allow caller to proceed if waiting

    def start(self):
        self.__runEvent.set()

    def addTask(self, task):
        if self.__destroyed:
            raise Exception('Unable to add task: the pool has already been destroyed.')
        self.__poolEvent.clear()  # Have caller wait if listening
        self.__queue.put(task)

    def destroy(self):
        if self.__destroyed:
            raise Exception('Cannot destroy as the thread pool has already been destroyed.')

        # Flag causes threads to stop pulling from queue and return
        self.__destroyed = True

    def wait(self):
        self.__poolEvent.wait()


class PoolTask:
    """ example task that implements execute() """

    def __init__(self, taskNumber):
        self.taskNumber = taskNumber

    def execute(self):
        #print('Task %d executing...' % self.taskNumber)
        #sleep(random.randint(1, 5))
        sleep(1)
        #print('Task %d done executing' % self.taskNumber)

Output:

>>> import Tester
>>> kp = KrogoPoints.KrogoPoints()
>>> kp.run()
Starting test 1
Test 1 complete (t=15.005759 s, n=2)
Starting test 2
Test 2 complete (t=0.002000 s, n=4)
Starting test 3
Test 3 complete (t=5.003255 s, n=6)
Starting test 4
Test 4 complete (t=1.995844 s, n=8)
Starting test 5
Test 5 complete (t=0.998260 s, n=10)
Starting test 6
Test 6 complete (t=0.006006 s, n=12)
Starting test 7
Test 7 complete (t=0.997074 s, n=14)
Starting test 8
Test 8 complete (t=0.994626 s, n=16)
Starting test 9
Test 9 complete (t=0.002004 s, n=18)
Starting test 10
Test 10 complete (t=0.988823 s, n=20)
Starting test 11
Test 11 complete (t=0.005081 s, n=22)
Starting test 12
Test 12 complete (t=0.993242 s, n=24)
Starting test 13
Test 13 complete (t=0.990138 s, n=26)
Starting test 14
Test 14 complete (t=0.002986 s, n=28)
Starting test 15
Test 15 complete (t=0.995473 s, n=30)
Starting test 16
Test 16 complete (t=0.000999 s, n=32)
Starting test 17
Test 17 complete (t=0.986356 s, n=34)
Starting test 18
Test 18 complete (t=0.002002 s, n=36)
Starting test 19
Test 19 complete (t=0.975053 s, n=38)
Starting test 20
Test 20 complete (t=0.021287 s, n=40)

1 Answer


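I think your problem is that you've defined __poolEvent and __runEvent as class variables, when they should be instance variables. As a result, the state of __poolEvent and __runEvent is shared between all of your ThreadPool instances.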

>>> class A(object):
...   event = threading.Event()  # class variable, like yours
...   def getEvent(self): return self.event
... 
>>> a = A()
>>> a.getEvent()
<threading._Event object at 0x7f4c51d3db90>
>>> a.getEvent().is_set()
False
>>> b = A()
>>> b.getEvent().set()
>>> a.getEvent().is_set()
True   # Uh oh, changing b's event also changed a's
>>> b.getEvent()
<threading._Event object at 0x7f4c51d3db90> # Because it's the same event!

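Because of this unintended sharing, your events are sometimes set when you don't expect them to be. For example, workers left over from an earlier ThreadPool can call set() on the shared __poolEvent after the next test's addTask() calls have cleared it, so that test's wait() returns early (in the worst case almost immediately). And since __runEvent is never cleared once the first test sets it, the workers of every later pool start taking tasks as soon as they are added, without waiting for start().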
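The leading double underscore does not change this. Inside a class body, Python rewrites `__poolEvent` to `_ClassName__poolEvent` (private name mangling), but a class-level `__poolEvent` is still one attribute stored on the class. A minimal Python 3 check, using two stripped-down stand-in classes (`SharedPool` and `PerInstancePool` are hypothetical names, not the real pool):

```python
from threading import Event


class SharedPool:
    __poolEvent = Event()           # class attribute: one Event for every instance

    def event(self):
        return self.__poolEvent     # mangled to self._SharedPool__poolEvent


class PerInstancePool:
    def __init__(self):
        self.__poolEvent = Event()  # instance attribute: a new Event per instance

    def event(self):
        return self.__poolEvent     # mangled to self._PerInstancePool__poolEvent


a, b = SharedPool(), SharedPool()
b.event().set()
print(a.event().is_set())                               # True: same Event object
print(a.event() is SharedPool._SharedPool__poolEvent)   # True: it lives on the class

c, d = PerInstancePool(), PerInstancePool()
d.event().set()
print(c.event().is_set())                               # False: separate Events
```

Moving the `Event()` creation into `__init__`, as in `PerInstancePool`, is what gives each pool its own flag.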

If you change the top of the class to this, your results will look much more reasonable:

class ThreadPool(object):

    def __init__(self, size = 1, start=False):
        self.__poolEvent = Event()
        self.__runEvent = Event()
        self.__size = size
        self.__queue = Queue()
        self.__pool = []
        self.__destroyed = False

        if start:
            self.__runEvent.set()

With that change the output looks much better:

Starting test 1
Test 1 complete (t=15.019228 s, n=2)
Starting test 2
Test 2 complete (t=7.009718 s, n=4)
Starting test 3
Test 3 complete (t=5.007426 s, n=6)
Starting test 4
Test 4 complete (t=3.005955 s, n=8)
Starting test 5
Test 5 complete (t=3.006504 s, n=10)
Starting test 6
Test 6 complete (t=2.004594 s, n=12)
Starting test 7
Test 7 complete (t=2.004225 s, n=14)
Starting test 8
Test 8 complete (t=1.004068 s, n=16)
Starting test 9
Test 9 complete (t=1.004277 s, n=18)
Starting test 10
Test 10 complete (t=1.004509 s, n=20)
Starting test 11
Test 11 complete (t=1.004266 s, n=22)
Starting test 12
Test 12 complete (t=1.003043 s, n=24)
Starting test 13
Test 13 complete (t=1.004713 s, n=26)
Starting test 14
Test 14 complete (t=1.003422 s, n=28)
Starting test 15
Test 15 complete (t=1.003525 s, n=30)
Starting test 16
Test 16 complete (t=1.003448 s, n=32)
Starting test 17
Test 17 complete (t=1.002924 s, n=34)
Starting test 18
Test 18 complete (t=1.003600 s, n=36)
Starting test 19
Test 19 complete (t=1.003569 s, n=38)
Starting test 20
Test 20 complete (t=1.003708 s, n=40)
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40]
[15.019228, 7.009718, 5.007426, 3.005955, 3.006504, 2.004594, 2.004225, 1.004068, 1.004277, 1.004509, 1.004266, 1.003043, 1.004713, 1.003422, 1.003525, 1.003448, 1.002924, 1.0036, 1.003569, 1.003708]
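One more effect is visible in this corrected output: with 4 threads the run takes about 7 s, although 30 one-second tasks need ceil(30 / 4) = 8 rounds; likewise 8 threads give about 3 s rather than 4, and 16 threads about 1 s rather than 2. This is most likely because `__worker` sets `__poolEvent` as soon as the queue is *empty*, which happens once the last tasks have been dequeued, not once they have finished. Since the workers already call `task_done()`, `Queue.join()` is a natural replacement for the event: it blocks until `task_done()` has been called for every item that was put. The sketch below is a hypothetical variant (`JoinPool`, its callable tasks and the `None` stop sentinel are assumptions of this sketch, not code from the question or answer); it also drops the start/run event for brevity, so tasks begin as soon as they are added.

```python
from queue import Queue
from threading import Thread
from time import perf_counter, sleep


class JoinPool:
    """Hypothetical pool: all state is per instance; wait() uses Queue.join()."""

    def __init__(self, size=1):
        self._queue = Queue()  # instance attribute, never shared between pools
        self._threads = [Thread(target=self._worker, daemon=True) for _ in range(size)]
        for thread in self._threads:
            thread.start()

    def _worker(self):
        while True:
            task = self._queue.get()        # blocks until an item is available
            if task is None:                # sentinel placed by shutdown()
                self._queue.task_done()
                return
            try:
                task()                      # tasks are plain callables in this sketch
            except Exception as exc:        # keep the worker alive if a task fails
                print("task failed:", exc)
            finally:
                self._queue.task_done()     # mark the item finished in every case

    def add_task(self, task):
        self._queue.put(task)

    def wait(self):
        # Blocks until task_done() has been called for every item put so far,
        # i.e. until the tasks have finished, not merely been dequeued.
        self._queue.join()

    def shutdown(self):
        for _ in self._threads:
            self._queue.put(None)           # one sentinel per worker
        for thread in self._threads:
            thread.join()


if __name__ == "__main__":
    TASK_SECONDS = 0.1                      # scaled down from 1 s to keep the demo short
    for num_threads in (4, 8, 16):
        pool = JoinPool(num_threads)
        start = perf_counter()
        for _ in range(30):
            pool.add_task(lambda: sleep(TASK_SECONDS))
        pool.wait()
        print("%2d threads: %.2f s" % (num_threads, perf_counter() - start))
        pool.shutdown()
```

With this version the times should come out at roughly ceil(30 / k) task durations (about 0.8, 0.4 and 0.2 s for the scaled-down tasks), because wait() no longer returns while the final round is still running.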
