当前位置:   article > 正文

Elasticsearch入门(RestHighLevelClient)-客户端封装-API查询_resthighlevelclient api

resthighlevelclient api

背景

- 大数据架构业务场景中需要实时数据落入ES,基本上是业务数据,目的是为了封装后作为规则引擎的变量提供,是变量系统的一部分;
- 架构数据流来源于Maxwell,由Spark Streaming做数据流处理,落库使用RestHighLevelClient同步提交Bulk写入;本文不涉及增删改,主要介绍客户端以及查询的封装,供后续变量系统的应用部分使用;
  • 1
  • 2

客户端

streaming直接使用

/**
 * Builds a high-level REST client for direct use inside the streaming job.
 *
 * Three nodes are registered, each on port 9200 over plain HTTP. The "host"
 * names are the placeholders used in the original snippet.
 *
 * @return a newly constructed RestHighLevelClient; this function never closes it
 */
def createESClientNew(): RestHighLevelClient = {
  val firstNode = new HttpHost("host", 9200, "http")
  val secondNode = new HttpHost("host", 9200, "http")
  val thirdNode = new HttpHost("host", 9200, "http")

  val restClientBuilder = RestClient.builder(firstNode, secondNode, thirdNode)
  new RestHighLevelClient(restClientBuilder)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

集群节点之间使用,序列化

/**
 * Serializable base type that exposes a client accessor.
 * This base implementation holds no client of its own.
 */
public class BaseEsClientSerializable implements Serializable {
    /**
     * Returns the client associated with this object.
     *
     * @return always {@code null} in this base class
     */
    public RestHighLevelClient getClient() {
        // Nothing is held here, so there is no client to hand out.
        return null;
    }
}

/**
 * Serializable wrapper whose accessor returns one client shared through a static field.
 */
public class EsClientSerializable extends BaseEsClientSerializable {
    /** Shared client, created once when this class is initialized. */
    public static RestHighLevelClient client;

    static {
        // Single node on port 9200 over plain HTTP; "host" is the snippet's placeholder.
        HttpHost node = new HttpHost("host", 9200, "http");
        client = new RestHighLevelClient(RestClient.builder(node));
    }

    /**
     * @return the shared static client
     */
    @Override
    public RestHighLevelClient getClient() {
        return EsClientSerializable.client;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

配置连接池查询

/**
 * Object-pool factory that creates and destroys RestHighLevelClient instances.
 */
public class EsClientPoolFactory implements PooledObjectFactory<RestHighLevelClient> {

    /**
     * Creates a new client and wraps it for the pool.
     *
     * A construction failure is propagated to the caller. Swallowing it would
     * place a wrapper around {@code null} into the pool, and the borrower would
     * only find out later through a NullPointerException.
     *
     * @return a pooled wrapper around a freshly built, non-null client
     * @throws Exception if the client cannot be constructed
     */
    @Override
    public PooledObject<RestHighLevelClient> makeObject() throws Exception {
        RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
                new HttpHost("host", 9200, "http")));
        return new DefaultPooledObject<>(client);
    }

    /**
     * Closes the client held by a pooled wrapper that is being discarded.
     *
     * @param pooledObject wrapper whose client should be closed
     * @throws Exception if closing the client fails
     */
    @Override
    public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
        RestHighLevelClient highLevelClient = pooledObject.getObject();
        // Guard against a wrapper that carries no client so destruction never throws an NPE.
        if (highLevelClient != null) {
            highLevelClient.close();
        }
    }

    /**
     * Performs no health check; every pooled client is reported as valid.
     *
     * @param pooledObject wrapper to validate (not inspected)
     * @return always {@code true}
     */
    @Override
    public boolean validateObject(PooledObject<RestHighLevelClient> pooledObject) {
        return true;
    }

    /**
     * Activation hook; only writes a trace line to standard output.
     *
     * @param pooledObject wrapper being activated (not modified)
     */
    @Override
    public void activateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
        System.out.println("activateObject");
    }

    /**
     * Passivation hook; only writes a trace line to standard output.
     *
     * @param pooledObject wrapper being passivated (not modified)
     */
    @Override
    public void passivateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
        System.out.println("passivateObject");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
/**
 * Static access point to a shared pool of RestHighLevelClient instances.
 */
public class ElasticSearchPoolUtil {

    // Pool configuration; the original author notes it is optional because defaults would also work.
    private static GenericObjectPoolConfig poolConfig;
    // Factory that knows how to create and destroy the pooled clients.
    private static EsClientPoolFactory esClientPoolFactory;
    // The pool itself, built from the factory and the configuration.
    private static GenericObjectPool<RestHighLevelClient> clientPool;

