当前位置:   article > 正文

ElasticSearchUtil工具类封装_elasticsearch util

elasticsearch util
  1. package com.xxx.util;
  2. import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
  3. import org.elasticsearch.action.admin.indices.analyze.AnalyzeAction;
  4. import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequestBuilder;
  5. import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.AnalyzeToken;
  6. import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
  7. import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
  8. import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
  9. import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
  10. import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
  11. import org.elasticsearch.action.bulk.BulkRequestBuilder;
  12. import org.elasticsearch.action.bulk.BulkResponse;
  13. import org.elasticsearch.action.delete.DeleteRequestBuilder;
  14. import org.elasticsearch.action.delete.DeleteResponse;
  15. import org.elasticsearch.action.get.GetResponse;
  16. import org.elasticsearch.action.index.IndexResponse;
  17. import org.elasticsearch.action.search.SearchRequestBuilder;
  18. import org.elasticsearch.action.search.SearchResponse;
  19. import org.elasticsearch.action.search.SearchType;
  20. import org.elasticsearch.action.update.UpdateRequest;
  21. import org.elasticsearch.client.transport.TransportClient;
  22. import org.elasticsearch.cluster.node.DiscoveryNode;
  23. import org.elasticsearch.common.settings.Settings;
  24. import org.elasticsearch.common.transport.InetSocketTransportAddress;
  25. import org.elasticsearch.common.unit.TimeValue;
  26. import org.elasticsearch.index.query.BoolQueryBuilder;
  27. import org.elasticsearch.index.query.QueryBuilder;
  28. import org.elasticsearch.index.query.QueryBuilders;
  29. import org.elasticsearch.search.SearchHit;
  30. import org.elasticsearch.search.SearchHits;
  31. import org.elasticsearch.search.aggregations.AggregationBuilders;
  32. import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
  33. import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
  34. import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds;
  35. import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
  36. import org.elasticsearch.search.aggregations.bucket.terms.Terms;
  37. import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
  38. import org.elasticsearch.search.aggregations.metrics.avg.Avg;
  39. import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
  40. import org.elasticsearch.search.aggregations.metrics.sum.Sum;
  41. import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
  42. import org.elasticsearch.search.sort.SortBuilders;
  43. import org.elasticsearch.transport.client.PreBuiltTransportClient;
  44. import java.net.InetAddress;
  45. import java.util.ArrayList;
  46. import java.util.HashMap;
  47. import java.util.List;
  48. import java.util.Map;
  49. import org.json.JSONException;
  50. import org.json.JSONObject;
  51. import org.slf4j.Logger;
  52. import org.slf4j.LoggerFactory;
  53. import com.xxx.common.StaticContant;
  54. import com.xxx.util.DateUtil;
  55. /**
  56. * Title: ElasticSearchUtil
  57. * Description: ES工具类
  58. * @author wufei
  59. * @date 2017/12/26 下午2:07:06
  60. */
  61. public class ElasticSearchUtil {
  62. private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtil.class);
  63. private static TransportClient client = StaticContant.ESCLIENT;
  64. /**
  65. * 功能描述:服务初始化(本地测试使用,实际开发写为长连接)
  66. * @param clusterName 集群名称
  67. * @param ip 地址
  68. * @param port 端口
  69. */
  70. @SuppressWarnings({ "resource", "static-access" })
  71. public ElasticSearchUtil (String clusterName, String ip, int port) {
  72. try {
  73. // 通过setting对象指定集群配置信息, 配置集群名称
  74. Settings settings = Settings.builder()
  75. .put("cluster.name", clusterName) //设置集群名
  76. // .put("client.transport.sniff", true) //启动嗅探功能,自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
  77. // .put("client.transport.ignore_cluster_name", true)//忽略集群名字验证, 打开后集群名字不对也能连接上
  78. // .put("client.transport.nodes_sampler_interval", 5)//报错
  79. // .put("client.transport.ping_timeout", 5) //报错, ping等待时间
  80. .build();
  81. // 创建client,通过setting来创建,若不指定则默认链接的集群名为elasticsearch,链接使用tcp协议即9300
  82. // addTransportAddress此步骤添加IP,至少一个,其实一个就够了,因为添加了自动嗅探配置
  83. this.client = new PreBuiltTransportClient(settings)
  84. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip), port));
  85. } catch (Exception e) {
  86. logger.error("es init failed! " + e.getMessage());
  87. }
  88. }
  89. /**
  90. * 查看集群信息
  91. */
  92. public static void getClusterInfo() {
  93. List<DiscoveryNode> nodes = client.connectedNodes();
  94. for (DiscoveryNode node : nodes) {
  95. System.out.println("HostId:"+node.getHostAddress()+" hostName:"+node.getHostName()+" Address:"+node.getAddress());
  96. }
  97. }
  98. /**
  99. * 功能描述:新建索引
  100. * @param indexName 索引名
  101. */
  102. public static void createIndex(String indexName) {
  103. try {
  104. if (indexExist(indexName)) {
  105. System.out.println("The index " + indexName + " already exits!");
  106. } else {
  107. CreateIndexResponse cIndexResponse = client.admin().indices()
  108. .create(new CreateIndexRequest(indexName))
  109. .actionGet();
  110. if (cIndexResponse.isAcknowledged()) {
  111. System.out.println("create index successfully!");
  112. } else {
  113. System.out.println("Fail to create index!");
  114. }
  115. }
  116. } catch (Exception e) {
  117. e.printStackTrace();
  118. }
  119. }
  120. /**
  121. * 功能描述:新建索引
  122. * @param index 索引名
  123. * @param type 类型
  124. */
  125. public static void createIndex(String index, String type) {
  126. try {
  127. client.prepareIndex(index, type).setSource().get();
  128. } catch (Exception e) {
  129. System.out.println("Fail to create index!");
  130. e.printStackTrace();
  131. }
  132. }
  133. /**
  134. * 功能描述:删除索引
  135. * @param index 索引名
  136. */
  137. public static void deleteIndex(String index) {
  138. try {
  139. if (indexExist(index)) {
  140. DeleteIndexResponse dResponse = client.admin().indices().prepareDelete(index)
  141. .execute().actionGet();
  142. if (!dResponse.isAcknowledged()) {
  143. logger.info("failed to delete index " + index + "!");
  144. }else {
  145. logger.info("delete index " + index + " successfully!");
  146. }
  147. } else {
  148. logger.error("the index " + index + " not exists!");
  149. }
  150. } catch (Exception e) {
  151. e.printStackTrace();
  152. }
  153. }
  154. /**
  155. * 功能描述:验证索引是否存在
  156. * @param index 索引名
  157. */
  158. public static boolean indexExist(String index) {
  159. IndicesExistsRequest inExistsRequest = new IndicesExistsRequest(index);
  160. IndicesExistsResponse inExistsResponse = client.admin().indices()
  161. .exists(inExistsRequest).actionGet();
  162. return inExistsResponse.isExists();
  163. }
  164. /**
  165. * 功能描述:插入数据
  166. * @param index 索引名
  167. * @param type 类型
  168. * @param json 数据
  169. */
  170. @SuppressWarnings("deprecation")
  171. public static void insertData(String index, String type, String json) {
  172. IndexResponse response = client.prepareIndex(index, type)
  173. .setSource(json)
  174. .get();
  175. System.out.println(response.getVersion());
  176. }
  177. /**
  178. * 功能描述:获取所有的索引
  179. */
  180. public static void getAllIndex(){
  181. ClusterStateResponse response = client.admin().cluster().prepareState().execute().actionGet();
  182. //获取所有索引
  183. String[] indexs=response.getState().getMetaData().getConcreteAllIndices();
  184. System.out.println("Index总数为: " + indexs.length);
  185. for (String index : indexs) {
  186. System.out.println("获取的Index: " + index);
  187. }
  188. }
  189. /**
  190. * 通过prepareIndex增加文档,参数为json字符串
  191. * @param index 索引名
  192. * @param type 类型
  193. * @param _id 数据id
  194. * @param json 数据
  195. */
  196. @SuppressWarnings("deprecation")
  197. public static void insertData(String index, String type, String _id, String json) {
  198. IndexResponse indexResponse = client.prepareIndex(index, type).setId(_id)
  199. .setSource(json)
  200. .get();
  201. System.out.println(indexResponse.getVersion());
  202. logger.info("数据插入ES成功!");
  203. }
  204. /**
  205. * 功能描述:更新数据
  206. * @param index 索引名
  207. * @param type 类型
  208. * @param _id 数据id
  209. * @param json 数据
  210. */
  211. @SuppressWarnings("deprecation")
  212. public static void updateData(String index, String type, String _id, String json){
  213. try {
  214. UpdateRequest updateRequest = new UpdateRequest(index, type, _id).doc(json);
  215. // client.prepareUpdate(index, type, _id).setDoc(json).get();
  216. client.update(updateRequest).get();
  217. } catch (Exception e) {
  218. logger.error("update data failed." + e.getMessage());
  219. }
  220. }
  221. /**
  222. * 功能描述:删除指定数据
  223. * @param index 索引名
  224. * @param type 类型
  225. * @param _id 数据id
  226. */
  227. public static void deleteData(String index, String type, String _id) {
  228. try {
  229. DeleteResponse response = client.prepareDelete(index, type, _id).get();
  230. System.out.println(response.isFragment());
  231. logger.info("删除指定数据成功!");
  232. } catch (Exception e) {
  233. logger.error("删除指定数据失败!" + e);
  234. }
  235. }
  236. /**
  237. * 删除索引类型表所有数据,批量删除
  238. * @param index
  239. * @param type
  240. */
  241. public static void deleteIndexTypeAllData(String index, String type) {
  242. SearchResponse response = client.prepareSearch(index).setTypes(type)
  243. .setQuery(QueryBuilders.matchAllQuery()).setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
  244. .setScroll(new TimeValue(60000)).setSize(10000).setExplain(false).execute().actionGet();
  245. BulkRequestBuilder bulkRequest = client.prepareBulk();
  246. while (true) {
  247. SearchHit[] hitArray = response.getHits().getHits();
  248. SearchHit hit = null;
  249. for (int i = 0, len = hitArray.length; i < len; i++) {
  250. hit = hitArray[i];
  251. DeleteRequestBuilder request = client.prepareDelete(index, type, hit.getId());
  252. bulkRequest.add(request);
  253. }
  254. BulkResponse bulkResponse = bulkRequest.execute().actionGet();
  255. if (bulkResponse.hasFailures()) {
  256. logger.error(bulkResponse.buildFailureMessage());
  257. }
  258. if (hitArray.length == 0) break;
  259. response = client.prepareSearchScroll(response.getScrollId())
  260. .setScroll(new TimeValue(60000)).execute().actionGet();
  261. }
  262. }
  263. /**
  264. * 功能描述:批量插入数据
  265. * @param index 索引名
  266. * @param type 类型
  267. * @param jsonList 批量数据
  268. */
  269. @SuppressWarnings("deprecation")
  270. public void bulkInsertData(String index, String type, List<String> jsonList) {
  271. BulkRequestBuilder bulkRequest = client.prepareBulk();
  272. jsonList.forEach(item -> {
  273. bulkRequest.add(client.prepareIndex(index, type).setSource(item));
  274. });
  275. BulkResponse bulkResponse = bulkRequest.get();
  276. if(!bulkResponse.hasFailures()) {
  277. System.out.println(bulkResponse.getItems().length + "条数据插入完成!");
  278. }
  279. }
  280. /**
  281. * 通过prepareGet方法获取指定文档信息
  282. */
  283. public static void getOneDocument(String index, String type, String id) {
  284. // 搜索数据
  285. GetResponse response = client.prepareGet(index, type, id)
  286. // .setOperationThreaded(false) // 线程安全
  287. .get();
  288. System.out.println(response.isExists()); // 查询结果是否存在
  289. System.out.println("***********"+response.getSourceAsString());//获取文档信息
  290. // System.out.println(response.toString());//获取详细信息
  291. }
  292. /**
  293. * 通过prepareSearch方法获取指定索引所有文档信息
  294. */
  295. public static List<Map<String, Object>> getDocuments(String index) {
  296. List<Map<String, Object>> mapList = new ArrayList<>();
  297. // 搜索数据
  298. SearchResponse response = client.prepareSearch(index)
  299. // .setTypes("type1","type2"); //设置过滤type
  300. // .setTypes(SearchType.DFS_QUERY_THEN_FETCH) 精确查询
  301. // .setQuery(QueryBuilders.matchQuery(term, queryString));
  302. // .setFrom(0) //设置查询数据的位置,分页用
  303. // .setSize(60) //设置查询结果集的最大条数
  304. // .setExplain(true) //设置是否按查询匹配度排序
  305. .get(); //最后就是返回搜索响应信息
  306. System.out.println("共匹配到:"+response.getHits().getTotalHits()+"条记录!");
  307. SearchHit[] hits = response.getHits().getHits();
  308. for (SearchHit hit : hits) {
  309. Map<String, Object> source = hit.getSource();
  310. mapList.add(source);
  311. }
  312. System.out.println(response.getTotalShards());//总条数
  313. return mapList;
  314. }
  315. /**
  316. * 获取指定索引库下指定type所有文档信息
  317. * @param index
  318. * @param type
  319. * @return
  320. */
  321. public static List<Map<String, Object>> getDocuments(String index, String type) {
  322. List<Map<String, Object>> mapList = new ArrayList<>();
  323. SearchResponse response = client.prepareSearch(index).setTypes(type).get();
  324. SearchHit[] hits = response.getHits().getHits();
  325. for (SearchHit hit : hits) {
  326. Map<String, Object> source = hit.getSource();
  327. mapList.add(source);
  328. }
  329. return mapList;
  330. }
  331. /**
  332. * 带有搜索条件的聚合查询(聚合相当于关系型数据库里面的group by)
  333. * @param index
  334. * @param type
  335. * @return
  336. */
  337. public static Map<String, Long> searchBucketsAggregation(String index, String type) {
  338. long total = 0;
  339. Map<String, Long> rtnMap = new HashMap<>();
  340. SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type);
  341. //搜索条件
  342. String dateStr = DateUtil.getDays();
  343. BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
  344. /*queryBuilder.must(QueryBuilders.matchQuery("source", "resource"))精确匹配*/
  345. queryBuilder.filter(QueryBuilders.boolQuery()
  346. .must(QueryBuilders.rangeQuery("logTime")
  347. .gte(dateStr + "000000")
  348. .lte(dateStr + "235959") //时间为当天
  349. .format("yyyyMMddHHmmss")));//时间匹配格式
  350. // 获取当日告警总数
  351. // long total = searchRequestBuilder.setQuery(queryBuilder).get().getHits().getTotalHits();
  352. // 聚合分类(以告警类型分类)
  353. TermsAggregationBuilder teamAggBuilder = AggregationBuilders.terms("source_count").field("source.keyword");
  354. searchRequestBuilder.addAggregation(teamAggBuilder);
  355. // 不指定 "size":0 ,则搜索结果和聚合结果都将被返回,指定size:0则只返回聚合结果
  356. searchRequestBuilder.setSize(0);
  357. searchRequestBuilder.setQuery(queryBuilder);
  358. SearchResponse response = searchRequestBuilder.execute().actionGet();
  359. // System.out.println("++++++聚合分类:"+response.toString());
  360. // 聚合结果处理
  361. Terms genders = response.getAggregations().get("source_count");
  362. for (Terms.Bucket entry : genders.getBuckets()) {
  363. Object key = entry.getKey(); // Term
  364. Long count = entry.getDocCount(); // Doc count
  365. rtnMap.put(key.toString(), count);
  366. // System.out.println("Term: "+key);
  367. // System.out.println("Doc count: "+count);
  368. }
  369. if(!rtnMap.isEmpty()) {
  370. if(!rtnMap.containsKey("system")) {
  371. rtnMap.put("system", 0L);
  372. }
  373. if(!rtnMap.containsKey("resource")) {
  374. rtnMap.put("resource", 0L);
  375. }
  376. if(!rtnMap.containsKey("scheduler")) {
  377. rtnMap.put("scheduler", 0L);
  378. }
  379. total = rtnMap.get("system") + rtnMap.get("resource") + rtnMap.get("scheduler");
  380. }
  381. if(total != 0) {
  382. rtnMap.put("total", total);
  383. }
  384. return rtnMap;
  385. }
  386. /**
  387. * 当日流数据总量汇总
  388. * @param index
  389. * @param type
  390. * @param dataworkerType
  391. * @return
  392. */
  393. public static Map<String, Long> searchBucketsAggregation(String index, String type, String dataworkerType) {
  394. Map<String, Long> rtnMap = new HashMap<>();
  395. long count = 0;
  396. SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type);
  397. // SumAggregationBuilder sAggBuilder = AggregationBuilders.sum("agg").field("count.keyword");
  398. //搜索条件
  399. String dateStr = DateUtil.getDays();
  400. BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
  401. // queryBuilder.must(QueryBuilders.matchQuery("dataworkerType", dataworkerType))
  402. queryBuilder.must(QueryBuilders.matchQuery("dataworkerType", dataworkerType))
  403. .filter(QueryBuilders.boolQuery()
  404. .must(QueryBuilders.rangeQuery("logTime")
  405. .gte(dateStr + "000000")
  406. .lte(dateStr + "235959")
  407. .format("yyyyMMddHHmmss")));
  408. searchRequestBuilder.setQuery(queryBuilder);
  409. // searchRequestBuilder.addAggregation(sAggBuilder);
  410. SearchResponse response = searchRequestBuilder.execute().actionGet();
  411. System.out.println("++++++条件查询结果:"+response.toString());
  412. SearchHit[] hits = response.getHits().getHits();
  413. for (SearchHit searchHit : hits) {
  414. Object object = searchHit.getSource().get("count");
  415. if(object != null && !"".equals(object)) {
  416. System.out.println("=============Count:"+object.toString());
  417. count += Long.parseLong(object.toString());
  418. }
  419. }
  420. System.out.println("======流数据量:" + count);
  421. rtnMap.put(dataworkerType + "Total", count);
  422. System.out.println("======rtnMap:"+rtnMap.toString());
  423. return rtnMap;
  424. }
  425. /**
  426. * 读取索引类型表指定列名的平均值
  427. * @param index
  428. * @param type
  429. * @param avgField
  430. * @return
  431. */
  432. public static double readIndexTypeFieldValueWithAvg(String index, String type, String avgField) {
  433. String avgName = avgField + "Avg";
  434. AvgAggregationBuilder aggregation = AggregationBuilders.avg(avgName).field(avgField);
  435. SearchResponse response = client.prepareSearch(index).setTypes(type)
  436. .setQuery(QueryBuilders.matchAllQuery())
  437. .addAggregation(aggregation).execute().actionGet();
  438. Avg avg = response.getAggregations().get(avgName);
  439. return avg.getValue();
  440. }
  441. /**
  442. * 读取索引类型表指定列名的总和
  443. * @param index
  444. * @param type
  445. * @param sumField
  446. * @return
  447. */
  448. public static Map<String, Long> readIndexTypeFieldValueWithSum(String index, String type, String dataworkerType, String sumField) {
  449. Map<String, Long> rtnMap = new HashMap<>();
  450. long count = 0;
  451. // 聚合结果
  452. String sumName = sumField + "Sum";
  453. // 搜索条件
  454. String dateStr = DateUtil.getDays();
  455. BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
  456. queryBuilder.must(QueryBuilders.matchQuery("dataworkerType", dataworkerType))
  457. .filter(QueryBuilders.boolQuery()
  458. .must(QueryBuilders.rangeQuery("logTime")
  459. .gte(dateStr + "000000")
  460. .lte(dateStr + "235959")
  461. .format("yyyyMMddHHmmss")));
  1. // 时间前缀匹配(如:20180112)
  2. //        PrefixQueryBuilder preQueryBuild = QueryBuilders.prefixQuery(dataworkerType, dateStr);
  3.         // 模糊匹配
  4. //        FuzzyQueryBuilder fuzzyQueryBuild = QueryBuilders.fuzzyQuery(name, value);
  5.         // 范围匹配
  6. //        RangeQueryBuilder rangeQueryBuild = QueryBuilders.rangeQuery(name);
  7.         // 查询字段不存在及字段值为空 filterBuilder = QueryBuilders.boolQuery().should(new BoolQueryBuilder().mustNot(existsQueryBuilder)) .should(QueryBuilders.termsQuery(field, ""));
  8. //     ExistsQueryBuilder existsQueryBuilder = QueryBuilders.existsQuery(field);
  9. // 对某字段求和聚合(sumField字段)
  10. SumAggregationBuilder aggregation = AggregationBuilders.sum(sumName).field(sumField);
  11. SearchResponse response = client.prepareSearch(index).setTypes(type)
  12. .setQuery(QueryBuilders.matchAllQuery())
  13. .setQuery(queryBuilder)
  14. .addAggregation(aggregation).execute().actionGet();
  15. Sum sum = response.getAggregations().get(sumName);
  16. count = new Double(sum.getValue()).longValue();
  17. rtnMap.put(dataworkerType + "Total", count);
  18. System.out.println("======rtnMap:"+rtnMap.toString());
  19. return rtnMap;
  20. }
  21. /**
  22. * 按时间统计聚合
  23. * @param index
  24. * @param type
  25. */
  26. public static void dataHistogramAggregation(String index, String type) {
  27. try {
  28. SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type);
  29. DateHistogramAggregationBuilder field = AggregationBuilders.dateHistogram("sales").field("value");
  30. field.dateHistogramInterval(DateHistogramInterval.MONTH);
  31. // field.dateHistogramInterval(DateHistogramInterval.days(10))
  32. field.format("yyyy-MM");
  33. //强制返回空 buckets,既空的月份也返回
  34. field.minDocCount(0);
  35. // Elasticsearch 默认只返回你的数据中最小值和最大值之间的 buckets
  36. field.extendedBounds(new ExtendedBounds("2018-01", "2018-12"));
  37. searchRequestBuilder.addAggregation(field);
  38. searchRequestBuilder.setSize(0);
  39. SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
  40. System.out.println(searchResponse.toString());
  41. Histogram histogram = searchResponse.getAggregations().get("sales");
  42. for (Histogram.Bucket entry : histogram.getBuckets()) {
  43. // DateTime key = (DateTime) entry.getKey();
  44. String keyAsString = entry.getKeyAsString();
  45. Long count = entry.getDocCount(); // Doc count
  46. System.out.println("======="+keyAsString + ",销售" + count + "辆");
  47. }
  48. } catch (Exception e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. /**
  53. * 功能描述:统计查询
  54. * @param index 索引名
  55. * @param type 类型
  56. * @param constructor 查询构造
  57. * @param groupBy 统计字段
  58. */
  59. /*public Map<Object, Object> statSearch(String index, String type, ESQueryBuilderConstructor constructor, String groupBy) {
  60. Map<Object, Object> map = new HashedMap();
  61. SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type);
  62. //排序
  63. if (StringUtils.isNotEmpty(constructor.getAsc()))
  64. searchRequestBuilder.addSort(constructor.getAsc(), SortOrder.ASC);
  65. if (StringUtils.isNotEmpty(constructor.getDesc()))
  66. searchRequestBuilder.addSort(constructor.getDesc(), SortOrder.DESC);
  67. //设置查询体
  68. if (null != constructor) {
  69. searchRequestBuilder.setQuery(constructor.listBuilders());
  70. } else {
  71. searchRequestBuilder.setQuery(QueryBuilders.matchAllQuery());
  72. }
  73. int size = constructor.getSize();
  74. if (size < 0) {
  75. size = 0;
  76. }
  77. if (size > MAX) {
  78. size = MAX;
  79. }
  80. //返回条目数
  81. searchRequestBuilder.setSize(size);
  82. searchRequestBuilder.setFrom(constructor.getFrom() < 0 ? 0 : constructor.getFrom());
  83. SearchResponse sr = searchRequestBuilder.addAggregation(
  84. AggregationBuilders.terms("agg").field(groupBy)
  85. ).get();
  86. Terms stateAgg = sr.getAggregations().get("agg");
  87. Iterator<Terms.Bucket> iter = (Iterator<Terms.Bucket>) stateAgg.getBuckets().iterator();
  88. while (iter.hasNext()) {
  89. Terms.Bucket gradeBucket = iter.next();
  90. map.put(gradeBucket.getKey(), gradeBucket.getDocCount());
  91. }
  92. return map;
  93. }*/
  94. /**
  95. * 功能描述:统计查询
  96. * @param index 索引名
  97. * @param type 类型
  98. * @param constructor 查询构造
  99. * @param agg 自定义计算
  100. */
  101. /*public Map<Object, Object> statSearch(String index, String type, ESQueryBuilderConstructor constructor, AggregationBuilder agg) {
  102. if (agg == null) {
  103. return null;
  104. }
  105. Map<Object, Object> map = new HashedMap();
  106. SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type);
  107. //排序
  108. if (StringUtils.isNotEmpty(constructor.getAsc()))
  109. searchRequestBuilder.addSort(constructor.getAsc(), SortOrder.ASC);
  110. if (StringUtils.isNotEmpty(constructor.getDesc()))
  111. searchRequestBuilder.addSort(constructor.getDesc(), SortOrder.DESC);
  112. //设置查询体
  113. if (null != constructor) {
  114. searchRequestBuilder.setQuery(constructor.listBuilders());
  115. } else {
  116. searchRequestBuilder.setQuery(QueryBuilders.matchAllQuery());
  117. }
  118. int size = constructor.getSize();
  119. if (size < 0) {
  120. size = 0;
  121. }
  122. if (size > MAX) {
  123. size = MAX;
  124. }
  125. //返回条目数
  126. searchRequestBuilder.setSize(size);
  127. searchRequestBuilder.setFrom(constructor.getFrom() < 0 ? 0 : constructor.getFrom());
  128. SearchResponse sr = searchRequestBuilder.addAggregation(
  129. agg
  130. ).get();
  131. Terms stateAgg = sr.getAggregations().get("agg");
  132. Iterator<Terms.Bucket> iter = (Iterator<Terms.Bucket>) stateAgg.getBuckets().iterator();
  133. while (iter.hasNext()) {
  134. Terms.Bucket gradeBucket = iter.next();
  135. map.put(gradeBucket.getKey(), gradeBucket.getDocCount());
  136. }
  137. return map;
  138. }*/
  139. /**
  140. * 范围查询
  141. * @throws Exception
  142. */
  143. public static void rangeQuery() {
  144. //查询字段不存在及字段值为空 filterBuilder = QueryBuilders.boolQuery().should(new BoolQueryBuilder().mustNot(existsQueryBuilder)) .should(QueryBuilders.termsQuery(field, ""));
  145. // ExistsQueryBuilder existsQueryBuilder = QueryBuilders.existsQuery(field);
  146. //term精确查询
  147. // QueryBuilder queryBuilder = QueryBuilders.termQuery("age", 50) ; //年龄等于50
  148. //range查询
  149. QueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("age").gt(20); //年龄大于20
  150. SearchResponse searchResponse = client.prepareSearch("index_test")
  151. .setTypes("type_test")
  152. .setQuery(rangeQueryBuilder) //query
  153. .setPostFilter(QueryBuilders.rangeQuery("age").from(40).to(50)) // Filter
  154. // .addSort("age", SortOrder.DESC)
  155. .setSize(120) // 不设置的话,默认取10条数据
  156. .execute().actionGet();
  157. SearchHits hits = searchResponse.getHits();
  158. System.out.println("查到记录数:"+hits.getTotalHits());
  159. SearchHit[] searchHists = hits.getHits();
  160. if(searchHists.length>0){
  161. for(SearchHit hit:searchHists){
  162. String name = (String) hit.getSource().get("username");
  163. Integer age = Integer.parseInt(hit.getSource().get("age").toString());
  164. System.out.println("姓名:" + name + " 年龄:" + age);
  165. }
  166. }
  167. }
  168. /**
  169. * 时间范围查询
  170. * @param index
  171. * @param type
  172. * @param startDate
  173. * @param endDate
  174. */
  175. public static void rangeQuery(String index, String type, String startDate, String endDate) {
  176. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  177. // boolQueryBuilder.must(queryBuilders);
  178. boolQueryBuilder.filter(QueryBuilders.boolQuery().must(
  179. QueryBuilders.rangeQuery("time").gte(startDate).lte(endDate).format("yyyyMMddHHmmss")));
  180. SearchResponse searchResponse = client.prepareSearch(index).setTypes(type)
  181. .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
  182. .setQuery(boolQueryBuilder)
  183. .addAggregation(AggregationBuilders.terms("value_count").field("value.keyword")).get();
  184. //Terms Aggregation 名称为terms1_count 字段为field1 下面的也类似
  185. // .addAggregation(AggregationBuilders.terms("terms2_count").field("field2"));
  186. System.out.println("时间范围查询结果: "+searchResponse.toString());
  187. SearchHit[] hits = searchResponse.getHits().getHits();
  188. List<Map<String, Object>> mapList = new ArrayList<>();
  189. for (SearchHit searchHit : hits) {
  190. Map<String, Object> map = searchHit.getSource();
  191. mapList.add(map);
  192. }
  193. System.out.println(mapList.toString());
  194. }
  195. /**
  196. * 功能描述:关闭链接
  197. */
  198. public static void close() {
  199. client.close();
  200. }
  201. /**
  202. * 在Elasticsearch老版本中做数据遍历一般使用Scroll-Scan。Scroll是先做一次初始化搜索把所有符合搜索条件的结果缓存起来生成一个快照,
  203. * 然后持续地、批量地从快照里拉取数据直到没有数据剩下。而这时对索引数据的插入、删除、更新都不会影响遍历结果,因此scroll 并不适合用来做实时搜索。
  204. * Scan是搜索类型,告诉Elasticsearch不用对结果集进行排序,只要分片里还有结果可以返回,就返回一批结果。
  205. * 在5.X版本中SearchType.SCAN已经被去掉了。根据官方文档说明,使用“_doc”做排序可以达到更高性能的Scroll查询效果,
  206. * 这样可以遍历所有文档而不需要进行排序。
  207. * @param index
  208. * @param type
  209. */
  210. @SuppressWarnings("deprecation")
  211. public static void scroll(String index, String type) {
  212. System.out.println("scroll()方法开始.....");
  213. List<JSONObject> lst = new ArrayList<JSONObject>();
  214. SearchResponse searchResponse = client.prepareSearch(index)
  215. .setTypes(type)
  216. .setQuery(QueryBuilders.matchAllQuery())
  217. .addSort(SortBuilders.fieldSort("_doc"))
  218. .setSize(30)
  219. // 这个游标维持多长时间
  220. .setScroll(TimeValue.timeValueMinutes(8)).execute().actionGet();
  221. System.out.println("getScrollId: "+searchResponse.getScrollId());
  222. System.out.println("匹配记录数:"+searchResponse.getHits().getTotalHits());
  223. System.out.println("hits长度:"+searchResponse.getHits().hits().length);
  224. for (SearchHit hit : searchResponse.getHits()) {
  225. String json = hit.getSourceAsString();
  226. try {
  227. JSONObject jsonObject = new JSONObject(json);
  228. lst.add(jsonObject);
  229. } catch (JSONException e) {
  230. e.printStackTrace();
  231. }
  232. }
  233. System.out.println("======" + lst.toString());
  234. System.out.println("======" + lst.get(0).get("username"));
  235. // 使用上次的scrollId继续访问
  236. // croll scroll = new ScrollTest2();
  237. // do{
  238. // int num = scroll.scanData(esClient,searchResponse.getScrollId())
  239. // if(num ==0) break;
  240. // }while(true);
  241. System.out.println("------------------------------END");
  242. }
  /**
   * Tokenizes {@code text} with the {@code ik} analyzer in the context of {@code index} and
   * prints one token per line.
   *
   * @param index index whose analysis settings are used
   * @param text text to analyze
   */
  public static void analyze(String index, String text) {
    analyze(index, text, "ik");
  }

  /**
   * Tokenizes {@code text} with the named analyzer in the context of {@code index} and prints
   * one token per line.
   *
   * <p>Uses the same shared client as every other helper, so a client configured through the
   * constructor is honoured here too.
   *
   * @param index index whose analysis settings are used
   * @param text text to analyze
   * @param analyzer analyzer name (it must exist in the cluster, e.g. one provided by a plugin)
   */
  public static void analyze(String index, String text, String analyzer) {
    AnalyzeRequestBuilder request =
        new AnalyzeRequestBuilder(client, AnalyzeAction.INSTANCE, index, text);
    request.setAnalyzer(analyzer);
    List<AnalyzeToken> analyzeTokens = request.execute().actionGet().getTokens();
    for (AnalyzeToken analyzeToken : analyzeTokens) {
      System.out.println(analyzeToken.getTerm());
    }
  }
}

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/240318
推荐阅读
相关标签
  

闽ICP备14008679号