当前位置:   article > 正文

Flink elasticSearchSink

flink elasticsearchsink
  1. <dependency>
  2.     <groupId>org.apache.flink</groupId>
  3.     <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
  4.     <version>1.14.4</version>
  5. </dependency>

ElasticsearchSinkFunction 实现类:

  1. import org.apache.flink.api.common.functions.RuntimeContext
  2. import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
  /**
   * Elasticsearch sink function that converts each incoming JSON record into an
   * index request and queues it on the connector's `RequestIndexer`.
   *
   * The target index name comes from `EsIndexName.getWeekMondayIndexName`, called
   * with `indexType` and the record's `startTime` field. The document id is the
   * record's `uniqueKey` field and the mapping type is always `_doc`.
   *
   * @param indexType value forwarded to `EsIndexName.getWeekMondayIndexName`
   *                  when resolving the index name for a record
   */
  class ElasticIndexSinkFunction(indexType: String) extends ElasticsearchSinkFunction[JSONObject] {

    /**
     * Builds one index request for `element` and adds it to `requestIndexer`.
     *
     * This is best-effort: a non-fatal failure while building or queuing the
     * request is printed to stderr and the record is dropped, so one bad record
     * does not fail the sink.
     *
     * @param element        record to index; its `uniqueKey` and `startTime` fields are read
     * @param runtimeContext Flink runtime context (not used here)
     * @param requestIndexer indexer that receives the built request
     */
    override def process(element: JSONObject, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
      try {
        val uniqueKey = element.getString("uniqueKey")
        val indexName = EsIndexName.getWeekMondayIndexName(indexType, element.getLongValue("startTime"))
        val request: IndexRequest = Requests.indexRequest()
          .index(indexName)
          .`type`("_doc")
          .id(uniqueKey)
          .source(element)
        requestIndexer.add(request)
      } catch {
        // NonFatal (rather than Exception) lets InterruptedException and fatal VM
        // errors propagate, so an interrupt is never silently swallowed here.
        case scala.util.control.NonFatal(e) => e.printStackTrace()
      }
    }
  }

将数据写入 Elasticsearch:

  1. import org.apache.commons.configuration2.builder.fluent.Configurations
  2. import org.apache.flink.streaming.api.scala.DataStream
  3. import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
  4. import org.apache.http.HttpHost
  5. import java.util
  6. import java.util.UUID
  /**
   * Helper for attaching an Elasticsearch sink to a stream of JSON records.
   */
  object EsSlinkUtil {

    /** HTTP port used when connecting to the Elasticsearch host. */
    private val EsHttpPort = 9200

    /**
     * Attaches an Elasticsearch sink, backed by `ElasticIndexSinkFunction`, to `data`.
     *
     * The Elasticsearch host is read from the `eSPort_export` key of
     * `SysConfig.properties`. The sink operator is named `indexType` plus a random
     * UUID (dashes removed) and gets the uid `indexType-jobType`.
     *
     * A non-fatal failure while building or attaching the sink is printed to
     * stderr and not rethrown; configuration and host errors are not caught.
     *
     * @param indexType passed to `ElasticIndexSinkFunction`; also prefixes the operator name and uid
     * @param data      stream of JSON records to write
     * @param jobType   suffix of the operator uid
     * @throws IllegalArgumentException if the `eSPort_export` key is missing or blank
     */
    def sendData(indexType: String, data: DataStream[JSONObject], jobType: String): Unit = {
      val configuration = new Configurations().properties("SysConfig.properties")
      val esIP = configuration.getString("eSPort_export")
      // Fail early with a message that names the key, instead of letting HttpHost
      // reject a null host with a generic error.
      require(
        esIP != null && esIP.trim.nonEmpty,
        "Missing or empty 'eSPort_export' in SysConfig.properties"
      )

      val httpHosts = new util.ArrayList[HttpHost]()
      httpHosts.add(new HttpHost(esIP, EsHttpPort, "http"))

      try {
        val sinkBuilder: ElasticsearchSink.Builder[JSONObject] =
          new ElasticsearchSink.Builder[JSONObject](httpHosts, new ElasticIndexSinkFunction(indexType))
        val operatorName = s"$indexType-${UUID.randomUUID().toString.replaceAll("-", "")}"
        data.addSink(sinkBuilder.build())
          .name(operatorName)
          .uid(s"$indexType-$jobType")
      } catch {
        // NonFatal lets InterruptedException and fatal VM errors propagate.
        case scala.util.control.NonFatal(e) => e.printStackTrace()
      }
    }
  }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/822511
推荐阅读
相关标签
  

闽ICP备14008679号