当前位置:   article > 正文

Flink 获取 Kafka 中的数据,分流存储到 Redis、MySQL 中_1、使用flink消费kafka中的数据,实时统计商城中消费额前2的用户(需要考虑订单状态

1、使用flink消费kafka中的数据,实时统计商城中消费额前2的用户(需要考虑订单状态

案例:实时处理电商订单信息

使用 Flink 消费 Kafka 中的数据,并进行相应的数据统计计算。
数据格式为:

"3443","严致","13207871570","1449.00","1001","2790","第4大街第5号楼4单元464门","描述345855","214537477223728","小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机等1件商品","2020/4/25 18:47:14","2020/4/26 18:59:01","2020/4/25 19:02:14","","","http://img.gmall.com/117814.jpg","20","0.00","1442.00","7.00"
"3444","慕容亨","13028730359","17805.00","1004","2015","第9大街第26号楼3单元383门","描述948496","226551358533723","Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待等2件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/353392.jpg","11","0.00","17800.00","5.00"
"3445","姚兰凤","13080315675","16180.00","1003","8263","第5大街第1号楼7单元722门","描述148518","754426449478474","联想(Lenovo)拯救者Y7000 英特尔酷睿i7 2019新款 15.6英寸发烧游戏本笔记本电脑(i7-9750H 8GB 512GB SSD GTX1650 4G 高色域等3件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/478856.jpg","26","3935.00","20097.00","18.00"
"3446","柏锦黛","13487267342","4922.00","1002","7031","第17大街第40号楼2单元564门","描述779464","262955273144195","十月稻田 沁州黄小米 (黄小米 五谷杂粮 山西特产 真空装 大米伴侣 粥米搭档) 2.5kg等4件商品","2020/4/25 18:47:14","2020/4/26 19:11:37","2020/4/25 19:02:14","","","http://img.gmall.com/144444.jpg","30","0.00","4903.00","19.00"
  • 1
  • 2
  • 3
  • 4

字段描述为:


其中 order_status 订单状态的描述为:1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成。

提示:pom 文件依赖,放在在文章最后有需要自取。

需求一:统计商城实时订单实收金额

注意:(需要考虑订单状态,若有取消订单、申请退回、退回完成则不计入订单实收金额,其他状态的则累加)
代码实现:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

import java.text.SimpleDateFormat
import java.util.Properties

object test1 {
  // 封装数据用到的样例类
  case class order(id: String, consignee: String, consignee_tel: String, feight_fee: Double, amount: Double, status: String, create_time: Long, operate_time: Long)

  def main(args: Array[String]): Unit = {
    // 创建流数据环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置时间语义 设置为时间时间,根据数据的时间戳来判断
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 创建 Kafka 消费者配置对象(这个需要导入的包是 java.util 里面的包)
    val properties = new Properties()
    // 配置 Kafka 服务端地址
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092")
    // 指定消费组
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "order_consumer")
    // 创建消费者
    val consumer = new FlinkKafkaConsumer[String]("order", new SimpleStringSchema(), properties)
    // 连接 Kafka 数据源,获取数据
    val dataStream = env.addSource(consumer)
    // 得到数据后,我们就可以开始处理元数据了
    val data = dataStream
      // 对每条数据使用逗号 "," 进行切分,去掉前后双引号
      .map(
        data => {
          val arr = data.split(",")
          // 遍历数组,去点每个字段的前后双引号

          for (i <- 0 until arr.length) {
            arr(i) = arr(i).replace("\"", "")
            // 因为有些字段是空的,所以我们判断如果是空的我们就把他赋值为 null
            if (arr(i) == "") {
              arr(i) = "null"
            }
          }
          // 数据格式处理完后,开始对可用数据进行封装
          // 需要封装的字段为:id,consignee,consignee_tel,final_total_amount,order_status,create_time,operate_time
          // 使用 SimpleDateFormat 将时间字符串转换为 时间戳
          order(arr(0), arr(1), arr(2), arr(arr.length - 1).toDouble, arr(3).toDouble, arr(4), new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(10)).getTime, new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(11)).getTime)
        })
      // 过滤不进入计算的数据
      .filter(_.status != "1003")
      .filter(_.status != "1005")
      .filter(_.status != "1006")
      // 将 时间戳设置为事件时间
      .assignAscendingTimestamps(t => {
        if (t.create_time > t.operate_time) {
          t.create_time
        } else {
          t.operate_time
        }
      })
    data.print("清洗完的数据")
    // 输出数据
    data.map(
      data => {
        (1, data.amount)
      }
    ).keyBy(0)
      .sum(1)
      .print("订单实收金额")
    env.execute()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

测试:
这里我就不开启 Kafka 生产者 了,直接使用上一篇文章中写的 Flume 从端口获取数据,输出到多端(Kafka、HDFS)单文件 文件,去监控端口转存到 Kafka 中。

如果没有配置 Flume,可以直接使用 Kafka 开启一个生产者:bin/kafka-console-producer.sh --topic order --broker-list master:9092 (注意需要进入到 Kafka 安装目录下)

开启 Flume 监控端口后,就可以直接用 telnet master 10050 登陆端口,进行发送数据。
发送数据:

"3443","严致","13207871570","1449.00","1001","2790","第4大街第5号楼4单元464门","描述345855","214537477223728","小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机等1件商品","2020/4/25 18:47:14","2020/4/26 18:59:01","2020/4/25 19:02:14","","","http://img.gmall.com/117814.jpg","20","0.00","1442.00","7.00"
"3444","慕容亨","13028730359","17805.00","1004","2015","第9大街第26号楼3单元383门","描述948496","226551358533723","Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待等2件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/353392.jpg","11","0.00","17800.00","5.00"
"3445","姚兰凤","13080315675","16180.00","1003","8263","第5大街第1号楼7单元722门","描述148518","754426449478474","联想(Lenovo)拯救者Y7000 英特尔酷睿i7 2019新款 15.6英寸发烧游戏本笔记本电脑(i7-9750H 8GB 512GB SSD GTX1650 4G 高色域等3件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/478856.jpg","26","3935.00","20097.00","18.00"
"3446","柏锦黛","13487267342","4922.00","1002","7031","第17大街第40号楼2单元564门","描述779464","262955273144195","十月稻田 沁州黄小米 (黄小米 五谷杂粮 山西特产 真空装 大米伴侣 粥米搭档) 2.5kg等4件商品","2020/4/25 18:47:14","2020/4/26 19:11:37","2020/4/25 19:02:14","","","http://img.gmall.com/144444.jpg","30","0.00","4903.00","19.00"
"3447","计娴瑾","13208002474","6665.00","1001","5903","第4大街第25号楼6单元338门","描述396659","689816418657611","荣耀10青春版 幻彩渐变 2400万AI自拍 全网通版4GB+64GB 渐变蓝 移动联通电信4G全面屏手机 双卡双待等3件商品","2020/4/25 18:47:14","2020/4/25 18:47:14","2020/4/25 19:02:14","","","http://img.gmall.com/793265.jpg","29","0.00","6660.00","5.00"
"3448","时友裕","13908519819","217.00","1004","5525","第19大街第27号楼9单元874门","描述286614","675628874311147","迪奥(Dior)烈艳蓝金唇膏 口红 3.5g 999号 哑光-经典正红等1件商品","2020/4/25 18:47:14","2020/4/26 18:55:02","2020/4/25 19:02:14","","","http://img.gmall.com/553516.jpg","32","55.00","252.00","20.00"
"3449","东郭妍","13289011809","164.00","1006","9321","第11大街第33号楼6单元645门","描述368435","489957278482334","北纯精制黄小米(小黄米 月子米 小米粥 粗粮杂粮 大米伴侣)2.18kg等1件商品","2020/4/25 18:47:14","2020/4/26 23:10:20","2020/4/25 19:02:14","","","http://img.gmall.com/235333.jpg","22","0.00","145.00","19.00"
"3450","汪毅","13419873912","1064.00","1001","1088","第7大街第9号楼2单元723门","描述774486","661124816482447","Dior迪奥口红唇膏送女友老婆礼物生日礼物 烈艳蓝金999+888两支装礼盒等3件商品","2020/4/25 18:47:14","2020/4/26 18:48:16","2020/4/25 19:02:14","","","http://img.gmall.com/552723.jpg","28","432.00","1488.00","8.00"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

运行结果:


成功!!


需求二:将上面的最后计算的结果,存储到 Redis 中(Key 为:totalprice)

关键知识点: RedisSink自定义 RedisMapper 类

Redis Sink

Flink 有专门的 Sink 到 Redis 的对象(RedisSink):
public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper)

