赞
踩
上一篇博客提到,REST Client的方式整合的项目实现复杂的查询比较麻烦,实现的方法是需要我们自己根据ES的语法写出符合语法的bean来,然后把这个bean发送http请求给服务端,这种方法也是我以前参与的一个项目的做法。但是ES本身是提供了java操作的api的,我们直接调用具体的类就能实现而不需要自己创建bean了。
上一篇博客只是实现了ES单节点根据id来实现简单的增删改查,上一篇博客的地址:SpringBoot整合ES(REST Client方式)并实现简单的增删改查
接下来的是ES多节点并实现复杂的通过条件查询并分页
首先还是依赖
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.5.1</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> <version>7.5.1</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.5.1</version> <scope>compile</scope> </dependency>
建议选用ES7以上的版本,因为ES7版本相对于老版本有较大改动,比如不支持索引类型,所有的索引类型都默认为_doc类型,还有不支持TransportClient等等。7.5.1是我从docker里面搜索到的最新的官方版本。
PS:客户端和服务端的版本需要一样,不然接下来的查询会报语法错误(如果是自己写的bean则能实现向下兼容,这个不行)
YML文件
server:
port: 8080
swagger:
enable: true
title: ElasticSearch-demo
description:
serviceUrl: http://localhost:8080/
version: 1.0.0
controllers: com.example.es.controller
elasticsearch:
user:
password:
host: 192.168.145.128:9200,192.168.145.129:9200,192.168.145.130:9200
这里的swagger是用来进行测试用的,然后集群ip配置以","符号区分
代码如下
@Slf4j @Configuration public class ESRestClient { private static final int ADDRESS_LENGTH = 2; private static final String HTTP_SCHEME = "http"; @Value("${elasticsearch.host}") String ipAddress; @Value("${elasticsearch.user}") private String userName; @Value("${elasticsearch.password}") private String password; @Bean public RestClientBuilder restClientBuilder() { String[] split = ipAddress.split(","); HttpHost[] hosts = Arrays.stream(split) .map(this::makeHttpHost) .filter(Objects::nonNull) .toArray(HttpHost[]::new); return RestClient.builder(hosts); } @Bean(name = "highLevelClient") public RestHighLevelClient highLevelClient(@Autowired RestClientBuilder restClientBuilder){ //配置身份验证 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password)); restClientBuilder.setHttpClientConfigCallback( httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); //设置连接超时和套接字超时 restClientBuilder.setRequestConfigCallback( requestConfigBuilder -> requestConfigBuilder.setSocketTimeout(10000).setConnectTimeout(60000)); //配置HTTP异步请求ES的线程数 restClientBuilder.setHttpClientConfigCallback( httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultIOReactorConfig( IOReactorConfig.custom().setIoThreadCount(1).build())); //设置监听器,每次节点失败都可以监听到,可以作额外处理 restClientBuilder.setFailureListener(new RestClient.FailureListener() { @Override public void onFailure(Node node) { super.onFailure(node); log.error(node.getHost() + "--->该节点失败了"); } }); return new RestHighLevelClient(restClientBuilder); } private HttpHost makeHttpHost(String str) { assert StringUtils.isNotEmpty(str); String[] address = str.split(":"); if (address.length == ADDRESS_LENGTH) { String ip = address[0]; int port = Integer.parseInt(address[1]); log.info("ES连接ip和port:{},{}", ip, port); return new HttpHost(ip, port, HTTP_SCHEME); } else { log.error("传入的ip参数不正确!"); return null; } } }
自此,操作环境搭建完成
引入两个bean
@Data
public class EsEntity {
private String id;
private Object data;
}
@Data
public class Person {
private String id;
private String name;
private Integer age;
private String country;
private String addr;
private String data;
private String birthday;
}
PS:@Data注解是lombok插件的,能够自动实现getter,setter,toString等bean类所有的方法
Controller类,没有任何业务逻辑
@RestController @RequestMapping("/es") public class ESRestController { @Autowired private ESRestService service; @PostMapping("/add") @ApiOperation("增加数据") public String add() { return service.add(); } @PostMapping("/update") @ApiOperation("修改数据") public String update() { return service.update(); } @PostMapping("/insertBatch") @ApiOperation("批量增加数据") public String insertBatch() { return service.insertBatch(); } @PostMapping("/deleteByQuery") @ApiOperation("根据条件删除") public void delete() { service.delete(); } @PostMapping("/deleteById") @ApiOperation("根据id删除") public String deleteById() { return service.deleteById(); } @PostMapping("/searchData") @ApiOperation("根据条件查询") public List searchData() { return service.searchData(); } }
Service层
@Service @Slf4j public class ESRestService { /** * 往索引添加数据 * @return */ public String add() { String Id = null; String index = "person"; String id = "10001"; try { XContentBuilder builder = XContentFactory.jsonBuilder() .startObject() .field("id", "10001") .field("name", "张三") .field("age", "28") .field("country", "中国") .field("addr", "广东深圳") .field("data", "2020-01-15 20:47:20") .field("birthday", "1992-01-01") .endObject(); Id = ESUtil.addData(builder, index, id); } catch (IOException e) { log.error("索引:{},id:{},添加数据失败", index, id); } return Id; } /** * 更新指定id的文档数据 * @return */ public String update() { String Id = null; String index = "person"; String id = "10001"; try { XContentBuilder builder = XContentFactory.jsonBuilder() .startObject() .field("id", "10001") .field("name", "李四") .field("age", "30") .field("country", "中国") .field("addr", "广东深圳") .field("data", "2020-01-15 20:47:20") .field("birthday", "1990-01-01") .endObject(); Id = ESUtil.updateData(builder, index, id); } catch (IOException e) { log.error("索引:{},id:{},添加数据失败", index, id); } return Id; } /** * 批量插入数据 * @return */ public String insertBatch() { String index = "person"; List<EsEntity> entityList = new ArrayList<>(); for (int i = 0; i < 1000; i++) { Person person = new Person(); String s = Integer.toString(i + 1); EsEntity esEntity = new EsEntity(); person.setId(s); person.setName("张三" + s); person.setAge(30); person.setAddr("广东省深圳市" + s); person.setCountry("中国"); person.setBirthday("1990-01-01"); person.setData("2020-01-16 12:00:00"); esEntity.setData(person); esEntity.setId(s); entityList.add(esEntity); } return ESUtil.insertBatch(index, entityList); } /** * 根据条件删除 */ public void delete() { String index = "person"; QueryBuilder queryBuilder = QueryBuilders.matchQuery("age", "30"); ESUtil.deleteByQuery(index, queryBuilder); } /** * 根据id删除文档 * @return */ public String deleteById() { String s; String index = "person"; String id = "1001"; s = ESUtil.deleteById(index, id); return s; } /** * 根据条件查询 * @return */ public List searchData() { List<Map<String, Object>> list = new ArrayList<>(); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); String[] fields = {"name", "addr", "birthday", "id"}; //需要返回和不返回的字段,可以是数组也可以是字符串 sourceBuilder.fetchSource(fields, null); //设置根据哪个字段进行排序查询 sourceBuilder.sort(new FieldSortBuilder("birthday").order(SortOrder.DESC)); BoolQueryBuilder builder = QueryBuilders.boolQuery(); //添加查询条件 builder.must(QueryBuilders.matchQuery("country", "中国")); list = ESUtil.SearchDataPage("person", 1, 10, sourceBuilder, builder); return list; }
Util类,与ES客户端相关的操作都在这儿
@Component @Slf4j public class ESUtil { @Qualifier("highLevelClient") @Autowired private RestHighLevelClient rhlClient; private static RestHighLevelClient client; /** * spring容器初始化的时候执行该方法 */ @PostConstruct public void init() { client = this.rhlClient; } /** * 添加数据 * * @param content 数据内容 * @param index 索引 * @param id id */ public static String addData(XContentBuilder content, String index, String id) { String Id = null; try { IndexRequest request = new IndexRequest(index).id(id).source(content); IndexResponse response = client.index(request, RequestOptions.DEFAULT); Id = response.getId(); log.info("索引:{},数据添加,返回码:{},id:{}", index, response.status().getStatus(), Id); } catch (IOException e) { log.error("添加数据失败,index:{},id:{}", index, id); } return Id; } /** * 修改数据 * * @param content 修改内容 * @param index 索引 * @param id id */ public static String updateData(XContentBuilder content, String index, String id) { String Id = null; try { UpdateRequest request = new UpdateRequest(index, id).doc(content); UpdateResponse response = client.update(request, RequestOptions.DEFAULT); Id = response.getId(); log.info("数据更新,返回码:{},id:{}", response.status().getStatus(), Id); } catch (IOException e) { log.error("数据更新失败,index:{},id:{}", index, id); } return Id; } /** * 批量插入数据 * * @param index 索引 * @param list 批量增加的数据 */ public static String insertBatch(String index, List<EsEntity> list) { String state = null; BulkRequest request = new BulkRequest(); list.forEach(item -> request.add(new IndexRequest(index) .id(item.getId()).source(JSON.toJSONString(item.getData()), XContentType.JSON))); try { BulkResponse bulk = client.bulk(request, RequestOptions.DEFAULT); int status = bulk.status().getStatus(); state = Integer.toString(status); log.info("索引:{},批量插入{}条数据成功!", index, list.size()); } catch (IOException e) { log.error("索引:{},批量插入数据失败", index); } return state; } /** * 根据条件删除数据 * * @param index 索引 * @param builder 删除条件 */ public static void deleteByQuery(String index, QueryBuilder builder) { DeleteByQueryRequest request = new DeleteByQueryRequest(index); request.setQuery(builder); //设置此次删除的最大条数 request.setBatchSize(1000); try { client.deleteByQuery(request, RequestOptions.DEFAULT); } catch (IOException e) { log.error("根据条件删除数据失败,index:{}", index); } } /** * 根据id删除数据 * * @param index 索引 * @param id id */ public static String deleteById(String index, String id) { String state = null; DeleteRequest request = new DeleteRequest(index, id); try { DeleteResponse response = client.delete(request, RequestOptions.DEFAULT); int status = response.status().getStatus(); state = Integer.toString(status); log.info("索引:{},根据id{}删除数据:{}", index, id, JSON.toJSONString(response)); } catch (IOException e) { log.error("根据id删除数据失败,index:{},id:{}", index, id); } return state; } /** * 根据条件查询数据 * * @param index 索引 * @param startPage 开始页 * @param pageSize 每页条数 * @param sourceBuilder 查询返回条件 * @param queryBuilder 查询条件 */ public static List<Map<String, Object>> SearchDataPage(String index, int startPage, int pageSize, SearchSourceBuilder sourceBuilder, QueryBuilder queryBuilder) { SearchRequest request = new SearchRequest(index); //设置超时时间 sourceBuilder.timeout(new TimeValue(120, TimeUnit.SECONDS)); //设置是否按匹配度排序 sourceBuilder.explain(true); //加载查询条件 sourceBuilder.query(queryBuilder); //设置分页 sourceBuilder.from((startPage - 1) * pageSize).size(pageSize); log.info("查询返回条件:" + sourceBuilder.toString()); request.source(sourceBuilder); try { SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT); long totalHits = searchResponse.getHits().getTotalHits().value; log.info("共查出{}条记录", totalHits); RestStatus status = searchResponse.status(); if (status.getStatus() == 200) { List<Map<String, Object>> sourceList = new ArrayList<>(); for (SearchHit searchHit : searchResponse.getHits().getHits()) { Map<String, Object> sourceAsMap = searchHit.getSourceAsMap(); sourceList.add(sourceAsMap); } return sourceList; } } catch (IOException e) { log.error("条件查询索引{}时出错", index); } return null; } }
PS:如果有时间,再把不用Low-Level实现复杂查询的写下(也就是需要自己写与ES交互的bean)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。