并行运行函数
我正在模拟一个CPU,并且有一个系统时钟。现在我用asyncio来设置这个时钟,clock.run()
这个函数会一直循环,它会等一段时间,然后触发一个事件,再等一段时间后清除这个事件,这就像时钟的高低脉冲一样。
同时,CPU也会一直运行,它会安排一些函数在时钟下次变高时执行,所以CPU的执行会被阻塞,直到时钟变高,函数执行完毕并返回。
问题是,asyncio是在一个主线程上运行的(我想是这样),所以如果它在运行时钟,就无法同时并行运行CPU。
这是我现在的设置:
class Clock:
def __init__(self, frequency: int):
if not isinstance(frequency, int) or frequency <= 0:
raise ValueError('Frequency must be a positive integer.')
self.frequency = frequency
self.period = 1 / frequency
self.half_period = self.period / 2
self._clock_pulse = asyncio.Event()
async def run(self):
while True:
self._clock_pulse.set()
print('high', self._clock_pulse.is_set())
await asyncio.sleep(self.half_period)
self._clock_pulse.clear()
print('low', self._clock_pulse.is_set())
await asyncio.sleep(self.half_period)
async def schedule(self, func, *args):
print('scheduled')
await self._clock_pulse.wait()
return await func(*args)
class CPU:
def __init__(self, clock):
self.clock = clock
async def do_nothing(self, n):
return n
async def run(self):
self.n = 0
while True:
value = await clock.schedule(self.do_nothing, self.n)
print(value)
self.n += 1
clock = Clock(1)
cpu = CPU(clock)
async def main():
clock_task = asyncio.create_task(clock.run())
cpu_task = asyncio.create_task(cpu.run())
asyncio.run(main())
所以,我希望clock.run
的循环能和cpu.run
同时持续运行。也许我可以使用线程,但我对这个不太了解?谢谢任何帮助!
1 个回答
1
如果我理解你的情况:
- 你有一个时钟,它会定期发出脉冲,这些脉冲在“高”和“低”之间交替。
- 你有一个循环,想要在这个循环中安排并等待一个任务的完成,而这个任务需要在下一个“高”脉冲时开始。根据任务完成所需的时间,可能在连续安排任务之间,时钟已经产生了很多高脉冲和低脉冲。我们能保证的是,任务总是在高脉冲时开始运行。
你现在的代码中,Clock.run
方法似乎没有区分高脉冲和低脉冲。我建议你使用一个 asyncio.Condition
实例来表示高脉冲的生成,而不是用 asyncio.Event
来表示脉冲的发生。Clock.schedule
函数只需要等待高脉冲条件的出现。
注意,定义 Clock.schedule
方法时,不需要使用 func 和 args 参数,直接传一个协程参数会更简单。此外,你的 main
函数也需要一些修改(见下文):
import asyncio
class Clock:
def __init__(self, frequency: int):
if not isinstance(frequency, int) or frequency <= 0:
raise ValueError('Frequency must be a positive integer.')
self.frequency = frequency
self.period = 1 / frequency
self.half_period = self.period / 2
self._high_pulse_condition = asyncio.Condition()
async def run(self):
while True:
async with self._high_pulse_condition:
self._high_pulse_condition.notify_all() # high pulse event
await asyncio.sleep(self.period)
async def schedule(self, coro):
async with self._high_pulse_condition:
await self._high_pulse_condition.wait()
return await coro
class CPU:
def __init__(self, clock):
self.clock = clock
async def do_nothing(self, n):
return n
async def run(self):
import time
n = 0
while True:
value = await self.clock.schedule(self.do_nothing(n))
print(f'value = {value} at time = {time.time()}')
n += 1
async def main():
clock = Clock(1)
cpu = CPU(clock)
await asyncio.gather(cpu.run(), clock.run())
asyncio.run(main())
输出:
value = 0 at time = 1710281657.515421
value = 1 at time = 1710281658.5301206
value = 2 at time = 1710281659.53623
value = 3 at time = 1710281660.5377345
value = 4 at time = 1710281661.5463734
value = 5 at time = 1710281662.5613523
value = 6 at time = 1710281663.5721672
value = 7 at time = 1710281664.5855374
value = 8 at time = 1710281665.5871134
value = 9 at time = 1710281666.6020265
value = 10 at time = 1710281667.6114671
value = 11 at time = 1710281668.6124766
value = 12 at time = 1710281669.6271718
...
更新
如果我们让 Clock.schedule
创建一个新任务,而不是让 Clock.schedule
返回一个协程,然后 CPU.run
在安排下一个任务之前等待这个任务完成,这样我们几乎可以保证每个高脉冲都有一个新任务被安排。潜在的问题是,如果被安排的任务平均所需时间超过了连续高脉冲之间的时间,那么任务的数量将会无限增长。
import asyncio
class Clock:
def __init__(self, frequency: int):
if not isinstance(frequency, int) or frequency <= 0:
raise ValueError('Frequency must be a positive integer.')
self.frequency = frequency
self.period = 1 / frequency
self.half_period = self.period / 2
self._high_pulse_condition = asyncio.Condition()
self._loop = asyncio.get_running_loop()
async def run(self):
while True:
async with self._high_pulse_condition:
self._high_pulse_condition.notify_all() # high pulse event
await asyncio.sleep(self.period)
async def schedule(self, coro):
async with self._high_pulse_condition:
await self._high_pulse_condition.wait()
self._loop.create_task(coro)
class CPU:
def __init__(self, clock):
self.clock = clock
async def do_nothing(self, n):
import time
print(f'value = {n} at time = {time.time()}')
async def run(self):
n = 0
while True:
await self.clock.schedule(self.do_nothing(n))
n += 1
async def main():
clock = Clock(1)
cpu = CPU(clock)
await asyncio.gather(cpu.run(), clock.run())
asyncio.run(main())
输出:
value = 0 at time = 1710281994.7425532
value = 1 at time = 1710281995.752669
value = 2 at time = 1710281996.767249
value = 3 at time = 1710281997.7693186
value = 4 at time = 1710281998.7833076
value = 5 at time = 1710281999.7873156
value = 6 at time = 1710282000.7989564
...