有人能建议如何使用函数elasticsearch.helpers.streaming_bulk代替elasticsearch.helpers.bulk将数据索引到elasticsearch吗。
如果我只是简单地改变流媒体的批量而不是批量,就不会有索引,所以我想它需要以不同的形式使用。
下面的代码将从CSV文件中创建索引、类型和索引数据,这些数据以500个elemens的块形式存储到elasticsearch中。它工作正常,但我不知道是否有可能提高性能。这就是为什么我想尝试流媒体批量功能。
目前,我需要10分钟为200MB的CSV文档索引100万行。我使用两台机器,Centos 6.6和8个CPU-s,x86ʂ64,CPU MHz:2499.902,内存:总共15.574G。 不知道能不能再快一点。
es = elasticsearch.Elasticsearch([{'host': 'uxmachine-test', 'port': 9200}])
index_name = 'new_index'
type_name = 'new_type'
mapping = json.loads(open(config["index_mapping"]).read()) #read mapping from json file
es.indices.create(index_name)
es.indices.put_mapping(index=index_name, doc_type=type_name, body=mapping)
with open(file_to_index, 'rb') as csvfile:
reader = csv.reader(csvfile) #read documents for indexing from CSV file, more than million rows
content = {"_index": index_name, "_type": type_name}
batch_chunks = []
iterator = 0
for row in reader:
var = transform_row_for_indexing(row,fields, index_name, type_name,id_name,id_increment)
id_increment = id_increment + 1
#var = transform_row_for_indexing(row,fields, index_name, type_name)
batch_chunks.append(var)
if iterator % 500 == 0:
helpers.bulk(es,batch_chunks)
del batch_chunks[:]
print "ispucalo batch"
iterator = iterator + 1
# indexing of last batch_chunk
if len(batch_chunks) != 0:
helpers.bulk(es,batch_chunks)
所以流式大容量返回一个激励器。这意味着在你开始迭代之前什么都不会发生。“bulk”函数的代码如下所示:
所以基本上只调用streaming_bulk(client,actions,**kwargs)实际上不会做任何事情。直到像在这个for循环中那样迭代它,索引才真正开始发生。
所以在你的代码里。欢迎您将“bulk”更改为“streaming_bulk”,但是您需要迭代streaming bulk的结果,以便实际索引任何内容。
streaming_bulk
使用actions
的迭代器并为每个操作生成一个响应。 因此,首先需要在文档上编写一个简单的迭代器,如下所示:然后执行流式大容量插入
相关问题 更多 >
编程相关推荐