当前位置:   article > 正文

JAVA开发与JAVA(一文学会使用ElasticSearch)_java elasticsearch

java elasticsearch

        在web网站的架设中特别是数据量大的网站或者APP小程序需要搜索或者全文检索的场景,几乎都需要借助ElasticSearch来作为全文检索引擎,以提高网站的搜索效率和性能。

这一节,我们通过一篇文章介绍,使大家通过一文就学会使用ElasticSearch

一、ElasticSearch介绍:

 

ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。

ElasticSearch相关概念:

a)、索引index,相当于数据库中的database。
b)、类型type相当于数据库中的table。
c)、主键id相当于数据库中记录的主键,是唯一的。
d)、文档 document (相当于一条数据)
文档是ElasticSearch的基本单位。在Es中文档以JSON格式来表示
向es中的index下面的type中存储json类型的数据。
e) 、字段是文档中的field 属性,需要对每一个属性定义索引和被搜索的方式

二、ElasticSearch的安装:

1、先安装jdk

2、安装ElasticSearch

直接进入elasticsearch的官网,下载最新的安装包:https://www.elastic.co/downloads/elasticsearch,此教程使用的是5.1.1版本。

将下载的安装包上传到centos,或者直接在centos使用wget命令下载。

解压:

unzip elasticsearch-5.1.1.zip

运行:

  1. cd bin
  2. ./elasticsearch

三、java语言操作ElasticSearch:

1、maven依赖

  1. <dependency>
  2. <groupId>org.elasticsearch</groupId>
  3. <artifactId>elasticsearch</artifactId>
  4. <version>7.6.2</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.elasticsearch.client</groupId>
  8. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  9. <version>7.6.2</version>
  10. </dependency>

2、连接ElasticSearch

  1. import org.apache.http.HttpHost;
  2. import org.elasticsearch.client.RestClient;
  3. import org.elasticsearch.client.RestHighLevelClient;
  4. import java.io.IOException;
  5. public class EsClientTest {
  6. public static void main(String[] args) throws IOException {
  7. RestHighLevelClient esClient = new RestHighLevelClient(
  8. RestClient.builder(new HttpHost("IP",9200,"http"))
  9. );
  10. System.out.println("success");
  11. esClient.close();
  12. }
  13. }

 3、连接的相关api

  1. public static RestHighLevelClient esClient;
  2. static {
  3. esClient = new RestHighLevelClient(
  4. RestClient.builder(new HttpHost("IP", 9200, "http"))
  5. );
  6. }

4、创建索引操作:

  1. /**
  2. * 创建索引
  3. * @throws IOException
  4. */
  5. public static void createIndex() throws IOException {
  6. CreateIndexRequest createIndexRequest = new CreateIndexRequest("user");
  7. CreateIndexResponse indexResponse = esClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
  8. boolean acknowledged = indexResponse.isAcknowledged();
  9. System.out.println("索引创建状态:" + acknowledged);
  10. }

5、获取索引:

  1. /**
  2. * 索引信息查询
  3. * @throws IOException
  4. */
  5. public static void getIndex() throws IOException {
  6. GetIndexRequest getIndexRequest = new GetIndexRequest("user");
  7. GetIndexResponse getIndexResponse = esClient.indices().get(getIndexRequest, RequestOptions.DEFAULT);
  8. System.out.println(getIndexResponse.getAliases());
  9. System.out.println(getIndexResponse.getMappings());
  10. System.out.println(getIndexResponse.getSettings());
  11. }

6、删除索引:

  1. /**
  2. * 删除索引
  3. * @throws IOException
  4. */
  5. public static void deleteIndex() throws IOException {
  6. DeleteIndexRequest getIndexRequest = new DeleteIndexRequest("user");
  7. AcknowledgedResponse delete = esClient.indices().delete(getIndexRequest, RequestOptions.DEFAULT);
  8. System.out.println("索引删除状态:" + delete.isAcknowledged());
  9. }

