ElasticSearch学习05
Python中的ElasticSearch模块
其实 Elasticsearch 的接口都是 HTTP 接口，可以直接通过 HTTP 请求来调用，
在 Python 中使用 requests 等模块即可直接进行调用。
不过 Elasticsearch 官方也封装了一个 Python 客户端模块（elasticsearch），
直接使用封装好的模块进行调用会更加方便。
官方文档
requests模块调用es
直接上个例子吧, 还是比较好理解的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| import requests import json import time
class ES:
    """Export recent documents from an Elasticsearch index over plain HTTP (scroll API)."""

    # Not used by exportData (which keeps its scroll id in a local variable);
    # kept so the class attribute stays available to any code that reads it.
    scroll_id = ''

    def __init__(self, host, index, auth):
        """Store connection settings.

        :param host: base URL of the cluster, e.g. "http://node1:9200"
        :param index: index name or pattern to search, e.g. "efun*"
        :param auth: passed unchanged to requests as ``auth``
                     (e.g. a (user, password) tuple for HTTP basic auth)
        """
        self.scrollUrl = host + "/_search/scroll"
        self.url = host + "/" + index + "/"
        self.index = index
        self.auth = auth

    def exportData(self):
        """Return the ``_source`` of every document whose ``@timestamp_02`` is in the last 24 hours.

        Results are fetched 10 hits at a time through the scroll API until an
        empty page comes back. On any error the exception is printed and None
        is returned, so callers must check for None. The scroll context is
        released before returning.
        """
        # Use one reference time so both ends of the window are consistent.
        # NOTE(review): the bounds are epoch milliseconds, presumably matching
        # how @timestamp_02 is stored -- confirm against the index mapping.
        now_ms = int(round(time.time() * 1000))
        data = {'size': 10,
                "query": {"bool": {"filter": {"range": {"@timestamp_02": {
                    "gte": str(now_ms - 24 * 60 * 60 * 1000),
                    "lte": str(now_ms)}}}}}}
        scroll_id = None
        try:
            response = requests.get(f'{self.url}_search?scroll=1m', json=data, auth=self.auth)
            response.raise_for_status()
            ret_json = response.json()
            scroll_id = ret_json['_scroll_id']
            hits = ret_json['hits']['hits']
            response_data = []
            while hits:
                response_data.extend(hit['_source'] for hit in hits)
                # Send the scroll id in the JSON body rather than the URL: ids are
                # long and can contain characters that would need URL-escaping.
                response = requests.post(
                    self.scrollUrl,
                    json={'scroll': '1m', 'scroll_id': scroll_id},
                    auth=self.auth,
                )
                # Raise on an error status: if a failed request were skipped,
                # `hits` would never change and this loop would never end.
                response.raise_for_status()
                ret_json = response.json()
                scroll_id = ret_json['_scroll_id']
                hits = ret_json['hits']['hits']
            return response_data
        except Exception as err_:
            # Best-effort export: report the problem and signal failure with None.
            print(err_)
            return None
        finally:
            if scroll_id:
                self._clear_scroll(scroll_id)

    def _clear_scroll(self, scroll_id):
        """Best-effort release of the server-side scroll context ``scroll_id``."""
        try:
            requests.delete(self.scrollUrl, json={'scroll_id': scroll_id}, auth=self.auth)
        except requests.RequestException as err_:
            # The context also expires on its own after the keep-alive; just report.
            print(err_)
if __name__ == '__main__':
    es = ES(host="http://node1:9200", index='efun*', auth=('elastic', '123456'))
    try:
        all_data = es.exportData()
        # exportData prints its own error and returns None on failure; guard
        # here so the run does not end with a confusing len(None) TypeError.
        if all_data is None:
            print('导出失败')
        else:
            print(f'共{len(all_data)}条数据')
            print(json.dumps(all_data))
    except Exception as err_:
        print(err_)
|
安装模块
1
| pip install elasticsearch
|
简单使用
search的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import json from elasticsearch import Elasticsearch
# Connection settings for the demo cluster.
HOSTS = 'http://node1:9200'
AUTH = ('elastic', '123456')

# Query: the first 20 documents of every index matching "efun*".
body_01 = dict(size=20, query=dict(match_all={}))

es = Elasticsearch(hosts=HOSTS, http_auth=AUTH)
result = es.search(index="efun*", body=body_01)
print(json.dumps(result))
|
使用ElasticSearch模块进行操作
scroll的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| from elasticsearch import Elasticsearch import time
es = Elasticsearch(hosts='http://node1:9200', http_auth=('elastic', '123456'))

# Documents whose @timestamp_02 lies within the last 24 hours.
# NOTE(review): bounds are epoch milliseconds -- confirm this matches the field mapping.
now_ms = int(round(time.time() * 1000))
data = {'size': 10,
        "query": {"bool": {"filter": {"range": {"@timestamp_02": {
            "gte": str(now_ms - 24 * 60 * 60 * 1000),
            "lte": str(now_ms)}}}}}}

