当前位置:   article > 正文

Java 实现ES批量写入_java es批量写入

java es批量写入

 先学会用,再去研究原理。完整代码如下:

  1. import java.io.IOException;
  2. import java.util.List;
  3. import java.util.stream.Collectors;
  4. import javax.annotation.Resource;
  5. import org.elasticsearch.action.DocWriteRequest;
  6. import org.elasticsearch.action.bulk.BackoffPolicy;
  7. import org.elasticsearch.action.bulk.BulkItemResponse;
  8. import org.elasticsearch.action.bulk.BulkProcessor;
  9. import org.elasticsearch.action.bulk.BulkRequest;
  10. import org.elasticsearch.action.bulk.BulkResponse;
  11. import org.elasticsearch.action.delete.DeleteRequest;
  12. import org.elasticsearch.client.RequestOptions;
  13. import org.elasticsearch.client.RestHighLevelClient;
  14. import org.elasticsearch.common.unit.ByteSizeUnit;
  15. import org.elasticsearch.common.unit.ByteSizeValue;
  16. import org.elasticsearch.common.unit.TimeValue;
  17. import org.springframework.stereotype.Component;
  18. import lombok.extern.slf4j.Slf4j;
  19. @Slf4j
  20. @Component
  21. public class EsUtils {
  22. @Resource
  23. private RestHighLevelClient restHighLevelClient;
  24. public BulkProcessor createBulkProcessor() {
  25. BulkProcessor.Listener listener = new BulkProcessor.Listener() {
  26. @Override
  27. public void beforeBulk(long executionId, BulkRequest request) {
  28. log.info("【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());
  29. }
  30. @Override
  31. public void afterBulk(long executionId, BulkRequest request,BulkResponse response) {
  32. if (!response.hasFailures()) {
  33. log.info("【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
  34. } else {
  35. BulkItemResponse[] items = response.getItems();
  36. for (BulkItemResponse item : items) {
  37. if (item.isFailed()) {
  38. log.info("afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage());
  39. break;
  40. }
  41. }
  42. }
  43. }
  44. @Override
  45. public void afterBulk(long executionId, BulkRequest request,Throwable failure) {
  46. List<DocWriteRequest<?>> requests = request.requests();
  47. List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
  48. log.error("【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure);
  49. }
  50. };
  51. BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
  52. restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
  53. }), listener);
  54. //到达10000条时刷新
  55. builder.setBulkActions(10000);
  56. //内存到达8M时刷新
  57. builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
  58. //设置的刷新间隔10s
  59. builder.setFlushInterval(TimeValue.timeValueSeconds(10));
  60. //设置允许执行的并发请求数。
  61. builder.setConcurrentRequests(8);
  62. //设置重试策略
  63. builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
  64. return builder.build();
  65. }
  66. }
  1. import java.util.List;
  2. import *************.Actor;
  3. public interface EsService {
  4. /*****
  5. *asIndex 索引
  6. list 入参
  7. */
  8. void pushData(String asIndex, List<Actor> list);
  9. }

 业务代码实现

  1. import java.util.ArrayList;
  2. import java.util.HashMap;
  3. import java.util.List;
  4. import java.util.Map;
  5. import javax.annotation.Resource;
  6. import org.elasticsearch.action.index.IndexRequest;
  7. import org.springframework.stereotype.Service;
  8. import *************.Actor;
  9. @Service
  10. public class EsServiceImpl implements EsService {
  11. @Resource
  12. private EsUtils esUtils;
  13. @Override
  14. public void pushData(String asIndex, List<Actor> list) {
  15. List<IndexRequest> indexRequests = new ArrayList<>();
  16. list.forEach(data -> {
  17. IndexRequest request = new IndexRequest();
  18. Map<String,Object> map = new HashMap<>();
  19. map.put("id", data.getActorId());
  20. map.put("actorName", data.getActorName());
  21. request.id(data.getActorId()+"");
  22. request.index(asIndex);
  23. request.source(map);
  24. indexRequests.add(request);
  25. });
  26. indexRequests.forEach(esUtils.createBulkProcessor()::add);
  27. }
  28. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/68019
推荐阅读
相关标签
  

闽ICP备14008679号