当前位置:   article > 正文

Java Elasticsearch 批量插入

elasticsearch 插入

请耐心阅读，并多看代码中的注释。其中 keyid 就是文档的 _id。

目录

请耐心阅读，并多看代码中的注释。其中 keyid 就是文档的 _id。

1.导入包

2.插入格式

3.插入类

--------附录(新增,删除,更新,插入等)


1.导入包

  <properties>
      <java.version>1.8</java.version>
      <!-- Single version used by every Elasticsearch artifact below, so the client and core jars stay in sync.
           NOTE(review): presumably this should match the version of the cluster you connect to - confirm.
           In a Spring Boot project this property presumably also overrides the Boot-managed
           Elasticsearch version - confirm for your Boot version. -->
      <elasticsearch.version>7.9.3</elasticsearch.version>
  </properties>

  <!-- Dependency elements must live inside <dependencies>, not next to <properties>. -->
  <dependencies>
      <!-- Elasticsearch: high-level REST client (RestHighLevelClient, BulkRequest, IndexRequest, ...) -->
      <dependency>
          <groupId>org.elasticsearch.client</groupId>
          <artifactId>elasticsearch-rest-high-level-client</artifactId>
          <version>${elasticsearch.version}</version>
      </dependency>
      <!-- Core Elasticsearch artifact, pinned to the same version as the client -->
      <dependency>
          <groupId>org.elasticsearch</groupId>
          <artifactId>elasticsearch</artifactId>
          <version>${elasticsearch.version}</version>
      </dependency>
      <!-- Node sniffer for the low-level REST client.
           NOTE(review): not used by the code in this article - confirm whether it is needed. -->
      <dependency>
          <groupId>org.elasticsearch.client</groupId>
          <artifactId>elasticsearch-rest-client-sniffer</artifactId>
          <version>${elasticsearch.version}</version>
      </dependency>
  </dependencies>

2.插入格式

将对象转换为map

  1. public static <T> Map<String, Object> beanToMap(T bean) {
  2. Map<String, Object> map = Maps.newHashMap();
  3. if (bean != null) {
  4. BeanMap beanMap = BeanMap.create(bean);
  5. for (Object key : beanMap.keySet()) {
  6. map.put(key.toString(), beanMap.get(key));
  7. }
  8. }
  9. return map;
  10. }

查询数据、转换为 map 列表，并调用插入方法

  1. try {
  2. //下面这行是你要插入es的数据,根据自己的数据来
  3. List<DyingVideoDO> allVideoList = videoMapper.findAllVideoList();
  4. List<Map<String, Object>> mapList = new ArrayList<>();
  5. for (DyingVideoDO row:allVideoList){
  6. Map<String, Object> videoMap = BeanUtil.beanToMap(row);
  7. mapList.add(videoMap);
  8. }
  9. //调用插入方法
  10. syncVideoEsService.createData(mapList);
  11. } catch (Exception e) {
  12. //可有可无
  13. log.error("sync video data error :{}", e);
  14. return 0;
  15. }
  1. /**
  2. * 创建文档
  3. * @param list list
  4. */
  5. //索引名称
  6. private final String index = "t_dying_video";
  7. private final String alias = "t_dying_video";
  8. @Autowired
  9. private ElasticsearchService elasticsearchService;
  10. public void createData(List<Map<String, Object>> list) throws Exception {
  11. try {
  12. if (!CollectionUtil.isEmpty(list)) {
  13. List<String> idList = list.stream().map(it -> String.valueOf(it.get("id"))).collect(Collectors.toList());
  14. List<Map<String,Object>> jointVideoArray = new ArrayList<Map<String,Object>>();
  15. list.stream().forEach(e -> {
  16. Map<String, Object> obj = new HashMap<String, Object>() {{
  17. put("keyid", e.get("id"));
  18. put("title", e.get("title"));//标题
  19. put("video_url", e.get("video_url"));
  20. put("pic_url",e.get("pic_url"));
  21. put("small_pic_url",e.get("small_pic_url"));
  22. put("media_content",e.get("media_content"));
  23. put("update_content",e.get("update_content"));
  24. put("publish_time",e.get("publish_time"));
  25. put("label",e.get("label"));
  26. put("title_desc",e.get("title_desc"));//由update_content清洗
  27. put("voice_content",e.get("voice_content"));//由media_content清洗
  28. }};
  29. jointVideoArray.add(obj);
  30. });
  31. //调用下面插入类中的方法
  32. //elasticsearchService.batchDeleteRequest(alias, idList);
  33. //elasticsearchService.batchDeleteRequest(jointAlias, idList);
  34. elasticsearchService.batchInsertRequest(alias, jointVideoArray);
  35. }
  36. } catch (Exception e) {
  37. log.error("插入数据到ES异常",e);
  38. throw e;
  39. }
  40. }

3.插入类

  1. /**
  2. * 批量新增文档
  3. */
  4. public boolean batchInsertRequest(String index, List<Map<String, Object>> list) throws Exception {
  5. BulkRequest request = new BulkRequest();
  6. for (int i = 0; i < list.size(); i++) {
  7. Map<String, Object> item = list.get(i);
  8. request.add(new IndexRequest(index).id(String.valueOf(item.get("keyid"))).source(item, XContentType.JSON));
  9. }
  10. BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
  11. if (bulk.status().getStatus() == 200) {
  12. if (!bulk.hasFailures()) {
  13. return true;
  14. }
  15. log.error("批量创建索引{}文档失败", index);
  16. }
  17. return false;
  18. }

--------附录(新增,删除,更新,插入等)

  package com.dengtacj.synces.service;

  import cn.hutool.core.bean.BeanUtil;
  import lombok.extern.slf4j.Slf4j;
  import org.elasticsearch.action.admin.indices.alias.Alias;
  import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
  import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
  import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
  import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
  import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
  import org.elasticsearch.action.bulk.BulkRequest;
  import org.elasticsearch.action.bulk.BulkResponse;
  import org.elasticsearch.action.delete.DeleteRequest;
  import org.elasticsearch.action.index.IndexRequest;
  import org.elasticsearch.action.index.IndexResponse;
  import org.elasticsearch.action.search.SearchRequest;
  import org.elasticsearch.action.support.master.AcknowledgedResponse;
  import org.elasticsearch.action.update.UpdateRequest;
  import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
  import org.elasticsearch.client.RequestOptions;
  import org.elasticsearch.client.RestHighLevelClient;
  import org.elasticsearch.client.indices.GetIndexRequest;
  import org.elasticsearch.common.settings.Settings;
  import org.elasticsearch.common.xcontent.XContentType;
  import org.elasticsearch.index.query.QueryBuilders;
  import org.elasticsearch.index.reindex.BulkByScrollResponse;
  import org.elasticsearch.index.reindex.DeleteByQueryRequest;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.stereotype.Service;

  import java.io.IOException;
  import java.util.Arrays;
  import java.util.List;
  import java.util.Map;
  import java.util.function.Consumer;
  31. /**
  32. * Elasticsearch服务
  33. *
  34. */
  35. @Slf4j
  36. @Service
  37. public class ElasticsearchService {
  38. @Autowired
  39. public RestHighLevelClient restHighLevelClient;
  40. protected static final RequestOptions COMMON_OPTIONS;
  41. static {
  42. RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
  43. // 默认缓冲限制为100MB,此处修改为30MB。
  44. builder.setHttpAsyncResponseConsumerFactory(new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
  45. COMMON_OPTIONS = builder.build();
  46. }
  47. /**
  48. * 创建索引
  49. *
  50. * @param index 索引名称
  51. * @return 是否成功
  52. */
  53. public boolean createIndexRequest(String index) {
  54. CreateIndexRequest createIndexRequest = new CreateIndexRequest(index)
  55. .settings(Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 0));
  56. try {
  57. CreateIndexResponse response = restHighLevelClient.indices().create(createIndexRequest, COMMON_OPTIONS);
  58. log.info("所有节点确认响应 : {}", response.isAcknowledged());
  59. log.info("所有分片的复制未超时 :{}", response.isShardsAcknowledged());
  60. log.info("创建索引【{}】成功", index);
  61. return true;
  62. } catch (IOException e) {
  63. log.error("创建索引库【{}】失败", index, e);
  64. }
  65. return false;
  66. }
  67. /**
  68. * 创建索引
  69. *
  70. * @param index 索引名称
  71. * @param mapping 索引结构
  72. * @return 是否成功
  73. */
  74. public boolean createIndexRequest(String index, String mapping) {
  75. CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
  76. try {
  77. createIndexRequest.settings(Settings.builder()
  78. // 分片数
  79. .put("index.number_of_shards", 3)
  80. // 副本数
  81. .put("index.number_of_replicas", 0)
  82. // 默认分词器
  83. // .put("analysis.analyzer.default.tokenizer", "index_ansj")
  84. );
  85. createIndexRequest.mapping("_doc", mapping, XContentType.JSON);
  86. CreateIndexResponse response = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
  87. log.info("所有节点确认响应 : {}", response.isAcknowledged());
  88. log.info("所有分片的复制未超时 :{}", response.isShardsAcknowledged());
  89. log.info("创建索引【{}】成功", index);
  90. return true;
  91. } catch (Exception e) {
  92. log.error("创建索引库【{}】失败", index, e);
  93. }
  94. return false;
  95. }
  96. /**
  97. * 创建索引
  98. *
  99. * @param index 索引名称
  100. * @param settings 索引设置
  101. * @param mapping 索引结构
  102. * @return 是否成功
  103. */
  104. public boolean createIndexRequest(String index, String settings, String mapping) {
  105. CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
  106. try {
  107. createIndexRequest.settings(settings, XContentType.JSON);
  108. createIndexRequest.mapping("_doc", mapping, XContentType.JSON);
  109. CreateIndexResponse response = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
  110. log.info("所有节点确认响应 : {}", response.isAcknowledged());
  111. log.info("所有分片的复制未超时 :{}", response.isShardsAcknowledged());
  112. log.info("创建索引【{}】成功", index);
  113. return true;
  114. } catch (Exception e) {
  115. log.error("创建索引库【{}】失败", index, e);
  116. }
  117. return false;
  118. }
  119. /**
  120. * 创建索引
  121. *
  122. * @param index 索引名称
  123. * @param settings 索引设置
  124. * @param mapping 索引结构
  125. * @param alias 索引别名
  126. * @return 是否成功
  127. */
  128. public boolean createIndexRequest(String index, String settings, String mapping, String alias) {
  129. CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
  130. try {
  131. createIndexRequest.settings(settings, XContentType.JSON);
  132. createIndexRequest.mapping("_doc", mapping, XContentType.JSON);
  133. createIndexRequest.alias(new Alias(alias));
  134. CreateIndexResponse response = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
  135. log.info("所有节点确认响应 : {}", response.isAcknowledged());
  136. log.info("所有分片的复制未超时 :{}", response.isShardsAcknowledged());
  137. log.info("创建索引【{}】成功", index);
  138. return true;
  139. } catch (Exception e) {
  140. log.error("创建索引库【{}】失败", index, e);
  141. }
  142. return false;
  143. }
  144. /**
  145. * 删除索引
  146. *
  147. * @param index 索引名称
  148. */
  149. public boolean deleteIndexRequest(String index) {
  150. try {
  151. boolean exists = restHighLevelClient.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT);
  152. if (exists) {
  153. AcknowledgedResponse response = restHighLevelClient.indices().delete(new DeleteIndexRequest(index), COMMON_OPTIONS);
  154. // 判断是否确认响应
  155. if (response.isAcknowledged()) {
  156. return true;
  157. }
  158. } else {
  159. log.info("索引【{}】不存在", index);
  160. return true;
  161. }
  162. } catch (IOException e) {
  163. log.error("删除索引库【{}】失败", index, e);
  164. }
  165. return false;
  166. }
  167. /**
  168. * 新增文档
  169. */
  170. public boolean insertRequest(String index, String id, Object object) {
  171. IndexRequest indexRequest = new IndexRequest(index).id(id).source(BeanUtil.beanToMap(object), XContentType.JSON);
  172. try {
  173. IndexResponse indexResponse = restHighLevelClient.index(indexRequest, COMMON_OPTIONS);
  174. if (indexResponse.status().getStatus() == 200) {
  175. log.info("创建索引{}文档成功", index);
  176. return true;
  177. }
  178. } catch (IOException e) {
  179. log.error("创建索引文档 {" + index + "} 数据 {" + object + "} 失败", e);
  180. }
  181. return false;
  182. }
  183. /**
  184. * 新增文档
  185. */
  186. public boolean insertRequest(String index, Map<String, Object> item) {
  187. IndexRequest indexRequest = new IndexRequest(index).id((String) item.get("id")).source(item, XContentType.JSON);
  188. try {
  189. IndexResponse indexResponse = restHighLevelClient.index(indexRequest, COMMON_OPTIONS);
  190. if (indexResponse.status().getStatus() == 200) {
  191. log.info("创建索引{}文档成功", index);
  192. return true;
  193. }
  194. } catch (IOException e) {
  195. log.error("创建索引文档 {" + index + "} 数据 {" + item + "} 失败", e);
  196. }
  197. return false;
  198. }
  199. /**
  200. * 批量新增文档
  201. */
  202. public boolean batchInsertRequest(String index, List<Map<String, Object>> list) throws Exception {
  203. BulkRequest request = new BulkRequest();
  204. for (int i = 0; i < list.size(); i++) {
  205. Map<String, Object> item = list.get(i);
  206. request.add(new IndexRequest(index).id(String.valueOf(item.get("keyid"))).source(item, XContentType.JSON));
  207. }
  208. BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
  209. if (bulk.status().getStatus() == 200) {
  210. if (!bulk.hasFailures()) {
  211. return true;
  212. }
  213. log.error("批量创建索引{}文档失败", index);
  214. }
  215. return false;
  216. }
  217. /**
  218. * 修改文档
  219. */
  220. public void updateRequest(String index, String id, Object object) {
  221. UpdateRequest updateRequest = new UpdateRequest(index, id);
  222. updateRequest.doc(BeanUtil.beanToMap(object), XContentType.JSON);
  223. try {
  224. restHighLevelClient.update(updateRequest, COMMON_OPTIONS);
  225. } catch (IOException e) {
  226. log.error("更新索引文档 {" + index + "} 数据 {" + object + "} 失败", e);
  227. }
  228. }
  229. /**
  230. * 批量修改文档
  231. */
  232. public boolean batchUpdateRequest(String index, List<Map<String, Object>> list) throws Exception {
  233. BulkRequest request = new BulkRequest();
  234. for (int i = 0; i < list.size(); i++) {
  235. Map<String, Object> item = list.get(i);
  236. request.add(new UpdateRequest(index, (String) item.get("keyid")).doc(item, XContentType.JSON));
  237. }
  238. BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
  239. if (bulk.status().getStatus() == 200) {
  240. if (!bulk.hasFailures()) {
  241. log.info("批量修改索引{}文档成功", index);
  242. return true;
  243. }
  244. log.error("批量修改索引{}文档失败,失败原因:{}", index, bulk.buildFailureMessage());
  245. }
  246. return false;
  247. }
  248. /**
  249. * 删除文档
  250. */
  251. public void deleteRequest(String index, String id) {
  252. DeleteRequest deleteRequest = new DeleteRequest(index, id);
  253. try {
  254. restHighLevelClient.delete(deleteRequest, COMMON_OPTIONS);
  255. } catch (IOException e) {
  256. log.error("删除索引文档 {" + index + "} 数据id {" + id + "} 失败", e);
  257. }
  258. }
  259. /**
  260. * 批量删除文档
  261. */
  262. public boolean batchDeleteRequest(String index, List<String> list) throws Exception {
  263. BulkRequest request = new BulkRequest();
  264. for (int i = 0; i < list.size(); i++) {
  265. request.add(new DeleteRequest(index, list.get(i)));
  266. }
  267. BulkResponse bulk = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
  268. if (bulk.status().getStatus() == 200) {
  269. if (!bulk.hasFailures()) {
  270. log.info("批量删除索引{}文档成功 count:{}", index,list.size());
  271. return true;
  272. }
  273. log.error("批量删除索引{}文档成功,失败原因:{}", index, bulk.buildFailureMessage());
  274. }
  275. return false;
  276. }
  277. public boolean cleanIndex(String index){
  278. try {
  279. DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(index);
  280. deleteRequest.setRefresh(true);
  281. BulkByScrollResponse response = restHighLevelClient.deleteByQuery(deleteRequest,RequestOptions.DEFAULT);
  282. return true;
  283. }catch (IOException e){
  284. log.error("clean index :{} error : {}",index,e);
  285. return false;
  286. }
  287. }
  288. public boolean refreshIndex(String... index){
  289. try {
  290. RefreshRequest refreshRequest = new RefreshRequest();
  291. refreshRequest.indices(index);
  292. RefreshResponse response = restHighLevelClient.indices().refresh(refreshRequest,RequestOptions.DEFAULT);
  293. log.info("refresh {} response status : {}",index,response.getStatus());
  294. return true;
  295. }catch (IOException e){
  296. log.error("refresh index :{} error : {}",index,e);
  297. return false;
  298. }
  299. }
  300. }
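下面是创建索引的调用方法：mapping 和 settings 分别读取 classpath 下你自己的索引配置文件（index/mapping/t_dying_video.json 和 index/settings/t_dying_video.json）。
比如我的 settings 文件如下（注意：代码会把文件内容直接传给 `CreateIndexRequest.settings(...)`，该方法期望的是 settings 本身；如果文件最外层再包一层 `"settings"`，这些配置可能会被当作未知配置而导致建索引失败，请按你使用的 ES 版本确认，必要时去掉外层的 `"settings"`）：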
 
{ "settings": { "number_of_shards": 5, "number_of_replicas": 1, "index": { "analysis.analyzer.default.type" : "ik_max_word"} } }
  1. public boolean createIndex() {
  2. String mapping = null;
  3. String settings = null;
  4. try {
  5. mapping = FileUtil.readString(ResourceUtils.getFile("classpath:index/mapping/t_dying_video.json"), "UTF-8");
  6. settings = FileUtil.readString(ResourceUtils.getFile("classpath:index/settings/t_dying_video.json"), "UTF-8");
  7. if (StrUtil.isNotBlank(mapping) && StrUtil.isNotBlank(settings)) {
  8. return elasticsearchService.createIndexRequest(index, settings, mapping, alias);
  9. }
  10. } catch (Exception e) {
  11. log.error("读取索引【{}】的Mapping文件失败", index, e);
  12. }
  13. return false;
  14. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/68075
推荐阅读
相关标签
  

闽ICP备14008679号