当前位置:   article > 正文

Java对ES的基本操作_es java

es java

1.首先介绍一下ES的查询类型,SearchType。

    ES一共有四种查询类型:

    QUERY_AND_FETCH:主节点将查询请求分发到所有的分片中,各个分片按照自己的查询规则即词频文档频率进行打分排序,然后将结果返回给主节点,主节点对所有数据进行汇总排序然后再返回给客户端,此种方式只需要和ES交互一次。

     特点:a. 存在数据量和排序问题,主节点会汇总所有分片返回的数据,这样数据量会比较大;b. 各个分片上的打分规则可能不一致。

    QUERY_THEN_FETCH:主节点将请求分发给所有分片,各个分片打分排序后将数据的id和分值返回给主节点,主节点收到后进行汇总排序,再根据排序后的id到对应的节点读取对应的数据再返回给客户端,此种方式需要和ES交互两次

     特点:解决了数据量问题,但是排序问题依然存在;是ES的默认查询方式。

    DFS_QUERY_AND_FETCH:和前面两种的区别在于将各个分片的规则统一起来进行打分

    特点:解决了排序问题,但是仍然存在数据量问题。

    DFS_QUERY_THEN_FETCH:和前面两种的区别在于将各个分片的规则统一起来进行打分,将数据的id和分值返回给主节点,主节点收到后进行汇总排序,再根据排序后的id到对应的节点读取对应的数据再返回给客户端

    特点:解决了排序和数据量问题,但是效率是最差的。

2. 分页时会给服务器造成很大的压力。原因是主节点需要先汇总各个分片返回的数据再进行排序,如果有N个分片,就会产生N倍的数据量。

3.Java操作ES

