当前位置:   article > 正文

Flink实战5-Flink全局参数传递实现Kafka数据源对接落地Elasticsearch_Sink与过程Search_flink elasticsearch sink

flink elasticsearch sink

背景

  • 适用于配置化传入Flink全局参数;
  • 项目起源于公司多数据源对接,此Demo只是其中一种,整个过程是公司前端部门因为各方渠道传入的不同数据,风控的业务方需要保留进行规则查询;数据流转过程就是Kafka接入不同数据源,按照不同的要求进行es的直接存储或者查询拼接后的存储;

摘要

关键字

  • Flink_Elasticsearch查询、Elasticsearch_Sink、Flink全局配置化参数;

设计

  • 整个过程比较简单,每个类中注释很详细;
  • Kafka_Source对接数据源;
  • Mapper_Processing进行数据处理、数据解析、数据查询;
  • Elasticsearch_Search是一些es查询封装的方法;
  • Elasticsearch_Sink进行数据存储落地;
  • Elasticsearch_Storage是一些es存储的封装方法;

理解

  • eg:Flink流中的查询和存储;

    1. 目前的示例全部都是Flink初始化的时候创建连接;
  1. 操作过程中进行数据具体查询和存储,所有数据对接都可以使用这种方式;

实现

说明

此处的处理没有写成项目中使用的比较复杂的可配置化的形式,也就是只针对单表测试表的操作;

依赖

