在Python中使用Twisted或线程、队列处理高吞吐量流数据

3 投票
3 回答
1416 浏览
提问于 2025-04-16 00:50

我从一个长时间保持连接的Twitter API流媒体服务器那里收到了非常快速的推文。接下来,我进行一些复杂的文本处理,并把这些推文保存到我的数据库里。

我使用PyCurl来建立连接,并用一个回调函数来处理文本和保存到数据库。下面是我目前的做法,但效果不太好。

我对网络编程不太熟悉,所以想知道:我该如何使用线程、队列或Twisted框架来解决这个问题呢?

def process_tweet():
    # do some heaving text processing


def open_stream_connection():
    connect = pycurl.Curl()
    connect.setopt(pycurl.URL, STREAMURL)
    connect.setopt(pycurl.WRITEFUNCTION, process_tweet)
    connect.setopt(pycurl.USERPWD, "%s:%s" % (TWITTER_USER, TWITTER_PASS))
    connect.perform()

3 个回答

1

我建议这样组织:

  • 一个程序负责读取Twitter上的内容,把推文放进数据库里
  • 一个或多个程序从数据库中读取这些推文,处理每条推文,然后把处理后的结果放进另一个数据库。原来的推文可以选择删除或者标记为已处理。

也就是说,你需要两个或更多的程序/线程。推文数据库可以看作是一个工作队列。多个工作程序从这个队列中取出任务(推文),然后在第二个数据库中生成数据。

2

你应该有几个线程来接收消息。使用pycurl的时候,线程数量可以设为1,但如果用httplib,线程数量应该多一些。这样做的目的是为了能够同时对Twitter API发起多个请求,这样就能保持有持续的工作量来处理。

每当一条推文到达时,它会被放到一个队列(Queue.Queue)里。这个队列确保了在通信时是安全的——每条推文只会被一个工作线程处理。

一组工作线程负责从队列中读取推文并处理它们。只有那些有趣的推文才会被添加到数据库中。

由于数据库可能是处理的瓶颈,所以线程池中添加的线程数量是有限的——增加更多线程并不会让处理速度变快,只会让更多线程在队列中等待访问数据库。

这是一种比较常见的Python写法。这种架构的扩展性是有限的,也就是说,它只能处理一台机器所能处理的工作量。

1

如果你只想在一台机器上搞定,这里有个简单的设置。

一个线程负责接收连接。当有连接进来后,它会把这个连接交给另一个线程去处理。

当然,你也可以用进程(比如用 multiprocessing)来代替线程,但我对 multiprocessing 不太熟,所以没法给你建议。设置方式是一样的:一个进程接收连接,然后把它们交给子进程处理。

如果你需要在多台机器上分担处理工作,简单的方法就是把消息存到数据库里,然后通知工作者有新记录(这需要在工作者之间进行一些协调和锁定)。如果你想避免访问数据库,那你就得把消息从网络进程传给工作者(不过我对底层网络不太了解,没法告诉你怎么做 :))

撰写回答