依赖包:

   <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>2.4.1</version>
    </dependency>  

  1. import java.io.IOException;
  2. import java.net.InetAddress;
  3. import java.net.UnknownHostException;
  4. import java.util.HashMap;
  5. import java.util.List;
  6. import java.util.Map;
  7. import java.util.Random;
  8. import java.util.concurrent.ExecutionException;
  9. import org.elasticsearch.action.bulk.BulkRequestBuilder;
  10. import org.elasticsearch.action.bulk.BulkResponse;
  11. import org.elasticsearch.index.query.QueryBuilders;
  12. import org.elasticsearch.action.delete.DeleteResponse;
  13. import org.elasticsearch.action.get.GetResponse;
  14. import org.elasticsearch.action.get.MultiGetItemResponse;
  15. import org.elasticsearch.action.get.MultiGetResponse;
  16. import org.elasticsearch.action.index.IndexRequest;
  17. import org.elasticsearch.action.index.IndexResponse;
  18. import org.elasticsearch.action.search.SearchResponse;
  19. import org.elasticsearch.action.search.SearchType;
  20. import org.elasticsearch.action.update.UpdateRequest;
  21. import org.elasticsearch.action.update.UpdateResponse;
  22. import org.elasticsearch.client.transport.TransportClient;
  23. import org.elasticsearch.cluster.node.DiscoveryNode;
  24. import org.elasticsearch.common.settings.Settings;
  25. import org.elasticsearch.common.text.Text;
  26. import org.elasticsearch.common.transport.InetSocketTransportAddress;
  27. import org.elasticsearch.common.xcontent.XContentBuilder;
  28. import org.elasticsearch.common.xcontent.XContentFactory;
  29. import org.elasticsearch.search.SearchHit;
  30. import org.elasticsearch.search.SearchHits;
  31. import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
  32. import org.elasticsearch.search.aggregations.metrics.sum.Sum;
  33. import org.elasticsearch.search.aggregations.AggregationBuilders;
  34. import org.elasticsearch.search.aggregations.bucket.terms.Terms;
  35. import org.elasticsearch.search.highlight.HighlightField;
  36. import org.elasticsearch.search.sort.SortOrder;
  37. import com.alibaba.fastjson.JSON;
  38. /**
  39. * Hello world!
  40. *
  41. */
  42. public class App
  43. {
  44. private static TransportClient client;
  45. static {
  46. System.out.println("初始化链接。。。");
  47. Map<String, String> map = new HashMap<>();
  48. map.put("cluster.name", "my_home");
  49. Settings.Builder settings = Settings.builder().put(map);
  50. try {
  51. client = TransportClient.builder().settings(settings).build()
  52. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), Integer.parseInt("9300")));
  53. List<DiscoveryNode> nodes = client.connectedNodes();
  54. for (DiscoveryNode node : nodes) {
  55. System.out.println(node.getHostAddress());
  56. }
  57. } catch (NumberFormatException e) {
  58. e.printStackTrace();
  59. } catch (UnknownHostException e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. public static void main(String[] args) throws Exception {
  64. testPreference();
  65. }
  66. /**
  67. * 功能描述: 增加索引
  68. *
  69. * @throws Exception void
  70. * @version 1.0.6
  71. * @author king
  72. */
  73. public static void addIndex() throws Exception{
  74. System.out.println("============分=====割=====线=============");
  75. XContentBuilder source = createJson4();
  76. // 存json入索引中
  77. IndexResponse response = client.prepareIndex("twitter", "tweet", "4").setSource(source).get();
  78. // 结果获取
  79. String index = response.getIndex();
  80. String type = response.getType();
  81. String id = response.getId();
  82. long version = response.getVersion();
  83. boolean created = response.isCreated();
  84. Map<String, Object> result = new HashMap<>();
  85. result.put("index", index);
  86. result.put("type", type);
  87. result.put("id", id);
  88. result.put("version", version);
  89. result.put("created", created);
  90. System.out.println(JSON.toJSONString(result));
  91. }
  92. /**
  93. * 功能描述:删除索引
  94. * void
  95. * @version 1.0.6
  96. * @author king
  97. */
  98. public static void delIndex() {
  99. System.out.println("======================删==除===分===割==线=================");
  100. DeleteResponse deleteResponse = client.prepareDelete("twitter", "tweet", "1").get();
  101. String index = deleteResponse.getIndex();
  102. String type = deleteResponse.getType();
  103. String id = deleteResponse.getId();
  104. Long version = deleteResponse.getVersion();
  105. Map<String, Object> result = new HashMap<>();
  106. result.put("index", index);
  107. result.put("type", type);
  108. result.put("id", id);
  109. result.put("version", version);
  110. System.out.println(JSON.toJSONString(result));
  111. }
  112. /**
  113. * 功能描述: 获取数据
  114. * void
  115. * @version 1.0.6
  116. * @author king
  117. */
  118. public static void getData() {
  119. System.out.println("=================获==取==数==据==============");
  120. GetResponse getResponse = client.prepareGet("tom", "cat", "2").execute().actionGet();
  121. String id = getResponse.getId();
  122. String resultStr = getResponse.getSourceAsString();
  123. System.out.println("id" + id);
  124. System.out.println("resultStr:" + resultStr);
  125. }
  126. /**
  127. *
  128. * */
  129. public static void testGetThread() {
  130. //设置线程安全
  131. GetResponse getresponse = client.prepareGet("tom", "cat", "2").setOperationThreaded(false).get();
  132. System.out.println(getresponse.getSourceAsString());
  133. }
  134. /**
  135. * 功能描述: 更新ES数据
  136. *
  137. * @throws IOException
  138. * @throws InterruptedException
  139. * @throws ExecutionException void
  140. * @version 1.0.6
  141. * @author king
  142. */
  143. public static void testUpdate() throws IOException, InterruptedException, ExecutionException {
  144. UpdateRequest updateRequest = new UpdateRequest();
  145. updateRequest.index("tom");
  146. updateRequest.type("cat");
  147. updateRequest.id("2");
  148. updateRequest.doc(XContentFactory.jsonBuilder()
  149. .startObject()
  150. .field("phone", "17686886688")
  151. .field("color", "blue")
  152. .endObject());
  153. UpdateResponse updateResponse = client.update(updateRequest).get();
  154. String index = updateResponse.getIndex();
  155. String type = updateResponse.getType();
  156. String id = updateResponse.getId();
  157. long version = updateResponse.getVersion();
  158. Map<String, Object> result = new HashMap<>();
  159. result.put("index", index);
  160. result.put("type", type);
  161. result.put("id", id);
  162. result.put("version", version);
  163. System.out.println(JSON.toJSONString(result));
  164. }
  165. /**
  166. * 功能描述: 更新数据
  167. *
  168. * @throws IOException
  169. * @throws InterruptedException
  170. * @throws ExecutionException void
  171. * @version 1.0.6
  172. * @author yaoyaowang
  173. */
  174. public static void testUpdate1() throws IOException, InterruptedException, ExecutionException {
  175. UpdateRequest updateRequest = new UpdateRequest("tom", "cat", "2");
  176. updateRequest.doc(XContentFactory.jsonBuilder()
  177. .startObject()
  178. .field("phone", "1366688866")
  179. .endObject());
  180. UpdateResponse updateResponse = client.update(updateRequest).get();
  181. String index = updateResponse.getIndex();
  182. String type = updateResponse.getType();
  183. String id = updateResponse.getId();
  184. long version = updateResponse.getVersion();
  185. Map<String, Object> result = new HashMap<>();
  186. result.put("index", index);
  187. result.put("type", type);
  188. result.put("id", id);
  189. result.put("version", version);
  190. System.out.println(JSON.toJSONString(result));
  191. }
  192. /**
  193. * 功能描述: 新增数据,如果没有插入,如果有更新
  194. *
  195. * @throws IOException
  196. * @throws InterruptedException
  197. * @throws ExecutionException void
  198. * @version 1.0.6
  199. * @author yaoyaowang
  200. */
  201. public static void testUpSert() throws IOException, InterruptedException, ExecutionException {
  202. //查询该数据是否存在,如果不存在则插入
  203. IndexRequest indexRequest = new IndexRequest("tom", "cat", "1");
  204. indexRequest.source(XContentFactory.jsonBuilder()
  205. .startObject()
  206. .field("name", "staven")
  207. .field("id", "2")
  208. .field("age", "20")
  209. .field("sex", "F")
  210. .field("address", "America")
  211. .field("phone", "110")
  212. .field("color", "yello"));
  213. //更新该数据,看该数据是否存在,如果不存在,则把上面的数据插入,如果存在则把数据更新(将"address", "America" 更新为 "address", "America"
  214. UpdateRequest updateRequest = new UpdateRequest("tom", "cat", "1");
  215. updateRequest.doc(XContentFactory.jsonBuilder()
  216. .startObject()
  217. .field("address", "America")
  218. .endObject());
  219. updateRequest.upsert(indexRequest);
  220. UpdateResponse updateResponse = client.update(updateRequest).get();
  221. String index = updateResponse.getIndex();
  222. String type = updateResponse.getType();
  223. String id = updateResponse.getId();
  224. long version = updateResponse.getVersion();
  225. Map<String, Object> result = new HashMap<>();
  226. result.put("index", index);
  227. result.put("type", type);
  228. result.put("id", id);
  229. result.put("version", version);
  230. System.out.println(JSON.toJSONString(result));
  231. }
  232. /**
  233. * 功能描述: 增加索引
  234. * void
  235. * @version 1.0.6
  236. * @author king
  237. */
  238. public static void testMultiGet() {
  239. MultiGetResponse multiGetResponse = client.prepareMultiGet()
  240. .add("tom", "cat", "1")
  241. .add("tom", "cat", "2")
  242. .add("twitter", "tweet", "1")
  243. .get();
  244. for(MultiGetItemResponse itemReponse : multiGetResponse){
  245. GetResponse getResponse = itemReponse.getResponse();
  246. if(getResponse.isExists()){
  247. String sourceAsString = getResponse.getSourceAsString();
  248. System.out.println(sourceAsString);
  249. }
  250. }
  251. }
  252. /**
  253. * 功能描述: 批处理
  254. *
  255. * @throws Exception void
  256. * @version 1.0.6
  257. * @author yaoyaowang
  258. */
  259. public static void testBulk() throws Exception {
  260. BulkRequestBuilder bulkBuilder = client.prepareBulk();
  261. bulkBuilder.add(client.prepareIndex("tom", "cat", "3")
  262. .setSource(XContentFactory.jsonBuilder()
  263. .startObject()
  264. .field("id", "2")
  265. .field("name", "rose")
  266. .field("age", "18")
  267. .field("sex", "F")
  268. .field("address", "tianjin")
  269. .field("color", "black")
  270. .field("phone", "120")
  271. .endObject()));
  272. bulkBuilder.add(client.prepareIndex("tom", "cat", "1")
  273. .setSource(XContentFactory.jsonBuilder()
  274. .startObject()
  275. .field("address", "England")
  276. .endObject()));
  277. BulkResponse bulkResponse = bulkBuilder.get();
  278. System.out.println(bulkResponse.getHeaders());
  279. }
  280. /**
  281. * 功能描述: 统计索引下的数据
  282. * void
  283. * @version 1.0.6
  284. * @author king
  285. */
  286. public static void testCount(){
  287. long num = client.prepareCount("twitter").get().getCount();
  288. System.out.println(String.format("twitter的总数为:%d", num));
  289. }
  290. /**
  291. * 功能描述: 查询
  292. *
  293. * void
  294. * @version 1.0.6
  295. * @author king
  296. */
  297. public static void testSearch() {
  298. Map<String, String> query = new HashMap<>();
  299. query.put("name", "tom");
  300. SearchResponse searchResponse = client.prepareSearch("twitter").setTypes("tweet")
  301. //.setQuery(QueryBuilders.matchQuery("name", "tom").operator(Operator.AND)).setSearchType(SearchType.DEFAULT)
  302. .setFrom(0).setSize(2).addSort("id", SortOrder.DESC)
  303. .get();
  304. SearchHits searchHits = searchResponse.getHits();
  305. Long count = searchHits.getTotalHits();
  306. System.out.println("总数:" + count);
  307. SearchHit[] searchHitArr = searchHits.getHits();
  308. for(SearchHit searchHit : searchHitArr){
  309. System.out.println(searchHit.getSourceAsString());
  310. }
  311. }
  312. /**
  313. * 功能描述: 筛选
  314. * void
  315. * @version 1.0.6
  316. * @author king
  317. */
  318. public static void testFilter() {
  319. SearchResponse searchResponse = client.prepareSearch("twitter").setTypes("tweet")
  320. .setQuery(QueryBuilders.matchAllQuery())
  321. .setPostFilter(QueryBuilders.rangeQuery("age").gte(17))
  322. .setFrom(0).addSort("id", SortOrder.ASC)
  323. .get();
  324. SearchHits searchHits = searchResponse.getHits();
  325. Long total = searchHits.getTotalHits();
  326. System.out.println("年龄大于17的数据总数" + total);
  327. SearchHit[] searchHitArr = searchHits.getHits();
  328. for(SearchHit searchHit : searchHitArr){
  329. System.out.println(searchHit.getSourceAsString());
  330. }
  331. }
  332. /**
  333. * 功能描述: 高亮
  334. * void
  335. * @version 1.0.6
  336. * @author king
  337. */
  338. public static void testHighLight() {
  339. SearchResponse searchResponse = client.prepareSearch("twitter").setTypes("tweet")
  340. .setQuery(QueryBuilders.matchQuery("name", "tom"))
  341. .setSearchType(SearchType.QUERY_THEN_FETCH)
  342. .addHighlightedField("name")
  343. .setHighlighterPreTags("<font color='red'>")
  344. .setHighlighterPostTags("</font>")
  345. .get();
  346. SearchHits searchHits = searchResponse.getHits();
  347. long total = searchHits.getTotalHits();
  348. System.out.println("索引twitter下的所有数据的总数:" + total);
  349. SearchHit[] searchHitArr = searchHits.getHits();
  350. for(SearchHit searchHit : searchHitArr){
  351. Map<String, HighlightField> highlightFields = searchHit.getHighlightFields();
  352. HighlightField highlightField = highlightFields.get("name");
  353. if(null != highlightField){
  354. Text[] fragments = highlightField.fragments();
  355. System.out.println("这是个什么鬼呢? ---> " + fragments[0]);
  356. }
  357. System.out.println(searchHit.sourceAsString());
  358. }
  359. }
  360. /**
  361. * 功能描述: 分组统计
  362. * void
  363. * @version 1.0.6
  364. * @author king
  365. */
  366. public static void testGroupBy() {
  367. SearchResponse searchResponse = client.prepareSearch("twitter").setTypes("tweet")
  368. .setQuery(QueryBuilders.matchAllQuery())
  369. .setSearchType(SearchType.QUERY_THEN_FETCH)
  370. .addAggregation(AggregationBuilders.terms("group_age").field("age").size(0))
  371. .get();
  372. Terms terms = searchResponse.getAggregations().get("group_age");
  373. List<Bucket> buckets = terms.getBuckets();
  374. for(Bucket bucket : buckets){
  375. System.out.println(bucket.getKey() + ":" + bucket.getDocCount());
  376. }
  377. }
  378. /**
  379. * 功能描述: 聚合
  380. * void
  381. * @version 1.0.6
  382. * @author king
  383. */
  384. public static void testAggregationFunction() {
  385. SearchResponse searchResponse = client.prepareSearch("twitter").setTypes("tweet")
  386. .setQuery(QueryBuilders.matchAllQuery())
  387. .setSearchType(SearchType.QUERY_THEN_FETCH)
  388. .addAggregation(AggregationBuilders.terms("group_name").field("name")
  389. .subAggregation(AggregationBuilders.sum("age_count").field("age")))
  390. .get();
  391. Terms terms = searchResponse.getAggregations().get("group_name");
  392. List<Bucket> buckets = terms.getBuckets();
  393. for(Bucket bucket : buckets){
  394. Sum sum = bucket.getAggregations().get("age_count");
  395. System.out.println("{" + bucket.getKey() + ":" + bucket.getDocCount() + ":" + sum.getValue() + "}");
  396. }
  397. }
  398. /**
  399. * 功能描述: 造数据
  400. *
  401. * @throws IOException void
  402. * @version 1.0.6
  403. * @author king
  404. */
  405. public static void generateOtherIndexData() throws IOException {
  406. for(int i = 1; i <= 60; i++){
  407. XContentBuilder source = XContentFactory.jsonBuilder()
  408. .startObject()
  409. .field("id", i)
  410. .field("name", createName())
  411. .field("sex", (i < 30 ? "F" : "M"))
  412. .field("age", createAge())
  413. .field("address", "china")
  414. .endObject();
  415. client.prepareIndex("ycu", "computer", String.valueOf(i-1)).setSource(source).get();
  416. }
  417. }
  418. /**
  419. * 功能描述: 创建人名
  420. *
  421. * @return String
  422. * @version 1.0.6
  423. * @author king
  424. */
  425. public static String createName(){
  426. Random random = new Random();
  427. String[] str = {"q","w","e","r","t","y","u","i","o","p","a","s","f","d","g","h","j","k","l","z","x","c","v","b","n","m"};
  428. random.nextInt(23);
  429. String name = new StringBuffer(str[random.nextInt(23)]).append(str[random.nextInt(23)])
  430. .append(str[random.nextInt(23)]).toString();
  431. return name;
  432. }
  433. /**
  434. * 功能描述: 创建年龄
  435. *
  436. * @return int
  437. * @version 1.0.6
  438. * @author king
  439. */
  440. public static int createAge(){
  441. Random random = new Random();
  442. String[] num = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
  443. String ageStr = new StringBuffer(num[random.nextInt(10)]).append(num[random.nextInt(10)]).toString();
  444. return Integer.parseInt(ageStr);
  445. }
  446. /**
  447. * 功能描述: 指定分片区查询数据
  448. * void
  449. * @version 1.0.6
  450. * @author king
  451. */
  452. public static void testPreference(){
  453. SearchResponse searchResponse = client.prepareSearch("ycu").setTypes("computer").setPreference("_shards:0,1")
  454. .setQuery(QueryBuilders.matchAllQuery()).setExplain(true).setFrom(0).setSize(60).get();
  455. SearchHits searchHits = searchResponse.getHits();
  456. long total = searchHits.getTotalHits();
  457. SearchHit[] searchHitArr = searchHits.getHits();
  458. System.out.println("总数:" + total);
  459. for(SearchHit searchHit : searchHitArr){
  460. System.out.println(searchHit.getSourceAsString());
  461. }
  462. }
  463. /**
  464. * 使用es的帮助类
  465. */
  466. public static XContentBuilder createJson4() throws Exception {
  467. // 创建json对象, 其中一个创建json的方式
  468. XContentBuilder source = XContentFactory.jsonBuilder()
  469. .startObject()
  470. .field("id", "5")
  471. .field("name", "tom")
  472. .field("sex", "F")
  473. .field("age", 19)
  474. .field("address", "china")
  475. .endObject();
  476. return source;
  477. }
  478. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/68304
推荐阅读
相关标签
  

闽ICP备14008679号