ElasticSearch学习05

Python中的ElasticSearch模块

其实Elasticsearch提供的是REST接口，可以直接通过HTTP请求来调用。

因此在Python中，使用requests等HTTP模块就可以直接调用这些接口。

此外，Elasticsearch官方也提供了一个封装好的Python客户端模块（elasticsearch）。

直接使用这个封装好的模块进行调用会更加方便。

官方文档

使用requests模块调用Elasticsearch

直接看一个例子，还是比较好理解的：

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import requests
import json
import time


class ES:
    """Export recent documents from an index (pattern) via the scroll REST API.

    Talks to Elasticsearch over plain HTTP with ``requests`` instead of the
    official client.
    """

    # Kept for backward compatibility; exportData tracks its scroll id locally.
    scroll_id = ''

    def __init__(self, host, index, auth):
        """Store connection settings.

        :param host: base URL of the cluster, e.g. ``http://node1:9200``.
        :param index: index name or pattern to search, e.g. ``efun*``.
        :param auth: credentials passed as-is to ``requests`` (e.g. a
            ``(user, password)`` tuple for HTTP basic auth).
        """
        self.scrollUrl = host + "/_search/scroll"
        self.url = host + "/" + index + "/"
        self.index = index
        self.auth = auth

    def exportData(self, size=10, scroll='1m'):
        """Return the ``_source`` of every document whose ``@timestamp_02``
        falls within the last 24 hours.

        :param size: hits fetched per scroll page (default 10).
        :param scroll: scroll keep-alive, re-sent with every page (default ``'1m'``).
        :return: list of ``_source`` dicts in the order the server returned them.
        :raises requests.HTTPError: if the search or any scroll page returns an
            error status (previously this looped forever or returned None).
        :raises requests.RequestException: on connection problems.
        """
        # Read the clock once so both range bounds describe the same window.
        now_ms = int(round(time.time() * 1000))
        data = {
            'size': size,
            "query": {
                "bool": {
                    "filter": {
                        "range": {
                            "@timestamp_02": {
                                "gte": str(now_ms - 24 * 60 * 60 * 1000),  # 24 hours ago
                                "lte": str(now_ms),  # now
                            }
                        }
                    }
                }
            },
        }
        # The first request opens the scroll context and returns the first page.
        response = requests.get(f'{self.url}_search?scroll={scroll}', json=data, auth=self.auth)
        response.raise_for_status()
        ret_json = response.json()
        scroll_id = ret_json['_scroll_id']
        hits = ret_json['hits']['hits']
        response_data = [hit['_source'] for hit in hits]
        try:
            # An empty page means the scroll is exhausted.
            while hits:
                # Send the id in the request body: scroll ids can be too long for a URL.
                response = requests.post(
                    self.scrollUrl,
                    json={'scroll': scroll, 'scroll_id': scroll_id},
                    auth=self.auth,
                )
                # Without this, a failed page left `hits` unchanged and the loop never ended.
                response.raise_for_status()
                ret_json = response.json()
                # Always continue from the most recent scroll id.
                scroll_id = ret_json['_scroll_id']
                hits = ret_json['hits']['hits']
                response_data.extend(hit['_source'] for hit in hits)
        finally:
            self._clear_scroll(scroll_id)
        return response_data

    def _clear_scroll(self, scroll_id):
        """Best-effort release of the server-side scroll context."""
        try:
            requests.delete(self.scrollUrl, json={'scroll_id': scroll_id}, auth=self.auth)
        except requests.RequestException as err_:
            # A cleanup failure must not mask the export result or the original error.
            print(err_)


if __name__ == '__main__':
    # Export the last 24 hours of efun* documents, then print the count and the data.
    es = ES(host="http://node1:9200", index='efun*', auth=('elastic', '123456'))
    try:
        all_data = es.exportData()
        total = len(all_data)
        print(f'共{total}条数据')
        print(json.dumps(all_data))
    except Exception as err_:
        print(err_)

安装模块

1
pip install elasticsearch

简单使用

search的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import json
from elasticsearch import Elasticsearch

HOSTS = 'http://node1:9200'
AUTH = ('elastic', '123456')

# Client for the cluster, authenticated with HTTP basic credentials.
es = Elasticsearch(hosts=HOSTS, http_auth=AUTH)

# Query that matches every document and returns at most 20 hits.
body_01 = dict(size=20, query=dict(match_all={}))

# Search all indices matching efun* and dump the raw response as JSON.
result = es.search(index="efun*", body=body_01)
print(json.dumps(result))

scroll的使用

使用ElasticSearch模块进行操作

scroll的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from elasticsearch import Elasticsearch
import time

es = Elasticsearch(hosts='http://node1:9200', http_auth=('elastic', '123456'))

# Scroll keep-alive. It is re-sent on every scroll call so the context is
# renewed per page instead of expiring one minute after the initial search.
SCROLL_KEEP_ALIVE = '1m'

# Read the clock once so both range bounds describe the same 24-hour window.
now_ms = int(round(time.time() * 1000))
data = {
    'size': 10,
    "query": {
        "bool": {
            "filter": {
                "range": {
                    "@timestamp_02": {
                        "gte": str(now_ms - 24 * 60 * 60 * 1000),  # 24 hours ago
                        "lte": str(now_ms),  # now
                    }
                }
            }
        }
    },
}

