处理多个IO错误

2024-04-27 19:55:48 发布

您现在位置:Python中文网/ 问答频道 /正文

我在类init上执行了几个IO操作,但它们经常会因IOError而失败。我想做的是延迟几百毫秒,然后再试一次,直到成功或某个定义的超时。在继续/结束循环之前,如何确保每个命令都成功?我假设有一种比为每个项使用if语句和计数器更好的方法来检查所有命令是否成功。你知道吗

我下面的当前代码经常出现IOError错误,并挂起应用程序的其余部分。你知道吗

   def __init__(self):
      print("Pressure init.")
      self.readCoefficients()

   def readCoefficients(self):
      global a0_MSB;
      global a0_LSB;
      global b1_MSB;
      global b1_LSB;
      global b2_MSB;
      global b2_LSB;
      global c12_MSB;
      global c12_LSB;

      a0_MSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_A0_COEFF_MSB+0);
      a0_LSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_A0_COEFF_LSB+0);

      b1_MSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_B1_COEFF_MSB+0);
      b1_LSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_B1_COEFF_LSB+0);

      b2_MSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_B2_COEFF_MSB+0);
      b2_LSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_B2_COEFF_LSB+0);

      c12_MSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_C12_COEFF_MSB+0);
      c12_LSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_C12_COEFF_LSB+0);

Tags: registerreaddataaddressbytea0globalb2
3条回答

您是想单独重试最后8行中的每一行还是作为一个组重试?如果你想做一个小助手函数:

def retry_function(tries, function, *args, **kwargs):
    for try in range(tries):
        try:
            return function(*args, **kwargs)
        except IOError as e:
            time.sleep(.005)
    raise e   # will be the last error from inside the loop. be sure tries is at least 1 or this will be undefined!

那就这样称呼吧:

a0_MSB = retry_function(5, Pressure.bus.read_byte_data, Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_A0_COEFF_MSB+0)

如果不是独立地而是作为一个群体,您可能仍然需要这个助手函数。但是您必须重写它来处理函数/参数列表,或者传入另一个自定义函数

这是另一种方法。此代码尝试读取所有地址,并保存失败的地址。然后稍等,然后重试所有失败的地址,直到所有地址都被正确读取或超过允许的重试次数。你知道吗

def readCoefficients(self):
    (
        a0_MSB, a0_LSB,
        b1_MSB, b1_LSB,
        b2_MSB, b2_LSB,
        c12_MSB, c12_LSB) = self.mio_read(15,
            Pressure.MPL115A2_REGISTER_A0_COEFF_MSB+0,
            Pressure.MPL115A2_REGISTER_A0_COEFF_LSB+0,
            Pressure.MPL115A2_REGISTER_B1_COEFF_MSB+0,
            Pressure.MPL115A2_REGISTER_B1_COEFF_LSB+0,
            Pressure.MPL115A2_REGISTER_B2_COEFF_MSB+0,
            Pressure.MPL115A2_REGISTER_B2_COEFF_LSB+0,
            Pressure.MPL115A2_REGISTER_C12_COEFF_MSB+0,
            Pressure.MPL115A2_REGISTER_C12_COEFF_LSB+0
    )

def mio_read(self, max_retries, *addresses):
    # Create storage for results
    results = [None] * len(addresses)
    # Keep track of the index of a particular address in the list of results
    ios = list(enumerate(addresses))
    for i in range(max_retries):
        failedios = []
        for index, address in ios:
            try:
                results[index] = Pressure.bus.read_byte_data(
                    Pressure.MPL115A2_ADDRESS,
                    address
                )
            except IOError as e:
                # Place address in the queue for the next round
                failedios.append((index, address))
        # If all succeeded
        if len(failedios) == 0:
            return results
        # Time may be reduced as so was spent checking other addresses
        time.sleep(0.1)
        ios = failedios
    else:
        raise IOError(",".join((addr for ind, addr in failedios)))

如果对你来说所有的文件都是一个接一个地读取是可以的,你可以使用一个简单的函数。你知道吗

import time

# ...

def readCoefficients(self):
    global a0_MSB;
    global a0_LSB;
    global b1_MSB;
    global b1_LSB;
    global b2_MSB;
    global b2_LSB;
    global c12_MSB;
    global c12_LSB;

    max_retries = 15

    a0_MSB = self.readretry(Pressure.MPL115A2_REGISTER_A0_COEFF_MSB+0, max_retries)
    a0_LSB = self.readretry(Pressure.MPL115A2_REGISTER_A0_COEFF_LSB+0, max_retries)

    b1_MSB = self.readretry(Pressure.MPL115A2_REGISTER_B1_COEFF_MSB+0, max_retries)
    b1_LSB = self.readretry(Pressure.MPL115A2_REGISTER_B1_COEFF_LSB+0, max_retries)

    b2_MSB = self.readretry(Pressure.MPL115A2_REGISTER_B2_COEFF_MSB+0, max_retries)
    b2_LSB = self.readretry(Pressure.MPL115A2_REGISTER_B2_COEFF_LSB+0, max_retries)

    c12_MSB = self.readretry(Pressure.MPL115A2_REGISTER_C12_COEFF_MSB+0, max_retries)
    c12_LSB = self.readretry(Pressure.MPL115A2_REGISTER_C12_COEFF_LSB+0, max_retries)

    def readretry(self, address, max_retries):
        for i in range(max_retries):
            try:
                return Pressure.bus.read_byte_data(
                    Pressure.MPL115A2_ADDRESS,
                    address
                )
            except IOError as e:
                # print(e)
                time.sleep(0.1)
        else:
            raise IOError("Reading failed after multiple tries")

注意:不应该使用globals,尤其是在类中。你知道吗

相关问题 更多 >