当前位置:   article > 正文

Elasticsearch-ELK使用入门(版本7.3.2)_es 7.3.2讲解入门

es 7.3.2讲解入门

ELK结构图

kibana 数据可视化工具 类似Navicat

Elasticsearch ES数据库

Beats 轻量级数据获取组件

Logstash 基于Beats的数据抽取工具,ETL

全部都是开源的。

 

 

 

 

Elasticsearch

elasticsearch简称ES,是非关系型数据库,采用倒序排序,具有强大的搜索能力,基于java开发的,所以安装时要查询下对应版本采用的JDK版本有没有安装。

需要注意的是,查询全部资料显示,如果使用ELK(Elasticsearch+Logstach+Kibana),一定要注意版本号,最好使用同一版本的,否则会出现很多问题,因为ELK的版本兼容性不是很好。

重点注意:

在理解Elasticsearch时,有个重要的概念,type(类型)有必要详细说下。

在ES6.0版本之前,一个索引对应多个类型,但是在ES6.0~7.0的版本,一个索引对应一个类型,在ES7.0以后直接去掉了类型这个概念。默认类型为_doc,对ES这个非关系型数据,可以这样理解,一个index(索引),相当于一张数据表,_id就是主键,mapping就是字段设置,settings就是表设置。一个文档就是一行数据。

(可能这样理解和本意有些出入,但是对于用惯了关系型数据库的人来说很容易接受和理解)

接下来介绍下如何在java项目中集成Elasticsearch

1.maven 依赖引入(jar包引入)

    千万别信官网说的引入个elasticsearch-rest-high-level-client就行了,下面的两个也都要,而且要注意版本统一,elasticsearch-rest-high-level-client内部和下面的包有依赖关系的。(maven中心库用的阿里的镜像,不同镜像pom中的groupid和artifactid可能不同,如果不正确会导致无法获取)

  1. <!-- 增加ES数据库相关包 begin-->
  2. <dependency>
  3. <groupId>org.elasticsearch.client</groupId>
  4. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  5. <version>7.3.2</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.elasticsearch.client</groupId>
  9. <artifactId>elasticsearch-rest-client</artifactId>
  10. <version>7.3.2</version>
  11. <classifier>sources</classifier>
  12. <type>java-source</type>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.elasticsearch</groupId>
  16. <artifactId>elasticsearch</artifactId>
  17. <version>7.3.2</version>
  18. </dependency>
  19. <!-- 增加ES数据库相关包 end-->

