赞
踩
原文链接:https://www.toutiao.com/i6859235904779715076/
本文主要从以下几个方面介绍Flink的流处理API——Sink
一、输出到Kafka
二、输出到Redis
三、输出到MySQL
数据处理的过程基本可以分为三个阶段,分别是:数据从哪里来,做什么业务逻辑,落地到哪里去。
这三部分在Flink中分别被称为Source、Transform和Sink
其中Source部分可以参考这篇:Flink流处理API——Source
Flink 没有类似于 Spark 中 foreach 的方法来让用户进行迭代操作,所有对外的输出操作都要利用 Sink 完成。最后通过类似如下的方式完成整个任务的最终输出操作。
stream.addSink(new MySink(xxxx))
官方提供了一部分的框架的 sink。除此以外,需要用户自定义实现 sink。
Flink支持的Sink
版本:
scala:2.11.12
Kafka:0.8.2.2
Flink:1.7.2
Redis:3.2.9
MySQL:5.7.30
pom.xml依赖部分(log日志的依赖一定要加上,否则当Flink链接Kafka0.8时会报Failed to instantiate SLF4J LoggerFactory Reported exception错误)
- <dependencies>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_2.11</artifactId>
- <version>1.7.2</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-scala_2.11</artifactId>
- <version>1.7.2</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.11</artifactId>
- <version>1.7.2</version>
- </dependency>
-
- <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.8_2.11 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
- <version>1.7.2</version>
- </dependency>
-
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.22</version>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.22</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.bahir</groupId>
- <artifactId>flink-connector-redis_2.11</artifactId>
- <version>1.0</version>
- </dependency>
-
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>5.1.38</version>
- </dependency>
-
-
- </dependencies>

Flink和Kafka天生是一对,Sink到Kafka相当方便
- package xxx
-
- import org.apache.flink.api.common.serialization.SimpleStringSchema
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.connectors.kafka._
-
- /** Reads sensor records from a text file and publishes their string form to a Kafka topic. */
- object SinkDataToKafka {
-   /**
-    * Builds and runs the pipeline: text file -> parse into SensorReading -> Kafka.
-    *
-    * @param args command-line arguments (not used)
-    */
-   def main(args: Array[String]): Unit = {
-     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-
-     // Source: one stream element per line of the input file.
-     val lines: DataStream[String] = env.readTextFile("src\\main\\resources\\sensor.txt")
-
-     // Transform: split the comma-separated line, build a SensorReading from the first three
-     // columns, and keep only its string form because the Kafka sink below carries Strings.
-     val rendered: DataStream[String] = lines.map { line =>
-       val columns: Array[String] = line.split(",")
-       val id = columns(0).trim
-       val second = columns(1).trim.toLong
-       val third = columns(2).trim.toDouble
-       SensorReading(id, second, third).toString
-     }
-
-     // Sink: FlinkKafkaProducer08 takes the broker list, the topic id and the serialization schema.
-     val brokerList = "slave1:9092,slave2:9092,slave3:9092"
-     val topicId = "out"
-     val producer = new FlinkKafkaProducer08[String](brokerList, topicId, new SimpleStringSchema())
-     rendered.addSink(producer)
-
-     env.execute()
-   }
- }

redis:在slave4,端口:6379,密码:123
- package xxx
-
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.connectors.redis.RedisSink
- import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
- import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
-
- /** Reads sensor records from a text file and writes them to Redis through MyRedisMapper. */
- object SinkDataToRedis {
-   /**
-    * Builds and runs the pipeline: text file -> parse into SensorReading -> Redis.
-    *
-    * @param args command-line arguments (not used)
-    */
-   def main(args: Array[String]): Unit = {
-     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-
-     // Source: one stream element per line of the input file.
-     val lines: DataStream[String] = env.readTextFile("src\\main\\resources\\sensor.txt")
-
-     // Transform: split the comma-separated line and build a SensorReading from the
-     // first three columns.
-     val readings: DataStream[SensorReading] = lines.map { line =>
-       val columns: Array[String] = line.split(",")
-       val id = columns(0).trim
-       val second = columns(1).trim.toLong
-       val third = columns(2).trim.toDouble
-       SensorReading(id, second, third)
-     }
-
-     // Redis connection settings: host, port and password.
-     val poolBuilder = new FlinkJedisPoolConfig.Builder()
-     poolBuilder.setHost("slave4")
-     poolBuilder.setPort(6379)
-     poolBuilder.setPassword("123")
-     val redisConf = poolBuilder.build()
-
-     // Sink: the stored values can be inspected in redis-cli with `hgetall sensor`.
-     val redisSink = new RedisSink[SensorReading](redisConf, new MyRedisMapper())
-     readings.addSink(redisSink)
-
-     env.execute()
-   }
- }
-
- /**
-  * Tells the Redis sink how to store a SensorReading: which Redis command to issue and
-  * which parts of the reading become the key and the value.
-  */
- class MyRedisMapper extends RedisMapper[SensorReading] {
-   // Command issued for every record: HSET, with "sensor" as the additional key.
-   override def getCommandDescription: RedisCommandDescription =
-     new RedisCommandDescription(RedisCommand.HSET, "sensor")
-
-   // Key written to Redis: the reading's id.
-   override def getKeyFromData(t: SensorReading): String = t.id
-
-   // Value written to Redis: the reading's temperature rendered as a string.
-   override def getValueFromData(t: SensorReading): String = t.temperature.toString
- }

