当前位置:   article > 正文

Scala Api 操作 Elasticsearch数据库_scala 操作 elasticsearch

scala 操作 elasticsearch

1、操作前先导Maven包,注意版本一定要一致,新旧版本不兼容

  1. <!--es 相关依赖开始 es客户端的版本必须和服务器版本一致-->
  2. <dependency>
  3. <groupId>io.searchbox</groupId>
  4. <artifactId>jest</artifactId>
  5. <version>6.3.1</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>net.java.dev.jna</groupId>
  9. <artifactId>jna</artifactId>
  10. <version>4.5.2</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.codehaus.janino</groupId>
  14. <artifactId>commons-compiler</artifactId>
  15. <version>2.7.8</version>
  16. </dependency>
  17. <!-- es 相关依赖结束 -->

2、单条数据插入,使用 json 字符串作为数据源

  1. package com.zyj.gmall.common
  2. import io.searchbox.client.JestClientFactory
  3. import io.searchbox.client.config.HttpClientConfig
  4. import io.searchbox.core.Index
  5. object ESUtil {
  6. // 端口号,服务器如果没有配置,默认是9200
  7. val esUrl = "http://hadoop103:9200"
  8. def main(args: Array[String]): Unit = {
  9. // 向es写数据
  10. //1. 先有es的客户端
  11. //1.1 创建一个客户端工厂
  12. val factory = new JestClientFactory
  13. val conf = new HttpClientConfig.Builder(esUrl)
  14. .maxTotalConnection(100) // 最多同时可以有100个到es的连接 一般是分区数的1.5倍
  15. .connTimeout(10 * 1000) // 连接到es的超时时间
  16. .readTimeout(10 * 1000) // 读取数据的最大超时时间
  17. .multiThreaded(true) // 是否允许多线程
  18. .build()
  19. factory.setHttpClientConfig(conf)
  20. //1.2 从工厂获取一个客户端
  21. val client = factory.getObject
  22. //2. es需要的数据(json,样例类)
  23. val data =
  24. """
  25. |{
  26. | "name": "zs",
  27. | "age": 20
  28. |}
  29. |""".stripMargin
  30. //3. 写入(单次,批次)
  31. val index = new Index.Builder(data)
  32. .index("user")
  33. .`type`("_doc")
  34. // .id("1") // 可选 如果没有设置 id自动生成
  35. .build()
  36. client.execute(index)
  37. //4. 关闭客户端(其实是把客户端还给工厂)
  38. client.shutdownClient() // 虽然过时了,但比较稳定
  39. }
  40. }

