赞
踩
前段时间搭建了Elasticsearch 和 Kibana,顺手把Logstash给装了,这篇文章是把我在用Logstash同步数据库数据到ES中遇到的问题记录下来
一个至少装有 Elasticsearch 和 Logstash 的主机
创建一个存放配置目录,并新建一个 jdbc.sql SQL文件,写入需要同步的SQL并保存
jdbc.conf (mysql)
input { jdbc { # mysql 数据库链接,dianpingdb为数据库名 jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/dianpingdb" # 用户名和密码 jdbc_user => "root" jdbc_password => "root" # 驱动 jdbc_driver_library => "/usr/local/logstash/bin/mysql/mysql-connector-java-8.0.15.jar" # 驱动类名 jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 执行的sql 文件路径+名称 statement_filepath => "/usr/local/logstash/bin/mysql/jdbc.sql" # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新 schedule => "* * * * *" } } output { elasticsearch { # ES的IP地址及端口 hosts => ["localhost:9200"] # 索引名称 index => "shop" document_type => "_doc" # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号 document_id => "%{id}" } stdout { # JSON格式输出 codec => json_lines } }
jdbc.conf (oracle)
input { jdbc { # mysql 数据库链接,dianpingdb为数据库名 jdbc_connection_string => "jdbc:oracle:thin:@127.0.0.1:1521/XE" # 用户名和密码 jdbc_user => "root" jdbc_password => "root" # 驱动 jdbc_driver_library => "/usr/local/logstash/bin/oracle/ojdbc6-12.1.0.2.0.jar" # 驱动类名前面的 Java:: 必须要加 jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 执行的sql 文件路径+名称 statement_filepath => "/usr/local/logstash/bin/oracle/jdbc.sql" # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新 schedule => "* * * * *" } } output { elasticsearch { # ES的IP地址及端口 hosts => ["localhost:9200"] # 索引名称 index => "shop" document_type => "_doc" # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号 document_id => "%{id}" } stdout { # JSON格式输出 codec => json_lines } }
其他参考配置
# 数据库驱动包存放路径 jdbc_driver_library => "D:\logstash-5.6.9\lib\ojdbc6.jar" # 数据库驱动器; jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" # 数据库连接方式 jdbc_connection_string => "jdbc:oracle:thin:@192.168.1.1:1521:test" # 数据库用户名 jdbc_user => "admin" # 数据库密码 jdbc_password => "admin" # 数据库重连尝试次数 connection_retry_attempts => "3" # 判断数据库连接是否可用,默认false不开启 jdbc_validate_connection => "true" # 数据库连接可用校验超时时间,默认3600s jdbc_validation_timeout => "3600" # 开启分页查询(默认false不开启) jdbc_paging_enabled => "true" # 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值) jdbc_page_size => "500" # statement为查询数据sql,如果sql较复杂,建议通过statement_filepath配置sql文件的存放路径 # statement_filepath => "D:\logstash-5.6.9\config\jdbc.sql" # sql_last_value为内置的变量,存放上次查询结果中最后一条数据tracking_column的值,此处即为rowid statement => "select t.*, t.rowid from test t where rowid > :sql_last_value" # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false); lowercase_column_names => false # 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中 record_last_run => true # 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值 use_column_value => true # 查询结果某字段的数据类型,仅包括numeric和timestamp,默认为numeric tracking_column_type => numeric # 需要记录的字段,用于增量同步,需是数据库字段 tracking_column => "rowid" # 记录上次执行结果数据的存放位置 last_run_metadata_path => "D:\logstash-5.6.9\logs\last_id.txt" # 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false clean_run => false # 同步频率(分 时 天 月 年),默认每分钟同步一次 schedule => "* * * * *" # ES索引的type type => "test_type01"
在logstash的bin目录运行配置文件,我的conf文件是直接放在logstash的bin/mysql目录下的
./logstash -f mysql/jdbc.conf
之后就会查出数据库数据并转成一堆json数据输入到ES中
通过
GET /site_record/_search
{
“query”: {
“match_all”: {}
}
}
查询出导入的数据,发现字段名称都是以下划线的形式命名的
这个时候我们通过 ElasticsearchTemplate 查询的数据就会发现,所有带_ 的字段返回的值都是null。
这是为啥呢,ElasticsearchTemplate 默认将查询的结果只是简单的转换成JSON了,并未对其进行驼峰的转换,经过一番的摸索,发现在
org.springframework.data.elasticsearch.core.ResultsMapper
中对查询的数据进行转换
@Nullable
default <T> T mapEntity(String source, Class<T> clazz) {
if (StringUtils.isEmpty(source)) {
return null;
}
try {
// 转换 JSON 对象
return getEntityMapper().mapToObject(source, clazz);
} catch (IOException e) {
throw new ElasticsearchException("failed to map source [ " + source + "] to class " + clazz.getSimpleName(), e);
}
}
MyResultMapper.java
/** * @Description: 自定义接收结果集的转换方式 * @Author: Jonas * @Date: 2020-04-16 11:30 */ public class MyResultMapper extends DefaultResultMapper { private MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> mappingContext; public MyResultMapper(MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> mappingContext){ super(new DefaultEntityMapper(mappingContext)); this.mappingContext = mappingContext; } @Override public <T> T mapEntity(String source, Class<T> clazz) { if (StringUtils.isEmpty(source)) { return null; } try { // 这里使用fastjson进行数据格式化,将其转换为驼峰格式的数据 return JSON.parseObject(source, clazz); } catch (Exception e) { e.printStackTrace(); } return null; } }
MyEsTemplate.java
/**
* @Description: 用于配置初始化 ElasticsearchTemplate
* @Author: Jonas
* @Date: 2020-04-16 11:25
*/
public class MyEsTemplate extends ElasticsearchTemplate {
public MyEsTemplate(Client client, ElasticsearchConverter elasticsearchConverter) {
super(client, elasticsearchConverter,
new MyResultMapper(elasticsearchConverter.getMappingContext()));
}
}
EsConfig.java
/**
* @Description: 将自定义的converter注入到ElasticsearchTemplate
* @Author: Jonas
* @Date: 2020-04-16 11:44
*/
@Configuration
public class EsConfig {
@Bean
public ElasticsearchTemplate elasticSearchTemplate(Client client , ElasticsearchConverter converter){
return new MyEsTemplate(client,converter);
}
}
重启应用,调用接口
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。