asyncio只运行第一个gpiozero协程

-1 投票
1 回答
27 浏览
提问于 2025-04-13 17:19

我正在尝试让 gpiozeroasyncio 在我的树莓派 4 上一起工作,但遇到了一个我解决不了的问题。简单来说,我想获取旋转编码器的转向信号(顺时针和逆时针),并检测旋转编码器的按钮是否被按下。

我使用 asyncio 来收集协程并同时运行它们(其他的协程到目前为止都能正常工作)。但是,当我尝试同时运行按钮检测和旋转检测时,列表中只有第一个协程会被启动。

这是我无法正常工作的代码片段:

import asyncio
from gpiozero import RotaryEncoder, Button

button = Button(7)
encoder = RotaryEncoder(a=10, b=9, wrap=False, max_steps=0)

async def rotary_detection():
  print("rotary_detection started")
  last_rotary_value = 0
  try:
    while True:
      current_rotary_value = encoder.steps
      if last_rotary_value != current_rotary_value:
        rotDiff = last_rotary_value - current_rotary_value
        if rotDiff > 0:
          print("turned CW")
        else:
          print("turned CCW")
        last_rotary_value = current_rotary_value
      asyncio.sleep(0.01)
  except KeyboardInterrupt:
    print("Program terminated")
  except Exception as e:
    print(e)

async def button_detection():
  print("button_detection started")
  try:
    while True:
      if button.is_pressed:
        print("Button pressed")
  except KeyboardInterrupt:
    print("Program terminated")
  except Exception as e:
    print(e)

async def main():
  await asyncio.gather(
    rotary_detection(),
    button_detection()
  )

if __name__ == "__main__":
  asyncio.run(main())

无论在 asyncio.gather() 列表中顺序如何,只有第一个协程会被启动。

我该如何实现我的需求,让 gpiozeroasyncio 能够和谐地一起工作呢?

1 个回答

0

你的两个异步方法都没有使用 await,这就意味着它们不会把控制权交回给调度器,这样其他的异步任务就无法运行。实际上,你的这两个方法就像是普通的同步方法,没有异步的效果。

如果你在 rotary_detection 方法中记得在 asyncio.sleep 前加上 await,那么这个方法就会是正确的:

async def rotary_detection():
  print("rotary_detection started")
  last_rotary_value = 0
  try:
    while True:
      ...

      # Don't forget to await on async tasks
      await asyncio.sleep(0.01)
  except KeyboardInterrupt:
    print("Program terminated")
  except Exception as e:
    print(e)

不过,你还需要在 button_pressed 方法中引入一个 await。可能只需再加一个 asyncio.sleep 的调用就能让事情正常运作,但你也可以考虑其他的选择:

  1. 有一些支持 asyncio 的库可以用来访问 PGIO,比如 asyncpio
  2. 放弃 asyncio,改用线程模型,使用 gpiozerowhen_heldwhen_pressed 方法来实现你的逻辑。

撰写回答