7、添加数据:

  1. /**
  2. * 添加数据
  3. * @throws Exception
  4. */
  5. public static void add() throws Exception{
  6. IndexRequest indexRequest = new IndexRequest();
  7. indexRequest.index("user").id("1008");
  8. User user = new User();
  9. user.setName("茅河野人");
  10. user.setAge(28);
  11. user.setSex("男");
  12. user.setSalary(50000);
  13. String userData = objectMapper.writeValueAsString(user);
  14. indexRequest.source(userData,XContentType.JSON);
  15. //插入数据
  16. IndexResponse response = esClient.index(indexRequest, RequestOptions.DEFAULT);
  17. System.out.println(response.status());
  18. System.out.println(response.getResult());
  19. }

8、修改数据:

  1. /**
  2. * 修改数据
  3. * @throws Exception
  4. */
  5. public static void update() throws Exception{
  6. UpdateRequest request = new UpdateRequest();
  7. request.index("user").id("1008");
  8. request.doc(XContentType.JSON,"name","茅河野人");
  9. //插入数据
  10. UpdateResponse response = esClient.update(request, RequestOptions.DEFAULT);
  11. System.out.println(response.getResult());
  12. }

9、删除数据:

  1. /**
  2. * 删除
  3. * @throws Exception
  4. */
  5. public static void delete() throws Exception{
  6. DeleteRequest request = new DeleteRequest();
  7. request.index("user").id("1008");
  8. //插入数据
  9. DeleteResponse delete = esClient.delete(request, RequestOptions.DEFAULT);
  10. System.out.println(delete.getResult());
  11. }

10、批量添加数据:

  1. /**
  2. * 批量添加
  3. * @throws Exception
  4. */
  5. public static void batchInsert() throws Exception{
  6. BulkRequest bulkRequest = new BulkRequest();
  7. User user1 = new User("关羽","男",33,5500);
  8. String userData1 = objectMapper.writeValueAsString(user1);
  9. IndexRequest indexRequest1 = new IndexRequest().index("user").id("1002").source(userData1, XContentType.JSON);
  10. bulkRequest.add(indexRequest1);
  11. User user2 = new User("黄忠","男",50,8000);
  12. String userData2 = objectMapper.writeValueAsString(user2);
  13. IndexRequest indexRequest2 = new IndexRequest().index("user").id("1003").source(userData2, XContentType.JSON);
  14. bulkRequest.add(indexRequest2);
  15. User user3 = new User("黄忠2","男",49,10000);
  16. String userData3 = objectMapper.writeValueAsString(user3);
  17. IndexRequest indexRequest3 = new IndexRequest().index("user").id("1004").source(userData3, XContentType.JSON);
  18. bulkRequest.add(indexRequest3);
  19. User user4 = new User("赵云","男",33,12000);
  20. String userData4 = objectMapper.writeValueAsString(user4);
  21. IndexRequest indexRequest4 = new IndexRequest().index("user").id("1005").source(userData4, XContentType.JSON);
  22. bulkRequest.add(indexRequest4);
  23. User user5 = new User("马超","男",38,20000);
  24. String userData5 = objectMapper.writeValueAsString(user5);
  25. IndexRequest indexRequest5 = new IndexRequest().index("user").id("1006").source(userData5, XContentType.JSON);
  26. bulkRequest.add(indexRequest5);
  27. User user6 = new User("关羽","男",41,27000);
  28. String userData6 = objectMapper.writeValueAsString(user6);
  29. IndexRequest indexRequest6 = new IndexRequest().index("user").id("1007").source(userData6, XContentType.JSON);
  30. bulkRequest.add(indexRequest6);
  31. BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  32. System.out.println(bulkResponse.status());
  33. System.out.println(bulkResponse.getItems());
  34. }

