当前位置:   article > 正文

springboot+es批量新增、批量修改、根据内部id批量查询_springbootes7.10.2根据id更新指定字段

Spring Boot + ES 7.10.2:根据 id 更新指定字段

pom.xml配置

<!-- Elasticsearch high-level REST client. Its transitive `elasticsearch` and
     `elasticsearch-rest-client` artifacts are excluded here and declared explicitly
     further down, so their versions are pinned by this POM (7.8.0). -->
<dependency>
    <!-- Elasticsearch high-level REST client -->
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.8.0</version>
    <exclusions>
        <exclusion>
            <artifactId>elasticsearch</artifactId>
            <groupId>org.elasticsearch</groupId>
        </exclusion>
        <exclusion>
            <artifactId>elasticsearch-rest-client</artifactId>
            <groupId>org.elasticsearch.client</groupId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Spring Boot starter for Spring Data Elasticsearch.
     NOTE(review): this starter presumably manages its own Elasticsearch client version;
     confirm it is compatible with the 7.8.0 artifacts pinned in this file. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    <version>2.3.9.RELEASE</version>
</dependency>
<!-- Explicit replacement for the low-level REST client excluded above. -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.8.0</version>
</dependency>
<!-- Explicit replacement for the Elasticsearch core artifact excluded above. -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.8.0</version>
</dependency>
<!-- Transport-client artifacts.
     NOTE(review): the Java code shown alongside this POM only uses RestHighLevelClient;
     confirm whether these two transport dependencies are still required. -->
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>transport-netty4-client</artifactId>
    <version>7.8.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>7.8.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

yml配置

# Elasticsearch connection settings consumed by EsConfig.
# NOTE(review): EsConfig injects ${spring.elasticsearch.hosts}, so this block presumably
# sits under a top-level `spring:` key that is not shown in this fragment - confirm.
elasticsearch:
  # Comma-separated list of host:port entries (EsConfig splits on "," and then on ":").
  # "ip:端口" is a placeholder default used when ES_HOST is not set; replace it with a real address.
  hosts: ${ES_HOST:ip:端口}
# HTTP basic-auth credentials, read by EsConfig via ${es_user_name} and ${es_password}.
es_user_name: ${ES_USER_NAME:xxxx}
es_password: ${ES_PASSWORD:xxxxx}
  • 1
  • 2
  • 3
  • 4

EsConfig配置

/**
 * Spring configuration that builds the {@link RestHighLevelClient} bean, authenticating
 * against Elasticsearch with HTTP basic credentials.
 */
