
Real-time streaming: transferring data from Kafka to Redis with Flink


1. Preparation

1.1 Installing Redis

For installing and configuring Redis, you can refer to these two articles:
Redis installation guide 1
Redis installation guide 2

1.2 Installation notes

1. On Linux, the source tarball can be downloaded directly with
wget http://download.redis.io/releases/redis-3.0.7.tar.gz and then built.
2. If make fails, the machine is most likely missing a C toolchain; Redis is written in C, so install gcc first:
yum install gcc
3. After a failed make, delete the previously extracted Redis directory, extract the tarball again, and rebuild;
4. In the redis.conf configuration file, the line beginning with #./redis-server is only a usage comment; there is no need to fill in the path to redis.conf there.

2. Dependencies

Pay attention to the versions when adding the dependencies. Flink itself does not ship a Redis connector, so the Bahir connector is used here.

<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.0</version>
    </dependency>
    <!-- redis -->
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>
    <!-- /redis -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>1.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>1.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>1.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.7.2</version>
     </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.7.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-shaded-hadoop-2</artifactId>
      <version>2.4.1-9.0</version>
    </dependency>

3. Implementation

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.kafka.clients.consumer.ConsumerConfig

object WriteToRedis {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(5)
    val prop = new Properties()
    prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092")
    prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest")
    prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"redis")

    // Read the "test" topic from Kafka as a stream of strings
    val kafkaMsg: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String](
      "test",
      new SimpleStringSchema(),
      prop
    ))
    // Split each line into words, key by word, and keep a running count per word
    val wordPairs: DataStream[(String, Int)] = kafkaMsg.flatMap(_.split(" "))
      .map((_, 1))
    val keyedPairs: KeyedStream[(String, Int), String] = wordPairs.keyBy(_._1)
    val result: DataStream[(String, Int)] = keyedPairs.reduce((x, y) => (x._1, x._2 + y._2))


    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("192.168.136.20")
      //.setPassword("12345")
      .setPort(6379)
      .build()
   
    // Write the running counts to Redis and also print them for debugging
    result.addSink(new RedisSink[(String, Int)](conf, new MyRedisMapper()))
    result.print()

    env.execute("WriteToRedis")

  }
  // Mapper that describes how each (word, count) record is written to Redis
  class MyRedisMapper extends RedisMapper[(String, Int)] {
    // The Redis command used to save the data: HSET <hash name> <key> <value>
    override def getCommandDescription: RedisCommandDescription = {
      // additionalKey sets the hash name for hash commands such as HSET
      new RedisCommandDescription(RedisCommand.HSET, "WordCount2")
    }
    // The Redis key: the word itself
    override def getKeyFromData(t: (String, Int)): String = t._1
    // The Redis value: the running count
    override def getValueFromData(t: (String, Int)): String = t._2.toString
  }
}
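
For reference, if each word should be stored as a plain string key rather than a field of a hash, only the command description needs to change. The following is a minimal sketch, added alongside MyRedisMapper in the same file so the existing imports apply; the class name MyStringRedisMapper and the word: key prefix are made up for illustration, and the single-argument RedisCommandDescription constructor is used because SET takes no additional key.

class MyStringRedisMapper extends RedisMapper[(String, Int)] {
  // SET needs no additionalKey (no hash name), so only the command is given
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)

  // Illustrative key: namespace each word under "word:"
  override def getKeyFromData(t: (String, Int)): String = "word:" + t._1

  // Value: the running count
  override def getValueFromData(t: (String, Int)): String = t._2.toString
}

Swapping new MyRedisMapper() for new MyStringRedisMapper() in the addSink call is the only other change; the result can then be read with get word:hello instead of hget.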

Results:
Kafka input:

kafka-console-producer.sh --topic test --broker-list 192.168.136.20:9092
>hello world
>hello scala
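
Alternatively, the topic can be fed from code rather than from the console producer. Below is a minimal sketch using the kafka-clients producer API; the broker address and topic match the job above, and it assumes the kafka-clients library (pulled in transitively by flink-connector-kafka) is on the classpath.

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object ProduceTestData {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.20:9092")
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Send the same two lines used in the console-producer example above
    producer.send(new ProducerRecord[String, String]("test", "hello world"))
    producer.send(new ProducerRecord[String, String]("test", "hello scala"))
    producer.close()
  }
}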

Redis output:

hget WordCount2 hello
>"2"
hget WordCount2 world
>"1"

3.1 Useful Redis commands

List all keys (the hash name WordCount2 set above will show up here):

keys *

Get a value: WordCount2 is the hash name we configured in the mapper, and hello is one of the keys written when the data was sent.

hget WordCount2 hello
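
The same lookup can also be done from code with the Jedis client that is already declared in the dependency section; a minimal sketch, with the host and port matching the earlier configuration:

import redis.clients.jedis.Jedis

object CheckRedisResult {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("192.168.136.20", 6379)
    // Equivalent to: hget WordCount2 hello
    println(jedis.hget("WordCount2", "hello"))
    jedis.close()
  }
}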

Check the current password:

127.0.0.1:6379> config get requirepass
1) "requirepass"
2) ""

Set a password:

config set requirepass "12345"

After setting a password, the following command must be entered before Redis can be used again:

auth 12345
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "12345"
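
Note that once a password is set, the Flink job must supply it as well; a minimal sketch of the pool configuration from the code above, with the setPassword line uncommented (12345 is just the example value used here):

    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("192.168.136.20")
      .setPort(6379)
      .setPassword("12345")  // must match the value set with: config set requirepass "12345"
      .build()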

Change the password back; by default no password is required:

config set requirepass ""