当前位置:   article > 正文

ES客户端RestHighLevelClient的使用_es集群resthighlevelclient

es集群resthighlevelclient

1 RestHighLevelClient介绍

默认情况下,ElasticSearch使用两个端口来监听外部TCP流量。

  • 9200端口:用于所有通过HTTP协议进行的API调用。包括搜索、聚合、监控、以及其他任何使用HTTP协议的请求。所有的客户端库都会使用该端口与ElasticSearch进行交互。
  • 9300端口:是一个自定义的二进制协议,用于集群中各节点之间的通信。用于诸如集群变更、主节点选举、节点加入/离开、分片分配等事项。

RestHighLevelClient是ES的Java客户端,它是通过HTTP与ES集群进行通信。

2 引入ES依赖

  1. <!--引入es-high-level-client相关依赖 start-->
  2. <dependency>
  3. <groupId>org.elasticsearch</groupId>
  4. <artifactId>elasticsearch</artifactId>
  5. <version>7.10.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.elasticsearch.client</groupId>
  9. <artifactId>elasticsearch-rest-client</artifactId>
  10. <version>7.10.0</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.elasticsearch.client</groupId>
  14. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  15. <version>7.10.0</version>
  16. </dependency>
  17. <!--引入es-high-level-client相关依赖 end-->

3 使用

3.1 es的配置

  1. # es配置
  2. # es用户名
  3. elasticsearch.userName=elastic
  4. # es密码
  5. elasticsearch.password=elastic
  6. # es host ip 地址(集群),多个以","间隔
  7. elasticsearch.hosts=127.0.0.1:9200
  8. # es 请求方式
  9. elasticsearch.scheme=http
  10. # es 连接超时时间(ms)
  11. elasticsearch.connectTimeOut=1000
  12. # es socket 连接超时时间(ms)
  13. elasticsearch.socketTimeOut=30000
  14. # es 请求超时时间(ms)
  15. elasticsearch.connectionRequestTimeOut=500
  16. # es 最大连接数
  17. elasticsearch.maxConnectNum=100
  18. # es 每个路由的最大连接数
  19. elasticsearch.maxConnectNumPerRoute=100

3.2 es客户端配置类

  1. /**
  2. * restHighLevelClient 客户端配置类
  3. *
  4. */
  5. @Slf4j
  6. @Data
  7. @Configuration
  8. @ConfigurationProperties(prefix = "elasticsearch")
  9. public class ElasticsearchConfig {
  10. /**
  11. * es host ip 地址(集群)
  12. */
  13. private String hosts;
  14. /**
  15. * es用户名
  16. */
  17. private String userName;
  18. /**
  19. * es密码
  20. */
  21. private String password;
  22. /**
  23. * es 请求方式
  24. */
  25. private String scheme;
  26. /**
  27. * es 连接超时时间
  28. */
  29. private int connectTimeOut;
  30. /**
  31. * es socket 连接超时时间
  32. */
  33. private int socketTimeOut;
  34. /**
  35. * es 请求超时时间
  36. */
  37. private int connectionRequestTimeOut;
  38. /**
  39. * es 最大连接数
  40. */
  41. private int maxConnectNum;
  42. /**
  43. * es 每个路由的最大连接数
  44. */
  45. private int maxConnectNumPerRoute;
  46. /**
  47. * 如果@Bean没有指定bean的名称,那么方法名就是bean的名称
  48. */
  49. @Bean(name = "restHighLevelClient")
  50. public RestHighLevelClient restHighLevelClient() {
  51. // 构建连接对象
  52. RestClientBuilder builder = RestClient.builder(getEsHost());
  53. // 连接延时配置
  54. builder.setRequestConfigCallback(requestConfigBuilder -> {
  55. requestConfigBuilder.setConnectTimeout(connectTimeOut);
  56. requestConfigBuilder.setSocketTimeout(socketTimeOut);
  57. requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
  58. return requestConfigBuilder;
  59. });
  60. // 连接数配置
  61. builder.setHttpClientConfigCallback(httpClientBuilder -> {
  62. httpClientBuilder.setMaxConnTotal(maxConnectNum);
  63. httpClientBuilder.setMaxConnPerRoute(maxConnectNumPerRoute);
  64. httpClientBuilder.setDefaultCredentialsProvider(getCredentialsProvider());
  65. return httpClientBuilder;
  66. });
  67. return new RestHighLevelClient(builder);
  68. }
  69. private HttpHost[] getEsHost() {
  70. // 拆分地址(es为多节点时,不同host以逗号间隔)
  71. List<HttpHost> hostLists = new ArrayList<>();
  72. String[] hostList = hosts.split(",");
  73. for (String addr : hostList) {
  74. String host = addr.split(":")[0];
  75. String port = addr.split(":")[1];
  76. hostLists.add(new HttpHost(host, Integer.parseInt(port), scheme));
  77. }
  78. // 转换成 HttpHost 数组
  79. return hostLists.toArray(new HttpHost[]{});
  80. }
  81. private CredentialsProvider getCredentialsProvider() {
  82. // 设置用户名、密码
  83. CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
  84. credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
  85. return credentialsProvider;
  86. }
  87. }

