For Redis setup, you can refer to these two articles:
Redis installation 1
Redis installation 2
1. On Linux, you can download the Redis source package directly with
wget http://download.redis.io/releases/redis-3.0.7.tar.gz
and then extract it and build it with make.
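2. If make reports an error, the machine probably has no C build environment. Redis is written in C, so install gcc first:
yum install gcc
3. After a failed make, delete the previously extracted Redis directory, then extract and build again.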
4. In redis.conf, the line that starts with # ./redis-server is only a comment; you do not need to fill in the path to redis.conf there.
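Pay attention to versions when adding the dependencies. Flink itself does not ship a Redis connector, so this example uses the one from Apache Bahir.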
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
<!-- redis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
<!-- /redis -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2</artifactId>
    <version>2.4.1-9.0</version>
</dependency>
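If you build with sbt instead of Maven, the same coordinates could be declared roughly as follows. This is only a sketch: the article itself uses Maven, so the use of sbt, the exact scalaVersion patch release, and the flinkVersion helper name are all assumptions made here for illustration. Defining the Flink version once makes it harder to mix versions by accident.

```scala
// build.sbt (sketch; sbt and the 2.11.12 patch release are assumptions, not from the article)
scalaVersion := "2.11.12"

// Hypothetical helper so every Flink artifact uses the same version
val flinkVersion = "1.7.2"

libraryDependencies ++= Seq(
  // %% appends the Scala binary suffix (_2.11) to the artifact name
  "org.apache.bahir" %% "flink-connector-redis" % "1.0",
  "redis.clients"     % "jedis"                 % "2.9.0",
  "org.apache.flink" %% "flink-scala"           % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
  // plain % because these artifacts have no Scala suffix in the Maven list above
  "org.apache.flink"  % "flink-java"            % flinkVersion,
  "org.apache.flink" %% "flink-streaming-java"  % flinkVersion,
  "org.apache.flink"  % "flink-shaded-hadoop-2" % "2.4.1-9.0"
)
```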
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.kafka.clients.consumer.ConsumerConfig

object WriteToRedis {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(5)

    // Kafka consumer settings
    val prop = new Properties()
    prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.20:9092")
    prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "redis")

    // Read each Kafka record from topic "test" as a plain string
    val KafkaMsg: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String](
      "test",
      new SimpleStringSchema(),
      prop
    ))

    // Split into words, key by word, and keep a running count per word
    val Msg2: DataStream[(String, Int)] = KafkaMsg.flatMap(_.split(" "))
      .map((_, 1))
    val msg3: KeyedStream[(String, Int), String] = Msg2.keyBy(_._1)
    val result: DataStream[(String, Int)] = msg3.reduce((x, y) => (x._1, x._2 + y._2))

    // Redis connection settings
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("192.168.136.20")
      //.setPassword("12345")
      .setPort(6379)
      .build()

    result.addSink(new RedisSink[(String, Int)](conf, new MyRedisMapper()))
    result.print()
    env.execute("Write to Redis")
  }

  // Mapper that tells the sink how to write each record to Redis
  class MyRedisMapper extends RedisMapper[(String, Int)] {
    // Redis command used to store the data: HSET <hash name> <key> <value>
    override def getCommandDescription: RedisCommandDescription = {
      // additionalKey is used by commands such as HSET to set the hash name
      new RedisCommandDescription(RedisCommand.HSET, "WordCount2")
    }

    // Key (field) inside the hash
    override def getKeyFromData(t: (String, Int)): String = t._1

    // Value stored under that key
    override def getValueFromData(t: (String, Int)): String = t._2.toString()
  }
}
Results:
Kafka input:
kafka-console-producer.sh --topic test --broker-list 192.168.136.20:9092
>hello world
>hello scala
Redis output:
hget WordCount2 hello
>"2"
hget WordCount2 world
>"1"
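The counts above follow from how the keyed reduce and HSET interact: every incoming word produces an updated running count, and each HSET overwrites the previous value stored under that word. A minimal sketch in plain Scala, with no Flink or Redis involved; WordCountHashSketch, simulate, and the in-memory maps are hypothetical stand-ins introduced only for illustration:

```scala
import scala.collection.mutable

object WordCountHashSketch {
  // Simulates the job for a list of Kafka lines and returns the final
  // content of the hash "WordCount2" as field -> value.
  def simulate(lines: Seq[String]): Map[String, String] = {
    val counts = mutable.Map.empty[String, Int]   // per-key reduce state
    val hash = mutable.Map.empty[String, String]  // stand-in for the Redis hash
    for (line <- lines; word <- line.split(" ")) {
      val updated = counts.getOrElse(word, 0) + 1 // reduce: add 1 to the running count
      counts(word) = updated
      hash(word) = updated.toString               // HSET WordCount2 <word> <count> overwrites
    }
    hash.toMap
  }

  def main(args: Array[String]): Unit = {
    val hash = simulate(Seq("hello world", "hello scala"))
    hash.toSeq.sortBy(_._1).foreach { case (field, value) => println(s"$field -> $value") }
  }
}
```

For the two input lines used in this article, hello ends at 2 while world and scala end at 1, which matches the hget results shown.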
List all keys; the hash (table) names show up here:
keys *
Read a value. WordCount2 is the hash name we set in the mapper, and hello is the key of the record we sent.
hget WordCount2 hello
Check the password:
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) ""
Set a password:
config set requirepass "12345"
After setting the password, you must authenticate with the following command before you can continue:
auth 12345
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "12345"
Change the password back to empty; by default no password is required:
config set requirepass ""
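If you leave a password set on the server, the Flink job has to supply it as well. A minimal sketch, assuming the Bahir connector from the dependency list is on the classpath; it simply enables the setPassword call that is commented out in the job above. The object name RedisConfWithPassword is hypothetical, and the host, port, and password are the example values from this article:

```scala
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig

object RedisConfWithPassword {
  // Connection settings for the RedisSink when requirepass is set on the server
  val conf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
    .setHost("192.168.136.20")
    .setPort(6379)
    .setPassword("12345") // must match the value given to: config set requirepass
    .build()
}
```

Pass this conf to new RedisSink in place of the password-less one.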