赞
踩
在 Web 网站的架设中,特别是数据量大的网站或 APP、小程序中需要搜索或全文检索的场景,几乎都需要借助 ElasticSearch 作为全文检索引擎,以提高网站的搜索效率和性能。
本节通过一篇文章介绍 ElasticSearch 的基本概念、安装,以及在 Java 和 Spring Boot 中的使用方法,帮助大家通过一文快速学会使用 ElasticSearch。
一、ElasticSearch介绍:
ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。
ElasticSearch相关概念:
a)、索引index,相当于数据库中的database。
b)、类型 type 相当于数据库中的 table。注意:从 6.x 开始每个索引只允许一个 type,7.x 中 type 已被废弃(统一为 _doc),因此下文基于 7.x 客户端的示例中不再指定 type。
c)、主键id相当于数据库中记录的主键,是唯一的。
d)、文档 document (相当于一条数据)
文档是 ElasticSearch 的基本数据单位。在 ES 中,文档以 JSON 格式表示,
即向 ES 中某个 index(及其 type)下存储 JSON 格式的数据。
e)、字段 field 是文档中的属性,需要为每一个字段定义数据类型、是否索引以及如何被搜索(即映射 mapping)。
二、ElasticSearch的安装:
1、先安装jdk
2、安装ElasticSearch
进入 Elasticsearch 官网下载安装包:https://www.elastic.co/downloads/elasticsearch 。下面的安装命令以 5.1.1 版本为例;但第三节的 Java 客户端使用的是 7.6.2 版本,High Level REST Client 需要与服务端的主版本一致,实际使用时请安装与客户端相同的版本(如 7.6.2)。
将下载的安装包上传到centos,或者直接在centos使用wget命令下载。
解压:
unzip elasticsearch-5.1.1.zip
运行(注意:Elasticsearch 不允许以 root 用户启动,需切换到普通用户执行):
- cd bin
- ./elasticsearch
三、java语言操作ElasticSearch:
1、maven依赖
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>7.6.2</version>
- </dependency>
-
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>7.6.2</version>
- </dependency>
2、连接ElasticSearch
- import org.apache.http.HttpHost;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.client.RestHighLevelClient;
-
- import java.io.IOException;
-
- public class EsClientTest {
-
- public static void main(String[] args) throws IOException {
- RestHighLevelClient esClient = new RestHighLevelClient(
- RestClient.builder(new HttpHost("IP",9200,"http"))
- );
- System.out.println("success");
- esClient.close();
- }
-
- }
3、将客户端定义为静态变量,供后面的示例方法共用(将 IP 替换为实际地址):
- public static RestHighLevelClient esClient;
-
- static {
- esClient = new RestHighLevelClient(
- RestClient.builder(new HttpHost("IP", 9200, "http"))
- );
- }
4、创建索引操作:
- /**
- * 创建索引
- * @throws IOException
- */
- public static void createIndex() throws IOException {
- CreateIndexRequest createIndexRequest = new CreateIndexRequest("user");
- CreateIndexResponse indexResponse = esClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
- boolean acknowledged = indexResponse.isAcknowledged();
- System.out.println("索引创建状态:" + acknowledged);
- }
5、获取索引:
- /**
- * 索引信息查询
- * @throws IOException
- */
- public static void getIndex() throws IOException {
- GetIndexRequest getIndexRequest = new GetIndexRequest("user");
- GetIndexResponse getIndexResponse = esClient.indices().get(getIndexRequest, RequestOptions.DEFAULT);
- System.out.println(getIndexResponse.getAliases());
- System.out.println(getIndexResponse.getMappings());
- System.out.println(getIndexResponse.getSettings());
- }
6、删除索引:
- /**
- * 删除索引
- * @throws IOException
- */
- public static void deleteIndex() throws IOException {
- DeleteIndexRequest getIndexRequest = new DeleteIndexRequest("user");
- AcknowledgedResponse delete = esClient.indices().delete(getIndexRequest, RequestOptions.DEFAULT);
- System.out.println("索引删除状态:" + delete.isAcknowledged());
- }
7、添加数据:
- /**
- * 添加数据
- * @throws Exception
- */
- public static void add() throws Exception{
- IndexRequest indexRequest = new IndexRequest();
- indexRequest.index("user").id("1008");
- User user = new User();
- user.setName("茅河野人");
- user.setAge(28);
- user.setSex("男");
- user.setSalary(50000);
-
- String userData = objectMapper.writeValueAsString(user);
- indexRequest.source(userData,XContentType.JSON);
- //插入数据
- IndexResponse response = esClient.index(indexRequest, RequestOptions.DEFAULT);
- System.out.println(response.status());
- System.out.println(response.getResult());
- }
8、修改数据:
- /**
- * 修改数据
- * @throws Exception
- */
- public static void update() throws Exception{
- UpdateRequest request = new UpdateRequest();
- request.index("user").id("1008");
- request.doc(XContentType.JSON,"name","茅河野人");
- //插入数据
- UpdateResponse response = esClient.update(request, RequestOptions.DEFAULT);
- System.out.println(response.getResult());
- }
9、删除数据:
- /**
- * 删除
- * @throws Exception
- */
- public static void delete() throws Exception{
- DeleteRequest request = new DeleteRequest();
- request.index("user").id("1008");
- //插入数据
- DeleteResponse delete = esClient.delete(request, RequestOptions.DEFAULT);
- System.out.println(delete.getResult());
- }
10、批量添加数据:
- /**
- * 批量添加
- * @throws Exception
- */
- public static void batchInsert() throws Exception{
-
- BulkRequest bulkRequest = new BulkRequest();
-
- User user1 = new User("关羽","男",33,5500);
- String userData1 = objectMapper.writeValueAsString(user1);
- IndexRequest indexRequest1 = new IndexRequest().index("user").id("1002").source(userData1, XContentType.JSON);
-
- bulkRequest.add(indexRequest1);
-
- User user2 = new User("黄忠","男",50,8000);
- String userData2 = objectMapper.writeValueAsString(user2);
- IndexRequest indexRequest2 = new IndexRequest().index("user").id("1003").source(userData2, XContentType.JSON);
- bulkRequest.add(indexRequest2);
-
- User user3 = new User("黄忠2","男",49,10000);
- String userData3 = objectMapper.writeValueAsString(user3);
- IndexRequest indexRequest3 = new IndexRequest().index("user").id("1004").source(userData3, XContentType.JSON);
- bulkRequest.add(indexRequest3);
-
- User user4 = new User("赵云","男",33,12000);
- String userData4 = objectMapper.writeValueAsString(user4);
- IndexRequest indexRequest4 = new IndexRequest().index("user").id("1005").source(userData4, XContentType.JSON);
- bulkRequest.add(indexRequest4);
-
- User user5 = new User("马超","男",38,20000);
- String userData5 = objectMapper.writeValueAsString(user5);
- IndexRequest indexRequest5 = new IndexRequest().index("user").id("1006").source(userData5, XContentType.JSON);
- bulkRequest.add(indexRequest5);
-
- User user6 = new User("关羽","男",41,27000);
- String userData6 = objectMapper.writeValueAsString(user6);
- IndexRequest indexRequest6 = new IndexRequest().index("user").id("1007").source(userData6, XContentType.JSON);
- bulkRequest.add(indexRequest6);
-
- BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- System.out.println(bulkResponse.status());
- System.out.println(bulkResponse.getItems());
- }
11、批量删除数据:
- /**
- * 批量删除
- * @throws Exception
- */
- public static void batchDelete() throws Exception{
- BulkRequest bulkRequest = new BulkRequest();
- DeleteRequest indexRequest1 = new DeleteRequest().index("user").id("1002");
- DeleteRequest indexRequest2 = new DeleteRequest().index("user").id("1003");
- DeleteRequest indexRequest3 = new DeleteRequest().index("user").id("1004");
- DeleteRequest indexRequest4 = new DeleteRequest().index("user").id("1005");
- DeleteRequest indexRequest5 = new DeleteRequest().index("user").id("1006");
- DeleteRequest indexRequest6 = new DeleteRequest().index("user").id("1007");
-
- bulkRequest.add(indexRequest1);
- bulkRequest.add(indexRequest2);
- bulkRequest.add(indexRequest3);
- bulkRequest.add(indexRequest4);
- bulkRequest.add(indexRequest5);
- bulkRequest.add(indexRequest6);
-
- BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
- System.out.println(bulkResponse.status());
- System.out.println(bulkResponse.getItems());
- }
13、查询某个索引下所有数据:
- /**
- * 查询某个索引下的所有数据
- * @throws Exception
- */
- public static void searchIndexAll() throws Exception{
- SearchRequest request = new SearchRequest();
- request.indices("user");
- // 索引中的全部数据查询
- SearchSourceBuilder query = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
- request.source(query);
- SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
- SearchHits hits = response.getHits();
- for (SearchHit searchHit : hits){
- System.out.println(searchHit.getSourceAsString());
- }
- }
14、根据条件查询:
- TermQueryBuilder ageQueryBuilder = QueryBuilders.termQuery("sex", "女");
- SearchSourceBuilder query = new SearchSourceBuilder().query(ageQueryBuilder);
- request.source(query);
- SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
- System.out.println(response.getHits().getHits());
- System.out.println(response.getHits().getTotalHits());
- SearchHits hits = response.getHits();
- for (SearchHit searchHit : hits){
- System.out.println(searchHit.getSourceAsString());
- }
15、分页查询:
- SearchSourceBuilder sourceBuilder = new
- SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
- sourceBuilder.from(0).size(3);
- request.source(sourceBuilder);
- SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
- System.out.println(response.getHits().getHits());
- System.out.println(response.getHits().getTotalHits());
- SearchHits hits = response.getHits();
- for (SearchHit searchHit : hits){
- System.out.println(searchHit.getSourceAsString());
- }
四、在springboot中的运用
1、maven依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
- </dependency>
-
2、配置文件(以下为 application.properties 格式):
- # es 服务地址
- elasticsearch.host=IP
- # es 服务端口
- elasticsearch.port=9200
- # 配置日志级别,开启 debug 日志
- logging.level.com.congge=debug
3、实际例子:
创建一个实体类
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.NoArgsConstructor;
- import lombok.ToString;
- import org.springframework.data.annotation.Id;
- import org.springframework.data.elasticsearch.annotations.Document;
- import org.springframework.data.elasticsearch.annotations.Field;
- import org.springframework.data.elasticsearch.annotations.FieldType;
-
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- @ToString
- @Document(indexName = "shopping", shards = 3, replicas = 1)
- public class Product {
- //必须有 id,这里的 id 是全局唯一的标识,等同于 es 中的"_id"
- @Id
- private Long id;//商品唯一标识
-
- /**
- * type : 字段数据类型
- * analyzer : 分词器类型
- * index : 是否索引(默认:true)
- * Keyword : 短语,不进行分词
- */
- @Field(type = FieldType.Text, analyzer = "ik_max_word")
- private String title;//商品名称
-
- @Field(type = FieldType.Keyword)
- private String category;//分类名称
-
- @Field(type = FieldType.Double)
- private Double price;//商品价格
-
- @Field(type = FieldType.Keyword, index = false)
- private String images;//图片地址
- }
提供 Repository 接口:
- import com.congge.entity.Product;
- import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
- import org.springframework.stereotype.Repository;
-
- @Repository
- public interface ProductDao extends ElasticsearchRepository<Product, Long>{
-
- }
配置类:
- import lombok.Data;
- import org.apache.http.HttpHost;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.client.RestClientBuilder;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.context.annotation.Configuration;
- //import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
-
- @ConfigurationProperties(prefix = "elasticsearch")
- @Configuration
- @Data
- public class EsConfig extends com.congge.config.AbstractElasticsearchConfiguration {
-
- private String host ;
- private Integer port ;
-
- //重写父类方法
- @Override
- public RestHighLevelClient elasticsearchClient() {
- RestClientBuilder builder = RestClient.builder(new HttpHost(host, port));
- RestHighLevelClient restHighLevelClient = new
- RestHighLevelClient(builder);
- return restHighLevelClient;
- }
- }
- import org.elasticsearch.client.RestHighLevelClient;
- import org.springframework.context.annotation.Bean;
- import org.springframework.data.elasticsearch.config.ElasticsearchConfigurationSupport;
- import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
- import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
- import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
-
- public abstract class AbstractElasticsearchConfiguration extends ElasticsearchConfigurationSupport {
-
- //需重写本方法
- public abstract RestHighLevelClient elasticsearchClient();
-
- @Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" })
- public ElasticsearchOperations elasticsearchOperations(ElasticsearchConverter elasticsearchConverter) {
- return new ElasticsearchRestTemplate(elasticsearchClient(), elasticsearchConverter);
- }
- }
测试1:
- import com.congge.entity.Product;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
- import org.springframework.test.context.junit4.SpringRunner;
-
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class EsIndexTest {
-
- //注入 ElasticsearchRestTemplate
- @Autowired
- private ElasticsearchRestTemplate elasticsearchRestTemplate;
-
- //创建索引并增加映射配置
- @Test
- public void createIndex(){
- //创建索引,系统初始化会自动创建索引
- System.out.println("创建索引");
- }
-
- @Test
- public void deleteIndex(){
- //创建索引,系统初始化会自动创建索引
- boolean flg = elasticsearchRestTemplate.deleteIndex(Product.class);
- System.out.println("删除索引 = " + flg);
- }
-
- }
测试2:
- import com.congge.dao.ProductDao;
- import com.congge.entity.Product;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.elasticsearch.index.query.TermQueryBuilder;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.data.domain.Page;
- import org.springframework.data.domain.PageRequest;
- import org.springframework.data.domain.Sort;
- import org.springframework.test.context.junit4.SpringRunner;
-
- import java.util.ArrayList;
- import java.util.List;
-
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class EsDocTest {
-
-
- @Autowired
- private ProductDao productDao;
-
- /**
- * 新增
- */
- @Test
- public void save() {
- Product product = new Product();
- product.setId(2L);
- product.setTitle("ipad mini");
- product.setCategory("ipad");
- product.setPrice(1998.0);
- product.setImages("http://ipad.jpg");
- productDao.save(product);
- }
-
-
- //修改
- @Test
- public void update(){
- Product product = new Product();
- product.setId(2L);
- product.setTitle("iphone");
- product.setCategory("mobile");
- product.setPrice(6999.0);
- product.setImages("http://www.phone.jpg");
- productDao.save(product);
- }
-
- //根据 id 查询
- @Test
- public void findById(){
- Product product = productDao.findById(2L).get();
- System.out.println(product);
- }
-
- //查询所有
- @Test
- public void findAll(){
- Iterable<Product> products = productDao.findAll();
- for (Product product : products) {
- System.out.println(product);
- }
- }
-
- //删除
- @Test
- public void delete(){
- Product product = new Product();
- product.setId(2L);
- productDao.delete(product);
- }
-
- //批量新增
- @Test
- public void saveAll(){
- List<Product> productList = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Product product = new Product();
- product.setId(Long.valueOf(i));
- product.setTitle("iphone" + i);
- product.setCategory("mobile");
- product.setPrice(5999.0 + i);
- product.setImages("http://www.phone.jpg");
- productList.add(product);
- }
- productDao.saveAll(productList);
- }
-
- //分页查询
- @Test
- public void findByPageable(){
- //设置排序(排序方式,正序还是倒序,排序的 id)
- Sort sort = Sort.by(Sort.Direction.DESC,"id");
- int currentPage=0;//当前页,第一页从 0 开始, 1 表示第二页
- int pageSize = 5;//每页显示多少条
- //设置查询分页
- PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
- //分页查询
- Page<Product> productPage = productDao.findAll(pageRequest);
- for (Product Product : productPage.getContent()) {
- System.out.println(Product);
- }
- }
-
- /**
- * term 查询
- * search(termQueryBuilder) 调用搜索方法,参数查询构建器对象
- */
- @Test
- public void termQuery(){
- TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "iphone");
- Iterable<Product> products = productDao.search(termQueryBuilder);
- for (Product product : products) {
- System.out.println(product);
- }
- }
-
- /**
- * term 查询加分页
- */
- @Test
- public void termQueryByPage(){
- int currentPage= 0 ;
- int pageSize = 5;
- //设置查询分页
- PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
- TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "phone");
- Iterable<Product> products =
- productDao.search(termQueryBuilder,pageRequest);
- for (Product product : products) {
- System.out.println(product);
- }
- }
-
- }
五、将mysql数据写入Elasticsearch例子
- package com.example.esdemo.service.impl;
-
- import com.example.esdemo.config.DBHelper;
- import com.example.esdemo.imports.ImportDb2Es;
- import com.example.esdemo.service.ImportService;
- import org.apache.logging.log4j.LogManager;
- import org.apache.logging.log4j.Logger;
- import org.elasticsearch.action.ActionListener;
- import org.elasticsearch.action.bulk.BackoffPolicy;
- import org.elasticsearch.action.bulk.BulkProcessor;
- import org.elasticsearch.action.bulk.BulkRequest;
- import org.elasticsearch.action.bulk.BulkResponse;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.common.unit.ByteSizeUnit;
- import org.elasticsearch.common.unit.ByteSizeValue;
- import org.elasticsearch.common.unit.TimeValue;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.sql.*;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.concurrent.TimeUnit;
- import java.util.function.BiConsumer;
-
-
- /**
- * 导入db2es 实现类
- */
- @Component
- public class ImportServiceImpl implements ImportService {
-
- private static final Logger logger = LogManager.getLogger(ImportServiceImpl.class);
-
- @Autowired
- private RestHighLevelClient client;
-
-
- @Override
- public void importDb2Es(ImportDb2Es importDb2Es) {
- writeMySQLDataToES(importDb2Es.getDbTableName(),importDb2Es.getDbTableName());
- }
-
-
- private void writeMySQLDataToES(String tableName,String esIndeName) {
- BulkProcessor bulkProcessor = getBulkProcessor(client);
- Connection connection = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- try {
- connection = DBHelper.getConn();
- logger.info("start handle data :" + tableName);
- String sql = "select * from " + tableName;
- ps = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- // 根据自己需要设置 fetchSize
- ps.setFetchSize(20);
- rs = ps.executeQuery();
- ResultSetMetaData colData = rs.getMetaData();
- ArrayList<HashMap<String, String>> dataList = new ArrayList<>();
- HashMap<String, String> map = null;
- int count = 0;
- // c 就是列的名字 v 就是列对应的值
- String c = null;
- String v = null;
- while (rs.next()) {
- count++;
- map = new HashMap<String, String>(128);
- for (int i = 1; i < colData.getColumnCount(); i++) {
- c = colData.getColumnName(i);
- v = rs.getString(c);
- map.put(c, v);
- }
- dataList.add(map);
- // 每1万条 写一次 不足的批次的数据 最后一次提交处理
- if (count % 10000 == 0) {
- logger.info("mysql handle data number:" + count);
- // 将数据添加到 bulkProcessor
- for (HashMap<String, String> hashMap2 : dataList) {
- bulkProcessor.add(new IndexRequest(esIndeName).source(hashMap2));
- }
- // 每提交一次 清空 map 和 dataList
- map.clear();
- dataList.clear();
- }
- }
- // 处理 未提交的数据
- for (HashMap<String, String> hashMap2 : dataList) {
- bulkProcessor.add(new IndexRequest(esIndeName).source(hashMap2));
- }
- bulkProcessor.flush();
-
- } catch (SQLException e) {
- e.printStackTrace();
- } finally {
- try {
- rs.close();
- ps.close();
- connection.close();
- boolean terinaFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
- logger.info(terinaFlag);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- }
-
- private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
-
- BulkProcessor bulkProcessor = null;
- try {
-
- BulkProcessor.Listener listener = new BulkProcessor.Listener() {
- @Override
- public void beforeBulk(long executionId, BulkRequest request) {
- logger.info("Try to insert data number : "
- + request.numberOfActions());
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request,
- BulkResponse response) {
- logger.info("************** Success insert data number : "
- + request.numberOfActions() + " , id: " + executionId);
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
- }
- };
-
- BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
- .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
-
- BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
- builder.setBulkActions(5000);
- builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
- builder.setConcurrentRequests(10);
- builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
- builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
- // 注意点:让参数设置生效
- bulkProcessor = builder.build();
-
- } catch (Exception e) {
- e.printStackTrace();
- try {
- bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
- } catch (Exception e1) {
- logger.error(e1.getMessage());
- }
- }
- return bulkProcessor;
- }
- }
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。