2.编写工具类(我这个工具类和业务毫无联系,可以拿过去直接测试着玩)很多方法也是看官网api写的,关于复合查询什么的,有机会再详细介绍吧。

  1. package com.runlin.dealerhelper.util;
  2. import com.runlin.dealerhelper.entity.miniapp.CustomerRecruitmentList;
  3. import org.apache.http.HttpHost;
  4. import org.elasticsearch.ElasticsearchException;
  5. import org.elasticsearch.action.DocWriteResponse;
  6. import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
  7. import org.elasticsearch.action.index.IndexRequest;
  8. import org.elasticsearch.action.index.IndexResponse;
  9. import org.elasticsearch.action.search.SearchRequest;
  10. import org.elasticsearch.action.search.SearchRequestBuilder;
  11. import org.elasticsearch.action.search.SearchResponse;
  12. import org.elasticsearch.client.*;
  13. import org.elasticsearch.client.indices.CreateIndexRequest;
  14. import org.elasticsearch.client.indices.CreateIndexResponse;
  15. import org.elasticsearch.client.indices.GetIndexRequest;
  16. import org.elasticsearch.common.settings.Settings;
  17. import org.elasticsearch.common.unit.Fuzziness;
  18. import org.elasticsearch.index.query.QueryBuilder;
  19. import org.elasticsearch.index.query.QueryBuilders;
  20. import org.elasticsearch.search.SearchHit;
  21. import org.elasticsearch.search.SearchHits;
  22. import org.elasticsearch.search.builder.SearchSourceBuilder;
  23. import org.jfree.util.Log;
  24. import java.io.IOException;
  25. import java.util.*;
  26. /**
  27. * Elastic search 操作工具类
  28. *
  29. * @author jo.li
  30. */
  31. public class EsUtils {
  32. /**
  33. * ES连接IP(单节点)
  34. */
  35. private static final String hostName = "192.168.215.174";
  36. /**
  37. * ES连接端口
  38. */
  39. private static final Integer esPort = 9200;
  40. /**
  41. * ES连接类型
  42. */
  43. private static final String esSheme = "http";
  44. /**
  45. * 创建ES库连接
  46. *
  47. * @return 返回ES连接对象
  48. */
  49. public static RestHighLevelClient createEsClient() {
  50. HttpHost host = new HttpHost(hostName, esPort, esSheme);
  51. RestClientBuilder restClientBuilder = RestClient.builder(host);
  52. return new RestHighLevelClient(restClientBuilder);
  53. }
  54. /**
  55. * 创建索引
  56. *
  57. * @param client ES数据库连接
  58. * @param indexName 索引名称(必须全小写,可用特殊字符_ . 进行分隔)
  59. * @param shards 索引分片数
  60. * @param replicas 索引分片副本数
  61. * @return 创建结果 true 创建成功| false 创建失败
  62. */
  63. public static boolean createEsIndex(RestHighLevelClient client, String indexName, int shards, int replicas) {
  64. //创建索引对象
  65. CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
  66. //设置参数
  67. createIndexRequest.settings(Settings.builder()
  68. // 索引主分片数
  69. .put("index.number_of_shards", shards)
  70. // 索引主分片的副本数
  71. .put("index.number_of_replicas", replicas)
  72. );
  73. //创建映射源(可以将索引理解为表,此步骤类似在表中初始化字段,如果不初始,也可以在Put数据时,由ES自动匹配)
  74. Map<String, Object> mapping = new HashMap<>();
  75. mapping.put("properties", new HashMap<>());
  76. createIndexRequest.mappings();
  77. createIndexRequest.mapping(mapping);
  78. //操作索引的客户端
  79. IndicesClient indices = client.indices();
  80. //执行创建索引库
  81. CreateIndexResponse createIndexResponse;
  82. boolean ret = false;
  83. try {
  84. createIndexResponse = indices.create(createIndexRequest, RequestOptions.DEFAULT);
  85. ret = createIndexResponse.isAcknowledged();
  86. } catch (IOException e) {
  87. Log.error("ES创建索引失败!" + e);
  88. closEsClient(client);
  89. }
  90. return ret;
  91. }
  92. /**
  93. * 根据索引名称在索引中添加文档数据
  94. *
  95. * @param indexName 索引名称(必须全小写)
  96. * @param client ES数据库连接
  97. * @param jsonMap 要插入的数据集合
  98. * @param docId 文档ID(唯一不可重复,不可为空,为空则会由ES赋予随机值)
  99. * @return 添加结果
  100. */
  101. public static DocWriteResponse.Result putDataByIndex(RestHighLevelClient client, Map<String, Object> jsonMap, String indexName, String docId) {
  102. // 根据索引名称获取索引对象
  103. IndexRequest indexRequest = new IndexRequest(indexName, "_doc", docId);
  104. indexRequest.source(jsonMap);
  105. // 通过连接进行http请求
  106. IndexResponse indexResponse = null;
  107. try {
  108. indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
  109. return indexResponse.getResult();
  110. } catch (IOException e) {
  111. Log.error("ES索引数据添加失败!" + e);
  112. closEsClient(client);
  113. }
  114. return null;
  115. }
  116. /**
  117. * 根据索引名称删除索引
  118. *
  119. * @param client ES数据库连接
  120. * @param indexName 索引名称
  121. * @return 运行结果
  122. */
  123. public static boolean deleteIndex(RestHighLevelClient client, String indexName) {
  124. boolean ret = false;
  125. try {
  126. DeleteIndexRequest request = new DeleteIndexRequest(indexName);
  127. client.indices().delete(request, RequestOptions.DEFAULT);
  128. ret = true;
  129. } catch (ElasticsearchException | IOException e) {
  130. Log.error("ES索引删除失败!" + e);
  131. closEsClient(client);
  132. }
  133. return ret;
  134. }
  135. /**
  136. * 关闭ES数据库连接
  137. *
  138. * @param client ES数据库连接
  139. */
  140. public static void closEsClient(RestHighLevelClient client) {
  141. try {
  142. if (client != null) {
  143. client.close();
  144. }
  145. } catch (IOException ex) {
  146. Log.error(ex);
  147. }
  148. }
  149. /**
  150. * 根据索引名称判断索引是否存在
  151. *
  152. * @param indexName 索引名称
  153. * @param client ES数据库连接
  154. * @return true 存在 false 不存在
  155. */
  156. public static boolean existsEsIndex(RestHighLevelClient client, String indexName) {
  157. GetIndexRequest indexRequest = new GetIndexRequest(indexName);
  158. try {
  159. return client.indices().exists(indexRequest, RequestOptions.DEFAULT);
  160. } catch (IOException e) {
  161. Log.error("索引是否存在判断异常!" + e);
  162. closEsClient(client);
  163. }
  164. return false;
  165. }
  166. /**
  167. * 测试方法
  168. */
  169. public static void testEs() {
  170. // 设置索引名称
  171. String indexName = "testa_big_data";
  172. // 设置索引分片数
  173. int shards = 5;
  174. // 设置索引分片副本数
  175. int replicas = 2;
  176. // 索引是否存在标识 true 存在 false 不存在
  177. boolean indexEsistsFlag = true;
  178. // 索引是否创建成功 true 创建成功 false 创建失败
  179. boolean createIndexFlag = false;
  180. // 是否对新建或已存在索引添加数据
  181. boolean putDataFlag = false;
  182. // 是否删除索引
  183. boolean deleteIndexFlag = false;
  184. // 创建连接
  185. RestHighLevelClient client = createEsClient();
  186. // 判断该索引是否存在
  187. indexEsistsFlag = existsEsIndex(client, indexName);
  188. // 不存在则创建
  189. if (!indexEsistsFlag) {
  190. // 创建索引
  191. createIndexFlag = createEsIndex(client, indexName, shards, replicas);
  192. if (createIndexFlag) {
  193. System.out.println("索引创建成功");
  194. } else {
  195. System.out.println("索引创建失败");
  196. }
  197. }
  198. // 在索引中添加数据
  199. if (putDataFlag) {
  200. Map<String, Object> jsonMap = new HashMap<>();
  201. Map<String, Object> testMap = new HashMap<>();
  202. testMap.put("b1", "B1");
  203. testMap.put("b2", 2);
  204. testMap.put("b3", 3.21);
  205. testMap.put("b4", new Date());
  206. jsonMap.put("name", "asd");
  207. jsonMap.put("age", 34);
  208. jsonMap.put("create_time", new Date());
  209. jsonMap.put("id", testMap);
  210. DocWriteResponse.Result result = putDataByIndex(client, jsonMap, indexName, "test2");
  211. System.out.println(result);
  212. }
  213. // 测试索引删除
  214. if (deleteIndexFlag) {
  215. if (deleteIndex(client, indexName)) {
  216. System.out.println("删除成功");
  217. } else {
  218. System.out.println("删除失败");
  219. }
  220. }
  221. searchEsData(client,indexName);
  222. }
  223. public static void searchEsData(RestHighLevelClient client,String indexName){
  224. // 创建查询请求
  225. SearchRequest sr = new SearchRequest(indexName);
  226. // 构造搜索源
  227. SearchSourceBuilder ssb = new SearchSourceBuilder();
  228. // 构建搜索条件
  229. QueryBuilder qb = QueryBuilders.matchAllQuery();
  230. // 将搜索条件放入搜索源 logstash.bat -f .\config-mysql\mysqlToEs.conf
  231. ssb.query(qb);
  232. // 将搜索源放入查询请求
  233. sr.source(ssb);
  234. // 用于存放查询结果
  235. List<Map<String,Object>> resultList = new ArrayList<>();
  236. try {
  237. // 进行查询
  238. SearchResponse srRet = client.search(sr,RequestOptions.DEFAULT);
  239. // 获取响应中所有数据集
  240. SearchHits shArr = srRet.getHits();
  241. SearchHit[] shtArr = shArr.getHits();
  242. // 遍历输出
  243. for (SearchHit sh:srRet.getHits()) {
  244. resultList.add(sh.getSourceAsMap());
  245. System.out.println(sh.getSourceAsString());
  246. }
  247. } catch (IOException e) {
  248. Log.error("查询结果异常!"+e);
  249. closEsClient(client);
  250. }
  251. }
  252. }

 

