赞
踩
- # 版本号
- <version.elasticsearch-rest-high-level-client>6.5.0</version.elasticsearch-rest-high-level-client>
-
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>${version.elasticsearch-rest-high-level-client}</version>
- </dependency>
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-client</artifactId>
- <version>${version.elasticsearch-rest-high-level-client}</version>
- </dependency>
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>${version.elasticsearch-rest-high-level-client}</version>
- </dependency>
- public RestHighLevelClient restHighLevelClient() {
-
- RestClientBuilder rclientBuilder = RestClient.builder(new HttpHost(host, port, "http"))
- .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
- @Override
- public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
- return httpClientBuilder
- .setKeepAliveStrategy((response, context) -> Duration.ofMinutes(5).toMillis());
- }
- });
- return new RestHighLevelClient(rclientBuilder);
- /**
- * @Description: 根据es主键删除数据
- * @Params:
- * @Return:
- * @Author: Mr.myq
- * @Date: 2022/12/2011:04
- */
- @Override
- public void deleteAllByIds(List<String> ids) {
- if (!CollectionUtils.isEmpty(ids)) {
- try {
- List<List<String>> outherList = SubListUtil.splitList(ids, count);
- for (List<String> innerList : outherList) {
- //批量删除数据
- BulkRequest request = new BulkRequest();
- request.timeout("60s");
- for (String id : innerList) {
- // 类型json/_doc
- DeleteRequest source = new DeleteRequest().index("索引").id(id).type(”_doc“);
- request.add(source);
- }
- BulkResponse response = restHighLevelClient().bulk(request, RequestOptions.DEFAULT);
- log.debug("ES批量删除数据 是否有失败 : " + response.hasFailures());
- }
- } catch (IOException e) {
- e.printStackTrace();
- log.error("ES批量删除数据: " + e.toString());
- throw new RuntimeException("ES批量删除数据: {}", e);
- } finally {
- ids.clear();
- }
- }
- }
- /**
- * 批量更新es数据
- *
- * @param esDateList
- */
- private void batchUpdateEsData(List<CmsGoodsEntity> esDateList) {
- if (CollectionUtils.isEmpty(esDateList)) {
- return;
- }
- BulkRequest bulkRequest = new BulkRequest();
- bulkRequest.timeout("200s");
- try {
- for (int i = 0; i < esDateList.size(); i++) {
- CmsGoodsEntity object = esDateList.get(i);
- Map<String, Object> param = new LinkedHashMap<>();
- String goodid = object.getGoodid();
- param.put("tariffs_rate", object.getTariffsRate());
- param.put("inspection_charges_fee", object.getInspectionChargesFee());
- param.put("rates", object.getRates());
- UpdateRequest updateRequest = new UpdateRequest(”索引“, "类型", goodid);
- updateRequest.doc(param);
- bulkRequest.add(updateRequest);
- }
- // 操作ES
- BulkResponse bulk = restHighLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT);
- log.info("批量更新ES 是否有失败 {} ", bulk.hasFailures());
- } catch (IOException e) {
- e.printStackTrace();
- throw new RuntimeException("***********批量更新ES数据异常***********");
- } finally {
- esDateList.clear();
- }
- }
- /**
- * @Description: 根据查看条件删除
- * @Params:
- * @Return:
- * @Author: Mr.myq
- * @Date: 2023/2/1415:25
- */
- @Override
- public void deleteBySupplierIdAndHispAndSource(Integer supplierId, String hisp, Integer source) {
- //通过QueryBuilders中的搜索逻辑
- BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
- //1 设置条件
- //设置删除条件: key = value
- TermQueryBuilder supplierIdTermQueryBuilder = QueryBuilders.termQuery("supplier_id", supplierId);
- if (!StringUtils.isEmpty(hisp)) {
- TermQueryBuilder hispTermQueryBuilder = QueryBuilders.termQuery("hisp", hisp);
- queryBuilder.must(hispTermQueryBuilder);
- }
- TermQueryBuilder sourceTermQueryBuilder = QueryBuilders.termQuery("source", source);
- queryBuilder.must(sourceTermQueryBuilder);
- queryBuilder.must(supplierIdTermQueryBuilder);
-
- //2 通过DeleteByQueryRequest来构建删除请求,setQuery来装载条件,indices来指定索引
- DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
- deleteByQueryRequest.setTimeout("6000s");
- deleteByQueryRequest.setQuery(queryBuilder);
- //指定删除索引
- deleteByQueryRequest.indices(restClientConfig.getIndex());
- deleteByQueryRequest.setConflicts("proceed");
- try {
- //3 通过deleteByQuery来发起删除请求
- BulkByScrollResponse deleteResponse = restClientConfig.restHighLevelClient().deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
- if (deleteResponse.getDeleted() >= 1) {
- log.info("deleteData,删除成功,删除文档条数: " + deleteResponse.getDeleted() + " ,indexName:" + restClientConfig.getIndex());
- }
- } catch (IOException e) {
- e.printStackTrace();
- log.error("无法连接到ES目标服务器");
- throw new TargetServerException("无法连接到ES目标服务器");
- }
- }
- /**
- * @Description: 根据查看条件更新
- * @Params:
- * @Return:
- * @Author: Mr.myq
- * @Date: 2023/2/1415:25
- */
- public void updateBySupplierIdAndHispAndSource(Integer supplierId, String hisp, Integer source,String value) {
- //通过QueryBuilders中的搜索逻辑
- BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
- //1 设置条件
- //设置删除条件: key = value
- TermQueryBuilder supplierIdTermQueryBuilder = QueryBuilders.termQuery("supplier_id", supplierId);
- if (!StringUtils.isEmpty(hisp)) {
- TermQueryBuilder hispTermQueryBuilder = QueryBuilders.termQuery("hisp", hisp);
- queryBuilder.must(hispTermQueryBuilder);
- }
- TermQueryBuilder sourceTermQueryBuilder = QueryBuilders.termQuery("source", source);
- queryBuilder.must(sourceTermQueryBuilder);
- queryBuilder.must(supplierIdTermQueryBuilder);
-
- //2 通过DeleteByQueryRequest来构建删除请求,setQuery来装载条件,indices来指定索引
- UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
- updateByQueryRequest.setTimeout("6000s");
- updateByQueryRequest.setQuery(queryBuilder);
-
- // 这个实际上就是对应给脚本中传参数的对象。
- HashMap<String, Object> params = new HashMap<>(16);
- params.put("hobby", "修改的参数value");
- final Script script = new Script(
- ScriptType.INLINE, "painless",
- "ctx._source.hobby = params.hobby",
- params);
- updateByQueryRequest.setScript(script);
- //指定索引
- updateByQueryRequest.indices(restClientConfig.getIndex());
- updateByQueryRequest.setConflicts("proceed");
- try {
- //3 通过deleteByQuery来发起删除请求
- BulkByScrollResponse deleteResponse = restClientConfig.restHighLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
- if (deleteResponse.getUpdated() >= 1) {
- log.info("更新成功,更新文档条数: " + deleteResponse.getUpdated() + " ,indexName:" + restClientConfig.getIndex());
- }
- } catch (IOException e) {
- e.printStackTrace();
- log.error("无法连接到ES目标服务器");
- throw new TargetServerException("无法连接到ES目标服务器");
- }
- }
-
- /**
- * 批量更新es数据
- *
- * @param esDateList
- */
- private void batchUpdateEsData(List<CmsGoodsEntity> esDateList) {
- if (CollectionUtils.isEmpty(esDateList)) {
- return;
- }
- BulkRequest bulkRequest = new BulkRequest();
- bulkRequest.timeout("600s");
- RestHighLevelClient restHighLevelClient = restClientConfig.restHighLevelClient();
- try {
- for (int i = 0; i < esDateList.size(); i++) {
- CmsGoodsEntity object = esDateList.get(i);
- Map<String, Object> param = new LinkedHashMap<>();
- String goodid = object.getGoodid();
- param.put("tariffs_rate", object.getTariffsRate());
- param.put("inspection_charges_fee", object.getInspectionChargesFee());
- param.put("rates", object.getRates());
- UpdateRequest updateRequest = new UpdateRequest(restClientConfig.getIndex(), "_doc", goodid);
- updateRequest.doc(param);
- bulkRequest.add(updateRequest);
- }
- // 操作ES
- BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- log.warn("批量更新ES 是否有失败 {} ", bulk.hasFailures());
- } catch (IOException e) {
- log.error("批量更新ES数据异常,e", e);
- e.printStackTrace();
- throw new RuntimeException("ES批量更新数据异常:{}" + e);
- } finally {
- try {
- restHighLevelClient.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- esDateList.clear();
- }
- }
- public void updateHobby(String user, ESEntity esEntity) throws IOException {
- final BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("user", user));
- // 这个实际上就是对应给脚本中传参数的对象。
- HashMap<String, Object> params = new HashMap<>(16);
- params.put("hobby", "和女朋友爬山");
- final Script script = new Script(
- ScriptType.INLINE, "painless",
- "ctx._source.hobby = params.hobby",
- params);
- updateByQuery(queryBuilder, "索引名称", script);
- }
出现: 版本冲突、文档类型不对、JAR包与使用的API不一致或其他问题。都可参考以下连接。
以上代码需要变动一下,将一些参数替换掉。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。