当前位置:   article > 正文

Java与Elasticsearch的API实现工具类(二)——增删改,以及批量插入_java elasticsearch修改数据使用immediate

java elasticsearch修改数据使用immediate

接着上面的项目,添加工具类的方法

项目gitee https://gitee.com/gangye/elasticsearch_demo

一、向es的指定索引中新增文档

  1. /**
  2. * @Description 单条记录插入
  3. * @param indexName
  4. * @param sourceMap 传入的记录的键值对
  5. * @return
  6. */
  7. public static Map<String, Object> singleInsert (String indexName, Map<String, Object> sourceMap) {
  8. Map<String, Object> insertResult = new HashMap<>();
  9. RestHighLevelClient client = EsClient.getInstance();
  10. IndexRequest indexRequest = new IndexRequest(indexName, DefineConstant.SEARCH_REQUEST_TYPE);
  11. //es对应的索引文档的字段一定要有id_s,便于与_id相对应,且一定要逻辑控制id的唯一
  12. String id = StringUtils.isEmpty(sourceMap.get("id_s")) ? UUID.randomUUID().toString() : (String) sourceMap.get("id_s");
  13. indexRequest.id(id);
  14. indexRequest.source(sourceMap);
  15. //写入后立即刷新
  16. indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
  17. try {
  18. String lineSeparator = System.lineSeparator();
  19. log.info(lineSeparator + "index:" + indexName + lineSeparator + "insert:"+ indexRequest.toString() + lineSeparator);
  20. IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
  21. log.info("es insert response: {}", indexResponse.toString());
  22. insertResult.put("id", indexResponse.getId());
  23. insertResult.put("status", indexResponse.status().toString());
  24. } catch (IOException e) {
  25. log.error(e.getMessage());
  26. }
  27. return insertResult;
  28. }

 向income_expense_analysis索引中插入一条记录

对应的响应报文以及日志打印

 成功插入一条记录,这里的id_s与_id一致,便于后续修改操作的时候传入id 

 二、向es指定的索引中,根据文档id修改记录

  1. /**
  2. * @Description 修改记录数据
  3. * @param indexName
  4. * @param updateMap
  5. * @return
  6. */
  7. public static Map<String, Object> updateEsRecord (String indexName, Map<String, Object> updateMap){
  8. Map<String, Object> updateResult = new HashMap<>();
  9. RestHighLevelClient client = EsClient.getInstance();
  10. //此时之前指定的id_s字段就起作用了
  11. if (StringUtils.isEmpty(updateMap.get("id_s"))){
  12. log.error("尚未指定修改的记录id");
  13. return null;
  14. }
  15. String id = (String) updateMap.get("id_s");
  16. UpdateRequest updateRequest = new UpdateRequest(indexName, DefineConstant.SEARCH_REQUEST_TYPE, id);
  17. updateRequest.doc(updateMap);
  18. updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
  19. try {
  20. String lineSeparator = System.lineSeparator();
  21. log.info(lineSeparator + "index:" + indexName + lineSeparator + "update:"+ updateRequest.toString() + lineSeparator);
  22. UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
  23. log.info("es update response: {}", updateResponse.toString());
  24. updateResult.put("id", updateResponse.getId());
  25. updateResult.put("status", updateResponse.status().toString());
  26. } catch (IOException e) {
  27. log.error(e.getMessage());
  28. }
  29. return updateResult;
  30. }

 修改刚新增id为2020-03-313186482的文档的txn_remark_s字段为"03月31日商户交易入账"

 

 对应的响应报文以及日志打印

成功修改文档记录

三、根据指定的id删除文档

  1. /**
  2. * @Description 删除记录数据
  3. * @param indexName
  4. * @param id
  5. * @return
  6. */
  7. public static Map<String, Object> deleteEsRecord (String indexName, String id){
  8. Map<String, Object> deleteResult = new HashMap<>();
  9. RestHighLevelClient client = EsClient.getInstance();
  10. //此时之前指定的id_s字段就起作用了
  11. if (StringUtils.isEmpty(id)){
  12. log.error("尚未指定删除的记录id");
  13. return null;
  14. }
  15. DeleteRequest deleteRequest = new DeleteRequest(indexName, DefineConstant.SEARCH_REQUEST_TYPE, id);
  16. deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
  17. try {
  18. String lineSeparator = System.lineSeparator();
  19. log.info(lineSeparator + "index:" + indexName + lineSeparator + "delete:"+ deleteRequest.toString() + lineSeparator);
  20. DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
  21. log.info("es update response: {}", deleteResponse.toString());
  22. deleteResult.put("id", deleteResponse.getId());
  23. deleteResult.put("status", deleteResponse.status().toString());
  24. } catch (IOException e) {
  25. log.error(e.getMessage());
  26. }
  27. return deleteResult;
  28. }

删除刚刚新增的id为2020-03-313186482 的文档记录

响应的结果以及日志打印

 

 再结合工具查看income_expense_analysis索引下的文档数据记录,成功删除

四、批量插入数据,使用bulk操作

  1. /**
  2. * @Description 批量向es插入数据,最好将批量数据分批向es插入,而且将客户端的超时时间延长(此处我使用的单记录的操作的5秒对于批量操作就不合适了)
  3. * @param indexName
  4. * @param sourceList
  5. * @return
  6. */
  7. public static Map<String, Object> batchInsert(String indexName, List<Map<String, Object>> sourceList){
  8. List<Map<String, Object>> insertResults = new ArrayList<>();
  9. Map<String, Object> result = new HashMap<>();
  10. RestHighLevelClient client = EsClient.getInstance();
  11. BulkRequest bulkRequest = new BulkRequest();
  12. for (Map<String, Object> source : sourceList){
  13. IndexRequest indexRequest = new IndexRequest(indexName,DefineConstant.SEARCH_REQUEST_TYPE);
  14. indexRequest.id(StringUtils.isEmpty(source.get("id_s")) ? UUID.randomUUID().toString() : (String) source.get("id_s"));
  15. indexRequest.source(source);
  16. bulkRequest.add(indexRequest);
  17. }
  18. bulkRequest.timeout(TimeValue.timeValueMintue(2));
  19. bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
  20. try {
  21. BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
  22. BulkItemResponse[] itemResponses = bulkResponse.getItems();
  23. for (int i = 0;i < itemResponses.length; i++){
  24. BulkItemResponse response = itemResponses[i];
  25. Map<String, Object> item = new HashMap<>();
  26. item.put("id", response.getId());
  27. item.put("status", response.status().toString());
  28. item.put("failure", response.getFailureMessage());
  29. if (response.isFailed()){
  30. IndexRequest ireq = (IndexRequest) bulkRequest.requests().get(i);
  31. log.error("Failed while indexing to " + response.getIndex() + "type " +response.getType() + " " +
  32. "request: [" + ireq + "]: [" + response.getFailureMessage() + "]");
  33. item.put("isSuccess", "FAIL");
  34. }
  35. log.info("-------bulk---batchInsert------->{}", item);
  36. insertResults.add(item);
  37. }
  38. } catch (IOException e) {
  39. log.error(e.getMessage());
  40. }
  41. result.put("insertResults", insertResults);
  42. return result;
  43. }

一般是从文件中读取记录,此处我就不使用请求中展示,就在代码中模拟插入的记录

 响应结果以及执行的日志打印

查看es插入的记录

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/68033
推荐阅读
相关标签
  

闽ICP备14008679号