赞
踩
目录
- <properties>
- <java.version>1.8</java.version>
- <elasticsearch.version>7.9.3</elasticsearch.version>
- </properties>
- <!-- elasticsearch相关 -->
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>${elasticsearch.version}</version>
- </dependency>
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>${elasticsearch.version}</version>
- </dependency>
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-client-sniffer</artifactId>
- <version>${elasticsearch.version}</version>
- </dependency>
将对象转换为map
- public static <T> Map<String, Object> beanToMap(T bean) {
- Map<String, Object> map = Maps.newHashMap();
- if (bean != null) {
- BeanMap beanMap = BeanMap.create(bean);
- for (Object key : beanMap.keySet()) {
- map.put(key.toString(), beanMap.get(key));
- }
- }
- return map;
- }
创建map
- try {
- //下面这行是你要插入es的数据,根据自己的数据来
- List<DyingVideoDO> allVideoList = videoMapper.findAllVideoList();
-
- List<Map<String, Object>> mapList = new ArrayList<>();
- for (DyingVideoDO row:allVideoList){
- Map<String, Object> videoMap = BeanUtil.beanToMap(row);
- mapList.add(videoMap);
- }
- //调用插入方法
- syncVideoEsService.createData(mapList);
-
- } catch (Exception e) {
- //可有可无
- log.error("sync video data error :{}", e);
- return 0;
- }
- /**
- * 创建文档
- * @param list list
- */
-
- //索引名称
- private final String index = "t_dying_video";
- private final String alias = "t_dying_video";
- @Autowired
- private ElasticsearchService elasticsearchService;
-
- public void createData(List<Map<String, Object>> list) throws Exception {
- try {
- if (!CollectionUtil.isEmpty(list)) {
- List<String> idList = list.stream().map(it -> String.valueOf(it.get("id"))).collect(Collectors.toList());
-
- List<Map<String,Object>> jointVideoArray = new ArrayList<Map<String,Object>>();
- list.stream().forEach(e -> {
- Map<String, Object> obj = new HashMap<String, Object>() {{
- put("keyid", e.get("id"));
- put("title", e.get("title"));//标题
- put("video_url", e.get("video_url"));
- put("pic_url",e.get("pic_url"));
- put("small_pic_url",e.get("small_pic_url"));
- put("media_content",e.get("media_content"));
- put("update_content",e.get("update_content"));
- put("publish_time",e.get("publish_time"));
- put("label",e.get("label"));
- put("title_desc",e.get("title_desc"));//由update_content清洗
- put("voice_content",e.get("voice_content"));//由media_content清洗
- }};
- jointVideoArray.add(obj);
- });
- //调用下面插入类中的方法
- //elasticsearchService.batchDeleteRequest(alias, idList);
- //elasticsearchService.batchDeleteRequest(jointAlias, idList);
- elasticsearchService.batchInsertRequest(alias, jointVideoArray);
-
- }
- } catch (Exception e) {
- log.error("插入数据到ES异常",e);
- throw e;
- }
- }
- /**
- * 批量新增文档
- */
- public boolean batchInsertRequest(String index, List<Map<String, Object>> list) throws Exception {
- BulkRequest request = new BulkRequest();
- for (int i = 0; i < list.size(); i++) {
- Map<String, Object> item = list.get(i);
- request.add(new IndexRequest(index).id(String.valueOf(item.get("keyid"))).source(item, XContentType.JSON));
- }
- BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
- if (bulk.status().getStatus() == 200) {
- if (!bulk.hasFailures()) {
- return true;
- }
- log.error("批量创建索引{}文档失败", index);
- }
- return false;
- }
- package com.dengtacj.synces.service;
-
- import cn.hutool.core.bean.BeanUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.elasticsearch.action.admin.indices.alias.Alias;
- import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
- import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
- import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
- import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
- import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
- import org.elasticsearch.action.bulk.BulkRequest;
- import org.elasticsearch.action.bulk.BulkResponse;
- import org.elasticsearch.action.delete.DeleteRequest;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.action.index.IndexResponse;
- import org.elasticsearch.action.search.SearchRequest;
- import org.elasticsearch.action.support.master.AcknowledgedResponse;
- import org.elasticsearch.action.update.UpdateRequest;
- import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.client.indices.GetIndexRequest;
- import org.elasticsearch.common.settings.Settings;
- import org.elasticsearch.common.xcontent.XContentType;
- import org.elasticsearch.index.reindex.BulkByScrollResponse;
- import org.elasticsearch.index.reindex.DeleteByQueryRequest;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
- import java.util.List;
- import java.util.Map;
-
- /**
- * Elasticsearch服务
- *
- */
- @Slf4j
- @Service
- public class ElasticsearchService {
-
-
- @Autowired
- public RestHighLevelClient restHighLevelClient;
-
- protected static final RequestOptions COMMON_OPTIONS;
-
- static {
- RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
- // 默认缓冲限制为100MB,此处修改为30MB。
- builder.setHttpAsyncResponseConsumerFactory(new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
- COMMON_OPTIONS = builder.build();
- }
-
-
- /**
- * 创建索引
- *
- * @param index 索引名称
- * @return 是否成功
- */
- public boolean createIndexRequest(String index) {
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(index)
- .settings(Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 0));
-
- try {
- CreateIndexResponse response = restHighLevelClient.indices().create(createIndexRequest, COMMON_OPTIONS);
- log.info("所有节点确认响应 : {}", response.isAcknowledged());
- log.info("所有分片的复制未超时 :{}", response.isShardsAcknowledged());
- log.info("创建索引【{}】成功", index);
- return true;
- } catch (IOException e) {
- log.error("创建索引库【{}】失败", index, e);
- }
- return false;
- }
-
-
- /**
- * 创建索引
- *
- * @param index 索引名称
- * @param mapping 索引结构
- * @return 是否成功
- */
- public boolean createIndexRequest(String index, String mapping) {
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
- try {
- createIndexRequest.settings(Settings.builder()
- // 分片数
- .put("index.number_of_shards", 3)
- // 副本数
- .put("index.number_of_replicas", 0)
- // 默认分词器
- // .put("analysis.analyzer.default.tokenizer", "index_ansj")
- );
- createIndexRequest.mapping("_doc", mapping, XContentType.JSON);
- CreateIndexResponse response = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
- log.info("所有节点确认响应 : {}", response.isAcknowledged());
- log.info("所有分片的复制未超时 :{}", response.isShardsAcknowledged());
- log.info("创建索引【{}】成功", index);
- return true;
- } catch (Exception e) {
- log.error("创建索引库【{}】失败", index, e);
- }
- return false;
- }
-
-
- /**
- * 创建索引
- *
- * @param index 索引名称
- * @param settings 索引设置
- * @param mapping 索引结构
- * @return 是否成功
- */
- public boolean createIndexRequest(String index, String settings, String mapping) {
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
- try {
- createIndexRequest.settings(settings, XContentType.JSON);
- createIndexRequest.mapping("_doc", mapping, XContentType.JSON);
- CreateIndexResponse response = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
- log.info("所有节点确认响应 : {}", response.isAcknowledged());
- log.info("所有分片的复制未超时 :{}", response.isShardsAcknowledged());
- log.info("创建索引【{}】成功", index);
- return true;
- } catch (Exception e) {
- log.error("创建索引库【{}】失败", index, e);
- }
- return false;
- }
-
-
- /**
- * 创建索引
- *
- * @param index 索引名称
- * @param settings 索引设置
- * @param mapping 索引结构
- * @param alias 索引别名
- * @return 是否成功
- */
- public boolean createIndexRequest(String index, String settings, String mapping, String alias) {
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
- try {
- createIndexRequest.settings(settings, XContentType.JSON);
- createIndexRequest.mapping("_doc", mapping, XContentType.JSON);
- createIndexRequest.alias(new Alias(alias));
- CreateIndexResponse response = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
- log.info("所有节点确认响应 : {}", response.isAcknowledged());
- log.info("所有分片的复制未超时 :{}", response.isShardsAcknowledged());
- log.info("创建索引【{}】成功", index);
- return true;
- } catch (Exception e) {
- log.error("创建索引库【{}】失败", index, e);
- }
- return false;
- }
-
-
- /**
- * 删除索引
- *
- * @param index 索引名称
- */
- public boolean deleteIndexRequest(String index) {
- try {
- boolean exists = restHighLevelClient.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT);
- if (exists) {
- AcknowledgedResponse response = restHighLevelClient.indices().delete(new DeleteIndexRequest(index), COMMON_OPTIONS);
- // 判断是否确认响应
- if (response.isAcknowledged()) {
- return true;
- }
- } else {
- log.info("索引【{}】不存在", index);
- return true;
- }
- } catch (IOException e) {
- log.error("删除索引库【{}】失败", index, e);
- }
- return false;
- }
-
-
- /**
- * 新增文档
- */
- public boolean insertRequest(String index, String id, Object object) {
- IndexRequest indexRequest = new IndexRequest(index).id(id).source(BeanUtil.beanToMap(object), XContentType.JSON);
- try {
- IndexResponse indexResponse = restHighLevelClient.index(indexRequest, COMMON_OPTIONS);
- if (indexResponse.status().getStatus() == 200) {
- log.info("创建索引{}文档成功", index);
- return true;
- }
- } catch (IOException e) {
- log.error("创建索引文档 {" + index + "} 数据 {" + object + "} 失败", e);
- }
- return false;
- }
-
-
- /**
- * 新增文档
- */
- public boolean insertRequest(String index, Map<String, Object> item) {
- IndexRequest indexRequest = new IndexRequest(index).id((String) item.get("id")).source(item, XContentType.JSON);
- try {
- IndexResponse indexResponse = restHighLevelClient.index(indexRequest, COMMON_OPTIONS);
- if (indexResponse.status().getStatus() == 200) {
- log.info("创建索引{}文档成功", index);
- return true;
- }
- } catch (IOException e) {
- log.error("创建索引文档 {" + index + "} 数据 {" + item + "} 失败", e);
- }
- return false;
- }
-
-
- /**
- * 批量新增文档
- */
- public boolean batchInsertRequest(String index, List<Map<String, Object>> list) throws Exception {
- BulkRequest request = new BulkRequest();
- for (int i = 0; i < list.size(); i++) {
- Map<String, Object> item = list.get(i);
- request.add(new IndexRequest(index).id(String.valueOf(item.get("keyid"))).source(item, XContentType.JSON));
- }
- BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
- if (bulk.status().getStatus() == 200) {
- if (!bulk.hasFailures()) {
- return true;
- }
- log.error("批量创建索引{}文档失败", index);
- }
- return false;
- }
-
-
- /**
- * 修改文档
- */
- public void updateRequest(String index, String id, Object object) {
- UpdateRequest updateRequest = new UpdateRequest(index, id);
- updateRequest.doc(BeanUtil.beanToMap(object), XContentType.JSON);
- try {
- restHighLevelClient.update(updateRequest, COMMON_OPTIONS);
- } catch (IOException e) {
- log.error("更新索引文档 {" + index + "} 数据 {" + object + "} 失败", e);
- }
- }
-
-
- /**
- * 批量修改文档
- */
- public boolean batchUpdateRequest(String index, List<Map<String, Object>> list) throws Exception {
- BulkRequest request = new BulkRequest();
- for (int i = 0; i < list.size(); i++) {
- Map<String, Object> item = list.get(i);
- request.add(new UpdateRequest(index, (String) item.get("keyid")).doc(item, XContentType.JSON));
- }
- BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
- if (bulk.status().getStatus() == 200) {
- if (!bulk.hasFailures()) {
- log.info("批量修改索引{}文档成功", index);
- return true;
- }
- log.error("批量修改索引{}文档失败,失败原因:{}", index, bulk.buildFailureMessage());
- }
- return false;
- }
-
-
- /**
- * 删除文档
- */
- public void deleteRequest(String index, String id) {
- DeleteRequest deleteRequest = new DeleteRequest(index, id);
- try {
- restHighLevelClient.delete(deleteRequest, COMMON_OPTIONS);
- } catch (IOException e) {
- log.error("删除索引文档 {" + index + "} 数据id {" + id + "} 失败", e);
- }
- }
-
-
- /**
- * 批量删除文档
- */
- public boolean batchDeleteRequest(String index, List<String> list) throws Exception {
- BulkRequest request = new BulkRequest();
- for (int i = 0; i < list.size(); i++) {
- request.add(new DeleteRequest(index, list.get(i)));
- }
- BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
- if (bulk.status().getStatus() == 200) {
- if (!bulk.hasFailures()) {
- log.info("批量删除索引{}文档成功 count:{}", index,list.size());
- return true;
- }
- log.error("批量删除索引{}文档成功,失败原因:{}", index, bulk.buildFailureMessage());
- }
- return false;
- }
-
-
-
- public boolean cleanIndex(String index){
- try {
- DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(index);
- deleteRequest.setRefresh(true);
- BulkByScrollResponse response = restHighLevelClient.deleteByQuery(deleteRequest,RequestOptions.DEFAULT);
- return true;
- }catch (IOException e){
- log.error("clean index :{} error : {}",index,e);
- return false;
- }
- }
-
-
- public boolean refreshIndex(String... index){
- try {
- RefreshRequest refreshRequest = new RefreshRequest();
- refreshRequest.indices(index);
- RefreshResponse response = restHighLevelClient.indices().refresh(refreshRequest,RequestOptions.DEFAULT);
- log.info("refresh {} response status : {}",index,response.getStatus());
- return true;
- }catch (IOException e){
- log.error("refresh index :{} error : {}",index,e);
- return false;
- }
- }
-
-
- }
创建索引的调用方法,mapping和seeting只是读取你的索引配置文件 比如我的文件是:
{ "settings": { "number_of_shards": 5, "number_of_replicas": 1, "index": { "analysis.analyzer.default.type" : "ik_max_word"} } }
public boolean createIndex() { String mapping = null; String settings = null; try { mapping = FileUtil.readString(ResourceUtils.getFile("classpath:index/mapping/t_dying_video.json"), "UTF-8"); settings = FileUtil.readString(ResourceUtils.getFile("classpath:index/settings/t_dying_video.json"), "UTF-8"); if (StrUtil.isNotBlank(mapping) && StrUtil.isNotBlank(settings)) { return elasticsearchService.createIndexRequest(index, settings, mapping, alias); } } catch (Exception e) { log.error("读取索引【{}】的Mapping文件失败", index, e); } return false; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。