<scala.main.version>2.11</scala.main.version>
<flink.version>1.10.1</flink.version>
<es.version>7.6.1</es.version>
<!--flink-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.main.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.main.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.main.version}</artifactId>
            <version>1.10.1</version>
        </dependency>

        <!--flink table & sql-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.main.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.main.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--导入flink连接redis的文件-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_${scala.main.version}</artifactId>
            <version>${flink.redis.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.main.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--rocksdb 与flink 进行整合依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.main.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- es客户端的依赖,其中包含了一个核心的api: TransportClient -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${es.version}</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

main

//此处中的properties只是项目中运用到了一些环境参数,大家学习过程中直接使用定参数到全局配置中即可
object FlinkFreelySourcePutInMain {
  def main(args: Array[String]): Unit = {
    //环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    var prop: Properties = null
    if (args != null && args.length == 2) {
      println(args(0))
      val env: String = args(0)
      println("args",args.toList)
      prop = ConfigUtils.getProperties(env , args(1))
      println("获取配置",prop)
      //      println("size:" + prop.size())
    }


      FlinkFreelySourcePutInProcessing.processor(env, prop)

    env.execute(this.getClass.getSimpleName)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

Processing

import java.util.Properties

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
 * @Author mr_lu(鹿)
 * @Description ******
 * @Date 2021-06-04 下午6:59
 * @Copyright 代码类版权的最终解释权归属mr_lu本人所有;
 **/
object FlinkFreelySourcePutInProcessing extends FlinkPropertiesProcessor {
  override def processor(env: StreamExecutionEnvironment, prop: Properties): Unit = {
    /**
     * 1. kafka source-多数据源配置
     * 2. 数据处理,根据不同配置方式按照主键入库
     * 3. ES - sink 根据配置存储不同数据
     */
    /**
     * import
     */
    import org.apache.flink.api.scala._

    /**
     * 获取配置
     */
    val KAFKA_BROKER_SERVERS = prop.getProperty(ConfConstant.KAFKA_BROKER_KEY)
    val ZOOKEEPER_QUORUM_KEY = prop.getProperty(ConfConstant.ZOOKEEPER_QUORUM_KEY)
    val ES_NODES_HOSTS_NAME = prop.getProperty(ConfConstant.ES_NODES_KEY)

    println("KAFKA_BROKER_KEY", KAFKA_BROKER_SERVERS)
    println("ZOOKEEPER_QUORUM_KEY", ZOOKEEPER_QUORUM_KEY)

    /**
     * 这个是这次要实现的全局参数传递,具体使用就是在富函数中调用
     * 内部参数传递
     */
    val configuration = new Configuration()
    configuration.setString("ES_NODES_HOSTS_NAME", ES_NODES_HOSTS_NAME)
    env.getConfig.setGlobalJobParameters(configuration)

    /**
     * 1. kafka source-多数据源配置
     */
    val dataStream = MyKafkaSource.myKafkaSource(
      env,
      KAFKA_BROKER_SERVERS,
      List("FreelySourcePutInTopic")
    )

    /**
     * 2. 数据处理,根据不同配置方式按照主键入库,过程中涉及Flink关于Elasticsearch的查询
     */
    val freelySourcePutInMapStream: DataStream[FlinkElasticSearchSourceInsertBean] = dataStream.map(FlinkFreelySourcePutInMap)

    freelySourcePutInMapStream.addSink(MyElasticSearchSink)
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

Bean

case class FlinkElasticSearchSourceInsertBean(
                                             data : JSONObject ,
                                             id : String ,
                                             index : String
                                             )
  • 1
  • 2
  • 3
  • 4
  • 5

Kafka_Source

import java.util.Properties

import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer}

/**
 * @Author mr_lu(鹿)
 * @Description ******
 * @Date 2021-03-25 上午11:28
 * @Copyright 代码类版权的最终解释权归属mr_lu本人所有;
 **/
object MyKafkaSource {
  def myKafkaSource(
                     env : StreamExecutionEnvironment ,
                     bootstrapServers : String ,
                     topics : List[String]
                   ): DataStream[String] ={
    /**
     * 获取基础参数
     */
    import org.apache.flink.api.scala._
    import scala.collection.JavaConversions._
    /**
     * 定义kafka-source得到DataStream
     */
    //将kafka中数据反序列化,
    val valueDeserializer: DeserializationSchema[String] = new SimpleStringSchema()

    val properties = new Properties()
    properties.put("bootstrap.servers", bootstrapServers)

    val kafkaSinkDStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String](topics, valueDeserializer, properties))
    kafkaSinkDStream
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

Mapper_Processing

  • 涉及elasticsearch的查询;
  • 此处富函数获取全局参数;
  • 此处实现的逻辑就是根据kafka数据源中freely_data_flag的标记进行不同的操作,查询es的数据进行新老数据的拼接,最后再sink到es;
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.client.RestHighLevelClient

/**
 * @Author mr_lu(鹿)
 * @Description ******
 * @Date 2021-06-04 下午7:18
 * @Copyright 代码类版权的最终解释权归属mr_lu本人所有;
 **/
object FlinkFreelySourcePutInMap extends RichMapFunction[String, FlinkElasticSearchSourceInsertBean] {

  /**
   * 基础对象属性定义
   */
  var esClient: RestHighLevelClient = null
  var ES_NODES_HOSTS_NAME : String = null

  /**
   * 初始化
   * 初始化链接的时候获取全局配置参数
   * @param parameters
   */
  override def open(parameters: Configuration): Unit = {
    //初始化es链接
    val configuration = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[Configuration]
    ES_NODES_HOSTS_NAME = configuration.getString("ES_NODES_HOSTS_NAME", "null")
    println("map初始化es连接成功!")
    esClient = EsClient.getEsClient(ES_NODES_HOSTS_NAME)
  }

  override def map(value: String): FlinkElasticSearchSourceInsertBean = {
    /**
     * 初始化解析值
     */
    val jsonObject = JSON.parseObject(value)
    val freelyDataflag = jsonObject.getString("freely_data_flag")
    val customerId = jsonObject.getString("customer_id")
    val data = jsonObject.getString("data")

    //如果纵横标识为1那么只推一次,直接返回值
    if (freelyDataflag == "1") {
      LoggerPrint.loggerInfoPrint(s"customer_id=$customerId,纵横标记为1,直接返回!")
      FlinkElasticSearchSourceInsertBean(JSON.parseObject(value) , customerId , "vs_freely_source_put_in_data")

      //如果纵横标识为n那么会重复多次推,需要进行查询拼接返回
    } else if (freelyDataflag == "n") {
      val termsJson = new JSONObject()
      termsJson.put("customer_id", customerId)
      //请求获取es中的数据
      val searchResponse = EsSearchUtils.searchHitsByTerms(
        esClient,
        "vs_freely_source_put_in_data",
        termsJson,
        1
      )
      //拿到es中原始数据
      val hits = SingleResponseAnalysisUtils.analysisSearchResponseToHits(searchResponse)
      //判断来定义es中是否有历史记录,如果有:新老数据做合并,没有:直接返回新数据
      if (hits.length > 0) {
        val esOldJson = JSON.parseObject(hits(0).getSourceAsString)
        val resultJson = JSON.parseObject(value)
//        LoggerPrint.loggerTrait(s"customer_id=$customerId,纵横标记为n,merginJsonFun=${JsonUtils.merginJsonFun(JSON.parseObject(data) , esOldJson.getJSONObject("data"))}")
        resultJson.put("data" , JsonUtils.merginSingleJsonFun(esOldJson.getJSONObject("data") , JSON.parseObject(data)))
//        LoggerPrint.loggerTrait(s"customer_id=$customerId,纵横标记为n,esOldJson=$esOldJson")
        LoggerPrint.loggerInfoPrint(s"customer_id=$customerId,纵横标记为n,resultJson=$resultJson")
        FlinkElasticSearchSourceInsertBean(
          resultJson ,
          customerId ,
          "vs_freely_source_put_in_data"
        )
      } else {
        FlinkElasticSearchSourceInsertBean(JSON.parseObject(value) , customerId , "vs_freely_source_put_in_data")
      }
    } else {
      FlinkElasticSearchSourceInsertBean(JSON.parseObject(value) , customerId , "vs_freely_source_put_in_data")
    }
  }

  override def close(): Unit = {
    if(esClient != null){
      esClient.close()
    }else{
      println("FlinkFreelySourcePutInMap处理过程中esClient为空!")
    }
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90

Elasticsearch_Search

  • 此处的查询方法的封装就是使用我之前帖子中针对es的基本操作的合集;
  • https://blog.csdn.net/Kevin__Durant/article/details/114501884
  /**
   * 多条件term,过滤查询
   *
   * @param client 客户端
   * @param index 索引
   * @param terms 条件组,json中kv
   * @return hits
   */
  def searchHitsByTerms(client: RestHighLevelClient,
                        index: String,
                        terms: JSONObject,
                        size: Int
                       ): SearchResponse = {
    val searchRequest = new SearchRequest(index)
    val searchSourceBuilder = new SearchSourceBuilder().size(size)
    val boolQueryBuilder = QueryBuilders.boolQuery()

    /**
     * 处理term
     */
    terms.keySet().toArray().foreach(jsonKey => {
      val termQueryBuilder = QueryBuilders.termQuery(jsonKey.toString, terms.getString(jsonKey.toString))
      boolQueryBuilder.must().add(termQueryBuilder)
    })
    searchSourceBuilder.query(boolQueryBuilder)
    searchRequest.source(searchSourceBuilder)

    client.search(
      searchRequest,
      RequestOptions.DEFAULT
    )
  }



  /**
   * 解析 searchResponse 为 SearchHit 集合
   * @param searchResponse
   * @return
   */
  def analysisSearchResponseToHits(
                                    searchResponse : SearchResponse
                                  ): Array[SearchHit] ={
    searchResponse.getHits.getHits
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

Elasticsearch_Sink

package com.utils.flink.sinks

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.elasticsearch.client.RestHighLevelClient

/**
 * @Author mr_lu(鹿)
 * @Description ******
 * @Date 2021-06-04 下午7:24
 * @Copyright 代码类版权的最终解释权归属mr_lu本人所有;
 **/
object MyElasticSearchSink extends RichSinkFunction[FlinkElasticSearchSourceInsertBean] {
  /**
   * 基础对象属性定义
   */
  var esClient: RestHighLevelClient = null
  var ES_NODES_HOSTS_NAME: String = null

  /**
   * 初始化建立es高级连接
   *
   * @param parameters
   */
  override def open(parameters: Configuration): Unit = {
    //初始化es链接
    val configuration = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[Configuration]
    ES_NODES_HOSTS_NAME = configuration.getString("ES_NODES_HOSTS_NAME", "null")
    esClient = EsClient.getEsClient(ES_NODES_HOSTS_NAME)
    println("sink初始化es连接成功!")
  }

  /**
   * 进行数据处理
   *
   * @param value
   * @param context
   */
  override def invoke(value: FlinkElasticSearchSourceInsertBean, context: SinkFunction.Context[_]): Unit = {
    /**
     * 处理逻辑
     */
    InsertUtils.insertESData(
      value.data,
      esClient,
      value.id,
      value.index
    )
    println("Sink写入成功!", value.data)
  }

  /**
   * 关闭连接
   */
  override def close(): Unit = {
    if (esClient != null) {
      esClient.close()
    } else {
      println("MyElasticSearchSink处理过程中esClient为空!")
    }
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

Elasticsearch_Storage

  • 一个简单的es数据insert实例;
  • 具体使用有insert、update等;
  • 优化方面可以使用bulk的方式,此贴不作阐述;
  /**
   * 插入数据直接按照id覆盖
   *
   * @param data   要插入的data
   * @param client es client
   * @param id     id
   * @param index  索引名
   */
  def insertESData(data: JSONObject, client: RestHighLevelClient, id: String, index: String): Unit = {
    client.index(
      new IndexRequest(index).id(id).timeout(TimeValue.timeValueSeconds(3)).source(data),
      //      new IndexRequest(index, index).id(id).timeout(TimeValue.timeValueSeconds(2)).source(data),
      RequestOptions.DEFAULT
    )
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

部署

#!/bin/bash
flink run -m yarn-cluster \
-c xxx.xxx \
-p 8 \
/Linux根目录
  • 1
  • 2
  • 3
  • 4
  • 5

优化

  • 一些显而易见容易看到的优化
    1. Sink过程中可以改变存储方式,比如我之前在Hbase_Sink中使用过的一种方式,就是达到设定的阈值在进行Sink,这样就最容易的实现尽量少的交互,毕竟我们目前的大量组件都是写能力小于读的;比如集合中对象数据达到1000个进行一次bulk存储提交;
    2. 连接初始化的优化,此处是Demo,并没有并发,所以可以使用最简直的直连,用es内部的连接池,如果说并发要求很高的情况下我们的查询和存储的连接都可以是初始化一个连接池,这样吞吐量就能达到线性增长了,推荐阅读前面关于es查询连接池的帖子;
    3. 其实使用方式,比如使用Table API或者Flink SQL避免自己对Flink理解不深导致的问题,直接交给FLink内部优化,因为个人接触更加底层,为上层使用者铺设平台所以最基础的API理解起来最为舒适;
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/552204
推荐阅读
相关标签
  

闽ICP备14008679号