11、批量删除数据:

  1. /**
  2. * 批量删除
  3. * @throws Exception
  4. */
  5. public static void batchDelete() throws Exception{
  6. BulkRequest bulkRequest = new BulkRequest();
  7. DeleteRequest indexRequest1 = new DeleteRequest().index("user").id("1002");
  8. DeleteRequest indexRequest2 = new DeleteRequest().index("user").id("1003");
  9. DeleteRequest indexRequest3 = new DeleteRequest().index("user").id("1004");
  10. DeleteRequest indexRequest4 = new DeleteRequest().index("user").id("1005");
  11. DeleteRequest indexRequest5 = new DeleteRequest().index("user").id("1006");
  12. DeleteRequest indexRequest6 = new DeleteRequest().index("user").id("1007");
  13. bulkRequest.add(indexRequest1);
  14. bulkRequest.add(indexRequest2);
  15. bulkRequest.add(indexRequest3);
  16. bulkRequest.add(indexRequest4);
  17. bulkRequest.add(indexRequest5);
  18. bulkRequest.add(indexRequest6);
  19. BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  20. System.out.println(bulkResponse.status());
  21. System.out.println(bulkResponse.getItems());
  22. }

13、删除某个索引下所有数据:

  1. /**
  2. * 查询某个索引下的所有数据
  3. * @throws Exception
  4. */
  5. public static void searchIndexAll() throws Exception{
  6. SearchRequest request = new SearchRequest();
  7. request.indices("user");
  8. // 索引中的全部数据查询
  9. SearchSourceBuilder query = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
  10. request.source(query);
  11. SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
  12. SearchHits hits = response.getHits();
  13. for (SearchHit searchHit : hits){
  14. System.out.println(searchHit.getSourceAsString());
  15. }
  16. }

 14、根据条件查询:

  1. TermQueryBuilder ageQueryBuilder = QueryBuilders.termQuery("sex", "女");
  2. SearchSourceBuilder query = new SearchSourceBuilder().query(ageQueryBuilder);
  3. request.source(query);
  4. SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
  5. System.out.println(response.getHits().getHits());
  6. System.out.println(response.getHits().getTotalHits());
  7. SearchHits hits = response.getHits();
  8. for (SearchHit searchHit : hits){
  9. System.out.println(searchHit.getSourceAsString());
  10. }

15、分页查询:

  1. SearchSourceBuilder sourceBuilder = new
  2. SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
  3. sourceBuilder.from(0).size(3);
  4. request.source(sourceBuilder);
  5. SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
  6. System.out.println(response.getHits().getHits());
  7. System.out.println(response.getHits().getTotalHits());
  8. SearchHits hits = response.getHits();
  9. for (SearchHit searchHit : hits){
  10. System.out.println(searchHit.getSourceAsString());
  11. }

四、在springboot中的运用

1、maven依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
  4. </dependency>

2、yml配置文件:

  1. # es 服务地址
  2. elasticsearch.host=IP
  3. # es 服务端口
  4. elasticsearch.port=9200
  5. # 配置日志级别,开启 debug 日志
  6. logging.level.com.congge=debug

3、实际例子:

创建一个实体类

  1. import lombok.AllArgsConstructor;
  2. import lombok.Data;
  3. import lombok.NoArgsConstructor;
  4. import lombok.ToString;
  5. import org.springframework.data.annotation.Id;
  6. import org.springframework.data.elasticsearch.annotations.Document;
  7. import org.springframework.data.elasticsearch.annotations.Field;
  8. import org.springframework.data.elasticsearch.annotations.FieldType;
  9. @Data
  10. @NoArgsConstructor
  11. @AllArgsConstructor
  12. @ToString
  13. @Document(indexName = "shopping", shards = 3, replicas = 1)
  14. public class Product {
  15. //必须有 id,这里的 id 是全局唯一的标识,等同于 es 中的"_id"
  16. @Id
  17. private Long id;//商品唯一标识
  18. /**
  19. * type : 字段数据类型
  20. * analyzer : 分词器类型
  21. * index : 是否索引(默认:true)
  22. * Keyword : 短语,不进行分词
  23. */
  24. @Field(type = FieldType.Text, analyzer = "ik_max_word")
  25. private String title;//商品名称
  26. @Field(type = FieldType.Keyword)
  27. private String category;//分类名称
  28. @Field(type = FieldType.Double)
  29. private Double price;//商品价格
  30. @Field(type = FieldType.Keyword, index = false)
  31. private String images;//图片地址
  32. }

