
ElasticSearch Aggregations: A Usage Summary

 

1. Aggregating on a single field

Suppose we only need to aggregate on one field, say field_b, which is of type keyword. This is the simplest possible case, and the request is easy to write:

 

    {
        "from": 0,
        "size": 0,
        "query": {
            "bool": {
                "must": [{
                    "bool": {
                        "should": [{
                            "terms": {
                                "field_a": ["1", "2", "3"],
                                "boost": 1.0
                            }
                        }, {
                            "terms": {
                                "field_b": ["1", "2", "3"],
                                "boost": 1.0
                            }
                        }],
                        "adjust_pure_negative": true,
                        "minimum_should_match": "1",
                        "boost": 1.0
                    }
                }],
                "adjust_pure_negative": true,
                "boost": 1.0
            }
        },
        "aggregations": {
            "my_agg": {
                "terms": {
                    "field": "field_b"
                }
            }
        }
    }

This is the complete request; later examples omit the query part. The purpose of the query is clear: filter down to only the documents we actually want to aggregate, rather than aggregating over the whole index. This has two benefits:
1. Better performance, since fewer documents need to be aggregated.
2. Fewer edge cases to worry about, which keeps the request simpler.
The aggregation part itself is trivial: a terms aggregation on field_b. Unfortunately this is still far from the final goal, since it can only report the value distribution of field_b.

Example:

    {
        "query": {
            "bool": {
                "must": [{
                    "terms": {
                        "track_type": ["31", "32"]
                    }
                }],
                "must_not": [],
                "should": []
            }
        },
        "from": 0,
        "size": 0,
        "sort": [],
        "aggs": {
            "obj_value": {
                "terms": {
                    "field": "obj_value",
                    "size": 100
                }
            }
        }
    }
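Reading the distribution out of the response is a small amount of client-side code. A minimal Python sketch follows; the response dict below is hand-written in the standard terms-aggregation response shape (it is not output from a real cluster), and the agg name "obj_value" matches the request above.

```python
def terms_buckets(response, agg_name):
    """Return {key: doc_count} for a terms aggregation in a search response."""
    buckets = response["aggregations"][agg_name]["buckets"]
    return {b["key"]: b["doc_count"] for b in buckets}

# Hand-written response in the standard terms-agg shape, for illustration only.
response = {
    "aggregations": {
        "obj_value": {
            "buckets": [
                {"key": "A", "doc_count": 12},
                {"key": "B", "doc_count": 7},
            ]
        }
    }
}
print(terms_buckets(response, "obj_value"))  # {'A': 12, 'B': 7}
```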

2. Aggregating on multiple fields

Building on the previous case, let's bring a second field into the picture. A first attempt looks like this:

 

    "aggregations": {
        "my_agg1": {
            "terms": {
                "field": "tag_brand_id"
            }
        },
        "my_agg2": {
            "terms": {
                "field": "brand_cid_array"
            }
        }
    }

This does, in a loose sense, "count both fields", but the results come back separately, which means you have to parse both responses yourself and combine them to get the final numbers. That is unpleasant, so is there another way? Looking at the request structure, there is not much room to change it, so a more flexible approach is needed.
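To make the client-side burden concrete, here is a Python sketch of the merge this forces on you (the response dict is hand-written in the standard terms-agg shape). Note that counts for the same key are simply summed across the two aggregations, so a document whose field_a and field_b both contain the key is counted twice, which is exactly the over-counting problem examined below.

```python
from collections import Counter

def merge_terms_aggs(response, agg_names):
    """Sum doc_counts per key across several terms aggregations."""
    total = Counter()
    for name in agg_names:
        for b in response["aggregations"][name]["buckets"]:
            total[b["key"]] += b["doc_count"]
    return dict(total)

# Hand-written response, illustration only.
response = {
    "aggregations": {
        "my_agg1": {"buckets": [{"key": "1", "doc_count": 5},
                                {"key": "2", "doc_count": 3}]},
        "my_agg2": {"buckets": [{"key": "1", "doc_count": 2}]},
    }
}
print(merge_terms_aggs(response, ["my_agg1", "my_agg2"]))  # {'1': 7, '2': 3}
```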

3. Aggregating with a script

A plain single-field aggregation cannot express a multi-field aggregation. After some googling I found one approach: use a script to describe the requirement. The aggregation below is an attempt to express the desired grouping logic:

 

    "aggregations": {
        "my_agg1": {
            "terms": {
                "script": "if (doc['field_a'].values.contains('1') || doc['field_b'].values.contains('1')) {1}; if (doc['field_a'].values.contains('2') || doc['field_b'].values.contains('2')) {2}; if (doc['field_a'].values.contains('3') || doc['field_b'].values.contains('3')) {3};"
            }
        }
    }

The intent of the script is obvious: tell ES that when field_a or field_b contains 1, the document goes into bucket 1; when field_a or field_b contains 2, it goes into bucket 2; and so on. This looked like it fully solved the original problem, and performance was acceptable when I verified it. But just as I was congratulating myself, a casual check of another case poured cold water on it:
field_a and field_b may contain the same id, say 2, but for the statistics that should still count as a single document.
The script above cannot express that distinction, and it has a second problem:
requesting ids in the order 1,2,3 versus 3,2,1 returns different counts.
Because of how the chained if statements and the || operator behave, once condition 1 matches, the document is thrown into bucket 1 and is never tested against the later conditions. The script has an obvious bug. And since Painless is only a scripting language, with a very limited set of APIs and keywords, a more complex script would seriously hurt performance. So this option had to be dropped, even though it almost solved the problem.
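The order-dependence bug is easy to reproduce in plain Python. The script can only return one bucket key per document, so a document matching two ids lands in whichever bucket is tested first; the sketch below mimics that "first matching id wins" behavior.

```python
def script_bucket(doc, ids):
    """Mimics the Painless script above: the first matching id wins,
    so each document can land in at most one bucket."""
    for i in ids:
        if i in doc["field_a"] or i in doc["field_b"]:
            return i
    return None

# One document matching both id "1" (via field_a) and id "2" (via field_b).
doc = {"field_a": ["1"], "field_b": ["2"]}
print(script_bucket(doc, ["1", "2", "3"]))  # '1'
print(script_bucket(doc, ["3", "2", "1"]))  # '2'  -- same doc, different bucket
```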

4. Aggregating with the filters aggregation

After re-reading the official documentation, I found another aggregation type: the filters aggregation.
Its usage is simple, but it unexpectedly fits the requirement very well. I had previously overlooked it because every example I had seen aggregated on a single field. So how do we aggregate several fields at once? Start from the API and check whether a more flexible form is possible:

 

    public KeyedFilter(String key, QueryBuilder filter) {
        if (key == null) {
            throw new IllegalArgumentException("[key] must not be null");
        }
        if (filter == null) {
            throw new IllegalArgumentException("[filter] must not be null");
        }
        this.key = key;
        this.filter = filter;
    }

This is the constructor of KeyedFilter in the Java API for the filters aggregation: key is the name of the filter and filter is the filter condition. Conveniently, the filter is typed as QueryBuilder, which means it can be an arbitrarily complex query.

 

    "aggregations": {
        "batch_count": {
            "filters": {
                "filters": {
                    "1": {
                        "bool": {
                            "should": [{
                                "term": {
                                    "field_a": {
                                        "value": "1",
                                        "boost": 1.0
                                    }
                                }
                            }, {
                                "term": {
                                    "field_b": {
                                        "value": "1",
                                        "boost": 1.0
                                    }
                                }
                            }],
                            "adjust_pure_negative": true,
                            "boost": 1.0
                        }
                    },
                    "2": {
                        "bool": {
                            "should": [{
                                "term": {
                                    "field_a": {
                                        "value": "2",
                                        "boost": 1.0
                                    }
                                }
                            }, {
                                "term": {
                                    "field_b": {
                                        "value": "2",
                                        "boost": 1.0
                                    }
                                }
                            }],
                            "adjust_pure_negative": true,
                            "boost": 1.0
                        }
                    },
                    "3": {
                        "bool": {
                            "should": [{
                                "term": {
                                    "field_a": {
                                        "value": "3",
                                        "boost": 1.0
                                    }
                                }
                            }, {
                                "term": {
                                    "field_b": {
                                        "value": "3",
                                        "boost": 1.0
                                    }
                                }
                            }],
                            "adjust_pure_negative": true,
                            "boost": 1.0
                        }
                    }
                },
                "other_bucket": false,
                "other_bucket_key": "-1"
            }
        }
    }
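Each named filter ("1", "2", "3") produces one keyed bucket, and a single document may be counted in several buckets at once, which is exactly what the terms and script approaches could not express. Reading the keyed buckets back is a one-liner; the response dict below is hand-written in the standard keyed filters-agg response shape.

```python
def filters_counts(response, agg_name):
    """Return {filter_key: doc_count} for a keyed filters aggregation."""
    buckets = response["aggregations"][agg_name]["buckets"]
    return {key: b["doc_count"] for key, b in buckets.items()}

# Hand-written response, illustration only.
response = {
    "aggregations": {
        "batch_count": {
            "buckets": {
                "1": {"doc_count": 40},
                "2": {"doc_count": 25},
                "3": {"doc_count": 25},
            }
        }
    }
}
print(filters_counts(response, "batch_count"))  # {'1': 40, '2': 25, '3': 25}
```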

Using scripts for aggregation in ES

Ordinary aggregations cannot perform complex operations inside the aggregation, for example:

    select avg(field1 > 12), sum(round(field2, 1)) from table

For complex aggregation logic like this, ES provides ScriptedMetricAggregationBuilder. Note that this feature is currently experimental: it may be improved later, or it may be removed entirely.

 

From the official documentation:

This functionality is experimental and may be changed or removed completely in a future release. Elastic will take a best effort approach to fix any issues, but experimental features are not subject to the support SLA of official GA features.

Reference: https://www.elastic.co/guide/en/elasticsearch/reference/6.3/search-aggregations-metrics-scripted-metric-aggregation.html

 

Example:

    POST ledger/_search?size=0
    {
        "query": {
            "match_all": {}
        },
        "aggs": {
            "profit": {
                "scripted_metric": {
                    "init_script": "params._agg.transactions = []",
                    "map_script": "params._agg.transactions.add(doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)",
                    "combine_script": "double profit = 0; for (t in params._agg.transactions) { profit += t } return profit",
                    "reduce_script": "double profit = 0; for (a in params._aggs) { profit += a } return profit"
                }
            }
        }
    }


Output:

    {
        "took": 218,
        ...
        "aggregations": {
            "profit": {
                "value": 240.0
            }
        }
    }


Key script phases:

The scripted metric aggregation uses scripts at four stages of its execution:

init_script
    Executed before any collection of documents. Allows the aggregation to set up any initial state.
    In the example above, the init_script creates an array called transactions in the _agg object.

map_script
    Executed once per collected document. This is the only required script. If no combine_script is specified, the resulting state needs to be stored in the object named _agg.
    In the example above, the map_script checks the value of the type field. If the value is sale, the value of the amount field is added to the transactions array; otherwise the negated amount is added.

combine_script
    Executed once on each shard after document collection completes. Allows the aggregation to consolidate the state built up on each shard. If no combine_script is provided, the combine phase returns the aggregation variable.
    In the example above, the combine_script iterates over all stored transactions, sums them into the profit variable, and returns profit.

reduce_script
    Executed once on the coordinating node after all shards have returned their results. The script has access to the _aggs variable, an array holding the combine_script result from each shard. If no reduce_script is provided, the reduce phase returns the _aggs variable.
    In the example above, the reduce_script iterates over the profit value returned by each shard, sums them, and returns the final combined profit in the aggregation's response.
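The four phases can be simulated in plain Python to make the data flow concrete. This is a sketch of the semantics only, not a client call; the shard layout and documents below are made up (chosen so the total matches the 240 in the example output), but the per-phase logic mirrors the profit example above.

```python
def run_scripted_metric(shards):
    """Simulate init/map (per shard), combine (per shard), reduce (coordinator)."""
    shard_results = []
    for docs in shards:                       # one entry per shard
        state = {"transactions": []}          # init_script
        for doc in docs:                      # map_script, once per document
            amount = doc["amount"] if doc["type"] == "sale" else -doc["amount"]
            state["transactions"].append(amount)
        shard_results.append(sum(state["transactions"]))  # combine_script
    return sum(shard_results)                 # reduce_script

# Made-up documents spread across two shards.
shards = [
    [{"type": "sale", "amount": 80}, {"type": "cost", "amount": 10}],
    [{"type": "sale", "amount": 130}, {"type": "sale", "amount": 50},
     {"type": "cost", "amount": 10}],
]
print(run_scripted_metric(shards))  # 240
```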

Java API

Example:

    ScriptedMetricAggregationBuilder aggregation = AggregationBuilders
        .scriptedMetric("agg")
        .initScript(new Script("params._agg.heights = []"))
        .mapScript(new Script("params._agg.heights.add(doc.gender.value == 'male' ? doc.height.value : -1.0 * doc.height.value)"))
        .combineScript(new Script("double heights_sum = 0.0; for (t in params._agg.heights) { heights_sum += t } return heights_sum"))
        .reduceScript(new Script("double heights_sum = 0.0; for (a in params._aggs) { heights_sum += a } return heights_sum"));

    import org.elasticsearch.search.aggregations.metrics.scripted.ScriptedMetric;

    // sr is here your SearchResponse object
    ScriptedMetric agg = sr.getAggregations().get("agg");
    Object scriptedResult = agg.aggregation();
    logger.info("scriptedResult [{}]", scriptedResult);

Output:

    scriptedResult object [Double]
    scriptedResult [2.171917696507009]

Various return types with elasticsearch aggregation Groovy scripts

Returning a map

    "scripted_terms": {
        "scripted_metric": {
            "init_script": "_agg[\"prd\"] = []",
            "map_script": "if(doc[\"cat2_id\"].value) {_agg.prd.add(doc[\"cat2_id\"].value.toString())}",
            "combine_script": "combined = [:]; for (tmp in _agg.prd) { if(!combined[tmp]) { combined[tmp] = 1 } }; return combined",
            "reduce_script": "reduced = [:]; for (a in _aggs) { for (entry in a) { word = entry.key; if (!reduced[word]) { reduced[word] = entry.value; } } }; return reduced"
        }
    }

 

Returning an array

    "scripted_terms": {
        "scripted_metric": {
            "init_script": "_agg[\"prd\"] = []",
            "map_script": "if(doc[\"cat2_id\"].value) {_agg.prd.add(doc[\"cat2_id\"].value.toString())}",
            "combine_script": "combined = [:]; for (tmp in _agg.prd) { if(!combined[tmp]) { combined[tmp] = 1 } }; return combined",
            "reduce_script": "reduced = []; for (a in _aggs) { for (entry in a) { reduced.add(entry.key); } }; return reduced"
        }
    }

Counting and summing occurrences

    "agg1": {
        "scripted_metric": {
            "init_script": {
                "inline": "_agg[\"prd\"] = []"
            },
            "map_script": {
                "inline": "if(doc[\"cat2_id\"].value) {_agg.prd.add(doc[\"cat2_id\"].value.toString())}"
            },
            "combine_script": {
                "inline": "combined = [:]; for (tmp in _agg.prd) { if(!combined[tmp]) { combined[tmp] = 1 } else { combined[tmp]=combined[tmp]+1 } }; return combined"
            },
            "reduce_script": {
                "inline": "reduced = [:]; for (a in _aggs) { for (entry in a) { word = entry.key; if (!reduced[word]) { reduced[word] = entry.value; } else { reduced[word]=reduced[word]+entry.value} } }; return reduced"
            }
        }
    }
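The combine/reduce pair above is a classic per-shard count followed by a merge. A Python analogue makes the two steps explicit (ids and shard split below are made up): combine counts each cat2_id within one shard, and reduce merges the per-shard maps by summing counts for shared keys.

```python
from collections import Counter

def combine(mapped_ids):
    """Per-shard step: count occurrences of each id collected by map."""
    return dict(Counter(mapped_ids))

def reduce_(per_shard_maps):
    """Coordinator step: merge per-shard count maps, summing shared keys."""
    total = Counter()
    for m in per_shard_maps:
        total.update(m)  # Counter.update adds counts
    return dict(total)

shard1 = combine(["a", "a", "b"])   # {'a': 2, 'b': 1}
shard2 = combine(["b", "c"])        # {'b': 1, 'c': 1}
print(reduce_([shard1, shard2]))    # {'a': 2, 'b': 2, 'c': 1}
```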

Returning a set

    {
        "query": {
            "bool": {
                "must": [{
                    "term": {
                        "business_code": "005"
                    }
                }, {
                    "term": {
                        "tenant_code": "00000007"
                    }
                }, {
                    "term": {
                        "delete_status": "0"
                    }
                }, {
                    "bool": {
                        "should": [{
                            "term": {
                                "list_type": "02"
                            }
                        }, {
                            "term": {
                                "list_type": "01"
                            }
                        }]
                    }
                }],
                "must_not": [],
                "should": []
            }
        },
        "from": 0,
        "size": 1000,
        "sort": [],
        "aggs": {
            "obj_value": {
                "terms": {
                    "field": "obj_value",
                    "size": 1000
                },
                "aggs": {
                    "list_types": {
                        "scripted_metric": {
                            "init_script": "params._agg.list_type_set=[];",
                            "map_script": "params._agg.list_type_set.add(doc['list_type'].value)",
                            "reduce_script": "def list_types=new HashSet();params._aggs.forEach(item->{list_types.addAll(item.list_type_set);});return list_types"
                        }
                    }
                }
            }
        }
    }

Returning a string

    {
        "query": {
            "bool": {
                "must": [{
                    "term": {
                        "business_code": "005"
                    }
                }, {
                    "term": {
                        "tenant_code": "00000007"
                    }
                }, {
                    "term": {
                        "delete_status": "0"
                    }
                }, {
                    "bool": {
                        "should": [{
                            "term": {
                                "list_type": "02"
                            }
                        }, {
                            "term": {
                                "list_type": "01"
                            }
                        }]
                    }
                }],
                "must_not": [],
                "should": []
            }
        },
        "from": 0,
        "size": 1000,
        "sort": [],
        "aggs": {
            "obj_value": {
                "terms": {
                    "field": "obj_value",
                    "size": 1000
                },
                "aggs": {
                    "list_types": {
                        "scripted_metric": {
                            "init_script": "params._agg.list_type_set=[];",
                            "map_script": "params._agg.list_type_set.add(doc['list_type'].value)",
                            "reduce_script": "def list_types=new HashSet();params._aggs.forEach(item->{list_types.addAll(item.list_type_set);});return list_types.toString()"
                        }
                    }
                }
            }
        }
    }

Computing the product of two fields

Each completed trade generates one record with two fields: unit price (price) and volume (amount). Compute the total turnover over 5 days:

    curl -XPOST 'localhost:9200/order/_search?size=0&pretty' -H 'Content-Type: application/json' -d'
    {
        "query": {
            "filtered": {
                "query": { "match_all": {} },
                "filter": {
                    "range": {
                        "date": {
                            "from": ...,
                            "to": ...
                        }
                    }
                }
            }
        },
        "aggs": {
            "total": {
                "scripted_metric": {
                    "init_script": "params._agg.transactions = []",
                    "map_script": "params._agg.transactions.add(doc.price.value * doc.amount.value)",
                    "combine_script": "double total = 0; for (t in params._agg.transactions) { total += t } return total",
                    "reduce_script": "double total = 0; for (a in params._aggs) { total += a } return total"
                }
            }
        }
    }
    '
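The scripted_metric here is just a per-document product summed across shards. The same computation in plain Python (documents and shard split made up for illustration):

```python
def turnover(shards):
    """Total of price * amount across all documents on all shards."""
    shard_totals = []
    for docs in shards:
        products = [d["price"] * d["amount"] for d in docs]  # map_script
        shard_totals.append(sum(products))                   # combine_script
    return sum(shard_totals)                                 # reduce_script

shards = [
    [{"price": 10.0, "amount": 3}, {"price": 2.5, "amount": 4}],
    [{"price": 5.0, "amount": 2}],
]
print(turnover(shards))  # 50.0
```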

Filtering results after aggregation (post_filter)

 

    {
        "query": {
            "bool": {
                "must": [{
                    "term": {
                        "business_code": "005"
                    }
                }, {
                    "term": {
                        "tenant_code": "00000007"
                    }
                }, {
                    "term": {
                        "delete_status": "0"
                    }
                }, {
                    "bool": {
                        "should": [{
                            "term": {
                                "list_type": "02"
                            }
                        }, {
                            "term": {
                                "list_type": "01"
                            }
                        }]
                    }
                }],
                "must_not": [],
                "should": []
            }
        },
        "from": 0,
        "size": 1000,
        "sort": [],
        "post_filter": {
            "bool": {
                "must": [{
                    "wildcard": {
                        "list_types": "*02*"
                    }
                }]
            }
        },
        "aggs": {
            "obj_value": {
                "terms": {
                    "field": "obj_value",
                    "size": 1000
                },
                "aggs": {
                    "list_types": {
                        "scripted_metric": {
                            "init_script": "params._agg.list_type_set=[];",
                            "map_script": "params._agg.list_type_set.add(doc['list_type'].value)",
                            "reduce_script": "def list_types=new HashSet();params._aggs.forEach(item->{list_types.addAll(item.list_type_set);});return list_types.toString()"
                        }
                    }
                }
            }
        }
    }

 

This filters out the entries whose list_types contain only 01.

See: https://www.elastic.co/guide/cn/elasticsearch/guide/current/_post_filter.html

Official API reference:

https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.3/_metrics_aggregations.html#_use_aggregation_response_13

Grouped aggregation with sorting and pagination

    {
        "query": {
            "bool": {
                "must": [
                    {
                        "term": {
                            "business_code": "005"
                        }
                    },
                    {
                        "term": {
                            "tenant_code": "00000007"
                        }
                    },
                    {
                        "term": {
                            "delete_status": "0"
                        }
                    },
                    {
                        "bool": {
                            "should": [
                                {
                                    "term": {
                                        "list_type": "02"
                                    }
                                },
                                {
                                    "term": {
                                        "list_type": "01"
                                    }
                                }
                            ]
                        }
                    }
                ],
                "must_not": [],
                "should": []
            }
        },
        "from": 0,
        "size": 0,
        "sort": [
            {
                "create_time": {
                    "order": "desc"
                }
            }
        ],
        "post_filter": {
            "bool": {
                "must": [
                    {
                        "term": {
                            "list_type": "02"
                        }
                    }
                ]
            }
        },
        "aggs": {
            "obj_value": {
                "terms": {
                    "field": "obj_value",
                    "order": {
                        "create_time_order": "desc"
                    },
                    "size": 1000
                },
                "aggs": {
                    "list_type_value": {
                        "scripted_metric": {
                            "init_script": "params._agg.list_type_set=[];",
                            "map_script": "params._agg.list_type_set.add(doc['list_type'].value)",
                            "reduce_script": "def list_types=new HashSet();def ret=1; params._aggs.forEach(item->{list_types.addAll(item.list_type_set);}); if(list_types.contains('01')){ret*=2} if(list_types.contains('02')){ret*=3} return ret;"
                        }
                    },
                    "list_types": {
                        "scripted_metric": {
                            "init_script": "params._agg.list_type_set=[];",
                            "map_script": "params._agg.list_type_set.add(doc['list_type'].value)",
                            "reduce_script": "def list_types=new HashSet();params._aggs.forEach(item->{list_types.addAll(item.list_type_set);});return list_types.toString()"
                        }
                    },
                    "list_type_filter": {
                        "bucket_selector": {
                            "buckets_path": {
                                "listTypeValue": "list_type_value.value"
                            },
                            "script": "params.listTypeValue % 3 == 0"
                        }
                    },
                    "create_time_order": {
                        "max": {
                            "field": "create_time"
                        }
                    },
                    "bucket_field": {
                        "bucket_sort": {
                            "sort": [
                                {
                                    "create_time_order": {
                                        "order": "desc"
                                    }
                                }
                            ],
                            "from": 10,
                            "size": 10
                        }
                    }
                }
            }
        }
    }
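The list_type_value reduce script uses a trick worth spelling out: it encodes set membership as a product of primes, with '01' contributing a factor 2 and '02' a factor 3. A bucket whose value is divisible by 3 must therefore contain '02'; divisible by 6 means it contains both. The bucket_selector then filters with a simple modulo test. The same encoding in Python (the prime assignments mirror the script above):

```python
PRIME = {"01": 2, "02": 3}  # each list_type gets its own prime factor

def encode(list_types):
    """Product-of-primes encoding of a set of list types."""
    ret = 1
    for t, p in PRIME.items():
        if t in list_types:
            ret *= p
    return ret

def must_include(value, required):
    """The bucket_selector condition: value divisible by the product of the
    primes of every required type means all required types are present."""
    factor = 1
    for t in required:
        factor *= PRIME[t]
    return value % factor == 0

print(encode({"01", "02"}))                        # 6
print(must_include(encode({"01"}), ["02"]))        # False
print(must_include(encode({"01", "02"}), ["02"]))  # True
```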

Important notes:

The outer sort field should be consistent with the inner bucket_sort sort field, and from/size should be kept consistent between the two as well.

The inner terms size must be at least from + size of the inner bucket_sort (i.e. large enough to cover every page up to the requested one), otherwise later pages get truncated.

Key Java code

 

    /**
     * Build the aggregation from the query parameters.
     *
     * @param queryParam query parameters
     * @return the aggregation builder
     */
    public static TermsAggregationBuilder queryAggregationBuilder(ObjectMultiListQueryParam queryParam) {
        int page = queryParam.getPageNum() == null || queryParam.getPageNum() <= 0 ? 1 : queryParam.getPageNum();
        int pageSize = queryParam.getPageSize() == null || queryParam.getPageSize() <= 0 ? 50
                : queryParam.getPageSize();
        pageSize = pageSize > 500 ? 500 : pageSize;
        int size = page * pageSize;
        // group first by value
        TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ObjectBasicFieldCst.OBJ_VALUE)
                .field(ObjectBasicFieldCst.OBJ_VALUE)
                .size(size);
        StringBuilder reduceScriptSb = new StringBuilder();
        reduceScriptSb.append(
                "def list_types=new HashSet();def ret=1; params._aggs.forEach(item->{list_types.addAll(item"
                        + ".list_type_set);}); ");
        if (CollectionUtils.isNotEmpty(queryParam.getIncludeListTypes())) {
            for (String code : queryParam.getIncludeListTypes()) {
                ListTypeEnum listTypeEnum = ListTypeEnum.parse(code);
                if (ObjectUtils.nonNull(listTypeEnum)) {
                    reduceScriptSb.append("if(list_types.contains('").append(listTypeEnum.getCode()).append("')){ret*=")
                            .append(listTypeEnum.getScore()).append("}");
                }
            }
        }
        reduceScriptSb.append("return ret;");
        // scoring scripts
        Script initScript = new Script("params._agg.list_type_set=[];");
        Script mapScript = new Script("params._agg.list_type_set.add(doc['list_type'].value)");
        Script reduceScript = new Script(reduceScriptSb.toString());
        ScriptedMetricAggregationBuilder listTypeValueScriptedMetricAggregationBuilder = AggregationBuilders
                .scriptedMetric(ObjectBasicFieldCst.LIST_TYPES_VALUE)
                .initScript(initScript).mapScript(mapScript).reduceScript(reduceScript);
        aggregationBuilder.subAggregation(listTypeValueScriptedMetricAggregationBuilder);
        Script listTypesReduceScript = new Script(
                "def list_types=new HashSet();params._aggs.forEach(item->{list_types.addAll(item.list_type_set);});return"
                        + " list_types");
        ScriptedMetricAggregationBuilder listTypeScriptedMetricAggregationBuilder = AggregationBuilders
                .scriptedMetric(ObjectBasicFieldCst.LIST_TYPE_SET)
                .initScript(initScript).mapScript(mapScript).reduceScript(listTypesReduceScript);
        aggregationBuilder.subAggregation(listTypeScriptedMetricAggregationBuilder);
        if (CollectionUtils.isNotEmpty(queryParam.getMustIncludeListTypes())) {
            // declare the bucket path used by the bucket selector below
            Map<String, String> bucketsPathsMap = new HashMap<>(1, 1);
            bucketsPathsMap.put("listTypesValue", "list_types_value.value");
            int filterScore = 1;
            for (String code : queryParam.getMustIncludeListTypes()) {
                ListTypeEnum listTypeEnum = ListTypeEnum.parse(code);
                if (ObjectUtils.nonNull(listTypeEnum)) {
                    filterScore *= listTypeEnum.getScore();
                }
            }
            // the selector script
            Script listTypeFilterScript = new Script("params.listTypesValue % " + filterScore + " == 0");
            // build the bucket selector
            BucketSelectorPipelineAggregationBuilder filterBs = PipelineAggregatorBuilders.bucketSelector(
                    "list_type_filter", bucketsPathsMap, listTypeFilterScript);
            aggregationBuilder.subAggregation(filterBs);
        }
        MaxAggregationBuilder maxAggregationBuilder = AggregationBuilders.max("create_time_order")
                .field(ObjectBasicFieldCst.CREATE_TIME);
        aggregationBuilder.subAggregation(maxAggregationBuilder);
        List<FieldSortBuilder> sorts = Lists.newArrayList();
        FieldSortBuilder fieldSortBuilder = new FieldSortBuilder("create_time_order");
        fieldSortBuilder.order(SortOrder.DESC);
        sorts.add(fieldSortBuilder);
        // bucket_sort "from" counts buckets, so skip (page - 1) * pageSize buckets
        // (the original snippet passed page - 1 here, which skips buckets, not pages)
        BucketSortPipelineAggregationBuilder bucketSortPipelineAggregationBuilder = PipelineAggregatorBuilders
                .bucketSort("create_time_bucket_field", sorts).from((page - 1) * pageSize).size(pageSize);
        aggregationBuilder.subAggregation(bucketSortPipelineAggregationBuilder);
        aggregationBuilder.executionHint("map").collectMode(Aggregator.SubAggCollectionMode.BREADTH_FIRST);
        return aggregationBuilder;
    }

post filter

    searchSourceBuilder.size(0);
    searchSourceBuilder.aggregation(termsAggregationBuilder);
    if (CollectionsHelper.isNotEmpty(param.getMustIncludeListTypes())) {
        QueryBuilder postQueryBuilder = QueryBuilders.boolQuery().must(
                QueryBuilders.termsQuery(ObjectBasicFieldCst.LIST_TYPE,
                        param.getMustIncludeListTypes()));
        searchSourceBuilder.postFilter(postQueryBuilder);
    }
    searchRequest.source(searchSourceBuilder);

 

==============================================

bucket_selector

Premise:
Assume our documents contain these fields: activityId, clientIp, orderNumber.

Goal:
Group by activityId (strategy ID) + clientIp (IP address), and find the buckets where the same strategy ID and IP have more than one distinct order (the script below keeps buckets whose distinct orderNumber count is > 1).

Implementation:

    {
        "request_body": {
            // do not return the query hits themselves
            "size": 0,
            "aggs": {
                "group_by_activityId": {
                    "terms": {
                        // multi-field grouping via a script
                        "script": "doc['activityId'].values +'#split#'+ doc['clientIp'].values",
                        // maximum number of buckets to return
                        "size": 2147483647
                    },
                    "aggs": {
                        // distinct count of orderNumber
                        "orderNumber_count": {
                            "cardinality": {
                                "field": "orderNumber"
                            }
                        },
                        "orderNumber_count_filter": {
                            "bucket_selector": {
                                "buckets_path": {
                                    "orderNumberCount": "orderNumber_count"
                                },
                                // keep buckets with a distinct count > 1
                                "script": "params.orderNumberCount>1"
                            }
                        }
                    }
                }
            }
        }
    }

This aggregation is designed for filtering after bucketing: the values passed down from the parent aggregation via buckets_path must be numeric, and the script must return a boolean.
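What bucket_selector does can be sketched in a few lines of Python: after the parent aggregation has built its buckets, each bucket's numeric sub-aggregation value is bound to the buckets_path name, and the script keeps or drops the bucket. The bucket dicts and names below are made up for illustration.

```python
def bucket_selector(buckets, path_name, metric_name, predicate):
    """Keep only the buckets for which predicate(params) is true."""
    return [b for b in buckets if predicate({path_name: b[metric_name]})]

buckets = [
    {"key": "1.2.3.4", "orderNumber_count": 3},
    {"key": "5.6.7.8", "orderNumber_count": 1},
]
kept = bucket_selector(buckets, "orderNumberCount", "orderNumber_count",
                       lambda params: params["orderNumberCount"] > 1)
print([b["key"] for b in kept])  # ['1.2.3.4']
```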

Syntax

    {
        "bucket_selector": {
            "buckets_path": {
                "my_var1": "the_sum",
                "my_var2": "the_value_count"
            },
            "script": "params.my_var1 > params.my_var2"
        }
    }

Parameters:

    Parameter      Description                                   Required   Default
    script         the filter condition script                   yes
    buckets_path   paths to metrics of the parent aggregations   yes
    gap_policy     how gaps in the data are handled              no         skip

Example: return the months whose aggregated total sales exceed 200

    POST /sales/_search
    {
        "size": 0,
        "aggs": {
            "sales_per_month": {
                "date_histogram": {
                    "field": "date",
                    "interval": "month"
                },
                "aggs": {
                    "total_sales": {
                        "sum": {
                            "field": "price"
                        }
                    },
                    "sales_bucket_filter": {
                        "bucket_selector": {
                            "buckets_path": {
                                "totalSales": "total_sales"
                            },
                            "script": "params.totalSales > 200"
                        }
                    }
                }
            }
        }
    }

IPs where more than 2 users logged in

This aggregates the distinct count of user IDs per IP in the login records, then filters. Use a Cardinality Aggregation for the distinct user-ID count, then a Bucket Selector Aggregation as the filter. The query:

    {
        "aggs": {
            "IP": {
                "terms": {
                    "field": "IP",
                    "size": 3000,
                    "order": {
                        "distinct": "desc"
                    },
                    "min_doc_count": 5
                },
                "aggs": {
                    // note: the distinct count should target the user-ID field;
                    // inside a bucket for a single IP, cardinality on IP.keyword is always 1
                    "distinct": {
                        "cardinality": {
                            "field": "IP.keyword"
                        }
                    },
                    "dd": {
                        "bucket_selector": {
                            "buckets_path": { "userCount": "distinct" },
                            "script": "params.userCount > 2"
                        }
                    }
                }
            }
        },
        "size": 0
    }

Bucket selector aggregation reference:

https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-aggregations-pipeline-bucket-selector-aggregation.html


======================================

Common problems:

1. Type exception

    "type": "search_phase_execution_exception",
    "reason": "all shards failed",
    "phase": "query",
    "grouped": true,
    "failed_shards": [
        {
            "shard": 0,
            "index": "lbs_20190410",
            "node": "Uj-ZStATT9y66mIBIHbpKA",
            "reason": {
                "type": "i_o_exception",
                "reason": "can not write type [class java.util.HashSet]"
            }
        }
    ]

Only four kinds of types are supported: primitive types, String, Map, and Array.

The offending query:

    {
        "query": {
            "bool": {
                "must": [{
                    "term": {
                        "business_code": "005"
                    }
                }, {
                    "term": {
                        "tenant_code": "00000007"
                    }
                }, {
                    "term": {
                        "delete_status": "0"
                    }
                }, {
                    "bool": {
                        "should": [{
                            "term": {
                                "list_type": "02"
                            }
                        }, {
                            "term": {
                                "list_type": "01"
                            }
                        }]
                    }
                }],
                "must_not": [],
                "should": []
            }
        },
        "from": 0,
        "size": 1000,
        "sort": [],
        "aggs": {
            "obj_value": {
                "terms": {
                    "field": "obj_value",
                    "size": 1000
                },
                "aggs": {
                    "list_types": {
                        "scripted_metric": {
                            "init_script": "params._agg.list_type_set=new HashSet();",
                            "map_script": "params._agg.list_type_set.add(doc['list_type'].value)",
                            "reduce_script": "def list_types=new HashSet();params._aggs.forEach(item->{list_types.addAll(item.list_type_set);});return list_types"
                        }
                    }
                }
            }
        }
    }

Solution:

 

https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-aggregations-metrics-scripted-metric-aggregation.html#_allowed_return_types
  
Whilst any valid script object can be used within a single script, the scripts must return or store in the _agg object only the following types:

primitive types
String
Map (containing only keys and values of the types listed here)
Array (containing elements of only the types listed here)
 
Change params._agg.a=new HashSet(); to params._agg.a=[];

2. Aggregations return only 10 buckets by default

Solution: set an explicit size inside the aggregation.

3. Combining must and should in elasticsearch

 

For example: among documents where a=1 and b=2, find those where c=1 or d=2:

    {
        "query": {
            "bool": {
                "must": [
                    {"term": {"a": "1"}},
                    {"term": {"b": "2"}}
                ],
                "should": [
                    {"term": {"c": "1"}},
                    {"term": {"d": "2"}}
                ]
            }
        }
    }
Written this way, the should clause has no effect; this is a common beginner mistake.
When writing query conditions, do not transcribe the requirement as spoken; translate it into boolean logic first (the same applies to databases).
For the example above, the boolean logic is (a==1 && b==2 && c==1) || (a==1 && b==2 && d==2) (in Java/C notation), and the query should be structured accordingly.
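The rewrite is a plain distribution of AND over OR, which can be brute-force checked: "a and b, and (c or d)" is the same predicate as "(a and b and c) or (a and b and d)" for every input combination.

```python
from itertools import product

def nested(a, b, c, d):
    # must: a, b  +  nested bool should: c, d (with minimum_should_match)
    return a and b and (c or d)

def expanded(a, b, c, d):
    # the two-branch should form: (a AND b AND c) OR (a AND b AND d)
    return (a and b and c) or (a and b and d)

# exhaustive check over all 16 truth assignments
assert all(nested(*v) == expanded(*v) for v in product([True, False], repeat=4))
print("equivalent")
```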

Solution

There are two ways to write it:

    {
        "query": {
            "bool": {
                "should": [
                    {
                        "bool": {
                            "must": [
                                {"term": {"a": "1"}},
                                {"term": {"b": "2"}},
                                {"term": {"c": "1"}}
                            ]
                        }
                    },
                    {
                        "bool": {
                            "must": [
                                {"term": {"a": "1"}},
                                {"term": {"b": "2"}},
                                {"term": {"d": "2"}}
                            ]
                        }
                    }
                ]
            }
        },
        "sort": {
            "time": {
                "order": "desc"
            }
        },
        "size": 100
    }

Or:

    {
        "query": {
            "bool": {
                "must": [
                    {"term": {"a": "1"}},
                    {"term": {"b": "2"}},
                    {
                        "bool": {
                            "should": [
                                {"term": {"c": "1"}},
                                {"term": {"d": "2"}}
                            ]
                        }
                    }
                ]
            }
        },
        "sort": {
            "time": {
                "order": "desc"
            }
        },
        "size": 100
    }

JAVA API

    QueryBuilder query = QueryBuilders.boolQuery()
            .should(QueryBuilders.boolQuery()
                    .filter(QueryBuilders.termQuery("a", 1))
                    .filter(QueryBuilders.termQuery("b", 2))
                    .filter(QueryBuilders.termQuery("c", 1)))
            .should(QueryBuilders.boolQuery()
                    .filter(QueryBuilders.termQuery("a", 1))
                    .filter(QueryBuilders.termQuery("b", 2))
                    .filter(QueryBuilders.termQuery("d", 2)));

============================================

In fact, when should appears at the same level as must or filter, none of the should clauses need to match by default. Adding the minimum_should_match parameter achieves the goal, i.e. the query can be written as:

    {
        "query": {
            "bool": {
                "must": [
                    {
                        "match": {
                            "c": "3"
                        }
                    }
                ],
                "should": [
                    {
                        "match": {
                            "a": "1"
                        }
                    },
                    {
                        "match": {
                            "b": "2"
                        }
                    }
                ],
                "minimum_should_match": 1
            }
        }
    }

 

This query requires all of the must conditions plus at least one of the should conditions, which gives the expected result.

==========================================================

Java development

Aggregation overview

Aggregations can coexist with ordinary query results, and one response may contain several unrelated aggregations. If you only care about the aggregation results and not the hits, set the SearchSource size to 0, which noticeably improves performance.

Aggregation types

1. Metrics:
   Simple aggregation types that compute a metric over all documents in the target set, usually without nested sub-aggregations: for example avg, sum, count, and cardinality. Cardinality corresponds to distinct count.

2. Bucketing:
   Bucket aggregations compute metrics over a series of buckets rather than over all documents; each bucket is the subset of the original result set matching some condition. They usually carry nested sub-aggregations. Typical examples: TermsAggregation, HistogramAggregation.

3. Matrix:
   Matrix (multi-dimensional) aggregations compute a two- or higher-dimensional table of metrics over two or more dimensions. Currently there appears to be only MatrixStatsAggregation, and it does not support scripting.

4. Pipeline:
   Pipeline aggregations aggregate again over the results of previous aggregations, and are often combined with bucketing aggregations. Example: first compute the total transaction amount per day over the last 30 days (bucketing aggregation), then count the days whose total exceeds 10000 (pipeline aggregation).

Aggregation structure

Aggregation request:
Two levels: Aggregation -> SubAggregation. A sub-aggregation performs further aggregation within the results of its parent aggregation.

Aggregation response:
Three levels (for bucketing aggregations): MultiBucketsAggregation -> Buckets -> Aggregations

Aggregation properties:
name: matches the aggregation name in the request.
buckets: each bucket corresponds to one possible value of the aggregation dimension and its aggregated result.

Bucket properties:
key: a possible value of the grouping dimension; the concrete value depends on the aggregation type. For a terms aggregation (e.g. total amount by transaction type), the bucket keys are the possible transaction types (credit/debit etc.). For a date-histogram aggregation (e.g. transaction count per day), the bucket keys are the dates.
docCount: the number of documents in the bucket.
value: the computed metric. Note that in multi-level aggregations the intermediate aggregations (e.g. a terms aggregation) generally have no value; only the leaf metric aggregations do.
aggregations: the results of the current aggregation's sub-aggregations (if any).

Mapping SQL to aggregations

Precondition for the mapping: it only applies to aggregate queries, i.e. the SQL select list contains aggregate-function columns.

The mapping process is hard to describe directly, so a few examples follow. SQL structure is, after all, just SELECT/FROM/WHERE/GROUP BY/HAVING/ORDER BY. We skip ORDER BY, since aggregate results usually do not care about order; FROM is simply the index name.

ES builders for each SQL component:

- select column (aggregate function): a MetricsAggregationBuilder determined by the column's aggregate function (e.g. MaxAggregationBuilder)
- select column (group-by field): the bucket key
- where: FiltersAggregationBuilder + FiltersAggregator.KeyedFilter, e.g. keyedFilter = new FiltersAggregator.KeyedFilter("combineCondition", subQueryBuilder); AggregationBuilders.filters("whereAggr", keyedFilter)
- group by: TermsAggregationBuilder, e.g. AggregationBuilders.terms("aggregation name").field(fieldName)
- having: a MetricsAggregationBuilder determined by the having-clause's aggregate function (e.g. MaxAggregationBuilder) plus a BucketSelectorPipelineAggregationBuilder, e.g. PipelineAggregatorBuilders.bucketSelector(aggregationName, bucketPathMap, script)

Common SQL operators and aggregate functions and their ES builders:

    SQL element             Builder type                     Code to build
    count(field)            ValueCountAggregationBuilder     AggregationBuilders.count(metricsName).field(fieldName)
    count(distinct field)   CardinalityAggregationBuilder    AggregationBuilders.cardinality(metricsName).field(fieldName)
    sum(field)              SumAggregationBuilder            AggregationBuilders.sum(metricsName).field(fieldName)
    min(field)              MinAggregationBuilder            AggregationBuilders.min(metricsName).field(fieldName)
    max(field)              MaxAggregationBuilder            AggregationBuilders.max(metricsName).field(fieldName)
    avg(field)              AvgAggregationBuilder            AggregationBuilders.avg(metricsName).field(fieldName)
    AND                     BoolQueryBuilder                 QueryBuilders.boolQuery().must().add(subQueryBuilder)
    OR                      BoolQueryBuilder                 QueryBuilders.boolQuery().should().add(subQueryBuilder)
    NOT                     BoolQueryBuilder                 QueryBuilders.boolQuery().mustNot().add(subQueryBuilder)
    =                       TermQueryBuilder                 QueryBuilders.termQuery(fieldName, value)
    IN                      TermsQueryBuilder                QueryBuilders.termsQuery(fieldName, values)
    LIKE                    WildcardQueryBuilder             QueryBuilders.wildcardQuery(fieldName, value)
    >                       RangeQueryBuilder                QueryBuilders.rangeQuery(fieldName).gt(value)
    >=                      RangeQueryBuilder                QueryBuilders.rangeQuery(fieldName).gte(value)
    <                       RangeQueryBuilder                QueryBuilders.rangeQuery(fieldName).lt(value)
    <=                      RangeQueryBuilder                QueryBuilders.rangeQuery(fieldName).lte(value)

1.select count(payerId) as payerCount from Payment group by country

Note that in the generated aggregation query the document property payerId becomes payerId.keyword. By default Elasticsearch does not allow aggregating on analyzed (text) fields and raises the error "Fielddata is disabled on text fields by default. Set fielddata=true". Fielddata aggregation is a very costly operation and is generally discouraged. Fortunately, by default Elasticsearch indexes the payerId property as two fields: payerId, an analyzed text field, and payerId.keyword, a non-analyzed keyword field.
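The original screenshot is not reproduced here; in the REST DSL the generated request would look roughly like the following sketch (the aggregation names and the .keyword suffixes are assumptions based on the discussion above):

```json
{
  "size": 0,
  "aggs": {
    "country": {
      "terms": { "field": "country.keyword" },
      "aggs": {
        "payerCount": { "value_count": { "field": "payerId.keyword" } }
      }
    }
  }
}
```

Each bucket of the country terms aggregation then carries a payerCount value, mirroring one row of the SQL result.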

2.select max(payerId) from Payment group by accountId, country
The two GROUP BY columns map to two nested levels of terms aggregation.
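A sketch of those two nested levels in the REST DSL (this assumes payerId is indexed as a numeric field, since max only applies to numbers; the names and .keyword suffixes are likewise assumptions):

```json
{
  "size": 0,
  "aggs": {
    "accountId": {
      "terms": { "field": "accountId.keyword" },
      "aggs": {
        "country": {
          "terms": { "field": "country.keyword" },
          "aggs": {
            "maxPayerId": { "max": { "field": "payerId" } }
          }
        }
      }
    }
  }
}
```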

3.select count(distinct payerId) as payerCount from Payment where country in ('CN', 'GE') group by accountId, country
With a WHERE clause added, the top level becomes a FiltersAggregationBuilder. It has two parts: filters holds a KeyedFilter built from all the query conditions, which in turn contains the sub-query conditions; aggregations holds the GROUP BY levels and the aggregate functions from the SELECT clause.
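A sketch of that structure in the REST DSL (the key "combineCondition", the aggregation names, and the .keyword suffixes are assumptions):

```json
{
  "size": 0,
  "aggs": {
    "whereAggr": {
      "filters": {
        "filters": {
          "combineCondition": { "terms": { "country.keyword": ["CN", "GE"] } }
        }
      },
      "aggs": {
        "accountId": {
          "terms": { "field": "accountId.keyword" },
          "aggs": {
            "country": {
              "terms": { "field": "country.keyword" },
              "aggs": {
                "payerCount": { "cardinality": { "field": "payerId.keyword" } }
              }
            }
          }
        }
      }
    }
  }
}
```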

4.select count(distinct payerId) as payerCount from Payment where withinTime(createAt, 1, 'DAY') and name like '%SH%' group by accountId, country
Multiple WHERE conditions are combined with a BoolQueryBuilder.
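A sketch of how the two conditions might combine in the query DSL. Treating the article's custom withinTime(createAt, 1, 'DAY') as a range query over the last day is an assumption, as is mapping LIKE '%SH%' to a wildcard query on name.keyword:

```json
{
  "bool": {
    "must": [
      { "range": { "createAt": { "gte": "now-1d" } } },
      { "wildcard": { "name.keyword": "*SH*" } }
    ]
  }
}
```

This bool query then becomes the sub QueryBuilder inside the KeyedFilter of the top-level filters aggregation.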

5.select max(amount) as maxAmt, min(amount) as minAmt from Payment where amount > 1000.00 or amount <= 50.53 group by accountId, country having count(distinct beneficiaryId) > 3 and sum(amount) > 1530.20
The most complex SQL yet! The interesting part is the HAVING clause, which uses the pipeline-type BucketSelectorPipelineAggregationBuilder. Two kinds of children are added under the terms aggregation of the last GROUP BY column: the sub-aggregations include the aggregate functions from the SELECT clause plus those referenced by the HAVING conditions, while the pipeline aggregations hold the BucketSelectorPipelineAggregationBuilder for the HAVING conditions. Its main properties are bucketsPathMap, which maps path names to the corresponding aggregated metrics, and script, which expresses the condition with the path names substituted for the property names on the left-hand side.
Note that although logically the HAVING clause applies on top of the previously computed aggregation results, in the structure of the ES aggregation the BucketSelectorPipelineAggregationBuilder is a sibling, not a child, of the aggregations its conditions reference!
Also note that a script path is a relative path among sibling nodes, not an absolute path from the root aggregation, and it refers to the name of the aggregated metric rather than the name of the Aggregation itself. The bucket reached by a path must be unique: the bucket selector only makes a boolean keep/drop decision for the current bucket, so a path that resolves to multiple buckets cannot support such a decision.
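The HAVING handling can be sketched in the REST DSL as follows; note that bucket_selector sits next to the metric aggregations it references and buckets_path refers to them by aggregation name (the names and .keyword suffixes here are assumptions):

```json
{
  "size": 0,
  "query": {
    "bool": {
      "should": [
        { "range": { "amount": { "gt": 1000.00 } } },
        { "range": { "amount": { "lte": 50.53 } } }
      ],
      "minimum_should_match": 1
    }
  },
  "aggs": {
    "accountId": {
      "terms": { "field": "accountId.keyword" },
      "aggs": {
        "country": {
          "terms": { "field": "country.keyword" },
          "aggs": {
            "maxAmt":    { "max": { "field": "amount" } },
            "minAmt":    { "min": { "field": "amount" } },
            "beneCount": { "cardinality": { "field": "beneficiaryId.keyword" } },
            "sumAmt":    { "sum": { "field": "amount" } },
            "having": {
              "bucket_selector": {
                "buckets_path": { "bc": "beneCount", "sa": "sumAmt" },
                "script": "params.bc > 3 && params.sa > 1530.20"
              }
            }
          }
        }
      }
    }
  }
}
```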

6.select count(paymentId) from Payment group by timeRange(createdAt, '1D', 'yyyy/MM/dd')
This example uses a custom function, timeRange, meaning the createdAt property is aggregated by day; the corresponding ES aggregation type is DateHistogramAggregation.
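A sketch of the resulting date histogram (calendar_interval is the parameter name in ES 7.x; older versions use interval; the aggregation names are assumptions):

```json
{
  "size": 0,
  "aggs": {
    "byDay": {
      "date_histogram": {
        "field": "createdAt",
        "calendar_interval": "1d",
        "format": "yyyy/MM/dd"
      },
      "aggs": {
        "paymentCount": { "value_count": { "field": "paymentId.keyword" } }
      }
    }
  }
}
```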

Other notes

Bucket count

Distinct count: Elasticsearch uses an approximate algorithm based on HyperLogLog.
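Because the count is approximate, the cardinality aggregation accepts a precision_threshold parameter (3000 by default, 40000 at most): below that many distinct values the count is close to exact, at the cost of extra memory. For example:

```json
{
  "size": 0,
  "aggs": {
    "payerCount": {
      "cardinality": {
        "field": "payerId.keyword",
        "precision_threshold": 3000
      }
    }
  }
}
```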

Reference

https://www.elastic.co/guide/en/elasticsearch/reference/current/fielddata.html

======================================================================

Elasticsearch-sql: LIMIT in GROUP BY aggregation queries

When querying ES through the Elasticsearch-sql plugin, we often GROUP BY several fields, for example:

  1. select /*! IGNORE_UNAVAILABLE */ SUM(errorCount) as num
  2. from ctbpm-js-data-2018-w32,ctbpm-js-data-2018-w27,ctbpm-js-data-2018-w28,
  3. ctbpm-js-data-2018-w29,ctbpm-js-data-2018-w30,ctbpm-js-data-2018-w31
  4. where appCode = '5f05acfc9a084d9f9a07e165a2516c18' and logTime>= '2018-07-07T09:57:15.436Z' and logTime<= '2018-08-07T09:57:15.436Z'
  5. group by pageRef,province,city,ip limit 100

After parsing:

  1. {
  2. "from": 0,
  3. "size": 0,
  4. "query": {
  5. "bool": {
  6. "filter": [
  7. {
  8. "bool": {
  9. "must": [
  10. {
  11. "bool": {
  12. "must": [
  13. {
  14. "match_phrase": {
  15. "appCode": {
  16. "query": "5f05acfc9a084d9f9a07e165a2516c18",
  17. "slop": 0,
  18. "boost": 1
  19. }
  20. }
  21. },
  22. {
  23. "range": {
  24. "logTime": {
  25. "from": "2018-07-07T09:57:15.436Z",
  26. "to": null,
  27. "include_lower": true,
  28. "include_upper": true,
  29. "boost": 1
  30. }
  31. }
  32. },
  33. {
  34. "range": {
  35. "logTime": {
  36. "from": null,
  37. "to": "2018-08-07T09:57:15.436Z",
  38. "include_lower": true,
  39. "include_upper": true,
  40. "boost": 1
  41. }
  42. }
  43. }
  44. ],
  45. "disable_coord": false,
  46. "adjust_pure_negative": true,
  47. "boost": 1
  48. }
  49. }
  50. ],
  51. "disable_coord": false,
  52. "adjust_pure_negative": true,
  53. "boost": 1
  54. }
  55. }
  56. ],
  57. "disable_coord": false,
  58. "adjust_pure_negative": true,
  59. "boost": 1
  60. }
  61. },
  62. "_source": {
  63. "includes": [
  64. "SUM"
  65. ],
  66. "excludes": []
  67. },
  68. "aggregations": {
  69. "pageRef": {
  70. "terms": {
  71. "field": "pageRef",
  72. "size": 100,
  73. "shard_size": 2000,
  74. "min_doc_count": 1,
  75. "shard_min_doc_count": 0,
  76. "show_term_doc_count_error": false,
  77. "order": [
  78. {
  79. "_count": "desc"
  80. },
  81. {
  82. "_term": "asc"
  83. }
  84. ]
  85. },
  86. "aggregations": {
  87. "province": {
  88. "terms": {
  89. "field": "province",
  90. "size": 10,
  91. "min_doc_count": 1,
  92. "shard_min_doc_count": 0,
  93. "show_term_doc_count_error": false,
  94. "order": [
  95. {
  96. "_count": "desc"
  97. },
  98. {
  99. "_term": "asc"
  100. }
  101. ]
  102. },
  103. "aggregations": {
  104. "city": {
  105. "terms": {
  106. "field": "city",
  107. "size": 10,
  108. "min_doc_count": 1,
  109. "shard_min_doc_count": 0,
  110. "show_term_doc_count_error": false,
  111. "order": [
  112. {
  113. "_count": "desc"
  114. },
  115. {
  116. "_term": "asc"
  117. }
  118. ]
  119. },
  120. "aggregations": {
  121. "ip": {
  122. "terms": {
  123. "field": "ip",
  124. "size": 10,
  125. "min_doc_count": 1,
  126. "shard_min_doc_count": 0,
  127. "show_term_doc_count_error": false,
  128. "order": [
  129. {
  130. "_count": "desc"
  131. },
  132. {
  133. "_term": "asc"
  134. }
  135. ]
  136. },
  137. "aggregations": {
  138. "num": {
  139. "sum": {
  140. "field": "errorCount"
  141. }
  142. }
  143. }
  144. }
  145. }
  146. }
  147. }
  148. }
  149. }
  150. }
  151. }
  152. }

The parsed JSON shows that the limit value (100 here) only applies to the first field after GROUP BY; the remaining fields keep the default size of 10, and the limit has no effect on them. This is a known problem of Elasticsearch-sql with GROUP BY.

The workaround is to use the terms() syntax:

terms(field='correspond_brand_name',size='10',alias='correspond_brand_name',include='".*sport.*"',exclude='"water_.*"')

Note: with this form the limit keyword is no longer added. Also note that the order of the fields after GROUP BY matters: depending on the data, the number of result rows may differ, but the overall result is correct.

  1. select /*! IGNORE_UNAVAILABLE */ SUM(errorCount) as num
  2. from ctbpm-js-data-2018-w32,ctbpm-js-data-2018-w27,ctbpm-js-data-2018-w28,
  3. ctbpm-js-data-2018-w29,ctbpm-js-data-2018-w30,ctbpm-js-data-2018-w31
  4. where appCode = '5f05acfc9a084d9f9a07e165a2516c18' and logTime>= '2018-07-07T09:57:15.436Z' and logTime<= '2018-08-07T09:57:15.436Z'
  5. group by terms(field='pageRef',size='15',alias='pageRef'),
  6. terms(field='province',size='15',alias='province'),
  7. terms(field='city',size='15',alias='city'),
  8. terms(field='ip',size='15',alias='ip')

After parsing:

  1. {
  2. "from": 0,
  3. "size": 0,
  4. "query": {
  5. "bool": {
  6. "filter": [
  7. {
  8. "bool": {
  9. "must": [
  10. {
  11. "bool": {
  12. "must": [
  13. {
  14. "match_phrase": {
  15. "appCode": {
  16. "query": "5f05acfc9a084d9f9a07e165a2516c18",
  17. "slop": 0,
  18. "boost": 1
  19. }
  20. }
  21. },
  22. {
  23. "range": {
  24. "logTime": {
  25. "from": "2018-07-07T09:57:15.436Z",
  26. "to": null,
  27. "include_lower": true,
  28. "include_upper": true,
  29. "boost": 1
  30. }
  31. }
  32. },
  33. {
  34. "range": {
  35. "logTime": {
  36. "from": null,
  37. "to": "2018-08-07T09:57:15.436Z",
  38. "include_lower": true,
  39. "include_upper": true,
  40. "boost": 1
  41. }
  42. }
  43. }
  44. ],
  45. "disable_coord": false,
  46. "adjust_pure_negative": true,
  47. "boost": 1
  48. }
  49. }
  50. ],
  51. "disable_coord": false,
  52. "adjust_pure_negative": true,
  53. "boost": 1
  54. }
  55. }
  56. ],
  57. "disable_coord": false,
  58. "adjust_pure_negative": true,
  59. "boost": 1
  60. }
  61. },
  62. "_source": {
  63. "includes": [
  64. "SUM"
  65. ],
  66. "excludes": []
  67. },
  68. "aggregations": {
  69. "pageRef": {
  70. "terms": {
  71. "field": "pageRef",
  72. "size": 15,
  73. "min_doc_count": 1,
  74. "shard_min_doc_count": 0,
  75. "show_term_doc_count_error": false,
  76. "order": [
  77. {
  78. "_count": "desc"
  79. },
  80. {
  81. "_term": "asc"
  82. }
  83. ]
  84. },
  85. "aggregations": {
  86. "province": {
  87. "terms": {
  88. "field": "province",
  89. "size": 15,
  90. "min_doc_count": 1,
  91. "shard_min_doc_count": 0,
  92. "show_term_doc_count_error": false,
  93. "order": [
  94. {
  95. "_count": "desc"
  96. },
  97. {
  98. "_term": "asc"
  99. }
  100. ]
  101. },
  102. "aggregations": {
  103. "city": {
  104. "terms": {
  105. "field": "city",
  106. "size": 15,
  107. "min_doc_count": 1,
  108. "shard_min_doc_count": 0,
  109. "show_term_doc_count_error": false,
  110. "order": [
  111. {
  112. "_count": "desc"
  113. },
  114. {
  115. "_term": "asc"
  116. }
  117. ]
  118. },
  119. "aggregations": {
  120. "ip": {
  121. "terms": {
  122. "field": "ip",
  123. "size": 15,
  124. "min_doc_count": 1,
  125. "shard_min_doc_count": 0,
  126. "show_term_doc_count_error": false,
  127. "order": [
  128. {
  129. "_count": "desc"
  130. },
  131. {
  132. "_term": "asc"
  133. }
  134. ]
  135. },
  136. "aggregations": {
  137. "num": {
  138. "sum": {
  139. "field": "errorCount"
  140. }
  141. }
  142. }
  143. }
  144. }
  145. }
  146. }
  147. }
  148. }
  149. }
  150. }
  151. }

The parsed JSON shows that all four fields now have size 15; you can run the query in Postman to confirm that the result is correct.

The syntax comes from the terms usage documented at https://github.com/NLPchina/elasticsearch-sql.

Addendum: for nested fields (nested queries), for example:

  1. select /*! IGNORE_UNAVAILABLE */ SUM(errorCount) as num
  2. from ctbpm-js-data-2018-w32,ctbpm-js-data-2018-w27,ctbpm-js-data-2018-w28,ctbpm-js-data-2018-w29,ctbpm-js-data-2018-w30,ctbpm-js-data-2018-w31
  3. where appCode = '5f05acfc9a084d9f9a07e165a2516c18'
  4. and logTime>= '2018-07-08T06:20:13.144Z'
  5. and logTime<= '2018-08-08T06:20:13.144Z'
  6. group by pageRef,province,city,ip,nested(errors.message) limit 10

then query it like this:

  1. select /*! IGNORE_UNAVAILABLE */ SUM(errorCount) as num
  2. from ctbpm-js-data-2018-w32,ctbpm-js-data-2018-w27,ctbpm-js-data-2018-w28,ctbpm-js-data-2018-w29,ctbpm-js-data-2018-w30,ctbpm-js-data-2018-w31
  3. where appCode = '5f05acfc9a084d9f9a07e165a2516c18'
  4. and logTime>= '2018-07-08T06:20:13.144Z'
  5. and logTime<= '2018-08-08T06:20:13.144Z'
  6. group by terms(field='pageRef',size='15',alias='pageRef'),
  7. terms(field='province',size='1',alias='province'),
  8. terms(field='city',size='2',alias='city'),
  9. terms(field='ip',size='3',alias='ip'),
  10. terms(field='errors.message',size='4',alias='errors.message',nested="errors")

========================================================

Filtering before vs. after aggregation in Elasticsearch

For Elasticsearch aggregation and filtering, the result is not affected by the order in which you write them. In other words, whether you put the filter condition before or after the aggregation in the request, the outcome is the same: ES always filters first and aggregates afterwards, just as a relational database applies WHERE before GROUP BY.

If, however, you want a filter that does not affect the aggregation (agg) result and only changes the hits, use the setPostFilter() method.
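setPostFilter() corresponds to the post_filter element of the REST search API. A minimal sketch of the same idea in the query DSL:

```json
{
  "size": 10,
  "aggs": {
    "by_age": { "terms": { "field": "age" } }
  },
  "post_filter": {
    "range": { "age": { "gt": 30, "lt": 40 } }
  }
}
```

The aggregation runs over all matching documents, while post_filter trims only the hits that are returned.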

 

Example: all data

Code:

  1. SearchResponse response = null;
  2. SearchRequestBuilder responsebuilder = client.prepareSearch("company")
  3. .setTypes("employee").setFrom(0).setSize(250);
  4. AggregationBuilder aggregation = AggregationBuilders
  5. .terms("agg")
  6. .field("age") ;
  7. response = responsebuilder
  8. .addAggregation(aggregation)
  9. .setExplain(true).execute().actionGet();
  10. SearchHits hits = response.getHits();
  11. Terms agg = response.getAggregations().get("agg");

Result: aggregation only, no filtering (note the hits and the agg buckets):

  1. {
  2.     "took":100,
  3.     "timed_out":false,
  4.     "_shards":{
  5.         "total":5,
  6.         "successful":5,
  7.         "failed":0
  8.     },
  9.     "hits":{
  10.         "total":7,
  11.         "max_score":1,
  12.         "hits":[
  13.             {
  14.                 "_shard":1,
  15.                 "_node":"fvp3NBT5R5i6CqN3y2LU4g",
  16.                 "_index":"company",
  17.                 "_type":"employee",
  18.                 "_id":"5",
  19.                 "_score":1,
  20.                 "_source":{
  21.                     "name":"Fresh",
  22.                     "age":22
  23.                 },
  24.                 "_explanation":Object{...}
  25.             },
  26.             {
  27.                 "_shard":1,
  28.                 "_node":"fvp3NBT5R5i6CqN3y2LU4g",
  29.                 "_index":"company",
  30.                 "_type":"employee",
  31.                 "_id":"10",
  32.                 "_score":1,
  33.                 "_source":{
  34.                     "name":"Henrry",
  35.                     "age":30
  36.                 },
  37.                 "_explanation":Object{...}
  38.             },
  39.             {
  40.                 "_shard":1,
  41.                 "_node":"fvp3NBT5R5i6CqN3y2LU4g",
  42.                 "_index":"company",
  43.                 "_type":"employee",
  44.                 "_id":"9",
  45.                 "_score":1,
  46.                 "_source":{
  47.                     "address":{
  48.                         "country":"china",
  49.                         "province":"jiangsu",
  50.                         "city":"nanjing",
  51.                         "area":{
  52.                             "pos":"10001"
  53.                         }
  54.                     }
  55.                 },
  56.                 "_explanation":Object{...}
  57.             },
  58.             {
  59.                 "_shard":2,
  60.                 "_node":"fvp3NBT5R5i6CqN3y2LU4g",
  61.                 "_index":"company",
  62.                 "_type":"employee",
  63.                 "_id":"2",
  64.                 "_score":1,
  65.                 "_source":{
  66.                     "address":{
  67.                         "country":"china",
  68.                         "province":"jiangsu",
  69.                         "city":"nanjing"
  70.                     },
  71.                     "name":"jack_1",
  72.                     "age":19,
  73.                     "join_date":"2016-01-01"
  74.                 },
  75.                 "_explanation":Object{...}
  76.             },
  77.             {
  78.                 "_shard":2,
  79.                 "_node":"fvp3NBT5R5i6CqN3y2LU4g",
  80.                 "_index":"company",
  81.                 "_type":"employee",
  82.                 "_id":"4",
  83.                 "_score":1,
  84.                 "_source":{
  85.                     "name":"willam",
  86.                     "age":18
  87.                 },
  88.                 "_explanation":Object{...}
  89.             },
  90.             {
  91.                 "_shard":2,
  92.                 "_node":"fvp3NBT5R5i6CqN3y2LU4g",
  93.                 "_index":"company",
  94.                 "_type":"employee",
  95.                 "_id":"6",
  96.                 "_score":1,
  97.                 "_source":{
  98.                     "name":"Avivi",
  99.                     "age":30
  100.                 },
  101.                 "_explanation":Object{...}
  102.             },
  103.             {
  104.                 "_shard":4,
  105.                 "_node":"K7qK1ncMQUuIe0K6VSVMJA",
  106.                 "_index":"company",
  107.                 "_type":"employee",
  108.                 "_id":"3",
  109.                 "_score":1,
  110.                 "_source":{
  111.                     "address":{
  112.                         "country":"china",
  113.                         "province":"shanxi",
  114.                         "city":"xian"
  115.                     },
  116.                     "name":"marry",
  117.                     "age":35,
  118.                     "join_date":"2015-01-01"
  119.                 },
  120.                 "_explanation":Object{...}
  121.             }
  122.         ]
  123.     },
  124.     "aggregations":{
  125.         "agg":{
  126.             "doc_count_error_upper_bound":0,
  127.             "sum_other_doc_count":0,
  128.             "buckets":[
  129.                 {
  130.                     "key":30,
  131.                     "doc_count":2
  132.                 },
  133.                 {
  134.                     "key":18,
  135.                     "doc_count":1
  136.                 },
  137.                 {
  138.                     "key":19,
  139.                     "doc_count":1
  140.                 },
  141.                 {
  142.                     "key":22,
  143.                     "doc_count":1
  144.                 },
  145.                 {
  146.                     "key":35,
  147.                     "doc_count":1
  148.                 }
  149.             ]
  150.         }
  151.     }
  152. }

1. setQuery() before addAggregation()

Code:

  1. SearchResponse response = null;
  2. SearchRequestBuilder responsebuilder = client.prepareSearch("company")
  3. .setTypes("employee").setFrom(0).setSize(250);
  4. AggregationBuilder aggregation = AggregationBuilders
  5. .terms("agg")
  6. .field("age") ;
  7. response = responsebuilder
  8. .setQuery(QueryBuilders.rangeQuery("age").gt(30).lt(40))
  9. .addAggregation(aggregation)
  10. .setExplain(true).execute().actionGet();
  11. SearchHits hits = response.getHits();
  12. Terms agg = response.getAggregations().get("agg");

Result:

  1. {
  2.     "took":538,
  3.     "timed_out":false,
  4.     "_shards":{
  5.         "total":5,
  6.         "successful":5,
  7.         "failed":0
  8.     },
  9.     "hits":{
  10.         "total":1,
  11.         "max_score":1,
  12.         "hits":[
  13.             {
  14.                 "_shard":4,
  15.                 "_node":"anlkGjjuQ0G6DODpZgiWrQ",
  16.                 "_index":"company",
  17.                 "_type":"employee",
  18.                 "_id":"3",
  19.                 "_score":1,
  20.                 "_source":{
  21.                     "address":{
  22.                         "country":"china",
  23.                         "province":"shanxi",
  24.                         "city":"xian"
  25.                     },
  26.                     "name":"marry",
  27.                     "age":35,
  28.                     "join_date":"2015-01-01"
  29.                 },
  30.                 "_explanation":Object{...}
  31.             }
  32.         ]
  33.     },
  34.     "aggregations":{
  35.         "agg":{
  36.             "doc_count_error_upper_bound":0,
  37.             "sum_other_doc_count":0,
  38.             "buckets":[
  39.                 {
  40.                     "key":35,
  41.                     "doc_count":1
  42.                 }
  43.             ]
  44.         }
  45.     }
  46. }

 

2. setQuery() after addAggregation()

Code:

  1. SearchResponse response = null;
  2. SearchRequestBuilder responsebuilder = client.prepareSearch("company")
  3. .setTypes("employee").setFrom(0).setSize(250);
  4. AggregationBuilder aggregation = AggregationBuilders
  5. .terms("agg")
  6. .field("age") ;
  7. response = responsebuilder
  8. .addAggregation(aggregation)
  9. .setQuery(QueryBuilders.rangeQuery("age").gt(30).lt(40))
  10. .setExplain(true).execute().actionGet();
  11. SearchHits hits = response.getHits();
  12. Terms agg = response.getAggregations().get("agg");

Result:

  1.   
  2. {
  3.     "took":538,
  4.     "timed_out":false,
  5.     "_shards":{
  6.         "total":5,
  7.         "successful":5,
  8.         "failed":0
  9.     },
  10.     "hits":{
  11.         "total":1,
  12.         "max_score":1,
  13.         "hits":[
  14.             {
  15.                 "_shard":4,
  16.                 "_node":"anlkGjjuQ0G6DODpZgiWrQ",
  17.                 "_index":"company",
  18.                 "_type":"employee",
  19.                 "_id":"3",
  20.                 "_score":1,
  21.                 "_source":{
  22.                     "address":{
  23.                         "country":"china",
  24.                         "province":"shanxi",
  25.                         "city":"xian"
  26.                     },
  27.                     "name":"marry",
  28.                     "age":35,
  29.                     "join_date":"2015-01-01"
  30.                 },
  31.                 "_explanation":Object{...}
  32.             }
  33.         ]
  34.     },
  35.     "aggregations":{
  36.         "agg":{
  37.             "doc_count_error_upper_bound":0,
  38.             "sum_other_doc_count":0,
  39.             "buckets":[
  40.                 {
  41.                     "key":35,
  42.                     "doc_count":1
  43.                 }
  44.             ]
  45.         }
  46.     }
  47. }

3. setPostFilter() after addAggregation()

Code:

  1. SearchResponse response = null;
  2. SearchRequestBuilder responsebuilder = client.prepareSearch("company")
  3. .setTypes("employee").setFrom(0).setSize(250);
  4. AggregationBuilder aggregation = AggregationBuilders
  5. .terms("agg")
  6. .field("age") ;
  7. response = responsebuilder
  8. .addAggregation(aggregation)
  9. .setPostFilter(QueryBuilders.rangeQuery("age").gt(30).lt(40))
  10. .setExplain(true).execute().actionGet();
  11. SearchHits hits = response.getHits();
  12. Terms agg = response.getAggregations().get("agg");

Result:

  1. {
  2. "took":7,
  3. "timed_out":false,
  4. "_shards":{
  5. "total":5,
  6. "successful":5,
  7. "failed":0
  8. },
  9. "hits":{
  10. "total":1,
  11. "max_score":1,
  12. "hits":[
  13. {
  14. "_shard":4,
  15. "_node":"fvp3NBT5R5i6CqN3y2LU4g",
  16. "_index":"company",
  17. "_type":"employee",
  18. "_id":"3",
  19. "_score":1,
  20. "_source":{
  21. "address":{
  22. "country":"china",
  23. "province":"shanxi",
  24. "city":"xian"
  25. },
  26. "name":"marry",
  27. "age":35,
  28. "join_date":"2015-01-01"
  29. },
  30. "_explanation":Object{...}
  31. }
  32. ]
  33. },
  34. "aggregations":{
  35. "agg":{
  36. "doc_count_error_upper_bound":0,
  37. "sum_other_doc_count":0,
  38. "buckets":[
  39. {
  40. "key":30,
  41. "doc_count":2
  42. },
  43. {
  44. "key":18,
  45. "doc_count":1
  46. },
  47. {
  48. "key":19,
  49. "doc_count":1
  50. },
  51. {
  52. "key":22,
  53. "doc_count":1
  54. },
  55. {
  56. "key":35,
  57. "doc_count":1
  58. }
  59. ]
  60. }
  61. }
  62. }

4. setPostFilter() before addAggregation()

Code:

  1. SearchResponse response = null;
  2. SearchRequestBuilder responsebuilder = client.prepareSearch("company")
  3. .setTypes("employee").setFrom(0).setSize(250);
  4. AggregationBuilder aggregation = AggregationBuilders
  5. .terms("agg")
  6. .field("age") ;
  7. response = responsebuilder
  8. .setPostFilter(QueryBuilders.rangeQuery("age").gt(30).lt(40))
  9. .addAggregation(aggregation)
  10. .setExplain(true).execute().actionGet();
  11. SearchHits hits = response.getHits();
  12. Terms agg = response.getAggregations().get("agg");

Result:

  1. {
  2. "took":5115,
  3. "timed_out":false,
  4. "_shards":{
  5. "total":5,
  6. "successful":5,
  7. "failed":0
  8. },
  9. "hits":{
  10. "total":1,
  11. "max_score":1,
  12. "hits":[
  13. {
  14. "_shard":4,
  15. "_node":"b8cNIO5cQr2MmsnsuluoNQ",
  16. "_index":"company",
  17. "_type":"employee",
  18. "_id":"3",
  19. "_score":1,
  20. "_source":{
  21. "address":{
  22. "country":"china",
  23. "province":"shanxi",
  24. "city":"xian"
  25. },
  26. "name":"marry",
  27. "age":35,
  28. "join_date":"2015-01-01"
  29. },
  30. "_explanation":Object{...}
  31. }
  32. ]
  33. },
  34. "aggregations":{
  35. "agg":{
  36. "doc_count_error_upper_bound":0,
  37. "sum_other_doc_count":0,
  38. "buckets":[
  39. {
  40. "key":30,
  41. "doc_count":2
  42. },
  43. {
  44. "key":18,
  45. "doc_count":1
  46. },
  47. {
  48. "key":19,
  49. "doc_count":1
  50. },
  51. {
  52. "key":22,
  53. "doc_count":1
  54. },
  55. {
  56. "key":35,
  57. "doc_count":1
  58. }
  59. ]
  60. }
  61. }
  62. }

Summary:

(Note: setQuery() scores the matching documents, while setPostFilter() does not score at all; it only checks whether each hit satisfies the filter condition, a plain yes/no decision.)

The results clearly show that for both setPostFilter() and setQuery(), their position in the call chain does not change the outcome. They also show that the setQuery() filter affects not only the hits but also the aggregation (agg) result, whereas setPostFilter() affects only the hits and leaves the aggregation (agg) result untouched.

PS:

Elasticsearch queries in Spring Boot: https://blog.csdn.net/Topdandan/article/details/81436141
