
Flink Examples: Kafka and MySQL Sources

1. Kafka source

Flink connects to Kafka very smoothly, which is to be expected for a stream-processing engine; Kafka is the most common source in practice.

First, the Maven dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
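
If the project is built with sbt instead of Maven, the same three dependencies can be declared as below. This is just a sketch of the equivalent build.sbt entries for the artifacts and versions listed above; %% appends the Scala binary version (2.12 here).

// build.sbt: the same Flink 1.10.1 artifacts as the Maven snippet above
scalaVersion := "2.12.11"   // any 2.12.x binary-compatible version

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                % "1.10.1",
  "org.apache.flink" %% "flink-streaming-scala"      % "1.10.1",
  "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.10.1"
)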

Then the code:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.api.scala._

object KafkaSource {
  def main(args: Array[String]): Unit = {
    // execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // parallelism
    env.setParallelism(4)
    env.disableOperatorChaining()

    // Kafka config; list brokers separated by commas, e.g. 172.0.0.101:1111,172.0.0.102:1111
    val pro: Properties = new Properties()
    pro.setProperty("bootstrap.servers", "*******")
    pro.setProperty("group.id", "topic")
    pro.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pro.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pro.setProperty("auto.offset.reset", "latest")

    // consume the Kafka topic and print each record
    env.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), pro))
      .print()

    // execute the job
    env.execute()
  }
}
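
The org.apache.flink.connector.kafka.source.KafkaSource class mentioned in the title belongs to the newer unified source API. On Flink 1.12 or later, using the flink-connector-kafka artifact instead of the 0.11 connector, roughly the same job can be written as below. This is a hedged sketch rather than a tested implementation; the broker list and topic name are placeholders just like above.

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

object NewKafkaSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // build the source with the unified KafkaSource builder (Flink 1.12+)
    val source = KafkaSource.builder[String]()
      .setBootstrapServers("*******")                  // broker list, comma-separated
      .setTopics("topic")
      .setGroupId("topic")
      .setStartingOffsets(OffsetsInitializer.latest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()

    // fromSource replaces addSource for sources built on the new API
    env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "kafka-source")
      .print()

    env.execute("kafka source (new API)")
  }
}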

2. MySQL source

For MySQL we use a custom source function.

The dependency:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
</dependency>

The code:

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._


object MysqlSource {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // read from MySQL with the custom source defined below
    val inputData: DataStream[class_name] = env.addSource(new MySQLSource).setParallelism(1)
    inputData.print()
    env.execute("mysql source")

  }

  // case class matching the table columns
  case class class_name(id: Int, cid: Int)

  class MySQLSource extends RichParallelSourceFunction[class_name] {
    var flag = true
    var conn: Connection = _
    var stat: PreparedStatement = _

    override def open(parameters: Configuration): Unit = {
      // open the JDBC connection and prepare the query once per subtask
      conn = DriverManager.getConnection("jdbc:mysql://172.8.10.188:3306/1001_161?characterEncoding=utf-8&serverTimezone=UTC", "siger", "Siger_123")
      val sql = "select id,cid from class_name"
      stat = conn.prepareStatement(sql)
    }


    override def run(sourceContext: SourceFunction.SourceContext[class_name]): Unit = {
      // poll the table repeatedly until cancel() flips the flag
      while (flag) {
        val resultSet: ResultSet = stat.executeQuery()
        while (resultSet.next()) {
          val id = resultSet.getInt("id")
          val cid = resultSet.getInt("cid")
          sourceContext.collect(class_name(id, cid))
          Thread.sleep(100)
        }
        resultSet.close()
      }
    }

    override def cancel(): Unit = {
      flag = false
    }

    override def close(): Unit = {
      if (stat != null) stat.close()
      if (conn != null) conn.close()
    }
  }

}
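
One thing to note: each pass of the while (flag) loop re-reads and re-emits the whole table. A common refinement is to remember the largest id already emitted and only query newer rows on the next pass. The following is a minimal, untested sketch of such a run method for the same MySQLSource class, assuming id is an auto-increment primary key and that open() prepares the statement "select id, cid from class_name where id > ? order by id":

// Sketch: incremental polling, assuming id only ever grows.
override def run(sourceContext: SourceFunction.SourceContext[class_name]): Unit = {
  var lastId = 0                          // highest id emitted so far
  while (flag) {
    stat.setInt(1, lastId)                // bind the lower bound for "id > ?"
    val rs: ResultSet = stat.executeQuery()
    while (rs.next()) {
      val id  = rs.getInt("id")
      val cid = rs.getInt("cid")
      sourceContext.collect(class_name(id, cid))
      lastId = math.max(lastId, id)
    }
    rs.close()
    Thread.sleep(1000)                    // poll interval between passes
  }
}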


I have not written a blog post in a few months; I should keep writing more regularly from now on.
