带有阻塞请求的Gevent异步服务器

2 投票
1 回答
1265 浏览
提问于 2025-04-18 08:15

我有一个我认为是Gevent的常见使用场景。我需要一个UDP服务器来监听请求,然后根据请求向一个外部网络服务发送POST请求。这个外部网络服务基本上只允许一次处理一个请求。

我想要一个异步的UDP服务器,这样数据可以立即被获取和存储,这样我就不会错过任何请求(这部分使用Gevent提供的DatagramServer很简单)。然后,我需要一种方法来顺序发送请求到外部网络服务,但又不影响UDP服务器的异步特性。

我最开始尝试了猴子补丁(monkey patching)所有东西,结果得到的解决方案很快,但我的请求对外部网络服务没有任何速率限制,导致出现了错误。

看起来我需要一个单独的非阻塞工作线程来顺序发送请求到外部网络服务,同时UDP服务器将任务添加到一个队列中,而这个非阻塞工作线程则从这个队列中取任务。

我需要的是关于如何运行一个带有额外绿色线程(greenlets)来处理其他任务的Gevent服务器的信息(特别是涉及到队列的部分)。我一直在使用DatagramServer的serve_forever函数,并认为我需要使用start方法,但还没有找到很多关于它们如何结合在一起的信息。

谢谢,

编辑

这个答案效果很好。我根据@mguijarr的回答调整了UDP服务器示例的代码,制作了一个适合我使用场景的工作示例:

from __future__ import print_function
from gevent.server import DatagramServer
import gevent.queue
import gevent.monkey
import urllib

gevent.monkey.patch_all()

n = 0

def process_request(q):
    while True:
        request = q.get()
        print(request)
        print(urllib.urlopen('https://test.com').read())


class EchoServer(DatagramServer):
    __q = gevent.queue.Queue()
    __request_processing_greenlet = gevent.spawn(process_request, __q)

    def handle(self, data, address):
        print('%s: got %r' % (address[0], data))
        global n
        n += 1
        print(n)
        self.__q.put(n)
        self.socket.sendto('Received %s bytes' % len(data), address)


if __name__ == '__main__':
    print('Receiving datagrams on :9000')
    EchoServer(':9000').serve_forever()

1 个回答

1

下面是我会怎么做的:

  1. 首先,写一个函数,这个函数需要一个“队列”对象作为参数;这个函数会不停地处理队列里的项目。每个项目应该是一个对网络服务的请求。这个函数可以放在模块的层级上,而不是放在你的DatagramServer实例里面:

    def process_requests(q):
        while True:
            request = q.get()
            # do your magic with 'request'
            ...
    
  2. 在你的DatagramServer里,让这个函数在一个绿色线程(类似于后台任务)中运行:

    self.__q = gevent.queue.Queue()
    self.__request_processing_greenlet = gevent.spawn(process_requests, self.__q)
    
  3. 当你的DatagramServer实例收到UDP请求时,把这个请求放到队列里:

    self.__q.put(request)    
    

这样就可以实现你想要的功能。你仍然可以在DatagramServer上调用'serve_forever',没问题。

撰写回答