使用PyEs分块批量索引Elasticsearch
我有一个简单的Python脚本,用来给一个包含100万行的CSV文件建立索引:
import csv
from pyes import *
reader = csv.reader(open('data.csv', 'rb'))
conn = ES('127.0.0.1:9200', timeout=20.0)
counter = 0
for row in reader:
try:
data = {"name":row[5]}
conn.index(data,'namesdb',counter, bulk=True)
counter += 1
except:
pass
这个方法效果还不错,但当数据量达到几千行时,速度就开始急剧下降。
我在想,如果把索引分成更小的部分来做,可能会让ES(Elasticsearch)表现得更好。
有没有更有效的方法呢?使用sleep()延迟会有帮助吗?或者有没有简单的方法可以把CSV文件程序上分成更小的部分?
谢谢。
3 个回答
对于未来的访问者,Elasticsearch-py 支持在一次调用中进行批量操作。请注意,每个文档中的 _op_type
字段决定了执行哪种操作(如果没有这个字段,默认会使用 index
操作)
例如:
import elasticsearch as ES
import elasticsearch.helpers as ESH
es = ES.Elasticsearch()
docs = [ doc1, doc2, doc3 ]
n_success, n_fail = ESH.bulk(es, docs, index='test_index', doc_type='test_doc',
stats_only=True)
在创建ES实例的时候,你可以调整批量处理的大小。像这样:
conn = ES('127.0.0.1:9200', timeout=20.0, bulk_size=100)
默认的批量大小是400。这意味着,当你有400个文档时,pyes会自动发送这些内容。如果你想在达到这个批量大小之前就发送数据(比如在退出之前),你可以调用conn.flush_bulk(forced=True)。
我不太确定每插入第N个文档就手动刷新索引是否是最好的方法。Elasticsearch默认每秒会自动刷新一次。你可以选择延长这个时间。像这样:
curl -XPUT localhost:9200/namesdb/_settings -d '{
"index" : {
"refresh_interval" : "3s"
}
}'
或者,你可以像Dragan建议的那样手动刷新,但在这种情况下,可能需要通过将间隔设置为“-1”来禁用Elasticsearch的自动刷新。不过,你不需要每插入X个文档就刷新一次,可以在插入完所有文档后再刷新。
更多详细信息可以查看这里: http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-update-settings.html
请注意,刷新操作是比较耗资源的。根据我的经验,最好选择以下两种方式之一: - 让Elasticsearch在后台自动刷新 - 完全禁用刷新,等我插入完所有文档后再重新启用刷新。