Logstach

可以理解为一个强大的ETL数据抽取工具,可以毫秒级同步mysql数据至ES数据库中。需要注意的是ELK版本要一致,不然容易出现奇怪的错误。

Logstach同步mysql数据库方法

1. 官网下载Logstach 网址:https://www.elastic.co/cn/

因为本地搭建,运行环境为Windows,所以下载的是ZIP

2. Logstach 与 mysql数据实时同步

Logstach功能强大,简单说下数据同步的实现方式。数据库数据同步采用的是JDBC数据抽取,抽取后通过规则处理,然后输出至ES。也可以读取文件,将文件数据源读取到ES数据库中,此处只介绍数据库同步方式。

下载速度慢可以通过迅雷下载,下载完成后解压到目标文件夹。然后进入bin目录,创建个文件夹,用来存放驱动包和数据同步的配置文件

抽取的配置文件(数据库驱动包没放这里,不过没关系,配置文件用的是绝对路径)

对应配置信息网上搜搜,有的是,就不赘述了。关键的同步时间设置。

再简单介绍下配置文件的主要结构,配置文件主要结构由三部分构成,input filter output 顾名思义

input 配置要同步的数据源,用于获取数据(可以配置多个不同的数据源)。 

filter 用于数据处理,可以进行数据处理规则设置,