提供接口:

  1. import com.congge.entity.Product;
  2. import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
  3. import org.springframework.stereotype.Repository;
  4. @Repository
  5. public interface ProductDao extends ElasticsearchRepository<Product, Long>{
  6. }

配置类:

  1. import lombok.Data;
  2. import org.apache.http.HttpHost;
  3. import org.elasticsearch.client.RestClient;
  4. import org.elasticsearch.client.RestClientBuilder;
  5. import org.elasticsearch.client.RestHighLevelClient;
  6. import org.springframework.boot.context.properties.ConfigurationProperties;
  7. import org.springframework.context.annotation.Configuration;
  8. //import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
  9. @ConfigurationProperties(prefix = "elasticsearch")
  10. @Configuration
  11. @Data
  12. public class EsConfig extends com.congge.config.AbstractElasticsearchConfiguration {
  13. private String host ;
  14. private Integer port ;
  15. //重写父类方法
  16. @Override
  17. public RestHighLevelClient elasticsearchClient() {
  18. RestClientBuilder builder = RestClient.builder(new HttpHost(host, port));
  19. RestHighLevelClient restHighLevelClient = new
  20. RestHighLevelClient(builder);
  21. return restHighLevelClient;
  22. }
  23. }
  1. import org.elasticsearch.client.RestHighLevelClient;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.data.elasticsearch.config.ElasticsearchConfigurationSupport;
  4. import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
  5. import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
  6. import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
  7. public abstract class AbstractElasticsearchConfiguration extends ElasticsearchConfigurationSupport {
  8. //需重写本方法
  9. public abstract RestHighLevelClient elasticsearchClient();
  10. @Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" })
  11. public ElasticsearchOperations elasticsearchOperations(ElasticsearchConverter elasticsearchConverter) {
  12. return new ElasticsearchRestTemplate(elasticsearchClient(), elasticsearchConverter);
  13. }
  14. }

测试1:

  1. import com.congge.entity.Product;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
  7. import org.springframework.test.context.junit4.SpringRunner;
  8. @RunWith(SpringRunner.class)
  9. @SpringBootTest
  10. public class EsIndexTest {
  11. //注入 ElasticsearchRestTemplate
  12. @Autowired
  13. private ElasticsearchRestTemplate elasticsearchRestTemplate;
  14. //创建索引并增加映射配置
  15. @Test
  16. public void createIndex(){
  17. //创建索引,系统初始化会自动创建索引
  18. System.out.println("创建索引");
  19. }
  20. @Test
  21. public void deleteIndex(){
  22. //创建索引,系统初始化会自动创建索引
  23. boolean flg = elasticsearchRestTemplate.deleteIndex(Product.class);
  24. System.out.println("删除索引 = " + flg);
  25. }
  26. }

