当前位置:   article > 正文

Java集成ElasticSearch_java集成es

java集成es

Java集成ElasticSearch,包含:ES客户端依赖的引入,创建客户端,ES索引管理,ES管道管理,ES新增、修改、删除、查询数据。

1、引入依赖

<dependency>
	<groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
   	<version>7.16.3</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2、创建客户端

2.1 工具类

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;

import net.sinodata.smart.xmgl.common.PropertiesUtil;

import org.elasticsearch.client.RestHighLevelClient;

public class RestHighLevelClientFactory{

	//TODO 这里的client需要重构 暂时这样用
    private static final String HOST = PropertiesUtil.getString("esHosts");
    private static final int PORT = PropertiesUtil.getInteger("esPorts");
    private static final String SCHEMA = PropertiesUtil.getString("esSchema");//"http";
    private static final int CONNECT_TIME_OUT = 1000;
    private static final int SOCKET_TIME_OUT = 30000;
    private static final int CONNECTION_REQUEST_TIME_OUT = 500;

    private static final int MAX_CONNECT_NUM = 100;
    private static final int MAX_CONNECT_PER_ROUTE = 100;

    private static HttpHost HTTP_HOST = new HttpHost(HOST,PORT,SCHEMA);
    private static boolean uniqueConnectTimeConfig = false;
    private static boolean uniqueConnectNumConfig = false;
    private static RestClientBuilder builder;
    private static RestClient restClient;
    private static RestHighLevelClient restHighLevelClient;

    static {
    	
        init();
    }

    public static void init(){
        // 可以初始化多个HttpHost
    	HTTP_HOST = new HttpHost(HOST,PORT,SCHEMA);
        builder = RestClient.builder(HTTP_HOST);
        if(uniqueConnectTimeConfig){
            setConnectTimeOutConfig();
            uniqueConnectTimeConfig = true;
        }
        if(uniqueConnectNumConfig){
            setMultiConnectConfig();
            uniqueConnectNumConfig = true;
        }
        restClient = builder.build();
        restHighLevelClient = new RestHighLevelClient(builder);
    }