output,输出到哪里,以及输出后的一些基础配置设定。

  1. input {
  2. jdbc {
  3. #驱动包绝对路径
  4. jdbc_driver_library => "D:/developSW/Elastic/logstash-7.3.2/lib/mysql-connector-java-5.6-bin.jar"
  5. #驱动类别
  6. jdbc_driver_class => "com.mysql.jdbc.Driver"
  7. #数据库地址
  8. jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/dealerhelper?useUnicode=true&characterEncoding=UTF8"
  9. #用户名
  10. jdbc_user => "root"
  11. #密码
  12. jdbc_password => "123456"
  13. # 是否分页(数据量比较大的时候最好开启)
  14. jdbc_paging_enabled => "true"
  15. # 每页最大条数
  16. jdbc_page_size => "50000"
  17. # 同步刷新时间 各字段含义(从左至右)秒、分、时、天、月、年,全为*默认含义为每分钟都更新 */20.5秒刷新一次
  18. schedule => "*/2 * * * * *"
  19. # 同步数据的sql语句
  20. statement => "SELECT * FROM carkeeper_dictionary"
  21. #
  22. use_column_value => true
  23. tracking_column_type => "timestamp"
  24. tracking_column => "update_time"
  25. last_run_metadata_path => "syncpoint_table"
  26. }
  27. }
  28. output {
  29. elasticsearch {
  30. # ES的IP地址及端口
  31. hosts => ["127.0.0.1:9200"]
  32. # 索引名称 可自定义
  33. index => "testa"
  34. # 需要关联的数据库中有有一个id字段,对应类型中的id
  35. document_id => "%{uuid}"
  36. # 7.x版本以后不再存在“类型”这一概念,此处使用默认类型_doc就可以了
  37. document_type => "_doc"
  38. }
  39. stdout {
  40. # JSON格式输出
  41. codec => json_lines
  42. }
  43. }

配置完就可以执行抽取了。bin目录下url框输入CMD回车

然后输入 logstash.bat -f .\config-mysql\配置文件名称。等待数据同步完成就可以了。

其他说明:

1)目前,只能同步mysql 的input update操作,但delete操作无法同步,目前关于delete同步方法还是添加删除标识,删除执行update,修改删除标识来实现。如果业务存在delete操作 也可以在mysql 操作完成后,在java端根据ES索引名称和文档ID进行删除。

2)同步性能单表80万条数据10分钟左右,同步后增量数据可以做到毫秒级同步

附录:input filter output配置说明表

jdbc连接配置说明

Setting InputtypeRequireddefaultdescribe
jdbc_connection_stringstringYesN/Ajdbc连接 jdbc:mysql://localhost:3306/mydb
jdbc_default_timezonestringNo默认时区,SQL timestamp会默认转成UTC time。jdbc_default_timezone => "Asia/Shanghai"
jdbc_driver_classstringYesN/A如 jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_driver_librarystringNo如 jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar",如果不设置会从找本地java classpath中找
jdbc_userstringYesN/A数据库用户
jdbc_passwordpasswordNoN/A数据库密码
jdbc_password_filepatha valid filesystem pathNoN/A数据库密码文件存放路径
jdbc_pool_timeoutnumberNo5连接池超时时间 默认5s
jdbc_validate_connectionbooleanNofalse使用前验证连接,默认不用
jdbc_validation_timeoutnumberNo3600验证超时时间3600s
connection_retry_attemptsnumberNo1尝试重连数据库的最大次数
connection_retry_attempts_wait_timenumberNo0.5尝试重连时休眠的秒数

sql 配置

Setting InputtypeRequireddefaultdescribe
sequel_optshashNo{}可选配置选项
statementstringNoN/A执行语句,短sql
statement_filepatha valid filesystem pathNosql语句路径 长sql写sql
lowercase_column_namesbooleanNotrue默认列名转换为小写
columns_charsethashNo{}可指定列的字符集。columns_charset => { "column0" => "ISO-8859-1" }
parametershashNo{}自定义参数数值 "target_id" => "321"
jdbc_paging_enabledbooleanNofalse是否分页,默认不分页。true后采用 limits, offsets 做分页处理,数据量过大会有性能问题。
jdbc_page_sizenumberNo100000每页数据量
jdbc_fetch_sizenumberNoN/Ajdbc fetch 最大值,不设默认为相关数据库驱动设置的默认值。不分页时过大会造成OOM
schedulestringNoN/Asql执行周期,cron表达式,最短间隔1min schedule => "* * * * *"。不设置时 sql只运行一次
sql_log_levelstring, one of ["fatal", "error", "warn", "info", "debug"]Noinfosql日志级别

