
Flink Streaming API: Sink

Flink does not offer a foreach-style method like Spark's that lets users iterate over the data and write it out themselves.

Instead, all output to external systems has to go through a Sink.

A job's final output is typically wired up like this:

stream.addSink(new MySink(xxxx))

Flink ships with Sinks for a number of common frameworks; beyond those, users implement their own custom Sink.

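Before looking at the bundled connectors, here is a minimal sketch of what a hand-written Sink such as the MySink above could look like; the class name, constructor argument, and println body are placeholders, not part of any real connector:

import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Minimal custom Sink sketch: it just prints every record it receives.
// A real sink would write to an external system (database, message queue, file, ...) here.
class MySink(prefix: String) extends SinkFunction[String] {
  override def invoke(value: String): Unit = {
    println(s"$prefix -> $value")
  }
}

Wiring it in is then simply stream.addSink(new MySink("demo")).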

1. Kafka

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
package com.streamapi

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

object KafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

//    val stream = env.readTextFile("source/sensor.txt")

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "node7-1:9092")
    properties.setProperty("group.id", "flink-test")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val stream = env.addSource(new FlinkKafkaConsumer011[String](
      "test1",
      new SimpleStringSchema(),
      properties
    ))

    val value = stream.map(line => {
      // sensor_2,1600828094726,113.45370583283331
      val splited = line.split(",")
      SensorReading(splited(0), splited(1).trim.toLong, splited(2).trim.toDouble)
    })

    val result = value.map(_.temperature.toString)

    // Sink: publish each temperature string to the Kafka topic "test"
    result.addSink(new FlinkKafkaProducer011[String](
      "node7-1:9092",
      "test",
      new SimpleStringSchema()
    ))

    env.execute("KafkaSink")
  }
}

// Case class for a sensor reading: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)
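The producer above is created from a plain broker-list string. FlinkKafkaProducer011 also has a constructor that takes a Properties object, which is where additional producer settings go. A minimal sketch, reusing the broker address and topic name assumed above (the imports it needs are already in the example), that could replace the producer construction inside main:

    // Producer settings passed explicitly instead of a bare broker-list string
    val producerProps = new Properties()
    producerProps.setProperty("bootstrap.servers", "node7-1:9092")
    producerProps.setProperty("retries", "3")

    // Same topic and schema as above, but configured through Properties
    result.addSink(new FlinkKafkaProducer011[String](
      "test",
      new SimpleStringSchema(),
      producerProps
    ))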

2. Redis

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
package com.streamapi

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// Named RedisSinkDemo so it does not shadow the connector's RedisSink class imported above
object RedisSinkDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read the input data
    val stream = env.readTextFile("source/sensor.txt")

    // Transform: parse each line into a SensorReading
    val value = stream.map(data => {
      val dataArray = data.split(",")
      SensorReading(
        dataArray(0).trim,
        dataArray(1).trim.toLong,
        dataArray(2).trim.toDouble
      )
    })

    val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()

    // Sink: write each reading to Redis using the mapper defined below
    value.addSink(new RedisSink(conf, new MyRedisMapper()))
    value.print("RedisSink")

    env.execute("RedisSink")
  }
}

// RedisMapper: tells the connector which command to run and how to derive key and value from each record
class MyRedisMapper() extends RedisMapper[SensorReading] {
  // Command used to write data to Redis:
  // store sensor id and temperature in a hash -> HSET sensor_temperature <id> <temperature>
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
  }

  // Field (key within the hash) that the record is stored under
  override def getKeyFromData(t: SensorReading): String = t.id

  // Value to store
  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}

// Case class for a sensor reading: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)
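HSET needs the hash name as the second argument of RedisCommandDescription; simpler commands such as SET use the one-argument constructor. As a minimal sketch, a mapper that writes each reading as a plain Redis string could look like this (the "sensor:" key prefix is just an illustrative convention, not from the original example):

// Alternative mapper sketch: one Redis string per sensor instead of one hash field
class MyRedisStringMapper() extends RedisMapper[SensorReading] {
  // SET needs no additional key, so the one-argument constructor is enough
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)

  // Redis key, e.g. sensor:sensor_1
  override def getKeyFromData(t: SensorReading): String = "sensor:" + t.id

  // Stored value
  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}

It plugs in the same way: value.addSink(new RedisSink(conf, new MyRedisStringMapper())).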

3. Elasticsearch

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
package com.streamapi

import java.util
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

// Named ElasticsearchSinkDemo so it does not shadow the connector's ElasticsearchSink class imported above
object ElasticsearchSinkDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read the input data
    val stream = env.readTextFile("source/sensor.txt")

    // Transform: parse each line into a SensorReading
    val value = stream.map(data => {
      val dataArray = data.split(",")
      SensorReading(
        dataArray(0).trim,
        dataArray(1).trim.toLong,
        dataArray(2).trim.toDouble
      )
    })

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    // Create the ElasticsearchSink builder
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
      httpHosts,
      new ElasticsearchSinkFunction[SensorReading] {
        override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          println("saving data: " + t)
          // 包装成一个Map或者JSONObject
          val json = new util.HashMap[String, String]()
          json.put("sensor_id", t.id)
          json.put("timestamp", t.timestamp.toString)
          json.putIfAbsent("temperature", t.temperature.toString)

          // 创建index request,准备发送数据
          val indexRequest = Requests.indexRequest().index("sensor").`type`("readingdata").source(json)

          // 利用index发送数据,写入数据
          requestIndexer.add(indexRequest)
          println("data saved...")
        }
      }
    )

    // sink
    value.addSink(esSinkBuilder.build())

    env.execute("ElasticsearchSink")
  }
}

// Case class for a sensor reading: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)
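During local testing the sink's bulk buffering can make it look as if nothing is written, because requests are batched before being sent. The builder exposes setBulkFlushMaxActions to control the batch size; a minimal sketch that flushes after every record (a demo setting, not meant for production) would go right before esSinkBuilder.build() in main:

    // Flush after every single action so each record is visible in Elasticsearch immediately
    esSinkBuilder.setBulkFlushMaxActions(1)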

4. Custom JDBC Sink

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.18</version>
</dependency>
package com.streamapi

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object JDBCSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.readTextFile("source/sensor.txt")

//    val properties = new Properties()
//    properties.setProperty("bootstrap.servers", "node7-1:9092")
//    properties.setProperty("group.id", "flink-test")
//    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
//    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
//    properties.setProperty("auto.offset.reset", "latest")
//
//    val stream = env.addSource(new FlinkKafkaConsumer011[String](
//      "test",
//      new SimpleStringSchema(),
//      properties
//    ))

    val value = stream.map(line => {
      // sensor_2,1600828094726,113.45370583283331
      val splited = line.split(",")
      SensorReading(splited(0), splited(1).trim.toLong, splited(2).trim.toDouble)
    })

    value.addSink(new MyJdbcSink())

    env.execute("JDBCSink")
  }
}

/**
 * A custom JDBC sink implemented as a RichSinkFunction:
 * open() creates the connection, invoke() runs once per record, close() cleans up.
 */
class MyJdbcSink() extends RichSinkFunction[SensorReading] {
  // JDBC connection and prepared statements, created in open()
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // open(): create the connection and prepare the statements once per parallel instance
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)

//    Class.forName("com.mysql.cj.jdbc.Driver");
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/mybatis?serverTimezone=UTC", "root", "123456")
    insertStmt = conn.prepareStatement("insert into sensor_reading(sensor, temperature) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_reading set temperature = ? where sensor = ?")
  }

  // invoke(): called for every record, executes the SQL
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // If the sensor id already exists, update its temperature; otherwise insert a new row
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()

    // The update touched no rows, so insert instead
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  // close(): release the statements and the connection
  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

// Case class for a sensor reading: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)
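The sink assumes that the sensor_reading table already exists in the mybatis database. As a sketch, the table could also be created from open() right after the connection is established; the column types below are assumptions chosen to match the insert/update statements, not a schema from the original article:

  // Sketch: create the target table if it is missing (assumed schema).
  // Could be called at the end of open(), once conn has been created.
  private def ensureTable(): Unit = {
    val stmt = conn.createStatement()
    stmt.execute(
      """create table if not exists sensor_reading (
        |  sensor varchar(64) primary key,
        |  temperature double
        |)""".stripMargin)
    stmt.close()
  }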