测试2:

  1. import com.congge.dao.ProductDao;
  2. import com.congge.entity.Product;
  3. import org.elasticsearch.index.query.QueryBuilders;
  4. import org.elasticsearch.index.query.TermQueryBuilder;
  5. import org.junit.Test;
  6. import org.junit.runner.RunWith;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.boot.test.context.SpringBootTest;
  9. import org.springframework.data.domain.Page;
  10. import org.springframework.data.domain.PageRequest;
  11. import org.springframework.data.domain.Sort;
  12. import org.springframework.test.context.junit4.SpringRunner;
  13. import java.util.ArrayList;
  14. import java.util.List;
  15. @RunWith(SpringRunner.class)
  16. @SpringBootTest
  17. public class EsDocTest {
  18. @Autowired
  19. private ProductDao productDao;
  20. /**
  21. * 新增
  22. */
  23. @Test
  24. public void save() {
  25. Product product = new Product();
  26. product.setId(2L);
  27. product.setTitle("ipad mini");
  28. product.setCategory("ipad");
  29. product.setPrice(1998.0);
  30. product.setImages("http://ipad.jpg");
  31. productDao.save(product);
  32. }
  33. //修改
  34. @Test
  35. public void update(){
  36. Product product = new Product();
  37. product.setId(2L);
  38. product.setTitle("iphone");
  39. product.setCategory("mobile");
  40. product.setPrice(6999.0);
  41. product.setImages("http://www.phone.jpg");
  42. productDao.save(product);
  43. }
  44. //根据 id 查询
  45. @Test
  46. public void findById(){
  47. Product product = productDao.findById(2L).get();
  48. System.out.println(product);
  49. }
  50. //查询所有
  51. @Test
  52. public void findAll(){
  53. Iterable<Product> products = productDao.findAll();
  54. for (Product product : products) {
  55. System.out.println(product);
  56. }
  57. }
  58. //删除
  59. @Test
  60. public void delete(){
  61. Product product = new Product();
  62. product.setId(2L);
  63. productDao.delete(product);
  64. }
  65. //批量新增
  66. @Test
  67. public void saveAll(){
  68. List<Product> productList = new ArrayList<>();
  69. for (int i = 0; i < 10; i++) {
  70. Product product = new Product();
  71. product.setId(Long.valueOf(i));
  72. product.setTitle("iphone" + i);
  73. product.setCategory("mobile");
  74. product.setPrice(5999.0 + i);
  75. product.setImages("http://www.phone.jpg");
  76. productList.add(product);
  77. }
  78. productDao.saveAll(productList);
  79. }
  80. //分页查询
  81. @Test
  82. public void findByPageable(){
  83. //设置排序(排序方式,正序还是倒序,排序的 id)
  84. Sort sort = Sort.by(Sort.Direction.DESC,"id");
  85. int currentPage=0;//当前页,第一页从 0 开始, 1 表示第二页
  86. int pageSize = 5;//每页显示多少条
  87. //设置查询分页
  88. PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
  89. //分页查询
  90. Page<Product> productPage = productDao.findAll(pageRequest);
  91. for (Product Product : productPage.getContent()) {
  92. System.out.println(Product);
  93. }
  94. }
  95. /**
  96. * term 查询
  97. * search(termQueryBuilder) 调用搜索方法,参数查询构建器对象
  98. */
  99. @Test
  100. public void termQuery(){
  101. TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "iphone");
  102. Iterable<Product> products = productDao.search(termQueryBuilder);
  103. for (Product product : products) {
  104. System.out.println(product);
  105. }
  106. }
  107. /**
  108. * term 查询加分页
  109. */
  110. @Test
  111. public void termQueryByPage(){
  112. int currentPage= 0 ;
  113. int pageSize = 5;
  114. //设置查询分页
  115. PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
  116. TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "phone");
  117. Iterable<Product> products =
  118. productDao.search(termQueryBuilder,pageRequest);
  119. for (Product product : products) {
  120. System.out.println(product);
  121. }
  122. }
  123. }

