赞
踩
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
- <version>1.14.4</version>
- </dependency>
-
SinkFunction实现类:
- import org.apache.flink.api.common.functions.RuntimeContext
- import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
-
/**
 * Elasticsearch sink function that converts each incoming `JSONObject` into an
 * index request.
 *
 * @param indexType handed to `EsIndexName.getWeekMondayIndexName`, together with the
 *                  element's `startTime`, to obtain the target index name
 */
class ElasticIndexSinkFunction(indexType: String) extends ElasticsearchSinkFunction[JSONObject] {

  /**
   * Builds one index request for `element` and adds it to `requestIndexer`.
   *
   * The element's `uniqueKey` field is used as the document id, the document type is
   * `_doc`, and the element itself is the document source. Any `Exception` thrown while
   * building or adding the request is printed with `printStackTrace` and not rethrown.
   *
   * @param element        record to index
   * @param runtimeContext not used by this implementation
   * @param requestIndexer receives the request built for `element`
   */
  override def process(element: JSONObject, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit =
    try {
      val documentId = element.getString("uniqueKey")
      val startTime = element.getLongValue("startTime")
      // NOTE(review): presumably yields a per-week index name keyed on that week's Monday;
      // confirm against EsIndexName, which is not visible here.
      val targetIndex = EsIndexName.getWeekMondayIndexName(indexType, startTime)

      val indexRequest: IndexRequest =
        Requests.indexRequest().index(targetIndex).`type`("_doc").id(documentId).source(element)

      requestIndexer.add(indexRequest)
    } catch {
      case failure: Exception => failure.printStackTrace()
    }
}
数据写入 ES:
- import org.apache.commons.configuration2.builder.fluent.Configurations
- import org.apache.flink.streaming.api.scala.DataStream
- import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
- import org.apache.http.HttpHost
-
- import java.util
- import java.util.UUID
-
/** Helper for attaching an Elasticsearch sink to a stream of `JSONObject` records. */
object EsSlinkUtil {

  /**
   * Adds an Elasticsearch sink, backed by `ElasticIndexSinkFunction`, to `data`.
   *
   * The Elasticsearch host is read from the `eSPort_export` key of
   * `SysConfig.properties`; the connection uses port 9200 over `http`. The sink operator
   * is named `indexType-<random UUID without dashes>` and given the uid
   * `indexType-jobType`.
   *
   * Any `Exception` thrown while building or attaching the sink is printed with
   * `printStackTrace` and not rethrown. No result type is declared: the success path
   * yields the value returned by `uid(...)`, the failure path yields unit.
   *
   * @param indexType forwarded to `ElasticIndexSinkFunction`; also the prefix of the
   *                  operator name and uid
   * @param data      stream whose records are written to Elasticsearch
   * @param jobType   suffix of the operator uid
   */
  def sendData(indexType: String, data: DataStream[JSONObject], jobType: String) = {
    val esHost = new Configurations()
      .properties("SysConfig.properties")
      .getString("eSPort_export")

    val hosts = new util.ArrayList[HttpHost]()
    hosts.add(new HttpHost(esHost, 9200, "http"))

    try {
      val sinkFunction = new ElasticIndexSinkFunction(indexType)
      val sinkBuilder: ElasticsearchSink.Builder[JSONObject] =
        new ElasticsearchSink.Builder[JSONObject](hosts, sinkFunction)

      val attachedSink = data.addSink(sinkBuilder.build())
      // The display name gets a fresh random suffix on every call; the uid stays stable.
      val randomSuffix = UUID.randomUUID().toString.replace("-", "")
      attachedSink
        .name(s"$indexType-$randomSuffix")
        .uid(s"$indexType-$jobType")
    } catch {
      case failure: Exception => failure.printStackTrace()
    }
  }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。