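I recently used Elasticsearch in a project together with the official RestHighLevelClient. This post records how I integrated it, covering index creation, adding, updating and deleting documents, search, and bulk operations.
First, a brief introduction:
Elasticsearch is a near-real-time distributed search engine built on Lucene. It stores and searches large volumes of data quickly and exposes a REST API, so it works more or less out of the box.
Installation:
I use elasticsearch-6.4.0 here and set up a pseudo-cluster on Windows. Download the elasticsearch-6.4.0.zip archive from: https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.4.0.zip
After downloading, unzip elasticsearch-6.4.0.zip and make a copy so that you have two folders. Rename them elasticsearch-6.4.0_1 and elasticsearch-6.4.0_2, then edit the configuration file /config/elasticsearch.yml in each folder: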
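# Configuration for the first node
# Cluster name
cluster.name: es
# Node name
node.name: node-0
# Whether this node is eligible to be elected master
node.master: true
# Bind address
network.host: 127.0.0.1
# HTTP port
http.port: 9200
# Transport (node-to-node) port
transport.tcp.port: 9300
# Other cluster nodes. Note: these are IP:transport port, comma-separated
discovery.zen.ping.unicast.hosts: ["127.0.0.1:9301"]
# Minimum number of master-eligible nodes needed to elect a master
discovery.zen.minimum_master_nodes: 2

# Configuration for the second node
# Cluster name
cluster.name: es
# Node name
node.name: node-1
# Whether this node is eligible to be elected master
node.master: true
# Bind address
network.host: 127.0.0.1
# HTTP port
http.port: 9201
# Transport (node-to-node) port
transport.tcp.port: 9301
# Other cluster nodes. Note: these are IP:transport port, comma-separated
discovery.zen.ping.unicast.hosts: ["127.0.0.1:9300"]
# Minimum number of master-eligible nodes needed to elect a master
discovery.zen.minimum_master_nodes: 2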
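The value 2 for discovery.zen.minimum_master_nodes follows the usual majority rule for Elasticsearch 6.x: (number of master-eligible nodes / 2) + 1. The sketch below only illustrates that arithmetic; the class and method names are made up for this example and are not part of any Elasticsearch API.

```java
/** Sketch: majority (quorum) size for discovery.zen.minimum_master_nodes in 6.x. */
final class MasterQuorum {

    /** Returns (masterEligibleNodes / 2) + 1 using integer division. */
    static int minimumMasterNodes(int masterEligibleNodes) {
        if (masterEligibleNodes < 1) {
            throw new IllegalArgumentException("need at least one master-eligible node");
        }
        return masterEligibleNodes / 2 + 1;
    }

    public static void main(String[] args) {
        // Two master-eligible nodes, as in the configuration above.
        System.out.println(minimumMasterNodes(2)); // prints 2
        // A three-node cluster tolerates the loss of one master-eligible node.
        System.out.println(minimumMasterNodes(3)); // prints 2
    }
}
```

With only two master-eligible nodes the quorum equals the node count, so the cluster cannot elect a master while either node is down. That is acceptable for a local pseudo-cluster, but three master-eligible nodes is the more common choice elsewhere.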
On Windows, run elasticsearch.bat in /bin to start a node. Once it is up, http://127.0.0.1:9200 returns JSON like the following:

{
  "name" : "node-0",
  "cluster_name" : "es",
  "cluster_uuid" : "fYtHRgFjRJuUrmJNz8tA_g",
  "version" : {
    "number" : "6.4.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "595516e",
    "build_date" : "2018-08-17T23:18:47.308994Z",
    "build_snapshot" : false,
    "lucene_version" : "7.4.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}
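The version.number field in this response is worth a quick check, because the client dependency used later in this post is 6.4.3 while the server is 6.4.0. As I read the High Level REST Client documentation, a client is expected to work with nodes of the same major version whose minor version is equal or greater. The sketch below encodes that reading; the class name, the method names and the regex shortcut (instead of a real JSON parser) are assumptions made for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch: read version.number from the root response and compare it with the client version. */
final class VersionCheck {

    // Matches the "number" field, e.g. "number" : "6.4.0".
    private static final Pattern NUMBER =
            Pattern.compile("\"number\"\\s*:\\s*\"(\\d+)\\.(\\d+)\\.(\\d+)\"");

    /** Returns {major, minor, patch} parsed from the root endpoint's JSON text. */
    static int[] serverVersion(String rootJson) {
        Matcher m = NUMBER.matcher(rootJson);
        if (!m.find()) {
            throw new IllegalArgumentException("no version number found");
        }
        return new int[] {
            Integer.parseInt(m.group(1)),
            Integer.parseInt(m.group(2)),
            Integer.parseInt(m.group(3))
        };
    }

    /** Same major version, and the server minor is at least the client minor. */
    static boolean clientCanTalkTo(int[] client, int[] server) {
        return client[0] == server[0] && server[1] >= client[1];
    }

    public static void main(String[] args) {
        String sample = "{ \"version\" : { \"number\" : \"6.4.0\" } }";
        int[] server = serverVersion(sample);
        int[] client = {6, 4, 3}; // client version from the pom in this post
        System.out.println(clientCanTalkTo(client, server)); // prints true
    }
}
```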
Next, download Cerebro, a monitoring tool for Elasticsearch, from:
https://github.com/lmenezes/cerebro/releases
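After downloading, unzip it, go to the /bin folder and run cerebro.bat. Open http://127.0.0.1:9000 and enter http://127.0.0.1:9200 as the node address to reach the monitoring view.
The monitoring view shows that the cluster has been set up successfully.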
Next, create a Spring Boot Maven project and add the Elasticsearch dependencies. Because spring-data-elasticsearch did not yet support Elasticsearch 6.0 and above at the time of writing, I use the RestHighLevelClient provided by Elasticsearch. The pom file is as follows:

<properties>
    <java.version>1.8</java.version>
    <elasticsearch.version>6.4.3</elasticsearch.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.3.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>6.4.3</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.54</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
Add the startup class EsApp:

package com.es;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class EsApp {

    public static void main(String[] args) {
        SpringApplication.run(EsApp.class, args);
    }
}
Add the Elasticsearch configuration class EsConfiguration, whose main job is to build the RestHighLevelClient:

package com.es.config;

import java.util.ArrayList;

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EsConfiguration {

    private static String hosts = "127.0.0.1"; // cluster addresses, comma-separated
    private static int port = 9200; // port to connect to
    private static String schema = "http"; // protocol
    private static ArrayList<HttpHost> hostList = null;

    private static int connectTimeOut = 1000; // connect timeout (ms)
    private static int socketTimeOut = 30000; // socket (read) timeout (ms)
    private static int connectionRequestTimeOut = 500; // timeout for obtaining a connection from the pool (ms)

    private static int maxConnectNum = 100; // maximum total connections
    private static int maxConnectPerRoute = 100; // maximum connections per route

    static {
        hostList = new ArrayList<>();
        String[] hostStrs = hosts.split(",");
        for (String host : hostStrs) {
            hostList.add(new HttpHost(host, port, schema));
        }
    }

    @Bean
    public RestHighLevelClient client() {
        RestClientBuilder builder = RestClient.builder(hostList.toArray(new HttpHost[0]));
        // Timeout settings for the async HTTP client
        builder.setRequestConfigCallback(new RequestConfigCallback() {
            @Override
            public Builder customizeRequestConfig(Builder requestConfigBuilder) {
                requestConfigBuilder.setConnectTimeout(connectTimeOut);
                requestConfigBuilder.setSocketTimeout(socketTimeOut);
                requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
                return requestConfigBuilder;
            }
        });
        // Connection pool settings for the async HTTP client
        builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.setMaxConnTotal(maxConnectNum);
                httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
                return httpClientBuilder;
            }
        });
        RestHighLevelClient client = new RestHighLevelClient(builder);
        return client;
    }
}
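One limitation of the configuration class is that every entry in hosts shares the single port field, so the two local nodes on 9200 and 9201 cannot both be listed. A possible variation is to accept host:port entries. The sketch below uses only the JDK; HostPort is a hypothetical stand-in for org.apache.http.HttpHost so the example runs without the client on the classpath, and IPv6 literals are not handled.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: parse "host:port,host:port" into host/port pairs. */
final class HostListParser {

    /** Hypothetical value type standing in for org.apache.http.HttpHost. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Entries without an explicit port fall back to defaultPort. */
    static List<HostPort> parse(String hosts, int defaultPort) {
        List<HostPort> result = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // skip empty entries such as a trailing comma
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                result.add(new HostPort(trimmed, defaultPort));
            } else {
                String host = trimmed.substring(0, colon);
                int port = Integer.parseInt(trimmed.substring(colon + 1));
                result.add(new HostPort(host, port));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("127.0.0.1:9200, 127.0.0.1:9201", 9200));
        // prints [127.0.0.1:9200, 127.0.0.1:9201]
    }
}
```

In the configuration class, each pair would then be turned into new HttpHost(host, port, schema) inside the static initializer.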
Add the test bean class Tests:

package com.es.bean;

public class Tests {

    private Long id;
    private String name;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Tests [id=" + id + ", name=" + name + "]";
    }
}
Finally, add the test class EsAppTest. With it we can start exploring Elasticsearch through RestHighLevelClient: creating an index, adding, deleting, updating and fetching documents, and bulk operations. The class and its testIndex method look like this:

package com.es;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.alibaba.fastjson.JSON;
import com.es.bean.Tests;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { EsApp.class }) // specify the startup class
public class EsAppTest {

    @Autowired
    private RestHighLevelClient client;

    public static String INDEX_TEST = null;
    public static String TYPE_TEST = null;
    public static Tests tests = null;
    public static List<Tests> testsList = null;

    @BeforeClass
    public static void before() {
        INDEX_TEST = "index_test"; // index name
        TYPE_TEST = "type_test"; // index type
        testsList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            tests = new Tests();
            tests.setId(Long.valueOf(i));
            tests.setName("this is the test " + i);
            testsList.add(tests);
        }
    }

    @Test
    public void testIndex() throws IOException {
        // the operations go here
    }
}
Step one is creating the index. Add a createIndex method to the test class EsAppTest:

    /**
     * Create an index.
     * @param index index name
     * @throws IOException
     */
    public void createIndex(String index) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(index);
        CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
        System.out.println("createIndex: " + JSON.toJSONString(createIndexResponse));
    }
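createIndex passes the name straight to the server, which rejects names that break its naming rules. The sketch below checks the rules listed in the Elasticsearch 6.x create index documentation (lowercase only, a set of forbidden characters, no leading -, _ or +, not . or .., at most 255 bytes) before a request is made. The class and method names are invented for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

/** Sketch: client-side check of the Elasticsearch 6.x index naming rules. */
final class IndexNames {

    // Characters that may not appear in an index name: \ / * ? " < > | space , #
    private static final String FORBIDDEN = "\\/*?\"<>| ,#";

    static boolean isValid(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        if (name.equals(".") || name.equals("..")) {
            return false;
        }
        // Index names must be lowercase.
        if (!name.equals(name.toLowerCase(Locale.ROOT))) {
            return false;
        }
        char first = name.charAt(0);
        if (first == '-' || first == '_' || first == '+') {
            return false;
        }
        for (int i = 0; i < name.length(); i++) {
            if (FORBIDDEN.indexOf(name.charAt(i)) >= 0) {
                return false;
            }
        }
        // The limit is counted in bytes, not characters.
        return name.getBytes(StandardCharsets.UTF_8).length <= 255;
    }

    public static void main(String[] args) {
        System.out.println(isValid("index_test")); // prints true
        System.out.println(isValid("Index_Test")); // prints false
        System.out.println(isValid("_test"));      // prints false
    }
}
```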
Before creating the index, however, we should check whether it already exists and only create it when it does not. Add an existsIndex method for that check:

    /**
     * Check whether an index exists.
     * @param index index name
     * @return true if the index exists
     * @throws IOException
     */
    public boolean existsIndex(String index) throws IOException {
        GetIndexRequest request = new GetIndexRequest();
        request.indices(index);
        boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
        System.out.println("existsIndex: " + exists);
        return exists;
    }
Then update the testIndex method as follows:

    @Test
    public void testIndex() throws IOException {
        // Check whether the index exists
        if (!existsIndex(INDEX_TEST)) {
            // Create it if it does not
            createIndex(INDEX_TEST);
        }
    }
Run the test class. The output is existsIndex: true, because I had already created the index.
Next, add a document to Elasticsearch. Add an add method:

    /**
     * Add a document.
     * @param index index name
     * @param type index type
     * @param tests document to store
     * @throws IOException
     */
    public void add(String index, String type, Tests tests) throws IOException {
        IndexRequest indexRequest = new IndexRequest(index, type, tests.getId().toString());
        indexRequest.source(JSON.toJSONString(tests), XContentType.JSON);
        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        System.out.println("add: " + JSON.toJSONString(indexResponse));
    }
Likewise, add an exists method that checks whether a document already exists:

    /**
     * Check whether a document exists.
     * @param index index name
     * @param type index type
     * @param tests document whose id is looked up
     * @return true if the document exists
     * @throws IOException
     */
    public boolean exists(String index, String type, Tests tests) throws IOException {
        GetRequest getRequest = new GetRequest(index, type, tests.getId().toString());
        // Only existence matters, so skip fetching _source and stored fields
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
        System.out.println("exists: " + exists);
        return exists;
    }
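Since the tests object has already been initialized in before(), update testIndex as follows:

    @Test
    public void testIndex() throws IOException {
        // Check whether the index exists
        if (!existsIndex(INDEX_TEST)) {
            // Create it if it does not
            createIndex(INDEX_TEST);
        }
        // Check whether the document exists
        if (!exists(INDEX_TEST, TYPE_TEST, tests)) {
            // Add it if it does not
            add(INDEX_TEST, TYPE_TEST, tests);
        }
    }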
I first deleted the index with Cerebro and then ran the test.
The output shows that both creating the index and adding the document succeeded.
Next, add methods for fetching, updating and deleting a document, and test each one:

    /**
     * Fetch a document.
     * @param index index name
     * @param type index type
     * @param id document id
     * @throws IOException
     */
    public void get(String index, String type, Long id) throws IOException {
        GetRequest getRequest = new GetRequest(index, type, id.toString());
        GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
        System.out.println("get: " + JSON.toJSONString(getResponse));
    }

    /**
     * Update a document.
     * @param index index name
     * @param type index type
     * @param tests document carrying the new values
     * @throws IOException
     */
    public void update(String index, String type, Tests tests) throws IOException {
        tests.setName(tests.getName() + "updated");
        UpdateRequest request = new UpdateRequest(index, type, tests.getId().toString());
        request.doc(JSON.toJSONString(tests), XContentType.JSON);
        UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
        System.out.println("update: " + JSON.toJSONString(updateResponse));
    }

    /**
     * Delete a document.
     * @param index index name
     * @param type index type
     * @param id document id
     * @throws IOException
     */
    public void delete(String index, String type, Long id) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(index, type, id.toString());
        DeleteResponse response = client.delete(deleteRequest, RequestOptions.DEFAULT);
        System.out.println("delete: " + JSON.toJSONString(response));
    }
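Each of the document methods so far (add, exists, get, update, delete) corresponds to one call of the Elasticsearch 6.x document REST API, which is handy to know when reproducing a request with curl or reading server logs. The sketch below just prints the HTTP method and path for each operation; the class and method names are illustrative, and the client may add query parameters that are not shown here.

```java
/** Sketch: HTTP method and path of the 6.x document APIs behind each client call. */
final class DocumentEndpoints {

    private static String path(String index, String type, String id) {
        return "/" + index + "/" + type + "/" + id;
    }

    /** Index (add or overwrite) a document with an explicit id. */
    static String index(String index, String type, String id) {
        return "PUT " + path(index, type, id);
    }

    /** Existence check: same path as get, but only the status code is returned. */
    static String exists(String index, String type, String id) {
        return "HEAD " + path(index, type, id);
    }

    static String get(String index, String type, String id) {
        return "GET " + path(index, type, id);
    }

    /** Partial update: the request body carries {"doc": {...}}. */
    static String update(String index, String type, String id) {
        return "POST " + path(index, type, id) + "/_update";
    }

    static String delete(String index, String type, String id) {
        return "DELETE " + path(index, type, id);
    }

    public static void main(String[] args) {
        System.out.println(index("index_test", "type_test", "99"));
        // prints PUT /index_test/type_test/99
        System.out.println(update("index_test", "type_test", "99"));
        // prints POST /index_test/type_test/99/_update
    }
}
```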
There is also a search method:

    /**
     * Search.
     * @param index index name
     * @param type index type
     * @param name text to match against the name field
     * @throws IOException
     */
    public void search(String index, String type, String name) throws IOException {
        BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
        // Search by field: must keeps documents that match the condition, mustNot excludes them
        boolBuilder.must(QueryBuilders.matchQuery("name", name));
        // boolBuilder.must(QueryBuilders.matchQuery("id", tests.getId().toString()));
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(boolBuilder);
        sourceBuilder.from(0);
        sourceBuilder.size(100); // number of hits to return, default 10
        // First argument: fields to include; second: fields to exclude. By default all fields are returned
        sourceBuilder.fetchSource(new String[] { "id", "name" }, new String[] {});
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.types(type);
        searchRequest.source(sourceBuilder);
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println("search: " + JSON.toJSONString(response));
        SearchHits hits = response.getHits();
        SearchHit[] searchHits = hits.getHits();
        for (SearchHit hit : searchHits) {
            System.out.println("search -> " + hit.getSourceAsString());
        }
    }
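To see what the builders in search amount to, it can help to write out a request body in the query DSL by hand. The sketch below assembles a roughly equivalent JSON body with plain string handling. It is an approximation: the body the client actually serializes contains additional default fields, and the class name, method names and minimal escaping are assumptions made for this example.

```java
/** Sketch: hand-built query DSL body, roughly what the search method asks for. */
final class SearchBodySketch {

    /** Minimal JSON string quoting: handles backslash and double quote only. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** bool query with one must/match clause, paging, and _source filtering. */
    static String matchQueryBody(String field, String text, int from, int size, String... includes) {
        StringBuilder fields = new StringBuilder();
        for (int i = 0; i < includes.length; i++) {
            if (i > 0) {
                fields.append(',');
            }
            fields.append(quote(includes[i]));
        }
        return "{\"from\":" + from
                + ",\"size\":" + size
                + ",\"_source\":{\"includes\":[" + fields + "],\"excludes\":[]}"
                + ",\"query\":{\"bool\":{\"must\":[{\"match\":{"
                + quote(field) + ":" + quote(text) + "}}]}}}";
    }

    public static void main(String[] args) {
        System.out.println(matchQueryBody("name", "this", 0, 100, "id", "name"));
    }
}
```

A body like this can be sent with POST /index_test/type_test/_search to compare the hits with what the Java method prints.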
Finally, there is a bulk method for batch operations:

    /**
     * Bulk operations.
     * @throws IOException
     */
    public void bulk() throws IOException {
        // Bulk add
        BulkRequest bulkAddRequest = new BulkRequest();
        for (int i = 0; i < testsList.size(); i++) {
            tests = testsList.get(i);
            IndexRequest indexRequest = new IndexRequest(INDEX_TEST, TYPE_TEST, tests.getId().toString());
            indexRequest.source(JSON.toJSONString(tests), XContentType.JSON);
            bulkAddRequest.add(indexRequest);
        }
        BulkResponse bulkAddResponse = client.bulk(bulkAddRequest, RequestOptions.DEFAULT);
        System.out.println("bulkAdd: " + JSON.toJSONString(bulkAddResponse));
        search(INDEX_TEST, TYPE_TEST, "this");

        // Bulk update
        BulkRequest bulkUpdateRequest = new BulkRequest();
        for (int i = 0; i < testsList.size(); i++) {
            tests = testsList.get(i);
            tests.setName(tests.getName() + " updated");
            UpdateRequest updateRequest = new UpdateRequest(INDEX_TEST, TYPE_TEST, tests.getId().toString());
            updateRequest.doc(JSON.toJSONString(tests), XContentType.JSON);
            bulkUpdateRequest.add(updateRequest);
        }
        BulkResponse bulkUpdateResponse = client.bulk(bulkUpdateRequest, RequestOptions.DEFAULT);
        System.out.println("bulkUpdate: " + JSON.toJSONString(bulkUpdateResponse));
        search(INDEX_TEST, TYPE_TEST, "updated");

        // Bulk delete
        BulkRequest bulkDeleteRequest = new BulkRequest();
        for (int i = 0; i < testsList.size(); i++) {
            tests = testsList.get(i);
            DeleteRequest deleteRequest = new DeleteRequest(INDEX_TEST, TYPE_TEST, tests.getId().toString());
            bulkDeleteRequest.add(deleteRequest);
        }
        BulkResponse bulkDeleteResponse = client.bulk(bulkDeleteRequest, RequestOptions.DEFAULT);
        System.out.println("bulkDelete: " + JSON.toJSONString(bulkDeleteResponse));
        search(INDEX_TEST, TYPE_TEST, "this");
    }
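On the wire, a bulk request is newline-delimited JSON sent to the _bulk endpoint: one action line per operation, followed by a source line for index and update actions, with every line (including the last) ending in a newline. The sketch below builds such a body by hand for the three kinds of action used in bulk(). The class and method names are illustrative, and it assumes index, type and id contain no characters that need JSON escaping.

```java
/** Sketch: newline-delimited body of the _bulk REST endpoint (6.x, with types). */
final class BulkBodySketch {

    /** Action metadata line, e.g. {"index":{"_index":"i","_type":"t","_id":"1"}}. */
    private static String meta(String action, String index, String type, String id) {
        return "{\"" + action + "\":{\"_index\":\"" + index
                + "\",\"_type\":\"" + type + "\",\"_id\":\"" + id + "\"}}\n";
    }

    /** index action: metadata line followed by the full document. */
    static String indexAction(String index, String type, String id, String sourceJson) {
        return meta("index", index, type, id) + sourceJson + "\n";
    }

    /** update action: metadata line followed by the partial document wrapped in "doc". */
    static String updateAction(String index, String type, String id, String partialJson) {
        return meta("update", index, type, id) + "{\"doc\":" + partialJson + "}\n";
    }

    /** delete action: metadata line only, no source line. */
    static String deleteAction(String index, String type, String id) {
        return meta("delete", index, type, id);
    }

    public static void main(String[] args) {
        String body = indexAction("index_test", "type_test", "0", "{\"id\":0,\"name\":\"this is the test 0\"}")
                + updateAction("index_test", "type_test", "0", "{\"name\":\"this is the test 0 updated\"}")
                + deleteAction("index_test", "type_test", "0");
        System.out.print(body);
    }
}
```

Such a body is sent with POST /_bulk and the content type application/x-ndjson. BulkRequest produces this format for you, so the sketch is only meant to show what the three loops in bulk() add to a request.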
Now go back and update testIndex once more to exercise all of the methods above:

    @Test
    public void testIndex() throws IOException {
        // Check whether the index exists
        if (!existsIndex(INDEX_TEST)) {
            // Create it if it does not
            createIndex(INDEX_TEST);
        }
        // Check whether the document exists
        if (!exists(INDEX_TEST, TYPE_TEST, tests)) {
            // Add it if it does not
            add(INDEX_TEST, TYPE_TEST, tests);
        }
        // Fetch the document
        get(INDEX_TEST, TYPE_TEST, tests.getId());
        // Update the document
        update(INDEX_TEST, TYPE_TEST, tests);
        get(INDEX_TEST, TYPE_TEST, tests.getId());
        // Delete the document
        delete(INDEX_TEST, TYPE_TEST, tests.getId());
        get(INDEX_TEST, TYPE_TEST, tests.getId());
        // Bulk operations
        bulk();
    }
Running the test prints the result of each operation in turn.
Reference:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.4/java-rest-high.html
Code:
https://github.com/191720653/es