赞
踩
使用 Flink
消费 Kafka
中的数据,并进行相应的数据统计计算。
数据格式为:
"3443","严致","13207871570","1449.00","1001","2790","第4大街第5号楼4单元464门","描述345855","214537477223728","小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机等1件商品","2020/4/25 18:47:14","2020/4/26 18:59:01","2020/4/25 19:02:14","","","http://img.gmall.com/117814.jpg","20","0.00","1442.00","7.00"
"3444","慕容亨","13028730359","17805.00","1004","2015","第9大街第26号楼3单元383门","描述948496","226551358533723","Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待等2件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/353392.jpg","11","0.00","17800.00","5.00"
"3445","姚兰凤","13080315675","16180.00","1003","8263","第5大街第1号楼7单元722门","描述148518","754426449478474","联想(Lenovo)拯救者Y7000 英特尔酷睿i7 2019新款 15.6英寸发烧游戏本笔记本电脑(i7-9750H 8GB 512GB SSD GTX1650 4G 高色域等3件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/478856.jpg","26","3935.00","20097.00","18.00"
"3446","柏锦黛","13487267342","4922.00","1002","7031","第17大街第40号楼2单元564门","描述779464","262955273144195","十月稻田 沁州黄小米 (黄小米 五谷杂粮 山西特产 真空装 大米伴侣 粥米搭档) 2.5kg等4件商品","2020/4/25 18:47:14","2020/4/26 19:11:37","2020/4/25 19:02:14","","","http://img.gmall.com/144444.jpg","30","0.00","4903.00","19.00"
字段描述为:
其中 order_status 订单状态的描述为:1001:创建订单、1002:支付订单、1003:取消订单、1004:完成订单、1005:申请退回、1006:退回完成。
提示:pom 文件依赖,放在在文章最后有需要自取。
注意:(需要考虑订单状态,若有取消订单、申请退回、退回完成则不计入订单实收金额,其他状态的则累加)
代码实现:
import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.kafka.clients.consumer.ConsumerConfig import java.text.SimpleDateFormat import java.util.Properties object test1 { // 封装数据用到的样例类 case class order(id: String, consignee: String, consignee_tel: String, feight_fee: Double, amount: Double, status: String, create_time: Long, operate_time: Long) def main(args: Array[String]): Unit = { // 创建流数据环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置时间语义 设置为时间时间,根据数据的时间戳来判断 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 创建 Kafka 消费者配置对象(这个需要导入的包是 java.util 里面的包) val properties = new Properties() // 配置 Kafka 服务端地址 properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092") // 指定消费组 properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "order_consumer") // 创建消费者 val consumer = new FlinkKafkaConsumer[String]("order", new SimpleStringSchema(), properties) // 连接 Kafka 数据源,获取数据 val dataStream = env.addSource(consumer) // 得到数据后,我们就可以开始处理元数据了 val data = dataStream // 对每条数据使用逗号 "," 进行切分,去掉前后双引号 .map( data => { val arr = data.split(",") // 遍历数组,去点每个字段的前后双引号 for (i <- 0 until arr.length) { arr(i) = arr(i).replace("\"", "") // 因为有些字段是空的,所以我们判断如果是空的我们就把他赋值为 null if (arr(i) == "") { arr(i) = "null" } } // 数据格式处理完后,开始对可用数据进行封装 // 需要封装的字段为:id,consignee,consignee_tel,final_total_amount,order_status,create_time,operate_time // 使用 SimpleDateFormat 将时间字符串转换为 时间戳 order(arr(0), arr(1), arr(2), arr(arr.length - 1).toDouble, arr(3).toDouble, arr(4), new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(10)).getTime, new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(11)).getTime) }) // 过滤不进入计算的数据 .filter(_.status != "1003") .filter(_.status != "1005") .filter(_.status != "1006") // 将 时间戳设置为事件时间 .assignAscendingTimestamps(t => { if (t.create_time > t.operate_time) { t.create_time } else { t.operate_time } }) data.print("清洗完的数据") // 输出数据 data.map( data => { (1, data.amount) } ).keyBy(0) .sum(1) .print("订单实收金额") env.execute() } }
测试:
这里我就不开启 Kafka 生产者
了,直接使用上一篇文章中写的 Flume 从端口获取数据,输出到多端(Kafka、HDFS)单文件 文件,去监控端口转存到 Kafka
中。
如果没有配置 Flume,可以直接使用 Kafka
开启一个生产者:bin/kafka-console-producer.sh --topic order --broker-list master:9092
(注意需要进入到 Kafka 安装目录下)
开启 Flume 监控端口后,就可以直接用 telnet master 10050
登陆端口,进行发送数据。
发送数据:
"3443","严致","13207871570","1449.00","1001","2790","第4大街第5号楼4单元464门","描述345855","214537477223728","小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机等1件商品","2020/4/25 18:47:14","2020/4/26 18:59:01","2020/4/25 19:02:14","","","http://img.gmall.com/117814.jpg","20","0.00","1442.00","7.00"
"3444","慕容亨","13028730359","17805.00","1004","2015","第9大街第26号楼3单元383门","描述948496","226551358533723","Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待等2件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/353392.jpg","11","0.00","17800.00","5.00"
"3445","姚兰凤","13080315675","16180.00","1003","8263","第5大街第1号楼7单元722门","描述148518","754426449478474","联想(Lenovo)拯救者Y7000 英特尔酷睿i7 2019新款 15.6英寸发烧游戏本笔记本电脑(i7-9750H 8GB 512GB SSD GTX1650 4G 高色域等3件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/478856.jpg","26","3935.00","20097.00","18.00"
"3446","柏锦黛","13487267342","4922.00","1002","7031","第17大街第40号楼2单元564门","描述779464","262955273144195","十月稻田 沁州黄小米 (黄小米 五谷杂粮 山西特产 真空装 大米伴侣 粥米搭档) 2.5kg等4件商品","2020/4/25 18:47:14","2020/4/26 19:11:37","2020/4/25 19:02:14","","","http://img.gmall.com/144444.jpg","30","0.00","4903.00","19.00"
"3447","计娴瑾","13208002474","6665.00","1001","5903","第4大街第25号楼6单元338门","描述396659","689816418657611","荣耀10青春版 幻彩渐变 2400万AI自拍 全网通版4GB+64GB 渐变蓝 移动联通电信4G全面屏手机 双卡双待等3件商品","2020/4/25 18:47:14","2020/4/25 18:47:14","2020/4/25 19:02:14","","","http://img.gmall.com/793265.jpg","29","0.00","6660.00","5.00"
"3448","时友裕","13908519819","217.00","1004","5525","第19大街第27号楼9单元874门","描述286614","675628874311147","迪奥(Dior)烈艳蓝金唇膏 口红 3.5g 999号 哑光-经典正红等1件商品","2020/4/25 18:47:14","2020/4/26 18:55:02","2020/4/25 19:02:14","","","http://img.gmall.com/553516.jpg","32","55.00","252.00","20.00"
"3449","东郭妍","13289011809","164.00","1006","9321","第11大街第33号楼6单元645门","描述368435","489957278482334","北纯精制黄小米(小黄米 月子米 小米粥 粗粮杂粮 大米伴侣)2.18kg等1件商品","2020/4/25 18:47:14","2020/4/26 23:10:20","2020/4/25 19:02:14","","","http://img.gmall.com/235333.jpg","22","0.00","145.00","19.00"
"3450","汪毅","13419873912","1064.00","1001","1088","第7大街第9号楼2单元723门","描述774486","661124816482447","Dior迪奥口红唇膏送女友老婆礼物生日礼物 烈艳蓝金999+888两支装礼盒等3件商品","2020/4/25 18:47:14","2020/4/26 18:48:16","2020/4/25 19:02:14","","","http://img.gmall.com/552723.jpg","28","432.00","1488.00","8.00"
运行结果:
成功!!
关键知识点: RedisSink
、自定义 RedisMapper 类
Flink 有专门的 Sink 到 Redis 的对象(RedisSink):
public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper)
创建时,还需要设置输入数据的泛型:
new RedisSink[IN]()
需要传入的数据为 Redis 的基本配置:主机、端口…,redisSinkMapper 方法。
我们先创建一个 FlinkJedisPoolConfig
构造器:
val conf = new FlinkJedisPoolConfig.Builder()
// 设置主机
.setHost("master")
// 设置端口
.setPort(6379)
// 构建
.build()
创建一个 RedisSink:
val sinkToRedis = new RedisSink[(Int,Double)](conf,new MyRedis("totalprice"))
(MyRedis 见下方 自定义 RedisMapper 类)
类源码:
public interface RedisMapper<T> extends Function, Serializable { /** * Returns descriptor which defines data type. * * @return data type descriptor */ RedisCommandDescription getCommandDescription(); /** * Extracts key from data. * * @param data source data * @return key */ String getKeyFromData(T data); /** * Extracts value from data. * * @param data source data * @return value */ String getValueFromData(T data); }
通过源码我们可以知道,T
表示传入数据的泛型,还需要重写三个方法:getCommandDescription
(创建 Redis 描述器:使用什么方法写入 Redis,比如:HSET、SET…),getKeyFromData
(传入 Redis 中键值对的 Key 值),getValueFromData
(传入 Redis 中键值对的值)。
代码示例:
// 自定义 SinkRedis 类
// 调用时需要传入一个值作为 Key
class MyRedis(Key: String) extends RedisMapper[(Int,Double)]{
override def getCommandDescription: RedisCommandDescription = {
// 创建 Redis 描述器,使用 SET 方法
new RedisCommandDescription(RedisCommand.SET)
}
// Key 的名字使用前方传入的值为 Key 值
override def getKeyFromData(data: (Int, Double)): String = Key
// 将数据转换成 String 类型再存储到 Redis 中
override def getValueFromData(data: (Int, Double)): String = data._2.toString
}
整体代码如下:
import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.streaming.connectors.redis.RedisSink import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper} import org.apache.kafka.clients.consumer.ConsumerConfig import java.text.SimpleDateFormat import java.util.Properties object test1 { // 封装数据用到的样例类 case class order(id: String, consignee: String, consignee_tel: String, feight_fee: Double, amount: Double, status: String, create_time: Long, operate_time: Long) def main(args: Array[String]): Unit = { // 创建流数据环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置时间语义 设置为时间时间,根据数据的时间戳来判断 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 创建 Kafka 消费者配置对象(这个需要导入的包是 java.util 里面的包) val properties = new Properties() // 配置 Kafka 服务端地址 properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092") // 指定消费组 properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "order_consumer") // 创建消费者 val consumer = new FlinkKafkaConsumer[String]("order", new SimpleStringSchema(), properties) // 连接 Kafka 数据源,获取数据 val dataStream = env.addSource(consumer) // 得到数据后,我们就可以开始处理元数据了 val data = dataStream // 对每条数据使用逗号 "," 进行切分,去掉前后双引号 .map( data => { val arr = data.split(",") // 遍历数组,去点每个字段的前后双引号 for (i <- 0 until arr.length) { arr(i) = arr(i).replace("\"", "") // 因为有些字段是空的,所以我们判断如果是空的我们就把他赋值为 null if (arr(i) == "") { arr(i) = "null" } } // 数据格式处理完后,开始对可用数据进行封装 // 需要封装的字段为:id,consignee,consignee_tel,final_total_amount,order_status,create_time,operate_time // 使用 SimpleDateFormat 将时间字符串转换为 时间戳 order(arr(0), arr(1), arr(2), arr(arr.length - 1).toDouble, arr(3).toDouble, arr(4), new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(10)).getTime, new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(11)).getTime) }) // 过滤不进入计算的数据 .filter(_.status != "1003") .filter(_.status != "1005") .filter(_.status != "1006") // 将 时间戳设置为事件时间 .assignAscendingTimestamps(t => { if (t.create_time > t.operate_time) { t.create_time } else { t.operate_time } }) data.print("清洗完的数据") // 输出数据 val sinkData = data.map( data => { (1, data.amount) } ).keyBy(0) .sum(1) sinkData.print("订单实收金额") // ----------------------------- 分割线------------------------------------------ // 下面为 需求二 添加的代码 val conf = new FlinkJedisPoolConfig.Builder() .setHost("master") .setPort(6379) .build() val SinkToRedis = new RedisSink[(Int,Double)](conf,new MyRedis("totalprice")) sinkData.addSink(SinkToRedis) env.execute() } } // 自定义 SinkRedis 类 // 调用时需要传入一个值作为 Key class MyRedis(Key: String) extends RedisMapper[(Int,Double)]{ override def getCommandDescription: RedisCommandDescription = { // 创建 Redis 描述器,使用 SET 方法 new RedisCommandDescription(RedisCommand.SET) } // Key 的名字使用前方传入的值为 Key 值 override def getKeyFromData(data: (Int, Double)): String = Key // 将数据转换成 String 类型再存储到 Redis 中 override def getValueFromData(data: (Int, Double)): String = data._2.toString }
需要先启动 Redis:redis-server &
(后台启动)
进入 Redis 交互界面:redis-cli
输入数据:
"3443","严致","13207871570","1449.00","1001","2790","第4大街第5号楼4单元464门","描述345855","214537477223728","小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机等1件商品","2020/4/25 18:47:14","2020/4/26 18:59:01","2020/4/25 19:02:14","","","http://img.gmall.com/117814.jpg","20","0.00","1442.00","7.00"
"3444","慕容亨","13028730359","17805.00","1004","2015","第9大街第26号楼3单元383门","描述948496","226551358533723","Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待等2件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/353392.jpg","11","0.00","17800.00","5.00"
"3445","姚兰凤","13080315675","16180.00","1003","8263","第5大街第1号楼7单元722门","描述148518","754426449478474","联想(Lenovo)拯救者Y7000 英特尔酷睿i7 2019新款 15.6英寸发烧游戏本笔记本电脑(i7-9750H 8GB 512GB SSD GTX1650 4G 高色域等3件商品","2020/4/25 18:47:14","2020/4/26 18:55:17","2020/4/25 19:02:14","","","http://img.gmall.com/478856.jpg","26","3935.00","20097.00","18.00"
"3446","柏锦黛","13487267342","4922.00","1002","7031","第17大街第40号楼2单元564门","描述779464","262955273144195","十月稻田 沁州黄小米 (黄小米 五谷杂粮 山西特产 真空装 大米伴侣 粥米搭档) 2.5kg等4件商品","2020/4/25 18:47:14","2020/4/26 19:11:37","2020/4/25 19:02:14","","","http://img.gmall.com/144444.jpg","30","0.00","4903.00","19.00"
测试结果:
需要创建一个为侧输出流的变量:new outputTag[T](id:String)
参数说明:
T
:输入数据类型;id:String
:设置创建侧边流的 id,必须为字符串类型。然后我们就可以使用万能的 processFunction
进行分流操作:
val sideToRedis = new OutputTag[order]("Status1006") val sideToMySQL = new OutputTag[order]("Status1003") data .process(new ProcessFunction[order,order] { // i:输入数据;context:上下文,可以获取时间戳、输出数据到侧输出流;collector:抛出数据,设置类型为样例类 order; override def processElement(i: order, context: ProcessFunction[order, order]#Context, collector: Collector[order]): Unit = { // 传入的数据状态为 1006 就放入侧输出流 sideToRedis 中。 if(i.status == "1006"){ context.output(sideToRedis,i) }else if(i.status == "1003"){ context.output(sideToMySQL,i) } else{ // 其他的就抛出去正常输出 collector.collect(i) } } })
注意我们这里使用了侧输出流,上面的 filter 过滤就可以去掉了,因为我们等于使用侧输出流的方式将数据过滤掉了。
完整代码:
import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.streaming.connectors.redis.RedisSink import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper} import org.apache.flink.util.Collector import org.apache.kafka.clients.consumer.ConsumerConfig import java.sql.{Connection, DriverManager, PreparedStatement} import java.text.SimpleDateFormat import java.util.Properties case class order(id: String, consignee: String, consignee_tel: String, feight_fee: Double, amount: Double, status: String, create_time: Long, operate_time: Long) object test1 { // 封装数据用到的样例类 def main(args: Array[String]): Unit = { // 创建流数据环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置时间语义 设置为时间时间,根据数据的时间戳来判断 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 创建 Kafka 消费者配置对象(这个需要导入的包是 java.util 里面的包) val properties = new Properties() // 配置 Kafka 服务端地址 properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092") // 指定消费组 properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "order_consumer") // 创建消费者 val consumer = new FlinkKafkaConsumer[String]("order", new SimpleStringSchema(), properties) // 连接 Kafka 数据源,获取数据 val dataStream = env.addSource(consumer) // 得到数据后,我们就可以开始处理元数据了 val data = dataStream // 对每条数据使用逗号 "," 进行切分,去掉前后双引号 .map( data => { val arr = data.split(",") // 遍历数组,去点每个字段的前后双引号 for (i <- 0 until arr.length) { arr(i) = arr(i).replace("\"", "") // 因为有些字段是空的,所以我们判断如果是空的我们就把他赋值为 null if (arr(i) == "") { arr(i) = "null" } } // 数据格式处理完后,开始对可用数据进行封装 // 需要封装的字段为:id,consignee,consignee_tel,final_total_amount,order_status,create_time,operate_time // 使用 SimpleDateFormat 将时间字符串转换为 时间戳 order(arr(0), arr(1), arr(2), arr(arr.length - 1).toDouble, arr(3).toDouble, arr(4), new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(10)).getTime, new SimpleDateFormat("YYYY/m/dd HH:mm:ss").parse(arr(11)).getTime) }) // 将 时间戳设置为事件时间 .assignAscendingTimestamps(t => { if (t.create_time > t.operate_time) { t.create_time } else { t.operate_time } }) // data.print("清洗完的数据") val conf = new FlinkJedisPoolConfig.Builder() .setHost("master") .setPort(6379) .build() val SinkToRedis = new RedisSink[(Int,Double)](conf,new MyRedis("totalprice")) val sideToRedis = new OutputTag[order]("Status1006") val sideToMySQL = new OutputTag[order]("Status1003") val endData = data .process(new ProcessFunction[order,order] { override def processElement(i: order, context: ProcessFunction[order, order]#Context, collector: Collector[order]): Unit = { if(i.status == "1006"){ context.output(sideToRedis,i) }else if(i.status == "1003"){ context.output(sideToMySQL,i) } else{ collector.collect(i) } } }) val end = endData .filter(_.status == "1005") .map( data => { (1,data.amount) }) .keyBy(0) .sum(1) // endData.print("正常数据:") end.addSink(SinkToRedis) val status_1006 = endData.getSideOutput(sideToRedis).map( data => { (1,data.amount) }) .keyBy(0) .sum(1) // 将 1006 输出到 Redis 中 status_1006.print("status_1006") val SinkToRedis2 = new RedisSink[(Int,Double)](conf,new MyRedis("totalreduceprice")) status_1006.addSink(SinkToRedis2) // 将 1003 的取出,输出到MySQL 中 val status_1003 = endData.getSideOutput(sideToMySQL) status_1003.print("status_1003") status_1003.addSink(new ToMySQL) env.execute() } } // 自定义 SinkRedis 类 // 调用时需要传入一个值作为 Key class MyRedis(Key: String) extends RedisMapper[(Int,Double)]{ override def getCommandDescription: RedisCommandDescription = { // 创建 Redis 描述器,使用 SET 方法 new RedisCommandDescription(RedisCommand.SET) } // Key 的名字使用前方传入的值为 Key 值 override def getKeyFromData(data: (Int, Double)): String = Key // 将数据转换成 String 类型再存储到 Redis 中 override def getValueFromData(data: (Int, Double)): String = data._2.toString } class ToMySQL extends RichSinkFunction[order]{ var conn:Connection = _ var insetStat:PreparedStatement = _ override def invoke(value: order): Unit = { // 补充后面的预留位置,每个数字代表的是第几个 ?。 insetStat.setString(1,value.id) insetStat.setString(2,value.consignee) insetStat.setString(3,value.consignee_tel) insetStat.setDouble(4,value.amount) insetStat.setDouble(5,value.feight_fee) insetStat.execute() } override def open(parameters: Configuration): Unit = { // 连接 MySQL 驱动 conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/shtd_result?useSSL=false","root","root") // 配置 insert SQL 语句 insetStat = conn.prepareStatement("insert into order_info values(?,?,?,?,?)") } override def close(): Unit = { // 关闭各个节点 conn.close() insetStat.close() } }
pom
文件中需要的依赖:
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.10.1</flink.version> <scala.binary.version>2.11</scala.binary.version> <kafka.version>2.2.0</kafka.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> <exclusions> <exclusion> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> </exclusion> </exclusions> </dependency> <!-- 指定mysql-connector的依赖 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> </dependencies>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。