创建时,还需要设置输入数据的泛型:
new RedisSink[IN]()

需要传入的数据为 Redis 的基本配置:主机、端口…,redisSinkMapper 方法。
我们先创建一个 FlinkJedisPoolConfig 构造器:

    val conf = new FlinkJedisPoolConfig.Builder()
    	// 设置主机
      .setHost("master")
    	// 设置端口
      .setPort(6379)
      	// 构建
      .build()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

创建一个 RedisSink:
val sinkToRedis = new RedisSink[(Int,Double)](conf,new MyRedis("totalprice"))(MyRedis 见下方 自定义 RedisMapper 类)

自定义 RedisMapper 类

类源码:

public interface RedisMapper<T> extends Function, Serializable {

    /**
     * Returns descriptor which defines data type.
     *
     * @return data type descriptor
     */
    RedisCommandDescription getCommandDescription();

    /**
     * Extracts key from data.
     *
     * @param data source data
     * @return key
     */
    String getKeyFromData(T data);

    /**
     * Extracts value from data.
     *
     * @param data source data
     * @return value
     */
    String getValueFromData(T data);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

通过源码我们可以知道,T 表示传入数据的泛型,还需要重写三个方法:getCommandDescription(创建 Redis 描述器:使用什么方法写入 Redis,比如:HSET、SET…),getKeyFromData(传入 Redis 中键值对的 Key 值),getValueFromData(传入 Redis 中键值对的值)。

代码示例:

// 自定义 SinkRedis 类
  // 调用时需要传入一个值作为 Key
class MyRedis(Key: String) extends RedisMapper[(Int,Double)]{
  override def getCommandDescription: RedisCommandDescription = {
    // 创建 Redis 描述器,使用 SET 方法
    new RedisCommandDescription(RedisCommand.SET)
  }
  // Key 的名字使用前方传入的值为 Key 值
  override def getKeyFromData(data: (Int, Double)): String = Key
  // 将数据转换成 String 类型再存储到 Redis 中
  override def getValueFromData(data: (Int, Double)): String = data._2.toString
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

整体代码如下:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.kafka.clients.consumer.ConsumerConfig

import java.text.SimpleDateFormat
import java.util.Properties

object test1 {
  // 封装数据用到的样例类
  case class order(id: String, consignee: String, consignee_tel: String, feight_fee: Double, amount: Double, status: String, create_time: Long, operate_time: Long)

