异步操作后Tornado神秘挂起--如何调试?

2 投票
1 回答
2522 浏览
提问于 2025-04-17 11:04

我们的应用程序需要进行很多网络请求(它是建立在一个第三方的REST API之上的),所以我们使用了很多异步操作来保持系统的响应速度。(使用Swirl来保持理智,因为这个应用是在tornado.gen出现之前写的)。所以当我们需要进行一些地理编码时,我们觉得这应该很简单——只需添加几个异步调用到另一个外部API,就能搞定。

然而,我们的异步代码神秘地让Tornado卡住了——进程仍在运行,但它不响应请求,也不在日志中输出任何信息。更糟糕的是,当我们完全去掉第三方服务器时,它仍然卡住——似乎在异步请求返回后的一段时间内就锁住了。

以下是一些示例代码,能够重现这个问题:

def async_geocode(lat, lon, callback, fields=('city', 'country')):
    '''Translates lat and lon into human-readable geographic info'''
    iol = IOLoop.instance()
    iol.add_timeout(time.time() + 1, lambda: callback("(unknown)"))

这里是通常(但并不总是——这也是它最初进入生产环境的原因)能捕捉到这个问题的测试:

class UtilTest(tornado.testing.AsyncTestCase):

    def get_new_ioloop(self):
        '''Ensure that any test code uses the right IOLoop, since the code
        it tests will use the singleton.'''
        return tornado.ioloop.IOLoop.instance()

    def test_async_geocode(self):
        # Yahoo gives (-122.419644, 37.777125) for SF, so we expect it to
        # reverse geocode to SF too...
        async_geocode(lat=37.777, lon=-122.419, callback=self.stop,
                      fields=('city', 'country'))
        result = self.wait(timeout=4)
        self.assertEquals(result, u"San Francisco, United States")
        # Now test if it's hanging (or has hung) the IOLoop on finding London
        async_geocode(lat=51.506, lon=-0.127, callback=self.stop,
                      fields=('city',))
        result = self.wait(timeout=5)
        self.assertEquals(result, u"London")
        # Test it fails gracefully
        async_geocode(lat=0.00, lon=0.00, callback=self.stop,
                      fields=('city',))
        result = self.wait(timeout=6)
        self.assertEquals(result, u"(unknown)")

    def test_async_geocode2(self):
        async_geocode(lat=37.777, lon=-122.419, callback=self.stop,
                      fields=('city', 'state', 'country'))
        result = self.wait(timeout=7)
        self.assertEquals(result, u"San Francisco, California, United States")
        async_geocode(lat=51.506325, lon=-0.127144, callback=self.stop,
                      fields=('city', 'state', 'country'))
        result = self.wait(timeout=8)
        self.io_loop.add_timeout(time.time() + 8, lambda: self.stop(True))
        still_running = self.wait(timeout=9)
        self.assert_(still_running)

注意,第一个测试几乎总是通过,而第二个测试(以及它对async_geocode的调用)通常会失败。

补充说明:我们还有很多类似的异步调用到其他第三方API,这些调用都运行得很好。

(为了完整性,这里是async_geocode及其辅助类的完整实现(尽管上面的示例代码已经能重现问题)):

def async_geocode(lat, lon, callback, fields=('city', 'country')):
    '''Use AsyncGeocoder to do the work.'''
    geo = AsyncGeocoder(lat, lon, callback, fields)
    geo.geocode()


class AsyncGeocoder(object):
    '''
    Reverse-geocode to as specific a level as possible

    Calls Yahoo! PlaceFinder for reverse geocoding. Takes a lat, lon, and
    callback function (to call with the result string when the request
    completes), and optionally a sequence of fields to return, in decreasing
    order of specificity (e.g. street, neighborhood, city, country)

    NB: Does not do anything intelligent with the geocoded data -- just returns
    the first result found.
    '''

    url = "http://where.yahooapis.com/geocode"

    def __init__(self, lat, lon, callback, fields, ioloop=None):
        self.lat, self.lon = lat, lon
        self.callback = callback
        self.fields = fields
        self.io_loop = ioloop or IOLoop.instance()
        self._client = AsyncHTTPClient(io_loop=self.io_loop)

    def geocode(self):
        params = urllib.urlencode({
            'q': '{0}, {1}'.format(self.lat, self.lon),
            'flags': 'J', 'gflags': 'R'
        })

        tgt_url = self.url + "?" + params
        self._client.fetch(tgt_url, self.geocode_cb)

    def geocode_cb(self, response):
        geodata = json_decode(response.body)
        try:
            geodata = geodata['ResultSet']['Results'][0]
        except IndexError:
            # Response didn't contain anything
            result_string = ""
        else:
            results = []
            for f in self.fields:
                val = geodata.get(f, None)
                if val:
                    results.append(val)
            result_string = ", ".join(results)

        if result_string == '':
            # This can happen if the response was empty _or_ if 
            # the requested fields weren't in it. Regardless, 
            # the user needs to see *something*
            result_string = "(unknown)"

        self.io_loop.add_callback(lambda: self.callback(result_string))

编辑:经过几天的繁琐调试和记录系统失败的情况后,发现我的测试失败是由于无关的原因。还发现,导致它卡住的原因与IOLoop无关,而是其中一个协程在等待数据库锁时立即卡住了。

抱歉问了一个错误的问题,感谢大家的耐心。

1 个回答

3

你的第二个测试似乎失败了,原因在于这一部分:

self.io_loop.add_timeout(time.time() + 8, lambda: self.stop(True))
still_running = self.wait(timeout=9)
self.assert_(still_running)

当你通过 self.wait 给 IOLoop 添加一个超时时间时,我发现这个超时时间在调用 self.stop 时并不会被清除。也就是说,你的第一个超时时间仍然存在,当你让 IOLoop 暂停 8 秒时,它就会被触发。

我怀疑这些和你最初的问题没有关系。

撰写回答