Python 捕获超时并重复请求

4 投票
2 回答
3839 浏览
提问于 2025-04-18 07:43

我正在用Python使用Xively的API来更新数据流,但有时候会遇到504错误,这个错误似乎会导致我的脚本停止运行。

我该怎么做才能捕捉到这个错误,更重要的是,如何延迟一段时间再重试,这样我的脚本就可以继续运行,并在大约一分钟后上传我的数据呢?

这是我进行上传的代码块。

    # Upload to Xivity
    api = xively.XivelyAPIClient("[MY_API_KEY")
    feed = api.feeds.get([MY_DATASTREAM_ID])
    now = datetime.datetime.utcnow()
    feed.datastreams = [xively.Datastream(id='temps', current_value=tempF, at=now)]
    feed.update()

这是我在脚本失败时看到的错误日志:

Traceback (most recent call last):
 File "C:\[My Path] \ [My_script].py", line 39, in <module>
   feed = api.feeds.get([MY_DATASTREAM_ID])
 File "C:\Python34\lib\site-packages\xively_python-0.1.0_rc2-py3.4.egg\xively\managers.py", >line 268, in get
   response.raise_for_status()
 File "C:\Python34\lib\site-packages\requests-2.3.0-py3.4.egg\requests\models.py", line 795, >in raise_for_status
   raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 504 Server Error: Gateway Time-out

谢谢,

附言:我已经把个人信息替换成了[MY_INFO],但显然在我的代码中会出现正确的数据。

2 个回答

3

你可以在一个循环里加上一个try/except语句,这样可以在每次尝试之间设置一个等待时间,等你想等多久都可以。大概可以这样写:

import time

# Upload to Xivity
api = xively.XivelyAPIClient("[MY_API_KEY")
feed = api.feeds.get([MY_DATASTREAM_ID])
now = datetime.datetime.utcnow()
feed.datastreams = [xively.Datastream(id='temps', current_value=tempF, at=now)]

### Try loop
feed_updated = False
while feed_updated == False:
    try: 
        feed.update()
        feed_updated=True
    except: time.sleep(60)

编辑 正如Dano提到的,使用更具体的except语句会更好。

### Try loop
feed_updated = False
while feed_updated == False:
    try: 
        feed.update()
        feed_updated=True
    except HTTPError: time.sleep(60) ##Just needs more time.
    except: ## Otherwise, you have bigger fish to fry
        print "Unidentified Error"
        ## In such a case, there has been some other kind of error. 
        ## Not sure how you prefer this handled. 
        ## Maybe update a log file and quit, or have some kind of notification, 
        ## depending on how you are monitoring it. 

编辑 使用一个通用的except语句。

### Try loop
feed_updated = False
feed_update_count = 0
while feed_updated == False:
    try: 
        feed.update()
        feed_updated=True
    except: 
        time.sleep(60)
        feed_update_count +=1 ## Updates counter

    if feed_update_count >= 60:    ## This will exit the loop if it tries too many times
        feed.update()              ## By running the feed.update() once more,
                                   ## it should print whatever error it is hitting, and crash
4

我通常会用一个装饰器来处理这个问题:

from functools import wraps
from requests.exceptions import HTTPError
import time

def retry(func):
    """ Call `func` with a retry.

    If `func` raises an HTTPError, sleep for 5 seconds
    and then retry.

    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ret = func(*args, **kwargs)
        except HTTPError:
            time.sleep(5)
            ret = func(*args, **kwargs)
        return ret
    return wrapper

或者,如果你想尝试多次的话:

def retry_multi(max_retries):
    """ Retry a function `max_retries` times. """
    def retry(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            num_retries = 0 
            while num_retries <= max_retries:
                try:
                    ret = func(*args, **kwargs)
                    break
                except HTTPError:
                    if num_retries == max_retries:
                        raise
                    num_retries += 1
                    time.sleep(5)
            return ret 
        return wrapper
    return retry

那么就把你的代码放在一个像这样的函数里

#@retry
@retry_multi(5) # retry 5 times before giving up.
def do_call():
    # Upload to Xivity
    api = xively.XivelyAPIClient("[MY_API_KEY")
    feed = api.feeds.get([MY_DATASTREAM_ID])
    now = datetime.datetime.utcnow()
    feed.datastreams = [xively.Datastream(id='temps', current_value=tempF, at=now)]
    feed.update()

撰写回答