Twisted: why does passing a deferred callback to a deferred thread suddenly make the thread block?

7 votes
3 answers
5184 views
Asked 2025-04-15 20:32

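I previously tried, without success, to use txredis (a non-blocking Twisted API for redis) to set up a persistent message queue for a scrapy project I'm working on. I found that although the client was non-blocking, it was much slower than expected, because what should have been a single event in the reactor loop was split into thousands of steps.

So I switched to redis-py (the regular blocking Redis client for Python) and wrapped the call in a deferred thread. That works great, but I'd like to add an inner deferred around the call to redis, because I want to set up connection pooling to speed things up further.

Below is my adaptation of some sample code from the Twisted docs, showing how I use the deferred thread: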

#!/usr/bin/env python
from twisted.internet import reactor, threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print('doing stuff in main loop.. do not block me!')

def aBlockingRedisCall():
    # Runs in a worker thread, so the sleep does not stall the reactor.
    print('doing lookup... this may take a while')
    time.sleep(10)
    return 'results from redis'

def result(res):
    print(res)

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)
    d = threads.deferToThread(aBlockingRedisCall)
    d.addCallback(result)
    reactor.run()

if __name__ == '__main__':
    main()

Here is my modification for connection pooling, which makes the code in the deferred thread block:

#!/usr/bin/env python
from twisted.internet import reactor, defer
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print('doing stuff in main loop.. do not block me!')

def aBlockingRedisCall(x):
    if x < 5:  # all connections are busy, try later
        print('%s is less than 5, get a redis client later' % x)
        x += 1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0, d.callback, x)
        return d

    else:
        print('got a redis client; doing lookup.. this may take a while')
        time.sleep(10)  # this is now blocking.. any ideas?
        d = defer.Deferred()
        d.addCallback(gotFinalResult)
        d.callback(x)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print(res)

def aBlockingMethod():
    print('going to sleep...')
    time.sleep(10)
    print('woke up')

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)

    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__ == '__main__':
    main()

So my question is: does anyone know why my modification causes the deferred thread to block? Or can anyone suggest a better solution?

3 Answers

0

As an aside, you could gain a lot by using a Redis client written specifically for Twisted, such as this one: http://github.com/deldotdr/txRedis

3

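There is now an up-to-date Redis client called txredisapi, which supports the new protocol and features of Redis 2.x. You could give it a try.

For a persistent message queue, I'd recommend RestMQ. It is a Redis-based message queue system built on top of cyclone and txredisapi.

http://github.com/gleicon/restmq

Cheers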

12

Well, as the Twisted docs say:

Deferreds do not make the code magically not block

Whenever you use blocking code, such as sleep, you have to push it into a new thread.

#!/usr/bin/env python
from twisted.internet import reactor, defer, threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print('doing stuff in main loop.. do not block me!')

def aBlockingRedisCall(x):
    if x < 5:  # all connections are busy, try later
        print('%s is less than 5, get a redis client later' % x)
        x += 1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        # The retry is fired by the reactor, so it runs in the reactor thread.
        reactor.callLater(1.0, d.callback, x)
        return d

    else:
        print('got a redis client; doing lookup.. this may take a while')
        def getstuff(x):
            time.sleep(3)
            return "stuff is %s" % x

        # getstuff is blocking, so you need to push it to a new thread
        d = threads.deferToThread(getstuff, x)
        d.addCallback(gotFinalResult)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print(res)

def aBlockingMethod():
    print('going to sleep...')
    time.sleep(10)
    print('woke up')

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)

    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    # Start the chain in the reactor thread rather than via callInThread:
    # reactor.callLater (used above) must not be called from another thread.
    reactor.callWhenRunning(d.callback, 1)
    reactor.run()

if __name__ == '__main__':
    main()

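If the Redis API is not particularly complex, it might be more natural to rewrite it using twisted.web, rather than repeatedly calling the blocking API from a lot of threads.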
