Elasticsearch scrolling using the Python client
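When scrolling in Elasticsearch, you have to supply the latest scroll_id on every scroll:
The initial search request and each subsequent scroll request returns a new scroll_id; only the most recent scroll_id should be used.
The following example (taken from here) confuses me a bit. First, the scroll initialization: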
rs = es.search(index=['tweets-2014-04-12','tweets-2014-04-13'],
               scroll='10s',
               search_type='scan',
               size=100,
               preference='_primary_first',
               body={
                   "fields" : ["created_at", "entities.urls.expanded_url", "user.id_str"],
                   "query" : {
                       "wildcard" : { "entities.urls.expanded_url" : "*.ru" }
                   }
               }
              )
sid = rs['_scroll_id']
Then the loop:
tweets = []
while (1):
    try:
        rs = es.scroll(scroll_id=sid, scroll='10s')
        tweets += rs['hits']['hits']
    except:
        break
This code works, but I can't see where sid is updated... I assume the update happens internally, inside the Python client, but I don't understand how that works...
5 Answers
from elasticsearch import Elasticsearch

elasticsearch_user_name = 'es_username'
elasticsearch_user_password = 'es_password'
es_index = "es_index"

es = Elasticsearch(["127.0.0.1:9200"],
                   http_auth=(elasticsearch_user_name, elasticsearch_user_password))

query = {
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "es_datetime": {
                            "gte": "2021-06-21T09:00:00.356Z",
                            "lte": "2021-06-21T09:01:00.356Z",
                            "format": "strict_date_optional_time"
                        }
                    }
                }
            ]
        }
    },
    "fields": [
        "*"
    ],
    "_source": False,
    "size": 2000,
}

resp = es.search(index=es_index, body=query, scroll="1m")
old_scroll_id = resp['_scroll_id']
results = resp['hits']['hits']

while len(results):
    for i, r in enumerate(results):
        # do something with the data
        pass
    result = es.scroll(
        scroll_id=old_scroll_id,
        scroll='1m'  # length of time to keep search context
    )
    # check if there's a new scroll ID
    if old_scroll_id != result['_scroll_id']:
        print("NEW SCROLL ID:", result['_scroll_id'])
    # keep track of the latest scroll ID for the next call
    old_scroll_id = result['_scroll_id']
    results = result['hits']['hits']
import logging
import sys

import pandas as pd
import requests

# Fragment of a class; the two attributes below are presumably set in its __init__.
self._elkUrl = "http://Hostname:9200/logstash-*/_search?scroll=1m"
self._scrollUrl = "http://Hostname:9200/_search/scroll"

"""
Function to get the data from ELK through scrolling mechanism
"""
def GetDataFromELK(self):
    # implementing scroll and retrieving data from ELK to get more than 100000 records in one search
    # ref: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-scroll.html
    try:
        dataFrame = pd.DataFrame()
        if self._elkUrl is None:
            raise ValueError("_elkUrl is missing")
        if self._username is None:
            raise ValueError("_username for elk is missing")
        if self._password is None:
            raise ValueError("_password for elk is missing")
        response = requests.post(self._elkUrl, json=self.body, auth=(self._username, self._password))
        response = response.json()
        if response is None:
            raise ValueError("response is missing")
        sid = response['_scroll_id']
        hits = response['hits']
        total = hits["total"]
        if total is None:
            raise ValueError("total hits from ELK is none")
        url = self._scrollUrl
        if url is None:
            raise ValueError("scroll url is missing")
        # the initial search response already carries the first batch of hits
        data = hits['hits']
        # start scrolling; stop as soon as a batch comes back empty
        while len(data) > 0:
            cleanDataFrame = self.DataClean(data)
            # DataFrame.append was removed in pandas 2.0; this assumes DataClean returns a DataFrame
            dataFrame = pd.concat([dataFrame, cleanDataFrame])
            # keep search context alive for 2m
            scroll = '2m'
            scroll_query = {"scroll": scroll, "scroll_id": sid}
            response1 = requests.post(url, json=scroll_query, auth=(self._username, self._password))
            response1 = response1.json()
            # The result from the above request includes a scroll_id, which should be passed
            # to the scroll API in order to retrieve the next batch of results
            sid = response1['_scroll_id']
            data = response1['hits']['hits']
        num = len(dataFrame)
        print('Total records received from ELK=', num)
        return dataFrame
    except Exception as e:
        logging.error('Error while getting the data from elk', exc_info=e)
        sys.exit()
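Actually, this code has a bug: to use scrolling correctly, you should pass the scroll_id returned by each scroll() call into the next call, rather than reusing the first one over and over:
Important
The initial search request and each subsequent scroll request returns a new scroll_id; only the most recent scroll_id should be used.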
http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html
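It works because Elasticsearch does not always change the scroll_id between calls; for smaller result sets it may keep returning the same scroll_id as the original one for a while. Two users discussing this last year ran into the same thing and saw the same scroll_id being returned for some time.
So while your code works for smaller result sets, it is not correct: you need to capture the scroll_id returned by each scroll() call and use it for the next call.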
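A minimal sketch of the corrected loop might look like the following. The function name `collect_all_hits` and its parameters are hypothetical and not part of elasticsearch-py; `es` is assumed to be any client object exposing `search()` and `scroll()` with the keyword arguments used in the question. It also assumes a regular scrolled search rather than the old `search_type='scan'`, whose first response carries no hits, and it stops on an empty batch instead of relying on a bare `except`.

```python
def collect_all_hits(es, index, body, scroll="10s"):
    """Collect every hit of a scrolled search, always passing on the newest scroll_id."""
    hits = []
    rs = es.search(index=index, body=body, scroll=scroll)
    sid = rs["_scroll_id"]
    batch = rs["hits"]["hits"]
    while batch:
        hits.extend(batch)
        rs = es.scroll(scroll_id=sid, scroll=scroll)
        sid = rs["_scroll_id"]  # refresh the id from the latest response
        batch = rs["hits"]["hits"]
    return hits
```

Called as `tweets = collect_all_hits(es, ['tweets-2014-04-12', 'tweets-2014-04-13'], body)`, this would replace both the initialization and the loop from the question.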
Using the Python requests library
import requests
import json

elastic_url = 'http://localhost:9200/my_index/_search?scroll=1m'
scroll_api_url = 'http://localhost:9200/_search/scroll'
headers = {'Content-Type': 'application/json'}

payload = {
    "size": 100,
    "sort": ["_doc"],
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    }
}

r1 = requests.request(
    "POST",
    elastic_url,
    data=json.dumps(payload),
    headers=headers
)

# first batch data
try:
    res_json = r1.json()
    data = res_json['hits']['hits']
    _scroll_id = res_json['_scroll_id']
except KeyError:
    data = []
    _scroll_id = None
    print('Error: Elastic Search: %s' % str(r1.json()))

while data:
    print(data)
    # scroll to get next batch data, always sending the latest scroll id
    scroll_payload = json.dumps({
        'scroll': '1m',
        'scroll_id': _scroll_id
    })
    scroll_res = requests.request(
        "POST", scroll_api_url,
        data=scroll_payload,
        headers=headers
    )
    try:
        res_json = scroll_res.json()
        data = res_json['hits']['hits']
        _scroll_id = res_json['_scroll_id']
    except KeyError:
        data = []
        _scroll_id = None
        err_msg = 'Error: Elastic Search Scroll: %s'
        print(err_msg % str(scroll_res.json()))
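This is an old question, but for some reason it came up first when searching for "elasticsearch python scroll". The Python module provides a helper method that does all the work for you. It is a generator function that returns each document to you while managing the underlying scroll IDs.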
https://elasticsearch-py.readthedocs.io/en/master/helpers.html#scan
Here is an example of usage:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
query = {
"query": {"match_all": {}}
}
es = Elasticsearch(...)
for hit in scan(es, index="my-index", query=query):
    print(hit["_source"]["field"])
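For intuition, the bookkeeping that a helper like this takes off your hands can be sketched as a generator. This is a simplified illustration, not the library's actual implementation: `iter_hits` is a hypothetical name, and `es` is assumed to expose `search()`, `scroll()` and `clear_scroll()` with the keyword arguments shown.

```python
def iter_hits(es, index, body, scroll="1m"):
    """Yield hits one by one while tracking the newest scroll_id."""
    resp = es.search(index=index, body=body, scroll=scroll)
    sid = resp.get("_scroll_id")
    try:
        while sid and resp["hits"]["hits"]:
            for hit in resp["hits"]["hits"]:
                yield hit
            resp = es.scroll(scroll_id=sid, scroll=scroll)
            sid = resp.get("_scroll_id", sid)  # keep the most recent id
    finally:
        if sid:
            # release the server-side search context instead of waiting for the timeout
            es.clear_scroll(scroll_id=sid)
```

The `try`/`finally` means the search context is released even if the caller stops iterating early, as long as the generator is closed or garbage-collected.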