@Configuration
@Slf4j
public class EsConfig {
    /** Basic-auth user name, injected from the {@code es_user_name} property. */
    @Value("${es_user_name}")
    private String username;
    /** Basic-auth password, injected from the {@code es_password} property. */
    @Value("${es_password}")
    private String password;
    /** Shared request options; built from {@link RequestOptions#DEFAULT} without any modification. */
    public static final RequestOptions COMMON_OPTIONS;

    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        COMMON_OPTIONS = builder.build();
    }

    /**
     * Creates the high-level REST client.
     *
     * @param hosts comma-separated list of {@code host:port} entries, injected from
     *              {@code spring.elasticsearch.hosts}; whitespace around entries is ignored
     * @return a client that sends the configured basic-auth credentials to every listed host
     * @throws IllegalArgumentException if {@code hosts} is blank or any entry is not a valid {@code host:port}
     */
    @Bean
    RestHighLevelClient restHighLevelClient(@Value("${spring.elasticsearch.hosts}") String hosts) {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        HttpHost[] httpHosts = parseHosts(hosts);
        return new RestHighLevelClient(RestClient.builder(httpHosts)
                .setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider)));
    }

    /**
     * Parses a comma-separated {@code host:port} list into {@link HttpHost} instances.
     * Fails with a message naming the offending entry instead of an anonymous
     * ArrayIndexOutOfBoundsException / NumberFormatException.
     */
    private static HttpHost[] parseHosts(String hosts) {
        if (hosts == null || hosts.trim().isEmpty()) {
            throw new IllegalArgumentException("Elasticsearch hosts must not be blank");
        }
        String[] hostsWithPort = hosts.split(",");
        if (hostsWithPort.length == 0) {
            throw new IllegalArgumentException("Elasticsearch hosts contains no host:port entry: '" + hosts + "'");
        }
        HttpHost[] httpHosts = new HttpHost[hostsWithPort.length];
        for (int i = 0; i < hostsWithPort.length; i++) {
            // Trim so that "a:9200, b:9200" does not produce a host name with a leading space.
            String hostWithPort = hostsWithPort[i].trim();
            String[] hostPort = hostWithPort.split(":");
            if (hostPort.length < 2 || hostPort[0].trim().isEmpty()) {
                throw new IllegalArgumentException(
                        "Invalid Elasticsearch host entry '" + hostWithPort + "', expected host:port");
            }
            String host = hostPort[0].trim();
            int port;
            try {
                port = Integer.parseInt(hostPort[1].trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(
                        "Invalid port in Elasticsearch host entry '" + hostWithPort + "'", e);
            }
            httpHosts[i] = new HttpHost(host, port);
        }
        return httpHosts;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

ElasticSearchConfig 配置

/**
 * Configuration whose only job is to set a system property once this bean has been constructed.
 */
@Configuration
public class ElasticSearchConfig {

    /** Name of the system property written by {@link #init()}. */
    private static final String NETTY_PROCESSORS_PROPERTY = "es.set.netty.runtime.available.processors";

    /**
     * Guards against the Netty error
     * {@code java.lang.IllegalStateException: availableProcessors is already set to [4], rejecting [4]}
     * by setting the property above to {@code "false"}.
     */
    @PostConstruct
    void init() {
        System.setProperty(NETTY_PROCESSORS_PROPERTY, Boolean.FALSE.toString());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

启动类配置

/**
 * Application entry point.
 *
 * @param args command-line arguments, passed through to Spring Boot unchanged
 */
public static void main(String[] args) {
    // Written before SpringApplication.run so the flag is already in place when the context starts.
    final String nettyProcessorsFlag = "es.set.netty.runtime.available.processors";
    System.setProperty(nettyProcessorsFlag, String.valueOf(false));
    // NOTE(review): xxxx stands in for the real application class - replace before use.
    SpringApplication.run(xxxx.class, args);
}
  • 1
  • 2
  • 3
  • 4

批量操作的对象(BulkProcessor)

// Shared processor that buffers write requests and sends them to Elasticsearch in bulk.
// This initializer runs during construction, i.e. before restHighLevelClient is injected.
// That is safe only because the consumer lambda in createBulkProcessor() dereferences
// restHighLevelClient when a bulk is actually executed, not when the processor is built.
// NOTE(review): nothing in the visible code closes this processor on shutdown; consider a
// shutdown hook that calls awaitClose so buffered requests are flushed.
private  BulkProcessor bulkProcessor=createBulkProcessor();
@Autowired
RestHighLevelClient restHighLevelClient;

/**
 * Builds the {@link BulkProcessor} used for all bulk writes.
 *
 * <p>The processor flushes when 10000 actions or 8 MB are buffered, or every 10 seconds,
 * allows 8 concurrent bulk requests and uses a constant 1-second backoff with 3 retries.
 *
 * @return a configured processor that executes bulks asynchronously through {@code restHighLevelClient}
 */
private  BulkProcessor createBulkProcessor() {

    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            log.info("1. 【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request,
                              BulkResponse response) {
            if (!response.hasFailures()) {
                log.info("2. 【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
                return;
            }
            // Partial failure: collect every failed id (not just the first one) so the affected
            // documents can be identified, but keep the detailed reason to a single line.
            List<String> failedIds = new ArrayList<>();
            String firstFailure = null;
            for (BulkItemResponse item : response.getItems()) {
                if (item.isFailed()) {
                    failedIds.add(item.getId());
                    if (firstFailure == null) {
                        firstFailure = item.getFailureMessage();
                    }
                }
            }
            // Failures are logged at ERROR so they are not lost when the log level is above INFO.
            log.error("2. 【afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, firstFailure);
            log.error("2. 【afterBulk-失败】批量 [{}] 失败数量 {},失败的esId为:{}", executionId, failedIds.size(), failedIds);
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request,
                              Throwable failure) {
            // The whole bulk failed: report the ids of every request it carried.
            List<DocWriteRequest<?>> requests = request.requests();
            List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
            log.error("3. 【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure);
        }
    };

    BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
        restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
    }), listener);
    // Flush once 10000 actions are buffered.
    builder.setBulkActions(10000);
    // Flush once the buffered requests reach 8 MB.
    builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
    // Flush every 10 seconds regardless of how much is buffered.
    builder.setFlushInterval(TimeValue.timeValueSeconds(10));
    // Number of bulk requests allowed to execute concurrently.
    builder.setConcurrentRequests(8);
    // Retry policy: constant 1-second delay, at most 3 retries.
    builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
    return builder.build();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

批量修改

/**
 * Queues one partial-document update per id on the shared bulk processor.
 *
 * <p>All requests are built first and only then handed to the processor.
 *
 * @param esUpdateBO carries the document ids to update ({@code getIds()}) and the name of
 *                   the field to set ({@code getIsWarning()})
 */
public void bulkUpdate(EsUpdateBO esUpdateBO){
    final List<UpdateRequest> pending = new ArrayList<>();
    esUpdateBO.getIds().forEach(docId ->
            pending.add(buildFlagUpdate(docId, esUpdateBO.getIsWarning())));

    for (UpdateRequest queued : pending) {
        bulkProcessor.add(queued);
    }
}

/**
 * Builds an update request that sets {@code fieldName} to {@code "1"} on document {@code docId}.
 * NOTE(review): the value returned by getIsWarning() is used as the field NAME here - confirm that is intended.
 */
private UpdateRequest buildFlagUpdate(String docId, String fieldName) {
    // Partial document: only this one field is sent.
    Map<String,Object> partialDoc = new HashMap<>();
    partialDoc.put(fieldName, "1");

    return new UpdateRequest()
            .index("es索引名称")
            .id(docId)
            .doc(partialDoc);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

批量新增

/**
 * Queues one create-only index request per map on the shared bulk processor.
 *
 * <p>Each map is serialized to JSON and used as the document source. No document id is set
 * on the requests. All requests are built first and only then handed to the processor.
 *
 * @param result the documents to insert, one map per document
 */
public void bulkAdd(List<Map<String, Object>> result) {
    final List<IndexRequest> pending = new ArrayList<>();
    for (Map<String, Object> row : result) {
        IndexRequest createRequest = new IndexRequest("es索引名称")
                .source(JSON.toJSONString(row), XContentType.JSON)
                .opType(DocWriteRequest.OpType.CREATE);
        pending.add(createRequest);
    }

    for (IndexRequest queued : pending) {
        bulkProcessor.add(queued);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

根据es内部id批量查询数据

public List<Map<String, Object>> getWaringEventList(List<String> ids) throws IOException {
    String[] idsStr = ids.toArray(new String[ids.size()]);
    List<Map<String, Object>> details=new ArrayList<>();
    SearchRequest searchRequest = new SearchRequest("es索引名称");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    IdsQueryBuilder idsQueryBuilder = QueryBuilders.idsQuery().addIds(idsStr);
    boolQueryBuilder.must(idsQueryBuilder);
    searchSourceBuilder.query(boolQueryBuilder);
    searchSourceBuilder.size(10000);
    //将所有的条件进行整合
    searchRequest.source(searchSourceBuilder);
    //根据restHighLevelClient进行查询
    SearchResponse search = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    SearchHits searchHits = search.getHits();
    SearchHit[] hits = searchHits.getHits();
    for (SearchHit inhit : hits) {
Map<String, Object> thirdSourceAsMap = inhit.getSourceAsMap();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/67974?site
推荐阅读
相关标签
  

闽ICP备14008679号