3.3 es的使用

3.3.1 创建es索引

3.3.1.1 创建es索引的工具类

创建es索引的工具类如下所示。

  1. /**
  2. * 操作ES索引
  3. *
  4. */
  5. @Slf4j
  6. @Service
  7. public class EsIndexOperation {
  8. @Resource
  9. private RestHighLevelClient restHighLevelClient;
  10. private final RequestOptions options = RequestOptions.DEFAULT;
  11. /**
  12. * 判断索引是否存在
  13. */
  14. public boolean checkIndex(String index) {
  15. try {
  16. return restHighLevelClient.indices().exists(new GetIndexRequest(index), options);
  17. } catch (Exception e) {
  18. log.error("EsIndexOperation checkIndex error.", e);
  19. }
  20. return Boolean.FALSE;
  21. }
  22. /**
  23. * 创建索引
  24. *
  25. * @param indexName es索引名
  26. * @param esSettingFilePath es索引的alias、settings和mapping的配置文件
  27. */
  28. public boolean createIndex(String indexName, String esSettingFilePath) {
  29. String aliases = null;
  30. String mappings = null;
  31. String settings = null;
  32. if (StringUtils.isNotBlank(esSettingFilePath)) {
  33. try {
  34. String fileContent = FileUtils.readFileContent(esSettingFilePath);
  35. if (StringUtils.isNotBlank(fileContent)) {
  36. JSONObject jsonObject = JSON.parseObject(fileContent);
  37. aliases = jsonObject.getString("aliases");
  38. mappings = jsonObject.getString("mappings");
  39. settings = jsonObject.getString("settings");
  40. }
  41. } catch (Exception e) {
  42. log.error("createIndex error.", e);
  43. return false;
  44. }
  45. }
  46. if (checkIndex(indexName)) {
  47. log.error("createIndex indexName:[{}]已存在", indexName);
  48. return false;
  49. }
  50. CreateIndexRequest request = new CreateIndexRequest(indexName);
  51. if ((StringUtils.isNotBlank(aliases))) {
  52. request.aliases(aliases, XContentType.JSON);
  53. }
  54. if (StringUtils.isNotBlank(mappings)) {
  55. request.mapping(mappings, XContentType.JSON);
  56. }
  57. if (StringUtils.isNotBlank(settings)) {
  58. request.settings(settings, XContentType.JSON);
  59. }
  60. try {
  61. this.restHighLevelClient.indices().create(request, options);
  62. return true;
  63. } catch (IOException e) {
  64. log.error("EsIndexOperation createIndex error.", e);
  65. return false;
  66. }
  67. }
  68. /**
  69. * 删除索引
  70. */
  71. public boolean deleteIndex(String indexName) {
  72. try {
  73. if (checkIndex(indexName)) {
  74. DeleteIndexRequest request = new DeleteIndexRequest(indexName);
  75. AcknowledgedResponse response = restHighLevelClient.indices().delete(request, options);
  76. return response.isAcknowledged();
  77. }
  78. } catch (Exception e) {
  79. log.error("EsIndexOperation deleteIndex error.", e);
  80. }
  81. return Boolean.FALSE;
  82. }
  83. }
3.3.1.2 读取文件的工具类
  1. /**
  2. * 文件操作类
  3. */
  4. @Slf4j
  5. public class FileUtils {
  6. /**
  7. * 读取项目resources文件夹下的文件
  8. *
  9. * @param filePath 文件路径
  10. * @return 文件内容
  11. */
  12. public static String readFileContent(String filePath) {
  13. try {
  14. BufferedReader reader = new BufferedReader(new FileReader(filePath));
  15. String line;
  16. StringBuilder stringBuilder = new StringBuilder();
  17. while ((line = reader.readLine()) != null) {
  18. stringBuilder.append(line);
  19. }
  20. reader.close();
  21. return stringBuilder.toString();
  22. } catch (IOException e) {
  23. log.error("readFileContent error.", e);
  24. }
  25. return null;
  26. }
  27. public static void main(String[] args) {
  28. String filePath = "src/main/resources/es/mappings_test20231216.txt";
  29. String fileContent = readFileContent(filePath);
  30. }
  31. }
