赞
踩
kibana 数据可视化工具 类似Navicat
Elasticsearch ES数据库
Beats 轻量级数据获取组件
Logstash 基于Beats的数据抽取工具,ETL
全部都是开源的。
elasticsearch简称ES,是非关系型数据库,采用倒序排序,具有强大的搜索能力,基于java开发的,所以安装时要查询下对应版本采用的JDK版本有没有安装。
需要注意的是,查询全部资料显示,如果使用ELK(Elasticsearch+Logstach+Kibana),一定要注意版本号,最好使用同一版本的,否则会出现很多问题,因为ELK的版本兼容性不是很好。
重点注意:
在理解Elasticsearch时,有个重要的概念,type(类型)有必要详细说下。
在ES6.0版本之前,一个索引对应多个类型,但是在ES6.0~7.0的版本,一个索引对应一个类型,在ES7.0以后直接去掉了类型这个概念。默认类型为_doc,对ES这个非关系型数据,可以这样理解,一个index(索引),相当于一张数据表,_id就是主键,mapping就是字段设置,settings就是表设置。一个文档就是一行数据。
(可能这样理解和本意有些出入,但是对于用惯了关系型数据库的人来说很容易接受和理解)
接下来介绍下如何在java项目中集成Elasticsearch
1.maven 依赖引入(jar包引入)
千万别信官网说的引入个elasticsearch-rest-high-level-client就行了,下面的两个也都要,而且要注意版本统一,elasticsearch-rest-high-level-client内部和下面的包有依赖关系的。(maven中心库用的阿里的镜像,不同镜像pom中的groupid和artifactid可能不同,如果不正确会导致无法获取)
- <!-- 增加ES数据库相关包 begin-->
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>7.3.2</version>
- </dependency>
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-client</artifactId>
- <version>7.3.2</version>
- <classifier>sources</classifier>
- <type>java-source</type>
- </dependency>
-
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>7.3.2</version>
- </dependency>
- <!-- 增加ES数据库相关包 end-->
2.编写工具类(我这个工具类和业务毫无联系,可以拿过去直接测试着玩)很多方法也是看官网api写的,关于复合查询什么的,有机会再详细介绍吧。
- package com.runlin.dealerhelper.util;
-
-
- import com.runlin.dealerhelper.entity.miniapp.CustomerRecruitmentList;
- import org.apache.http.HttpHost;
- import org.elasticsearch.ElasticsearchException;
- import org.elasticsearch.action.DocWriteResponse;
- import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.action.index.IndexResponse;
- import org.elasticsearch.action.search.SearchRequest;
- import org.elasticsearch.action.search.SearchRequestBuilder;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.client.*;
- import org.elasticsearch.client.indices.CreateIndexRequest;
- import org.elasticsearch.client.indices.CreateIndexResponse;
- import org.elasticsearch.client.indices.GetIndexRequest;
- import org.elasticsearch.common.settings.Settings;
- import org.elasticsearch.common.unit.Fuzziness;
- import org.elasticsearch.index.query.QueryBuilder;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.elasticsearch.search.SearchHit;
- import org.elasticsearch.search.SearchHits;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
- import org.jfree.util.Log;
-
- import java.io.IOException;
- import java.util.*;
-
- /**
- * Elastic search 操作工具类
- *
- * @author jo.li
- */
- public class EsUtils {
- /**
- * ES连接IP(单节点)
- */
- private static final String hostName = "192.168.215.174";
-
- /**
- * ES连接端口
- */
- private static final Integer esPort = 9200;
-
- /**
- * ES连接类型
- */
- private static final String esSheme = "http";
-
- /**
- * 创建ES库连接
- *
- * @return 返回ES连接对象
- */
- public static RestHighLevelClient createEsClient() {
- HttpHost host = new HttpHost(hostName, esPort, esSheme);
- RestClientBuilder restClientBuilder = RestClient.builder(host);
- return new RestHighLevelClient(restClientBuilder);
- }
-
- /**
- * 创建索引
- *
- * @param client ES数据库连接
- * @param indexName 索引名称(必须全小写,可用特殊字符_ . 进行分隔)
- * @param shards 索引分片数
- * @param replicas 索引分片副本数
- * @return 创建结果 true 创建成功| false 创建失败
- */
- public static boolean createEsIndex(RestHighLevelClient client, String indexName, int shards, int replicas) {
- //创建索引对象
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
- //设置参数
- createIndexRequest.settings(Settings.builder()
- // 索引主分片数
- .put("index.number_of_shards", shards)
- // 索引主分片的副本数
- .put("index.number_of_replicas", replicas)
- );
- //创建映射源(可以将索引理解为表,此步骤类似在表中初始化字段,如果不初始,也可以在Put数据时,由ES自动匹配)
- Map<String, Object> mapping = new HashMap<>();
- mapping.put("properties", new HashMap<>());
- createIndexRequest.mappings();
- createIndexRequest.mapping(mapping);
-
- //操作索引的客户端
- IndicesClient indices = client.indices();
- //执行创建索引库
- CreateIndexResponse createIndexResponse;
- boolean ret = false;
- try {
- createIndexResponse = indices.create(createIndexRequest, RequestOptions.DEFAULT);
- ret = createIndexResponse.isAcknowledged();
- } catch (IOException e) {
- Log.error("ES创建索引失败!" + e);
- closEsClient(client);
- }
- return ret;
- }
-
- /**
- * 根据索引名称在索引中添加文档数据
- *
- * @param indexName 索引名称(必须全小写)
- * @param client ES数据库连接
- * @param jsonMap 要插入的数据集合
- * @param docId 文档ID(唯一不可重复,不可为空,为空则会由ES赋予随机值)
- * @return 添加结果
- */
- public static DocWriteResponse.Result putDataByIndex(RestHighLevelClient client, Map<String, Object> jsonMap, String indexName, String docId) {
- // 根据索引名称获取索引对象
- IndexRequest indexRequest = new IndexRequest(indexName, "_doc", docId);
- indexRequest.source(jsonMap);
- // 通过连接进行http请求
- IndexResponse indexResponse = null;
- try {
- indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
- return indexResponse.getResult();
- } catch (IOException e) {
- Log.error("ES索引数据添加失败!" + e);
- closEsClient(client);
- }
- return null;
- }
-
- /**
- * 根据索引名称删除索引
- *
- * @param client ES数据库连接
- * @param indexName 索引名称
- * @return 运行结果
- */
- public static boolean deleteIndex(RestHighLevelClient client, String indexName) {
- boolean ret = false;
- try {
- DeleteIndexRequest request = new DeleteIndexRequest(indexName);
- client.indices().delete(request, RequestOptions.DEFAULT);
- ret = true;
- } catch (ElasticsearchException | IOException e) {
- Log.error("ES索引删除失败!" + e);
- closEsClient(client);
- }
- return ret;
- }
-
- /**
- * 关闭ES数据库连接
- *
- * @param client ES数据库连接
- */
- public static void closEsClient(RestHighLevelClient client) {
- try {
- if (client != null) {
- client.close();
- }
- } catch (IOException ex) {
- Log.error(ex);
- }
- }
-
- /**
- * 根据索引名称判断索引是否存在
- *
- * @param indexName 索引名称
- * @param client ES数据库连接
- * @return true 存在 false 不存在
- */
- public static boolean existsEsIndex(RestHighLevelClient client, String indexName) {
- GetIndexRequest indexRequest = new GetIndexRequest(indexName);
- try {
- return client.indices().exists(indexRequest, RequestOptions.DEFAULT);
- } catch (IOException e) {
- Log.error("索引是否存在判断异常!" + e);
- closEsClient(client);
- }
- return false;
- }
-
-
- /**
- * 测试方法
- */
- public static void testEs() {
- // 设置索引名称
- String indexName = "testa_big_data";
- // 设置索引分片数
- int shards = 5;
- // 设置索引分片副本数
- int replicas = 2;
- // 索引是否存在标识 true 存在 false 不存在
- boolean indexEsistsFlag = true;
- // 索引是否创建成功 true 创建成功 false 创建失败
- boolean createIndexFlag = false;
- // 是否对新建或已存在索引添加数据
- boolean putDataFlag = false;
- // 是否删除索引
- boolean deleteIndexFlag = false;
-
- // 创建连接
- RestHighLevelClient client = createEsClient();
- // 判断该索引是否存在
- indexEsistsFlag = existsEsIndex(client, indexName);
- // 不存在则创建
- if (!indexEsistsFlag) {
- // 创建索引
- createIndexFlag = createEsIndex(client, indexName, shards, replicas);
- if (createIndexFlag) {
- System.out.println("索引创建成功");
- } else {
- System.out.println("索引创建失败");
- }
- }
-
- // 在索引中添加数据
- if (putDataFlag) {
- Map<String, Object> jsonMap = new HashMap<>();
- Map<String, Object> testMap = new HashMap<>();
- testMap.put("b1", "B1");
- testMap.put("b2", 2);
- testMap.put("b3", 3.21);
- testMap.put("b4", new Date());
- jsonMap.put("name", "asd");
- jsonMap.put("age", 34);
- jsonMap.put("create_time", new Date());
- jsonMap.put("id", testMap);
- DocWriteResponse.Result result = putDataByIndex(client, jsonMap, indexName, "test2");
- System.out.println(result);
- }
-
- // 测试索引删除
- if (deleteIndexFlag) {
- if (deleteIndex(client, indexName)) {
- System.out.println("删除成功");
- } else {
- System.out.println("删除失败");
- }
- }
-
- searchEsData(client,indexName);
- }
-
- public static void searchEsData(RestHighLevelClient client,String indexName){
- // 创建查询请求
- SearchRequest sr = new SearchRequest(indexName);
- // 构造搜索源
- SearchSourceBuilder ssb = new SearchSourceBuilder();
- // 构建搜索条件
- QueryBuilder qb = QueryBuilders.matchAllQuery();
- // 将搜索条件放入搜索源 logstash.bat -f .\config-mysql\mysqlToEs.conf
- ssb.query(qb);
- // 将搜索源放入查询请求
- sr.source(ssb);
- // 用于存放查询结果
- List<Map<String,Object>> resultList = new ArrayList<>();
- try {
- // 进行查询
- SearchResponse srRet = client.search(sr,RequestOptions.DEFAULT);
- // 获取响应中所有数据集
- SearchHits shArr = srRet.getHits();
- SearchHit[] shtArr = shArr.getHits();
- // 遍历输出
- for (SearchHit sh:srRet.getHits()) {
- resultList.add(sh.getSourceAsMap());
- System.out.println(sh.getSourceAsString());
- }
- } catch (IOException e) {
- Log.error("查询结果异常!"+e);
- closEsClient(client);
- }
- }
- }
可以理解为一个强大的ETL数据抽取工具,可以毫秒级同步mysql数据至ES数据库中。需要注意的是ELK版本要一致,不然容易出现奇怪的错误。
1. 官网下载Logstach 网址:https://www.elastic.co/cn/
因为本地搭建,运行环境为Windows,所以下载的是ZIP
2. Logstach 与 mysql数据实时同步
Logstach功能强大,简单说下数据同步的实现方式。数据库数据同步采用的是JDBC数据抽取,抽取后通过规则处理,然后输出至ES。也可以读取文件,将文件数据源读取到ES数据库中,此处只介绍数据库同步方式。
下载速度慢可以通过迅雷下载,下载完成后解压到目标文件夹。然后进入bin目录,创建个文件夹,用来存放驱动包和数据同步的配置文件
抽取的配置文件(数据库驱动包没放这里,不过没关系,配置文件用的是绝对路径)
对应配置信息网上搜搜,有的是,就不赘述了。关键的同步时间设置。
再简单介绍下配置文件的主要结构,配置文件主要结构由三部分构成,input filter output 顾名思义
input 配置要同步的数据源,用于获取数据(可以配置多个不同的数据源)。
filter 用于数据处理,可以进行数据处理规则设置,
output,输出到哪里,以及输出后的一些基础配置设定。
- input {
- jdbc {
- #驱动包绝对路径
- jdbc_driver_library => "D:/developSW/Elastic/logstash-7.3.2/lib/mysql-connector-java-5.6-bin.jar"
- #驱动类别
- jdbc_driver_class => "com.mysql.jdbc.Driver"
- #数据库地址
- jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/dealerhelper?useUnicode=true&characterEncoding=UTF8"
- #用户名
- jdbc_user => "root"
- #密码
- jdbc_password => "123456"
- # 是否分页(数据量比较大的时候最好开启)
- jdbc_paging_enabled => "true"
- # 每页最大条数
- jdbc_page_size => "50000"
- # 同步刷新时间 各字段含义(从左至右)秒、分、时、天、月、年,全为*默认含义为每分钟都更新 */2每0.5秒刷新一次
- schedule => "*/2 * * * * *"
- # 同步数据的sql语句
- statement => "SELECT * FROM carkeeper_dictionary"
- #
- use_column_value => true
- tracking_column_type => "timestamp"
- tracking_column => "update_time"
- last_run_metadata_path => "syncpoint_table"
- }
- }
- output {
- elasticsearch {
- # ES的IP地址及端口
- hosts => ["127.0.0.1:9200"]
- # 索引名称 可自定义
- index => "testa"
- # 需要关联的数据库中有有一个id字段,对应类型中的id
- document_id => "%{uuid}"
- # 7.x版本以后不再存在“类型”这一概念,此处使用默认类型_doc就可以了
- document_type => "_doc"
- }
- stdout {
- # JSON格式输出
- codec => json_lines
- }
- }
配置完就可以执行抽取了。bin目录下url框输入CMD回车
然后输入 logstash.bat -f .\config-mysql\配置文件名称。等待数据同步完成就可以了。
其他说明:
1)目前,只能同步mysql 的input update操作,但delete操作无法同步,目前关于delete同步方法还是添加删除标识,删除执行update,修改删除标识来实现。如果业务存在delete操作 也可以在mysql 操作完成后,在java端根据ES索引名称和文档ID进行删除。
2)同步性能单表80万条数据10分钟左右,同步后增量数据可以做到毫秒级同步
附录:input filter output配置说明表
jdbc连接配置说明
Setting Input | type | Required | default | describe |
---|---|---|---|---|
jdbc_connection_string | string | Yes | N/A | jdbc连接 jdbc:mysql://localhost:3306/mydb |
jdbc_default_timezone | string | No | 默认时区,SQL timestamp会默认转成UTC time。jdbc_default_timezone => "Asia/Shanghai" | |
jdbc_driver_class | string | Yes | N/A | 如 jdbc_driver_class => "com.mysql.jdbc.Driver" |
jdbc_driver_library | string | No | 如 jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar",如果不设置会从找本地java classpath中找 | |
jdbc_user | string | Yes | N/A | 数据库用户 |
jdbc_password | password | No | N/A | 数据库密码 |
jdbc_password_filepath | a valid filesystem path | No | N/A | 数据库密码文件存放路径 |
jdbc_pool_timeout | number | No | 5 | 连接池超时时间 默认5s |
jdbc_validate_connection | boolean | No | false | 使用前验证连接,默认不用 |
jdbc_validation_timeout | number | No | 3600 | 验证超时时间3600s |
connection_retry_attempts | number | No | 1 | 尝试重连数据库的最大次数 |
connection_retry_attempts_wait_time | number | No | 0.5 | 尝试重连时休眠的秒数 |
sql 配置
Setting Input | type | Required | default | describe |
---|---|---|---|---|
sequel_opts | hash | No | {} | 可选配置选项 |
statement | string | No | N/A | 执行语句,短sql |
statement_filepath | a valid filesystem path | No | sql语句路径 长sql写sql | |
lowercase_column_names | boolean | No | true | 默认列名转换为小写 |
columns_charset | hash | No | {} | 可指定列的字符集。columns_charset => { "column0" => "ISO-8859-1" } |
parameters | hash | No | {} | 自定义参数数值 "target_id" => "321" |
jdbc_paging_enabled | boolean | No | false | 是否分页,默认不分页。true后采用 limits, offsets 做分页处理,数据量过大会有性能问题。 |
jdbc_page_size | number | No | 100000 | 每页数据量 |
jdbc_fetch_size | number | No | N/A | jdbc fetch 最大值,不设默认为相关数据库驱动设置的默认值。不分页时过大会造成OOM |
schedule | string | No | N/A | sql执行周期,cron表达式,最短间隔1min schedule => "* * * * *"。不设置时 sql只运行一次 |
sql_log_level | string, one of ["fatal", "error", "warn", "info", "debug"] | No | info | sql日志级别 |
sync 配置
Setting Input | type | Required | default | describe |
---|---|---|---|---|
clean_run | boolean | No | false | 设为true,sql_last_value会被忽略,每次start会重新初始化所有数据 |
record_last_run | boolean | No | true | 默认保存最后运行时间 |
last_run_metadata_path | string | No | /home/ph/.logstash_jdbc_last_run | 最后运行时间文件存放地址 |
use_column_value | boolean | No | false | 设为true tracking_column的值就是 :sql_last_value,false 则是last_run_value的时间戳 |
tracking_column | string | No | N/A | 追踪的列(:sql_last_value值),需要use_column_value设为trun |
tracking_column_type | string, one of ["numeric", "timestamp"] | No | numeric | 追踪列的类型,默认数值 |
es cluster 设置及链接
Setting Input | type | Required | default | describe |
---|---|---|---|---|
hosts | uri | No | [//127.0.0.1] | es hosts:"127.0.0.1:9200","127.0.0.2:9200" |
path | string | No | N/A | lives es server(需要设置proxy) |
proxy | uri | No | N/A | 代理地址 |
user | string | No | N/A | es 用户 |
password | password | No | N/A | es password |
sniffing | boolean | No | false | 是否动态发现es cluster nodes。 1.x and 2.x 所有http.enabled的节点, 5.x and 6.x会排除master node |
sniffing_delay | number | No | 5 | sniffing间隔时间 5s |
sniffing_path | string | No | N/A | |
timeout | number | No | 60 | 超时时间 |
pool_max | number | No | 1000 | 线程池最大值 |
pool_max_per_route | number | No | 100 | 每个路由的最大线程池 |
validate_after_inactivity | number | No | 10000 | 执行请求前等待时间超过多少s,验证链接是否过时 |
resurrect_delay | number | No | 5 | |
retry_initial_interval | number | No | 2 | 批处理重试的初始间隔(秒)。每次重试时加倍,直到retry_max_interval |
retry_max_interval | number | No | 64 | 批处理重试之间的最大间隔(秒) |
retry_on_conflict | number | No | 1 | update/upserted冲突重试的次数 |
http_compression | boolean | No | false | 是否开启gzip压缩,es 5.0版本以下默认开启,5.x+版本关闭 |
custom_headers | hash | No | N/A | 自定义http 头信息 |
failure_type_logging_whitelist | array | No | [] | es 错误白名单. |
healthcheck_path | string | No | N/A |
sync 配置
Setting Input | type | Required | default | describe |
---|---|---|---|---|
action | string | No | index | 批处理动作index:文档全局替换,delete:按documentId删除,create:创建,documentId存在则失败,update:更新,documentId不存在则失败 |
doc_as_upsert | boolean | No | false | update模式下,true时documentId不存在会创建新document |
upsert | string | No | "" | upsert 内容 |
bulk_path | string | No | N/A | 批处理路径 path+'_bulk' |
index | string | No | index名 | |
document_id | string | No | N/A | 文档id |
document_type | string | No | N/A | 文档类型 |
parent | string | No | nil | 子文档,关联的父文档的id |
pipeline | string | No | nil | 执行的pipeline name |
routing | string | No | N/A | 路由 |
version | string | No | N/A | |
version_type | string, one of ["internal", "external", "external_gt", "external_gte", "force"] | No | N/A | |
manage_template | boolean | No | true | 默认logstash-%{+YYYY.MM.dd},改为false需要手动管理 |
template | a valid filesystem path | No | N/A | 模板路径 |
template_name | string | No | logstash | 模板名称 |
template_overwrite | boolean | No | false | |
parameters | hash | No | N/A | 查询参数 |
script | string | No | N/A | 更新脚本script => "ctx._source.message = params.event.get('message')" |
script_lang | string | No | painless | 脚本语言,6.0+用其他语言,需要设为"" |
script_type | string, one of ["inline", "indexed", "file"] | No | inline | 脚本类型 |
script_var_name | string | No | event | 脚本变量名称 |
scripted_upsert | boolean | No | false | 设为true,会创建不存在的document |
ssl配置
Setting Input | type | Required | default | describe |
---|---|---|---|---|
ssl | boolean | No | N/A | es 开启https,hosts需要以https://开头 |
ssl_certificate_verification | boolean | No | true | ssl证书验证 |
cacert | a valid filesystem path | No | N/A | .cer or .pem证书路径 |
keystore | a valid filesystem path | No | N/A | .jks or .p12证书路径 |
keystore_password | password | No | N/A | keystore密码 |
truststore | a valid filesystem path | No | N/A | JKS路径 |
truststore_password | password | No | N/A | JKS password |
ES数据库数据可视化工具,可以理解为类似Navicat的工具软件,相关操作方法及KQL规则可查询官网,此文档就不做介绍了。(因为我用的也不是很明白- -!!!)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。