
Elasticsearch: Exporting data from Elasticsearch and Kibana to a Pandas DataFrame

In this article, we will look at how to export data into a pandas DataFrame from an Elasticsearch index and from Kibana's CSV report POST URL. Visualization can be done in Kibana itself, but if you want to perform finer-grained analysis of the data and build more dynamic visualizations, exporting the data to a pandas DataFrame is a good choice.

In the demonstration below, I will use Elastic Stack 8.5.3.

Installation

For simplicity, we can choose an Elastic Stack installation with only basic security enabled. You can refer to the section "How to configure Elasticsearch with basic security only" in my earlier article "Elastic Stack 8.0 installation - securing your Elastic Stack is now easier than ever". For our installation, we set the password of the Elasticsearch superuser elastic to password. You can also follow another article, "Elasticsearch: how to run Elasticsearch 8.x in Docker for local development", to install.

Preparing the data

We will use the sample data that ships with Kibana for this demonstration. Open Kibana:

This creates an index called kibana_sample_data_logs in Elasticsearch. Open it in Discover:

As shown above, we can see the time-series histogram of the data. Choose a suitable time range, then add a filter:

As shown above, we built a very simple table. The first column is timestamp and the second is geo.dest. In the search we created a filter matching geo.src equal to US. Save the current search:

As shown above, there are two ways to obtain CSV output. One is the Generate CSV button. After clicking it, we can download the corresponding CSV file from the address shown below.

We can see the CSV output shown above. The other way is to use the POST URL to fetch the data programmatically. Choose Copy POST URL in the screenshot above; we get a link like this:

http://localhost:5601/api/reporting/generate/csv_searchsource?jobParams=%28browserTimezone%3AAsia%2FShanghai%2Ccolumns%3A%21%28timestamp%2Cgeo.dest%29%2CobjectType%3Asearch%2CsearchSource%3A%28fields%3A%21%28%28field%3Atimestamp%2Cinclude_unmapped%3Atrue%29%2C%28field%3Ageo.dest%2Cinclude_unmapped%3Atrue%29%29%2Cfilter%3A%21%28%28meta%3A%28field%3Atimestamp%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cparams%3A%28%29%29%2Cquery%3A%28range%3A%28timestamp%3A%28format%3Astrict_date_optional_time%2Cgte%3Anow-7d%2Fd%2Clte%3Anow%29%29%29%29%29%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cparent%3A%28filter%3A%21%28%28%27%24state%27%3A%28store%3AappState%29%2Cmeta%3A%28alias%3A%21n%2Cdisabled%3A%21f%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Ckey%3Ageo.src%2Cnegate%3A%21f%2Cparams%3A%28query%3AUS%29%2Ctype%3Aphrase%29%2Cquery%3A%28match_phrase%3A%28geo.src%3AUS%29%29%29%29%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cquery%3A%28language%3Akuery%2Cquery%3A%27%27%29%29%2Csort%3A%21%28%28timestamp%3Adesc%29%29%2CtrackTotalHits%3A%21t%29%2Ctitle%3Asrc-US%2Cversion%3A%278.5.3%27%29
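The jobParams value in this URL is percent-encoded Rison (Kibana's compact serialization format), not JSON. If you want to inspect what a copied POST URL actually requests, you can percent-decode it with the Python standard library; the sketch below decodes a fragment taken from the URL above:

```python
from urllib.parse import unquote

# A fragment of the jobParams query parameter from the Copy POST URL output
encoded = "%28browserTimezone%3AAsia%2FShanghai%2Ccolumns%3A%21%28timestamp%2Cgeo.dest%29%29"

# The decoded text is Rison, which is close to JSON but terser
print(unquote(encoded))
```

This prints `(browserTimezone:Asia/Shanghai,columns:!(timestamp,geo.dest))`, making it much easier to see which columns, filters, and time range a report job will use.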

Method 1: fetching the data from Kibana

Copy the link above and paste it into the following code:

kibana-to-pandas.py

import pandas as pd
import requests
from requests.auth import HTTPBasicAuth
from io import StringIO
import json
import time

headers = {"kbn-xsrf": "reporting"}

post_url = "http://localhost:5601/api/reporting/generate/csv_searchsource?jobParams=%28browserTimezone%3AAsia%2FShanghai%2Ccolumns%3A%21%28timestamp%2Cgeo.dest%29%2CobjectType%3Asearch%2CsearchSource%3A%28fields%3A%21%28%28field%3Atimestamp%2Cinclude_unmapped%3Atrue%29%2C%28field%3Ageo.dest%2Cinclude_unmapped%3Atrue%29%29%2Cfilter%3A%21%28%28meta%3A%28field%3Atimestamp%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cparams%3A%28%29%29%2Cquery%3A%28range%3A%28timestamp%3A%28format%3Astrict_date_optional_time%2Cgte%3Anow-7d%2Fd%2Clte%3Anow%29%29%29%29%29%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cparent%3A%28filter%3A%21%28%28%27%24state%27%3A%28store%3AappState%29%2Cmeta%3A%28alias%3A%21n%2Cdisabled%3A%21f%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Ckey%3Ageo.src%2Cnegate%3A%21f%2Cparams%3A%28query%3AUS%29%2Ctype%3Aphrase%29%2Cquery%3A%28match_phrase%3A%28geo.src%3AUS%29%29%29%29%2Cindex%3A%2790943e30-9a47-11e8-b64d-95841ca0b247%27%2Cquery%3A%28language%3Akuery%2Cquery%3A%27%27%29%29%2Csort%3A%21%28%28timestamp%3Adesc%29%29%2CtrackTotalHits%3A%21t%29%2Ctitle%3Asrc-US%2Cversion%3A%278.5.3%27%29"
# print(post_url)

# Kick off the CSV reporting job; the response contains the download path
post_url_data = requests.post(post_url,
                              auth=HTTPBasicAuth('elastic', 'password'),
                              headers=headers)
get_api_json = json.loads(post_url_data.text)
print(get_api_json)

# Give Kibana time to generate the report before downloading it
time.sleep(10)

print(get_api_json['path'])
api_url = "http://localhost:5601" + get_api_json['path']
print(api_url)

# Download the generated CSV and load it into a pandas DataFrame
csv_url = requests.get(api_url,
                       auth=HTTPBasicAuth('elastic', 'password'),
                       headers=headers)
print(csv_url)
traffic_data = pd.read_csv(StringIO(csv_url.text))
print(traffic_data.head())
print(traffic_data)

In the code above, pay particular attention to this line:

time.sleep(10)

After sending the request, we need to wait a while for Kibana to process the job and produce the data. Above, I used the credentials of the superuser elastic. Run the code:
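A fixed sleep is fragile: a large report may take longer than 10 seconds, and a small one finishes sooner. A more robust approach is to poll the download endpoint until the report is ready; Kibana answers with a non-200 status (typically 503) while the job is still pending and 200 once the CSV can be downloaded. Below is a minimal sketch, assuming the same api_url, auth, and headers variables as in kibana-to-pandas.py; the retries and delay values are arbitrary:

```python
import time

def poll_report(fetch, retries=30, delay=2):
    """Call fetch() until it returns HTTP 200, sleeping between attempts.

    fetch is a zero-argument callable returning a requests-style response,
    e.g. a lambda wrapping requests.get on the report's download path.
    """
    for _ in range(retries):
        resp = fetch()
        if resp.status_code == 200:
            return resp
        time.sleep(delay)
    raise TimeoutError("report was not ready after polling")
```

In kibana-to-pandas.py, the time.sleep(10) and the subsequent requests.get could then be replaced with csv_url = poll_report(lambda: requests.get(api_url, auth=HTTPBasicAuth('elastic', 'password'), headers=headers)).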

$ pwd
/Users/liuxg/python/pandas
$ ls
kibana-to-pandas.py
$ python kibana-to-pandas.py
{'path': '/api/reporting/jobs/download/ldfcx8kq1k9b9c2bfe5reij0', 'job': {'id': 'ldfcx8kq1k9b9c2bfe5reij0', 'index': '.reporting-2023-01-22', 'jobtype': 'csv_searchsource', 'created_at': '2023-01-28T02:51:55.178Z', 'created_by': 'elastic', 'meta': {'objectType': 'search'}, 'status': 'pending', 'attempts': 0, 'migration_version': '7.14.0', 'payload': {'browserTimezone': 'Asia/Shanghai', 'columns': ['timestamp', 'geo.dest'], 'objectType': 'search', 'searchSource': {'fields': [{'field': 'timestamp', 'include_unmapped': 'true'}, {'field': 'geo.dest', 'include_unmapped': 'true'}], 'filter': [{'meta': {'field': 'timestamp', 'index': '90943e30-9a47-11e8-b64d-95841ca0b247', 'params': {}}, 'query': {'range': {'timestamp': {'format': 'strict_date_optional_time', 'gte': 'now-7d/d', 'lte': 'now'}}}}], 'index': '90943e30-9a47-11e8-b64d-95841ca0b247', 'parent': {'filter': [{'$state': {'store': 'appState'}, 'meta': {'alias': None, 'disabled': False, 'index': '90943e30-9a47-11e8-b64d-95841ca0b247', 'key': 'geo.src', 'negate': False, 'params': {'query': 'US'}, 'type': 'phrase'}, 'query': {'match_phrase': {'geo.src': 'US'}}}], 'index': '90943e30-9a47-11e8-b64d-95841ca0b247', 'query': {'language': 'kuery', 'query': ''}}, 'sort': [{'timestamp': 'desc'}], 'trackTotalHits': True}, 'title': 'src-US', 'version': '8.5.3'}, 'output': {}}}
/api/reporting/jobs/download/ldfcx8kq1k9b9c2bfe5reij0
http://localhost:5601/api/reporting/jobs/download/ldfcx8kq1k9b9c2bfe5reij0
<Response [200]>
                      timestamp geo.dest
0   Jan 28, 2023 @ 09:15:02.127       CN
1   Jan 28, 2023 @ 09:00:52.596       CN
2   Jan 28, 2023 @ 08:17:33.769       IN
3   Jan 28, 2023 @ 05:15:19.548       RU
4   Jan 28, 2023 @ 04:18:45.660       KE
                      timestamp geo.dest
0     Jan 28, 2023 @ 09:15:02.127     CN
1     Jan 28, 2023 @ 09:00:52.596     CN
2     Jan 28, 2023 @ 08:17:33.769     IN
3     Jan 28, 2023 @ 05:15:19.548     RU
4     Jan 28, 2023 @ 04:18:45.660     KE
...                           ...    ...
1608  Jan 21, 2023 @ 11:30:30.110     TJ
1609  Jan 21, 2023 @ 11:14:28.231     IN
1610  Jan 21, 2023 @ 11:05:31.057     FR
1611  Jan 21, 2023 @ 10:40:26.055     TR
1612  Jan 21, 2023 @ 10:24:53.405     IN
[1613 rows x 2 columns]

From the output above, we can see the resulting pandas DataFrame.

Note that Kibana only returns as much data as the byte limit specified in the kibana.yml file allows. Increase the value of xpack.reporting.csv.maxSizeBytes to retrieve the complete data.
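For example, in kibana.yml (the 100 MB value below is only an illustration; pick a limit that fits your data volume):

```yaml
# kibana.yml - raise the CSV export size limit for reporting
xpack.reporting.csv.maxSizeBytes: 104857600   # 100 MB
```

Kibana must be restarted for the change to take effect.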

Method 2: fetching the data from Elasticsearch

The following snippet retrieves data directly from an Elasticsearch index. It is not sufficient for retrieving very large result sets, however, so depending on your situation you may need to choose between Kibana's CSV report and a direct Elasticsearch query.
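For larger result sets, one option is the scroll-based helpers.scan function from the elasticsearch-py client, which pages through the index rather than fetching everything in a single request. Below is a minimal sketch, assuming the same local cluster and elastic/password credentials as before; the try/except merely keeps the script runnable when no cluster is reachable:

```python
import pandas as pd

def hits_to_frame(hits):
    """Flatten an iterable of Elasticsearch hits into a DataFrame,
    one row per document, using each hit's "_source" dictionary."""
    return pd.DataFrame(hit["_source"] for hit in hits)

try:
    from elasticsearch import Elasticsearch, helpers

    es = Elasticsearch("http://localhost:9200", basic_auth=("elastic", "password"))
    # helpers.scan pages through the results with the scroll API, so the
    # 10,000-hit window of a plain search does not apply
    hits = helpers.scan(
        es,
        index="kibana_sample_data_logs",
        query={"query": {"match_all": {}}, "_source": ["clientip", "host"]},
    )
    print(hits_to_frame(hits).info())
except Exception as exc:
    print(f"Elasticsearch is not reachable: {exc}")
```

Note that scan() returns hits with a "_source" dictionary rather than the "fields" list format used in the example below, so the flattening step differs slightly.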

For how to connect to Elasticsearch, see my earlier article "Elasticsearch: everything you need to know about using Elasticsearch in Python - 8.x". Create the following Python file:

elasticsearch-to-pandas.py

from elasticsearch import Elasticsearch
import pandas as pd
import numpy as np

# create a client instance of the library
es = Elasticsearch("http://localhost:9200", basic_auth=("elastic", "password"))
resp = es.info()
# print(resp)

total_docs = 50
search_query = {
    "match_all": {}
}

# disable _source and request only the fields we need
response = es.search(
    _source=False,
    index='kibana_sample_data_logs',
    query=search_query,
    size=total_docs,
    fields=[
        "@timestamp",
        "clientip",
        "host"
    ]
)
# print(response)

elastic_docs = response["hits"]["hits"]
# print(elastic_docs)

# collect every returned field into a dictionary of NumPy arrays,
# appending one value per document
fields = {}
for num, doc in enumerate(elastic_docs):
    fields_data = doc["fields"]
    for key, val in fields_data.items():
        try:
            fields[key] = np.append(fields[key], val)
        except KeyError:
            fields[key] = np.array([val])
# print(fields)

traffic_data = pd.DataFrame(fields)
print(traffic_data.info())
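As an aside, the NumPy accumulation loop above can be written more compactly with a list comprehension: each hit's "fields" entry maps a field name to a list of values, so taking the first element of each list yields one flat dictionary per document. A self-contained sketch, using hit data shaped like the actual search response:

```python
import pandas as pd

# two hits shaped like the "fields" section of the search response
hits = [
    {"fields": {"@timestamp": ["2023-01-15T00:39:02.912Z"],
                "clientip": ["223.87.60.27"],
                "host": ["artifacts.elastic.co"]}},
    {"fields": {"@timestamp": ["2023-01-15T03:26:21.326Z"],
                "clientip": ["130.246.123.197"],
                "host": ["www.elastic.co"]}},
]

# take the first value of every field list -> one flat dict per document
rows = [{key: values[0] for key, values in hit["fields"].items()} for hit in hits]
traffic_data = pd.DataFrame(rows)
print(traffic_data)
```

This avoids the repeated np.append calls (which copy the array on every iteration) and produces the same DataFrame.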

The code above performs the following search:

GET kibana_sample_data_logs/_search
{
  "_source": false,
  "query": {
    "match_all": {}
  },
  "fields": [
    "@timestamp",
    "clientip",
    "host"
  ]
}

The query above returns results like:

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 10000,
      "relation": "gte"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "kibana_sample_data_logs",
        "_id": "Gf8y9oUBLUWnAhRe7p8u",
        "_score": 1,
        "fields": {
          "@timestamp": [
            "2023-01-15T00:39:02.912Z"
          ],
          "clientip": [
            "223.87.60.27"
          ],
          "host": [
            "artifacts.elastic.co"
          ]
        }
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "Gv8y9oUBLUWnAhRe7p8u",
        "_score": 1,
        "fields": {
          "@timestamp": [
            "2023-01-15T03:26:21.326Z"
          ],
          "clientip": [
            "130.246.123.197"
          ],
          "host": [
            "www.elastic.co"
          ]
        }
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "G_8y9oUBLUWnAhRe7p8u",
        "_score": 1,
        "fields": {
          "@timestamp": [
            "2023-01-15T03:30:25.131Z"
          ],
          "clientip": [
            "120.49.143.213"
          ],
          "host": [
            "cdn.elastic-elastic-elastic.org"
          ]
        }
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "HP8y9oUBLUWnAhRe7p8u",
        "_score": 1,
        "fields": {
          "@timestamp": [
            "2023-01-15T03:34:43.399Z"
          ],
          "clientip": [
            "99.74.118.237"
          ],
          "host": [
            "artifacts.elastic.co"
          ]
        }
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "Hf8y9oUBLUWnAhRe7p8u",
        "_score": 1,
        "fields": {
          "@timestamp": [
            "2023-01-15T03:37:04.863Z"
          ],
          "clientip": [
            "177.111.217.54"
          ],
          "host": [
            "www.elastic.co"
          ]
        }
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "Hv8y9oUBLUWnAhRe7p8u",
        "_score": 1,
        "fields": {
          "@timestamp": [
            "2023-01-15T03:49:40.669Z"
          ],
          "clientip": [
            "106.225.58.146"
          ],
          "host": [
            "www.elastic.co"
          ]
        }
      }
      ...

Running the code produces the following output:

$ pwd
/Users/liuxg/python/pandas
$ ls
elasticsearch-to-pandas.py kibana-to-pandas.py
$ python elasticsearch-to-pandas.py
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50 entries, 0 to 49
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   @timestamp  50 non-null     object
 1   clientip    50 non-null     object
 2   host        50 non-null     object
dtypes: object(3)
memory usage: 1.3+ KB
None
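The info() output shows all three columns with dtype object. For time-series analysis, you would typically convert @timestamp into a real datetime column first. A small sketch, using two of the timestamps from the response above:

```python
import pandas as pd

traffic_data = pd.DataFrame({
    "@timestamp": ["2023-01-15T00:39:02.912Z", "2023-01-15T03:26:21.326Z"],
    "host": ["artifacts.elastic.co", "www.elastic.co"],
})

# parse the ISO 8601 strings into timezone-aware datetimes
traffic_data["@timestamp"] = pd.to_datetime(traffic_data["@timestamp"])
print(traffic_data.dtypes)
```

With a datetime64 column, you can resample, bucket by hour, or plot the data directly, which is exactly the kind of finer-grained analysis that motivated exporting to pandas in the first place.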