result = es.search(body=data, index='efun*', scroll='1m')
print(result)
scroll_id = result['_scroll_id']
hits_data = result['hits']['hits']
try:
    while hits_data:
        time.sleep(0.1)
        # Pass `scroll` again so every request renews the keep-alive; otherwise
        # the context expires 1m after the initial search.
        result_02 = es.scroll(scroll_id=scroll_id, scroll='1m')
        print(result_02)
        # Always continue with the most recently returned scroll id.
        scroll_id = result_02['_scroll_id']
        hits_data = result_02['hits']['hits']
finally:
    # Free the server-side search context instead of waiting for it to time out.
    es.clear_scroll(scroll_id=scroll_id)
|
使用requests配合yield生成器进行操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| import requests import json import time
class ES:
    """Page through recent documents of an index over plain HTTP, one scroll page per next() call."""

    # Id of the open scroll context; None means the next call to next() starts a new search.
    scroll_id = None

    def __init__(self, host, index, auth, scroll):
        """Store connection settings.

        :param host: base URL of the cluster, e.g. "http://node1:9200"
        :param index: index name or pattern to search, e.g. "efun*"
        :param auth: passed unchanged to requests as ``auth``
                     (e.g. a (user, password) tuple for HTTP basic auth)
        :param scroll: scroll keep-alive such as "3m"; used for every request
        """
        self.scrollUrl = host + "/_search/scroll"
        self.url = host + "/" + index + "/"
        self.index = index
        self.auth = auth
        self.scroll = scroll

    def next(self):
        """Return the next page of results as a list of ``_source`` dicts.

        When no scroll is open, a new search is run and its first page is
        returned (possibly an empty list). Otherwise the scroll continues; once
        it is exhausted, or a scroll request fails, None is returned and the
        following call starts over with a new search.
        """
        if self.scroll_id:
            # __next_search is a generator that yields exactly one value.
            return next(self.__next_search())
        return self.__first_search()

    def __first_search(self):
        """Open a scroll over the last 24 hours of ``@timestamp_02`` and return its first page."""
        # NOTE(review): bounds are epoch milliseconds -- confirm against the field mapping.
        now_ms = int(round(time.time() * 1000))
        data = {'size': 10,
                "query": {"bool": {"filter": {"range": {"@timestamp_02": {
                    "gte": str(now_ms - 24 * 60 * 60 * 1000),
                    "lte": str(now_ms)}}}}}}
        url = f'{self.url}_search?scroll={self.scroll}'
        response = requests.get(url, json=data, auth=self.auth)
        response.raise_for_status()
        ret_json = response.json()
        self.scroll_id = ret_json['_scroll_id']
        return [hit['_source'] for hit in ret_json['hits']['hits']]

    def __next_search(self):
        """Yield exactly one value: the next page of ``_source`` dicts, or None when finished."""
        # Reuse self.scroll so every request renews the keep-alive the caller
        # asked for, and send the scroll id in the body rather than the URL.
        response = requests.post(
            self.scrollUrl,
            json={'scroll': self.scroll, 'scroll_id': self.scroll_id},
            auth=self.auth,
        )
        hits = None
        if response:
            ret_json = response.json()
            self.scroll_id = ret_json['_scroll_id']
            hits = ret_json['hits']['hits']
        if hits:
            yield [hit['_source'] for hit in hits]
        else:
            # Exhausted or failed: free the server-side context and reset so the
            # following next() call starts a fresh search.
            self.__clear_scroll()
            self.scroll_id = None
            yield None

    def __clear_scroll(self):
        """Best-effort release of the scroll context identified by ``self.scroll_id``."""
        try:
            requests.delete(self.scrollUrl, json={'scroll_id': self.scroll_id}, auth=self.auth)
        except requests.RequestException as err_:
            # The context also expires on its own after the keep-alive; just report.
            print(err_)
if __name__ == '__main__':
    try:
        es = ES(host="http://node1:9200", index='efun*', auth=('elastic', '123456'), scroll='3m')
        while True:
            # Wait for the user before fetching each page.
            try:
                input("输入任意字符继续:")
            except EOFError:
                break  # stdin closed: stop paging instead of reporting an error
            all_data = es.next()
            if not all_data:
                break
            print(json.dumps(all_data))
    except Exception as err_:
        print(err_)
|
bulk的使用
bulk 可以在一次请求中执行多条操作（index、create、update、delete 等）；其中 index 和 create 操作的下一行必须紧跟要写入的文档内容。
1 2 3 4 5 6 7 8 9 10 11
| from elasticsearch import Elasticsearch
es = Elasticsearch(hosts='http://node1:9200', http_auth=('elastic', '123456'))

# Bulk bodies pair lines: each "index" action must be followed by the document
# to store. Without the source lines, data02 would be parsed as the source of
# document 1 and data03 would be left with no source at all.
data01 = {"index": {"_index": "test", "_id": "1"}}
data02 = {"index": {"_index": "test", "_id": "2"}}
data03 = {"index": {"_index": "test", "_id": "3"}}

# Sample documents for the demo; replace with real data.
body = [
    data01, {"name": "doc-1"},
    data02, {"name": "doc-2"},
    data03, {"name": "doc-3"},
]
print(es.bulk(body=body))
|