3.3.1.3 测试创建es索引

(1)在“resources”文件夹下创建es索引的配置文件

配置文件内容如下所示。

  1. {
  2. "aliases": {
  3. "test": {}
  4. },
  5. "mappings": {
  6. "properties": {
  7. "name": {
  8. "type": "text",
  9. "fields": {
  10. "keyword": {
  11. "type": "keyword",
  12. "ignore_above": 256
  13. }
  14. }
  15. },
  16. "address": {
  17. "type": "text",
  18. "fields": {
  19. "keyword": {
  20. "type": "keyword",
  21. "ignore_above": 256
  22. }
  23. }
  24. }
  25. }
  26. },
  27. "settings": {
  28. "index": {
  29. "number_of_shards": "1",
  30. "number_of_replicas": "1"
  31. }
  32. }
  33. }

(2)读取es索引的配置文件,创建es索引

  1. @Test
  2. public void createIndex() {
  3. String indexName = "test_1216";
  4. String filePath = "src/main/resources/es/mappings_test20231216.txt";
  5. boolean b = esIndexOperation.createIndex(indexName, filePath);
  6. Assert.assertTrue(b);
  7. }

(3)查看创建结果

通过命令 GET /test 查看es索引创建结果,结果如下所示。

  1. {
  2. "test_1216" : {
  3. "aliases" : {
  4. "test" : { }
  5. },
  6. "mappings" : {
  7. "properties" : {
  8. "address" : {
  9. "type" : "text",
  10. "fields" : {
  11. "keyword" : {
  12. "type" : "keyword",
  13. "ignore_above" : 256
  14. }
  15. }
  16. },
  17. "name" : {
  18. "type" : "text",
  19. "fields" : {
  20. "keyword" : {
  21. "type" : "keyword",
  22. "ignore_above" : 256
  23. }
  24. }
  25. }
  26. }
  27. },
  28. "settings" : {
  29. "index" : {
  30. "routing" : {
  31. "allocation" : {
  32. "include" : {
  33. "_tier_preference" : "data_content"
  34. }
  35. }
  36. },
  37. "number_of_shards" : "1",
  38. "provided_name" : "test_1216",
  39. "creation_date" : "1702723364945",
  40. "number_of_replicas" : "1",
  41. "uuid" : "RCAhqjPZSG-n4fse3cot4A",
  42. "version" : {
  43. "created" : "7100099"
  44. }
  45. }
  46. }
  47. }
  48. }

3.3.2 查询操作

3.3.2.1 常用查询
  1. /**
  2. * 查询操作
  3. *
  4. */
  5. @Slf4j
  6. @Service
  7. public class EsQueryOperation {
  8. @Resource
  9. private RestHighLevelClient client;
  10. private final RequestOptions options = RequestOptions.DEFAULT;
  11. /**
  12. * 查询总数
  13. */
  14. public Long count(String indexName) {
  15. CountRequest countRequest = new CountRequest(indexName);
  16. try {
  17. CountResponse countResponse = client.count(countRequest, options);
  18. return countResponse.getCount();
  19. } catch (Exception e) {
  20. log.error("EsQueryOperation count error.", e);
  21. }
  22. return 0L;
  23. }
  24. /**
  25. * 查询数据集
  26. */
  27. public List<Map<String, Object>> list(String indexName, SearchSourceBuilder sourceBuilder) {
  28. SearchRequest searchRequest = new SearchRequest(indexName);
  29. searchRequest.source(sourceBuilder);
  30. try {
  31. SearchResponse searchResp = client.search(searchRequest, options);
  32. List<Map<String, Object>> data = new ArrayList<>();
  33. SearchHit[] searchHitArr = searchResp.getHits().getHits();
  34. for (SearchHit searchHit : searchHitArr) {
  35. Map<String, Object> temp = searchHit.getSourceAsMap();
  36. temp.put("id", searchHit.getId());
  37. data.add(temp);
  38. }
  39. return data;
  40. } catch (Exception e) {
  41. log.error("EsQueryOperation list error.", e);
  42. }
  43. return null;
  44. }
  45. }

