使用PyEs分块批量索引Elasticsearch

4 投票
3 回答
8117 浏览
提问于 2025-04-17 11:03

我有一个简单的Python脚本,用来给一个包含100万行的CSV文件建立索引:

import csv
from pyes import *

reader = csv.reader(open('data.csv', 'rb'))

conn = ES('127.0.0.1:9200', timeout=20.0)

counter = 0
for row in reader:
        try:
                data = {"name":row[5]}
                conn.index(data,'namesdb',counter, bulk=True)
                counter += 1
        except:
                pass

这个方法效果还不错,但当数据量达到几千行时,速度就开始急剧下降。

我在想,如果把索引分成更小的部分来做,可能会让ES(Elasticsearch)表现得更好。

有没有更有效的方法呢?使用sleep()延迟会有帮助吗?或者有没有简单的方法可以把CSV文件程序上分成更小的部分?

谢谢。

3 个回答

1

对于未来的访问者,Elasticsearch-py 支持在一次调用中进行批量操作。请注意,每个文档中的 _op_type 字段决定了执行哪种操作(如果没有这个字段,默认会使用 index 操作)

例如:

import elasticsearch as ES
import elasticsearch.helpers as ESH

es = ES.Elasticsearch()
docs = [ doc1, doc2, doc3 ]

n_success, n_fail = ESH.bulk(es, docs, index='test_index', doc_type='test_doc',
                             stats_only=True)
5

在创建ES实例的时候,你可以调整批量处理的大小。像这样:

conn = ES('127.0.0.1:9200', timeout=20.0, bulk_size=100)

默认的批量大小是400。这意味着,当你有400个文档时,pyes会自动发送这些内容。如果你想在达到这个批量大小之前就发送数据(比如在退出之前),你可以调用conn.flush_bulk(forced=True)。

我不太确定每插入第N个文档就手动刷新索引是否是最好的方法。Elasticsearch默认每秒会自动刷新一次。你可以选择延长这个时间。像这样:

curl -XPUT localhost:9200/namesdb/_settings -d '{
    "index" : {
        "refresh_interval" : "3s"
    }
}'

或者,你可以像Dragan建议的那样手动刷新,但在这种情况下,可能需要通过将间隔设置为“-1”来禁用Elasticsearch的自动刷新。不过,你不需要每插入X个文档就刷新一次,可以在插入完所有文档后再刷新。

更多详细信息可以查看这里: http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-update-settings.html

请注意,刷新操作是比较耗资源的。根据我的经验,最好选择以下两种方式之一: - 让Elasticsearch在后台自动刷新 - 完全禁用刷新,等我插入完所有文档后再重新启用刷新。

3

每隔N次计数就执行一次

es.refresh()

示例可以在这里找到

撰写回答