我建立了一个过滤twitter实时流样本的系统。显然,数据库的写入速度太慢,无法跟上比两个小容量关键字更复杂的内容。我将django rq实现为一个简单的排队系统,在tweet进入时将其推送到基于redis的队列中,效果非常好。我的问题在另一边。这个问题的背景是,我有一个正在运行的系统,有150万条tweets用于分析,还有37.5万条通过redis排队。按照目前的表现速度,如果我关闭流媒体,我需要3天的时间才能赶上,这是我不想做的。如果我维持这些数据流,那就需要一个月的时间,这是我最后的估计。在
数据库现在在两个主表上有几百万行,而且写入速度非常慢。rq工作线程的最佳数量似乎是4个,平均每秒1.6个队列任务。(下面排队的代码)。我认为问题可能是每个新队列任务都会打开数据库连接,所以将CONN_MAX_AGE设置为60,但这并没有改善任何东西。在
刚刚在本地主机上测试过,我在MacBook2011上,运行Chrome等的Macbook上,每秒的写入次数超过了13次,但数据库中只有几千行,这让我相信它与大小有关。我正在使用两个get_or_create
命令(见下文),这可能会减慢速度,但看不到其他方法可以使用它们-我需要检查用户是否存在,我需要检查tweet是否已经存在(我可能,我怀疑,将后者移到try/except,基于来自直播流的tweet不应该已经存在的基础上,因为显而易见的原因)我会从中得到很多性能提升吗?由于这仍然在运行,我非常希望对代码进行一点优化,并让一些更快/更高效的工人加入其中,这样我就可以赶上了!让一个预先审核的工作人员来分批工作会有效吗?(也就是说,我可以批量创建不存在的用户,或者类似的用户?)在
我正在数字海洋上运行一个4核/8Gb内存液滴,所以觉得这是一个相当糟糕的性能,可能与代码有关。我在这里哪里做错了?
(我将此贴在这里,而不是代码审阅,因为我认为这与SO的Q&A格式有关,因为我正试图解决一个特定的代码问题,而不是“我如何才能做得更好?”)在
注意:我正在Django1.6中工作,因为这是一段时间以来我一直不确定是否要升级的代码-它不是面向公众的,所以除非现在有一个令人信服的原因(比如这个性能问题),否则我不会(为这个项目)升级。在
流侦听器:
class StdOutListener(tweepy.StreamListener):
def on_data(self, data):
# Twitter returns data in JSON format - we need to decode it first
decoded = json.loads(data)
#print type(decoded), decoded
# Also, we convert UTF-8 to ASCII ignoring all bad characters sent by users
try:
if decoded['lang'] == 'en':
django_rq.enqueue(read_both, decoded)
else:
pass
except KeyError,e:
print "Error on Key", e
except DataError, e:
print "DataError", e
return True
def on_error(self, status):
print status
阅读用户/推文/两者
^{pr2}$
我最终设法拼凑出了一些redditor的答案和其他一些东西。在
基本上,虽然我对id_str字段进行了双重查找,但没有索引。我在
read_tweet
和read_user
的字段中添加了索引db_index=True
,并将read tweet移到try/exceptTweet.objects.create
方法,如果有问题的话,返回到get_或\u create,并且发现速度提高了50-60倍,workers现在是可伸缩的——如果我添加10个worker,我可以获得10倍的速度。在我现在有一个工人正在愉快地每秒处理6条左右的微博。接下来,我将添加一个监视守护进程来检查队列大小,如果队列大小仍在增加,则添加额外的工作线程。在
太长了,读不下去了!在相关问题 更多 >
编程相关推荐