    static {
        poolConfig = new GenericObjectPoolConfig();
        // At most 8 clients in the pool (the original author notes this equals the default).
        poolConfig.setMaxTotal(8);
        esClientPoolFactory = new EsClientPoolFactory();
        clientPool = new GenericObjectPool<>(esClientPoolFactory, poolConfig);
    }

    /**
     * Borrows a client from the pool.
     *
     * @return the borrowed client; hand it back with {@link #returnClient}
     * @throws Exception if the pool fails to provide a client
     */
    public static RestHighLevelClient getClient() throws Exception {
        return clientPool.borrowObject();
    }

    /**
     * Hands a previously borrowed client back to the pool.
     *
     * @param client the client to return
     */
    public static void returnClient(RestHighLevelClient client) {
        clientPool.returnObject(client);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

查询

  • 封装得还不够完整,熟悉之后可以按照业务场景继续补充;目的是让后续查询不需要再开发,直接配置条件即可;

多条件match,过滤查询

/**
 * Runs a search whose conditions are `match` queries, all combined as `must`
 * clauses of a single bool query.
 *
 * @param client client used to execute the request
 * @param index  name of the index to search
 * @param matchs condition group: each key is a field name and its value, read
 *               as a string, is the text to match
 * @param size   value handed to the search source builder's `size`
 * @return the SearchResponse produced by the synchronous search call
 */
def searchHitsByMatchs(
                        client: RestHighLevelClient,
                        index: String,
                        matchs: JSONObject,
                        size: Int
                      ): SearchResponse = {
  val mustAll = QueryBuilders.boolQuery()

  // One match clause per key of the condition object.
  val fields = matchs.keySet().iterator()
  while (fields.hasNext) {
    val field = fields.next().toString
    mustAll.must(QueryBuilders.matchQuery(field, matchs.getString(field)))
  }

  // Attach the bool query to the source, then the source to the request.
  val source = new SearchSourceBuilder().size(size).query(mustAll)
  val request = new SearchRequest(index).source(source)
  client.search(request, RequestOptions.DEFAULT)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

多条件term,过滤查询

/**
 * Runs a search whose conditions are `term` queries, all combined as `must`
 * clauses of a single bool query.
 *
 * @param client client used to execute the request
 * @param index  name of the index to search
 * @param terms  condition group: each key is a field name and its value, read
 *               as a string, is the term to look for
 * @param size   value handed to the search source builder's `size`
 * @return the SearchResponse produced by the synchronous search call
 */
def searchHitsByTerms(client: RestHighLevelClient,
                      index: String,
                      terms: JSONObject,
                      size: Int
                     ): SearchResponse = {
  val filters = QueryBuilders.boolQuery()

  // One term clause per key of the condition object.
  for (rawKey <- terms.keySet().toArray()) {
    val field = rawKey.toString
    filters.must(QueryBuilders.termQuery(field, terms.getString(field)))
  }

  val source = new SearchSourceBuilder().size(size)
  source.query(filters)

  val request = new SearchRequest(index)
  request.source(source)
  client.search(request, RequestOptions.DEFAULT)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

多条件match查询返回count

/**
 * Counts the documents that satisfy a group of `match` conditions, all
 * combined as `must` clauses of a single bool query.
 *
 * @param client client used to execute the request
 * @param index  name of the index to count in
 * @param matchs condition group: each key is a field name and its value, read
 *               as a string, is the text to match
 * @return the CountResponse produced by the synchronous count call
 */
def searchCountByMatchs(
                         client: RestHighLevelClient,
                         index: String,
                         matchs: JSONObject
                       ): CountResponse = {
  val conditions = QueryBuilders.boolQuery()

  // One match clause per key of the condition object.
  val fields = matchs.keySet().iterator()
  while (fields.hasNext) {
    val field = fields.next().toString
    conditions.must(QueryBuilders.matchQuery(field, matchs.getString(field)))
  }

  // Wrap the bool query in a source builder and attach it to the count request.
  val source = new SearchSourceBuilder()
  source.query(conditions)

  val request: CountRequest = new CountRequest(index)
  request.source(source)
  client.count(request, RequestOptions.DEFAULT)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

多条件match聚合查询

/**
 * Runs a metric aggregation (sum, max, min, avg or value count) over the
 * documents that satisfy a group of `match` conditions.
 *
 * @param client          client used to execute the request
 * @param index           name of the index to search
 * @param matchs          condition group: each key is a field name and its
 *                        value, read as a string, is the text to match
 * @param returnFieldName name under which the aggregation is registered in the request
 * @param sumField        field the aggregation is computed on
 * @param aggregationType "SUM", "MAX", "MIN" or "AVG"; any other value selects a value count
 * @return the SearchResponse produced by the synchronous search call
 */
def searchSumByMatchs(
                       client: RestHighLevelClient,
                       index: String,
                       matchs: JSONObject,
                       returnFieldName: String,
                       sumField: String,
                       aggregationType: String
                     ): SearchResponse = {
  val conditions = QueryBuilders.boolQuery()

  // One match clause per key of the condition object.
  val fields = matchs.keySet().iterator()
  while (fields.hasNext) {
    val field = fields.next().toString
    conditions.must(QueryBuilders.matchQuery(field, matchs.getString(field)))
  }

  val source = new SearchSourceBuilder()
  source.query(conditions)

  // Register exactly one aggregation, chosen by the requested kind.
  // The builder is named returnFieldName and computed over sumField.
  aggregationType match {
    case "SUM" => source.aggregation(AggregationBuilders.sum(returnFieldName).field(sumField))
    case "MAX" => source.aggregation(AggregationBuilders.max(returnFieldName).field(sumField))
    case "MIN" => source.aggregation(AggregationBuilders.min(returnFieldName).field(sumField))
    case "AVG" => source.aggregation(AggregationBuilders.avg(returnFieldName).field(sumField))
    case _ => source.aggregation(AggregationBuilders.count(returnFieldName).field(sumField))
  }

  // Attach query and aggregation to the request and execute it.
  val request = new SearchRequest(index)
  request.source(source)
  client.search(request, RequestOptions.DEFAULT)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

数据解析

解析searchResponse为SearchHit 集合

/**
 * Extracts the array of hits from a search response.
 *
 * @param searchResponse response to read
 * @return the hits carried by the response
 */
def analysisSearchResponseToHits(
                                  searchResponse : SearchResponse
                                ): Array[SearchHit] = {
  val hitsContainer = searchResponse.getHits
  hitsContainer.getHits
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

解析CountResponse为long值

/**
 * Extracts the document count from a count response.
 *
 * @param countResponse response to read
 * @return the count reported by the response
 */
def analysisCountResponseToLong(countResponse : CountResponse): Long = {
  val total: Long = countResponse.getCount
  total
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

解析searchResponse为各种聚合Double值

/**
 * Reads the value of a single metric aggregation from a search response.
 *
 * @param searchResponse  response that carries the aggregation
 * @param returnFieldName name the aggregation was registered under
 * @param aggregationType "SUM", "MAX", "MIN" or "AVG"; any other value is
 *                        treated as a value count
 * @return the aggregation value as a Double
 */
def analysisSearchResponseToAggregationFloat(
                                  searchResponse : SearchResponse ,
                                  returnFieldName: String ,
                                  aggregationType: String
                                ): Double = {
  // Look the aggregation up by name through the generic Aggregation interface.
  val named = searchResponse.getAggregations.get[Aggregation](returnFieldName)

  // Cast the interface to the parsed implementation matching the requested kind.
  aggregationType match {
    case "SUM" => named.asInstanceOf[ParsedSum].value()
    case "MAX" => named.asInstanceOf[ParsedMax].value()
    case "MIN" => named.asInstanceOf[ParsedMin].value()
    case "AVG" => named.asInstanceOf[ParsedAvg].value()
    case _ => named.asInstanceOf[ParsedValueCount].value()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

调用使用

// Shared client taken from the serializable wrapper.
val client = EsClientSerializable.client

// Values used by both examples below.
val repayPlanIndex = "vs_fms_repay_plan_wide_table_new"
val principalAggName = "result_principal"
val principalAggKind = "MAX"

// Example 1: searchCountByMatchs — count the documents matching one condition.
val matchs = new JSONObject()
matchs.put("cert_no.keyword", "xxx")
val countResponse = searchCountByMatchs(client, repayPlanIndex, matchs)
println(SingleResponseAnalysisUtils.analysisCountResponseToLong(countResponse))

// Example 2: searchSumByMatchs — MAX aggregation over "principal" with the same condition.
val searchResponse = searchSumByMatchs(
  client,
  repayPlanIndex,
  matchs,
  principalAggName,
  "principal",
  principalAggKind
)
// The aggregation is read back with the same name and kind used in the request.
val principalMax = SingleResponseAnalysisUtils.analysisSearchResponseToAggregationFloat(
  searchResponse,
  principalAggName,
  principalAggKind
)
println(principalMax)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

说明

  • 后续会将ES实际项目中的部分逐渐更新出来;
  • 包含在大数据场景下:实时、离线不同项目中的运用;
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/68688
推荐阅读
相关标签
  

闽ICP备14008679号