  def main(args: Array[String]): Unit = {
    // 创建流数据环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置时间语义 设置为时间时间,根据数据的时间戳来判断
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 创建 Kafka 消费者配置对象(这个需要导入的包是 java.util 里面的包)
    val properties = new Properties()
    // 配置 Kafka 服务端地址
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092")
    // 指定消费组
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "order_consumer")
    // 创建消费者
    val consumer = new FlinkKafkaConsumer[String]("order", new SimpleStringSchema(), properties)
    // 连接 Kafka 数据源,获取数据
    val dataStream = env.addSource(consumer)
    // 得到数据后,我们就可以开始处理元数据了
    val data = dataStream
      // 对每条数据使用逗号 "," 进行切分,去掉前后双引号
      .map(
        data => {
          val arr = data.split(",")
          // 遍历数组,去点每个字段的前后双引号

          for (i <- 0 until arr.length) {
            arr(i) = arr(i).replace("\"", "")
            // 因为有些字段是空的,所以我们判断如果是空的我们就把他赋值为 null
            if (arr(i) == "") {
              arr(i) = "null"
            }
          }
          // 数据格式处理完后,开始对可用数据进行封装
          // 需要封装的字段为:id,consignee,consignee_tel,final_total_amount,order_status,create_time,operate_time
          // 使用 SimpleDateFormat 将时间字符串转换为 时间戳
          order(arr(0), arr(1), arr(2), arr(arr.length - 1).toDouble, arr(3).toDouble, arr(4), new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(10)).getTime, new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(11)).getTime)
        })
      // 过滤不进入计算的数据
      .filter(_.status != "1003")
      .filter(_.status != "1005")
      .filter(_.status != "1006")
      // 将 时间戳设置为事件时间
      .assignAscendingTimestamps(t => {
        if (t.create_time > t.operate_time) {
          t.create_time
        } else {
          t.operate_time
        }
      })
    data.print("清洗完的数据")
  
    // 输出数据
    val sinkData = data.map(
      data => {
        (1, data.amount)
      }
    ).keyBy(0)
      .sum(1)

    sinkData.print("订单实收金额")
  
    // ----------------------------- 分割线------------------------------------------
    						// 下面为 需求二 添加的代码
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("master")
      .setPort(6379)
      .build()

    val SinkToRedis = new RedisSink[(Int,Double)](conf,new MyRedis("totalprice"))

    sinkData.addSink(SinkToRedis)