表信息:slave3,端口:3306,用户名:root 密码:root
库名:sensor_db 表名:sensors
字段信息:id(字符串)、temp(浮点数)
- package xxx
-
- import java.sql.{Connection, DriverManager, PreparedStatement}
-
- import org.apache.flink.configuration.Configuration
- import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
- import org.apache.flink.streaming.api.scala._
-
- /** Reads sensor records from a text file and writes them to MySQL through MyJDBCSink. */
- object SinkDataToMySQL {
-   /**
-    * Builds and runs the pipeline: text file -> parse into SensorReading -> JDBC sink.
-    *
-    * @param args command-line arguments (not used)
-    */
-   def main(args: Array[String]): Unit = {
-     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-
-     // Source: one stream element per line of the input file.
-     val lines: DataStream[String] = env.readTextFile("src\\main\\resources\\sensor.txt")
-
-     // Transform: split the comma-separated line and build a SensorReading from the
-     // first three columns.
-     val readings: DataStream[SensorReading] = lines.map { line =>
-       val columns: Array[String] = line.split(",")
-       val id = columns(0).trim
-       val second = columns(1).trim.toLong
-       val third = columns(2).trim.toDouble
-       SensorReading(id, second, third)
-     }
-
-     // Sink: hand every reading to the custom JDBC sink.
-     val jdbcSink = new MyJDBCSink()
-     readings.addSink(jdbcSink)
-
-     env.execute()
-   }
-
- }
-
- /**
-  * Sink that writes each SensorReading to MySQL over JDBC as an "update, else insert":
-  * it first tries to update the row with the reading's id and inserts a new row only when
-  * the update affected no rows.
-  *
-  * The connection and both prepared statements are created in open() and released in close().
-  */
- class MyJDBCSink() extends RichSinkFunction[SensorReading] {
-   // JDBC connection and prepared statements. They stay null until open() assigns them,
-   // which is why close() must tolerate null fields.
-   var connect: Connection = _
-   var insertStrmt: PreparedStatement = _
-   var updateStrmt: PreparedStatement = _
-
-   /**
-    * Opens the JDBC connection and prepares the insert and update statements.
-    *
-    * @param parameters configuration passed through to the parent open()
-    */
-   override def open(parameters: Configuration): Unit = {
-     super.open(parameters)
-     connect = DriverManager.getConnection("jdbc:mysql://slave3:3306/sensor_db?useUnicode=true&characterEncoding=UTF-8", "root", "root")
-     insertStrmt = connect.prepareStatement("INSERT INTO sensors(id, temp) VALUES (?, ?)")
-     updateStrmt = connect.prepareStatement("UPDATE sensors set temp = ? where id = ?")
-   }
-
-   /**
-    * Writes one reading: updates the existing row for its id, or inserts a row if none was updated.
-    *
-    * @param value   the reading to store (its id and temperature are used)
-    * @param context sink context (not used)
-    */
-   override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
-     // Bind by placeholder position: 1 is the first `?` (temp), 2 is the second `?` (id).
-     updateStrmt.setDouble(1, value.temperature)
-     updateStrmt.setString(2, value.id)
-
-     // executeUpdate() returns the affected-row count directly, so no separate
-     // getUpdateCount call is needed.
-     val updatedRows = updateStrmt.executeUpdate()
-
-     // The update touched no row, so insert the reading instead.
-     if (updatedRows == 0) {
-       insertStrmt.setString(1, value.id)
-       insertStrmt.setDouble(2, value.temperature)
-       insertStrmt.executeUpdate()
-     }
-   }
-
-   /**
-    * Releases the statements and the connection.
-    *
-    * Each resource is null-checked because open() may have failed before assigning it, and
-    * the closes are nested in try/finally so that a failure closing one resource does not
-    * prevent the remaining ones (in particular the connection) from being closed.
-    */
-   override def close(): Unit = {
-     try Option(insertStrmt).foreach(_.close())
-     finally {
-       try Option(updateStrmt).foreach(_.close())
-       finally Option(connect).foreach(_.close())
-     }
-   }
-
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。