赞
踩
Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,**它可以快速地储存、搜索和分析海量数据。**作为 Elastic Stack 的核心,Elasticsearch 会集中存储您的数据,让您飞快完成搜索,微调相关性,进行强大的分析,并轻松缩放规模。
elasticsearch官网:Elasticsearch:官方分布式搜索和分析引擎 | Elastic
官方文档地址:Elasticsearch Guide | Elastic
index(索引)
动词,相当于mysql的insert
名词,相当于mysql的Database
Type(类型)
在index(索引)中,可以定义一个或多个类型。类似于MySQL当中的Table,每一种类型的数据放在一起。
保存在某个索引(Index) 下,某种类型(Type) 的一个数据(Document) ,文档是JSON格式的,Document就像 是MySQL中的某个Table里面的内容;
将整句分拆为单词
报错的记录
红海行动
探索红海行动
红海特别行动
红海记录篇
特工红海特别探索
检索
相关性得分
词 | 记录 |
---|---|
红海 | 1,2,3,4,5 |
行动 | 1,2,3 |
探索 | 2,5 |
特别 | 3,5 |
记录篇 | 4 |
特工 | 5 |
docker pull elasticsearch:7.4.2 //存储和检索数据
docker pull kibana:7.4.2 //可视化检索数据
mkdir -p /mydata/elasticsearch/config
mkdir -p /mydata/elasticsearch/data
// 任何远程机器都能访问es
echo "http.host: 0.0.0.0" >> /mydata/elasticsearch/config/elasticsearch.yml
chmod -R 777 /mydata/elasticsearch/ 改变文件权限
docker run -d -p 9200:9200 -p 9300:9300 \
--restart=always \
-e "discovery.type=single-node" \
-e ES_JAVA_OPTS="-Xms64m -Xmx512m" \
-v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /mydata/elasticsearch/data:/usr/share/elasticsearch/data \
-v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
--name elasticsearch elasticsearch:7.4.2
命令解释:
-e “discovery.type=single-node”:使es单节点运行
-e ES_JAVA_OPTS=“-Xms64m -Xmx512m”:设置es占用的内存
firewall-cmd --zone=public --add-port=9200/tcp --permanent
firewall-cmd --zone=public --add-port=9300/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --zone=public --query-port=9200/tcp
docker run -d -p 5601:5601 \
--restart=always \
-e ELASTICSEARCH_HOSTS=http://192.168.26.160:9200 \
--name kibana \
kibana:7.4.2
firewall-cmd --zone=public --add-port=5601/tcp --permanent
systemctl restart firewalld.service
GET /_cat/nodes
:查看所有节点
GET /_cat/health
:查看 es 健康状况
GET /_cat/master
:查看主节点
GET /_cat/indices
:查看所有索引
相当于Mysql数据库的show databases
PUT 和 POST 都可以, POST 新增。如果不指定 id,会自动生成 id。指定 id 就会修改这个数据,并新增版本号 。
PUT 可以新增可以修改。PUT 必须指定 id;由于 PUT 需要指定 id,我们一般都用来做修改操作,不指定 id 会报错。
保存一个数据,保存在哪个索引的哪个类型下,指定用哪个唯一标识 PUT customer/external/1;
在 customer 索引下的 external 类型下保存 1 号数据为
POST /customer/external/1
保存的数据
{
"name": "zhangsan"
}
如果当前索引不存在就会自动创建:
- POST /customer/external/1/_update
{
"doc": {
"name": "lisi"
}
}
此种方式会查询数据,如果相同不允许修改
- POST /customer/external/1/
{
"name": "zhangsan"
}
- PUT /customer/external/1/
{
"name": "zhangsan"
}
GET /索引/类型/数据id
结果:
{
"_index": "customer",//在哪个索引
"_type": "external",//在哪个类型
"_id": "1", //记录 id
"_version": 6,//版本号
"_seq_no": 5,//并发控制字段,每次更新就会+1,用来做乐观锁
"_primary_term": 1,//同上,主分片重新分配,如重启,就会变化
"found": true, //查询是否成功
"_source": { //真正的内容
"name": "zhangsan"
}
}
DELETE /customer/external/1
POST /customer/external/_bulk
{"index":{"_id":"1"}}
{"name":"John Doe"}
{"index":{"_id":"2"}}
{"name":"Jane Doe"}
语法格式:
{ action: { metadata }}\n
{ request body }\n
{ action: { metadata }}\n
{ request body }\n
切换到kibana的Dev Tools
指定命令
对应GET的查询结果,其中有一个字段_sep_no
,其为了是进行并发控制,当修改数据后,对应的_sep_no
版本就会更改。
实现方式:
更新携带 ?if_seq_no=*&if_primary_term=1
再次指定_sep_no
为相同值将不会生效
POST /bank/account/_bulk
测试数据地址:https://gitee.com/xu-huaiang/codes/nbqcg1dsfh6vutk4o8mxy65
ES 支持两种基本方式检索 :
一个是通过使用 REST request URI 发送搜索参数(uri+检索参数)
另一个是通过使用 REST request body 来发送它们(uri+请求体)
一切检索从
_search
开始
检索 bank 下所有信息,包括 type 和 docs
请求参数方式检索(按照account_number
升序排列)
GET bank/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"account_number": "asc"
},
{
"balance": "desc"
}
]
}
Elasticsearch 提供了一个可以执行查询的 Json 风格的 DSL(domain-specific language 领域特定语言)。这个被称为 Query DSL。该查询语言非常全面,并且刚开始的时候感觉有点复杂。
{
QUERY_NAME: {
ARGUMENT: VALUE,
ARGUMENT: VALUE,...
}
}
{
QUERY_NAME(查询条件): {
FIELD_NAME(字段名): {
ARGUMENT(参数名): VALUE(参数值),
ARGUMENT: VALUE,...
}
}
}
例子:
GET /bank/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"balance": {
"order": "desc"
}
}
],
"from": 0,
"size": 3
}
查询结果:
可以采用_source
字段规定查询的参数名
GET /bank/_search { "query": { "match_all": {} }, "sort": [ { "balance": { "order": "desc" } } ], "_source": ["firstname","balance"], "from": 0, "size": 3 }
GET /bank/_search
{
"query": {
"match": {
"account_number": "1"
}
}
}
即精确匹配account_number
为1的数据
倒排索引
排序keyword
进行精确匹配,并且区分大小写GET /bank/_search
{
"query": {
"match": {
"address.keyword": "198 Mill Lane"
}
}
}
将需要匹配的值当成一个整体单词(不分词)进行检索
GET /bank/_search
{
"query": {
"match_phrase": {
"address": "mill lane"
}
}
}
对指定的多个字段进行关键字配置
GET /bank/_search
{
"query": {
"multi_match": {
"query": "mill movico",
"fields": ["address","city"]
}
}
}
bool 用来做复合查询:
复合语句可以合并任何其它查询语句,包括复合语句,了解这一点是很重要的。这就意味着,复合语句之间可以互相嵌套,可以表达非常复杂的逻辑。
GET /bank/_search { "query": { "bool": { "must": [ //必须满足的条件 { "match": { //参数匹配 "gender": "F" } }, { "match": { //参数匹配 "address": "mill" } } ], "must_not": [ //必须不满足的 { "match": {//参数匹配 "age": "38" } } ], "should": [ //应该满足的 { "match": { //参数匹配 "lastname": "Long" } } ] } } }
filter
与之前的must
不同的是,filter
不会计算相关性得分
并不是所有的查询都需要产生分数,特别是那些仅用于 “filtering”(过滤)的文档。为了不计算分数 Elasticsearch 会自动检查场景并且优化查询的执行。
测试:查询年龄在10到20之间的数据
GET /bank/_search { "query": { "bool": { "must": [ { "range": { "age": { "gte": 10, "lte": 20 } } } ] } } }
不使用filter,会有相关项得分
使用filter,相关性得分为0
和 match 一样。匹配某个属性的值。全文检索字段用 match,其他非 text(文本)字段匹配用 term
GET /bank/_search
{
"query": {
"term": {
"age": {
"value": "22"
}
}
}
}
**聚合提供了从数据中分组和提取数据的能力。**最简单的聚合方法大致等于 SQL GROUP BY 和 SQL 聚合函数。
在 Elasticsearch 中,您有执行搜索返回 hits(命中结果),并且同时返回聚合结果,把一个响应中的所有 hits(命中结果)分隔开的能力。这是非常强大且有效的, 您可以执行查询和多个聚合,并且在一次使用中得到各自的(任何一个的)返回结果,使用 一次简洁和简化的 API 来避免网络往返。
执行聚合文档地址:[Aggregations | Elasticsearch Guide 7.5] | Elastic
搜索 address 中包含 mill 的所有人的年龄分布以及平均年龄,但不显示这些人的详情。
GET /bank/_search { "query": { "match": { "address": "mill" } }, "aggs": { "ageData": { "terms": { //terms执行聚合age参数,并查询前5条数据 "field": "age", "size": 5 } }, "ageAvg": { //平均年龄 "avg": { "field": "age" } } }, "size": 0 //不显示搜索结果 }
按年龄进行分组,并且计算出组内的平均薪资
在聚合当中再使用聚合进行查询
GET /bank/_search { "query": { "match_all": {} }, "aggs": { "aggGroup": { "terms": { "field": "age", "size": 1000 }, "aggs": { "ageAvg": { "avg": { "field": "balance" } } } } }, "size": 0 }
按照年龄段进行分组,并求出该年龄段的平均信息,和该年龄段的男女数和各男女的平均薪资
GET /bank/_search { "query": { "match_all": {} }, "aggs": { "aggGroup": { "terms": { "field": "age", "size": 1000 }, "aggs": { "ageBalanceaAvg": { "avg": { "field": "balance" } }, "genderGroup": { "terms": { "field": "gender.keyword", "size": 10 }, "aggs": { "balanceAvg": { "avg": { "field": "balance" } } } } } } }, "size": 0 }
**Mapping 是用来定义一个文档(document),以及它所包含的属性(field)是如何存储和索引的。**当添加数据时,他会自动处理属性类型。
比如,使用 mapping 来定义:
查看mapping信息:
GET /bank/_mapping
如果存储数据之前,存储的数据类型并不是我们想要的,也可以修改索引下参数的Mapping规则
PUT /my_index
{
"mappings": {
"properties": {
"age":{"type": "integer"},
"email":{"type": "keyword"},
"name":{"type": "text"}
}
}
}
PUT /my_index/_mapping
{
"properties": {
"address": {
"type": "keyword"
}
}
}
对于已经存在的映射字段,我们不能更新。更新必须创建新的索引进行数据迁移
新建映射规则
首先查询原先的数据Mapping规则:
GET /bank/_mapping
PUT /newbank { "mappings": { "properties": { "account_number": { "type": "long" }, "address": { "type": "text" }, "age": { "type": "integer" }, "balance": { "type": "long" }, "city": { "type": "keyword" }, "email": { "type": "keyword" }, "employer": { "type": "keyword" }, "firstname": { "type": "text" }, "gender": { "type": "keyword" }, "lastname": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "state": { "type": "keyword", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } } }
POST _reindex
{
"source": {
"index": "bank",
"type": "account"
},
"dest": {
"index": "newbank"
}
}
GET /newbank/_search
一个 tokenizer(分词器)接收一个字符流,将之分割为独立的 tokens(词元,通常是独立的单词),然后输出 tokens 流。
例如,whitespace tokenizer 遇到空白字符时分割文本。它会将文本 “Quick brown fox!” 分割 为 [Quick, brown, fox!]。 该 tokenizer(分词器)还负责记录各个 term(词条)的顺序或 position 位置(用于 phrase 短 语和 word proximity 词近邻查询),以及 term(词条)所代表的原始 word(单词)的 start (起始)和 end(结束)的 character offsets(字符偏移量)(用于高亮显示搜索的内容)。
Elasticsearch 提供了很多内置的分词器,可以用来构建 custom analyzers(自定义分词器)。
如下是使用标准分词器:
POST _analyze
{
"analyzer": "standard",
"text": "Hello,My name is Xuhuaiang"
}
分词器是对于英文的,对于中文就不太友好,所以就可以安装ik分词器
进入到elasticsearch
的挂载目录plugins
下,创建文件夹,并将ik分词器文件放在此目录下并解压到指定目录:
unzip -d ik/ elasticsearch-analysis-ik-7.4.2.zip
重启docker容器,并测试ik分词器:
POST _analyze
{
"analyzer": "ik_smart",
"text": "我是中国人"
}
POST _analyze
{
"analyzer": "ik_max_word",
"text": "我是中国人"
}
docker pull nginx:1.18.0
# 1.运行容器
docker run -p 80:80 --name nginx -d nginx:1.18.0
# 2.将容器内的配置文件拷贝到当前/mydata中:
docker container cp nginx:/etc/nginx .
# 3.将文件nginx修改为conf
mv nginx conf
# 4.创建文件夹nginx
mkdir nginx
# 5.将conf目录拷贝到nginx目录
cp -r conf nginx/
# 6.删除conf目录
rm -rf conf
# 3.停止并删除容器
docker stop nginx && docker rm nginx
docker run -d -p 80:80 \
--restart=always \
-v /mydata/nginx/html:/usr/share/nginx/html \
-v /mydata/nginx/logs:/var/log/nginx \
-v /mydata/nginx/conf:/etc/nginx \
--name nginx nginx:1.18.0
有时候ik分词器并不能按照我们想的那样进行分词,这个时候就需要进行自定义分词。
进入到/mydata/elasticsearch/plugins/ik/config
目录下,修改IKAnalyzer.cfg.xml
目录
填写远程扩展字典地址:
再重启es容器实例
POST _analyze
{
"analyzer": "ik_smart",
"text": "徐**爱赵**"
}
Elasticsearch Clients官网地址:Elasticsearch Clients | Elastic
这里使用Java Rest Clients的elasticsearch-rest-high-level-client
(RHLC)来操作ES
文档说明:[Index API | Java REST Client 7.17] | Elastic
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.4.2</version>
</dependency>
添加RHLC配置类
import org.apache.http.HttpHost; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ElasticsearchConfig { // 通用设置项 public static final RequestOptions COMMON_OPTIONS; static { RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); COMMON_OPTIONS = builder.build(); } @Bean public RestHighLevelClient esRestClient() { RestHighLevelClient restHighLevelClient = new RestHighLevelClient( RestClient.builder( new HttpHost("192.168.26.160", 9200, "http") ) ); return restHighLevelClient; } }
/** * 保存数据到es */ @Test public void saveData() throws IOException { // 指定索引 IndexRequest indexRequest = new IndexRequest("users"); // 数据id indexRequest.id("1"); User user = new User(); user.setUsername("zhangsan"); user.setGender("男"); user.setAge(20); ObjectMapper mapper = new ObjectMapper(); // 将User对象转换为JSON数据 String userJsonData = mapper.writeValueAsString(user); // 要保存的内容 indexRequest.source(userJsonData, XContentType.JSON); // 执行操作 IndexResponse response = restHighLevelClient.index(indexRequest, ElasticsearchConfig.COMMON_OPTIONS); System.out.println(response); }
再kinbana上进行测试:
GET /users/_search
步骤:
构建检索条件(SearchSourceBuilder
) -> 创建检索请求(指定索引,传入检索请求
) -> 执行检索
@Test public void searchData() throws IOException { // 1.构建检索条件 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); // 2.检索address字段中有mill的数据 searchSourceBuilder.query(QueryBuilders.matchQuery("address","mill")); // 3.按照年龄的值分布进行聚合 TermsAggregationBuilder ageGroup = AggregationBuilders.terms("ageGroup").field("age").size(10); // 4.聚合计算平均薪资 AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance"); searchSourceBuilder .aggregation(ageGroup) .aggregation(balanceAvg); // 5.指定索引和检索条件 // 5.1创建检索请求 SearchRequest searchRequest = new SearchRequest(); searchRequest .indices("bank") .source(searchSourceBuilder.size(0)); // 6.执行检索 SearchResponse search = rhlc.search(searchRequest, RequestOptions.DEFAULT); // 7.响应结果 log.info("检索结果:" + search); }
@Data static class Account{ private int account_number; private int balance; private String firstname; private String lastname; private int age; private String gender; private String address; private String employer; private String email; private String city; private String state; } @Test public void searchData() throws IOException { // 1.构建检索条件 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); // 2.检索address字段中有mill的数据 searchSourceBuilder.query(QueryBuilders.matchQuery("address", "mill")); // 3.按照年龄的值分布进行聚合 TermsAggregationBuilder ageGroup = AggregationBuilders.terms("ageGroup").field("age").size(10); // 4.聚合计算平均薪资 AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance"); searchSourceBuilder .aggregation(ageGroup) .aggregation(balanceAvg); // 5.指定索引和检索条件 // 5.1创建检索请求 SearchRequest searchRequest = new SearchRequest(); searchRequest .indices("bank") .source(searchSourceBuilder); // 6.执行检索 SearchResponse responseSearch = rhlc.search(searchRequest, RequestOptions.DEFAULT); // 7.处理响应数据 SearchHits hits = responseSearch.getHits(); SearchHit[] hitsArray = hits.getHits(); Arrays.stream(hitsArray).map(hit -> { String sourceAsString = hit.getSourceAsString(); ObjectMapper mapper = new ObjectMapper(); Account account = null; try { account = mapper.readValue(sourceAsString, Account.class); } catch (JsonProcessingException e) { e.printStackTrace(); } finally { return account; } }).forEach(account -> log.info("账户信息:" + account)); }
@Data static class Account { private int account_number; private int balance; private String firstname; private String lastname; private int age; private String gender; private String address; private String employer; private String email; private String city; private String state; } @Test public void searchData() throws IOException { // 1.构建检索条件 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); // 2.检索address字段中有mill的数据 searchSourceBuilder.query(QueryBuilders.matchQuery("address", "mill")); // 3.按照年龄的值分布进行聚合 TermsAggregationBuilder ageGroup = AggregationBuilders.terms("ageGroup").field("age").size(10); // 4.聚合计算平均薪资 AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance"); searchSourceBuilder .aggregation(ageGroup) .aggregation(balanceAvg); // 5.指定索引和检索条件 // 5.1创建检索请求 SearchRequest searchRequest = new SearchRequest(); searchRequest .indices("bank") .source(searchSourceBuilder); // 6.执行检索 SearchResponse responseSearch = rhlc.search(searchRequest, RequestOptions.DEFAULT); // 7.处理聚合结果 Aggregations aggregations = responseSearch.getAggregations(); Terms ageGroups = aggregations.get("ageGroup"); for (Terms.Bucket bucket : ageGroups.getBuckets()) { String key = bucket.getKeyAsString(); log.info("年龄:" + key + "===>" + "一共" + bucket.getDocCount() + "人。"); } Avg balanceAvgs = aggregations.get("balanceAvg"); log.info("平均薪资:" + balanceAvgs.getValue()); }
import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest; import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse; import co.elastic.clients.elasticsearch.indices.GetIndexRequest; import co.elastic.clients.elasticsearch.indices.GetIndexResponse; import com.vector.common.constants.SystemInfoConstants; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.io.IOException; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @Component @Slf4j public class SystemLog { private ElasticsearchClient elasticsearchClient; private static final String SYSTEM_LOG_INDEX_PREFIX_MATCH = SystemInfoConstants.SYSTEM_LOG_INDEX_PREFIX + "*"; public SystemLog(ElasticsearchClient elasticsearchClient) { this.elasticsearchClient = elasticsearchClient; } /** * 每月1号删除一个月前的日志 * * @throws IOException */ @Scheduled(cron = "0 0 0 1 * ?") public void systemLog() throws IOException { Set<String> systemLogIndexLists = searchSystemLogIndex(); if(systemLogIndexLists.isEmpty()){ log.info("没有找到索引{}", systemLogIndexLists); return; } LocalDateTime oneMonthBefore = LocalDateTime.now().minusMonths(1); StringBuilder oneMonthFormat = new StringBuilder(oneMonthBefore.format(DateTimeFormatter.ISO_DATE)); oneMonthFormat.insert(0, SystemInfoConstants.SYSTEM_LOG_INDEX_PREFIX); //删除一个月前的索引 List<String> ids = systemLogIndexLists.stream() .filter(currentIndexDate -> { // 2020-03-25 > 2020-02-25 true return oneMonthFormat.toString().compareTo(currentIndexDate) > 0; }) .collect(Collectors.toList()); if (ids.isEmpty()) { log.info("没有需要删除的索引{}", ids); return; } DeleteIndexResponse result = elasticsearchClient.indices().delete(DeleteIndexRequest.of(id -> id .index(ids))); log.info("删除索引{},结果:{}", ids, result.toString()); } /** * 查询所有SystemLog索引 * * @return * @throws IOException */ private Set<String> searchSystemLogIndex() throws IOException { // 查询yiqichang-*的索引 GetIndexResponse response = elasticsearchClient.indices() .get(GetIndexRequest.of(idx -> idx.index(SYSTEM_LOG_INDEX_PREFIX_MATCH))); Set<String> result = response.result().keySet(); log.info("查询到的索引:{}", result); return result; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。