赞
踩
pom.xml配置
<!-- Elasticsearch high-level REST client. Its transitive "elasticsearch" and
     "elasticsearch-rest-client" artifacts are excluded here and declared
     explicitly further down at version 7.8.0. -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.8.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- Spring Boot starter for Spring Data Elasticsearch. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    <version>2.3.9.RELEASE</version>
</dependency>

<!-- Low-level REST client, declared explicitly (excluded from the high-level client above). -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.8.0</version>
</dependency>

<!-- Elasticsearch core library, declared explicitly (excluded from the high-level client above). -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.8.0</version>
</dependency>

<!-- Netty 4 transport module. NOTE(review): presumably only needed together with
     the "transport" artifact below - confirm it is still required. -->
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>transport-netty4-client</artifactId>
    <version>7.8.0</version>
</dependency>

<!-- Elasticsearch "transport" client artifact. -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>7.8.0</version>
</dependency>
yml配置
# Elasticsearch connection settings read by EsConfig through @Value placeholders.
# The key paths must match those placeholders exactly:
#   ${spring.elasticsearch.hosts}, ${es_user_name}, ${es_password}
spring:
  elasticsearch:
    # Comma-separated list of host:port pairs; EsConfig splits on "," and then on ":".
    hosts: ${ES_HOST:ip:端口}
# Credentials used for HTTP basic authentication against the cluster.
es_user_name: ${ES_USER_NAME:xxxx}
es_password: ${ES_PASSWORD:xxxxx}
EsConfig配置
/**
 * Spring configuration that exposes a {@link RestHighLevelClient} bean which
 * authenticates against Elasticsearch with a user name and password.
 */
@Configuration
@Slf4j
public class EsConfig {

    /** Shared request options, built from {@link RequestOptions#DEFAULT} without modification. */
    public static final RequestOptions COMMON_OPTIONS;

    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        COMMON_OPTIONS = builder.build();
    }

    /** User name, injected from the {@code es_user_name} property. */
    @Value("${es_user_name}")
    private String username;

    /** Password, injected from the {@code es_password} property. */
    @Value("${es_password}")
    private String password;

    /**
     * Creates the high-level REST client.
     *
     * @param hosts comma-separated list of {@code host:port} pairs taken from
     *              the {@code spring.elasticsearch.hosts} property
     * @return a client that sends the configured credentials with its requests
     * @throws IllegalArgumentException if {@code hosts} is blank or any entry
     *                                  is not a valid {@code host:port} pair
     */
    @Bean
    RestHighLevelClient restHighLevelClient(@Value("${spring.elasticsearch.hosts}") String hosts) {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        HttpHost[] httpHosts = parseHosts(hosts);
        return new RestHighLevelClient(
                RestClient.builder(httpHosts)
                        .setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider)));
    }

    /**
     * Converts a comma-separated {@code host:port} list into {@link HttpHost} instances.
     * Surrounding whitespace around each entry, host and port is ignored so that
     * values such as {@code "a:9200, b:9200"} are accepted.
     */
    private static HttpHost[] parseHosts(String hosts) {
        if (hosts == null || hosts.trim().isEmpty()) {
            throw new IllegalArgumentException("Elasticsearch hosts must not be empty");
        }
        String[] entries = hosts.split(",");
        if (entries.length == 0) {
            throw new IllegalArgumentException("Elasticsearch hosts must contain at least one host:port entry: '" + hosts + "'");
        }
        HttpHost[] httpHosts = new HttpHost[entries.length];
        for (int i = 0; i < entries.length; i++) {
            String entry = entries[i].trim();
            String[] hostPort = entry.split(":");
            // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
            if (hostPort.length != 2 || hostPort[0].trim().isEmpty()) {
                throw new IllegalArgumentException("Invalid Elasticsearch host entry '" + entry + "', expected host:port");
            }
            int port;
            try {
                port = Integer.parseInt(hostPort[1].trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid port in Elasticsearch host entry '" + entry + "'", e);
            }
            httpHosts[i] = new HttpHost(hostPort[0].trim(), port);
        }
        return httpHosts;
    }
}
ElasticSearchConfig 配置
/**
 * Configuration whose only job is to set a system property that works around
 * the following Netty error:
 * {@code java.lang.IllegalStateException: availableProcessors is already set to [4], rejecting [4]}
 */
@Configuration
public class ElasticSearchConfig {

    /** System property that is set by this configuration. */
    private static final String NETTY_PROCESSORS_PROPERTY = "es.set.netty.runtime.available.processors";

    /** Value assigned to {@link #NETTY_PROCESSORS_PROPERTY}. */
    private static final String NETTY_PROCESSORS_VALUE = "false";

    /** Sets the property once this bean has been constructed. */
    @PostConstruct
    void init() {
        System.setProperty(NETTY_PROCESSORS_PROPERTY, NETTY_PROCESSORS_VALUE);
    }
}
启动类配置
/**
 * Application entry point.
 *
 * @param args command-line arguments, passed through to Spring Boot unchanged
 */