# The initial search opens the scroll context and returns the first page.
result = es.search(body=data, index='efun*', scroll=SCROLL_KEEP_ALIVE)
scroll_id = result['_scroll_id']
print(result)
try:
    while True:
        result_02 = es.scroll(scroll_id=scroll_id, scroll=SCROLL_KEEP_ALIVE)
        print(result_02)
        # The scroll id may change between pages; always continue with the latest one.
        scroll_id = result_02['_scroll_id']
        hits_data = result_02['hits']['hits']
        if not hits_data:
            # An empty page means every matching document has been returned.
            break
        time.sleep(0.1)
finally:
    # Free the server-side search context instead of waiting for it to time out.
    es.clear_scroll(scroll_id=scroll_id)

使用requests配合yield生成器进行操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import requests
import json
import time


class ES:
    """Page through the last 24 hours of documents, one scroll batch per call.

    Uses the Elasticsearch scroll REST API over plain HTTP with ``requests``.
    """

    # Current scroll cursor: None before the first search and after the scroll ends.
    scroll_id = None
    # Set once the scroll has been drained (or failed); next() then returns None
    # instead of silently restarting the query.
    _exhausted = False

    def __init__(self, host, index, auth, scroll):
        """Store connection settings.

        :param host: base URL of the cluster, e.g. ``http://node1:9200``.
        :param index: index name or pattern to search, e.g. ``efun*``.
        :param auth: credentials passed as-is to ``requests``.
        :param scroll: scroll keep-alive such as ``'3m'``; used for the first
            search and for every following scroll request.
        """
        self.scrollUrl = host + "/_search/scroll"
        self.url = host + "/" + index + "/"
        self.index = index
        self.auth = auth
        self.scroll = scroll

    def next(self):
        """Return the next page of ``_source`` dicts.

        The first call runs the search and returns its first page (a list,
        possibly empty). Later calls return the following pages, and None
        once the scroll is exhausted.
        """
        if self._exhausted:
            return None
        if self.scroll_id:
            return self.__next_search()
        return self.__first_search()

    def __first_search(self):
        """Open the scroll context and return the first page as a list.

        :raises requests.HTTPError: if the search request returns an error status.
        """
        # Read the clock once so both range bounds describe the same window.
        now_ms = int(round(time.time() * 1000))
        data = {
            'size': 10,
            "query": {
                "bool": {
                    "filter": {
                        "range": {
                            "@timestamp_02": {
                                "gte": str(now_ms - 24 * 60 * 60 * 1000),  # 24 hours ago
                                "lte": str(now_ms),  # now
                            }
                        }
                    }
                }
            },
        }
        url = f'{self.url}_search?scroll={self.scroll}'
        response = requests.get(url, json=data, auth=self.auth)
        response.raise_for_status()
        ret_json = response.json()
        self.scroll_id = ret_json['_scroll_id']
        hits = ret_json['hits']['hits']
        if not hits:
            # Nothing matched: release the context right away.
            self.__finish()
        return [hit['_source'] for hit in hits]

    def __next_search(self):
        """Fetch the next scroll page; end the scroll and return None when done."""
        # Use the configured keep-alive (this was hard-coded to 1m). Send the id
        # in the body because scroll ids can be too long for a URL.
        response = requests.post(
            self.scrollUrl,
            json={'scroll': self.scroll, 'scroll_id': self.scroll_id},
            auth=self.auth,
        )
        if not response:
            # Error status (e.g. the context expired): treat as end of data, as before.
            self.__finish()
            return None
        ret_json = response.json()
        # Always continue from the most recent scroll id.
        self.scroll_id = ret_json['_scroll_id']
        hits = ret_json['hits']['hits']
        if not hits:
            self.__finish()
            return None
        return [hit['_source'] for hit in hits]

    def __finish(self):
        """Best-effort release of the scroll context, then mark the scroll as done."""
        try:
            requests.delete(self.scrollUrl, json={'scroll_id': self.scroll_id}, auth=self.auth)
        except requests.RequestException as err_:
            # A cleanup failure must not hide the data already returned.
            print(err_)
        self.scroll_id = None
        self._exhausted = True


if __name__ == '__main__':
    # Interactive pager: each keypress prints one more batch until no data is left.
    try:
        es = ES(host="http://node1:9200", index='efun*', auth=('elastic', '123456'), scroll='3m')
        while True:
            input("输入任意字符继续:")
            all_data = es.next()
            if not all_data:
                break
            print(json.dumps(all_data))
    except Exception as err_:
        print(err_)

bulk的使用

bulk可以在一次请求中执行多个操作（index、create、update、delete）。注意：index和create操作的下一行必须紧跟要写入的文档内容。

1
2
3
4
5
6
7
8
9
10
11
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts='http://node1:9200', http_auth=('elastic', '123456'))

# Bulk body: action metadata lines. In the bulk API an "index" action must be
# followed by the document source on the next line. Without it, Elasticsearch
# reads the next action line as a document (or rejects the request).
data01 = {"index": {"_index": "test", "_id": "1"}}
data02 = {"index": {"_index": "test", "_id": "2"}}
data03 = {"index": {"_index": "test", "_id": "3"}}

# Sample documents to index; replace them with real payloads.
body = [
    data01, {"title": "document 1"},
    data02, {"title": "document 2"},
    data03, {"title": "document 3"},
]
print(es.bulk(body=body))