当前位置:   article > 正文

使用python查询Elasticsearch并导出所有数据_python elasticsearch查询

python elasticsearch查询

作为数据分析师,要学的可真多!但一旦触及到数据,无论它在藏在哪里,只要我们想要使用,就真是无所不用其极阿,python刀操起来~

刚开始一直通过Kibana提供的工具来查询存储在Elasticsearch中的数据,统计个结果完全没问题。偶一日不仅仅是需要查询统计个结果,而是要对满足条件的结果近一步分析,此时发现手头的Kibana有点囧…

我神气的python出场啦

为了实现自己的需求,搬了会砖,又是修又是补,终于算是满足了自己的要求!

功能拆解

  1. python连接Elasticsearch
  2. 查询Elasticsearch打印结果
  3. 导出所有结果数据
  4. 将所有结果写入csv文件

1.打通python与Elasticsearch的通信

与python连接Oracle、MySQL差不多思路,这里需要用到Elasticsearch包,没有的赶紧使用pip install elasticsearch来安装。安装成功后,再使用from elasticsearch import Elasticsearch就不会报错了。

from elasticsearch import Elasticsearch

es = Elasticsearch(hosts="http://192.168.21.33:9200/", http_auth=('abc','dataanalysis'))
print(es.info())
  • 1
  • 2
  • 3
  • 4

通过Elasticsearch()来配置连接,告诉它Elasticsearch所在服务器的IP地址,如果需要输入用户名密码,在http_auth参数中给出。如果打印连接的信息不报错,那就表明连接成功了

2.通过json查询体实现ES的查询

请求体与Kibana下使用的格式完全一致,如果不确定请求体写的对不对,可以放在Kibana下调试一下,调试正确了再放进来。

如下所示,通过"_source" : "title"可以限制返回结果只返回title字段。

query_json = {
  "_source": "title",
  "query": {
    "bool": {
      "must": [
        {"match_phrase": {
          "content": "汽车"
        }},
        {"match_phrase": {
          "content": "房子"
        }}
      ]
    }
  }
}

query = es.search(index='mydata',body=query_json)
print(query)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

正常情况下,打印query不报错的话就可以看到结果了。但是,你会发现返回的结果只有有限的几条。这是因为Elasticsearch默认情况下只会返回10或20条结果,如果你想要得到所有结果,接下来的工作才是重点。

3.借助游标导出所有结果数据

敲黑板,划重点:

  • 先借助游标,将所有结果数据存储到内存中
  • 然后将内存中的结果数据写入到磁盘,也就是文件中
query = es.search(index='1485073708892',body=query_json,scroll='5m',size=100)

results = query['hits']['hits'] # es查询出的结果第一页
total = query['hits']['total']  # es查询出的结果总量
scroll_id = query['_scroll_id'] # 游标用于输出es查询出的所有结果

for i in range(0, int(total/100)+1):
    # scroll参数必须指定否则会报错
    query_scroll = es.scroll(scroll_id=scroll_id,scroll='5m')['hits']['hits']
    results += query_scroll
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

在发送查询请求的时候,就告诉ES需要使用游标,并定义每次返回数据量的大小。

定义一个list变量results用来存储数据结果,在代码中,可以另其为空list,即results = [],也可以先将返回结果的第一页存进来,即resutls = query[‘hits’][‘hits’]

对于所有结果数据,写个分页加载到内存变量的循环。

4.将结果写入csv文件

import csv

with open('./data/event_title.csv','w',newline='',encoding='utf-8') as flow:
    csv_writer = csv.writer(flow)
    for res in results:
        # print(res)
        csv_writer.writerow([res['_id']+','+res['_source']['title']])
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

Done!全部代码如下所示:

import csv
from elasticsearch import Elasticsearch

# 查看参数配置:https://pypi.org/project/elasticsearch/
es = Elasticsearch(hosts="http://192.168.21.33:9200/", http_auth=('abc','dataanalysis'))
query_json = {
  "_source": "title",
  "query": {
    "bool": {
      "must": [
        {"match_phrase": {
          "content": "汽车"
        }},
        {"match_phrase": {
          "content": "房子"
        }}
      ]
    }
  }
}
query = es.search(index='1485073708892',body=query_json,scroll='5m',size=100)

results = query['hits']['hits'] # es查询出的结果第一页
total = query['hits']['total']  # es查询出的结果总量
scroll_id = query['_scroll_id'] # 游标用于输出es查询出的所有结果

for i in range(0, int(total/100)+1):
    # scroll参数必须指定否则会报错
    query_scroll = es.scroll(scroll_id=scroll_id,scroll='5m')['hits']['hits']
    results += query_scroll


with open('./data/event_title.csv','w',newline='',encoding='utf-8') as flow:
    csv_writer = csv.writer(flow)
    for res in results:
        # print(res)
        csv_writer.writerow([res['_id']+','+res['_source']['title']])


print('done!')
# print(es.info())

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/391275
推荐阅读
相关标签