public static void main(String[] args) {
// Set before the Spring context starts, so the property is already in place
// when any Elasticsearch/Netty class is loaded.
System.setProperty("es.set.netty.runtime.available.processors", "false");
// "xxxx" is a placeholder for the application's own boot class.
SpringApplication.run(xxxx.class, args);
}
// Processor through which all batched writes are queued.
// NOTE(review): this initializer runs while the object is being constructed,
// i.e. before field injection has populated restHighLevelClient. That appears
// to be safe only because createBulkProcessor() reads the client inside a
// lambda, when a bulk is actually executed - confirm before refactoring.
private BulkProcessor bulkProcessor=createBulkProcessor();
// Elasticsearch client injected by Spring; used to execute the bulk requests.
@Autowired
RestHighLevelClient restHighLevelClient;
/**
 * Builds the {@link BulkProcessor} that batches write requests and sends them
 * asynchronously through {@code restHighLevelClient}.
 *
 * <p>A batch is flushed when any of these thresholds is reached: 10000 queued
 * actions, 8 MB of queued data, or 10 seconds since the last flush.
 *
 * @return a configured processor with a logging listener attached
 */
private BulkProcessor createBulkProcessor() {
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        /** Logs the number of actions just before a batch is sent. */
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            log.info("1. 【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());
        }

        /** Called when a batch got a response; individual items may still have failed. */
        @Override
        public void afterBulk(long executionId, BulkRequest request,
                              BulkResponse response) {
            if (!response.hasFailures()) {
                log.info("2. 【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
                return;
            }
            // Only the first failed item is reported, to avoid flooding the log.
            // It is logged at ERROR level so that failed writes remain visible
            // when the log threshold is above INFO.
            for (BulkItemResponse item : response.getItems()) {
                if (item.isFailed()) {
                    log.error("2. 【afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage());
                    break;
                }
            }
        }

        /** Called when the whole batch failed with an exception. */
        @Override
        public void afterBulk(long executionId, BulkRequest request,
                              Throwable failure) {
            List<DocWriteRequest<?>> requests = request.requests();
            List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
            log.error("3. 【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure);
        }
    };

    return BulkProcessor.builder(
                    // The client field is read when a bulk is executed, not when the processor is built.
                    (bulkRequest, bulkResponseActionListener) ->
                            restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener),
                    listener)
            // Flush once 10000 actions are queued.
            .setBulkActions(10000)
            // Flush once the queued requests reach 8 MB.
            .setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB))
            // Flush every 10 seconds regardless of size.
            .setFlushInterval(TimeValue.timeValueSeconds(10))
            // Number of bulk requests allowed to execute concurrently.
            .setConcurrentRequests(8)
            // Retry policy: constant 1 second delay, at most 3 retries.
            .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3))
            .build();
}
/**
 * Queues one partial-document update on the bulk processor for every id in
 * {@code esUpdateBO.getIds()}. Each update sets the field named by
 * {@code esUpdateBO.getIsWarning()} to the string {@code "1"}.
 *
 * @param esUpdateBO carries the document ids and the name of the field to set
 */
public void bulkUpdate(EsUpdateBO esUpdateBO){
    // Build every request first; nothing is queued until all of them exist.
    List<UpdateRequest> pending = new ArrayList<>();
    esUpdateBO.getIds().forEach(docId -> pending.add(buildWarningUpdate(docId, esUpdateBO)));

    for (UpdateRequest update : pending) {
        bulkProcessor.add(update);
    }
}

/** Creates the update request that sets the warning field on a single document. */
private UpdateRequest buildWarningUpdate(String docId, EsUpdateBO esUpdateBO) {
    UpdateRequest update = new UpdateRequest();
    update.index("es索引名称");
    // Id of the document to update.
    update.id(docId);
    // Partial document holding the new field value.
    Map<String, Object> fields = new HashMap<>();
    fields.put(esUpdateBO.getIsWarning(), "1");
    update.doc(fields);
    return update;
}
批量新增
/**
 * Queues one index request with op type {@code CREATE} on the bulk processor
 * for every map in {@code result}. Each map is serialized to JSON and used as
 * the document source; no document id is set here.
 *
 * @param result documents to add, one map per document
 */
public void bulkAdd(List<Map<String, Object>> result) {
    // Build every request first; nothing is queued until all of them exist.
    List<IndexRequest> pending = new ArrayList<>();
    for (Map<String, Object> document : result) {
        IndexRequest create = new IndexRequest("es索引名称")
                .source(JSON.toJSONString(document), XContentType.JSON)
                .opType(DocWriteRequest.OpType.CREATE);
        pending.add(create);
    }

    for (IndexRequest create : pending) {
        bulkProcessor.add(create);
    }
}
根据es内部id批量查询数据
public List<Map<String, Object>> getWaringEventList(List<String> ids) throws IOException {
String[] idsStr = ids.toArray(new String[ids.size()]);
List<Map<String, Object>> details=new ArrayList<>();
SearchRequest searchRequest = new SearchRequest("es索引名称");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
IdsQueryBuilder idsQueryBuilder = QueryBuilders.idsQuery().addIds(idsStr);
boolQueryBuilder.must(idsQueryBuilder);
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.size(10000);
//将所有的条件进行整合
searchRequest.source(searchSourceBuilder);
//根据restHighLevelClient进行查询
SearchResponse search = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHits searchHits = search.getHits();
SearchHit[] hits = searchHits.getHits();
for (SearchHit inhit : hits) {
Map<String, Object> thirdSourceAsMap = inhit.getSourceAsMap();
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。