赞
踩
接着上面的项目,添加工具类的方法
- /**
- * @Description 单条记录插入
- * @param indexName
- * @param sourceMap 传入的记录的键值对
- * @return
- */
- public static Map<String, Object> singleInsert (String indexName, Map<String, Object> sourceMap) {
- Map<String, Object> insertResult = new HashMap<>();
- RestHighLevelClient client = EsClient.getInstance();
- IndexRequest indexRequest = new IndexRequest(indexName, DefineConstant.SEARCH_REQUEST_TYPE);
- //es对应的索引文档的字段一定要有id_s,便于与_id相对应,且一定要逻辑控制id的唯一
- String id = StringUtils.isEmpty(sourceMap.get("id_s")) ? UUID.randomUUID().toString() : (String) sourceMap.get("id_s");
- indexRequest.id(id);
- indexRequest.source(sourceMap);
- //写入后立即刷新
- indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
- try {
- String lineSeparator = System.lineSeparator();
- log.info(lineSeparator + "index:" + indexName + lineSeparator + "insert:"+ indexRequest.toString() + lineSeparator);
- IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
- log.info("es insert response: {}", indexResponse.toString());
- insertResult.put("id", indexResponse.getId());
- insertResult.put("status", indexResponse.status().toString());
- } catch (IOException e) {
- log.error(e.getMessage());
- }
- return insertResult;
- }
向income_expense_analysis索引中插入一条记录
对应的响应报文以及日志打印
成功插入一条记录,这里的id_s与_id一致,便于后续修改操作的时候传入id
- /**
- * @Description 修改记录数据
- * @param indexName
- * @param updateMap
- * @return
- */
- public static Map<String, Object> updateEsRecord (String indexName, Map<String, Object> updateMap){
- Map<String, Object> updateResult = new HashMap<>();
- RestHighLevelClient client = EsClient.getInstance();
- //此时之前指定的id_s字段就起作用了
- if (StringUtils.isEmpty(updateMap.get("id_s"))){
- log.error("尚未指定修改的记录id");
- return null;
- }
- String id = (String) updateMap.get("id_s");
- UpdateRequest updateRequest = new UpdateRequest(indexName, DefineConstant.SEARCH_REQUEST_TYPE, id);
-
- updateRequest.doc(updateMap);
- updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
- try {
- String lineSeparator = System.lineSeparator();
- log.info(lineSeparator + "index:" + indexName + lineSeparator + "update:"+ updateRequest.toString() + lineSeparator);
- UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
- log.info("es update response: {}", updateResponse.toString());
- updateResult.put("id", updateResponse.getId());
- updateResult.put("status", updateResponse.status().toString());
- } catch (IOException e) {
- log.error(e.getMessage());
- }
- return updateResult;
- }
修改刚新增id为2020-03-313186482的文档的txn_remark_s字段为"03月31日商户交易入账"
对应的响应报文以及日志打印
成功修改文档记录
- /**
- * @Description 删除记录数据
- * @param indexName
- * @param id
- * @return
- */
- public static Map<String, Object> deleteEsRecord (String indexName, String id){
- Map<String, Object> deleteResult = new HashMap<>();
- RestHighLevelClient client = EsClient.getInstance();
- //此时之前指定的id_s字段就起作用了
- if (StringUtils.isEmpty(id)){
- log.error("尚未指定删除的记录id");
- return null;
- }
- DeleteRequest deleteRequest = new DeleteRequest(indexName, DefineConstant.SEARCH_REQUEST_TYPE, id);
- deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
- try {
- String lineSeparator = System.lineSeparator();
- log.info(lineSeparator + "index:" + indexName + lineSeparator + "delete:"+ deleteRequest.toString() + lineSeparator);
- DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
- log.info("es update response: {}", deleteResponse.toString());
- deleteResult.put("id", deleteResponse.getId());
- deleteResult.put("status", deleteResponse.status().toString());
- } catch (IOException e) {
- log.error(e.getMessage());
- }
- return deleteResult;
- }
删除刚刚新增的id为2020-03-313186482 的文档记录
响应的结果以及日志打印
再结合工具查看income_expense_analysis索引下的文档数据记录,成功删除
- /**
- * @Description 批量向es插入数据,最好将批量数据分批向es插入,而且将客户端的超时时间延长(此处我使用的单记录的操作的5秒对于批量操作就不合适了)
- * @param indexName
- * @param sourceList
- * @return
- */
- public static Map<String, Object> batchInsert(String indexName, List<Map<String, Object>> sourceList){
- List<Map<String, Object>> insertResults = new ArrayList<>();
- Map<String, Object> result = new HashMap<>();
- RestHighLevelClient client = EsClient.getInstance();
- BulkRequest bulkRequest = new BulkRequest();
- for (Map<String, Object> source : sourceList){
- IndexRequest indexRequest = new IndexRequest(indexName,DefineConstant.SEARCH_REQUEST_TYPE);
- indexRequest.id(StringUtils.isEmpty(source.get("id_s")) ? UUID.randomUUID().toString() : (String) source.get("id_s"));
- indexRequest.source(source);
- bulkRequest.add(indexRequest);
- }
- bulkRequest.timeout(TimeValue.timeValueMintue(2));
- bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
- try {
- BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
- BulkItemResponse[] itemResponses = bulkResponse.getItems();
- for (int i = 0;i < itemResponses.length; i++){
- BulkItemResponse response = itemResponses[i];
- Map<String, Object> item = new HashMap<>();
- item.put("id", response.getId());
- item.put("status", response.status().toString());
- item.put("failure", response.getFailureMessage());
- if (response.isFailed()){
- IndexRequest ireq = (IndexRequest) bulkRequest.requests().get(i);
- log.error("Failed while indexing to " + response.getIndex() + "type " +response.getType() + " " +
- "request: [" + ireq + "]: [" + response.getFailureMessage() + "]");
- item.put("isSuccess", "FAIL");
- }
- log.info("-------bulk---batchInsert------->{}", item);
- insertResults.add(item);
- }
- } catch (IOException e) {
- log.error(e.getMessage());
- }
- result.put("insertResults", insertResults);
- return result;
- }
一般是从文件中读取记录,此处我就不使用请求中展示,就在代码中模拟插入的记录
响应结果以及执行的日志打印
查看es插入的记录
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。