3、单条数据插入,使用样例类作为数据源

  1. package com.zyj.gmall.common
  2. import io.searchbox.client.JestClientFactory
  3. import io.searchbox.client.config.HttpClientConfig
  4. import io.searchbox.core.Index
  5. object ESUtil {
  6. // 端口号,服务器如果没有配置,默认是9200
  7. val esUrl = "http://hadoop103:9200"
  8. def main(args: Array[String]): Unit = {
  9. // 向es写数据
  10. //1. 先有es的客户端
  11. //1.1 创建一个客户端工厂
  12. val factory = new JestClientFactory
  13. val conf = new HttpClientConfig.Builder(esUrl)
  14. .maxTotalConnection(100) // 最多同时可以有100个到es的连接 一般是分区数的1.5倍
  15. .connTimeout(10 * 1000) // 连接到es的超时时间
  16. .readTimeout(10 * 1000) // 读取数据的最大超时时间
  17. .multiThreaded(true) // 是否允许多线程
  18. .build()
  19. factory.setHttpClientConfig(conf)
  20. //1.2 从工厂获取一个客户端
  21. val client = factory.getObject
  22. //2. es需要的数据(json,样例类)
  23. val data = User(30, "ww")
  24. //3. 写入(单次,批次)
  25. val index = new Index.Builder(data)
  26. .index("user")
  27. .`type`("_doc")
  28. // .id("1") // 可选 如果没有设置 id自动生成
  29. .build()
  30. client.execute(index)
  31. //4. 关闭客户端(其实是把客户端还给工厂)
  32. client.shutdownClient() // 虽然过时了,但比较稳定
  33. }
  34. }
  35. case class User(age: Int, name: String)

 4、单条数据插入,将对数据库的连接进行封装

  1. package com.zyj.gmall.common
  2. import io.searchbox.client.JestClientFactory
  3. import io.searchbox.client.config.HttpClientConfig
  4. import io.searchbox.core.Index
  5. object ESUtil {
  6. // 端口号,服务器如果没有配置,默认是9200
  7. val esUrl = "http://hadoop103:9200"
  8. // 创建一个客户端工厂
  9. val factory = new JestClientFactory
  10. val conf = new HttpClientConfig.Builder(esUrl)
  11. .maxTotalConnection(100) // 最多同时可以有100个到es的连接 一般是分区数的1.5倍
  12. .connTimeout(10 * 1000) // 连接到es的超时时间
  13. .readTimeout(10 * 1000) // 读取数据的最大超时时间
  14. .multiThreaded(true) // 是否允许多线程
  15. .build()
  16. factory.setHttpClientConfig(conf)
  17. /*
  18. * 向es中插入单条数据
  19. * */
  20. def insertSingle(index: String, source: Object, id: String = null) = {
  21. // 从工厂获取一个客户端
  22. val client = factory.getObject
  23. // 写入(单次,批次)
  24. val action = new Index.Builder(source)
  25. .index(index)
  26. .`type`("_doc")
  27. .id(id) // id 如果是null,相当于没有传
  28. .build()
  29. client.execute(action)
  30. // 关闭客户端(其实是把客户端还给工厂)
  31. client.shutdownClient()
  32. }
  33. def main(args: Array[String]): Unit = {
  34. val data = User(30, "xx")
  35. insertSingle("user", data)
  36. }
  37. }
  38. case class User(age: Int, name: String)

5、批量插入,样例类作为数据源,并封装连接

  1. package com.zyj.gmall.common
  2. import io.searchbox.client.JestClientFactory
  3. import io.searchbox.client.config.HttpClientConfig
  4. import io.searchbox.core.{Bulk, Index}
  5. object ESUtil {
  6. // 端口号,服务器如果没有配置,默认是9200
  7. val esUrl = "http://hadoop103:9200"
  8. // 创建一个客户端工厂
  9. val factory = new JestClientFactory
  10. val conf = new HttpClientConfig.Builder(esUrl)
  11. .maxTotalConnection(100) // 最多同时可以有100个到es的连接 一般是分区数的1.5倍
  12. .connTimeout(10 * 1000) // 连接到es的超时时间
  13. .readTimeout(10 * 1000) // 读取数据的最大超时时间
  14. .multiThreaded(true) // 是否允许多线程
  15. .build()
  16. factory.setHttpClientConfig(conf)
  17. /*
  18. * 向es批量插入数据
  19. * */
  20. def insertBulk(index: String, sources: Iterator[Any]) = {
  21. val client = factory.getObject
  22. val bulk = new Bulk.Builder()
  23. .defaultIndex(index)
  24. .defaultType("_doc")
  25. sources.foreach {
  26. case (id: String, data) =>
  27. val action = new Index.Builder(data).id(id).build()
  28. bulk.addAction(action)
  29. case data =>
  30. val action = new Index.Builder(data).build()
  31. bulk.addAction(action)
  32. }
  33. client.execute(bulk.build())
  34. client.shutdownClient()
  35. }
  36. def main(args: Array[String]): Unit = {
  37. val list = User(1, "aa") :: User(2, "bb") :: User(3, "cc") :: Nil
  38. insertBulk("user", list.toIterator)
  39. val list2 = ("100", User(1, "a")) :: ("200", User(1, "b")) :: ("300", User(3, "c")) :: Nil
  40. insertBulk("user", list2.toIterator)
  41. }
  42. }
  43. case class User(age: Int, name: String)
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/606383
推荐阅读
相关标签
  

闽ICP备14008679号