
Flink Streaming API: Sink

Original article: https://www.toutiao.com/i6859235904779715076/

This post covers Flink's streaming Sink API in three parts:

1. Writing to Kafka

2. Writing to Redis

3. Writing to MySQL

A data-processing job can roughly be split into three stages: where the data comes from, what business logic is applied to it, and where the results land.

In Flink these three parts are called Source, Transform, and Sink.

For the Source part, see the earlier post: Flink Streaming API: Source.

Flink has no equivalent of Spark's foreach method for iterating over records yourself; all output to external systems has to go through a Sink. The final output step of a job therefore looks something like this:

stream.addSink(new MySink(xxxx))


Flink ships with sinks for a number of frameworks out of the box; for everything else, you implement a custom sink yourself.
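
Implementing a custom sink just means implementing SinkFunction, usually via RichSinkFunction as in the MySQL example later in this post. As a minimal sketch (not part of the original post; the name PrintlnSink is made up), a custom sink that only writes every record to stdout could look like this:

import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

// Hypothetical minimal custom sink: prints each record it receives.
class PrintlnSink[T] extends RichSinkFunction[T] {
  override def invoke(value: T, context: SinkFunction.Context[_]): Unit = {
    println(value)
  }
}

// usage: stream.addSink(new PrintlnSink[String]())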

[Figure: Sinks supported by Flink]

Versions:

scala:2.11.12

Kafka:0.8.2.2

Flink:1.7.2

Redis:3.2.9

MySQL:5.7.30

pom.xml dependencies (the log dependencies must be included, otherwise connecting Flink to Kafka 0.8 fails with "Failed to instantiate SLF4J LoggerFactory Reported exception"):

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.8_2.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.22</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.22</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
</dependencies>
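
All three examples below parse each line of sensor.txt into a SensorReading case class that the original post does not show. A minimal definition consistent with how the code uses it (three comma-separated fields; only id and temperature are actually referenced, the middle field name is an assumption) would be:

case class SensorReading(id: String, timestamp: Long, temperature: Double)

// a sensor.txt line such as "sensor_1,1547718199,35.8" would then parse into
// SensorReading("sensor_1", 1547718199L, 35.8)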

1. Writing to Kafka

Flink and Kafka are a natural pair, so sinking to Kafka is quite convenient.

package xxx

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._

object SinkDataToKafka {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // source
    val value: DataStream[String] = environment.readTextFile("src\\main\\resources\\sensor.txt")
    // transform
    val maped: DataStream[String] = value.map(line => {
      val fildes: Array[String] = line.split(",") // Scala's split method
      SensorReading(fildes(0).trim, fildes(1).trim.toLong, fildes(2).trim.toDouble).toString // convert to String for output
    })
    // FlinkKafkaProducer08 arguments: broker list, topic id, serialization schema
    maped.addSink(new FlinkKafkaProducer08[String](
      "slave1:9092,slave2:9092,slave3:9092", "out", new SimpleStringSchema()))
    environment.execute()
  }
}
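
To verify the output, the consumer from the same connector can read the topic back. The sketch below is not from the original post: the object name ReadBackFromKafka, the group.id sink-check, and the ZooKeeper address slave1:2181 (required by the 0.8 consumer) are assumptions.

package xxx

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08

object ReadBackFromKafka {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val props = new Properties()
    props.setProperty("bootstrap.servers", "slave1:9092,slave2:9092,slave3:9092")
    props.setProperty("zookeeper.connect", "slave1:2181") // assumption: ZooKeeper address, needed by the 0.8 consumer
    props.setProperty("group.id", "sink-check")
    // consume the "out" topic written by SinkDataToKafka and print it
    environment
      .addSource(new FlinkKafkaConsumer08[String]("out", new SimpleStringSchema(), props))
      .print()
    environment.execute()
  }
}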

2. Writing to Redis

Redis: host slave4, port 6379, password 123

package xxx

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object SinkDataToRedis {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // source
    val value: DataStream[String] = environment.readTextFile("src\\main\\resources\\sensor.txt")
    // transform
    val maped: DataStream[SensorReading] = value.map(line => {
      val fildes: Array[String] = line.split(",") // Scala's split method
      SensorReading(fildes(0).trim, fildes(1).trim.toLong, fildes(2).trim.toDouble)
    })
    // Redis connection settings
    val conf = new FlinkJedisPoolConfig.Builder().setHost("slave4").setPort(6379).setPassword("123").build()
    maped.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper())) // check the stored values with: hgetall sensor
    environment.execute()
  }
}

class MyRedisMapper extends RedisMapper[SensorReading] {
  // the Redis command used to write the data
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor")
  }
  // the key (hash field) written to Redis
  override def getKeyFromData(t: SensorReading): String = {
    t.id
  }
  // the value written to Redis
  override def getValueFromData(t: SensorReading): String = {
    t.temperature.toString
  }
}
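
Because getCommandDescription uses HSET with the additional key "sensor", every reading becomes one field in a single Redis hash, which is what the hgetall sensor check above relies on. If you wanted one string key per sensor instead, a hypothetical mapper variant (not in the original post; the class name and the "sensor:" key prefix are made up, and the imports already shown above suffice) could use SET, which needs no additional key:

// Hypothetical variant: one Redis string key per sensor id instead of a single hash.
class MyRedisStringMapper extends RedisMapper[SensorReading] {
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)

  override def getKeyFromData(t: SensorReading): String = "sensor:" + t.id // e.g. the key "sensor:sensor_1"

  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}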

3. Writing to MySQL

MySQL: host slave3, port 3306, user root, password root

Database: sensor_db, table: sensor

Field information:

[Figure: field definitions of the sensor table]

package xxx

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object SinkDataToMySQL {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // source
    val value: DataStream[String] = environment.readTextFile("src\\main\\resources\\sensor.txt")
    // transform
    val maped: DataStream[SensorReading] = value.map(line => {
      val fildes: Array[String] = line.split(",") // Scala's split method
      SensorReading(fildes(0).trim, fildes(1).trim.toLong, fildes(2).trim.toDouble)
    })
    maped.addSink(new MyJDBCSink())
    environment.execute()
  }
}

class MyJDBCSink() extends RichSinkFunction[SensorReading] {
  // JDBC connection and prepared statements
  var connect: Connection = _
  var insertStrmt: PreparedStatement = _
  var updateStrmt: PreparedStatement = _

  // initialization: open the connection and prepare the statements
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    connect = DriverManager.getConnection("jdbc:mysql://slave3:3306/sensor_db?useUnicode=true&characterEncoding=UTF-8", "root", "root")
    insertStrmt = connect.prepareStatement("INSERT INTO sensors(id, temp) VALUES (?, ?)")
    updateStrmt = connect.prepareStatement("UPDATE sensors set temp = ? where id = ?")
  }

  // called for each record: execute the SQL
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // try the UPDATE first
    updateStrmt.setDouble(1, value.temperature) // setDouble matches the column type, 1 is the index of the '?', value.temperature is the bound value
    updateStrmt.setString(2, value.id)
    updateStrmt.execute()
    // if the UPDATE affected no row, run the INSERT instead
    if (updateStrmt.getUpdateCount == 0) {
      insertStrmt.setDouble(2, value.temperature)
      insertStrmt.setString(1, value.id)
      insertStrmt.execute()
    }
  }

  // close the statements and the connection
  override def close(): Unit = {
    insertStrmt.close()
    updateStrmt.close()
    connect.close()
  }
}
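
The invoke method implements a simple upsert: it first tries an UPDATE, and only when getUpdateCount reports that no row was affected does it fall back to an INSERT, so each sensor id keeps exactly one row holding its latest temperature.

The original figure with the column definitions is not available here; judging from the SQL above, the target table (named sensors in the statements) needs an id string column and a temp double column. A hypothetical one-off helper to create it is sketched below; the object name, the column types, and the primary key are assumptions.

package xxx

import java.sql.DriverManager

// Hypothetical helper, not in the original post: creates the table the sink writes to.
object CreateSensorTable {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://slave3:3306/sensor_db", "root", "root")
    val stmt = conn.createStatement()
    // id and temp match the columns used in the INSERT/UPDATE statements above;
    // VARCHAR(32), DOUBLE and the primary key are assumptions.
    stmt.execute("CREATE TABLE IF NOT EXISTS sensors (id VARCHAR(32) PRIMARY KEY, temp DOUBLE)")
    stmt.close()
    conn.close()
  }
}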