3.3.2.2 测试
  1. @Test
  2. public void list() {
  3. String indexName = "test";
  4. // 查询条件
  5. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  6. BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
  7. queryBuilder.must(QueryBuilders.termQuery("address", "hunan"));
  8. queryBuilder.mustNot(QueryBuilders.matchQuery("name", "Jack"));
  9. sourceBuilder.query(queryBuilder);
  10. // 分页查询
  11. sourceBuilder.from(0);
  12. sourceBuilder.size(1);
  13. List<Map<String, Object>> list = esQueryOperation.list(indexName, sourceBuilder);
  14. Assert.assertTrue(true);
  15. }

3.3.3 增删改操作

3.3.3.1 常用增删改操作
  1. /**
  2. * 增删改数据
  3. *
  4. */
  5. @Slf4j
  6. @Service
  7. public class EsDataOperation {
  8. @Resource
  9. private RestHighLevelClient client;
  10. private final RequestOptions options = RequestOptions.DEFAULT;
  11. /**
  12. * 写入数据
  13. */
  14. public boolean insert(String indexName, Map<String, Object> dataMap) {
  15. try {
  16. BulkRequest request = new BulkRequest();
  17. request.add(new IndexRequest(indexName).opType("create")
  18. .id(dataMap.get("id").toString())
  19. .source(dataMap, XContentType.JSON));
  20. this.client.bulk(request, options);
  21. return Boolean.TRUE;
  22. } catch (Exception e) {
  23. log.error("EsDataOperation insert error.", e);
  24. }
  25. return Boolean.FALSE;
  26. }
  27. /**
  28. * 批量写入数据
  29. */
  30. public boolean batchInsert(String indexName, List<Map<String, Object>> userIndexList) {
  31. try {
  32. BulkRequest request = new BulkRequest();
  33. for (Map<String, Object> dataMap : userIndexList) {
  34. request.add(new IndexRequest(indexName).opType("create")
  35. .id(dataMap.get("id").toString())
  36. .source(dataMap, XContentType.JSON));
  37. }
  38. this.client.bulk(request, options);
  39. return Boolean.TRUE;
  40. } catch (Exception e) {
  41. log.error("EsDataOperation batchInsert error.", e);
  42. }
  43. return Boolean.FALSE;
  44. }
  45. /**
  46. * 根据id更新数据,可以直接修改索引结构
  47. *
  48. * @param refreshPolicy 数据刷新策略
  49. */
  50. public boolean update(String indexName, Map<String, Object> dataMap, WriteRequest.RefreshPolicy refreshPolicy) {
  51. try {
  52. UpdateRequest updateRequest = new UpdateRequest(indexName, dataMap.get("id").toString());
  53. updateRequest.setRefreshPolicy(refreshPolicy);
  54. updateRequest.doc(dataMap);
  55. this.client.update(updateRequest, options);
  56. return Boolean.TRUE;
  57. } catch (Exception e) {
  58. log.error("EsDataOperation update error.", e);
  59. }
  60. return Boolean.FALSE;
  61. }
  62. /**
  63. * 删除数据
  64. */
  65. public boolean delete(String indexName, String id) {
  66. try {
  67. DeleteRequest deleteRequest = new DeleteRequest(indexName, id);
  68. this.client.delete(deleteRequest, options);
  69. return Boolean.TRUE;
  70. } catch (Exception e) {
  71. log.error("EsDataOperation delete error.", e);
  72. }
  73. return Boolean.FALSE;
  74. }
  75. }
3.3.3.2 测试
  1. @Test
  2. public void insert(){
  3. String indexName = "test";
  4. HashMap<String, Object> hashMap = new HashMap<>();
  5. hashMap.put("id",4);
  6. hashMap.put("name","tom");
  7. hashMap.put("address","Jiangsu");
  8. boolean flag = esDataOperation.insert(indexName, hashMap);
  9. Assert.assertTrue(true);
  10. }
  11. @Test
  12. public void update(){
  13. String indexName = "test";
  14. HashMap<String, Object> hashMap = new HashMap<>();
  15. hashMap.put("id", 5);
  16. hashMap.put("name", "jack7");
  17. boolean update = esDataOperation.update(indexName, hashMap, WriteRequest.RefreshPolicy.WAIT_UNTIL);
  18. Assert.assertTrue(true);
  19. }

4 参考文献

(1)elasticsearch学习(七):es客户端RestHighLevelClient_炎升的博客

(2)中间件:ElasticSearch组件RestHighLevelClient用法详解

(3)java api 实现es中的索引管理_createindexrequest

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/245681
推荐阅读
相关标签
  

闽ICP备14008679号