并行运行函数

0 投票
1 回答
43 浏览
提问于 2025-04-14 17:21

我正在模拟一个CPU,并且有一个系统时钟。现在我用asyncio来设置这个时钟,clock.run()这个函数会一直循环,它会等一段时间,然后触发一个事件,再等一段时间后清除这个事件,这就像时钟的高低脉冲一样。

同时,CPU也会一直运行,它会安排一些函数在时钟下次变高时执行,所以CPU的执行会被阻塞,直到时钟变高,函数执行完毕并返回。

问题是,asyncio是在一个主线程上运行的(我想是这样),所以如果它在运行时钟,就无法同时并行运行CPU。

这是我现在的设置:

class Clock:
    def __init__(self, frequency: int):
        if not isinstance(frequency, int) or frequency <= 0:
            raise ValueError('Frequency must be a positive integer.')

        self.frequency = frequency
        self.period = 1 / frequency
        self.half_period = self.period / 2
        self._clock_pulse = asyncio.Event()

    async def run(self):
        while True:
            self._clock_pulse.set()
            print('high', self._clock_pulse.is_set())
            await asyncio.sleep(self.half_period)
            self._clock_pulse.clear()
            print('low', self._clock_pulse.is_set())
            await asyncio.sleep(self.half_period)

    async def schedule(self, func, *args):
        print('scheduled')
        await self._clock_pulse.wait()
        return await func(*args)


class CPU:
    def __init__(self, clock):
        self.clock = clock

    async def do_nothing(self, n):
        return n

    async def run(self):
        self.n = 0
        while True:
            value = await clock.schedule(self.do_nothing, self.n)
            print(value)
            self.n += 1


clock = Clock(1)
cpu = CPU(clock)


async def main():
    clock_task = asyncio.create_task(clock.run())
    cpu_task = asyncio.create_task(cpu.run())


asyncio.run(main())

所以,我希望clock.run的循环能和cpu.run同时持续运行。也许我可以使用线程,但我对这个不太了解?谢谢任何帮助!

1 个回答

1

如果我理解你的情况:

  1. 你有一个时钟,它会定期发出脉冲,这些脉冲在“高”和“低”之间交替。
  2. 你有一个循环,想要在这个循环中安排并等待一个任务的完成,而这个任务需要在下一个“高”脉冲时开始。根据任务完成所需的时间,可能在连续安排任务之间,时钟已经产生了很多高脉冲和低脉冲。我们能保证的是,任务总是在高脉冲时开始运行。

你现在的代码中,Clock.run 方法似乎没有区分高脉冲和低脉冲。我建议你使用一个 asyncio.Condition 实例来表示高脉冲的生成,而不是用 asyncio.Event 来表示脉冲的发生。Clock.schedule 函数只需要等待高脉冲条件的出现。

注意,定义 Clock.schedule 方法时,不需要使用 funcargs 参数,直接传一个协程参数会更简单。此外,你的 main 函数也需要一些修改(见下文):

import asyncio

class Clock:
    def __init__(self, frequency: int):
        if not isinstance(frequency, int) or frequency <= 0:
            raise ValueError('Frequency must be a positive integer.')

        self.frequency = frequency
        self.period = 1 / frequency
        self.half_period = self.period / 2
        self._high_pulse_condition = asyncio.Condition()

    async def run(self):
        while True:
            async with self._high_pulse_condition:
                self._high_pulse_condition.notify_all()  # high pulse event
            await asyncio.sleep(self.period)

    async def schedule(self, coro):
        async with self._high_pulse_condition:
            await self._high_pulse_condition.wait()
        return await coro


class CPU:
    def __init__(self, clock):
        self.clock = clock

    async def do_nothing(self, n):
        return n

    async def run(self):
        import time

        n = 0
        while True:
            value = await self.clock.schedule(self.do_nothing(n))
            print(f'value = {value} at time = {time.time()}')
            n += 1

async def main():
    clock = Clock(1)
    cpu = CPU(clock)

    await asyncio.gather(cpu.run(), clock.run())

asyncio.run(main())

输出:

value = 0 at time = 1710281657.515421
value = 1 at time = 1710281658.5301206
value = 2 at time = 1710281659.53623
value = 3 at time = 1710281660.5377345
value = 4 at time = 1710281661.5463734
value = 5 at time = 1710281662.5613523
value = 6 at time = 1710281663.5721672
value = 7 at time = 1710281664.5855374
value = 8 at time = 1710281665.5871134
value = 9 at time = 1710281666.6020265
value = 10 at time = 1710281667.6114671
value = 11 at time = 1710281668.6124766
value = 12 at time = 1710281669.6271718
...

更新

如果我们让 Clock.schedule 创建一个新任务,而不是让 Clock.schedule 返回一个协程,然后 CPU.run 在安排下一个任务之前等待这个任务完成,这样我们几乎可以保证每个高脉冲都有一个新任务被安排。潜在的问题是,如果被安排的任务平均所需时间超过了连续高脉冲之间的时间,那么任务的数量将会无限增长。

import asyncio

class Clock:
    def __init__(self, frequency: int):
        if not isinstance(frequency, int) or frequency <= 0:
            raise ValueError('Frequency must be a positive integer.')

        self.frequency = frequency
        self.period = 1 / frequency
        self.half_period = self.period / 2
        self._high_pulse_condition = asyncio.Condition()
        self._loop = asyncio.get_running_loop()

    async def run(self):
        while True:
            async with self._high_pulse_condition:
                self._high_pulse_condition.notify_all()  # high pulse event
            await asyncio.sleep(self.period)

    async def schedule(self, coro):
        async with self._high_pulse_condition:
            await self._high_pulse_condition.wait()
        self._loop.create_task(coro)


class CPU:
    def __init__(self, clock):
        self.clock = clock

    async def do_nothing(self, n):
        import time

        print(f'value = {n} at time = {time.time()}')

    async def run(self):

        n = 0
        while True:
            await self.clock.schedule(self.do_nothing(n))
            n += 1

async def main():
    clock = Clock(1)
    cpu = CPU(clock)

    await asyncio.gather(cpu.run(), clock.run())

asyncio.run(main())

输出:

value = 0 at time = 1710281994.7425532
value = 1 at time = 1710281995.752669
value = 2 at time = 1710281996.767249
value = 3 at time = 1710281997.7693186
value = 4 at time = 1710281998.7833076
value = 5 at time = 1710281999.7873156
value = 6 at time = 1710282000.7989564
...

撰写回答