赞
踩
- <!-- Elasticsearch Java High Level REST Client; the transitive server artifact is excluded here and declared explicitly below. -->
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>7.6.2</version>
- <exclusions>
- <exclusion>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!-- Elasticsearch core artifact, declared directly with the same version as the client. -->
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>7.6.2</version>
- </dependency>
- <!-- NOTE(review): delete-by-query 2.4.6 is a plugin built for Elasticsearch 2.x, while the artifacts above are 7.6.2. -->
- <!-- It looks incompatible with the 7.x client; confirm whether it is still needed (the 7.x high level client offers its own deleteByQuery). -->
- <dependency>
- <groupId>org.elasticsearch.plugin</groupId>
- <artifactId>delete-by-query</artifactId>
- <version>2.4.6</version>
- </dependency>
Es的基本信息的获取和封装
- package *.*.common.es.common;
-
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.List;
- import java.util.StringJoiner;
-
- import org.apache.http.HttpHost;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import *.*.common.config.ConfigGroupFile;
- import *.*.common.config.ConfigService;
-
- public class EsConfig {
- @SuppressWarnings("unused")
- private static final Logger logger = LoggerFactory.getLogger(EsConfig.class);
-
- private static EsConfig config;
-
- /**
- * 节点列表
- */
- private HttpHost[] hosts;
-
- /**
- * 用户名
- */
- private String username;
-
- /**
- * 密码
- */
- private String password;
-
- // 调优参数
-
- /**
- * 连接超时
- */
- private int connTimeout;
-
- /**
- * 读写超时
- */
- private int socketTimeout;
-
- /**
- * 请求超时
- */
- private int connRequestTimeout;
-
- /**
- * 允许建立多少个连接,超出以后所有的连接都会进入等待队列
- */
- private int maxConnTotal;
-
- /**
- * 同一个 host:port 最多有几个活跃连接
- */
- private int maxConnPerRoute;
-
- /**
- * 返回 ES 配置信息。
- *
- * @return ES配置
- */
- public static EsConfig getConfig() {
- if (config == null) {
- config = doCreate();
- }
- return config;
- }
-
- /*
- * 创建配置实例。
- */
- private static EsConfig doCreate() {
- EsConfig config = new EsConfig();
- config.hosts = buildHosts();
-
- config.username = ConfigService.getPersionString("elasticsearch.http.username", ConfigGroupFile.getEsConfig());
- config.password = ConfigService.getPersionString("elasticsearch.http.password", ConfigGroupFile.getEsConfig());
-
- // 超时参数
- config.connTimeout = ConfigService.getInt("crab.elasticsearch.http.conn_timeout", 5000);
- config.socketTimeout = ConfigService.getInt("crab.elasticsearch.http.socket_timeout", 30000);
- config.connRequestTimeout = ConfigService.getInt("crab.elasticsearch.http.conn_request_timeout", 5000);
-
- // 连接数
- config.maxConnTotal = ConfigService.getInt("crab.elasticsearch.http.max_conn_total", 400);
- config.maxConnPerRoute = ConfigService.getInt("crab.elasticsearch.http.max_conn_per_route", 200);
- return config;
- }
-
- /*
- * 装装主机信息。
- */
- private static HttpHost[] buildHosts() {
- String hosts = ConfigService.getPersionString("elasticsearch.http.hosts", ConfigGroupFile.getEsConfig());
- if (hosts == null) {
- return new HttpHost[0];
- }
-
- List<HttpHost> hostList = new ArrayList<>();
- for (String host : hosts.split(",")) {
- String[] hostPort = host.split(":");
- if (hostPort.length == 1) {
- hostList.add(new HttpHost(hostPort[0])); // 10.1.1.1
- } else if (hostPort.length == 2) {
- hostList.add(new HttpHost(hostPort[0], Integer.parseInt(hostPort[1]))); // 10.1.1.1:9200
- } else if (hostPort.length == 3) {
- hostList.add(new HttpHost(hostPort[1], Integer.parseInt(hostPort[2]), hostPort[0])); // https:10.1.1.1:9200
- }
- }
- return hostList.toArray(new HttpHost[hostList.size()]);
- }
-
- /**
- * 获取节点列表。
- *
- * @return 节点名
- */
- public static HttpHost[] getHosts() {
- return getConfig().hosts;
- }
-
- /**
- * 获取用户名
- *
- * @return 用户名
- */
- public static String getUsername() {
- return getConfig().username;
- }
-
- /**
- * 获取密码。
- *
- * @return 密码
- */
- public static String getPassword() {
- return getConfig().password;
- }
-
- public static int getConnTimeout() {
- return getConfig().connTimeout;
- }
-
- public static int getSocketTimeout() {
- return getConfig().socketTimeout;
- }
-
- public static int getConnRequestTimeout() {
- return getConfig().connRequestTimeout;
- }
-
- public static int getMaxConnTotal() {
- return getConfig().maxConnTotal;
- }
-
- public static int getMaxConnPerRoute() {
- return getConfig().maxConnPerRoute;
- }
-
- @Override
- public String toString() {
- StringJoiner sj = new StringJoiner(", ", "{", "}");
- sj.add("hosts:" + Arrays.toString(hosts));
- sj.add("username:" + username);
- sj.add("connTimeout:" + connTimeout);
- sj.add("socketTimeout:" + socketTimeout);
- sj.add("connRequestTimeout:" + connRequestTimeout);
- sj.add("maxConnTotal:" + maxConnTotal);
- sj.add("maxConnPerRoute:" + maxConnPerRoute);
- return sj.toString();
- }
- }
- package *.*.common.es.common;
-
- import java.io.IOException;
-
- import org.apache.http.auth.AuthScope;
- import org.apache.http.auth.UsernamePasswordCredentials;
- import org.apache.http.impl.client.BasicCredentialsProvider;
- import org.elasticsearch.client.Node;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.client.RestClientBuilder;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import *.*.api.error.ServerException;
-
- public class EsClients {
- private static final Logger logger = LoggerFactory.getLogger(EsClients.class);
-
- private RestHighLevelClient client;
-
- // 单例
- private static class Singleton {
- public static final EsClients INSTANCE = new EsClients();
- private Singleton() {}
- }
-
- private EsClients() {
- client = new RestHighLevelClient(createClientBuilder());
- }
-
- public void reBuildEsClients() {
- if (client != null) {
- try {
- client.close();
- } catch (Exception e) {
- logger.debug(e.getMessage(), e);
- }
- }
- client = new RestHighLevelClient(createClientBuilder());
- }
-
- public static RestHighLevelClient getClient(){
- return Singleton.INSTANCE.client;
- }
-
- /**
- * ES 执行操作请求的回调类。
- */
- @FunctionalInterface
- public interface ClientRequest<T> {
- T doRequest(RestHighLevelClient client) throws IOException;
- }
-
- /**
- * 执行 ES 的操作请求。
- *
- * @param request 操作的请求
- * @return 执行结果
- */
- public static <T> T executeRequest(ClientRequest<T> request) {
- try {
- return request.doRequest(Singleton.INSTANCE.client);
- } catch (Exception e) {
- logger.error(e.getMessage());
- String message = e.getMessage();
- if (message != null && message.contains("I/O reactor status: STOPPED")) {
- return reQuery(request);
- }
- throw new ServerException("", e);
- }
- }
-
- private static <T> T reQuery(ClientRequest<T> request) {
- logger.warn("we encounter a problem I/O reactor status: STOPPED");
-
- Singleton.INSTANCE.reBuildEsClients();
- try {
- return request.doRequest(Singleton.INSTANCE.client);
- } catch (Exception e) {
- throw new ServerException("new query", e);
- }
- }
-
- /*
- * 构造 builder 。
- */
- private static RestClientBuilder createClientBuilder() {
- RestClientBuilder builder = RestClient.builder(EsConfig.getHosts());
-
- // 连接超时时间
- builder.setRequestConfigCallback(requestConfigBuilder -> {
- requestConfigBuilder.setConnectTimeout(EsConfig.getConnTimeout());
- requestConfigBuilder.setSocketTimeout(EsConfig.getSocketTimeout());
- requestConfigBuilder.setConnectionRequestTimeout(EsConfig.getConnRequestTimeout());
- return requestConfigBuilder;
- });
-
- // 连接信息
- builder.setHttpClientConfigCallback(httpClientBuilder -> {
- httpClientBuilder.setMaxConnTotal(EsConfig.getMaxConnTotal());
- httpClientBuilder.setMaxConnPerRoute(EsConfig.getMaxConnPerRoute());
-
- final BasicCredentialsProvider provider = new BasicCredentialsProvider();
- provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(EsConfig.getUsername(), EsConfig.getPassword()));
- return httpClientBuilder.setDefaultCredentialsProvider(provider);
- });
-
- builder.setFailureListener(new RestClient.FailureListener() {
- @Override
- public void onFailure(Node node) {
- logger.debug("failed to execute on node: {}", node);
- }
- });
- return builder;
- }
- }
ES索引的Setting统一设置,包括分片数、副本数、mapping的字段个数上限、是否开启大小写忽略
- package *.*.common.es.entity;
-
- import *.*.common.config.ConfigService;
-
- /**
- *
- * @author zhanglb
- * Create at 2022年2月17日 下午5:37:26
- */
- public class EsSettingEntity {
-
- /**
- * 分片信息
- */
- private int indexShard = 3;
- /**
- * 数据集
- */
- private int indexReplicas = 2;
- /**
- * map的字段个数
- */
- private int fieldLength = 5000;//默认5000
- /**
- * 是否开启大小写忽略
- */
- private boolean openLowercase = ConfigService.getInstance().isOpenLowercase();
-
- public EsSettingEntity() {
-
- }
-
- public EsSettingEntity(int indexShard, int indexReplicas, boolean openLowercase) {
- this.indexShard = indexShard;
- this.indexReplicas = indexReplicas;
- this.openLowercase = openLowercase;
- }
-
- /**
- * @return indexShard
- */
- public int getIndexShard() {
- return indexShard;
- }
-
- /**
- * @param indexShard
- */
- public void setIndexShard(int indexShard) {
- this.indexShard = indexShard;
- }
-
- /**
- * @return indexReplicas
- */
- public int getIndexReplicas() {
- return indexReplicas;
- }
-
- /**
- * @param indexReplicas
- */
- public void setIndexReplicas(int indexReplicas) {
- this.indexReplicas = indexReplicas;
- }
-
- /**
- * @return fieldLength
- */
- public int getFieldLength() {
- return fieldLength;
- }
-
- /**
- * @param fieldLength
- */
- public void setFieldLength(int fieldLength) {
- this.fieldLength = fieldLength;
- }
-
- /**
- * @return openLowercase
- */
- public boolean isOpenLowercase() {
- return openLowercase;
- }
-
- /**
- * @param openLowercase
- */
- public void setOpenLowercase(boolean openLowercase) {
- this.openLowercase = openLowercase;
- }
-
-
- }
ES Setting的初始化公共类
- package *.*.common.es.support;
-
- import java.util.ArrayList;
- import java.util.List;
-
- import org.elasticsearch.common.settings.Settings;
-
- import *.*.common.es.entity.EsSettingEntity;
-
- /**
- *
- * @author zhanglb
- * Create at 2022年2月17日 下午5:14:23
- */
- public class EsSetting {
-
- public EsSetting() {
-
- }
-
- /**
- * 初始化setting
- * @param entity
- * @return
- */
- public static Settings.Builder initSetting(EsSettingEntity entity) {
- Settings.Builder setting = Settings.builder().put("index.number_of_shards", entity.getIndexShard()).put("index.number_of_replicas", entity.getIndexReplicas())
- .put("index.mapping.total_fields.limit", entity.getFieldLength());
- if (entity.isOpenLowercase()) {
- setLowercase(setting);
- }
- return setting;
- }
-
- /**
- * 设置大小写不敏感
- * @param setting
- * @return
- */
- public static Settings.Builder setLowercase(Settings.Builder setting) {
- List<String> filter = new ArrayList<>();
- filter.add("lowercase");
- setting.put("index.analysis.normalizer.lowercase.type",EsActivityMapping.FIELD_TYPE_CUSTOM).putList("index.analysis.normalizer.lowercase.filter", filter);
- return setting;
- }
- }
ES查询数据的基础类
- package *.*.common.es.support;
-
- import java.util.List;
- import java.util.concurrent.TimeUnit;
-
- import org.elasticsearch.ElasticsearchException;
- import org.elasticsearch.action.search.SearchRequest;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.core.CountRequest;
- import org.elasticsearch.common.unit.TimeValue;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import *.*.api.error.ClientException;
- import *.*.api.error.ServerException;
- import *.*.common.config.ConfigService;
- import *.*.common.es.common.EsClients;
- import *.*.i18n.source.error.I18nErrorMessages;
-
- /**
- *
- * 从Es中查询数据工具类
- *
- * @author zhanglb
- *
- */
- public class EsDataReader {
- private static final Logger logger = LoggerFactory.getLogger(EsDataReader.class);
- private static boolean UNIT_TEST = ConfigService.getBoolean("crab.unit.test.open", false);
-
- public EsDataReader() {
- }
-
- /**
- * 执行 count 请求。
- *
- * @param queryParams 过滤条件
- * @return 累计数
- */
- public static long count(List<String> index, SearchSourceBuilder sourceBuilder) {
- if (UNIT_TEST) {
- logger.debug("es count has been suppressed for unit test.");
- return 0l;
- }
-
- try {
- CountRequest countReq = new CountRequest(index.toArray(new String[index.size()]), sourceBuilder);
- return EsClients.executeRequest(client -> client.count(countReq, RequestOptions.DEFAULT).getCount());
- } catch (ElasticsearchException e) {
- logger.error("", e);
- throw new ServerException("Failed to execute count: " + e.getMessage(), null);
- }
- }
-
- /**
- * 根据指定的查询条件,执行数据操作。
- *
- * @param queryParams 查询条件对象
- * @return 查询结果
- */
- public static SearchResponse query(List<String> index, SearchSourceBuilder sourceBuilder) {
- if (UNIT_TEST) {
- logger.debug("es query has been suppressed for unit test.");
- return null;
- }
-
- SearchResponse response;
- SearchRequest request;
- try {
- // 设置超时时间
- sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
- request = new SearchRequest(index.toArray(new String[index.size()]), sourceBuilder);
- if (logger.isDebugEnabled()) {
- logger.debug("query with condition: {}", request.source().query());
- }
- response = EsClients.executeRequest(client -> client.search(request, RequestOptions.DEFAULT));
- } catch (ElasticsearchException ee) {
- Throwable cause = ee.getCause();
- if (cause != null && cause.getMessage().contains("Result window is too large")) {
- logger.debug("", ee);
- throw ClientException.create(I18nErrorMessages.ERROR_QUERY_FAIELD_NEED_LIMIT);
- } else {
- logger.error("", ee);
- throw new ServerException("Failed to execute query: " + ee.getMessage(), null);
- }
- }
-
- if (response.isTimedOut()) {
- throw new ServerException("Query with timeout", null);
- }
- return response;
- }
- }
ES的写入数据的基础类
- package *.*.common.es.support;
-
- import java.util.List;
- import java.util.Map;
-
- import org.apache.commons.lang3.StringUtils;
- import org.elasticsearch.action.ActionListener;
- import org.elasticsearch.action.bulk.BulkItemResponse;
- import org.elasticsearch.action.bulk.BulkRequest;
- import org.elasticsearch.action.bulk.BulkResponse;
- import org.elasticsearch.action.delete.DeleteRequest;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.action.update.UpdateRequest;
- import org.elasticsearch.client.RequestOptions;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import *.*.common.config.ConfigService;
- import *.*.common.es.common.EsClients;
- import *.*.common.es.common.EsIndexUtils;
- import *.*.common.util.ConstantFields;
-
- /**
- * 把动态记录写入到es中工具类
- *
- * @author zhanglb
- *
- */
- public class EsDataWriter {
- private static final Logger logger = LoggerFactory.getLogger(EsDataWriter.class);
- private static boolean UNIT_TEST = ConfigService.getBoolean("crab.unit.test.open", false);
-
- public EsDataWriter() {
- }
-
- public static boolean sync(String index, List<Map<String, Object>> map) {
- if (UNIT_TEST) {
- logger.debug("es sync has been suppressed for unit test.");
- return true;
- }
-
- if (StringUtils.isBlank(index) || map == null)
- return false;
-
- boolean storeIndex = false;
- if (!map.isEmpty()) {
- if (!EsIndexUtils.exists(index)) {
- logger.info("create index:" + index);
- EsIndexUtils.createIndex(index, null, null, EsActivityMapping.buildActivityMapping());
- storeIndex = true;
- }
- EsDataWriter.createObjectsAsync(index, map);
- }
- return storeIndex;
- }
-
- /**
- * 写操作类型。
- */
- public enum WriteAction {
- ADD, UPDATE, UPSERT, DELETE;
- }
-
- /**
- * 异步创建资源对象。
- *
- * @param objects 待新建的资源对象
- */
- public static void createObjectsAsync(String index, List<Map<String, Object>> objects) {
- if (UNIT_TEST) {
- logger.debug("es create object has been suppressed for unit test.");
- return;
- }
-
- writerObjectsAsync(index, objects, WriteAction.ADD);
- }
-
- /**
- * 异步更新资源对象。
- *
- * @param objects 待更新的资源对象(只需提供 ID 以及 变更的字段)
- */
- public static void updateObjectsAsync(String index, List<Map<String, Object>> objects) {
- if (UNIT_TEST) {
- logger.debug("es update object has been suppressed for unit test.");
- return;
- }
-
- writerObjectsAsync(index, objects, WriteAction.UPDATE);
- }
-
- /**
- * 异步保存(根据 ID 判断新建或更新)资源对象。
- *
- * @param objects 待保存的资源对象
- */
- public static void saveObjectsAsync(String index, List<Map<String, Object>> objects) {
- if (UNIT_TEST) {
- logger.debug("es save object has been suppressed for unit test.");
- return;
- }
-
- writerObjectsAsync(index, objects, WriteAction.UPSERT);
- }
-
- /**
- * 异步保存(根据 ID 判断新建或更新)资源对象。
- *
- * @param objects 待保存的资源对象
- */
- public static void delObjectsAsync(String index, List<Map<String, Object>> objects) {
- if (UNIT_TEST) {
- logger.debug("es delete object has been suppressed for unit test.");
- return;
- }
-
- writerObjectsAsync(index, objects, WriteAction.DELETE);
- }
-
- /**
- * 以异步的形式批量写资源对象。
- *
- * @param writeRequests 写请求
- */
- public static void writerObjectsAsync(String index, List<Map<String, Object>> writeRequests, WriteAction action) {
- if (UNIT_TEST) {
- logger.debug("es write object has been suppressed for unit test.");
- return;
- }
-
- if (writeRequests == null || writeRequests.isEmpty())
- return;
-
- final BulkRequest request = new BulkRequest();
- for (Map<String, Object> writeRequest : writeRequests) {
- switch (action) {
- case ADD:
- request.add(buildCreateRequest(index, writeRequest));
- break;
- case UPDATE:
- UpdateRequest reqUp = buildUpdateRequest(index, writeRequest, true);
- if (reqUp == null)
- break;
- request.add(reqUp);
- break;
- case UPSERT:
- UpdateRequest reqU = buildUpdateRequest(index, writeRequest, true);
- if (reqU == null)
- break;
- request.add(reqU);
- break;
- case DELETE:
- DeleteRequest reqD = buildDeleteRequest(index, writeRequest.get(ConstantFields.FIELD_ID));
- if (reqD == null)
- break;
- request.add(reqD);
- break;
- }
- }
-
- EsClients.executeRequest(client -> {
- client.bulkAsync(request, RequestOptions.DEFAULT, new BulkWriteCallback());
- return null;
- });
- }
-
- /*
- * 新建资源请求。
- *
- * @return 请求对象
- */
- private static IndexRequest buildCreateRequest(String index, Map<String, Object> map) {
- IndexRequest create = new IndexRequest(index);
- if (map.containsKey(ConstantFields.FIELD_ID)) {
- create.id(map.get(ConstantFields.FIELD_ID).toString());
- }
- create.source(map);
- return create;
- }
-
- /*
- * 更新资源请求。
- *
- * @param upsert 如果不存在是否新建
- *
- * @return 请求对象
- */
- private static UpdateRequest buildUpdateRequest(String index, Map<String, Object> map, boolean upsert) {
- if (!map.containsKey(ConstantFields.FIELD_ID)) {
- return null;
- }
- UpdateRequest update = new UpdateRequest(index, map.get(ConstantFields.FIELD_ID).toString()).retryOnConflict(3);
- update.docAsUpsert(upsert);
- update.doc(map);
- return update;
- }
-
- /*
- * 删除资源请求。
- *
- * @return 请求对象
- */
- private static DeleteRequest buildDeleteRequest(String index, Object id) {
- if (id == null)
- return null;
-
- return new DeleteRequest(index, id.toString());
- }
-
- /*
- * 回调。
- */
- private static class BulkWriteCallback implements ActionListener<BulkResponse> {
-
- @Override
- public void onFailure(Exception e) {
- logger.error("Write data with error", e);
- }
-
- @Override
- public void onResponse(BulkResponse bulkItemResponses) {
- if (bulkItemResponses.hasFailures()) {
- processError(bulkItemResponses.getItems());
- } else {
- logger.debug("Write data successfully, count: {}", bulkItemResponses.getItems().length);
- }
- }
-
- private void processError(BulkItemResponse[] responses) {
- for (BulkItemResponse response : responses) {
- if (!response.isFailed() || StringUtils.contains(response.getFailureMessage(), "document_missing_exception")) {
- continue;
- }
- logger.error("Write data with error: {}", response.getFailureMessage());
- }
- }
- }
- }
ES创建索引,修改别名等公共方法类
- package *.*.common.es.common;
-
- import java.util.Collections;
- import java.util.List;
- import java.util.Set;
-
- import org.apache.commons.lang3.StringUtils;
- import org.elasticsearch.ElasticsearchException;
- import org.elasticsearch.action.admin.indices.alias.Alias;
- import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
- import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
- import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
- import org.elasticsearch.action.support.master.AcknowledgedResponse;
- import org.elasticsearch.client.GetAliasesResponse;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.indices.CreateIndexRequest;
- import org.elasticsearch.client.indices.CreateIndexResponse;
- import org.elasticsearch.client.indices.GetIndexRequest;
- import org.elasticsearch.client.indices.PutIndexTemplateRequest;
- import org.elasticsearch.common.settings.Settings;
- import org.elasticsearch.common.xcontent.XContentBuilder;
- import org.elasticsearch.rest.RestStatus;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import *.*.common.config.ConfigService;
- import *.*.common.es.entity.EsSettingEntity;
- import *.*.common.es.support.EsSetting;
- import *.*.common.util.ObjectUtils;
- import *.*.api.error.ServerException;
-
- public class EsIndexUtils {
- private static final Logger logger = LoggerFactory.getLogger(EsIndexUtils.class);
- private static boolean UNIT_TEST = ConfigService.getBoolean("crab.unit.test.open", false);
- // 是否删除旧索引
- private static boolean deleteOldIndex = ConfigService.getBoolean("crab.es.delete.old.index", true);
-
-
- private static final String INDEX_PREX = "*_activity_";
-
- private EsIndexUtils() {
- }
-
- /**
- * 创建指定的 index。
- *
- * @param idxName 索引名
- * @param aliasName 索引的别名
- * @param mappingBuilder 索引的映射配置
- */
- public static void createIndex(String idxName, String aliasName, Settings.Builder setting, XContentBuilder mappingBuilder) {
- if (UNIT_TEST) {
- logger.debug("es create index has been suppressed for unit test.");
- return;
- }
-
- //当setting为null时,初始化默认的setting
- if (setting == null) {
- setting = EsSetting.initSetting(new EsSettingEntity());
- }
-
- CreateIndexRequest request = new CreateIndexRequest(idxName);
- request.settings(setting);
-
- if (aliasName != null) {
- request.alias(new Alias(aliasName));
- }
-
- if (mappingBuilder != null) {
- request.mapping(mappingBuilder);
- }
-
- CreateIndexResponse response = EsClients.executeRequest(client -> client.indices().create(request, RequestOptions.DEFAULT));
- if (!response.isAcknowledged()) {
- logger.error("Failed to create index '{}' with alias: {}", idxName, aliasName);
- }
- }
-
- /**
- * 创建指定的 index。
- *
- * @param idxName 索引名
- */
- public static void createIndexByTemplate(String idxName) {
- CreateIndexRequest request = new CreateIndexRequest(idxName);
- CreateIndexResponse response = EsClients.executeRequest(client -> client.indices().create(request,
- RequestOptions.DEFAULT));
-
- if (!response.isAcknowledged()) {
- logger.error("Failed to create index '{}'", idxName);
- }
- }
-
- /**
- * 迁移 index 的别名到新的。
- *
- * @param indexAlias 索引别名
- * @param oldIndexName 旧索引
- * @param newIndexName 新索引
- */
- public static void aliases(String indexAlias, String oldIndexName, String newIndexName) {
- if (UNIT_TEST) {
- logger.debug("es aliases name has been suppressed for unit test.");
- return;
- }
-
- IndicesAliasesRequest request = new IndicesAliasesRequest();
-
- // 先移除,再添加
- request.addAliasAction(
- new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.REMOVE).index(oldIndexName).alias(indexAlias));
- request.addAliasAction(
- new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD).index(newIndexName).alias(indexAlias));
-
- AcknowledgedResponse response = EsClients.executeRequest(client -> client.indices().updateAliases(request, RequestOptions.DEFAULT));
- if (!response.isAcknowledged()) {
- logger.error("Failed to aliases index: {}", indexAlias);
- }
- }
-
- /**
- * 设置索引别名。
- *
- * @param indexAlias 索引别名
- * @param newIndexName 新索引
- * @param dropOldIndeces 被删除的别名所关联的索引
- */
- public static Set<String> aliases(String indexAlias, String newIndexName, boolean dropOldIndeces) {
- Set<String> oldIndices = Collections.emptySet();
- if (dropOldIndeces) {
- oldIndices = getIndexByAlias(indexAlias);
- }
- if (!oldIndices.isEmpty()) {
- oldIndices.remove(newIndexName); // 保留最新创建的索引
- }
-
- IndicesAliasesRequest request = new IndicesAliasesRequest();
-
- // 关联新索引
- request.addAliasAction(new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
- .index(newIndexName).alias(indexAlias).writeIndex(true)); // 将新增的索引设置为可写
-
- // 删除指定索引
- if (!oldIndices.isEmpty()) {
- request.addAliasAction(new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.REMOVE_INDEX)
- .indices(oldIndices.toArray(new String[oldIndices.size()])));
- }
-
- AcknowledgedResponse response = EsClients.executeRequest(client -> client.indices().updateAliases(request, RequestOptions.DEFAULT));
- if (!response.isAcknowledged()) {
- logger.error("Failed to aliases index: {}", indexAlias);
- return Collections.emptySet();
- }
- // 删除旧索引
- deleteOldIndexes(oldIndices);
-
- return oldIndices;
- }
-
- private static void deleteOldIndexes(Set<String> oldIndexes) {
- if (ObjectUtils.isNullOrEmpty(oldIndexes) || !deleteOldIndex) {
- return;
- }
- for (String oldIndex : oldIndexes) {
- dropIndex(oldIndex, true);
- }
- }
-
- /**
- * 返回指定别名所关联的所有索引。
- *
- * @param indexAlias 索引别名
- * @return 别名所关联的索引列表
- */
- public static Set<String> getIndexByAlias(String indexAlias) {
- GetAliasesRequest getAliasReq = new GetAliasesRequest();
- getAliasReq.aliases(indexAlias);
- GetAliasesResponse getAliasRsp = EsClients.executeRequest(client -> client.indices().getAlias(getAliasReq, RequestOptions.DEFAULT));
- return getAliasRsp.getAliases().keySet();
- }
-
-
- /**
- * 判断指定的 index 是否存在。
- *
- * @param indexName 索引名
- * @return 如果存在返回 true,否则返回 false
- */
- public static boolean exists(String indexName) {
- if (UNIT_TEST) {
- logger.debug("es check index has been suppressed for unit test.");
- return false;
- }
-
- GetIndexRequest getRequest = new GetIndexRequest(indexName);
- return EsClients.executeRequest(client -> client.indices().exists(getRequest, RequestOptions.DEFAULT));
- }
-
- /**
- * 删除指定的 index。
- *
- * @param indexName 索引名
- */
- public static void dropIndex(String indexName, boolean ignoreIfNotExists) {
- if (UNIT_TEST) {
- logger.debug("es drop index has been suppressed for unit test.");
- return;
- }
-
- try {
- DeleteIndexRequest deleteRequest = new DeleteIndexRequest(indexName);
- EsClients.executeRequest(client -> client.indices().delete(deleteRequest, RequestOptions.DEFAULT));
- } catch (ElasticsearchException e) {
- if (ignoreIfNotExists && e.status() == RestStatus.NOT_FOUND) {
- return; // ignore
- }
- logger.error("Failed to drop index: " + indexName, e);
- throw new ServerException("Failed to drop index: " + indexName + ", reason: " + e.getMessage(), null);
- }
- }
-
- public static String fillIndex(String index) {
- if (StringUtils.isBlank(index))
- return null;
-
- StringBuffer sb = new StringBuffer(INDEX_PREX);
- if (index.startsWith(INDEX_PREX)) {
- return index;
- }
-
- return sb.append(index).toString();
- }
-
- /**
- * 创建指定的 index 模板。
- *
- * @param tplName 索引模板名
- * @param idxAliasName 索引的别名
- * @param patterns 被应用模板的索引模式
- * @param mappingBuilder 模板的索引映射配置
- */
- public static void createIndexTemplate(String tplName, String idxAliasName, List<String> patterns,
- XContentBuilder mappingBuilder) {
- PutIndexTemplateRequest request = new PutIndexTemplateRequest(tplName);
- request.settings(getIndexSettings(idxAliasName)).patterns(patterns);
-
- if (idxAliasName != null) {
- request.alias(new Alias(idxAliasName));
- }
-
- if (mappingBuilder != null) {
- request.mapping(mappingBuilder);
- }
-
- AcknowledgedResponse response = EsClients.executeRequest(client -> client.indices().putTemplate(request,
- RequestOptions.DEFAULT));
-
- if (!response.isAcknowledged()) {
- logger.error("Failed to create index template'{}' with alias: {}", tplName, idxAliasName);
- }
- }
-
- /*
- * 返回索引的定义。
- */
- private static Settings.Builder getIndexSettings(String aliasName) {
- final String paramPrefix = "pacific.es.index.";
- int shards = ConfigService.getInt(paramPrefix + aliasName + ".number_of_shards", 3);
- int replicas = ConfigService.getInt(paramPrefix + aliasName + ".number_of_replicas", 2);
- int fields = ConfigService.getInt(paramPrefix + aliasName + ".total_fields", 5000);
- return Settings.builder().put("index.number_of_shards", shards)
- .put("index.number_of_replicas", replicas).put("index.mapping.total_fields.limit", fields);
- }
- }
Elasticsearch索引动态字段基础类
- package *.*.common.es.support;
-
- import java.io.IOException;
- import java.util.List;
- import java.util.Map;
- import java.util.Map.Entry;
- import java.util.stream.Collectors;
- import java.util.stream.Stream;
-
- import org.elasticsearch.common.xcontent.XContentBuilder;
- import org.elasticsearch.common.xcontent.XContentFactory;
-
- import *.*.common.config.ConfigService;
- import *.*.common.util.ConstantFields;
- import *.*.api.error.ServerException;
-
- /**
- * Es中对应的几种动态记录的mapping
- *
- * @author zhanglb
- *
- */
- public class EsActivityMapping {
- public static final String FIELD_TYPE_KEYWORD = "keyword";
- public static final String FIELD_TYPE_DATE = "date";
- public static final String FIELD_TYPE_TEXT = "text";
- public static final String FIELD_TYPE_LONG = "long";
- public static final String FIELD_TYPE_OBJECT = "object";
- public static final String FIELD_TYPE_CUSTOM = "custom";
- public static final List<String> NEED_LOWERCASE_TYPE = Stream.of(FIELD_TYPE_KEYWORD, FIELD_TYPE_TEXT).collect(Collectors.toList());
- public static final String CONTENT = "Total-OAO-context-OBO";
- public static final boolean OPEN_LOWERCASE = ConfigService.getInstance().isOpenLowercase();
-
- private EsActivityMapping() {
- }
-
- public static XContentBuilder buildMapping(Map<String, String> propertiesMap) {
- try (XContentBuilder builder = XContentFactory.jsonBuilder();) {
- builder.startObject();
- {
- builder.startObject("properties");
- if (propertiesMap != null && !propertiesMap.isEmpty()) {
- for (Entry<String, String> entry : propertiesMap.entrySet()) {
- buildField(builder, entry.getKey(), entry.getValue());
- }
- }
- builder.endObject();
- }
- return builder.endObject();
- } catch (IOException e) {
- throw new ServerException("Failed to build index ci mapping", e);
- }
- }
-
- public static XContentBuilder buildCIMapping() {
- try (XContentBuilder builder = XContentFactory.jsonBuilder();) {
- builder.startObject();
- {
- builder.startObject("properties");
- buildField(builder, CONTENT, FIELD_TYPE_KEYWORD);
- buildField(builder, ConstantFields.FIELD_ID, FIELD_TYPE_KEYWORD);
- buildField(builder, ConstantFields.FIELD_CLASSCODE, FIELD_TYPE_KEYWORD);
- buildField(builder, ConstantFields.RES_CIRCLE_FIELD, FIELD_TYPE_KEYWORD);
- buildField(builder, ConstantFields.CREATE_TIME, FIELD_TYPE_DATE);
- buildField(builder, ConstantFields.UPDATE_TIME, FIELD_TYPE_DATE);
- buildField(builder, ConstantFields.RES_ATTRVALUES, FIELD_TYPE_OBJECT);
-
- builder.endObject();
- }
- return builder.endObject();
- } catch (IOException e) {
- throw new ServerException("Failed to build index ci mapping", e);
- }
- }
-
- /**
-  * Builds the field mapping for configuration-item activity records.
-  * <p>
-  * The mapping consists of a single "properties" object; every field is written through
-  * {@code buildField} in the order given by the table inside this method.
-  * <p>
-  * The builder is declared in a try-with-resources statement, so it has already been closed
-  * by the time the caller receives it.
-  *
-  * @return the builder holding the finished mapping document
-  * @throws ServerException if writing the JSON content fails
-  */
- public static XContentBuilder buildActivityMapping() {
-     // {field name, field type} pairs, listed in the order they are written to the mapping.
-     String[][] fieldTypes = {
-             { ConstantFields.ACTIVITY_FIELD_ID, FIELD_TYPE_KEYWORD },
-             { ConstantFields.ACTIVITY_FIELD_CLASSCODE, FIELD_TYPE_KEYWORD },
-             { ConstantFields.ACTIVITY_FIELD_CLASSNAME, FIELD_TYPE_KEYWORD },
-             { ConstantFields.ACTIVITY_FIELD_ACTYPE, FIELD_TYPE_KEYWORD },
-             { ConstantFields.FIELD_TYPE, FIELD_TYPE_KEYWORD },
-             { ConstantFields.ACTIVITY_FIELD_TIME, FIELD_TYPE_DATE },
-             { ConstantFields.ACTIVITY_FIELD_TENANTID, FIELD_TYPE_KEYWORD },
-             { ConstantFields.ACTIVITY_FIELD_VERSION, FIELD_TYPE_LONG },
-             { ConstantFields.ACTIVITY_FIELD_CIRCLEID, FIELD_TYPE_KEYWORD },
-             { ConstantFields.FIELD_USERID, FIELD_TYPE_KEYWORD },
-             { ConstantFields.ACTIVITY_FIELD_USERNAME, FIELD_TYPE_TEXT },
-             { ConstantFields.ACTIVITY_FIELD_OPER, FIELD_TYPE_KEYWORD },
-             { ConstantFields.ACTIVITY_FIELD_OPERNAME, FIELD_TYPE_KEYWORD },
-             { ConstantFields.ACTIVITY_FIELD_ENTITYID, FIELD_TYPE_KEYWORD },
-             { ConstantFields.ACTIVITY_FIELD_ENTITYNAME, FIELD_TYPE_TEXT },
-             { ConstantFields.ACTIVITY_FIELD_DESC, FIELD_TYPE_TEXT },
-             { ConstantFields.ACTIVITY_FIELD_PARENTCINAME, FIELD_TYPE_TEXT },
-             { ConstantFields.ACTIVITY_FIELD_TARGETCINAME, FIELD_TYPE_TEXT },
-             { ConstantFields.ACTIVITY_FIELD_ITSMINFO, FIELD_TYPE_OBJECT },
-             { ConstantFields.ACTIVITY_FIELD_RELATIONNAME, FIELD_TYPE_TEXT },
-             { ConstantFields.CI_RECORD_FIELD_SOURCE, FIELD_TYPE_KEYWORD },
-             { ConstantFields.FIELD_STATE, FIELD_TYPE_KEYWORD },
-             { ConstantFields.CI_RECORD_FIELD_AUTO_CLAIM, FIELD_TYPE_KEYWORD } };
-
-     try (XContentBuilder mapping = XContentFactory.jsonBuilder()) {
-         mapping.startObject();
-         mapping.startObject("properties");
-         for (String[] fieldType : fieldTypes) {
-             buildField(mapping, fieldType[0], fieldType[1]);
-         }
-         // Close "properties" first, then the root object.
-         mapping.endObject();
-         return mapping.endObject();
-     } catch (IOException e) {
-         throw new ServerException("Failed to build index mapping", e);
-     }
- }
-
- /**
-  * Writes a single field definition, as an object named after the field, into the mapping
-  * that is being built.
-  * <p>
-  * The object always carries a "type" entry. A "normalizer" entry with the value "lowercase"
-  * is added only when {@code OPEN_LOWERCASE} is set and {@code NEED_LOWERCASE_TYPE} contains
-  * the given type.
-  *
-  * @param builder the mapping builder to write into
-  * @param field   name of the field
-  * @param type    mapping type of the field
-  * @throws IOException if the builder fails to write the content
-  */
- private static void buildField(XContentBuilder builder, String field, String type) throws IOException {
-     builder.startObject(field).field("type", type);
-
-     boolean lowercased = OPEN_LOWERCASE && NEED_LOWERCASE_TYPE.contains(type);
-     if (lowercased) {
-         builder.field("normalizer", "lowercase");
-     }
-
-     builder.endObject();
- }
- }
写入数据
- /**
- *
- * @param list
- * @param testData
- * @param syncByAlias 是否用别名
- */
- @SuppressWarnings({ "unchecked" })
- public void insertDates(List<Test> list, CommonTestData testData, boolean syncByAlias) {
- ObjectMapper mapper = new ObjectMapper();
- String id = null;
- StringJoiner content;
- Map<String, TestClass> testClassMap = testData.getClassMap();
- Map<String, TestAttribute> testAttributeMap = testData.getAttrMap();
- if (resClassMap.isEmpty()) {
- return;
- }
- try {
- List<Map<String, Object>> objects = new ArrayList<>();
- for (Test entity : list) {
- TestClass testClass = testClassMap.get(entity.getClassCode());
- if (testClass == null) {
- continue;
- }
- List<TestAttribute> attrs = new ArrayList<>();
- for (String code : testClass.getAttrCodes()) {
- TestAttribute attr = testAttributeMap.get(code);
- if (attr != null) {
- attrs.add(attr);
- }
- }
-
- content = new StringJoiner(ElasticsearchUtil.SPLIT_SYMBOL);
- String json = mapper.writeValueAsString(ElasticsearchUtil.converVo(entity, content, testClass, attrs, testData.getDictMap()));
- Map<String, Object> productMap = mapper.readValue(json, Map.class);
- id = productMap.get(TestAttrFieldCodes.BASE_FIELD_ID).toString();
- if (productMap.containsKey(ATTRVALUES)) {
- Map<String, Object> result = new HashMap<>();
- Map<String, Object> attr = (Map<String, Object>) productMap.get(ATTRVALUES);
- for (Map.Entry<String, Object> entry : attr.entrySet()) {
- if (entry.getValue() == null || StringUtils.isBlank(entry.getValue().toString())) {
- continue;
- }
- TestAttribute att = testAttributeMap.get(entry.getKey().toString());
-
- if (att != null) {
- result.put(att.getType() + SPLIT + entry.getKey(), entry.getValue());
- content.add(String.valueOf(entry.getValue()));
- }
-
- }
- productMap.put(ATTRVALUES, result);
- }
- productMap.put(ElasticsearchUtil.CONTENT, ElasticsearchUtil.SPLIT_SYMBOL + content.toString() + ElasticsearchUtil.SPLIT_SYMBOL);
- objects.add(productMap);
- }
- if (syncByAlias) {
- EsDataWriter.saveObjectsAsync(ElasticsearchUtil.getIndexAliasName(), objects);
- } else {
- EsDataWriter.saveObjectsAsync(ElasticsearchUtil.getIndexName(), objects);
- }
-
- } catch (JsonParseException e) {
- resData.clearData();
- logger.error("Failed to parse JSON data id= {} , message : {}", id, e);
- throw new ServerException("Failed to parse JSON data" + id, e);
- } catch (JsonMappingException e) {
- resData.clearData();
- logger.error("Failed to parse JSON data id= {} , message : {}", id, e);
- throw new ServerException("Failed to process JSON data" + id, e);
- } catch (IOException e) {
- resData.clearData();
- logger.error("Failed to parse JSON data id= {} , message : {}", id, e);
- throw new ServerException("Failed to execute" + id, e);
- }
- }
删除数据
- /**
-  * Deletes the document with the given id through the index alias.
-  * <p>
-  * The deletion is best effort: a failure is logged and not propagated to the caller.
-  *
-  * @param id   id of the document to delete
-  * @param type not used by this implementation
-  */
- @Override
- public void delById(String id, String type) {
-     try {
-         DeleteRequest deleteRequest = new DeleteRequest(ElasticsearchUtil.getIndexAliasName(), id);
-         EsClients.getClient().delete(deleteRequest, RequestOptions.DEFAULT);
-     } catch (Exception e) {
-         // Report through the logger instead of printStackTrace() so the failure reaches the log files.
-         logger.error("Failed to delete document id= {}", id, e);
-     }
- }
查询数据
- /**
-  * Runs a paged full-text search and returns the matching documents.
-  * <p>
-  * For a precise search the first and last character of {@code value} are stripped
-  * (presumably surrounding quotes - confirm with the caller) and the remainder is wrapped as
-  * {@code "*" + SPLIT_SYMBOL + remainder + SPLIT_SYMBOL + "*"}. The value is then escaped with
-  * {@code CommonUtil.replaceRegexChars}; unless it is exactly one character long, escaped
-  * asterisks are turned back into plain {@code *}.
-  *
-  * @param index           index to search
-  * @param type            passed through to the query helper
-  * @param value           search text
-  * @param isPreciseSearch whether {@code value} is to be matched as a delimited token
-  * @param page            1-based page number; {@code null} or a value below 1 selects the first page
-  * @param size            page size
-  * @return a map with the total hit count under "totalRecords" and the filtered hits under "list"
-  * @throws ServerException if preparing or executing the search fails
-  */
- public Map<String, Object> getSearchData(String index, String type, String value, boolean isPreciseSearch, Integer page, Integer size) {
-     Map<String, Object> result = new HashMap<>();
-     // A missing page number is treated like an out-of-range one: start at the first record.
-     int from = 0;
-     if (page != null && page - 1 >= 0) {
-         from = (page - 1) * size;
-     }
-     try {
-         if (isPreciseSearch) {
-             value = "*" + ElasticsearchUtil.SPLIT_SYMBOL + value.substring(1, value.length() - 1) + ElasticsearchUtil.SPLIT_SYMBOL + "*";
-         }
-
-         if (value.length() == 1) {
-             value = CommonUtil.replaceRegexChars(value);
-         } else {
-             // Undo the escaping of asterisks only.
-             value = CommonUtil.replaceRegexChars(value).replace("\\*", "*");
-         }
-         SearchResponse response = this.getResponse(index, type, value, from, size);
-
-         long totalRecords = response.getHits().getTotalHits().value;
-         List<Map<String, Object>> list = new ArrayList<>();
-         CacheContext context = new CacheContext();
-         try {
-             for (SearchHit hit : response.getHits().getHits()) {
-                 String objId = hit.getId();
-                 Map<String, Object> map = this.filter(hit.getSourceAsMap(), objId, context);
-                 map.put(TestAttrFieldCodes.BASE_FIELD_ID, objId);
-                 list.add(map);
-             }
-         } finally {
-             // Clear the context even when a hit cannot be processed.
-             context.clear();
-         }
-         result.put("totalRecords", totalRecords);
-         result.put("list", list);
-         return result;
-     } catch (Exception e) {// NOSONAR broad catch kept on purpose for stability
-         throw new ServerException("Failed to search data", e);
-     }
- }
-
- /**
-  * Builds the search request for the given text and executes it against one index.
-  * <p>
-  * Exactly one text clause is used, selected by two {@code ConfigService} switches:
-  * a query-string query on {@code value + "*"} without a default field, a phrase match on the
-  * query-content field, or a query-string query on {@code "*" + value + "*"} with the
-  * query-content field as default field. The result is always filtered by the tenant id of
-  * the current thread context, sorted by update time in descending order, and counts all hits.
-  *
-  * @param index index to search
-  * @param type  not used by this implementation
-  * @param value search text, already escaped by the caller
-  * @param from  offset of the first hit to return
-  * @param size  maximum number of hits to return
-  * @return the response returned by {@code EsDataReader.query}
-  */
- private SearchResponse getResponse(String index, String type, String value, int from, int size) {
-     BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
-     if (!ConfigService.getInstance().isEsQueryNeedPointTotalOAOContxtObo()) {
-         queryBuilder.must(QueryBuilders.queryStringQuery(value + "*"));
-     } else if (ConfigService.getInstance().isOpenEsParticipleQuery()) {
-         // Tokenized (phrase) matching replaces the wildcard query entirely.
-         queryBuilder.must(QueryBuilders.matchPhraseQuery(ElasticsearchUtil.getQueryContent(), value));
-     } else {
-         queryBuilder.must(QueryBuilders.queryStringQuery("*" + value + "*").defaultField(ElasticsearchUtil.getQueryContent()));
-     }
-     // Every variant is restricted to documents of the current tenant.
-     queryBuilder.filter(
-             QueryBuilders.boolQuery().must(QueryBuilders.termQuery(ResAttrFieldCodes.BASE_FIELD_TENANT, ThreadLocalContext.getTenantId())));
-
-     SearchSourceBuilder sb = new SearchSourceBuilder();
-     sb.query(queryBuilder);
-
-     // Paging
-     sb.from(from);
-     sb.size(size);
-
-     // Count all hits
-     sb.trackTotalHits(true);
-     sb.sort(ConstantFields.UPDATE_TIME, SortOrder.DESC);
-
-     List<String> indexs = new ArrayList<>();
-     indexs.add(index);
-     return EsDataReader.query(indexs, sb);
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。