    env.execute()
  }
}
// 自定义 SinkRedis 类
  // 调用时需要传入一个值作为 Key
class MyRedis(Key: String) extends RedisMapper[(Int,Double)]{
  override def getCommandDescription: RedisCommandDescription = {
    // 创建 Redis 描述器,使用 SET 方法
    new RedisCommandDescription(RedisCommand.SET)
  }
  // Key 的名字使用前方传入的值为 Key 值
  override def getKeyFromData(data: (Int, Double)): String = Key
  // 将数据转换成 String 类型再存储到 Redis 中
  override def getValueFromData(data: (Int, Double)): String = data._2.toString
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101

需要先启动 Redis:redis-server &(后台启动)
进入 Redis 交互界面:redis-cli


输入数据:

"3443","严致","13207871570","1449.00","1001","2790","第4大街第5号楼4单元464门","描述345855","214537477223728","小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机等1件商品","2020/4/25 18:47:14","2020/4/26 18:59:01","2020/4/25 19:02:14","","","http://img.gmall.com/117814.jpg","20","0.00","1442.00","7.00"
"3444","慕容亨","13028730359","17805.00","1004","2015","第9大街第26号楼3单元383门","描述948496","226551358533723","Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待等2件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/353392.jpg","11","0.00","17800.00","5.00"
"3445","姚兰凤","13080315675","16180.00","1003","8263","第5大街第1号楼7单元722门","描述148518","754426449478474","联想(Lenovo)拯救者Y7000 英特尔酷睿i7 2019新款 15.6英寸发烧游戏本笔记本电脑(i7-9750H 8GB 512GB SSD GTX1650 4G 高色域等3件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/478856.jpg","26","3935.00","20097.00","18.00"
"3446","柏锦黛","13487267342","4922.00","1002","7031","第17大街第40号楼2单元564门","描述779464","262955273144195","十月稻田 沁州黄小米 (黄小米 五谷杂粮 山西特产 真空装 大米伴侣 粥米搭档) 2.5kg等4件商品","2020/4/25 18:47:14","2020/4/26 19:11:37","2020/4/25 19:02:14","","","http://img.gmall.com/144444.jpg","30","0.00","4903.00","19.00"
  • 1
  • 2
  • 3
  • 4

测试结果:


需求三:使用侧边流,监控发现 order_status 字段为退回完成,将退回总额存入到 Redis 中,将 order_status 字段为取消订单的存入到 MySQL 中(Sink 到 MySQL 的偷懒没有仔细写了,直接放在最后的代码里面了)。
侧输出流

需要创建一个为侧输出流的变量:new outputTag[T](id:String)
参数说明:

  • T:输入数据类型;
  • id:String:设置创建侧边流的 id,必须为字符串类型。

然后我们就可以使用万能的 processFunction 进行分流操作:

    val sideToRedis = new OutputTag[order]("Status1006")
    val sideToMySQL = new OutputTag[order]("Status1003")
    data
      .process(new ProcessFunction[order,order] {
      // i:输入数据;context:上下文,可以获取时间戳、输出数据到侧输出流;collector:抛出数据,设置类型为样例类 order;
        override def processElement(i: order, context: ProcessFunction[order, order]#Context, collector: Collector[order]): Unit = {
        // 传入的数据状态为 1006 就放入侧输出流 sideToRedis 中。
          if(i.status == "1006"){
            context.output(sideToRedis,i)
          }else if(i.status == "1003"){
			context.output(sideToMySQL,i)
		}
          else{
          // 其他的就抛出去正常输出
            collector.collect(i)
          }
        }
      })
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

注意我们这里使用了侧输出流,上面的 filter 过滤就可以去掉了,因为我们等于使用侧输出流的方式将数据过滤掉了。

完整代码:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerConfig

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Properties

case class order(id: String, consignee: String, consignee_tel: String, feight_fee: Double, amount: Double, status: String, create_time: Long, operate_time: Long)

object test1 {
  // 封装数据用到的样例类