sync 配置

Setting InputtypeRequireddefaultdescribe
clean_runbooleanNofalse设为true,sql_last_value会被忽略,每次start会重新初始化所有数据
record_last_runbooleanNotrue默认保存最后运行时间
last_run_metadata_pathstringNo/home/ph/.logstash_jdbc_last_run最后运行时间文件存放地址
use_column_valuebooleanNofalse设为true tracking_column的值就是 :sql_last_value,false 则是last_run_value的时间戳
tracking_columnstringNoN/A追踪的列(:sql_last_value值),需要use_column_value设为trun
tracking_column_typestring, one of ["numeric", "timestamp"]Nonumeric追踪列的类型,默认数值

Output

ES

es cluster 设置及链接

Setting InputtypeRequireddefaultdescribe
hostsuriNo[//127.0.0.1]es hosts:"127.0.0.1:9200","127.0.0.2:9200"
pathstringNoN/Alives es server(需要设置proxy)
proxyuriNoN/A代理地址
userstringNoN/Aes 用户
passwordpasswordNoN/Aes password
sniffingbooleanNofalse是否动态发现es cluster nodes。 1.x and 2.x 所有http.enabled的节点, 5.x and 6.x会排除master node
sniffing_delaynumberNo5sniffing间隔时间 5s
sniffing_pathstringNoN/A
timeoutnumberNo60超时时间
pool_maxnumberNo1000线程池最大值
pool_max_per_routenumberNo100每个路由的最大线程池
validate_after_inactivitynumberNo10000执行请求前等待时间超过多少s,验证链接是否过时
resurrect_delaynumberNo5
retry_initial_intervalnumberNo2批处理重试的初始间隔(秒)。每次重试时加倍,直到retry_max_interval
retry_max_intervalnumberNo64批处理重试之间的最大间隔(秒)
retry_on_conflictnumberNo1update/upserted冲突重试的次数
http_compressionbooleanNofalse是否开启gzip压缩,es 5.0版本以下默认开启,5.x+版本关闭
custom_headershashNoN/A自定义http 头信息
failure_type_logging_whitelistarrayNo[]es 错误白名单.
healthcheck_pathstringNoN/A

sync 配置

Setting InputtypeRequireddefaultdescribe
actionstringNoindex批处理动作index:文档全局替换,delete:按documentId删除,create:创建,documentId存在则失败,update:更新,documentId不存在则失败
doc_as_upsertbooleanNofalseupdate模式下,true时documentId不存在会创建新document
upsertstringNo""upsert 内容
bulk_pathstringNoN/A批处理路径 path+'_bulk'
indexstringNoindex名
document_idstringNoN/A文档id
document_typestringNoN/A文档类型
parentstringNonil子文档,关联的父文档的id
pipelinestringNonil执行的pipeline name
routingstringNoN/A路由
versionstringNoN/A
version_typestring, one of ["internal", "external", "external_gt", "external_gte", "force"]NoN/A
manage_templatebooleanNotrue默认logstash-%{+YYYY.MM.dd},改为false需要手动管理
templatea valid filesystem pathNoN/A模板路径
template_namestringNologstash模板名称
template_overwritebooleanNofalse
parametershashNoN/A查询参数
scriptstringNoN/A更新脚本script => "ctx._source.message = params.event.get('message')"
script_langstringNopainless脚本语言,6.0+用其他语言,需要设为""
script_typestring, one of ["inline", "indexed", "file"]Noinline脚本类型
script_var_namestringNoevent脚本变量名称
scripted_upsertbooleanNofalse设为true,会创建不存在的document

ssl配置

Setting InputtypeRequireddefaultdescribe
sslbooleanNoN/Aes 开启https,hosts需要以https://开头
ssl_certificate_verificationbooleanNotruessl证书验证
cacerta valid filesystem pathNoN/A.cer or .pem证书路径
keystorea valid filesystem pathNoN/A.jks or .p12证书路径
keystore_passwordpasswordNoN/Akeystore密码
truststorea valid filesystem pathNoN/AJKS路径
truststore_passwordpasswordNoN/AJKS password

Kibana

ES数据库数据可视化工具,可以理解为类似Navicat的工具软件,相关操作方法及KQL规则可查询官网,此文档就不做介绍了。(因为我用的也不是很明白- -!!!)

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/681438
推荐阅读
相关标签
  

闽ICP备14008679号