五、将mysql数据写入Elasticsearch例子

  1. package com.example.esdemo.service.impl;
  2. import com.example.esdemo.config.DBHelper;
  3. import com.example.esdemo.imports.ImportDb2Es;
  4. import com.example.esdemo.service.ImportService;
  5. import org.apache.logging.log4j.LogManager;
  6. import org.apache.logging.log4j.Logger;
  7. import org.elasticsearch.action.ActionListener;
  8. import org.elasticsearch.action.bulk.BackoffPolicy;
  9. import org.elasticsearch.action.bulk.BulkProcessor;
  10. import org.elasticsearch.action.bulk.BulkRequest;
  11. import org.elasticsearch.action.bulk.BulkResponse;
  12. import org.elasticsearch.action.index.IndexRequest;
  13. import org.elasticsearch.client.RequestOptions;
  14. import org.elasticsearch.client.RestHighLevelClient;
  15. import org.elasticsearch.common.unit.ByteSizeUnit;
  16. import org.elasticsearch.common.unit.ByteSizeValue;
  17. import org.elasticsearch.common.unit.TimeValue;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.stereotype.Component;
  20. import java.sql.*;
  21. import java.util.ArrayList;
  22. import java.util.HashMap;
  23. import java.util.concurrent.TimeUnit;
  24. import java.util.function.BiConsumer;
  25. /**
  26. * 导入db2es 实现类
  27. */
  28. @Component
  29. public class ImportServiceImpl implements ImportService {
  30. private static final Logger logger = LogManager.getLogger(ImportServiceImpl.class);
  31. @Autowired
  32. private RestHighLevelClient client;
  33. @Override
  34. public void importDb2Es(ImportDb2Es importDb2Es) {
  35. writeMySQLDataToES(importDb2Es.getDbTableName(),importDb2Es.getDbTableName());
  36. }
  37. private void writeMySQLDataToES(String tableName,String esIndeName) {
  38. BulkProcessor bulkProcessor = getBulkProcessor(client);
  39. Connection connection = null;
  40. PreparedStatement ps = null;
  41. ResultSet rs = null;
  42. try {
  43. connection = DBHelper.getConn();
  44. logger.info("start handle data :" + tableName);
  45. String sql = "select * from " + tableName;
  46. ps = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
  47. // 根据自己需要设置 fetchSize
  48. ps.setFetchSize(20);
  49. rs = ps.executeQuery();
  50. ResultSetMetaData colData = rs.getMetaData();
  51. ArrayList<HashMap<String, String>> dataList = new ArrayList<>();
  52. HashMap<String, String> map = null;
  53. int count = 0;
  54. // c 就是列的名字 v 就是列对应的值
  55. String c = null;
  56. String v = null;
  57. while (rs.next()) {
  58. count++;
  59. map = new HashMap<String, String>(128);
  60. for (int i = 1; i < colData.getColumnCount(); i++) {
  61. c = colData.getColumnName(i);
  62. v = rs.getString(c);
  63. map.put(c, v);
  64. }
  65. dataList.add(map);
  66. // 每1万条 写一次 不足的批次的数据 最后一次提交处理
  67. if (count % 10000 == 0) {
  68. logger.info("mysql handle data number:" + count);
  69. // 将数据添加到 bulkProcessor
  70. for (HashMap<String, String> hashMap2 : dataList) {
  71. bulkProcessor.add(new IndexRequest(esIndeName).source(hashMap2));
  72. }
  73. // 每提交一次 清空 map 和 dataList
  74. map.clear();
  75. dataList.clear();
  76. }
  77. }
  78. // 处理 未提交的数据
  79. for (HashMap<String, String> hashMap2 : dataList) {
  80. bulkProcessor.add(new IndexRequest(esIndeName).source(hashMap2));
  81. }
  82. bulkProcessor.flush();
  83. } catch (SQLException e) {
  84. e.printStackTrace();
  85. } finally {
  86. try {
  87. rs.close();
  88. ps.close();
  89. connection.close();
  90. boolean terinaFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
  91. logger.info(terinaFlag);
  92. } catch (Exception e) {
  93. e.printStackTrace();
  94. }
  95. }
  96. }
  97. private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
  98. BulkProcessor bulkProcessor = null;
  99. try {
  100. BulkProcessor.Listener listener = new BulkProcessor.Listener() {
  101. @Override
  102. public void beforeBulk(long executionId, BulkRequest request) {
  103. logger.info("Try to insert data number : "
  104. + request.numberOfActions());
  105. }
  106. @Override
  107. public void afterBulk(long executionId, BulkRequest request,
  108. BulkResponse response) {
  109. logger.info("************** Success insert data number : "
  110. + request.numberOfActions() + " , id: " + executionId);
  111. }
  112. @Override
  113. public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
  114. logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
  115. }
  116. };
  117. BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
  118. .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
  119. BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
  120. builder.setBulkActions(5000);
  121. builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
  122. builder.setConcurrentRequests(10);
  123. builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
  124. builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
  125. // 注意点:让参数设置生效
  126. bulkProcessor = builder.build();
  127. } catch (Exception e) {
  128. e.printStackTrace();
  129. try {
  130. bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
  131. } catch (Exception e1) {
  132. logger.error(e1.getMessage());
  133. }
  134. }
  135. return bulkProcessor;
  136. }
  137. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/565909
推荐阅读
相关标签
  

闽ICP备14008679号