    // 主要关于异步httpclient的连接延时配置
    public static void setConnectTimeOutConfig(){
        // requestConfigBuilder
        builder.setRequestConfigCallback(new RequestConfigCallback() {
			@Override
			public Builder customizeRequestConfig(Builder requestConfigBuilder) {
			    requestConfigBuilder.setConnectTimeout(CONNECT_TIME_OUT);
			    requestConfigBuilder.setSocketTimeout(SOCKET_TIME_OUT);
			    requestConfigBuilder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIME_OUT);
			    return requestConfigBuilder;
			}
		});
    }

    /**
     *    主要关于异步httpclient的连接数配置
     */
    public static void setMultiConnectConfig(){
       // setHttpClientConfigCallback
        builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
			@Override
			public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
			    httpClientBuilder.setMaxConnTotal(MAX_CONNECT_NUM);
			    httpClientBuilder.setMaxConnPerRoute(MAX_CONNECT_PER_ROUTE);
			    return httpClientBuilder;
			}
		});
    }

    public static RestClient getClient(){
        return restClient;
    }

    public static RestHighLevelClient getHighLevelClient(){
        if(restHighLevelClient != null){
            init();
        }
        return restHighLevelClient;
    }

    public static void close() {
        if (restHighLevelClient != null) {
            try {
                restHighLevelClient.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                uniqueConnectNumConfig = false;
                uniqueConnectTimeConfig = false;
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105

2.2 创建客户端使用:

RestHighLevelClient restHighLevelClient = RestHighLevelClientFactory.getHighLevelClient();
  • 1

3、ES索引

3.1 索引setting
自定义分析器:my_ngram_analyzer,允许数字字母模糊搜索,且忽略字母大小写问题。
JSON在线检验格式网址:https://www.bejson.com/explore/index_new/

{
    "number_of_shards": "10",
    "number_of_replicas": "1",
    "analysis": {
        "analyzer": {
            "my_ngram_analyzer": {
                "tokenizer": "my_ngram_tokenizer",
                "filter": [
                    "lowercase"
                ]
            }
        },
        "tokenizer": {
            "my_ngram_tokenizer": {
                "token_chars": [
                    "letter",
                    "digit",
                    "symbol",
                    "punctuation"
                ],
                "min_gram": "1",
                "type": "ngram",
                "max_gram": "1"
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

3.2 索引mapping
多附件上传,且附带附件名称

{
    "properties": {
        "id": {
            "type": "text",
            "fields": {
                "keyword": {
                    "ignore_above": 256,
                    "type": "keyword"
                }
            }
        },
        "xlh": {
            "type": "text",
            "analyzer": "my_ngram_analyzer",
            "fields": {
                "keyword": {
                    "ignore_above": 256,
                    "type": "keyword"
                }
            }
        },
        "title": {
            "type": "text",
            "analyzer": "my_ngram_analyzer",
            "fields": {
                "keyword": {
                    "ignore_above": 256,
                    "type": "keyword"
                }
            }
        },
        "attachments": {
            "properties": {
                "attachment": {
                    "properties": {
                        "date": {
                            "type": "date"
                        },
                        "content_type": {
                            "type": "text",
                            "fields": {
                                "keyword": {
                                    "ignore_above": 256,
                                    "type": "keyword"
                                }
                            }
                        },
                        "author": {
                            "type": "text",
                            "fields": {
                                "keyword": {
                                    "ignore_above": 256,
                                    "type": "keyword"
                                }
                            }
                        },
                        "language": {
                            "type": "text",
                            "fields": {
                                "keyword": {
                                    "ignore_above": 256,
                                    "type": "keyword"
                                }
                            }
                        },
                        "content": {
                            "type": "text",
                            "analyzer": "my_ngram_analyzer",
                            "fields": {
                                "keyword": {
                                    "ignore_above": 256,
                                    "type": "keyword"
                                }
                            }
                        },
                        "content_length": {
                            "type": "long"
                        }
                    }
                },
                "data": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "filename": {
                    "type": "text",
                    "analyzer": "my_ngram_analyzer",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103

3.3 创建索引

	public boolean indexCreateFw(String indexName, RestHighLevelClient rhclient) throws Exception {
		// 1、创建 创建索引request 参数:索引名称
		CreateIndexRequest indexRequest = new CreateIndexRequest(indexName);
		// 2、设置索引的settings
		/*indexRequest.settings(Settings.builder()
                .put("index.number_of_shards", shards)
                .put("index.number_of_replicas", replicas)
                .put("index.analysis", replicas));*/
		
		String setting = "{\"number_of_shards\":\"" + shards + "\",\"number_of_replicas\":\"" + replicas + "\",\"analysis\":{\"analyzer\":{\"my_ngram_analyzer\":{\"tokenizer\":\"my_ngram_tokenizer\",\"filter\":[\"lowercase\"]}},\"tokenizer\":{\"my_ngram_tokenizer\":{\"token_chars\":[\"letter\",\"digit\",\"symbol\",\"punctuation\"],\"min_gram\":\"1\",\"type\":\"ngram\",\"max_gram\":\"1\"}}}}";
		// 3、设置索引的mappings
		String mapping = "{\"properties\":{\"attachments\":{\"properties\":{\"attachment\":{\"properties\":{\"date\":{\"type\":\"date\"},\"content_type\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}},\"author\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}},\"language\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}},\"content\":{\"type\":\"text\",\"analyzer\":\"my_ngram_analyzer\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}},\"content_length\":{\"type\":\"long\"}}},\"data\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}},\"filename\":{\"type\":\"text\",\"analyzer\":\"my_ngram_analyzer\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}}}},\"sqrMc\":{\"type\":\"text\",\"analyzer\":\"my_ngram_analyzer\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}},\"bt\":{\"type\":\"text\",\"analyzer\":\"my_ngram_analyzer\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}},\"fwwh\":{\"type\":\"text\",\"analyzer\":\"my_ngram_analyzer\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}},\"sqTjsj\":{\"type\":\"long\"},\"id\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}},\"ztc\":{\"type\":\"text\",\"analyzer\":\"my_ngram_analyzer\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}},\"sqbmDm\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}},\"sqrDm\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}},\"sqbmMc\":{\"type\":\"text\",\"analyzer\":\"my_ngram_analyzer\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}},\"sqXlh\":{\"type\":\"text\",\"analyzer\":\"my_ngram_analyzer\",\"fields\":{\"keyword\":{\"ignore_above\":256,\"type\":\"keyword\"}}}}}";
		// 4、 设置索引的别名
		// 5、 发送请求
		// 5.1 同步方式发送请求
		IndicesClient indicesClient = rhclient.indices();
		indexRequest.settings(setting, XContentType.JSON);
		indexRequest.mapping(mapping, XContentType.JSON);

		// 请求服务器
		CreateIndexResponse response = indicesClient.create(indexRequest, RequestOptions.DEFAULT);

		return response.isAcknowledged();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

3.4 检验索引是否存在

public static boolean checkIndexExists(String index, RestHighLevelClient rhclient) throws Exception {
		GetIndexRequest getIndexRequest = new GetIndexRequest(index);
		boolean exists = rhclient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
		return exists;
	}
  • 1
  • 2
  • 3
  • 4
  • 5

3.5 查询索引列表

public static Set<String> indices(RestHighLevelClient rhclient) throws IOException {
		GetAliasesRequest request = new GetAliasesRequest();
		GetAliasesResponse getAliasesResponse = rhclient.indices().getAlias(request, RequestOptions.DEFAULT);
		Map<String, Set<AliasMetadata>> aliases = getAliasesResponse.getAliases();
		return aliases.keySet();
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

3.6 获取索引结构

	public static Map<String, Object> getMapping(String indexName, RestHighLevelClient rhclient) throws Exception {
		// 创建get请求
		GetIndexRequest request = new GetIndexRequest(indexName);
		// 发送get请求
		GetIndexResponse response = rhclient.indices().get(request, RequestOptions.DEFAULT);
		// 获取表结构
		Map<String, MappingMetadata> mappings = response.getMappings();
		Map<String, Object> sourceAsMap = mappings.get(indexName).getSourceAsMap();
		return sourceAsMap;
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

3.7 获取指定索引全部数据

	public void getDoc(String indexName, RestHighLevelClient rhclient) throws Exception {
		SearchRequest searchRequest = new SearchRequest();
		searchRequest.indices(indexName);

		SearchResponse search = rhclient.search(searchRequest, RequestOptions.DEFAULT);
		logger.info("total hits: " + search.getHits().getTotalHits().value);
		for (SearchHit hit : search.getHits().getHits()) {
			logger.info(hit.getSourceAsString());
		}
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

4、ES管道

4.1

{
    "description": "Extract attachment information from arrays",
    "processors": [
        {
            "foreach": {
                "field": "attachments",
                "processor": {
                    "attachment": {
                        "target_field": "_ingest._value.attachment",
                        "field": "_ingest._value.data",
                        "indexed_chars": -1,
                        "ignore_missing": true
                    }
                }
            }
        },
        {
            "foreach": {
                "field": "attachments",
                "processor": {
                    "remove": {
                        "field": "_ingest._value.data"
                    }
                }
            }
        }
    ]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

4.2 创建管道

	public boolean putPipelineFw(String pipeId, RestHighLevelClient rhclient) throws IOException {
        String fwInfo = "{\"description\":\"Extract attachment information from arrays\",\"processors\":[{\"foreach\":{\"field\":\"attachments\",\"processor\":{\"attachment\":{\"target_field\":\"_ingest._value.attachment\",\"field\":\"_ingest._value.data\",\"indexed_chars\":-1,\"ignore_missing\":true}}}},{\"foreach\":{\"field\":\"attachments\",\"processor\":{\"remove\":{\"field\":\"_ingest._value.data\"}}}}]}";
        
        PutPipelineRequest request = new PutPipelineRequest(
                pipeId,
                new BytesArray(fwInfo.getBytes(StandardCharsets.UTF_8)),
                XContentType.JSON
        );

        AcknowledgedResponse response = rhclient.ingest().putPipeline(request, RequestOptions.DEFAULT);
        logger.info("管道创建接口执行完毕,{}", response.isAcknowledged());
        return response.isAcknowledged();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

4.3 删除管道

public static boolean deletePipeline(String pipeId, RestHighLevelClient rhclient) throws IOException {
		DeletePipelineRequest request = new DeletePipelineRequest(pipeId);
		AcknowledgedResponse response = rhclient.ingest().deletePipeline(request, RequestOptions.DEFAULT);

		logger.info("删除管道信息接口执行完毕,{}", response.toString());
		return true;
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

4.4 查询管道信息

	public static boolean getPipeline(String pipeId, RestHighLevelClient rhclient) throws IOException {
		// 获取所有管道
		GetPipelineRequest request = new GetPipelineRequest();
		if (StringUtils.isNotBlank(pipeId)) {
			// 获取指定管道
			request = new GetPipelineRequest(pipeId);
		}

		GetPipelineResponse response = rhclient.ingest().getPipeline(request, RequestOptions.DEFAULT);

		logger.info("查询管道信息接口执行完毕,{}", response.toString());
		return true;
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

5、ES新增数据

5.1 新增数据

	public static int addDocAndContent(String index, String type, RestHighLevelClient rhclient, String pipelineName,
			String id, String docJson) throws Exception {
		if (pipelineName != null) {
			return addDocContent(index, type, rhclient, pipelineName, id, docJson);
		} else {
			return addDoc(index, type, rhclient, id, docJson);
		}
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

5.2 不指定管道新增数据

	public static int addDoc(String index, String type, RestHighLevelClient rhclient, String id, String docJson)
			throws Exception {
		IndexRequest indexRequest = new IndexRequest(index, type, id);
		indexRequest.source(docJson, XContentType.JSON);
		IndexResponse indexResponse = rhclient.index(indexRequest, RequestOptions.DEFAULT);
		return indexResponse.status().getStatus();
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

5.3 指定管道新增数据

	public static int addDocContent(String index, String type, RestHighLevelClient rhclient, String pipelineName,
			String id, String docJson) throws Exception {
		IndexRequest indexRequest = new IndexRequest(index, type, id);
		indexRequest.source(docJson, XContentType.JSON);
		// System.out.println(docJson);
		indexRequest.setPipeline(pipelineName);
		indexRequest.timeout("20s");
		IndexResponse indexResponse = rhclient.index(indexRequest, RequestOptions.DEFAULT);
		return indexResponse.status().getStatus();
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

5.4 新增记录实体类信息组装

	public String addDataFw(String id, RestHighLevelClient rhclient) throws IOException {		
		String res = "{\"resCode\":\"30\",\"resMsg\":\"ES数据添加失败\"}";
		try {
			// 获取流程信息
			String hql = "from ZtxmOaFwSq entity where entity.delMark = '0' and entity.id = '" + id + "'";
			List<ZtxmOaFwSq> ztxmOaFwSqList = ztxmOaFwSqService.findListByHql(hql);
			if (ztxmOaFwSqList == null || ztxmOaFwSqList.size() == 0) {
				res = "{\"resCode\":\"30\",\"resMsg\":\"未获取到流程信息,ES数据添加失败\"}";
				return res;
			}
			ZtxmOaFwSq ztxmOaFwSq = ztxmOaFwSqList.get(0);

			// 获取流程附件信息
			String hqlFile = " from ZtxmOaUploadinfo entity where entity.delMark = '0' and entity.ywId='" + id
					+ "' and entity.remark is null and entity.sprzId is null order by entity.createTime";
			List<ZtxmOaUploadinfo> loadList = ztxmOaUploadinfoDao.findListByHql(hqlFile);

			// 处理文件路径
			List<String> downLoadFilePathList = getUploadFilePaths(loadList, "");
			
			List<Attachment> esAttachments = getEsAttachments(downLoadFilePathList);
			ztxmOaFwSq.setAttachments(esAttachments);
			
			int result = EsUtil.addDocAndContent(AppParameters.index_name_fw, AppParameters.index_type_fw, rhclient,
					AppParameters.index_pipeline_fw, ztxmOaFwSq.getId(), JSON.toJSONString(ztxmOaFwSq));
			if (RestStatus.CREATED.getStatus() == result || RestStatus.OK.getStatus() == result) {
				res = "{\"resCode\":\"10\",\"resMsg\":\"ES数据添加成功\"}";
			}else {
				res = "{\"resCode\":\"30\",\"resMsg\":\"ES数据添加失败\"}";
			}
		} catch (Exception e) {
			e.printStackTrace();
			logger.error("ES数据添加失败" + e);
			res = "{\"resCode\":\"30\",\"resMsg\":\"ES数据添加失败\"}";
			return res;
		}
		return res;
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

获取文件信息:附带文件名称

	public List<Attachment> getEsAttachments(List<String> fileInfoList) throws IOException {
		List<Attachment> docList = new ArrayList<Attachment>();
		if (fileInfoList != null && fileInfoList.size() > 0) {
			for (int i = 0; i < fileInfoList.size(); i++) {
				String filePath = fileInfoList.get(i);
				// 压缩包的后缀有RAR、ZIP、ARJ、Z、LZH、JAR等。
				//文件名称
				String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
				//文件内容
				File file = new File(filePath);	
				String content = FileToBase64.fileToBase64(file);
				Attachment attachment = new Attachment();
				attachment.setData(content);
				attachment.setFilename(fileName);
				
				docList.add(attachment);
			}
			
		}
		return docList;
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

Attachment实体类

public class Attachment {
	String data;
	
	String filename;

	public String getData() {
		return data;
	}

	public void setData(String data) {
		this.data = data;
	}

	public String getFilename() {
		return filename;
	}

	public void setFilename(String filename) {
		this.filename = filename;
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

5.5 批量新增数据

	public String addDataBulkFw(String index, String type, String pipelineName, RestHighLevelClient rhclient) throws IOException {
		String res = "{\"resCode\":\"30\",\"resMsg\":\"ES数据批量新增失败\"}";
		String hql = "from ZtxmOaFwSq entity where entity.delMark = '0' and entity.spzt in ('690','693','692')";
		List<ZtxmOaFwSq> ztxmOaFwSqList = ztxmOaFwSqService.findListByHql(hql);

		BulkRequest bulkAddRequest = new BulkRequest();
		if (ztxmOaFwSqList != null && ztxmOaFwSqList.size() > 0) {
			System.out.println("初始化数据总条数:" + ztxmOaFwSqList.size());
			for (ZtxmOaFwSq ztxmOaFwSq : ztxmOaFwSqList) {
				if (ztxmOaFwSq != null) {
					// 获取流程附件信息
					String hqlFile = " from ZtxmOaUploadinfo entity where entity.delMark = '0' and entity.ywId='"
							+ ztxmOaFwSq.getId()
							+ "' and entity.remark is null and entity.sprzId is null order by entity.createTime";
					List<ZtxmOaUploadinfo> loadList = ztxmOaUploadinfoDao.findListByHql(hqlFile);

					// 处理文件路径
					List<String> downLoadFilePathList = getUploadFilePaths(loadList, "");
					List<Attachment> esAttachments = getEsAttachments(downLoadFilePathList);
					ztxmOaFwSq.setAttachments(esAttachments);

					IndexRequest indexRequest = new IndexRequest(index, type, ztxmOaFwSq.getId().toString());
					indexRequest.setPipeline(pipelineName);
					indexRequest.source(JSON.toJSONString(ztxmOaFwSq), XContentType.JSON);
					bulkAddRequest.add(indexRequest);
				}
			}
		}

		BulkResponse bulkAddResponse = rhclient.bulk(bulkAddRequest, RequestOptions.DEFAULT);
		int status = bulkAddResponse.status().getStatus();
		if (RestStatus.CREATED.getStatus() == status || RestStatus.OK.getStatus() == status) {
			res = "{\"resCode\":\"10\",\"resMsg\":\"ES数据批量新增成功\"}";
		}else {
			res = "{\"resCode\":\"30\",\"resMsg\":\"ES数据批量新增失败\"}";
		}
		return res;
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

6、ES删除数据

6.1 删除指定数据

	public static int deleteDoc(String index, String type, String id, RestHighLevelClient rhclient) {
		try {

			DeleteRequest deleteRequest = new DeleteRequest(index, type, id.toString());
			DeleteResponse deleteResponse = rhclient.delete(deleteRequest, RequestOptions.DEFAULT);
			return deleteResponse.status().getStatus();
		} catch (Exception e) {
			logger.error("删除记录异常:" + e);
			return 0;
		}
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

6.2 批量删除数据

	public static int bulkDelete(String index, String type, String ids, RestHighLevelClient rhclient) {

		BulkRequest request = new BulkRequest();
		try {
			String[] idArr = ids.split(",");
			if (idArr.length > 0 && idArr != null) {
				// 1、创建批量操作请求参数
				for (int i = 0; i < idArr.length; i++) {
					request.add(new DeleteRequest(index, type, idArr[i]));
				}
			}
			// 同步请求
			BulkResponse bulkResponse = rhclient.bulk(request, null);

			if (bulkResponse != null) {
				for (BulkItemResponse bulkItemResponse : bulkResponse) {
					DocWriteResponse itemResponse = bulkItemResponse.getResponse();
					if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
						DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
						logger.info("删除成功,{" + deleteResponse.toString() + "}");
					}
				}
			}
			return bulkResponse.status().getStatus();
		} catch (Exception e) {
			logger.error("删除记录异常:" + e);
			return 0;
		}
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

7、ES更新数据

先删除,后新增

	public static int updateDoc(String index, String type, RestHighLevelClient rhclient, String pipelineName, String id, String docJson)
			throws IOException {
		try {
			
			deleteDoc(index, type, id, rhclient);
			
			int result = addDocAndContent(index, type, rhclient, pipelineName, id, docJson);
			
			return result;
		} catch (Exception e) {
			logger.error("更新索引记录异常,异常记录ID:" + id);
			logger.error("异常信息:" + e);
			return 500; // 异常
		}
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

8、ES查询数据

	public List<ZtxmOaFwSq> searchMatchFw(String index, String type, Map<String, String> paramMap, RestHighLevelClient rhclient, boolean flag) throws IOException {
		String esSearch = paramMap.get("esSearch");
		String fullMath = paramMap.get("fullMath");
		SearchRequest searchRequest = new SearchRequest(index);
		searchRequest.types(type);
	    
		//获取索引所有数据
		SearchResponse search = rhclient.search(searchRequest, RequestOptions.DEFAULT);
		System.out.println("all total hits: " + search.getHits().getTotalHits().value);
		/*for (SearchHit hit : search.getHits().getHits()) {
			//System.out.println(hit.getSourceAsString());
		}*/

		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
		
		//搜索字段
		String[] keyArray = {"xlh", "title", "attachments.filename", "attachments.attachment.content"};
		if ("0".equals(fullMath)) {
			sourceBuilder.query(QueryBuilders.multiMatchQuery(esSearch, keyArray).slop(0).type("phrase"));
		}else {
			sourceBuilder.query(QueryBuilders.multiMatchQuery(esSearch, keyArray));
		}
        sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        sourceBuilder.size(10000);//默认是10,hits.getHits()中只显示10条数据

        //将请求体加入到请求中
        searchRequest.source(sourceBuilder);
        
        //设置高亮显示
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        for(String key : keyArray){
        	highlightBuilder.field(key);
        }
        //highlightBuilder.fragmentSize(150); //最大高亮分片数
        //highlightBuilder.numOfFragments(20); //从第一个分片获取高亮片段
        highlightBuilder.requireFieldMatch(false);
        highlightBuilder.preTags("<span style=\"color:red\">");
        highlightBuilder.postTags("</span>");
        sourceBuilder.highlighter(highlightBuilder);

        //3、发送请求
        SearchResponse searchResponse = rhclient.search(searchRequest, RequestOptions.DEFAULT);

        //处理搜索命中文档结果
        List<ZtxmOaFwSq> ztxmOaFwSqList = new ArrayList<ZtxmOaFwSq>();
        SearchHits hits = searchResponse.getHits();
        logger.info("es search total hits: " + hits.getTotalHits().value);
        SearchHit[] searchHits = hits.getHits();
		if (searchHits != null && searchHits.length > 0) {
			for (int i = 0; i < searchHits.length; i++) {
				if (searchHits[i] != null) {
					JSONObject jsonObject = JSONObject.parseObject(searchHits[i].getSourceAsString());
					ZtxmOaFwSq ztxmOaFwSq = JSONObject.toJavaObject(jsonObject, ZtxmOaFwSq.class);

					SearchHit hit = searchHits[i];
					logger.info("indexHit:" + hit.getIndex() + "  typeHit:" + hit.getType() + "  id:" + hit.getId() + " score" + hit.getScore());

					// 搜索内容 -- 高亮
					String content = "";
					Map<String, HighlightField> highlightFields = hit.getHighlightFields();
					if (keyArray != null && keyArray.length > 0) {
						for(String key : keyArray){
							HighlightField highlight = highlightFields.get(key);
							
							if (highlight != null) {
								Text[] fragments = highlight.fragments(); // 多值的字段会有多个值
								if (fragments != null) {
									for (int j = 0; j < fragments.length; j++) {
										String fragmentString = fragments[j].string();
										if (fragmentString != null) {
											content += fragmentString.toString();
										}
										content += "; ";
									}
								}
							}
						}
						
						if (content != null) {
							content.replace("\t", "");
							content.replace("\n", "");
							content.replace("\r", "");
						}
						//logger.info(content);
						ztxmOaFwSq.setBak1(content);
					}

					ztxmOaFwSqList.add(ztxmOaFwSq);
				}
			}
		}
		
		return ztxmOaFwSqList;
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/334424
推荐阅读
相关标签
  

闽ICP备14008679号