赞
踩
@Configuration public class ElasticSearchConfig { @Autowired private Environment env; @Bean public RestHighLevelClient restHighLevelClient() { // 拆分ip地址 List<HttpHost> hostLists = new ArrayList<>(); String[] hostList = address.split(","); for (String addr : hostList) { String host = addr.split(":")[0]; String port = addr.split(":")[1]; //参数1:IP,参数2:端口号,参数三:协议 hostLists.add(new HttpHost(host, Integer.parseInt(port), schema)); } // 转换成 HttpHost 数组 HttpHost[] httpHost = hostLists.toArray(new HttpHost[]{}); // 构建连接对象 RestClientBuilder builder = RestClient.builder(httpHost); // 异步连接延时配置 //配置请求超时超时,分为 连接超时(默认1s) 和 套接字超时(默认30s) builder.setRequestConfigCallback(requestConfigBuilder -> { requestConfigBuilder.setConnectTimeout(connectTimeout);//配置连接超时时间 requestConfigBuilder.setSocketTimeout(socketTimeout);//配置套接字超时时间 requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout);//获取连接的超时时间 return requestConfigBuilder; }); // 异步连接数配置 builder.setHttpClientConfigCallback(httpClientBuilder -> { httpClientBuilder.setMaxConnTotal(maxConnectNum);//配置最大连接数量 httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);//配置最大的路由连接数 return httpClientBuilder; }); RestHighLevelClient client = new RestHighLevelClient(builder); //得到索引名数组 String[] indexNames = getIndexNames(); List<String> no_created_indexs = indexExists(client, indexNames);//得到索引没有创建的集合 if(no_created_indexs.size()!=0) { //创建index for(String indexName:no_created_indexs) { createIndex(new RestHighLevelClient(builder),indexName);//创建该索引 } } return new RestHighLevelClient(builder); } /** * 创建index,因为CreateIndexRequest属于ddl操作,需要在传入时传一个new的client,在执行完会close client * @param client * @param indexName * @return */ private boolean createIndex(RestHighLevelClient client,String indexName) { CreateIndexRequest request = new CreateIndexRequest(indexName); request.settings(Settings.builder() .put("index.number_of_shards",3) //分片数 //TODO 最好能配置化 .put("index.number_of_replicas", 1));//副本数 //TODO 最好能配置化 request.alias(new Alias(indexName+"alias"));//设置别名 // request.setTimeout(TimeValue.timeValueMinutes(2));//设置创建索引超时2分钟 // 同步请求 try { CreateIndexResponse createIndexResponse = client.indices().create(request,RequestOptions.DEFAULT); // 处理响应 boolean acknowledged = createIndexResponse.isAcknowledged(); boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged(); System.out.println(acknowledged+","+shardsAcknowledged); client.close(); return true; } catch (IOException e) { return false; } } private String[] getIndexNames() { String[] ins = env.getProperty("elasticsearchinfo.index_list").split(","); return ins; } private Logger log = LoggerFactory.getLogger(ElasticSearchConfig.class); /** * 判断索引是否存在 * @param client * @param indexNames * @return 未创建的索引list */ public List<String> indexExists(RestHighLevelClient client,String[] indexNames) { GetIndexRequest request = new GetIndexRequest(); List<String> no_created_indexs = new ArrayList<String>(); for(String indexName:indexNames) { request.indices(indexName);//设置索引 try { if(client.indices().exists(request, RequestOptions.DEFAULT)) { }else {//如果不存在,添加到list中 no_created_indexs.add(indexName); } } catch(ConnectException e) { log.error("es服务连接失败或未启动"); } catch (IOException e) { e.printStackTrace(); no_created_indexs.add(indexName); } } return no_created_indexs; } }
说明:在进行CURD之前上面的配置必不可少
public ResponseData doInsertPomenzhi(PoMenzhi po) throws IOException { //构建新增请求体 if (po.getId() == null || "".equals(po.getId())) { return ResponseDataUtil.buildError("es索引id缺失"); } //1、添加新文档需要调用IndexRequest请求 IndexRequest indexRequest = new IndexRequest(index_shanghai, type_shanghai, po.getId().toString()); //source方法;将文档源设置为索引 indexRequest.source(JSON.toJSONString(po), XContentType.JSON); //发送请求 //同步执行 当以下列方式执行IndexRequest时,客户端在继续执行代码之前,会等待返回索引响应: IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT); /**indexResponse 示例: * { * "_shards" : { * "total" : 2, * "failed" : 0, * "successful" : 1 * }, * "_index" : "twitter", * "_type" : "_doc", * "_id" : "1", * "_version" : 1, * "_seq_no" : 0, * "_primary_term" : 1, * "result" : "created" * } */ //获取结果,进行比较 if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { return ResponseDataUtil.buildSuccess("创建成功"); } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { return ResponseDataUtil.buildSuccess("更新成功"); } //获取分片信息 ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { return ResponseDataUtil.buildSuccess("集群部分创建成功"); } //创建失败 if (shardInfo.getFailed() > 0) { ArrayList<Object> reason = new ArrayList<>(); for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { reason.add(failure.reason()); } return ResponseDataUtil.buildError("500", "创建失败", reason); } return ResponseDataUtil.buildError("500", "创建失败", "原因未知"); }
public ResponseData doUpdate(PoMenzhi po) throws IOException { if (po.getId() == null || "".equals(po.getId())) { return ResponseDataUtil.buildError("es索引id缺失"); } //构建更新请求体 UpdateRequest request = new UpdateRequest(index_shanghai, type_shanghai, po.getId().toString()); request.doc(JSON.toJSONString(po), XContentType.JSON); //发送请求 UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT); //处理请求结果 if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) { return ResponseDataUtil.buildSuccess("创建成功"); } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) { return ResponseDataUtil.buildSuccess("更新成功"); } else { return ResponseDataUtil.buildError("更新失败"); } }
public ResponseData doDelete(Long id) throws IOException { DeleteRequest request = new DeleteRequest(index_shanghai, type_shanghai, id.toString()); DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT); //删除响应体 ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { return ResponseDataUtil.buildSuccess("删除成功"); } if (shardInfo.getFailed() > 0) { ArrayList<Object> reason = new ArrayList<>(); for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { reason.add(failure.reason()); } return ResponseDataUtil.buildError("500", "删除失败", reason); } else { return ResponseDataUtil.buildError("500", "删除失败", "原因未知"); } }
public ResponseData getByHitwords(String hitword) throws IOException { //1、创建搜索请求对象SearchRequest,设置查询的指定索引和类型 SearchRequest searchRequest = new SearchRequest(index_shanghai); searchRequest.types(type_shanghai); //2、创建搜索内容对象SearchSourceBuilder SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); //3、创建查询对象MatchQueryBuilder,以及MatchQueryBuilder对象的配置 MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("fulltext", hitword); //启动模糊查询 matchQueryBuilder.fuzziness(Fuzziness.AUTO); //设置最大扩展选项以控制查询的模糊过程 matchQueryBuilder.maxExpansions(10); //4、将查询对象MatchQueryBuilder添加到搜索内容对象SearchSourceBuilder中,以及SearchSourceBuilder对象的配置 searchSourceBuilder.query(matchQueryBuilder); //设置查询的起始索引位置 searchSourceBuilder.from(0); //设置查询的数量 searchSourceBuilder.size(5); //设置超时时间 searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); //5、将搜索内容对象SearchSourceBuilder添加到搜索请求对象SearchRequest中 searchRequest.source(searchSourceBuilder); //发送搜索请求 SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); SearchHits hits = searchResponse.getHits(); SearchHit[] searchHits = hits.getHits(); //解析响应结果 ArrayList list = new ArrayList<>(); for (SearchHit hit : searchHits) { Map<String, Object> map = hit.getSourceAsMap(); list.add(JSONObject.parseObject(JSON.toJSONString(map)).toJavaObject(PoMenzhiShanghai.class)); } return ResponseDataUtil.buildSuccess(list); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。