赞
踩
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery; import com.fasterxml.jackson.core.JsonProcessingException; import com.wangyao.common.vo.EsPageVO; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.List; import java.util.Map; import java.util.function.Consumer; public abstract class SearchQueryEngine<T> { /** * 新增集合元素 * @param indexName * @param t * @throws JsonProcessingException */ public abstract void saveOne(T t,String indexName); /** * 批量新增集合元素 * @param list * @param indexName * @return * @throws JsonProcessingException */ public abstract int saveBatch(List<T> list,String indexName); /** * 批量修改集合元素 * @param list * @param indexName * @return * @throws JsonProcessingException */ public abstract int updateBatch(List<T> list,String indexName); /*** * Desc: * @param t 泛型 * @param indexName 索引名 * @author lisong * @date 2023/12/11 17:58 */ public abstract void updateOne(T t,String indexName); /** * 根据script单条更新数据 * @param id id * @param params * @param script * @param indexName */ public abstract void updateOneByScript(String id, Map<String, Object> params, String script, String indexName); /** * 根据script批量更新数据 * @param idLists * @param params * @param script * @param indexName */ public abstract void updateBatchByScript(List<String> idLists, Map<String,Map<String, Object>> params, String script, String indexName); /** * 查询单条记录 * @param id * @param clazz * @param indexName * @return */ public abstract T findOne(String id,Class<T> clazz, String indexName); /** * 查询统计 * @param query * @param indexName * @return */ public abstract long findCount(T query, String indexName); /** * 查询多条记录 * @param query * @param clazz * @param indexName * @return */ public abstract <R> List<R> findList(T query, Class<R> clazz, String indexName); /** * 查询多条记录 * @param query 查询条件 * @param clazz 返回值类型 * @param indexName 索引名称 * @param boolQueryConsumer bool查询条件 * @return <R> */ public abstract <R> List<R> findList(T query, Class<R> clazz, String indexName, Consumer<BoolQuery.Builder> boolQueryConsumer); /** * 查询多条记录 * @param query 查询条件 * @param clazz 返回值类型 * @param indexName 索引名称 * @param boolQueryConsumer bool查询条件 * @param boolFilterConsumer bool过滤条件 * @return <R> */ public abstract <R> List<R> findList(T query, Class<R> clazz, String indexName, Consumer<BoolQuery.Builder> boolQueryConsumer, Consumer<BoolQuery.Builder> boolFilterConsumer); /** * 分页查询多条记录 * @param query 查询条件 * @param clazz 返回值类型 * @param indexName 索引名称 * @return */ public abstract <R> EsPageVO<R> findPage(T query, Class<R> clazz, String indexName); /** * 分页查询多条记录 * @param query 查询条件 * @param clazz 返回值类型 * @param indexName 索引名称 * @param boolQueryConsumer bool查询条件 * @return <R> */ public abstract <R> EsPageVO<R> findPage(T query, Class<R> clazz, String indexName, Consumer<BoolQuery.Builder> boolQueryConsumer); /** * 分页查询多条记录 * @param query 查询条件 * @param clazz 返回值类型 * @param indexName 索引名称 * @param boolQueryConsumer bool查询条件 * @param boolFilterConsumer bool过滤条件 * @return <R> */ public abstract <R> EsPageVO<R> findPage(T query, Class<R> clazz, String indexName, Consumer<BoolQuery.Builder> boolQueryConsumer, Consumer<BoolQuery.Builder> boolFilterConsumer); /** * 聚合查询 * @param query * @param clazz * @param indexName * @return */ public abstract <R> List<R> aggregation(T query, Class<R> clazz, String indexName); /** * 删除ES数据 * @param id id * @param indexName 索引名称 */ public abstract void delete(String id, String indexName); /** * 批量删除集合元素 * @param list id集合 * @param clazz Document class * @param indexName 索引名称 */ public abstract <D> void deleteBatch(List<String> list, Class<D> clazz, String indexName); // 根据分类ID 删除 public abstract <D> void deleteSeach(List<String> list, Class<D> clazz, String indexName); /** * 获取字段值 * @param field * @param obj * @return */ protected Object getFieldValue(Field field, Object obj) { boolean isAccessible = field.isAccessible(); field.setAccessible(true); try { return field.get(obj); } catch (IllegalAccessException e) { throw new RuntimeException(e.getMessage()); } finally { field.setAccessible(isAccessible); } } /** * 查list */ public abstract <R> List<R> seachList( Class<R> clazz, String indexName);
@Component public class SimpleSearchQueryEngine<T> extends SearchQueryEngine<T> { private final int numberOfRowsPerScan = 10; @Resource protected ElasticsearchTemplate elasticsearchTemplate; @Override public int saveBatch(List<T> list,String indexName){ Field id = getField(list); if (id == null) { return 0; } List<IndexQuery> bulkIndex = new ArrayList<>(); for (T t : list) { bulkIndex.add(new IndexQueryBuilder().withId(String.valueOf(getFieldValue(id, t))).withObject(t).build()); } // 批量新增 if(CollectionUtil.isNotEmpty(bulkIndex)){ RefreshPolicy refreshPolicy = elasticsearchTemplate.getRefreshPolicy(); elasticsearchTemplate.setRefreshPolicy(RefreshPolicy.IMMEDIATE); elasticsearchTemplate.bulkIndex(bulkIndex,IndexCoordinates.of(indexName)); elasticsearchTemplate.setRefreshPolicy(refreshPolicy); } return list.size(); } @Override public void saveOne(T t,String indexName){ Field id = getField(t); IndexQuery bulkIndex = new IndexQueryBuilder().withId(String.valueOf(getFieldValue(id, t))).withObject(t).build(); // 新增 RefreshPolicy refreshPolicy = elasticsearchTemplate.getRefreshPolicy(); elasticsearchTemplate.setRefreshPolicy(RefreshPolicy.IMMEDIATE); elasticsearchTemplate.index(bulkIndex,IndexCoordinates.of(indexName)); elasticsearchTemplate.setRefreshPolicy(refreshPolicy); } @Override public int updateBatch(List<T> list,String indexName){ Field id = getField(list); if (id == null) { return 0; } List<UpdateQuery> bulkUpdate = new ArrayList<>(); for (T t : list) { UpdateQuery updateQuery = UpdateQuery. builder(getFieldValue(id, t).toString()). withDocument(Document.parse(JSON.toJSONStringWithDateFormat(t, DateFormatConstant.YYYY_MM_DD_T_HH_MM_SS_SSS, SerializerFeature.WriteDateUseDateFormat))).build(); bulkUpdate.add(updateQuery); } // 批量修改 if(CollectionUtil.isNotEmpty(bulkUpdate)){ RefreshPolicy refreshPolicy = elasticsearchTemplate.getRefreshPolicy(); elasticsearchTemplate.setRefreshPolicy(RefreshPolicy.IMMEDIATE); elasticsearchTemplate.bulkUpdate(bulkUpdate,IndexCoordinates.of(indexName)); elasticsearchTemplate.setRefreshPolicy(refreshPolicy); } return list.size(); } @Override public void updateOne(T t,String indexName){ Field id = getField(t); UpdateQuery updateQuery = UpdateQuery. builder(getFieldValue(id, t).toString()). withDocument(Document.parse(JSON.toJSONStringWithDateFormat(t,DateFormatConstant.YYYY_MM_DD_T_HH_MM_SS_SSS, SerializerFeature.WriteDateUseDateFormat))).build(); RefreshPolicy refreshPolicy = elasticsearchTemplate.getRefreshPolicy(); elasticsearchTemplate.setRefreshPolicy(RefreshPolicy.IMMEDIATE); elasticsearchTemplate.update(updateQuery,IndexCoordinates.of(indexName)); // 删除立即刷新 处理后恢复原配置 elasticsearchTemplate.setRefreshPolicy(refreshPolicy); } @Override public void updateOneByScript(String id,Map<String, Object> params, String script, String indexName) { UpdateQuery updateQuery =UpdateQuery. builder(id). withScript(script). withScriptedUpsert(true). withParams(params). withScriptType(ScriptType.INLINE). build(); RefreshPolicy refreshPolicy = elasticsearchTemplate.getRefreshPolicy(); elasticsearchTemplate.setRefreshPolicy(RefreshPolicy.IMMEDIATE); elasticsearchTemplate.update(updateQuery,IndexCoordinates.of(indexName)); // 删除立即刷新 处理后恢复原配置 elasticsearchTemplate.setRefreshPolicy(refreshPolicy); } @Override public void updateBatchByScript(List<String> idLists, Map<String,Map<String, Object>> params, String script, String indexName) { List<UpdateQuery> bulkUpdate = new ArrayList<>(); if(CollectionUtil.isNotEmpty(idLists)){ idLists.forEach(id-> bulkUpdate.add(UpdateQuery. builder(id). withScript(script). withScriptedUpsert(true). withParams(params.get(id)). withScriptType(ScriptType.INLINE).build()) ); } RefreshPolicy refreshPolicy = elasticsearchTemplate.getRefreshPolicy(); elasticsearchTemplate.setRefreshPolicy(RefreshPolicy.IMMEDIATE); elasticsearchTemplate.bulkUpdate(bulkUpdate,IndexCoordinates.of(indexName)); // 删除立即刷新 处理后恢复原配置 elasticsearchTemplate.setRefreshPolicy(refreshPolicy); } /** * 反射获取id * @param list * @return */ @Nullable private Field getField(List<T> list) { if (CollectionUtils.isEmpty(list)) { return null; } T base = list.get(0); Field id = null; for (Field field : base.getClass().getDeclaredFields()) { DocumentId businessId = field.getAnnotation(DocumentId.class); if (businessId != null) { id = field; break; } } if (id == null) { throw new WangyaoException("Can't find @TableId on " + base.getClass().getName()); } return id; } /** * 反射获取id * @param * @return */ @Nullable private Field getField(T base) { Field id = null; for (Field field : base.getClass().getDeclaredFields()) { DocumentId businessId = field.getAnnotation(DocumentId.class); if (businessId != null) { id = field; break; } } if (id == null) { throw new WangyaoException("Can't find @TableId on " + base.getClass().getName()); } return id; } /** * 根据id单条查询 * @param id * @param clazz * @param indexName * @return */ @Override public T findOne(String id,Class<T> clazz, String indexName) { return elasticsearchTemplate.get(id, clazz, IndexCoordinates.of(indexName)); } /** * 查询统计 * @param query * @param indexName * @return */ @Override public long findCount(T query, String indexName){ //封装查询条件 NativeQueryBuilder nativeQueryBuilder = buildNativeQueryBuilder(query); return elasticsearchTemplate.count(nativeQueryBuilder.build(), IndexCoordinates.of(indexName)); } @Override public <R> List<R> findList(T query, Class<R> clazz, String indexName) { List<R> collect = new ArrayList<>(); //封装查询条件 NativeQueryBuilder nativeQueryBuilder = buildNativeQueryBuilder(query); SearchHits<R> aggSearchResult = getSearchHits(clazz, indexName, nativeQueryBuilder); try { if(CollectionUtil.isNotEmpty(aggSearchResult)){ collect = aggSearchResult.stream().map(SearchHit::getContent).collect(Collectors.toList()); } } catch (Exception e) { throw new WangyaoException(e.getMessage()); } return collect; } @Override public <R> List<R> findList(T query, Class<R> clazz, String indexName, Consumer<BoolQuery.Builder> boolQueryConsumer) { return findList(query, clazz, indexName, boolQueryConsumer, null); } @Override public <R> List<R> findList(T query, Class<R> clazz, String indexName, Consumer<BoolQuery.Builder> boolQueryConsumer, Consumer<BoolQuery.Builder> boolFilterConsumer) { List<R> collect = new ArrayList<>(); //封装查询条件 NativeQueryBuilder nativeQueryBuilder = buildNativeQueryBuilder(query, boolQueryConsumer, boolFilterConsumer); SearchHits<R> aggSearchResult = getSearchHits(clazz, indexName, nativeQueryBuilder); try { if(CollectionUtil.isNotEmpty(aggSearchResult)){ collect = aggSearchResult.stream().map(SearchHit::getContent).collect(Collectors.toList()); } } catch (Exception e) { throw new WangyaoException(e.getMessage()); } return collect; } @Override public <R> EsPageVO<R> findPage(T query, Class<R> clazz, String indexName) { return findPage(query, clazz, indexName, null); } @Override public <R> EsPageVO<R> findPage(T query, Class<R> clazz, String indexName, Consumer<BoolQuery.Builder> boolQueryConsumer) { return findPage(query, clazz, indexName, boolQueryConsumer, null); } @Override public <R> EsPageVO<R> findPage(T query, Class<R> clazz, String indexName, Consumer<BoolQuery.Builder> boolQueryConsumer, Consumer<BoolQuery.Builder> boolFilterConsumer) { EsPageVO<R> esPageVO = new EsPageVO<>(); //封装查询条件 NativeQueryBuilder nativeQueryBuilder = buildNativeQueryBuilder(query, boolQueryConsumer, boolFilterConsumer); SearchHits<R> aggSearchResult = getSearchHits(clazz, indexName, nativeQueryBuilder); try { esPageVO.setList(aggSearchResult.stream().map(SearchHit::getContent).collect(Collectors.toList())); //===============分页信息====================// //总记录数 long total = aggSearchResult.getTotalHits(); esPageVO.setTotal(total); // 总页码 if (query instanceof EsPageDTO pageDTO) { int totalPages = (int)total % pageDTO.getPageSize() == 0 ? (int)total / pageDTO.getPageSize() : ((int)total / pageDTO.getPageSize() + 1); esPageVO.setPages(totalPages); } } catch (Exception e) { throw new WangyaoException(e.getMessage()); } return esPageVO; } /** * 获取查询结果 * @param clazz * @param indexName * @param nativeQueryBuilder * @param <R> * @return */ @NotNull private <R> SearchHits<R> getSearchHits(Class<R> clazz, String indexName, NativeQueryBuilder nativeQueryBuilder) { return elasticsearchTemplate.search(nativeQueryBuilder.build(), clazz, IndexCoordinates.of(indexName)); } @Override public <R> List<R> aggregation(T query, Class<R> clazz,String indexName) { List<R> collect = new ArrayList<>(); //封装查询条件 NativeQueryBuilder nativeQueryBuilder = buildNativeQueryBuilder(query); //封装聚合条件 nativeQueryBuilder = buildGroupBy(nativeQueryBuilder,query); SearchHits<R> search = elasticsearchTemplate.search(nativeQueryBuilder.build(), clazz, IndexCoordinates.of(indexName)); try { if(CollectionUtil.isNotEmpty(search)){ collect = search.stream().map(SearchHit::getContent).collect(Collectors.toList()); } } catch (Exception e) { throw new WangyaoException(e.getMessage()); } return collect; } @Override public void delete(String id, String indexName) { RefreshPolicy refreshPolicy = elasticsearchTemplate.getRefreshPolicy(); elasticsearchTemplate.setRefreshPolicy(RefreshPolicy.IMMEDIATE); elasticsearchTemplate.delete(id, IndexCoordinates.of(indexName)); // 删除立即刷新 处理后恢复原配置 elasticsearchTemplate.setRefreshPolicy(refreshPolicy); } @Override public <D> void deleteBatch(List<String> list, Class<D> clazz, String indexName) { NativeQueryBuilder builder = new NativeQueryBuilder(); builder.withQuery(QueryBuilders.terms(a -> a.field("_id").terms(b -> b.value(list.stream().map(FieldValue::of).toList())))); RefreshPolicy refreshPolicy = elasticsearchTemplate.getRefreshPolicy(); elasticsearchTemplate.setRefreshPolicy(RefreshPolicy.IMMEDIATE); elasticsearchTemplate.delete(builder.build(), clazz, IndexCoordinates.of(indexName)); // 删除立即刷新 处理后恢复原配置 elasticsearchTemplate.setRefreshPolicy(refreshPolicy); } //自定义条件删除 @Override public <D> void deleteSeach(List<String> list, Class<D> clazz, String indexName) { NativeQueryBuilder builder = new NativeQueryBuilder(); builder.withQuery(QueryBuilders.terms(a -> a.field("hotCategoryId").terms(b -> b.value(list.stream().map(FieldValue::of).toList())))); RefreshPolicy refreshPolicy = elasticsearchTemplate.getRefreshPolicy(); elasticsearchTemplate.setRefreshPolicy(RefreshPolicy.IMMEDIATE); elasticsearchTemplate.delete(builder.build(), clazz, IndexCoordinates.of(indexName)); // 删除立即刷新 处理后恢复原配置 elasticsearchTemplate.setRefreshPolicy(refreshPolicy); } /** * GroupBy构建 * @param query * @return */ @SuppressWarnings("unchecked") private NativeQueryBuilder buildGroupBy(NativeQueryBuilder nativeQueryBuilder,T query) { List<Field> sumList = new ArrayList<>(); Object groupByCollection = null; Class<? extends Object> clazz = query.getClass(); for (Field field : clazz.getDeclaredFields()) { Sum sumAnnotation = field.getAnnotation(Sum.class); if (sumAnnotation != null) { sumList.add(field); } GroupBy groupByAnnotation = field.getAnnotation(GroupBy.class); Object value = getFieldValue(field, query); if (groupByAnnotation == null || value == null) { } else if (!(value instanceof Collection)) { throw new WangyaoException("GroupBy filed must be collection"); } else if (CollectionUtils.isEmpty((Collection<?>) value)) { } else if (groupByCollection != null) { throw new WangyaoException("Only one @GroupBy is allowed"); } else { groupByCollection = value; } } Iterator<String> iterator = ((Collection<String>) groupByCollection).iterator(); Map<String,String> groupFieldMap = new HashMap<>(16); return recursiveAddAggregation(nativeQueryBuilder,iterator, sumList,groupFieldMap); } private NativeQueryBuilder buildNativeQueryBuilder(T query) { NativeQueryBuilder nativeSearchQueryBuilder = new NativeQueryBuilder(); BoolQuery.Builder boolFilter = QueryBuilders.bool(); BoolQuery.Builder boolQuery = QueryBuilders.bool(); Field[] declaredFields = query.getClass().getDeclaredFields(); for (Field field:declaredFields){ handleConditions(query, nativeSearchQueryBuilder, field, boolFilter, boolQuery); } nativeSearchQueryBuilder.withFilter(boolFilter.build()._toQuery()); nativeSearchQueryBuilder.withQuery(boolQuery.build()._toQuery()); // 分页设置 if (query instanceof EsPageDTO pageDTO) { if (Objects.nonNull(pageDTO.getPageNum()) && Objects.nonNull(pageDTO.getPageSize())) { Pageable pageable = Pageable.ofSize(pageDTO.getPageSize()).withPage(pageDTO.getPageNum() - 1); nativeSearchQueryBuilder.withPageable(pageable); } } return nativeSearchQueryBuilder; } /** * 封装查询条件 * @param query 查询条件 * @param nativeSearchQueryBuilder queryBuilder * @param field 字段 */ @SuppressWarnings("unchecked") private void handleConditions(T query, NativeQueryBuilder nativeSearchQueryBuilder, Field field, BoolQuery.Builder boolFilter, BoolQuery.Builder boolQuery) { if(Objects.nonNull(getFieldValue(field,query))&& ConditionsEnum.termMap.getDesc().equals(field.getName())){ List<Query> termListQuery = Lists.newArrayList(); Map<String, Object> fieldValue = (Map<String, Object>) getFieldValue(field, query); fieldValue.forEach((k,v)-> termListQuery.add(QueryBuilders.term(a -> a.field(k).value(String.valueOf(v)))) ); boolFilter.filter(termListQuery); } if(Objects.nonNull(getFieldValue(field,query))&&ConditionsEnum.notTermMap.getDesc().equals(field.getName())){ listQuery(query, field, boolFilter); } if(Objects.nonNull(getFieldValue(field,query))&&ConditionsEnum.nestedMap.getDesc().equals(field.getName())){ List<Query> nestedListQuery = Lists.newArrayList(); Map<String, Object> fieldValue = (Map<String, Object>) getFieldValue(field, query); fieldValue.forEach((k,v)->{ Map<String, Object> maps = (Map<String, Object>) v; maps.forEach((k1,v1) -> nestedListQuery.add( QueryBuilders.nested( a -> a.path(k).query(b -> b.term(c -> c.field(k+"."+k1) .value(String.valueOf(v1))))))); } ); boolQuery.must(nestedListQuery); } if(Objects.nonNull(getFieldValue(field,query))&&ConditionsEnum.nestedTermsMap.getDesc().equals(field.getName())){ List<Query> nestedTermsListQuery = Lists.newArrayList(); Map<String, Object> fieldValue = (Map<String, Object>) getFieldValue(field, query); fieldValue.forEach((k,v)->{ Map<String, List<Object>> maps = (Map<String, List<Object>>) v; maps.forEach((k1,v1) -> nestedTermsListQuery.add( QueryBuilders.nested( a -> a.path(k).query(b -> b.terms(c -> c.field(k+"."+k1).terms(d -> d.value(v1.stream().map(o -> FieldValue.of(o.toString())).toList()))))))); } ); boolQuery.must(nestedTermsListQuery); } if(Objects.nonNull(getFieldValue(field,query))&&ConditionsEnum.termsMap.getDesc().equals(field.getName())){ List<Query> termsListQuery = Lists.newArrayList(); Map<String, List<Object>> fieldValue = (Map<String, List<Object>>) getFieldValue(field, query); List<FieldValue> termValues = new ArrayList<>(); fieldValue.forEach((k,v)-> { for (Object t : v) { termValues.add(FieldValue.of(t.toString())); } termsListQuery.add(QueryBuilders.terms(a -> a.field(k).terms(b -> b.value(termValues)))); }); boolFilter.filter(termsListQuery); } if(Objects.nonNull(getFieldValue(field,query))&&ConditionsEnum.rangeMap.getDesc().equals(field.getName())){ List<Query> rangeListQuery = Lists.newArrayList(); Map<String, SearchParamsDTO.RangeTime> fieldValue = (Map<String, SearchParamsDTO.RangeTime>) getFieldValue(field, query); fieldValue.forEach((k,v)-> { if (v.getStartTimeOptional().isPresent() || v.getEndTimeOptional().isPresent()) { rangeListQuery.add(QueryBuilders.range(b -> { b.field(k); v.getStartTimeOptional().ifPresent(startTime -> b.gt(JsonData.fromJson(String.valueOf(startTime.toInstant(ZoneOffset.of("+8")).toEpochMilli())))); v.getEndTimeOptional().ifPresent(endTime -> b.lt(JsonData.fromJson(String.valueOf(endTime.toInstant(ZoneOffset.of("+8")).toEpochMilli())))); return b; })); } }); if (CollUtil.isNotEmpty(rangeListQuery)) { boolFilter.filter(rangeListQuery); } } if(Objects.nonNull(getFieldValue(field,query))&&ConditionsEnum.matchMap.getDesc().equals(field.getName())){ Map<String, Object> fieldValue = (Map<String, Object>) getFieldValue(field, query); List<Query> mustListQuery = Lists.newArrayList(); fieldValue.forEach((k,v)-> mustListQuery.add(QueryBuilders.match(a -> a.field(k).query(String.valueOf(v)))) ); boolQuery.must(mustListQuery); } if(Objects.nonNull(getFieldValue(field,query))&&ConditionsEnum.sortMap.getDesc().equals(field.getName())){ Map<String, Object> fieldValue = (Map<String, Object>) getFieldValue(field, query); fieldValue.forEach((k,v)-> { if(ConstantNumeral.ONE.value() == (int)v){ nativeSearchQueryBuilder.withSort(Sort.by(k).descending()); }else{ nativeSearchQueryBuilder.withSort(Sort.by(k).ascending()); } }); } // 开启track_total_hits 解决elasticsearch单次查询1w条问题 nativeSearchQueryBuilder.withTrackTotalHits(true); } private void listQuery(T query, Field field, BoolQuery.Builder boolFilter) { List<Query> notTermListQuery = Lists.newArrayList(); Map<String, Object> fieldValue = (Map<String, Object>) getFieldValue(field, query); fieldValue.forEach((k,v)-> notTermListQuery.add(QueryBuilders.term(a -> a.field(k).value(String.valueOf(v)))) ); boolFilter.mustNot(notTermListQuery); } /** * 添加Aggregation * @param iterator * @param groupFieldMap * @return */ private NativeQueryBuilder recursiveAddAggregation(NativeQueryBuilder nativeQueryBuilder, Iterator<String> iterator, List<Field> sumList, Map<String, String> groupFieldMap) { String groupBy = iterator.next(); Aggregation groupByAgg = Aggregation.of(a -> a.terms(b -> b.field(groupBy).size(numberOfRowsPerScan)) .aggregations("group_by_agg_"+groupBy,Aggregation.of(c -> c.terms(d -> d.field(groupBy).size(numberOfRowsPerScan))))); if (iterator.hasNext()) { groupFieldMap.put(groupBy,groupBy); //最后一个元素前递归拼接agg条件 recursiveAddAggregation(nativeQueryBuilder.withAggregation(groupBy,groupByAgg),iterator, sumList, groupFieldMap); } else { if(!groupFieldMap.containsKey(groupBy)){ //最后一个元素不做递归,否则上去.next就会报错 nativeQueryBuilder = nativeQueryBuilder.withAggregation(groupBy,groupByAgg); groupFieldMap.put(groupBy,groupBy); } for (Field field : sumList) { Aggregation sumAgg = Aggregation.of(a -> a.terms(b -> b.field(field.getName()).size(numberOfRowsPerScan)) .aggregations("sum_by_agg_"+field.getName(),Aggregation.of(c -> c.sum(d -> d.field(field.getName()))))); nativeQueryBuilder.withAggregation(field.getName(),sumAgg); } sumList.clear(); } return nativeQueryBuilder; } @Override public <R> List<R> seachList( Class<R> clazz, String indexName) { List<R> collect = new ArrayList<>(); SearchHits<R> aggSearchResult = getSearchHitsAll(clazz, indexName); try { if(CollectionUtil.isNotEmpty(aggSearchResult)){ collect = aggSearchResult.stream().map(SearchHit::getContent).collect(Collectors.toList()); } } catch (Exception e) { throw new WangyaoException(e.getMessage()); } return collect; } /** * 获取查询结果 * @param clazz * @param indexName * @param <R> * @return */ @NotNull private <R> SearchHits<R> getSearchHitsAll(Class<R> clazz, String indexName) { NativeQueryBuilder nativeQueryBuilder = new NativeQueryBuilder(); Pageable pageable = Pageable.ofSize(1000); nativeQueryBuilder.withPageable(pageable); return elasticsearchTemplate.search(nativeQueryBuilder.build(),clazz, IndexCoordinates.of(indexName)); } }
public class ElasticsearchTemplate extends AbstractElasticsearchTemplate { private static final Log LOGGER = LogFactory.getLog(ElasticsearchTemplate.class); private final ElasticsearchClient client; private final RequestConverter requestConverter; private final ResponseConverter responseConverter; private final JsonpMapper jsonpMapper; private final ElasticsearchExceptionTranslator exceptionTranslator; public ElasticsearchTemplate(ElasticsearchClient client) { Assert.notNull(client, "client must not be null"); this.client = client; this.jsonpMapper = ((ElasticsearchTransport)client._transport()).jsonpMapper(); this.requestConverter = new RequestConverter(this.elasticsearchConverter, this.jsonpMapper); this.responseConverter = new ResponseConverter(this.jsonpMapper); this.exceptionTranslator = new ElasticsearchExceptionTranslator(this.jsonpMapper); } public ElasticsearchTemplate(ElasticsearchClient client, ElasticsearchConverter elasticsearchConverter) { super(elasticsearchConverter); Assert.notNull(client, "client must not be null"); this.client = client; this.jsonpMapper = ((ElasticsearchTransport)client._transport()).jsonpMapper(); this.requestConverter = new RequestConverter(elasticsearchConverter, this.jsonpMapper); this.responseConverter = new ResponseConverter(this.jsonpMapper); this.exceptionTranslator = new ElasticsearchExceptionTranslator(this.jsonpMapper); } protected AbstractElasticsearchTemplate doCopy() { return new ElasticsearchTemplate(this.client, this.elasticsearchConverter); } public IndexOperations indexOps(Class<?> clazz) { return new IndicesTemplate(this.client.indices(), this.elasticsearchConverter, clazz); } public IndexOperations indexOps(IndexCoordinates index) { return new IndicesTemplate(this.client.indices(), this.elasticsearchConverter, index); } public ClusterOperations cluster() { return new ClusterTemplate(this.client.cluster(), this.elasticsearchConverter); } @Nullable public <T> T get(String id, Class<T> clazz, IndexCoordinates index) { GetRequest getRequest = this.requestConverter.documentGetRequest(this.elasticsearchConverter.convertId(id), this.routingResolver.getRouting(), index, false); GetResponse<EntityAsMap> getResponse = (GetResponse)this.execute((client) -> { return client.get(getRequest, EntityAsMap.class); }); AbstractElasticsearchTemplate.ReadDocumentCallback<T> callback = new AbstractElasticsearchTemplate.ReadDocumentCallback(this, this.elasticsearchConverter, clazz, index); return callback.doWith(DocumentAdapters.from(getResponse)); } public <T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index) { Assert.notNull(query, "query must not be null"); Assert.notNull(clazz, "clazz must not be null"); MgetRequest request = this.requestConverter.documentMgetRequest(query, clazz, index); MgetResponse<EntityAsMap> result = (MgetResponse)this.execute((client) -> { return client.mget(request, EntityAsMap.class); }); AbstractElasticsearchTemplate.ReadDocumentCallback<T> callback = new AbstractElasticsearchTemplate.ReadDocumentCallback(this, this.elasticsearchConverter, clazz, index); return (List)DocumentAdapters.from(result).stream().map((multiGetItem) -> { return MultiGetItem.of(multiGetItem.isFailed() ? null : callback.doWith((Document)multiGetItem.getItem()), multiGetItem.getFailure()); }).collect(Collectors.toList()); } public void bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) { Assert.notNull(queries, "queries must not be null"); Assert.notNull(bulkOptions, "bulkOptions must not be null"); Assert.notNull(index, "index must not be null"); this.doBulkOperation(queries, bulkOptions, index); } public ByQueryResponse delete(Query query, Class<?> clazz, IndexCoordinates index) { Assert.notNull(query, "query must not be null"); DeleteByQueryRequest request = this.requestConverter.documentDeleteByQueryRequest(query, clazz, index, this.getRefreshPolicy()); DeleteByQueryResponse response = (DeleteByQueryResponse)this.execute((client) -> { return client.deleteByQuery(request); }); return this.responseConverter.byQueryResponse(response); } public UpdateResponse update(UpdateQuery updateQuery, IndexCoordinates index) { UpdateRequest<Document, ?> request = this.requestConverter.documentUpdateRequest(updateQuery, index, this.getRefreshPolicy(), this.routingResolver.getRouting()); co.elastic.clients.elasticsearch.core.UpdateResponse<Document> response = (co.elastic.clients.elasticsearch.core.UpdateResponse)this.execute((client) -> { return client.update(request, Document.class); }); return UpdateResponse.of(TypeUtils.result(response.result())); } public ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) { Assert.notNull(updateQuery, "updateQuery must not be null"); Assert.notNull(index, "index must not be null"); UpdateByQueryRequest request = this.requestConverter.documentUpdateByQueryRequest(updateQuery, index, this.getRefreshPolicy()); UpdateByQueryResponse byQueryResponse = (UpdateByQueryResponse)this.execute((client) -> { return client.updateByQuery(request); }); return this.responseConverter.byQueryResponse(byQueryResponse); } public String doIndex(IndexQuery query, IndexCoordinates indexCoordinates) { Assert.notNull(query, "query must not be null"); Assert.notNull(indexCoordinates, "indexCoordinates must not be null"); IndexRequest<?> indexRequest = this.requestConverter.documentIndexRequest(query, indexCoordinates, this.refreshPolicy); IndexResponse indexResponse = (IndexResponse)this.execute((client) -> { return client.index(indexRequest); }); Object queryObject = query.getObject(); if (queryObject != null) { query.setObject(this.updateIndexedObject(queryObject, IndexedObjectInformation.of(indexResponse.id(), indexResponse.seqNo(), indexResponse.primaryTerm(), indexResponse.version()))); } return indexResponse.id(); } protected boolean doExists(String id, IndexCoordinates index) { Assert.notNull(id, "id must not be null"); Assert.notNull(index, "index must not be null"); GetRequest request = this.requestConverter.documentGetRequest(id, this.routingResolver.getRouting(), index, true); return ((GetResponse)this.execute((client) -> { return client.get(request, EntityAsMap.class); })).found(); } protected String doDelete(String id, @Nullable String routing, IndexCoordinates index) { Assert.notNull(id, "id must not be null"); Assert.notNull(index, "index must not be null"); DeleteRequest request = this.requestConverter.documentDeleteRequest(this.elasticsearchConverter.convertId(id), routing, index, this.getRefreshPolicy()); return ((DeleteResponse)this.execute((client) -> { return client.delete(request); })).id(); } public ReindexResponse reindex(ReindexRequest reindexRequest) { Assert.notNull(reindexRequest, "reindexRequest must not be null"); co.elastic.clients.elasticsearch.core.ReindexRequest reindexRequestES = this.requestConverter.reindex(reindexRequest, true); co.elastic.clients.elasticsearch.core.ReindexResponse reindexResponse = (co.elastic.clients.elasticsearch.core.ReindexResponse)this.execute((client) -> { return client.reindex(reindexRequestES); }); return this.responseConverter.reindexResponse(reindexResponse); } public String submitReindex(ReindexRequest reindexRequest) { co.elastic.clients.elasticsearch.core.ReindexRequest reindexRequestES = this.requestConverter.reindex(reindexRequest, false); co.elastic.clients.elasticsearch.core.ReindexResponse reindexResponse = (co.elastic.clients.elasticsearch.core.ReindexResponse)this.execute((client) -> { return client.reindex(reindexRequestES); }); if (reindexResponse.task() == null) { throw new UnsupportedBackendOperation("ElasticsearchClient did not return a task id on submit request"); } else { return reindexResponse.task(); } } public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) { BulkRequest bulkRequest = this.requestConverter.documentBulkRequest(queries, bulkOptions, index, this.refreshPolicy); BulkResponse bulkResponse = (BulkResponse)this.execute((client) -> { return client.bulk(bulkRequest); }); List<IndexedObjectInformation> indexedObjectInformationList = this.checkForBulkOperationFailure(bulkResponse); this.updateIndexedObjectsWithQueries(queries, indexedObjectInformationList); return indexedObjectInformationList; } public String getClusterVersion() { return (String)this.execute((client) -> { return client.info().version().number(); }); } public String getVendor() { return "Elasticsearch"; } public String getRuntimeLibraryVersion() { return Version.VERSION != null ? Version.VERSION.toString() : "0.0.0.?"; } public long count(Query query, @Nullable Class<?> clazz, IndexCoordinates index) { Assert.notNull(query, "query must not be null"); Assert.notNull(index, "index must not be null"); SearchRequest searchRequest = this.requestConverter.searchRequest(query, clazz, index, true, false); SearchResponse<EntityAsMap> searchResponse = (SearchResponse)this.execute((client) -> { return client.search(searchRequest, EntityAsMap.class); }); return searchResponse.hits().total().value(); } public <T> SearchHits<T> search(Query query, Class<T> clazz, IndexCoordinates index) { Assert.notNull(query, "query must not be null"); Assert.notNull(index, "index must not be null"); SearchRequest searchRequest = this.requestConverter.searchRequest(query, clazz, index, false, false); SearchResponse<EntityAsMap> searchResponse = (SearchResponse)this.execute((client) -> { return client.search(searchRequest, EntityAsMap.class); }); AbstractElasticsearchTemplate.ReadDocumentCallback<T> readDocumentCallback = new AbstractElasticsearchTemplate.ReadDocumentCallback(this, this.elasticsearchConverter, clazz, index); SearchDocumentResponse.EntityCreator<T> entityCreator = this.getEntityCreator(readDocumentCallback); AbstractElasticsearchTemplate.SearchDocumentResponseCallback<SearchHits<T>> callback = new AbstractElasticsearchTemplate.ReadSearchDocumentResponseCallback(this, clazz, index); return (SearchHits)callback.doWith(SearchDocumentResponseBuilder.from(searchResponse, entityCreator, this.jsonpMapper)); } protected <T> SearchHits<T> doSearch(MoreLikeThisQuery query, Class<T> clazz, IndexCoordinates index) { Assert.notNull(query, "query must not be null"); Assert.notNull(clazz, "clazz must not be null"); Assert.notNull(index, "index must not be null"); return this.search(((NativeQueryBuilder)NativeQuery.builder().withQuery((q) -> { return q.moreLikeThis(this.requestConverter.moreLikeThisQuery(query, index)); }).withPageable(query.getPageable())).build(), clazz, index); } public <T> SearchScrollHits<T> searchScrollStart(long scrollTimeInMillis, Query query, Class<T> clazz, IndexCoordinates index) { Assert.notNull(query, "query must not be null"); Assert.notNull(query.getPageable(), "pageable of query must not be null."); SearchRequest request = this.requestConverter.searchRequest(query, clazz, index, false, scrollTimeInMillis); SearchResponse<EntityAsMap> response = (SearchResponse)this.execute((client) -> { return client.search(request, EntityAsMap.class); }); return this.getSearchScrollHits(clazz, index, response); } public <T> SearchScrollHits<T> searchScrollContinue(String scrollId, long scrollTimeInMillis, Class<T> clazz, IndexCoordinates index) { Assert.notNull(scrollId, "scrollId must not be null"); ScrollRequest request = ScrollRequest.of((sr) -> { return sr.scrollId(scrollId).scroll(Time.of((t) -> { return t.time("" + scrollTimeInMillis + "ms"); })); }); ScrollResponse<EntityAsMap> response = (ScrollResponse)this.execute((client) -> { return client.scroll(request, EntityAsMap.class); }); return this.getSearchScrollHits(clazz, index, response); } private <T> SearchScrollHits<T> getSearchScrollHits(Class<T> clazz, IndexCoordinates index, ResponseBody<EntityAsMap> response) { AbstractElasticsearchTemplate.ReadDocumentCallback<T> documentCallback = new AbstractElasticsearchTemplate.ReadDocumentCallback(this, this.elasticsearchConverter, clazz, index); AbstractElasticsearchTemplate.SearchDocumentResponseCallback<SearchScrollHits<T>> callback = new AbstractElasticsearchTemplate.ReadSearchScrollDocumentResponseCallback(this, clazz, index); return (SearchScrollHits)callback.doWith(SearchDocumentResponseBuilder.from(response, this.getEntityCreator(documentCallback), this.jsonpMapper)); } public void searchScrollClear(List<String> scrollIds) { Assert.notNull(scrollIds, "scrollIds must not be null"); if (!scrollIds.isEmpty()) { ClearScrollRequest request = ClearScrollRequest.of((csr) -> { return csr.scrollId(scrollIds); }); this.execute((client) -> { return client.clearScroll(request); }); } } public <T> List<SearchHits<T>> multiSearch(List<? extends Query> queries, Class<T> clazz, IndexCoordinates index) { Assert.notNull(queries, "queries must not be null"); Assert.notNull(clazz, "clazz must not be null"); List<MultiSearchQueryParameter> multiSearchQueryParameters = new ArrayList(queries.size()); Iterator var5 = queries.iterator(); while(var5.hasNext()) { Query query = (Query)var5.next(); multiSearchQueryParameters.add(new MultiSearchQueryParameter(query, clazz, this.getIndexCoordinatesFor(clazz))); } return (List)this.doMultiSearch(multiSearchQueryParameters).stream().map((searchHits) -> { return searchHits; }).collect(Collectors.toList()); } public List<SearchHits<?>> multiSearch(List<? extends Query> queries, List<Class<?>> classes) { Assert.notNull(queries, "queries must not be null"); Assert.notNull(classes, "classes must not be null"); Assert.isTrue(queries.size() == classes.size(), "queries and classes must have the same size"); List<MultiSearchQueryParameter> multiSearchQueryParameters = new ArrayList(queries.size()); Iterator<Class<?>> it = classes.iterator(); Iterator var5 = queries.iterator(); while(var5.hasNext()) { Query query = (Query)var5.next(); Class<?> clazz = (Class)it.next(); multiSearchQueryParameters.add(new MultiSearchQueryParameter(query, clazz, this.getIndexCoordinatesFor(clazz))); } return this.doMultiSearch(multiSearchQueryParameters); } public List<SearchHits<?>> multiSearch(List<? extends Query> queries, List<Class<?>> classes, IndexCoordinates index) { Assert.notNull(queries, "queries must not be null"); Assert.notNull(classes, "classes must not be null"); Assert.notNull(index, "index must not be null"); Assert.isTrue(queries.size() == classes.size(), "queries and classes must have the same size"); List<MultiSearchQueryParameter> multiSearchQueryParameters = new ArrayList(queries.size()); Iterator<Class<?>> it = classes.iterator(); Iterator var6 = queries.iterator(); while(var6.hasNext()) { Query query = (Query)var6.next(); Class<?> clazz = (Class)it.next(); multiSearchQueryParameters.add(new MultiSearchQueryParameter(query, clazz, index)); } return this.doMultiSearch(multiSearchQueryParameters); } private List<SearchHits<?>> doMultiSearch(List<MultiSearchQueryParameter> multiSearchQueryParameters) { MsearchRequest request = this.requestConverter.searchMsearchRequest(multiSearchQueryParameters); MsearchResponse<EntityAsMap> msearchResponse = (MsearchResponse)this.execute((client) -> { return client.msearch(request, EntityAsMap.class); }); List<MultiSearchResponseItem<EntityAsMap>> responseItems = msearchResponse.responses(); Assert.isTrue(multiSearchQueryParameters.size() == responseItems.size(), "number of response items does not match number of requests"); List<SearchHits<?>> searchHitsList = new ArrayList(multiSearchQueryParameters.size()); Iterator<MultiSearchQueryParameter> queryIterator = multiSearchQueryParameters.iterator(); Iterator<MultiSearchResponseItem<EntityAsMap>> responseIterator = responseItems.iterator(); while(queryIterator.hasNext()) { MultiSearchQueryParameter queryParameter = (MultiSearchQueryParameter)queryIterator.next(); MultiSearchResponseItem<EntityAsMap> responseItem = (MultiSearchResponseItem)responseIterator.next(); if (responseItem.isResult()) { Class clazz = queryParameter.clazz; AbstractElasticsearchTemplate.ReadDocumentCallback<?> documentCallback = new AbstractElasticsearchTemplate.ReadDocumentCallback(this, this.elasticsearchConverter, clazz, queryParameter.index); AbstractElasticsearchTemplate.SearchDocumentResponseCallback<SearchHits<?>> callback = new AbstractElasticsearchTemplate.ReadSearchDocumentResponseCallback(this, clazz, queryParameter.index); SearchHits<?> searchHits = (SearchHits)callback.doWith(SearchDocumentResponseBuilder.from(responseItem.result(), this.getEntityCreator(documentCallback), this.jsonpMapper)); searchHitsList.add(searchHits); } else if (LOGGER.isWarnEnabled()) { LOGGER.warn(String.format("multisearch responsecontains failure: {}", responseItem.failure().error().reason())); } } return searchHitsList; } public String openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) { Assert.notNull(index, "index must not be null"); Assert.notNull(keepAlive, "keepAlive must not be null"); Assert.notNull(ignoreUnavailable, "ignoreUnavailable must not be null"); OpenPointInTimeRequest request = this.requestConverter.searchOpenPointInTimeRequest(index, keepAlive, ignoreUnavailable); return ((OpenPointInTimeResponse)this.execute((client) -> { return client.openPointInTime(request); })).id(); } public Boolean closePointInTime(String pit) { Assert.notNull(pit, "pit must not be null"); ClosePointInTimeRequest request = this.requestConverter.searchClosePointInTime(pit); ClosePointInTimeResponse response = (ClosePointInTimeResponse)this.execute((client) -> { return client.closePointInTime(request); }); return response.succeeded(); } public <T> T execute(ClientCallback<T> callback) { Assert.notNull(callback, "callback must not be null"); try { return callback.doWithClient(this.client); } catch (RuntimeException | IOException var3) { throw this.exceptionTranslator.translateException(var3); } } public Query matchAllQuery() { return NativeQuery.builder().withQuery((qb) -> { return qb.matchAll((mab) -> { return mab; }); }).build(); } public Query idsQuery(List<String> ids) { return NativeQuery.builder().withQuery((qb) -> { return qb.ids((iq) -> { return iq.values(ids); }); }).build(); } protected List<IndexedObjectInformation> checkForBulkOperationFailure(BulkResponse bulkResponse) { if (bulkResponse.errors()) { Map<String, String> failedDocuments = new HashMap(); Iterator var3 = bulkResponse.items().iterator(); while(var3.hasNext()) { BulkResponseItem item = (BulkResponseItem)var3.next(); if (item.error() != null) { failedDocuments.put(item.id(), item.error().reason()); } } throw new BulkFailureException("Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [" + failedDocuments + "]", failedDocuments); } else { return (List)bulkResponse.items().stream().map((itemx) -> { return IndexedObjectInformation.of(itemx.id(), itemx.seqNo(), itemx.primaryTerm(), itemx.version()); }).collect(Collectors.toList()); } } @FunctionalInterface public interface ClientCallback<T> { T doWithClient(ElasticsearchClient client) throws IOException; } static record MultiSearchQueryParameter(Query query, Class<?> clazz, IndexCoordinates index) { MultiSearchQueryParameter(Query query, Class<?> clazz, IndexCoordinates index) { this.query = query; this.clazz = clazz; this.index = index; } public Query query() { return this.query; } public Class<?> clazz() { return this.clazz; } public IndexCoordinates index() { return this.index; } }
public class DateFormatConstant {
public static final String YYYY_MM_DD_T_HH_MM_SS_SSS = "yyyy-MM-dd'T'HH:mm:ss.SSS";
public static final String YYYYMMDD = "yyyyMMdd";
}
接口
List<EsSharVO> selectList(EsShareDTO shareDTO);
实现
@Resource
private SearchQueryEngine<SearchParamsDTO> searchQueryEngine;
列表查询方法
@Override
public List<EsSharVO> selectList(EsShareDTO shareDTO) {
if (Objects.isNull(shareDTO)) {
return null;
}
Consumer<BoolQuery.Builder> boolFilterConsumer = handleQueryType(shareDTO);
return searchQueryEngine.findList(getSearchParamsDTO(shareDTO), EsSharVO.class, ShareInfoDocument.DISTRIBUTED_ID_KEY, null, boolFilterConsumer);
}
接口
EsPageVO<EsSharVO> selectPage(EsShareDTO shareDTO);
实现
@Resource
private SearchQueryEngine<SearchParamsDTO> searchQueryEngine;
分页查询方法
@Override
public EsPageVO<EsShareInfoVO> selectPage(EsShareInfoParamDTO shareInfoEsDTO) {
if (Objects.isNull(shareInfoEsDTO)) {
return null;
}
Consumer<BoolQuery.Builder> boolFilterConsumer = handleQueryType(shareInfoEsDTO);
return searchQueryEngine.findPage(getSearchParamsDTO(shareInfoEsDTO), EsShareInfoVO.class, ShareInfoDocument.DISTRIBUTED_ID_KEY, null, boolFilterConsumer);
}
接口
void saveBatch(List<EsSharVO> sharList);
实现
@Resource
private SearchQueryEngine<SearchParamsDTO> searchQueryEngine;
批量保存方法
@Override
public void saveBatch(List<EsShareInfoDTO> shareInfoList) {
shareInfoSaveOrUpdateEngine.saveBatch(BeanUtil.copyList(shareInfoList, ShareInfoDocument.class), ShareInfoDocument.DISTRIBUTED_ID_KEY);
}
接口
void updateById(EsShareDTO shareDTO);
实现
@Resource
private SearchQueryEngine<SearchParamsDTO> searchQueryEngine;
根据ID更新 方法
@Override
public void updateById(EsShareInfoDTO shareInfo) {
shareInfoSaveOrUpdateEngine.updateOne(BeanUtil.copyProperties(shareInfo, ShareInfoDocument.class), ShareInfoDocument.DISTRIBUTED_ID_KEY);
}
接口
void updateByOrderId(EsShareDTO shareDTO);
实现
@Resource
private SearchQueryEngine<SearchParamsDTO> searchQueryEngine;
根据订单ID更新方法
@Override
public void updateBySysOrderId(EsShareDTO shareInfo) {
UpdateQuery updateQuery = UpdateQuery
.builder(new NativeQuery(QueryBuilders.term(filed -> filed.field("sysOrderId").value(shareInfo.getSysOrderId()))))
.withDocument(Document.parse(JSON.toJSONStringWithDateFormat(BeanUtil.copyProperties(shareInfo, ShareInfoDocument.class), DateFormatConstant.YYYY_MM_DD_T_HH_MM_SS_SSS, SerializerFeature.WriteDateUseDateFormat)))
.build();
elasticsearchTemplate.update(updateQuery, IndexCoordinates.of(ShareInfoDocument.DISTRIBUTED_ID_KEY));
}
接口 void updateBatchByScript(List<EsSharDTO> esSharList); 实现 @Resource private SearchQueryEngine<EsSharDTO> search; 批量更新 方法 @Override public void updateBatchByScript(List<EsShareInfoDTO> esShareInfoList) { if (CollUtil.isEmpty(esShareInfoList)) { return; } EsShareInfoDTO esShareInfoDTO = esShareInfoList.get(0); String script = handleScript(esShareInfoDTO); Map<String, Map<String, Object>> params = Maps.newHashMap(); esShareInfoList.forEach(shareInfoDTO -> { Map<String, Object> param = Maps.newHashMap(); param.put("updateTime", shareInfoDTO.getUpdateTime()); handleParam(shareInfoDTO, param); params.put(shareInfoDTO.getId().toString(), param); }); shareInfoSaveOrUpdateEngine.updateBatchByScript(params, script, ShareInfoDocument.DISTRIBUTED_ID_KEY); }
接口 /** * 批量更新 * @param payId 支付单号 * @param confirmStatus 确认状态 */ void updateBatchByScriptForPayId(String payId, Integer status); 实现 @Resource private SearchQueryEngine<SearchParamsDTO> searchQueryEngine; 批量更新 利用脚本实现 @Override public void updateBatchByScriptForPayId(String payId, Integer confirmStatus) { String script = "ctx._source.payId=params.payId;ctx._source.confirmStatus=params.confirmStatus;ctx._source.updateTime=params.updateTime;"; Map<String, Object> param = Maps.newHashMap(); param.put("payId", payId); param.put("confirmStatus", confirmStatus); param.put("updateTime", LocalDateTime.now()); UpdateQuery updateQuery = UpdateQuery. builder(new NativeQuery(QueryBuilders.term(filed -> filed.field("payId.keyword").value(payId)))). withScript(script). withScriptedUpsert(true). withParams(param). withScriptType(ScriptType.INLINE).build(); elasticsearchTemplate.updateByQuery(updateQuery, IndexCoordinates.of(ShareInfoDocument.DISTRIBUTED_ID_KEY)); }
(1)示例
private EsSharDTO getSearchParamsDTO(EsSharDTO shareInfoEsDTO) { SearchParamsDTO searchParamsDTO = new SearchParamsDTO(); Map<String, Object> termsMap = new HashMap<>(2); if (CollUtil.isNotEmpty(shareInfoEsDTO.getIdList())) { termsMap.put("_id", shareInfoEsDTO.getIdList()); } if (CollUtil.isNotEmpty(shareInfoEsDTO.getSettlementStatusList())) { termsMap.put("settlementStatus", shareInfoEsDTO.getSettlementStatusList()); } if (CollUtil.isNotEmpty(termsMap)) { searchParamsDTO.setTermsMap(termsMap); } handelTermMap(shareInfoEsDTO, searchParamsDTO); if (Objects.nonNull(shareInfoEsDTO.getNotWhetherShare())) { searchParamsDTO.setNotTermMap(Map.of("whetherShare", ConstantNumeral.ZERO.value())); } Map<String, SearchParamsDTO.RangeTime> rangeMap = new HashMap<>(ConstantNumeral.FOUR.value()); if (Objects.nonNull(shareInfoEsDTO.getStartReceivedTime()) || Objects.nonNull(shareInfoEsDTO.getEndReceivedTime())) { rangeMap.put("receivedTime", new SearchParamsDTO.RangeTime(shareInfoEsDTO.getStartReceivedTime(), shareInfoEsDTO.getEndReceivedTime())); } if (Objects.nonNull(shareInfoEsDTO.getStartConfirmTime()) || Objects.nonNull(shareInfoEsDTO.getEndConfirmTime())) { rangeMap.put("confirmTime", new SearchParamsDTO.RangeTime(shareInfoEsDTO.getStartConfirmTime(), shareInfoEsDTO.getEndConfirmTime())); } if (!rangeMap.isEmpty()) { searchParamsDTO.setRangeMap(rangeMap); } searchParamsDTO.setPageNum(shareInfoEsDTO.getPageNum()); searchParamsDTO.setPageSize(shareInfoEsDTO.getPageSize()); return searchParamsDTO; }
(2)示例
private static void handelTermMap(EsShareInfoParamDTO shareInfoEsDTO, SearchParamsDTO searchParamsDTO) { Map<String, Object> termMap = new HashMap<>(ConstantNumeral.FOUR.value()); if (Objects.nonNull(shareInfoEsDTO.getUserId())) { termMap.put("userId", shareInfoEsDTO.getUserId()); } if (StrUtil.isNotBlank(shareInfoEsDTO.getPayId())) { termMap.put("payId.keyword", shareInfoEsDTO.getPayId()); } if (Objects.nonNull(shareInfoEsDTO.getSettlementStatus())) { termMap.put("settlementStatus", shareInfoEsDTO.getSettlementStatus()); } if (!termMap.isEmpty()) { searchParamsDTO.setTermMap(termMap); } }
(3)示例
private Consumer<BoolQuery.Builder> handleQueryType(EsShareInfoParamDTO paramDTO) { if (Objects.isNull(paramDTO.getQueryType())) { return null; } paramDTO.setShareStatus(ShareStatusEnum.COMPLETED.code); if (Objects.equals(ConstantNumeral.ZERO.value(), paramDTO.getQueryType())) { paramDTO.setEndConfirmTime(LocalDate.now().atTime(0,0,0)); paramDTO.setOrderStatus(OrderStatusEnum.SUCCESS.code); } else if (Objects.equals(ConstantNumeral.ONE.value(), paramDTO.getQueryType())) { paramDTO.setSettlementStatusList(Arrays.asList(SettlementStatusEnum.PAYMENT_HAS_BEEN_RECEIVED.code, SettlementStatusEnum.WITHDRAWING.code, SettlementStatusEnum.ALREADY_REQUESTED.code)); } else if (Objects.equals(ConstantNumeral.TWO.value(), paramDTO.getQueryType())) { paramDTO.setSettlementStatus(SettlementStatusEnum.NO_WITHDRAWAL.code); paramDTO.setConfirmStatus(ConfirmStatusEnum.PLATFORM_CONFIRMATION.code); List<Query> queryList = Lists.newArrayList(); BoolQuery.Builder boolFilter = QueryBuilders.bool(); boolFilter.mustNot(QueryBuilders.term(a -> a.field("orderStatus").value(OrderStatusEnum.SUCCESS.code))); queryList.add(boolFilter.build()._toQuery()); boolFilter = QueryBuilders.bool(); boolFilter.filter(QueryBuilders.range(b -> { b.field("confirmTime"); b.gt(JsonData.fromJson(LocalDate.now().atTime(0,0,0).toString())); return b; })); queryList.add(boolFilter.build()._toQuery()); boolFilter = QueryBuilders.bool(); boolFilter.should(queryList); BoolQuery.Builder finalBoolFilter = boolFilter; return boolQuery -> boolQuery.must(finalBoolFilter.build()._toQuery()); } return null; }
(4)示例
private String handleScript(EsShareInfoDTO esShareInfoDTO) { String script = "ctx._source.updateTime=params.updateTime;"; if (Objects.nonNull(esShareInfoDTO.getPayOrderId())) { script += "ctx._source.payOrderId=params.payOrderId;"; } if (Objects.nonNull(esShareInfoDTO.getOrderStatus())) { script += "ctx._source.orderStatus=params.orderStatus;"; } if (Objects.nonNull(esShareInfoDTO.getFinishTime())) { script += "ctx._source.finishTime=params.finishTime;"; } if (Objects.nonNull(esShareInfoDTO.getOperateUserId())) { script += "ctx._source.operateUserId=params.operateUserId;"; } return script; }
(5)示例
```
private void handleParam(EsShareInfoDTO shareInfoDTO, Map<String, Object> param) {
if (Objects.nonNull(shareInfoDTO.getPayOrderId())) {
param.put("payOrderId", shareInfoDTO.getPayOrderId());
}
if (Objects.nonNull(shareInfoDTO.getOperateUserId())) {
param.put("operateUserId", shareInfoDTO.getOperateUserId());
}
if (Objects.nonNull(shareInfoDTO.getOperateUserName())) {
param.put("operateUserName", shareInfoDTO.getOperateUserName());
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。