赞
踩
- <!--es 相关依赖开始 es客户端的版本必须和服务器版本一致-->
- <dependency>
- <groupId>io.searchbox</groupId>
- <artifactId>jest</artifactId>
- <version>6.3.1</version>
- </dependency>
- <dependency>
- <groupId>net.java.dev.jna</groupId>
- <artifactId>jna</artifactId>
- <version>4.5.2</version>
- </dependency>
- <dependency>
- <groupId>org.codehaus.janino</groupId>
- <artifactId>commons-compiler</artifactId>
- <version>2.7.8</version>
- </dependency>
- <!-- es 相关依赖结束 -->
- package com.zyj.gmall.common
-
- import io.searchbox.client.JestClientFactory
- import io.searchbox.client.config.HttpClientConfig
- import io.searchbox.core.Index
-
- object ESUtil {
-
- // 端口号,服务器如果没有配置,默认是9200
- val esUrl = "http://hadoop103:9200"
-
- def main(args: Array[String]): Unit = {
- // 向es写数据
-
- //1. 先有es的客户端
- //1.1 创建一个客户端工厂
- val factory = new JestClientFactory
- val conf = new HttpClientConfig.Builder(esUrl)
- .maxTotalConnection(100) // 最多同时可以有100个到es的连接 一般是分区数的1.5倍
- .connTimeout(10 * 1000) // 连接到es的超时时间
- .readTimeout(10 * 1000) // 读取数据的最大超时时间
- .multiThreaded(true) // 是否允许多线程
- .build()
- factory.setHttpClientConfig(conf)
- //1.2 从工厂获取一个客户端
- val client = factory.getObject
-
- //2. es需要的数据(json,样例类)
- val data =
- """
- |{
- | "name": "zs",
- | "age": 20
- |}
- |""".stripMargin
-
- //3. 写入(单次,批次)
- val index = new Index.Builder(data)
- .index("user")
- .`type`("_doc")
- // .id("1") // 可选 如果没有设置 id自动生成
- .build()
- client.execute(index)
-
- //4. 关闭客户端(其实是把客户端还给工厂)
- client.shutdownClient() // 虽然过时了,但比较稳定
-
- }
- }
- package com.zyj.gmall.common
-
- import io.searchbox.client.JestClientFactory
- import io.searchbox.client.config.HttpClientConfig
- import io.searchbox.core.Index
-
- object ESUtil {
-
- // 端口号,服务器如果没有配置,默认是9200
- val esUrl = "http://hadoop103:9200"
-
- def main(args: Array[String]): Unit = {
- // 向es写数据
-
- //1. 先有es的客户端
- //1.1 创建一个客户端工厂
- val factory = new JestClientFactory
- val conf = new HttpClientConfig.Builder(esUrl)
- .maxTotalConnection(100) // 最多同时可以有100个到es的连接 一般是分区数的1.5倍
- .connTimeout(10 * 1000) // 连接到es的超时时间
- .readTimeout(10 * 1000) // 读取数据的最大超时时间
- .multiThreaded(true) // 是否允许多线程
- .build()
- factory.setHttpClientConfig(conf)
- //1.2 从工厂获取一个客户端
- val client = factory.getObject
-
- //2. es需要的数据(json,样例类)
- val data = User(30, "ww")
-
- //3. 写入(单次,批次)
- val index = new Index.Builder(data)
- .index("user")
- .`type`("_doc")
- // .id("1") // 可选 如果没有设置 id自动生成
- .build()
- client.execute(index)
-
- //4. 关闭客户端(其实是把客户端还给工厂)
- client.shutdownClient() // 虽然过时了,但比较稳定
-
- }
- }
-
- case class User(age: Int, name: String)
- package com.zyj.gmall.common
-
- import io.searchbox.client.JestClientFactory
- import io.searchbox.client.config.HttpClientConfig
- import io.searchbox.core.Index
-
- object ESUtil {
-
- // 端口号,服务器如果没有配置,默认是9200
- val esUrl = "http://hadoop103:9200"
-
- // 创建一个客户端工厂
- val factory = new JestClientFactory
- val conf = new HttpClientConfig.Builder(esUrl)
- .maxTotalConnection(100) // 最多同时可以有100个到es的连接 一般是分区数的1.5倍
- .connTimeout(10 * 1000) // 连接到es的超时时间
- .readTimeout(10 * 1000) // 读取数据的最大超时时间
- .multiThreaded(true) // 是否允许多线程
- .build()
- factory.setHttpClientConfig(conf)
-
- /*
- * 向es中插入单条数据
- * */
- def insertSingle(index: String, source: Object, id: String = null) = {
- // 从工厂获取一个客户端
- val client = factory.getObject
- // 写入(单次,批次)
- val action = new Index.Builder(source)
- .index(index)
- .`type`("_doc")
- .id(id) // id 如果是null,相当于没有传
- .build()
- client.execute(action)
- // 关闭客户端(其实是把客户端还给工厂)
- client.shutdownClient()
- }
-
- def main(args: Array[String]): Unit = {
-
- val data = User(30, "xx")
-
- insertSingle("user", data)
-
- }
- }
-
- case class User(age: Int, name: String)
- package com.zyj.gmall.common
-
- import io.searchbox.client.JestClientFactory
- import io.searchbox.client.config.HttpClientConfig
- import io.searchbox.core.{Bulk, Index}
-
- object ESUtil {
-
- // 端口号,服务器如果没有配置,默认是9200
- val esUrl = "http://hadoop103:9200"
-
- // 创建一个客户端工厂
- val factory = new JestClientFactory
- val conf = new HttpClientConfig.Builder(esUrl)
- .maxTotalConnection(100) // 最多同时可以有100个到es的连接 一般是分区数的1.5倍
- .connTimeout(10 * 1000) // 连接到es的超时时间
- .readTimeout(10 * 1000) // 读取数据的最大超时时间
- .multiThreaded(true) // 是否允许多线程
- .build()
- factory.setHttpClientConfig(conf)
-
- /*
- * 向es批量插入数据
- * */
- def insertBulk(index: String, sources: Iterator[Any]) = {
- val client = factory.getObject
-
- val bulk = new Bulk.Builder()
- .defaultIndex(index)
- .defaultType("_doc")
-
- sources.foreach {
- case (id: String, data) =>
- val action = new Index.Builder(data).id(id).build()
- bulk.addAction(action)
- case data =>
- val action = new Index.Builder(data).build()
- bulk.addAction(action)
- }
-
- client.execute(bulk.build())
- client.shutdownClient()
- }
-
- def main(args: Array[String]): Unit = {
-
- val list = User(1, "aa") :: User(2, "bb") :: User(3, "cc") :: Nil
- insertBulk("user", list.toIterator)
-
- val list2 = ("100", User(1, "a")) :: ("200", User(1, "b")) :: ("300", User(3, "c")) :: Nil
- insertBulk("user", list2.toIterator)
-
- }
- }
-
- case class User(age: Int, name: String)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。