赞
踩
先学会用,再去研究原理。代码如下:
- import java.io.IOException;
- import java.util.List;
- import java.util.stream.Collectors;
-
- import javax.annotation.Resource;
-
- import org.elasticsearch.action.DocWriteRequest;
- import org.elasticsearch.action.bulk.BackoffPolicy;
- import org.elasticsearch.action.bulk.BulkItemResponse;
- import org.elasticsearch.action.bulk.BulkProcessor;
- import org.elasticsearch.action.bulk.BulkRequest;
- import org.elasticsearch.action.bulk.BulkResponse;
- import org.elasticsearch.action.delete.DeleteRequest;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.common.unit.ByteSizeUnit;
- import org.elasticsearch.common.unit.ByteSizeValue;
- import org.elasticsearch.common.unit.TimeValue;
- import org.springframework.stereotype.Component;
- import lombok.extern.slf4j.Slf4j;
-
- @Slf4j
- @Component
- public class EsUtils {
- @Resource
- private RestHighLevelClient restHighLevelClient;
-
- public BulkProcessor createBulkProcessor() {
-
- BulkProcessor.Listener listener = new BulkProcessor.Listener() {
- @Override
- public void beforeBulk(long executionId, BulkRequest request) {
- log.info("【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request,BulkResponse response) {
- if (!response.hasFailures()) {
- log.info("【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
- } else {
- BulkItemResponse[] items = response.getItems();
- for (BulkItemResponse item : items) {
- if (item.isFailed()) {
- log.info("afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage());
- break;
- }
- }
- }
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request,Throwable failure) {
- List<DocWriteRequest<?>> requests = request.requests();
- List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
- log.error("【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure);
- }
- };
-
- BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
- restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
- }), listener);
- //到达10000条时刷新
- builder.setBulkActions(10000);
- //内存到达8M时刷新
- builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
- //设置的刷新间隔10s
- builder.setFlushInterval(TimeValue.timeValueSeconds(10));
- //设置允许执行的并发请求数。
- builder.setConcurrentRequests(8);
- //设置重试策略
- builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
- return builder.build();
- }
- }
- import java.util.List;
-
- import *************.Actor;
-
- public interface EsService {
-
- /*****
- *asIndex 索引
- list 入参
- */
- void pushData(String asIndex, List<Actor> list);
-
- }
业务代码实现
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import javax.annotation.Resource;
- import org.elasticsearch.action.index.IndexRequest;
- import org.springframework.stereotype.Service;
- import *************.Actor;
-
- @Service
- public class EsServiceImpl implements EsService {
- @Resource
- private EsUtils esUtils;
-
- @Override
- public void pushData(String asIndex, List<Actor> list) {
- List<IndexRequest> indexRequests = new ArrayList<>();
-
- list.forEach(data -> {
- IndexRequest request = new IndexRequest();
- Map<String,Object> map = new HashMap<>();
- map.put("id", data.getActorId());
- map.put("actorName", data.getActorName());
- request.id(data.getActorId()+"");
- request.index(asIndex);
- request.source(map);
- indexRequests.add(request);
- });
- indexRequests.forEach(esUtils.createBulkProcessor()::add);
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。