  def main(args: Array[String]): Unit = {
    // 创建流数据环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置时间语义 设置为时间时间,根据数据的时间戳来判断
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 创建 Kafka 消费者配置对象(这个需要导入的包是 java.util 里面的包)
    val properties = new Properties()
    // 配置 Kafka 服务端地址
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092")
    // 指定消费组
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "order_consumer")
    // 创建消费者
    val consumer = new FlinkKafkaConsumer[String]("order", new SimpleStringSchema(), properties)
    // 连接 Kafka 数据源,获取数据
    val dataStream = env.addSource(consumer)
    // 得到数据后,我们就可以开始处理元数据了
    val data = dataStream
      // 对每条数据使用逗号 "," 进行切分,去掉前后双引号
      .map(
        data => {
          val arr = data.split(",")
          // 遍历数组,去点每个字段的前后双引号

          for (i <- 0 until arr.length) {
            arr(i) = arr(i).replace("\"", "")
            // 因为有些字段是空的,所以我们判断如果是空的我们就把他赋值为 null
            if (arr(i) == "") {
              arr(i) = "null"
            }
          }
          // 数据格式处理完后,开始对可用数据进行封装
          // 需要封装的字段为:id,consignee,consignee_tel,final_total_amount,order_status,create_time,operate_time
          // 使用 SimpleDateFormat 将时间字符串转换为 时间戳
          order(arr(0), arr(1), arr(2), arr(arr.length - 1).toDouble, arr(3).toDouble, arr(4), new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(10)).getTime, new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(11)).getTime)
        })
      // 将 时间戳设置为事件时间
      .assignAscendingTimestamps(t => {
        if (t.create_time > t.operate_time) {
          t.create_time
        } else {
          t.operate_time
        }
      })
//    data.print("清洗完的数据")


    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("master")
      .setPort(6379)
      .build()

    val SinkToRedis = new RedisSink[(Int,Double)](conf,new MyRedis("totalprice"))


    val sideToRedis = new OutputTag[order]("Status1006")
    val sideToMySQL = new OutputTag[order]("Status1003")
    val endData = data
      .process(new ProcessFunction[order,order] {
        override def processElement(i: order, context: ProcessFunction[order, order]#Context, collector: Collector[order]): Unit = {
          if(i.status == "1006"){
            context.output(sideToRedis,i)
          }else if(i.status == "1003"){
            context.output(sideToMySQL,i)
          }
          else{
            collector.collect(i)
          }
        }
      })
      val end = endData
      .filter(_.status == "1005")
      .map(
        data => {
          (1,data.amount)
        })
      .keyBy(0)
      .sum(1)

//    endData.print("正常数据:")
    end.addSink(SinkToRedis)

    val status_1006 = endData.getSideOutput(sideToRedis).map(
      data => {
        (1,data.amount)
      })
      .keyBy(0)
      .sum(1)

	// 将 1006 输出到 Redis 中
    status_1006.print("status_1006")
    val SinkToRedis2 = new RedisSink[(Int,Double)](conf,new MyRedis("totalreduceprice"))
    status_1006.addSink(SinkToRedis2)
	
	// 将 1003 的取出,输出到MySQL 中
    val status_1003 = endData.getSideOutput(sideToMySQL)
    status_1003.print("status_1003")
    status_1003.addSink(new ToMySQL)

    env.execute()
  }
}
// 自定义 SinkRedis 类
  // 调用时需要传入一个值作为 Key
class MyRedis(Key: String) extends RedisMapper[(Int,Double)]{
  override def getCommandDescription: RedisCommandDescription = {
    // 创建 Redis 描述器,使用 SET 方法
    new RedisCommandDescription(RedisCommand.SET)
  }
  // Key 的名字使用前方传入的值为 Key 值
  override def getKeyFromData(data: (Int, Double)): String = Key
  // 将数据转换成 String 类型再存储到 Redis 中
  override def getValueFromData(data: (Int, Double)): String = data._2.toString
}
class ToMySQL extends RichSinkFunction[order]{
  var conn:Connection = _
  var insetStat:PreparedStatement = _
  override def invoke(value: order): Unit = {
  // 补充后面的预留位置,每个数字代表的是第几个 ?。
    insetStat.setString(1,value.id)
    insetStat.setString(2,value.consignee)
    insetStat.setString(3,value.consignee_tel)
    insetStat.setDouble(4,value.amount)
    insetStat.setDouble(5,value.feight_fee)
    insetStat.execute()
  }

  override def open(parameters: Configuration): Unit = {
  	// 连接 MySQL 驱动
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/shtd_result?useSSL=false","root","root")
    // 配置 insert SQL 语句
    insetStat = conn.prepareStatement("insert into order_info values(?,?,?,?,?)")
  }

  override def close(): Unit = {
  // 关闭各个节点
    conn.close()
    insetStat.close()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161


pom 文件中需要的依赖:

 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>

        <flink.version>1.10.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_2.11</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- 指定mysql-connector的依赖 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/587269
推荐阅读
相关标签
  

闽ICP备14008679号