赞
踩
Flink 和 Kafka 的集成是十分友好的，毕竟 Flink 本身就是为流式处理设计的。
首先依赖
<!-- Flink Scala API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<!-- Flink streaming Scala API:
     https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<!-- Kafka 0.11 connector matching the Flink 1.10.1 / Scala 2.12 build above -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
接着是代码
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.api.scala._

/**
 * Minimal Flink job that consumes a Kafka topic as a stream of strings
 * and prints each record to stdout.
 */
object KafkaSource {

  def main(args: Array[String]): Unit = {
    // Streaming execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Default parallelism for all operators; chaining is disabled so each
    // operator appears as its own task (easier to observe in the web UI).
    env.setParallelism(4)
    env.disableOperatorChaining()

    // Kafka client configuration. For a cluster, separate brokers with commas,
    // e.g. 172.0.0.101:1111,172.0.0.102:1111
    val pro: Properties = new Properties()
    pro.setProperty("bootstrap.servers", "*******")
    pro.setProperty("group.id", "topic")
    // NOTE(review): with FlinkKafkaConsumer011 deserialization is performed by
    // the DeserializationSchema passed to the constructor (SimpleStringSchema
    // below), so these two properties are effectively ignored by the connector.
    pro.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pro.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Start from the latest offset when the group has no committed offset.
    pro.setProperty("auto.offset.reset", "latest")

    // Consume topic "topic" and print every record.
    env.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), pro))
      .print()

    // Submit and run the job.
    env.execute()
  }
}
MySQL采用自定义数据源的方式
依赖
<!-- MySQL JDBC driver (Connector/J); DriverManager picks it up automatically via SPI -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
代码
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

/**
 * Demonstrates a custom Flink source that polls a MySQL table and emits
 * one record per row, forever, until the job is cancelled.
 */
object MysqlSource {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // FIX: the stream's element type must match the source's type parameter.
    // The original declared DataStream[CNC_AlarmAnalysresult], a type that does
    // not exist in this file, and would not compile.
    val inputData: DataStream[class_name] = env.addSource(new MySQLSource).setParallelism(1)
    inputData.print()
    env.execute("mysql source")
  }

  // Case class mirroring the two queried columns of the table.
  case class class_name(id: Int, cid: Int)

  /** Parallel source that repeatedly runs a SELECT and streams the rows. */
  class MySQLSource extends RichParallelSourceFunction[class_name] {

    // @volatile FIX: cancel() is invoked from a different thread than run(),
    // so the write must be visible to the polling loop.
    @volatile var flag = true
    var conn: Connection = _
    var stat: PreparedStatement = _

    // Opens the JDBC connection and prepares the query once per task.
    // NOTE(review): credentials are hard-coded; move them to configuration.
    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection(
        "jdbc:mysql://172.8.10.188:3306/1001_161?characterEncoding=utf-8&serverTimezone=UTC",
        "siger", "Siger_123")
      val sql = "select id,cid from class_name"
      stat = conn.prepareStatement(sql)
    }

    // FIX: the context must be SourceContext[class_name]; the original used
    // SourceContext[CNC_AlarmAnalysresult], which neither compiles nor
    // overrides RichParallelSourceFunction[class_name].run.
    override def run(sourceContext: SourceFunction.SourceContext[class_name]): Unit = {
      while (flag) {
        val resultSet: ResultSet = stat.executeQuery()
        try {
          // Also check `flag` per row so cancellation takes effect promptly
          // instead of only after a full table scan.
          while (flag && resultSet.next()) {
            val id = resultSet.getInt("id")
            val cid = resultSet.getInt("cid")
            sourceContext.collect(class_name(id, cid))
            Thread.sleep(100) // throttle emission
          }
        } finally {
          // FIX: the original never closed the ResultSet, leaking one cursor
          // per poll iteration for the lifetime of the job.
          resultSet.close()
        }
      }
    }

    // Called by the framework on job cancellation; stops the polling loop.
    override def cancel(): Unit = {
      flag = false
    }

    // Release JDBC resources when the task shuts down.
    override def close(): Unit = {
      if (stat != null) stat.close()
      if (conn != null) conn.close()
    }
  }
}
几个月没写博客了,以后还是要坚持写才好。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。