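Real-world projects often end up using Elasticsearch for search. This article describes how to synchronize data from MySQL to Elasticsearch in near real time with Logstash. For setting up Elasticsearch, Logstash, and Kibana themselves, see the ELK setup steps.
I have worked out two approaches for synchronizing MySQL to Elasticsearch.
This article covers the one that uses Logstash, provided officially by Elastic, to perform both full and incremental synchronization from MySQL.
First, the relationship between the MySQL tables.
The main table is news, which stores news articles.
The child table is custom_information, which stores custom information. It has a one-to-many relationship with news: one article corresponds to multiple custom information rows.
Note: item_id in the custom_information table references id in the news table.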
The one-to-many relationship can be described with the following JSON structure:
{
  "id": "15c7ee7a5dc411ea9bc2fa163e0c8256",
  "title": "“宅经济”进入数字化时代",
  "source": "人民日报",
  "customList": [
    {
      "secondLevel": "32552",
      "isRelEnterprise": "0",
      "secondLevelName": "济南",
      "moduleType": "1",
      "customName": "地区1",
      "firstLevel": "37200",
      "firstLevelName": "山东",
      "customId": "1",
      "detId": "1"
    },
    {
      "secondLevel": "222",
      "isRelEnterprise": "0",
      "secondLevelName": "林业1",
      "moduleType": "1",
      "customName": "行业1",
      "firstLevel": "11",
      "firstLevelName": "林业",
      "customId": "2",
      "detId": "3"
    }
  ]
}
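This structure has to be mapped in Elasticsearch, where it is also a one-to-many relationship. The approach used here is the Elasticsearch nested field type, which keeps each child object as its own unit for querying.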
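To see why the nested type matters for this data, the sketch below imitates in plain Java how matching behaves when child objects are kept separate, compared with an array of objects whose fields are flattened into per-field value lists (which is what happens when the array is not mapped as nested). The class and method names are made up for illustration, and this is not Elasticsearch code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: imitates nested vs. flattened matching, not real Elasticsearch code. */
class NestedVsFlattened {

    /** Nested semantics: both conditions must hold inside the same child object. */
    static boolean matchesNested(List<Map<String, String>> customList,
                                 String customId, String secondLevel) {
        for (Map<String, String> item : customList) {
            if (customId.equals(item.get("customId"))
                    && secondLevel.equals(item.get("secondLevel"))) {
                return true;
            }
        }
        return false;
    }

    /** Flattened semantics: each field is checked against the values of all children. */
    static boolean matchesFlattened(List<Map<String, String>> customList,
                                    String customId, String secondLevel) {
        List<String> customIds = new ArrayList<>();
        List<String> secondLevels = new ArrayList<>();
        for (Map<String, String> item : customList) {
            customIds.add(item.get("customId"));
            secondLevels.add(item.get("secondLevel"));
        }
        return customIds.contains(customId) && secondLevels.contains(secondLevel);
    }

    /** Hypothetical helper that builds one child object with two of its fields. */
    static Map<String, String> item(String customId, String secondLevel) {
        Map<String, String> m = new HashMap<>();
        m.put("customId", customId);
        m.put("secondLevel", secondLevel);
        return m;
    }

    public static void main(String[] args) {
        // Same two children as in the JSON example: (1, 32552) and (2, 222).
        List<Map<String, String>> customList =
                Arrays.asList(item("1", "32552"), item("2", "222"));
        // customId=1 and secondLevel=222 belong to different children.
        System.out.println(matchesNested(customList, "1", "222"));    // false
        System.out.println(matchesFlattened(customList, "1", "222")); // true
    }
}
```

With the flattened behaviour, an article would match a region/industry combination that none of its custom information rows actually contains, which is the false positive the nested mapping avoids.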
Create the required index with a static (explicit) mapping. The ik_max_word and ik_smart analyzers require the IK analysis plugin to be installed.
PUT app-article-link
{
  "mappings": {
    "properties": {
      "address": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
      "customList": {
        "type": "nested",
        "properties": {
          "customId": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "customName": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "detId": { "type": "keyword" },
          "firstLevel": { "type": "keyword" },
          "firstLevelName": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "isRelEnterprise": { "type": "keyword" },
          "moduleType": { "type": "keyword" },
          "secondLevel": { "type": "keyword" },
          "secondLevelName": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
        }
      },
      "custom_list": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
      "detail": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart" },
      "endTime": { "type": "keyword" },
      "id": { "type": "keyword" },
      "industryName": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
      "isDelete": { "type": "keyword" },
      "price": { "type": "keyword" },
      "publishDate": { "type": "keyword" },
      "relevanceType": { "type": "keyword" },
      "savePath": { "type": "keyword" },
      "source": {
        "type": "text",
        "fields": {
          "keyword": { "type": "keyword", "ignore_above": 256 },
          "suggest": {
            "type": "completion",
            "analyzer": "simple",
            "preserve_separators": true,
            "preserve_position_increments": true,
            "max_input_length": 50
          }
        },
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "startTime": { "type": "keyword" },
      "summary": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart" },
      "techFieldName": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
      "title": {
        "type": "text",
        "fields": {
          "keyword": { "type": "keyword", "ignore_above": 256 },
          "suggest": {
            "type": "completion",
            "analyzer": "simple",
            "preserve_separators": true,
            "preserve_position_increments": true,
            "max_input_length": 50
          }
        },
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "update_time": { "type": "keyword" },
      "videoStatus": { "type": "keyword" }
    }
  }
}
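The Logstash configuration steps follow.
Because the tables described above are in a one-to-many relationship, a view is created first. The reason is that synchronization uses the latest update timestamp as its cutoff point, so the key is to find the most recent timestamp across the main table and the child table. The SQL that creates the view (named news_view, as queried later) is:
CREATE OR REPLACE VIEW news_view AS
SELECT
    t.id,
    t.title,
    t.source,
    '8' AS relevanceType,
    date_format(
        greatest(`t`.`update_time`, ifnull(`i`.`update_time`, '1970')),
        '%Y-%m-%d %H:%i:%s'
    ) AS `update_time`
FROM `news` t
LEFT JOIN custom_information i
    ON t.id = i.item_id
    AND i.is_delete = '0'
    AND i.module_type = '8'
WHERE t.state = '0'
    AND t.publish_status = '3'
    AND t.relevance_type = '2'
Only the columns taken from news are shown here. The aggregate filter in the Logstash configuration also reads columns such as detId, customId, and customName, so the complete view presumably selects those from custom_information as well.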
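The update_time column above is the most recent timestamp across the two tables. The ifnull fallback matters because greatest returns NULL when any argument is NULL, which would otherwise happen for articles without custom information.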
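The rule the view applies to each joined row can be sketched in Java as follows. This is only an illustration of the greatest/ifnull logic under the assumption that the news timestamp is never null; the class and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Illustrative only: mirrors greatest(t.update_time, ifnull(i.update_time, '1970')). */
class LatestUpdateTime {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    // Fallback used when the LEFT JOIN found no custom_information row.
    private static final LocalDateTime EPOCH = LocalDateTime.of(1970, 1, 1, 0, 0, 0);

    /** Returns the later of the two timestamps, formatted like the view's update_time. */
    static String latest(LocalDateTime newsUpdate, LocalDateTime customUpdate) {
        LocalDateTime child = (customUpdate != null) ? customUpdate : EPOCH;
        LocalDateTime latest = newsUpdate.isAfter(child) ? newsUpdate : child;
        return latest.format(FORMAT);
    }

    public static void main(String[] args) {
        LocalDateTime news = LocalDateTime.of(2020, 3, 1, 10, 0, 0);
        // No child row: the news timestamp wins.
        System.out.println(latest(news, null)); // 2020-03-01 10:00:00
        // Child row edited later: the child timestamp wins.
        System.out.println(latest(news, LocalDateTime.of(2020, 3, 5, 8, 30, 15))); // 2020-03-05 08:30:15
    }
}
```

The effect is that editing either the article or one of its custom information rows moves the row past the synchronization cutoff.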
Create news.conf in the Logstash config directory with the following content:
input {
  jdbc {
    jdbc_driver_library => "/opt/apps/logstash/lib/mysql-connector-java-8.0.13.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.0.178:3306/test?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
    jdbc_user => "root"
    jdbc_password => "123456"
    connection_retry_attempts => "3"
    jdbc_validation_timeout => "3600"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "500"
    statement_filepath => "/opt/apps/logstash/sql/news.sql"
    use_column_value => true
    lowercase_column_names => false
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
    record_last_run => true
    last_run_metadata_path => "/opt/apps/logstash/station/news.txt"
    clean_run => false
    schedule => "*/5 * * * * *"
    type => "news"
  }
}
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['title'] = event.get('title')
      map['source'] = event.get('source')
      map['custom_list'] ||= []
      map['customList'] ||= []
      if (event.get('detId') != nil)
        if !(map['custom_list'].include? event.get('detId'))
          map['custom_list'] << event.get('detId')
          map['customList'] << {
            'detId' => event.get('detId'),
            'moduleType' => event.get('moduleType'),
            'customId' => event.get('customId'),
            'customName' => event.get('customName'),
            'firstLevel' => event.get('firstLevel'),
            'firstLevelName' => event.get('firstLevelName'),
            'secondLevel' => event.get('secondLevel'),
            'secondLevelName' => event.get('secondLevelName'),
            'isRelEnterprise' => event.get('isRelEnterprise')
          }
        end
      end
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 5
  }
  mutate {
    remove_field => ["@timestamp","@version"]
  }
}
output {
  elasticsearch {
    document_id => "%{id}"
    document_type => "_doc"
    index => "app-article-link"
    hosts => ["http://192.168.0.178:9200"]
  }
  stdout {
    codec => rubydebug
  }
}
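In the input {} block:
statement_filepath is the path of the file that contains the SQL statement;
last_run_metadata_path is the file where the last tracked timestamp is recorded, so that the next run continues from that point;
tracking_column is the update-time column whose value is tracked;
schedule is the execution schedule; the six-field cron expression above runs the query every five seconds.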
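How these settings cooperate can be pictured with a small Java sketch. It is a simplification, not the plugin's implementation: it assumes the SQL statement filters on :sql_last_value (the plugin does not filter rows on its own), and the class, field, and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a simplified picture of timestamp-based incremental sync. */
class IncrementalSyncSketch {

    /** One row of the view, reduced to its id and tracking column. */
    static final class Row {
        final String id;
        final LocalDateTime updateTime;

        Row(String id, LocalDateTime updateTime) {
            this.id = id;
            this.updateTime = updateTime;
        }
    }

    // Plays the role of the value persisted in last_run_metadata_path.
    private LocalDateTime lastValue = LocalDateTime.of(1970, 1, 1, 0, 0, 0);

    /** One scheduled run: pick rows newer than the stored value, then advance it. */
    List<Row> runOnce(List<Row> table) {
        List<Row> changed = new ArrayList<>();
        LocalDateTime newest = lastValue;
        for (Row row : table) {
            if (row.updateTime.isAfter(lastValue)) {
                changed.add(row);
                if (row.updateTime.isAfter(newest)) {
                    newest = row.updateTime;
                }
            }
        }
        lastValue = newest; // the next run starts from here
        return changed;
    }

    public static void main(String[] args) {
        IncrementalSyncSketch sync = new IncrementalSyncSketch();
        List<Row> table = new ArrayList<>();
        table.add(new Row("a", LocalDateTime.of(2020, 3, 1, 10, 0, 0)));
        table.add(new Row("b", LocalDateTime.of(2020, 3, 1, 10, 0, 5)));
        System.out.println(sync.runOnce(table).size()); // first run behaves like a full sync
        System.out.println(sync.runOnce(table).size()); // nothing changed since the last run
    }
}
```

The first run returns every row because the stored value starts at 1970, which is how a single configuration covers both the full and the incremental synchronization.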
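The SQL that is executed (news.sql). For incremental runs the statement has to return the tracking column and filter on :sql_last_value, the value Logstash keeps in last_run_metadata_path:
SELECT
    n.id,
    n.title,
    n.source,
    n.update_time
FROM
    news_view n
WHERE
    n.update_time > :sql_last_value
ORDER BY n.update_time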
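The rows returned by this statement are flat: one row per article and custom information pair. The aggregate filter in news.conf folds them into one document per article. The following Java sketch mirrors that folding under simplifying assumptions: only a few columns are carried, the class and helper names are hypothetical, and rows are grouped through a map, whereas the real filter with push_previous_map_as_event relies on rows with the same id arriving consecutively.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: folds flat join rows into one document per news id. */
class AggregateSketch {

    /** Aggregated document; detIds mirrors custom_list, customList holds the children. */
    static final class Doc {
        final String id;
        final String title;
        final List<String> detIds = new ArrayList<>();
        final List<Map<String, String>> customList = new ArrayList<>();

        Doc(String id, String title) {
            this.id = id;
            this.title = title;
        }
    }

    static List<Doc> aggregate(List<Map<String, String>> rows) {
        Map<String, Doc> docsById = new LinkedHashMap<>();
        for (Map<String, String> row : rows) {
            String id = row.get("id");
            Doc doc = docsById.get(id);
            if (doc == null) {
                doc = new Doc(id, row.get("title"));
                docsById.put(id, doc);
            }
            String detId = row.get("detId");
            // LEFT JOIN rows without a child carry a null detId; duplicates are skipped.
            if (detId != null && !doc.detIds.contains(detId)) {
                doc.detIds.add(detId);
                Map<String, String> child = new LinkedHashMap<>();
                child.put("detId", detId);
                child.put("customName", row.get("customName"));
                doc.customList.add(child);
            }
        }
        return new ArrayList<>(docsById.values());
    }

    /** Hypothetical helper that builds one flat result row. */
    static Map<String, String> row(String id, String title, String detId, String customName) {
        Map<String, String> r = new HashMap<>();
        r.put("id", id);
        r.put("title", title);
        r.put("detId", detId);
        r.put("customName", customName);
        return r;
    }

    public static void main(String[] args) {
        List<Map<String, String>> rows = new ArrayList<>();
        rows.add(row("n1", "first article", "1", "region"));
        rows.add(row("n1", "first article", "3", "industry"));
        rows.add(row("n2", "second article", null, null));
        for (Doc doc : aggregate(rows)) {
            System.out.println(doc.id + " has " + doc.customList.size() + " custom entries");
        }
    }
}
```

An article without any custom information still produces a document, just with an empty customList, which matches the LEFT JOIN in the view.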
Edit config/pipelines.yml. The aggregate filter only works reliably with a single worker, so the pipeline is limited to pipeline.workers: 1.
[root@localhost config]# vi pipelines.yml
# List of pipelines to be loaded by Logstash
#
# This document must be a list of dictionaries/hashes, where the keys/values are pipeline settings.
# Default values for omitted settings are read from the `logstash.yml` file.
# When declaring multiple pipelines, each MUST have its own `pipeline.id`.
#
# Example of two pipelines:
#
# - pipeline.id: test
#   pipeline.workers: 1
#   pipeline.batch.size: 1
#   config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
# - pipeline.id: another_test
#   queue.type: persisted
#   path.config: "/tmp/logstash/*.config"
#
#- pipeline.id: news_table
#  path.config: /opt/apps/logstash/config/addmysql.conf
#- pipeline.id: news_table3
#  path.config: /opt/apps/logstash/config/addmysql3.conf
- pipeline.id: news
  # single worker, required by the aggregate filter
  pipeline.workers: 1
  path.config: /opt/apps/logstash/config/news.conf
Start Logstash by running ./bin/logstash:
[root@localhost logstash]# ./bin/logstash
Common Kibana queries
Exact-match query
GET /app-article-link/_search
{
  "_source": ["id", "title", "source", "customList", "update_time", "savePath", "isDelete"],
  "query": {
    "bool": {
      "must": [
        { "match": { "id": "15c7ee7a5dc411ea9bc2fa163e0c8256" } }
      ]
    }
  }
}
Nested query (the field must be mapped with the nested type)
GET app-article-link/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "nested": {
            "path": "customList",
            "query": {
              "bool": {
                "must": [
                  { "match": { "customList.customId": "1" } },
                  { "match": { "customList.secondLevel": "5552" } }
                ]
              }
            }
          }
        }
      ]
    }
  }
}
Autocomplete query (the field type must be completion)
GET app-article-link/_search
{
  "_source": ["source", "title", "detail"],
  "suggest": {
    "title_suggest": {
      "prefix": "国家知识产",
      "completion": {
        "field": "title.suggest",
        "size": 10,
        "skip_duplicates": true
      }
    }
  }
}
Highlight query
GET app-article-link/_search
{
  "query": {
    "multi_match": {
      "query": "安徽",
      "fields": ["title"]
    }
  },
  "highlight": {
    "pre_tags": "<span class='highLight'>",
    "post_tags": "</span>",
    "fields": {
      "title": {}
    }
  }
}
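The documents that Logstash finally imports follow the JSON structure shown earlier, with the custom information rows stored in the nested customList field.
The Elasticsearch version deployed here is 7.8.1, and the Java client code below targets the same version.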
Add the dependencies
<!-- Elasticsearch search -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.8.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.8.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.8.1</version>
</dependency>
Create the configuration
import java.util.ArrayList;

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ESConfig {
    private static String hosts = "192.168.0.178"; // cluster addresses, comma-separated
    private static int port = 9200; // port
    private static String schema = "http"; // protocol
    private static ArrayList<HttpHost> hostList = null;
    private static int connectTimeOut = 1000; // connect timeout (ms)
    private static int socketTimeOut = 30000; // socket (read) timeout (ms)
    private static int connectionRequestTimeOut = 500; // timeout for obtaining a connection from the pool (ms)
    private static int maxConnectNum = 100; // maximum total connections
    private static int maxConnectPerRoute = 100; // maximum connections per route

    static {
        hostList = new ArrayList<>();
        String[] hostStrs = hosts.split(",");
        for (String host : hostStrs) {
            hostList.add(new HttpHost(host, port, schema));
        }
    }

    @Bean
    public RestHighLevelClient restHighLevelClient() {
        RestClientBuilder builder = RestClient.builder(hostList.toArray(new HttpHost[0]));
        // Timeout settings for the async HTTP client
        builder.setRequestConfigCallback(new RequestConfigCallback() {
            @Override
            public Builder customizeRequestConfig(Builder requestConfigBuilder) {
                requestConfigBuilder.setConnectTimeout(connectTimeOut);
                requestConfigBuilder.setSocketTimeout(socketTimeOut);
                requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
                return requestConfigBuilder;
            }
        });
        // Connection pool settings for the async HTTP client
        builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.setMaxConnTotal(maxConnectNum);
                httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
                return httpClientBuilder;
            }
        });
        return new RestHighLevelClient(builder);
    }
}
Write the test endpoints
Inject the client
@Autowired
private RestHighLevelClient restHighLevelClient;
Highlighted search
public ResultBody highlighted(@RequestParam(value = "key") String key,
                              @RequestParam(value = "pageSize", defaultValue = "10") Integer pageSize,
                              @RequestParam(value = "from", defaultValue = "1") Integer from) throws IOException {
    // Offset of the first hit; "from" is a 1-based page number
    int offset = (from - 1) * pageSize;
    // Target index
    SearchRequest searchRequest = new SearchRequest("app-article-link");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // Build the query. When must and should sit in the same bool query, the should
    // clauses become optional and only affect scoring. The should conditions are
    // therefore collected in their own bool query, which is added as a single must clause.
    BoolQueryBuilder shouldQuery = QueryBuilders.boolQuery();
    // Industry
    /*
    if (industryList.size() > 0) {
        for (Map<String, String> itemMap : industryList) {
            String customId = itemMap.get("customId");
            String firstLevel = itemMap.get("firstLevel");
            String secondLevel = itemMap.get("secondLevel");
            NestedQueryBuilder nestedQueryBuilder = QueryBuilders.nestedQuery("customList",
                    QueryBuilders.boolQuery()
                            .must(QueryBuilders.matchQuery("customList.customId.keyword", customId))
                            .must(QueryBuilders.matchQuery("customList.firstLevel", firstLevel))
                            .must(QueryBuilders.matchQuery("customList.secondLevel", secondLevel)),
                    ScoreMode.None);
            shouldQuery.should(nestedQueryBuilder);
        }
    }
    */
    // Region
    /*
    if (StringUtils.isNotBlank(areaCode)) {
        // Query on the nested objects
        NestedQueryBuilder nestedQueryBuilder = QueryBuilders.nestedQuery("customList",
                QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchQuery("customList.customId.keyword", "1"))
                        .must(QueryBuilders.matchQuery("customList.firstLevel", areaCode)),
                ScoreMode.None);
        shouldQuery.should(nestedQueryBuilder);
    }
    */
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
            .must(shouldQuery)
            .must(QueryBuilders.matchQuery("isDelete", "0"));
    //boolQueryBuilder.mustNot(QueryBuilders.termsQuery("id", articleItemId));
    //List<String> customIdList = new ArrayList();
    //if (customIdList != null && customIdList.size() > 0) {
    //    boolQueryBuilder.mustNot(QueryBuilders.termsQuery("id", customIdList));
    //}
    // Add the keyword condition if a keyword was given
    if (StringUtils.isNotBlank(key)) {
        boolQueryBuilder.must(QueryBuilders.multiMatchQuery(key, "title", "summary", "detail"));
    }
    // Highlighting
    HighlightBuilder highlightBuilder = new HighlightBuilder();
    // Field to highlight, with the prefix and suffix tags
    highlightBuilder.field("title")
            .preTags("<font color='#ee1a1a'>")
            .postTags("</font>");
    searchSourceBuilder.query(boolQueryBuilder);
    searchSourceBuilder.highlighter(highlightBuilder);
    // Paging
    searchSourceBuilder.from(offset);
    searchSourceBuilder.size(pageSize);
    // Sort by publish date, newest first
    searchSourceBuilder.sort("publishDate", SortOrder.DESC);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    long total = searchResponse.getHits().getTotalHits().value;
    List<Map<String, Object>> list = Lists.newArrayList();
    // Walk through the hits and apply the highlighting
    for (SearchHit hit : searchResponse.getHits().getHits()) {
        Map<String, HighlightField> highlightFields = hit.getHighlightFields();
        HighlightField nameHighlight = highlightFields.get("title");
        Map<String, Object> sourceAsMap = hit.getSourceAsMap();
        // Join the fragments and overwrite the original title
        if (nameHighlight != null) {
            StringBuilder title = new StringBuilder();
            for (Text text : nameHighlight.getFragments()) {
                title.append(text);
            }
            sourceAsMap.put("title", title.toString());
        }
        // Initial value
        sourceAsMap.put("isRead", "0");
        list.add(sourceAsMap);
    }
    // Build the response
    Map<String, Object> retMap = new HashMap<>();
    retMap.put("total", total);
    retMap.put("dataList", list);
    return ResultBody.ok().data(retMap);
}
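The paging arithmetic in the method above deserves a guard, because with from/size paging Elasticsearch rejects requests whose from + size exceeds index.max_result_window (10000 by default). The sketch below shows one possible check; the class and method names are hypothetical, and the limit is assumed to be left at its default.

```java
/** Illustrative only: converts a 1-based page number into a from/size offset. */
class PageOffset {
    // Default value of index.max_result_window; assumed unchanged for this index.
    static final int MAX_RESULT_WINDOW = 10_000;

    static int offset(int page, int pageSize) {
        if (page < 1 || pageSize < 1) {
            throw new IllegalArgumentException("page and pageSize must be >= 1");
        }
        long offset = (long) (page - 1) * pageSize; // long avoids int overflow
        if (offset + pageSize > MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException("from + size exceeds the result window");
        }
        return (int) offset;
    }

    public static void main(String[] args) {
        System.out.println(offset(1, 10)); // 0
        System.out.println(offset(3, 10)); // 20
    }
}
```

Rejecting such a request early gives the caller a clear message instead of an Elasticsearch error; for deeper paging, search_after is the usual alternative.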
Autocomplete
public ResultBody getSearchSuggest(@RequestParam(value = "key") String key) throws IOException {
    if (StringUtils.isBlank(key)) {
        return ResultBody.ok();
    }
    CompletionSuggestionBuilder suggestion = SuggestBuilders
            .completionSuggestion("title.suggest").prefix(key).size(20).skipDuplicates(true);
    SuggestBuilder suggestBuilder = new SuggestBuilder();
    suggestBuilder.addSuggestion("suggest", suggestion);
    // Source builder
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.suggest(suggestBuilder);
    SearchRequest searchRequest = new SearchRequest("app-article-link"); // index
    searchRequest.source(sourceBuilder);
    SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    Suggest suggest = response.getSuggest();
    Set<String> keywords = null;
    if (suggest != null) {
        keywords = new HashSet<>();
        List<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>> entries =
                suggest.getSuggestion("suggest").getEntries();
        // Return at most 10 suggestions, each at most 50 characters long
        collect:
        for (Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option> entry : entries) {
            for (Suggest.Suggestion.Entry.Option option : entry.getOptions()) {
                String keyword = option.getText().string();
                if (!StringUtils.isEmpty(keyword) && keyword.length() <= 50) {
                    // Skip the text the user already typed
                    if (keyword.equals(key)) continue;
                    keywords.add(keyword);
                    if (keywords.size() >= 10) {
                        // Leave both loops so the limit holds across entries
                        break collect;
                    }
                }
            }
        }
    }
    return ResultBody.ok().data(keywords);
}
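The post-processing of the suggestions (drop the typed text, cap the length, cap the count) does not depend on Elasticsearch and can be isolated so that it is easy to test. The following sketch shows one way to do that; the class name, method name, and parameters are hypothetical, and it uses a LinkedHashSet so that the order returned by Elasticsearch is preserved.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: filters raw suggestion texts the way the endpoint above does. */
class SuggestionFilter {

    static Set<String> pick(List<String> options, String key, int limit, int maxLength) {
        Set<String> keywords = new LinkedHashSet<>(); // keeps order, drops duplicates
        for (String option : options) {
            if (keywords.size() >= limit) {
                break; // enough suggestions collected
            }
            if (option == null || option.isEmpty() || option.length() > maxLength) {
                continue; // skip empty or overly long texts
            }
            if (option.equals(key)) {
                continue; // skip what the user already typed
            }
            keywords.add(option);
        }
        return keywords;
    }

    public static void main(String[] args) {
        List<String> options = Arrays.asList("abc", "abcd", "abc", "abcde", "abcdef");
        System.out.println(pick(options, "abc", 2, 50)); // [abcd, abcde]
    }
}
```

In the endpoint, the option texts from every entry would be collected into one list first and then passed through this method with a limit of 10 and a maximum length of 50.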
Feedback and discussion are welcome!