赞
踩
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * Streaming word-count job: consumes whitespace-separated words from the
 * Kafka topic "flinktest1", keeps a running count per word, and writes each
 * (word, count) pair to Redis as a plain string entry (SET word -> count).
 */
object Streaming_test {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer configuration.
    val pro = new Properties()
    pro.setProperty("bootstrap.servers", "localhost:9092")
    pro.setProperty("group.id", "test1")

    val stream = env.addSource(
      new FlinkKafkaConsumer011[String]("flinktest1", new SimpleStringSchema(), pro))

    // Split each record into words and maintain a running count per word.
    // keyBy(0)/sum(1) use tuple-field positions (Flink 1.x style, matching
    // the FlinkKafkaConsumer011 vintage of this job).
    val result: DataStream[(String, Int)] = stream
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)

    // Flink's bundled RedisSink manages the Jedis connection pool itself;
    // we only supply a RedisMapper describing how each element becomes a
    // Redis command.
    // NOTE(review): credentials are hardcoded here — move the password to
    // external configuration before deploying.
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .setPassword("123456")
      .build()
    result.addSink(new RedisSink[(String, Int)](conf, new MyRedisMapper))

    // Also echo the running counts to stdout for local debugging.
    result.print()

    env.execute("kafkatest")
  }
}

/** Maps a (word, count) element to a Redis SET command: key = word, value = count. */
class MyRedisMapper extends RedisMapper[(String, Int)] {

  // Use the plain string type (SET); choose a different RedisCommand for
  // other Redis data types (HSET, LPUSH, ...).
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)

  override def getKeyFromData(t: (String, Int)): String = t._1

  override def getValueFromData(t: (String, Int)): String = t._2.toString
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。