赞
踩
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices -> Types -> Documents -> Fields
es也可以使用sql进行查询,将es结构对应关系数据库进行sql的查询即可如下:
http://127.0.0.1:9200/_sql?sql=select count(*) from abc_index*
仅用于验证es查询结果,因为sql的执行在内存中进行,非常耗时。
Elasticsearch的交互,可以使用JAVA API,也可以直接使用es提供的REST API 。
curl -X <HTTP Verb> /<Index>/<Type>/<ID>
运行elasticsearch-5.1.1、kibana-5.1.1。
首先添加依赖;
yml文件的配置如下:
#es的默认名称,如果安装es时没有做特殊的操作名字都是此名称
spring.data.elasticsearch.cluster-name=elasticsearch
# Elasticsearch 集群节点服务地址,用逗号分隔,如果没有指定其他就启动一个客户端节点,默认java访问端口9300
spring.data.elasticsearch.cluster-nodes=localhost:9300
# 设置连接超时时间
spring.data.elasticsearch.properties.transport.tcp.connect_timeout=120s
#查看es集群状态 结果分析->(1) curl -X GET "localhost:9200/_cat/health?v" #查看节点的详细信息 结果分析->(9) curl -X GET "localhost:9200/_cat/nodes?v" #查看集群健康信息 结果分析->(2) curl localhost:9200/_cluster/health?pretty #显示集群系统信息,包括CPU JVM、节点、分片等等 curl -XGET localhost:9200/_cluster/stats?pretty=true #获取集群堆积的任务 curl -XGET localhost:9200/_cluster/pending_tasks?pretty=true #查询所有索引,pretty:格式化 curl -XGET 'localhost:9200/_cat/indices?v&pretty' #查询第一个索引的未分配原因。结果分析->(3) curl localhost:9200/_cluster/allocation/explain #查询指定索引索引名未分配的原因。结果分析->(4) curl -XGET localhost:9200/_cluster/allocation/explain?pretty -d' { "index": "skyeye-dns-2018.12.21", "shard": 1, "primary": true } ## 创建index:customer pretty参数:表示请求响应的JSON格式化之后打印出来,方便开发者阅读。student为type curl -X PUT "localhost:9200/customer?pretty" curl -X PUT "localhost:9200/customer/student/1" -d '{"name":"jack","age":30,"info":"Ilove you"}' ## 删除index:index02 curl -X DELETE http://192.168.168.101:9200/index02 ## 查询具体索引:bank,bank2内容 结果分析->(6) curl -X GET "localhost:9200/bank,bank2/_search?q=*&sort=account_number:asc&pretty&ignore_unavailable=true" ## 打开/关闭index curl -XPOST http://192.168.168.101:9200/index01/_close curl -XPOST http://192.168.168.101:9200/index01/_open ## 创建/全量更新Document 不存在则创建,存在则覆盖。 结果分析->(7) curl -X PUT "localhost:9200/customer/_doc/1?pretty" -H 'Content-Type: application/json' -d' { "name": "John Doe" }' ## 查询Document curl -X GET "localhost:9200/customer/_doc/1?pretty" ## 更新Document POST /ecommerce/product/1/_update { "doc": { "name": "jiaqiangban gaolujie yagao" } } POST /_bulk { "update": { "_index": "ecommerce", "_type": "product", "_id": "4","retry_on_conflict" : 3 }} { "doc" : {"test_field" : "test update"} } ## 删除Document curl -X DELETE "localhost:9200/customer?pretty" POST /_bulk { "delete": { "_index": "ecommerce", "_type": "product", "_id": "4"}} ## 查询返回最近10条 curl -XPOST 'localhost:9200/logstash-zhifubao-2018.09.18/_search?pretty' -d '{"query": { "match_all": {} },"from": 10,"size": 10}' ## 查询索引状态 curl -XGET http://localhost:9200/logstash-zhifubao-2018.08.15/_status #根据business_no查询,多个条件用逗号拼接 curl -XGET http://localhost:9200/logstash-zhifubao-2018.08.15/_search?q=message:0ea1b2df-caa4-457c-8cc1-294f5e9284c7 #根据business_no查询,只返回特定字段 curl -XGET http://localhost:9200/logstash-zhifubao-2018.08.15/_search?q=message:0ea1b2df-caa4-457c-8cc1-294f5e9284c7?_source=message #分页 curl -XGET http://localhost:9200/shb01/stu/_search?size=2&from=0 #更新部分字段 crul –XPUT http:localhost:9200/shb01/student/1/_update?version=1 –d ‘{“doc”:{“name”:”updatename”} #根据id删除 curl -XDELETE http://localhost:9200/shb01/student/AVad05EExskBS1Rg2tdq #删除所有的索引库中名称为tom的文档 curl -XDELETE http://localhost:9200/_all/_query?q=name:tom
curl -X GET "localhost:9200/_cat/health?v"
get为访问方式,localhost:9200/_cat/health?v
为访问链接。
从这个响应中,我们可以看到集群的名称,状态,节点数,分片数等等,其中:
curl localhost:9200/_cluster/health?pretty { "cluster_name" : "es", # 集群名称 "status" : "green", # 集群状态,green代表健康,yellow表示部分副分片未分配,red表示部分索引的主副分片未分配。 "timed_out" : false, # "number_of_nodes" : 3, # 在线的节点数 "number_of_data_nodes" : 3, # 在线的数据节点数 "active_primary_shards" : 549, # 活跃的主分片数量 "active_shards" : 1098, # 活跃的分片数量,包括主分片和副本 "relocating_shards" : 2, # 正在移动的分片数量。 "initializing_shards" : 0, # 正在初始化的分片的数量。 "unassigned_shards" : 0, # 未分配的分片数量 "delayed_unassigned_shards" : 0, # 由于节点离线导致延迟分配的分片数量。 "number_of_pending_tasks" : 0, # "number_of_in_flight_fetch" : 0, # "task_max_waiting_in_queue_millis" : 0, # "active_shards_percent_as_number" : 100.0 #所有活跃分片/打开的所有索引的分片总数。 }
Elasticsearch 5.0发布了 _cluster / allocation / explain,帮助诊断未分配分片的原因。
#查询第一个索引的未分配原因。
curl localhost:9200/_cluster/allocation/explain
#查询指定索引索引名未分配的原因。 curl -XGET localhost:9200/_cluster/allocation/explain?pretty -d' { "index": "skyeye-dns-2018.12.21", "shard": 1, "primary": true } 返回值: assigned 分片是已分配还是未分配 shard_state_fetch_pending 是否仍在获取有关分片的信息 unassigned_info-reason 分片最初未分配的原因 allocation_delay_in_millis 分片可分配之前的已配置延迟 remaining_delay_in_millis 可以分配分片之前的剩余延迟 nodes-es.…….com.1-node_attributes 节点具有用户添加的属性 nodes-es.…….com.1-store-shard_copy 该节点的分片副本信息和错误(如果适用) nodes-es.…….com.1-final_decision、final_explanation 分片是否可以分配给该节点的最终决定和解释 nodes-es.…….com.1-weight 分配器想要向该节点分配分片的权重 nodes-es.…….com.1-decisions 节点决策列表,这些决策将成为有关分片的最终决策
根据final_explanation进行问题排查:
一般是文件系统上对应分片的文件被清理了,这个是会丢数据的,但是丢的只是这一个分片的数据。
当有分片处于red状态时,会影响到这个索引的数据写入,手动分配空分片等于在通知集群可以重新给这个索引的该分片分配一个新的空分片,然后该索引就可以正常进行数据写入了。
分配一个空的主分片:
curl -XPOST 'localhost:9200/_cluster/reroute?pretty' -H 'Content-Type: application/json' -d' { "commands": [ { "allocate_empty_primary": { "index": "original_a_20200329", "shard": 1, "node": "es.a.com.1", "accept_data_loss": true } }, { "allocate_empty_primary": { "index": "original_a_20200329", "shard": 1, "node": "es.a.com.0", "accept_data_loss": true } } ] }
如果red索引比较多,那么一个个分片这样处理比较麻烦,用脚本进行批处理。
可以手动将该主分片分配到es.IP上:
curl -XPOST 'localhost:9200/_cluster/reroute?pretty' -H 'Content-Type: application/json' -d'
{
"commands": [
{
"allocate_stale_primary": {
"index": "a-2020.05.21",
"shard": 1,
"node": "es.a.3",
"accept_data_loss": true
}
}
]
}
du -sh /data*/es
如果出现磁盘写满的情况,ES集群很可能会变成yellow,red,可能是删除历史索引的程序出问题。
查询bank,bank2:
curl -X GET "localhost:9200/bank,bank2/_search?q=*&sort=account_number:asc&pretty&ignore_unavailable=true"
多索引查询:
test1,test2,test3
表示法_all
表示所有索引test*或 *test或 te*t或 *test*
等test*,-test3
参数 | 说明 | 默认值 | 举例 |
---|---|---|---|
ignore_unavailable | 当指定多个索引时,如果有索引不可用(不存在或者已经关闭)那么是否忽略该索引 | false | |
allow_no_indices | 允许通配符匹配索引 | true | curl -X GET "localhost:9200/bank3*/_search?q=*&pretty&allow_no_indices=false" ,若bank3不存在,则报错。 |
expand_wildcards | 查询索引的范围 | open表示查询所有匹配并open的索引,closed则表示查询所有匹配的索引 | |
pretty | 响应的JSON将被格式化 | true | |
format=yaml | 请求以更可读的yaml格式响应 | ||
human=true | 以人类可读的格式来返回数据 | true则返回{"exists_time":"1h"} ,否则,返回:{"exists_time_in_millis":3600000} |
索引参数相关:
URL | 说明 |
---|---|
/index/_search | 不解释 |
/_aliases | 获取或操作索引的别名 |
/index/type/ | 创建或操作类型 |
/index/_mapping | 创建或操作mapping |
/index/_settings | 创建或操作设置(number_of_shards是不可更改的) |
/index/_open | 打开被关闭的索引 |
/index/_close | 关闭索引 |
/index/_refresh | 刷新索引(使新加内容对搜索可见) |
/index/_flush | 刷新索引,将变动提交到lucene索引文件中,并清空elasticsearch的transaction log |
/index/_optimize | 优化segement,个人认为主要是对segement进行合并 |
/index/_status | 获得索引的状态信息 |
/index/_segments | 获得索引的segments的状态信息 |
/index/_explain | 不执行实际搜索,而返回解释信息 |
/index/_analyze | 不执行实际搜索,根据输入的参数进行文本分析 |
/index/type/id | 操作指定文档,不解释 |
/index/type/id/_create | 创建一个文档,如果该文件已经存在,则返回失败 |
/index/type/id/_update | 更新一个文件,如果改文件不存在,则返回失败 |
不存在则创建,存在则覆盖。
curl -X PUT "localhost:9200/customer/_doc/1?pretty" -H 'Content-Type: application/json' -d'
{
"name": "John Doe"
}'
未指定id时,系统会自动创建id。
POST /_bulk
{ "index": { "_index": "ecommerce", "_type":"product"}}
{ "name": "test yagao", "desc": "youxiao fangzhu"}
强制创建文档create
POST /_bulk
{ "create": { "_index": "ecommerce", "_type": "product", "_id": "4" }}
{ "test_field": "test12" }
curl -X DELETE "localhost:9200/customer?pretty"
在删除一个document之后,我们可以从侧面证明,它不是立即物理删除的,因为它的一些版本号等信息还是保留的。
POST /_bulk
{ "delete": { "_index": "ecommerce", "_type": "product", "_id": "4"}}
由于删除只需要被删除文档的ID,所以并没有对应的源文档。
bulk API按顺序执行这些操作。如果其中一个操作因为某些原因失败了,它将会继续处理后面的操作。当bulk API返回时,它将提供每个操作的状态(按照同样的顺序),所以开发者能够看到每个操作成功与否。
URL | 说明 |
---|---|
/_nodes/process | 看file descriptor 这个信息 |
/_nodes/process/stats | 统计信息(内存、CPU能) |
/_nodes/jvm | 获得各节点的虚拟机统计和配置信息 |
/_nodes/jvm/stats | 更详细的虚拟机信息 |
/_nodes/http | 获得各个节点的http信息(如ip地址) |
/_nodes/http/stats | 获得各个节点处理http请求的统计情况 |
/_nodes/thread_pool | 获得各种类型的线程池(elasticsearch分别对不同的操作提供不同的线程池)的配置信息 |
/_nodes/thread_pool/stats | 获得各种类型的线程池的统计信息 |
以上这些操作和可以通过如
/_nodes/${nodeId}/jvm/stats
/_nodes/${nodeip}/jvm/stats
/_nodes/${nodeattribute}/jvm/stats
的形式针对指定节点的操作。
ClusterHealthResponse response = client.admin().cluster()
.prepareHealth("library")
.execute().actionGet();
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
Settings settings = Settings.builder()
.put("cluster.name", "elasticsearch").build();
TransportClient client == new PreBuiltTransportClient(settings). addTransportAddress(new TransportAddress(InetAddress.getByName("XXX.XXX.XX.XX"), 9300));
java api:queryBuilder = QueryBuilders.matchAllQuery().boost(11f).normsField("title");
boost参数被用来增加一个子句的相对权重(当boost大于1时),或者减小相对权重(当boost介于0到1时),但是增加或者减小不是线性的。换言之,boost设为2并不会让最终的_score加倍。
相反,新的_score会在适用了boost后被归一化(Normalized)。每种查询都有自己的归一化算法(Normalization Algorithm)。但是能够说一个高的boost值会产生一个高的_score。
如果你在实现你自己的不基于TF/IDF的相关度分值模型并且你需要对提升过程拥有更多的控制,你可以使用function_score查询,它不通过归一化步骤对文档的boost进行操作。
在es查询之前,判断index是否存在,以免抛出异常。
private Client client; private void init() throws Exception{ Settings settings = Settings.settingsBuilder().put("cluster.name", "log-test") .build(); client = TransportClient.builder().settings(settings) .addPlugin(DeleteByQueryPlugin.class) .build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); } public boolean indexExists(String index){ IndicesExistsRequest request = new IndicesExistsRequest(index); IndicesExistsResponse response = client.admin().indices().exists(request).actionGet(); if (response.isExists()) { return true; } return false; }
场景:用于精确查询一个字段。
curl -XGET http://192.168.168.101:9200/index01/_search?pretty -H 'Content-Type: application/json' -d '
{
"query":
{"term":
{"title":"你好"}
}
}'
#通过浏览器sql检索:
http://ip:9200/indexName/_search?q=title:你好
term查找时直接对关键词进行查找。
java api:
SearchResponse response = client.prepareSearch("index1", "index2")
.setTypes("type1", "type2")
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.termQuery("multi", "test")) //term Query
.setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18)) // range query、Filter query
.setFrom(0).setSize(60).setExplain(true)
.get();
场景:用于精确查询多个字段。
{
'query':{
'terms':{
'tag':["search",'nosql','hello'] //json中必须包含数组。
}
}
}
i> Preparing a query 准备查询请求
SearchResponse response = client.prepareSearch("library")
.addFields("title", "_source")
.execute().actionGet();
for(SearchHit hit: response.getHits().getHits()) {
System.out.println(hit.getId());
if (hit.getFields().containsKey("title")) {
System.out.println("field.title: "+ hit.getFields().get("title").getValue());
}
System.out.println("source.title: " + hit.getSource().get("title"));
}
ii> Building queries 构造查询
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("sid_s", text));
Search search = new Search.Builder(searchSourceBuilder.toString()).addIndex("webpage").build();
SearchResult result = client.execute(search);
场景:用于当前文档的全文模糊查找。
match在匹配时会对所查找的关键词进行分词,然后按分词匹配查找。
GET /ecommerce/product/_search {'query':{'match':{'title':'你好'}}} { "query": { "match": { "__type": "info" } }, "sort": [ { "campaign_end_time": { "order": "desc" } } ] }
匹配查询:
queryBuilder = QueryBuilders
.matchQuery("message", "a quick brown fox")
.operator(Operator.AND)
.zeroTermsQuery(ZeroTermsQuery.ALL);
场景:查询指定索引下的所有文档。
{'query':{'match_all':{'title':'标题一样'}}}
场景:多值匹配查询。
{ "query": { "multi_match": { "query": "运动 上衣", "fields": [ "brandName^100", "brandName.brandName_pinyin^100", "brandName.brandName_keyword^100", "sortName^80", "sortName.sortName_pinyin^80", "productName^60", "productKeyword^20" ], "type": <multi-match-type>, "operator": "AND" } } }
场景:精确查找所有字段。
match(全文检索)会将输入的搜索串拆解开来,去倒排索引里面去一一匹配,只要能匹配上任意一个拆解后的单词,就可以作为结果返回。
match_phrase要求输入的搜索串,必须在指定的字段文本中,完全包含一模一样的,才可以算匹配,才能作为结果返回。
GET /ecommerce/product/_search
{
"query" : {
"match_phrase" : {
"producer" : "yagao producer"
}
}
}
场景:多条件查询。
bool查询包含四个子句,must,filter,should,must_not。
{ 'query':{ 'bool':{ 'must':[{ 'term':{ '_type':{ 'value':'age' } } },{ 'term':{ 'account_grade':{ 'value':'23' } } } ] } } } { "bool":{ "must":{ "term":{"user":"lucy"} }, "filter":{ "term":{"tag":"teach"} }, "should":[ {"term":{"tag":"wow"}}, {"term":{"tag":"elasticsearch"}} ], "mininum_should_match":1, "boost":1.0 } }
“minimum_should_match”: 1,表示最小匹配度,可以设置为百分百,详情看源文档Elasticsearch Reference [6.4] » Query DSL » Minimum Should Match,设置了这个值的时候就必须满足should里面的设置了,另外注意这边should里面同一字段设置的多个值(意思是当这个值等于X或者等于Y的时候都成立,务必注意格式)。
match来指定查询条件;
filter执行速度高于查询,原因如下:
QueryBuilder filterBuilder = QueryBuilders
.filteredQuery(
QueryBuilders.existsQuery("title").queryName("exist"),
QueryBuilders.termQuery("title", "elastic")
);
SearchResponse response = client.prepareSearch("library")
.setPostFilter(filterBuilder)
.execute().actionGet();
{
'query':{
'range':{
'age':{
'gte':'30',
'lte':'20'
}
}
}
}
java api 见 term query例子。
java api-精确匹配:termQuery,相当于=
java api-范围匹配:rangeQuery,相当于SQL between and
GET /ecommerce/product/_search
{
"query": { "match_all": {} },
"_source": ["name", "price"]
"size": 1,
"from": 10
}
size表示返回的文档个数为1,默认为10。
from表示从第10个开始查询,主要用于分页查询。
_source表示自定义返回字段。默认会返回文档的所有字段。
分页查询:
SearchResponse response = client.prepareSearch("library")
.setQuery(QueryBuilders.matchAllQuery())
.setFrom(10) //跳过前10个文档
.setSize(20) //获取20个文档
.execute().actionGet();
response.getHits().totalHits()//可以统计当前匹配到的结果数
优点:能够大大减少网络的请求次数,缩减网络开销。
自定义设置index、type以及document id:(id为1的没有查到(found为false))
GET /_mget
{
"docs" : [
{
"_index" : "ecommerce",
"_type" : "product",
"_id" : 1
},
{
"_index" : "ecommerce",
"_type" : "product",
"_id" : 2
}
]
}
在对应的index、type下进行批量查询:(注意:在ElasticSearch6.0以后一个index下只能有一个type,否则会报错)
GET /ecommerce/product/_mget { "ids": [2, 3] } 或者: GET /ecommerce/product/_mget { "docs" : [ { "_id" : 2 }, { "_id" : 3 } ] }
{
'query':{
'wildcard':{
'title':'cr?me'
}
}
}
{
'query':{
'regex':{
'title':{
'value':'cr.m[ae]',
'boost':10.0
}
}
}
}
场景:前缀查询。
{
'query':{
'match_phrase_prefix':{
'title':{
'query':'crime punish',
'slop':1
}
}
}
}
{
'query':{
'query_string':{
'query':'title:crime^10 +title:punishment -otitle:cat +author:(+Fyodor +dostoevsky)'
}
}
}
降序排序desc,升序asc。
GET /ecommerce/product/_search
{
"query" : {
"match" : {
"name" : "yagao"
}
},
"sort": [
{ "price": "desc" }
]
}
java api:
searchSourceBuilder.fetchSource(null, "content").sort("_score");
searchSourceBuilder.sort("date", SortOrder.DESC);
SortBuilders.scriptSort(script, type) //使用脚本来实现排序
SortBuilders.geoDistanceSort(fieldName) //根据空间距离来进行排序
searchRequestBuilder.addSort("publish_time", SortOrder.DESC);
按照某个字段排序的话,hit.getScore()将会失效。
bucket和metric:
group by缺点:
terms根据字段值项分组聚合。field按什么字段分组,size指定返回多少个分组,shard_size指定每个分片上返回多少个分组,order排序方式。可以指定include和exclude正则筛选表达式的值,指定missing设置缺省值。
【terms】 java api见【max/min/avg/sum/stats】中的例子。
计算每个tag下的商品数量:
GET /ecommerce/product/_search
{
"size": 0, //size=0,表示只获取聚合结果,而不要执行聚合的原始数据。
"aggs": { //aggs:固定语法,要对一份数据执行分组聚合操作
"all_tags": { //all_tags:自定义对每个aggs取名。
"terms": { "field": "tags" } //terms根据字段的值进行分组;field:根据指定的字段的值进行分组将文本
}
}
}
返回结果:
{ "took": 53, "timed_out": false, "_shards": { "total": 5, "successful": 5, "failed": 0 }, "hits": { "total": 2, "max_score": 0, "hits": [] }, "aggregations": { "all_tags": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "fangzhu", "doc_count": 2 }, { "key": "meibai", "doc_count": 2 } ] } } }
hits.hits:我们指定了size是0,所以hits.hits就是空的,否则会把执行聚合的那些原始数据给你返回回来
aggregations:聚合结果
all_tags:我们指定的某个聚合的名称
buckets:根据我们指定的field划分出的buckets
key:每个bucket对应的分组字段的值
doc_count:这个bucket分组内,有多少个数据
默认的排序规则:按照doc_count降序排序
stats:bucket,terms,自动就会有一个doc_count,就相当于是数量。
avg:avg aggs,求平均值
max:求一个bucket内,指定field值最大的那个数据
min:求一个bucket内,指定field值最小的那个数据
sum:求一个bucket内,指定field值的总和先分组,再算每组的平均值
GET /ecommerce/product/_search
{
"size": 0,
"aggs" : {
"group_by_tags" : {
"terms" : { "field" : "tags" },
"aggs" : {
"avg_price": { "avg": { "field": "price" } },
"min_price" : { "min": { "field": "price"} },
"max_price" : { "max": { "field": "price"} },
"sum_price" : { "sum": { "field": "price" } }
}
}
}
{
"aggs":{
"avg_fees":{
"avg":{
"field":"fees"
}
}
}
}
聚合操作主要是调用了SearchRequestBuilder的addAggregation方法,通常是传入一个TermsBuilder。
多字段上的聚合操作需要用到子聚合(subAggregation),子聚合调用TermsBuilder的subAggregation方法,可以添加的子聚合有TermsBuilder、SumBuilder、AvgBuilder、MaxBuilder、MinBuilder等常见的聚合操作。
从实现上来讲,SearchRequestBuilder在内部保持了一个私有的 SearchSourceBuilder实例, SearchSourceBuilder内部包含一个List,每次调用addAggregation时会调用 SearchSourceBuilder实例,添加一个AggregationBuilder。
同样的,TermsBuilder也在内部保持了一个List,调用addAggregation方法(来自父类addAggregation)时会添加一个AggregationBuilder。
例如要计算每个球队年龄最大/最小/总/平均的球员年龄,如果使用SQL语句,应表达如下:
select team, max(age) as max_age from player group by team;
ES的java api:
TermsBuilder teamAgg= AggregationBuilders.terms("player_count ").field("team");
MaxBuilder ageAgg= AggregationBuilders.max("max_age").field("age");
sbuilder.addAggregation(teamAgg.subAggregation(ageAgg));
SearchResponse response = sbuilder.execute().actionGet();
例如要计算每个球队球员的平均年龄,同时又要计算总年薪,如果使用SQL语句,应表达如下:
select team, avg(age)as avg_age, sum(salary) as total_salary from player group by team;
ES的java api:
TermsBuilder teamAgg= AggregationBuilders.terms("team");
AvgBuilder ageAgg= AggregationBuilders.avg("avg_age").field("age");
SumBuilder salaryAgg= AggregationBuilders.avg("total_salary ").field("salary");
sbuilder.addAggregation(teamAgg.subAggregation(ageAgg).subAggregation(salaryAgg));
SearchResponse response = sbuilder.execute().actionGet();
public void stats(){
SearchResponse response = client.prepareSearch(indexName).setTypes(typeName)
.addAggregation(AggregationBuilders.stats("ageAgg").field("age"))
.get();
Stats ageAgg = response.getAggregations().get("ageAgg");
System.out.println("总数:"+ageAgg.getCount());
System.out.println("最小值:"+ageAgg.getMin());
System.out.println("最大值:"+ageAgg.getMax());
System.out.println("平均值:"+ageAgg.getAvg());
System.out.println("和:"+ageAgg.getSum());
}
例如要计算每个球队每个位置的球员数,如果使用SQL语句,应表达如下:
select team, position, count(*) as pos_count from player group by team, position;
ES的java api:
TermsBuilder teamAgg= AggregationBuilders.terms("player_count ").field("team");
TermsBuilder posAgg= AggregationBuilders.terms("pos_count").field("position");
sbuilder.addAggregation(teamAgg.subAggregation(posAgg));
SearchResponse response = sbuilder.execute().actionGet();
例如要计算每个球队的球员数,如果使用SQL语句,应表达如下:
select team, count(*) as player_count from player group by team;
TermsBuilder teamAgg= AggregationBuilders.terms("player_count ").field("team");
sbuilder.addAggregation(teamAgg);
SearchResponse response = sbuilder.execute().actionGet();
CountResponse response = client.prepareCount("library")
.setQuery(QueryBuilders.termQuery("title", "elastic"))
.execute().actionGet();
例如要计算每个球队总年薪,并按照总年薪倒序排列,如果使用SQL语句,应表达如下:
select team, sum(salary) as total_salary from player group by team order by total_salary desc;
ES的java api:
TermsBuilder teamAgg= AggregationBuilders.terms("team").order(Order.aggregation("total_salary ", false);
SumBuilder salaryAgg= AggregationBuilders.avg("total_salary ").field("salary");
sbuilder.addAggregation(teamAgg.subAggregation(salaryAgg));
SearchResponse response = sbuilder.execute().actionGet();
需要特别注意的是,排序是在TermAggregation处执行的,Order.aggregation函数的第一个参数是aggregation的名字,第二个参数是boolean型,true表示正序,false表示倒序。
默认情况下,search执行后,仅返回10条聚合结果,如果想反悔更多的结果,需要在构建TermsBuilder 时指定size:
TermsBuilder teamAgg= AggregationBuilders.terms("team").size(15);
得到response后:
Map<String, Aggregation> aggMap = response.getAggregations().asMap(); StringTerms teamAgg= (StringTerms) aggMap.get("keywordAgg"); Iterator<Bucket> teamBucketIt = teamAgg.getBuckets().iterator(); while (teamBucketIt .hasNext()) { Bucket buck = teamBucketIt .next(); //分桶 //球队名 String team = buck.getKey(); //记录数 long count = buck.getDocCount(); //得到所有子聚合 Map subaggmap = buck.getAggregations().asMap(); //avg值获取方法 double avg_age= ((InternalAvg) subaggmap.get("avg_age")).getValue(); //sum值获取方法 double total_salary = ((InternalSum) subaggmap.get("total_salary")).getValue(); //... //max/min以此类推 }
i> 作用
Top Hits聚合主要用于桶聚合后查询分组后的其它数据。
比如对于下表,通过max(time)group by ip
进行分组后,我们还想知道每一组数据hostname等其它字段内容,则需要使用Top Hits,再每个bucket中查询对应的数据,具体代码如下:
score | ip | hostname | time |
---|
TermsAggregationBuilder depIpGroup = AggregationBuilders.terms("group_by_ip").field("dip").size(10000);//一次最多拿到10000条数据,要拿到更多的数据参考后文scroll的相关讲解 TopHitsAggregationBuilder detail = AggregationBuilders.topHits("detail").size(1);//用于拿到分组以外的其它详情数据。size来确定数量,默认返回3条数据。sort用于组内排序。 MaxAggregationBuilder maxTime = AggregationBuilders.max("max_time").field("time"); SearchRequestBuilder searchRequestBuilder = client .prepareSearch(indexExistsList.toArray(new String[indexNameList.size()]))//通过变长数组查询多个index .setTypes(indexType).addAggregation(depIpGroup.subAggregation(maxTime).subAggregation(detail)); SearchResponse searchResponse = searchRequestBuilder .execute() .actionGet(); Terms ipTerms = searchResponse.getAggregations().get("group_by_ip"); for (Terms.Bucket bucket : ipTerms.getBuckets()) {//分桶 ListEntity listEntity = new ListEntity(); listEntity.setIpAddress(bucket.getKey().toString()); TopHits topHits = bucket.getAggregations().get("detail"); SearchHit hit = topHits.getHits().getHits()[0];//????没看懂原理,先这样用吧。返回的是一个id不同、其它数据相同的数组,由size决定长度。 listEntity.setHostname(hit.getSource().get("hostname").toString()); listEntity.setScore(Integer.parseInt(hit.getSource().get("score").toString())); dataList.add(listEntity); }
{
"size": 0,
"aggs": {
"count_type": {
"cardinality": {
"field": "__type"
}
}
}
}
cardinality
percentiles对指定字段(脚本)的值按从小到大累计每个值对应的文档数的占比(占所有命中文档数的百分比),返回指定占比比例对应的值。默认返回[ 1, 5, 25, 50, 75, 95, 99 ]分位上的值。
{ "size": 0, "aggs": { "age_percents":{ "percentiles": { "field": "age", "percents": [ 1, 5, 25, 50, 75, 95, 99 ] } } } } { "size": 0, "aggs": { "states": { "terms": { "field": "gender" }, "aggs": { "banlances": { "percentile_ranks": { "field": "balance", "values": [ 20000, 40000 ] } } } } }
统计小于等于指定值的文档比。
{
"size": 0,
"aggs": {
"tests": {
"percentile_ranks": {
"field": "age",
"values": [
10,
15
]
}
}
}
}
场景:对不同的bucket下的aggs,进行filter。
filter对满足过滤查询的文档进行聚合计算,在查询命中的文档中选取过滤条件的文档进行聚合,先过滤在聚合。
如果放query里面的filter,是全局的,会对所有的数据都有影响。
但是,如果,比如说,你要统计,长虹电视,最近1个月的平均值; 最近3个月的平均值; 最近6个月的平均值,用bucket filter。
{ "size": 0, "aggs": { "agg_filter":{ "filter": { "match":{"gender":"F"} }, "aggs": { "avgs": { "avg": { "field": "age" } } } } } }
多个过滤组聚合计算。
{ "size": 0, "aggs": { "message": { "filters": { "filters": { "errors": { "exists": { "field": "__type" } }, "warring":{ "term": { "__type": "info" } } } } } } }
{ "aggs": { "agg_range": { "range": { "field": "cost", "ranges": [ { "from": 50, "to": 70 }, { "from": 100 } ] }, "aggs": { "bmax": { "max": { "field": "cost" } } } } } }
{ "aggs": { "date_aggrs": { "date_range": { "field": "accepted_time", "format": "MM-yyy", "ranges": [ { "from": "now-10d/d", "to": "now" } ] } } } }
按天、月、年等进行聚合统计。可按 year (1y), quarter (1q), month (1M), week (1w), day (1d), hour (1h), minute (1m), second (1s) 间隔聚合或指定的时间间隔聚合。
{
"aggs": {
"sales_over_time": {
"date_histogram": {
"field": "accepted_time",
"interval": "quarter",
"min_doc_count" : 0, //可以返回没有数据的月份
"extended_bounds" : { //强制返回数据的范围
"min" : "2014-01-01",
"max" : "2014-12-31"
}
}
}
}
}
{
"aggs": {
"account_missing": {
"missing": {
"field": "__type"
}
}
}
}
将所有数据纳入聚合的scope,而不管之前的query。
aggregation,scope,一个聚合操作,必须在query的搜索结果范围内执行
出来两个结果,一个结果,是基于query搜索结果来聚合的; 一个结果,是对所有数据执行聚合的。
GET /tvs/sales/_search { "size": 0, "query": { "term": { "brand": { "value": "长虹" } } }, "aggs": { "single_brand_avg_price": { "avg": { "field": "price" } }, "all": { "global": {}, "aggs": { "all_brand_avg_price": { "avg": { "field": "price" } } } } } }
返回结果:
{ "took": 4, "timed_out": false, "_shards": { "total": 5, "successful": 5, "failed": 0 }, "hits": { "total": 3, "max_score": 0, "hits": [] }, "aggregations": { "all": { "doc_count": 8, "all_brand_avg_price": { "value": 2650 } }, "single_brand_avg_price": { "value": 1666.6666666666667 } } }
single_brand_avg_price:就是针对query搜索结果,执行的,拿到的,就是长虹品牌的平均价格
all.all_brand_avg_price:拿到所有品牌的平均价格
top_hits 获取前几个doc_(即分组内前几个doc_,由size指定,默认为3个)
source 返回指定field(主要用于group by之后不能查看其它字段详情)。
GET /ecommerce/product/_search { "size": 0, "aggs" : { "group_by_tags" : { "terms" : { "field" : "tags" }, "aggs" : { "top_tags": { "top_hits": { "_source": { "include": "name" }, "size": 1 } } } } } }
collect_mode 子聚合计算
直接进行子聚合的计算
计算每个tag下的商品的平均价格,并且按照平均价格降序排序:
"order": { "avg_price": "desc" }
GET /ecommerce/product/_search
{
"size": 0,
"aggs" : {
"all_tags" : {
"terms" : { "field" : "tags", "collect_mode" : "breadth_first", "order": { "avg_price": "desc" } },
"aggs" : {
"avg_price" : {
"avg" : { "field" : "price" }
}
}
}
}
}
vii> breadth_first
先计算出当前聚合的结果,针对这个结果在对子聚合进行计算。
"ranges": [{},{}]
按照指定的价格范围区间进行分组,然后在每组内再按照tag进行分组,最后再计算每组的平均价格:
GET /ecommerce/product/_search { "size": 0, "aggs": { "group_by_price": { "range": { "field": "price", "ranges": [ { "from": 0, "to": 20 }, { "from": 20, "to": 40 }, { "from": 40, "to": 50 } ] }, "aggs": { "group_by_tags": { "terms": { "field": "tags" }, "aggs": { "average_price": { "avg": { "field": "price" } } } } } } } }
类似于terms,也是进行bucket分组操作,接收一个field,按照这个field的值的各个范围区间,进行bucket分组操作
interval:10,划分范围,010,1020,20~30
GET /ecommerce/product/_search { "size" : 0, "aggs":{ "price":{ "histogram":{ "field": "price", "interval": 10 }, "aggs":{ "revenue": { "sum": { "field" : "price" } } } } } }
按照我们指定的某个date类型的日期field,以及日期interval,按照一定的日期间隔,去划分bucket
date interval = 1m,
2017-01-01~2017-01-31,就是一个bucket
2017-02-01~2017-02-28,就是一个bucket
然后会去扫描每个数据的date field,判断date落在哪个bucket中,就将其放入那个bucket
min_doc_count:即使某个日期interval,2017-01-01~2017-01-31中,一条数据都没有,那么这个区间也是要返回的,不然默认是会过滤掉这个区间的
extended_bounds,min,max:划分bucket的时候,会限定在这个起始日期,和截止日期内
GET /tvs/sales/_search { "size" : 0, "aggs": { "sales": { "date_histogram": { "field": "sold_date", "interval": "month", "format": "yyyy-MM-dd", "min_doc_count" : 0, "extended_bounds" : { "min" : "2016-01-01", "max" : "2017-12-31" } } } } }
String scrollId = ""; QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(queryBuilder).size(10000); SearchRequest request = Requests.searchRequest(indexName); request.scroll("1s"); request.source(sourceBuilder); SearchResponse response = client.search(request).actionGet(); severityCount = deelHits(severityCount, response.getHits()); scrollId = response.getScrollId(); while (true) { SearchScrollRequestBuilder searchScrollRequestBuilder = client.prepareSearchScroll(scrollId); searchScrollRequestBuilder.setScroll("1s"); // 请求 SearchResponse response1 = searchScrollRequestBuilder.get(); SearchHits hits = response1.getHits(); if (hits.getHits().length == 0) { break; }else { severityCount = deelHits(severityCount, hits); //hit.getSource().get("detail").toString()读取数据 //下一批处理 scrollId = response1.getScrollId(); } }
public class Scroll { public static void main(String[] args) { try{ long startTime = System.currentTimeMillis(); /*创建客户端*/ //client startup //设置集群名称 Settings settings = Settings.builder() .put("cluster.name", "elsearch") .put("client.transport.sniff", true) .build(); //创建client TransportClient client = new PreBuiltTransportClient(settings) .addTransportAddress(new InetSocketTransportAddress( InetAddress.getByName("54.223.232.95"),9300)); List<String> result = new ArrayList<>(); String scrollId = ""; //第一次请求 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); //TODO: 设置查询条件 RangeQueryBuilder rangequerybuilder = QueryBuilders .rangeQuery("inputtime") .from("2016-12-14 02:00:00").to("2016-12-14 07:59:59"); sourceBuilder.query(QueryBuilders.boolQuery() .must(QueryBuilders .matchPhraseQuery("pointid","W3.UNIT1.10HFC01CT013")) .must(rangequerybuilder)) .size(100)//如果开启游标,则滚动获取 .sort("inputtime", SortOrder.ASC); //查询 SearchRequest request = Requests.searchRequest("pointdata"); request.scroll("2m"); request.source(sourceBuilder); SearchResponse response = client.search(request).actionGet(); //TODO:处理数据 SearchHits hits = response.getHits(); for(int i = 0; i < hits.getHits().length; i++) { //System.out.println(hits.getHits()[i].getSourceAsString()); result.add(hits.getHits()[i].getSourceAsString()); } //记录滚动ID scrollId = response.getScrollId(); while(true){ //后续的请求 //scrollId = query.getScollId(); SearchScrollRequestBuilder searchScrollRequestBuilder = client .prepareSearchScroll(scrollId); // 重新设定滚动时间 //TimeValue timeValue = new TimeValue(30000); searchScrollRequestBuilder.setScroll("2m"); // 请求 SearchResponse response1 = searchScrollRequestBuilder.get(); //TODO:处理数据 SearchHits hits2 = response1.getHits(); if(hits2.getHits().length == 0){ break; } for(int i = 0; i < hits2.getHits().length; i++) { result.add(hits2.getHits()[i].getSourceAsString()); } //下一批处理 scrollId = response1.getScrollId(); } System.out.println(result.size()); long endTime = System.currentTimeMillis(); System.out.println("Java程序运行时间:" + (endTime - startTime) + "ms"); }catch(Exception e){ e.printStackTrace(); } }
scroll搜索会在第一次搜索的时候,保存一个当时的视图快照,之后只会基于该旧的视图快照提供数据搜索,如果这个期间数据变更,是不会让用户看到的;
采用基于_doc(不使用_score)进行排序的方式,性能较高
每次发送scroll请求,我们还需要指定一个scroll参数,指定一个时间窗口,每次搜索请求只要在这个事件窗口内能完成就可以了
# sort默认是相关度排序("sort":[{"FIELD":{"order":"desc"}}]),不按_score排序,按_doc排序 # size设置的是这批查三条 # 第一次查询会生成快照 GET /lib3/user/_search?scroll=1m #这一批查询在一分钟内完成 { "query":{ "match":{} }, "sort":[ "_doc" ], "size":3 } # 第二次查询通过第一次的快照ID来查询,后面以此类推 GET /_search/scroll { "scroll":"1m", "scroll_id":"DnF1ZXJ5VGhIbkXIdGNoAwAAAAAAAAAdFkEwRENOVTdnUUJPWVZUd1p2WE5hV2cAAAAAAAAAHhZBMERDTIU3Z1FCT1|WVHdadIhOYVdnAAAAAAAAAB8WQTBEQ05VN2dRQk9ZVIR3WnZYTmFXZw==" }
原理上是对某次查询生成一个游标 scroll_id , 后续的查询只需要根据这个游标去取数据,直到结果集中返回的 hits 字段为空,就表示遍历结束。
注意:scroll_id 的生成可以理解为建立了一个临时的历史快照,在此之后的增删改查等操作不会影响到这个快照的结果。
使用 curl 进行分页读取过程如下:
GET /product/info/_search?scroll=2m { "query":{ "match_all":{ } }, "sort":["_doc"] } # 返回结果 { "_scroll_id": "DnF1ZXJ5VGhIbkXIdGNoAwAAAAAAAAAdFkEwRENOVTdnUUJPWVZUd1p2WE5hV2cAAAAAAAAAHhZBMERDTIU3Z1FCT1|WVHdadIhOYVdnAAAAAAAAAB8WQTBEQ05VN2dRQk9ZVIR3WnZYTmFXZw==", "took": 1, "timed_out": false, "_shards": { "total": 1, "successful": 1, "failed": 0 }, "hits":{...} }
GET /product/info/_search?scroll=DnF1ZXJ5VGhIbkXIdGNoAwAAAAAAAAAdFkEwRENOVTdnUUJPWVZUd1p2WE5hV2cAAAAAAAAAHhZBMERDTIU3Z1FCT1|WVHdadIhOYVdnAAAAAAAAAB8WQTBEQ05VN2dRQk9ZVIR3WnZYTmFXZw== { "query":{ "match_all":{ } }, "sort":["_doc"] } # 返回结果 { "_scroll_id": "DnF1ZXJ5VGhIbkXIdGNoAwAAAAAAAAAdFkEwRENOVTdnUUJPWVZUd1p2WE5hV2cAAAAAAAAAHhZBMERDTIU3Z1FCT1|WVHdadIhOYVdnAAAAAAAAAB8WQTBEQ05VN2dRQk9ZVIR3WnZYTmFXZw==", "took": 106, "_shards": { "total": 1, "successful": 1, "failed": 0 }, "hits": { "total": 22424, "max_score": 1.0, "hits": [{ "_index": "product", "_type": "info", "_id": "did-519392_pdid-2010", "_score": 1.0, "_routing": "519392", "_source": { .... } } ] } }
# 删掉指定的多个 srcoll_id DELETE /_search/scroll -d { "scroll_id":[ "cXVlcnlBbmRGZXRjaDsxOzg3OTA4NDpTQzRmWWkwQ1Q1bUlwMjc0WmdIX2ZnOzA7" ] } # 删除掉所有索引上的 scroll_id DELETE /_search/scroll/_all # 查询当前所有的scroll 状态 GET /_nodes/stats/indices/_search?pretty # 返回结果 { "cluster_name" : "200.200.107.232", "nodes" : { "SC4fYi0CT5mIp274ZgH_fg" : { "timestamp" : 1514346295736, "name" : "200.200.107.232", "transport_address" : "200.200.107.232:9300", "host" : "200.200.107.232", "ip" : [ "200.200.107.232:9300", "NONE" ], "indices" : { "search" : { "open_contexts" : 0, "query_total" : 975758, "query_time_in_millis" : 329850, "query_current" : 0, "fetch_total" : 217069, "fetch_time_in_millis" : 84699, "fetch_current" : 0, "scroll_total" : 5348, "scroll_time_in_millis" : 92712468, "scroll_current" : 0 } } } } }
search_after 是 ES5.0 及之后版本提供的新特性,search_after 有点类似 scroll,但是和 scroll 又不一样,它提供一个活动的游标,通过上一次查询最后一条数据来进行下一次查询。
search_after 分页的方式和 scroll 有一些显著的区别,首先它是根据上一页的最后一条数据来确定下一页的位置,同时在分页请求的过程中,如果有索引数据的增删改查,这些变更也会实时的反映到游标上。
GET /order/info/_search { "size": 10, "query": { "match_all" : { } }, "sort": [ {"date": "asc"} ] } # 返回结果 { "_index": "zmrecall", "_type": "recall", "_id": "60310505115909", "_score": null, "_source": { ... "date": 1545037514 }, "sort": [ 1545037514 ] }
curl -XGET 127.0.0.1:9200/order/info/_search
{
"size": 10,
"query": {
"match_all" : {
}
},
"search_after": [1463538857], # 这个值与上次查询最后一条数据的sort值一致,支持多个
"sort": [
{"date": "asc"}
]
}
注意:
如果 search_after 中的关键字为654,那么654323的文档也会被搜索到,所以在选择 search_after 的排序字段时需要谨慎,可以使用比如文档的id或者时间戳等。
search_after 适用于深度分页+ 排序,因为每一页的数据依赖于上一页最后一条数据,所以无法跳页请求。
返回的始终是最新的数据,在分页过程中数据的位置可能会有变更。这种分页方式更加符合 moa 的业务场景。
获取所有数据:
GET /_search
接口访问链接:127.0.0.1:9200/_search
返回数据含义:
返回参数 | 说明 | 备注 |
---|---|---|
found | 表示查询的数据是否存在 | |
took | 耗费时间(毫秒)。 | |
timed_out | 是否超时 | 默认无timeout |
_source | 表示查询到的数据 | |
_shards | shards fail的条件(primary和replica全部挂掉),不影响其他shard | 默认情况下来说,一个搜索请求,会打到一个index的所有primary shard上去,当然了,每个primary shard都可能会有一个或多个replic shard,所以请求也可以到primary shard的其中一个replica shard上去 |
_shards.total | 表示应执行索引操作的分片(主分片和副本分片)的数量 | |
_shards.successful | 表示索引操作成功的分片数 | |
_shards.failed | 返回一个数组,这个数组是在副本分片上索引操作失败的情况下相关错误的数组 | 如果没有失败的分片,failed将会为0。 |
hits.total | 本次搜索返回了几条结果 | |
hits.max_score | score的含义,就是document对于一个search的相关度的匹配分数,越相关,就越匹配,分数也高 | |
hits.hits | 包含了匹配搜索的document的详细数据,默认查询前10条数据,按_score降序排序 | 在java api中,可以通过client.setSize设置返回数量。 |
hits.hits._index | 索引名,对应sql的库 | |
hits.hits._type | 类型,对应sql的表 | |
hits.hits._id | 搜索的id | |
hits.hits._score | 描述搜索结果的匹配度,得分越高,文档匹配度越高,得分越低,文档的匹配度越低。 | |
hits.hits._source | 搜索到的具体数据 | |
hits.hits._source.fields | 搜索到的具体字段 | 在java api中通过hit.getSource().get(“field_name”)获取 |
可以通过设置timeout这个值,来定时返回已经搜索到的数据。timeout机制,指定每个shard,就只能在timeout时间范围内,将搜索到的部分数据(也可能是搜索到的全部数据),直接返回给client,而不是等到所有数据全部搜索出来后再返回。可以通过如下方式进行设置:
timeout=10ms,timeout=1s,timeout=1m
GET /_search?timeout=10m
Elasticsearch也提供了相关操作的批处理功能,这些批处理功能通过使用_bulk API实现。通过批处理可以非常高效的完成多个文档的操作,同时可以减少不必要的网络请求。
bulk语法:
注意点:
bulk api奇特的json格式
目前处理流程:
换成良好json格式的处理流程:
可以将json数据(可以在http://www.json-generator.com/网站上自动生成)放到当前用户目录下,然后执行如下命令,将数据导入到Elasticsearch中,如下:
curl -H "Content-Type: application/json" -XPOST "localhost:9200/bank/_doc/_bulk?pretty&refresh" --data-binary "@accounts.json"
ElasticSearch教程——汇总篇
elasticsearch文档Delete By Query API(二)
Elasticsearch-基础介绍及索引原理分析
ElasticSearch教程——Java常用操作
ElasticSearch AggregationBuilders java api常用聚合查询
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。