当前位置:   article > 正文

Flink连接kafka处理数据并存入redis中_使用flink消费kafka中的数据,统计实时营业额存入redis中

使用flink消费kafka中的数据,统计实时营业额存入redis中

scala 写Flink读取kafka数据处理并存入redis中

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object Streaming_test {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val pro = new Properties()
    pro.setProperty("bootstrap.servers", "localhost:9092")
    pro.setProperty("group.id", "test1")
    val stream = env.addSource(new FlinkKafkaConsumer011[String]("flinktest1", new SimpleStringSchema(), pro))
    val result: DataStream[(String, Int)] = stream.flatMap(_.split(" ")).map(x => {
      (x, 1)
    }).keyBy(0).sum(1)
    //flink自带的redisSink不需要我们考虑系统性能的相关配置,需要自己实现RedisMapper类
    val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).setPassword("123456").build()
    val sink = new RedisSink[(String, Int)](conf, new MyRedisMapper)
    result.addSink(sink)
    result.print()
    env.execute("kafkatest")
  }
}
class MyRedisMapper extends RedisMapper[(String, Int)]() {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)//选择不同的redis数据类型
  }
  override def getValueFromData(t: (String, Int)) = t._2.toString
  override def getKeyFromData(t: (String, Int)) = t._1
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/514533
推荐阅读
相关标签
  

闽ICP备14008679号