当前位置:   article > 正文

【elasticsearch7.x】增删改&批量_org.elasticsearch.client.indices.putmappingrequest

org.elasticsearch.client.indices.putmappingrequest

一、新增数据:单条

  1. 先创建一个es测试表test_wd
    在这里插入图片描述

  2. 插入一条数据

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;

public class InsertData {

	public static void main(String[] args) {
		addOneData();
	}

	private static void addOneData() {
		Map<String, Object> map = new HashMap<String, Object>();
		map.put("company", "测试公司哈哈哈");
		map.put("desc", "这是一个测试公司");
		map.put("person", "小测");
		map.put("title", "hello world!");
		map.put("createTime", new Date().getTime());
		IndexRequest request = new IndexRequest("test_wd").source(map);//.id可设置自定义id
		IndexResponse response = null;
		try {
			response = ESClient.getES7Client().index(request, RequestOptions.DEFAULT);
		} catch (IOException e) {
			e.printStackTrace();
		}
		ESClient.close();
	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

在这里插入图片描述

二、修改:单条

  1. 修改内容在这里插入图片描述
  2. 代码如下
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;


public class UpdateEsData {
	public static void main(String[] args) throws IOException {
		RestHighLevelClient client = ESClient.getES7Client();
		Map<String, Object> jsonMap = new HashMap<>();
		jsonMap.put("createTime", new Date().getTime());
		jsonMap.put("company", "测试公司");
		jsonMap.put("person", "测试");
		UpdateRequest request = new UpdateRequest("test_wd", "trcAhHYBOd26bujRs8yo").doc(jsonMap);
		try {
			UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
		} catch (IOException e) {
			e.printStackTrace();
		}
		client.close();
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  1. 修改结果
    在这里插入图片描述

三:批量Bulk:修改、插入



import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;


public class UpdateEsData {
	public static void main(String[] args) throws IOException {
		RestHighLevelClient client = ESClient.getES7Client();

		BulkRequest request = new BulkRequest();
		// 第一条
		Map<String, Object> map = new HashMap<>();
		map.put("title", "测试批量修改");
		map.put("desc", "这里是测试公司");
		request.add(new UpdateRequest("test_wd", "37cjhHYBOd26bujR8sya").doc(map));
		// 第二条
		map.put("title", "正式修改");
		map.put("desc", "这里是正式公司");
		request.add(new UpdateRequest("test_wd", "3rcjhHYBOd26bujR6syQ").doc(map));
		// 添加多个提交...

		// TODO:批量修改:new UpdateRequest
		// TODO:批量增加:new IndexRequest
//		request.add(new IndexRequest("test_wd").source(map));

		// 提交
		BulkResponse bulkResponse = null;
		try {
			bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
		} catch (IOException e) {
			e.printStackTrace();
		}
		System.out.println("批量更新状态:" + bulkResponse.status());

		client.close();
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

四、批量处理器:BulkProcessor


import java.io.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequest;
import com.alibaba.fastjson.JSONObject;
import com.es.util.ESBulkProcessor;

 
public class BulkAdd {

	public static String indexName = "test_wd";

	public static void main(String[] args) {
		buldAddData();
	}

	private static void buldAddData() {
		ESBulkProcessor bulk = new ESBulkProcessor();
		bulk.init();
		for (int i = 1; i < 1000; i++) {
			if (i % 100 == 0) {
				System.out.println(i);
			}
			Map<String, Object> map = new HashMap<String, Object>();
			map.put("title", "test_title" + i);
			map.put("company", "测试数据" + i);
			map.put("person", "小" + i);
			map.put("desc", "这里是测试" + i + "公司");
			map.put("createTime", new Date().getTime());

			IndexRequest ir = new IndexRequest(indexName, "_doc", "" + i).source(map);
			bulk.addRequest(ir);
		}
		bulk.closeBulkProcessor();//一定要close,不然会出现程序执行完毕未报错,但es并没有进数据的情况
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39


import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.http.HttpHost;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.indices.GetMappingsResponse;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

import com.alibaba.fastjson.JSON;

public class ESBulkProcessor {
	static final Logger logger = LogManager.getLogger("ESBulkProcessor");
	RestHighLevelClient client = null;
	BulkProcessor bulkProcessor = null;
	MappingMetaData mmd = null;

	/**
	 * 依据HBase表名进行初始化工作
	 * 
	 * @param tableName
	 */
	public synchronized void init() {
		initESClient();
		initESBulkProcessor();
	}

	/**
	 * 初始化ES客户端
	 */
	private void initESClient() {
		logger.info("开始初始化Elastic Search客户端");
		client = ESClient.getES7Client();
	}

	/**
	 * 初始化ES批量处理器
	 */
	private void initESBulkProcessor() {
		logger.info("开始初始化批量操作处理器 BulkProcessor:bulkAsync");
		BulkProcessor.Listener listener = new BulkProcessor.Listener() {
			@Override
			public void beforeBulk(long executionId, BulkRequest request) {
				int numberOfActions = request.numberOfActions();
				logger.debug("executionId=" + executionId + ", numberOfActions=" + numberOfActions);
			}

			@Override
			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
				if (response.hasFailures()) {
					logger.error("Bulk executed with failures, executionId=" + executionId);
				} else {
					logger.debug("Bulk completed in  milliseconds=" + response.getTook().getMillis() + "executionId="
							+ executionId);
				}
			}

			@Override
			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
				logger.error("BulkProcessor.Listener.afterBulk :: " + executionId);
				logger.error(failure);
			}
		};
		BulkProcessor.Builder builder = BulkProcessor.builder(
				(request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener);
		builder.setBulkActions(1000);
		builder.setBulkSize(new ByteSizeValue(1024L, ByteSizeUnit.MB));
		builder.setConcurrentRequests(1);
		builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
		// builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(10L),
		// 3));
		bulkProcessor = builder.build();
		logger.info("BulkProcessor:bulkAsync 初始化完毕");
	}

	/**
	 * 初始/刷新获取ES索引中的Mapping信息
	 * 
	 * @param indexName
	 */
	private void initESMapping(String indexName) {
		logger.info("获取ES中索引的信息, indexName=" + indexName);
		GetMappingsRequest request = new GetMappingsRequest();
		request.indices(indexName);
		try {
			GetMappingsResponse getMappingResponse = client.indices().getMapping(request, RequestOptions.DEFAULT);
			if (getMappingResponse != null && getMappingResponse.mappings().size() > 0) {
				mmd = getMappingResponse.mappings().get(indexName);
				logger.info(JSON.toJSONString(mmd));
			} else {
				logger.info(MessageFormat.format("ES中未找到对应的索引 {0}", indexName));
			}
		} catch (Exception e) {
			logger.error(e);
			logger.info("相应索引不存在,该异常不影响程序继续执行");
		}
	}

	/**
	 * 检查所以中是否存在响应的字段
	 * 
	 * @param indexName
	 * @param fieldName
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public boolean checkField2(String indexName, String fieldName) {
		if (mmd == null)
			return false;

		Object fields = mmd.getSourceAsMap().get("properties");
		// 存在直接返回
		if (fields != null && ((Map<String, Object>) fields).containsKey(fieldName)) {
			return true;
		} else {
			return false;
		}
	}

	/**
	 * 新增
	 * 
	 * @param ir
	 */
	public void addRequest(IndexRequest ir) {
		bulkProcessor.add(ir);
	}

	/**
	 * 更新
	 * 
	 * @param ur
	 */
	public void addRequest(UpdateRequest ur) {
		bulkProcessor.add(ur);
	}

	/**
	 * 删除
	 * 
	 * @param dr
	 */
	public void addRequest(DeleteRequest dr) {
		bulkProcessor.add(dr);
	}

	/**
	 * 关闭批量操作,关闭客户端
	 */
	public void closeBulkProcessor() {
		logger.info("开始开始关闭  ES BulkProcessor");
		try {
			if (bulkProcessor != null)
				bulkProcessor.awaitClose(60L, TimeUnit.SECONDS);
		} catch (Exception e) {
			logger.error("ESManager.closeClient:bulkProcessor", e);
		} finally {
			try {
				if (client != null)
					client.close();
			} catch (Exception e) {
				logger.error("ESManager.closeClient:client", e);
			}
		}
	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192

在这里插入图片描述

五、删除

  1. 删除整个索引
/*
	 * 删除整个索引:慎用
	 */
	public static void deleteIndex(String indexName) {
		try {
			RestHighLevelClient client = ESClient.getES7Client();
			try {
				System.out.println("【严重警告,慎重操作,sleep500s】正在删除索引!!!!!"+indexName);
				Thread.sleep(500000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			DeleteIndexRequest request = new DeleteIndexRequest(indexName);
			client.indices().delete(request, RequestOptions.DEFAULT);
		} catch (IOException e) {
			e.printStackTrace();
		}
		
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  1. 根据主键删除
/**
	 * @param indexName:索引
	 * @param id:主键
	 */
	private static void deleteById(String indexName, String id) throws IOException {
		RestHighLevelClient client = ESClient.getES7Client();
		DeleteRequest deleteRequest = new DeleteRequest(indexName, id);
		client.delete(deleteRequest, RequestOptions.DEFAULT);
		client.close();
		System.out.println("------删除成功!------");
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  1. 根据条件删除
/*
	 * 根据条件删除
	 */
	private static void deleteByQuery(String indexName) throws Exception {
		RestHighLevelClient client = ESClient.getES7Client();
		DeleteByQueryRequest deleteByQuery = new DeleteByQueryRequest(indexName);// _all

		// term查询
		deleteByQuery.setQuery(new TermQueryBuilder("_id", "c7a45799d5131e19af28bf3db6b832dd"));
		// prefix查询
		deleteByQuery.setQuery(new PrefixQueryBuilder("lanmu", "5ea5"));
		// queryString查询
		deleteByQuery.setQuery(new QueryStringQueryBuilder("深圳市南山区").field("company"));
		// 根据时间删除
		deleteByQuery.setQuery(new RangeQueryBuilder("createTime").gt(sdf_day.parse("20200526").getTime()));

		client.deleteByQuery(deleteByQuery, RequestOptions.DEFAULT);

		ESClient.close();
		System.out.println("------删除成功!------");
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/68021
推荐阅读
相关标签
  

闽ICP备14008679号