赞
踩
- 大数据架构业务场景中需要实时数据落入ES,基本上是业务数据,目的是为了封装后作为规则引擎的变量提供,是变量系统的一部分;
- 架构数据流来源于Maxwell,Spark Streaming做数据流处理,落库使用RestHighLevelClient的同步提交Bulk写入;增删改此文档不涉及,主要就是客户端以及查询的封装,为后续变量系统的应用部分;
def createESClientNew(): RestHighLevelClient = {
new RestHighLevelClient(
RestClient.builder(
new HttpHost("host", 9200, "http"),
new HttpHost("host", 9200, "http"),
new HttpHost("host", 9200, "http")
)
)
}
public class BaseEsClientSerializable implements Serializable {
public RestHighLevelClient getClient() {
return null;
}
}
public class EsClientSerializable extends BaseEsClientSerializable {
public static RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("host", 9200, "http")));
@Override
public RestHighLevelClient getClient() {
return client;
}
}
public class EsClientPoolFactory implements PooledObjectFactory<RestHighLevelClient> { /** * 生产对象 * @return * @throws Exception */ @Override public PooledObject<RestHighLevelClient> makeObject() throws Exception { RestHighLevelClient client = null; try { client = new RestHighLevelClient(RestClient.builder( new HttpHost("host", 9200, "http"))); } catch (Exception e) { e.printStackTrace(); } return new DefaultPooledObject<>(client); } /** * 销毁对象 * @param pooledObject * @throws Exception */ @Override public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception { RestHighLevelClient highLevelClient = pooledObject.getObject(); highLevelClient.close(); } @Override public boolean validateObject(PooledObject<RestHighLevelClient> pooledObject) { return true; } @Override public void activateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception { System.out.println("activateObject"); } @Override public void passivateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception { System.out.println("passivateObject"); } }
public class ElasticSearchPoolUtil { // 对象池配置类,不写也可以,采用默认配置 private static GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); // 采用默认配置maxTotal是8,池中有8个client static { poolConfig.setMaxTotal(8); } // 要池化的对象的工厂类,这个是我们要实现的类 private static EsClientPoolFactory esClientPoolFactory = new EsClientPoolFactory(); // 利用对象工厂类和配置类生成对象池 private static GenericObjectPool<RestHighLevelClient> clientPool = new GenericObjectPool<>(esClientPoolFactory, poolConfig); /** * 获得对象 * * @return * @throws Exception */ public static RestHighLevelClient getClient() throws Exception { // 从池中取一个对象 RestHighLevelClient client = clientPool.borrowObject(); return client; } /** * 归还对象 * * @param client */ public static void returnClient(RestHighLevelClient client) { // 使用完毕之后,归还对象 clientPool.returnObject(client); } }
/** * 多条件match,过滤查询 * * @param client 客户端 * @param index 索引 * @param matchs 条件组,json中kv * @return hits */ def searchHitsByMatchs( client: RestHighLevelClient, index: String, matchs: JSONObject, size: Int ): SearchResponse = { val searchRequest = new SearchRequest(index) val searchSourceBuilder = new SearchSourceBuilder().size(size) val boolQueryBuilder = QueryBuilders.boolQuery() /** * 处理match,做匹配 */ matchs.keySet().toArray().foreach(jsonKey => { val matchQueryBuilder = QueryBuilders.matchQuery(jsonKey.toString, matchs.getString(jsonKey.toString)) boolQueryBuilder.must().add(matchQueryBuilder) }) /** * 添加bool查询构建 */ searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) client.search( searchRequest, RequestOptions.DEFAULT ) }
/** * 多条件term,过滤查询 * * @param client 客户端 * @param index 索引 * @param terms 条件组,json中kv * @return hits */ def searchHitsByTerms(client: RestHighLevelClient, index: String, terms: JSONObject, size: Int ): SearchResponse = { val searchRequest = new SearchRequest(index) val searchSourceBuilder = new SearchSourceBuilder().size(size) val boolQueryBuilder = QueryBuilders.boolQuery() /** * 处理term */ terms.keySet().toArray().foreach(jsonKey => { val termQueryBuilder = QueryBuilders.termQuery(jsonKey.toString, terms.getString(jsonKey.toString)) boolQueryBuilder.must().add(termQueryBuilder) }) searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) client.search( searchRequest, RequestOptions.DEFAULT ) }
/** * 多条件match查询返回count * * @param client * @param index * @param matchs * @return */ def searchCountByMatchs( client: RestHighLevelClient, index: String, matchs: JSONObject ): CountResponse = { val countRequest: CountRequest = new CountRequest(index) val searchSourceBuilder = new SearchSourceBuilder() val boolQueryBuilder = QueryBuilders.boolQuery() /** * 处理match,做匹配 */ matchs.keySet().toArray().foreach(jsonKey => { val matchQueryBuilder = QueryBuilders.matchQuery(jsonKey.toString, matchs.getString(jsonKey.toString)) boolQueryBuilder.must().add(matchQueryBuilder) }) /** * 添加bool查询构建 */ searchSourceBuilder.query(boolQueryBuilder) countRequest.source(searchSourceBuilder) client.count( countRequest, RequestOptions.DEFAULT ) }
/** * 多条件match聚合查询<avg , sum , max , min> * * @param client 客户端 * @param index 索引 * @param matchs 条件组,json中kv * @param returnFieldName 返回聚合的名字 * @param sumField 聚合的字段 * @param aggregationType <SUM , AVG , MAX , MIN , 默认值为count> 聚合类型 * @return */ def searchSumByMatchs( client: RestHighLevelClient, index: String, matchs: JSONObject, returnFieldName: String, sumField: String, aggregationType: String ): SearchResponse = { val searchRequest = new SearchRequest(index) val searchSourceBuilder = new SearchSourceBuilder() val boolQueryBuilder = QueryBuilders.boolQuery() /** * 处理match,做匹配 */ matchs.keySet().toArray().foreach(jsonKey => { val matchQueryBuilder = QueryBuilders.matchQuery(jsonKey.toString, matchs.getString(jsonKey.toString)) boolQueryBuilder.must().add(matchQueryBuilder) }) /** * 添加bool查询构建 */ searchSourceBuilder.query(boolQueryBuilder) /** * 聚合查询,根据不同入参算不同结果 * * feild : Sets the field to use for this aggregation. 设置用于此聚合的字段。 * sum : Create a new {@link Sum} aggregation with the given name.用给定的名称创建一个新的{@link Sum}聚合。 */ val aggregationBuilder: ValuesSourceAggregationBuilder.LeafOnly[_ >: ValuesSource.Numeric <: ValuesSource, _ >: SumAggregationBuilder with MaxAggregationBuilder with MinAggregationBuilder with AvgAggregationBuilder with ValueCountAggregationBuilder <: ValuesSourceAggregationBuilder.LeafOnly[_ >: ValuesSource.Numeric <: ValuesSource, _ >: SumAggregationBuilder with MaxAggregationBuilder with MinAggregationBuilder with AvgAggregationBuilder with ValueCountAggregationBuilder]] = aggregationType match { case "SUM" => AggregationBuilders.sum(returnFieldName).field(sumField) case "MAX" => AggregationBuilders.max(returnFieldName).field(sumField) case "MIN" => AggregationBuilders.min(returnFieldName).field(sumField) case "AVG" => AggregationBuilders.avg(returnFieldName).field(sumField) case _ => AggregationBuilders.count(returnFieldName).field(sumField) } /** * 添加聚合查询 */ searchSourceBuilder.aggregation(aggregationBuilder) /** * 将所有条件添加到请求 */ searchRequest.source(searchSourceBuilder) client.search( searchRequest, RequestOptions.DEFAULT ) }
/**
* 解析 searchResponse 为 SearchHit 集合
* @param searchResponse
* @return
*/
def analysisSearchResponseToHits(
searchResponse : SearchResponse
): Array[SearchHit] ={
searchResponse.getHits.getHits
}
/**
* 解析 CountResponse 为 long 值
* @param countResponse
* @return
*/
def analysisCountResponseToLong(countResponse : CountResponse): Long ={
countResponse.getCount
}
/** * 解析 searchResponse 为 各种聚合Double 值 * @param searchResponse * @param returnFieldName * @param aggregationType * @return */ def analysisSearchResponseToAggregationFloat( searchResponse : SearchResponse , returnFieldName: String , aggregationType: String ): Double ={ val aggregations = searchResponse.getAggregations /** * 得到单个Aggregation并进行实现 */ val aggregation: Aggregation = aggregations.get(returnFieldName).asInstanceOf[Aggregation] /** * 根据Aggregation的接口转实现类 * An aggregation. Extends {@link ToXContent} as it makes it easier to print out its content. */ aggregationType match { case "SUM" => aggregation.asInstanceOf[ParsedSum].value() case "MAX" => aggregation.asInstanceOf[ParsedMax].value() case "MIN" => aggregation.asInstanceOf[ParsedMin].value() case "AVG" => aggregation.asInstanceOf[ParsedAvg].value() case _ => aggregation.asInstanceOf[ParsedValueCount].value() } }
/** * 连接 */ val client = EsClientSerializable.client /** * 测试searchCountByMatchs */ val matchs = new JSONObject() matchs.put("cert_no.keyword", "xxx") val countResponse = searchCountByMatchs( client, "vs_fms_repay_plan_wide_table_new", matchs ) println(SingleResponseAnalysisUtils.analysisCountResponseToLong(countResponse)) /** * 测试 searchSumByMatchs */ val searchResponse = searchSumByMatchs( client, "vs_fms_repay_plan_wide_table_new", matchs, "result_principal", "principal", "MAX" ) println(SingleResponseAnalysisUtils.analysisSearchResponseToAggregationFloat( searchResponse, "result_principal", "MAX" ))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。