赞
踩
通过使用 Flink DataStream Connectors 数据流连接器连接到 Redis 缓存数据库,并提供数据流输入与输出操作。
示例环境
- java.version: 1.8.x
- flink.version: 1.11.1
- redis:3.2
示例数据源 (项目码云下载)
示例模块 (pom.xml)
Flink 系列 之 DataStream Connectors 与 示例模块
数据流输入
DataStreamSource.java
- package com.flink.examples.redis;
-
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
- import redis.clients.jedis.JedisPoolConfig;
- import redis.clients.jedis.Protocol;
-
- /**
- * @Description 从redis中读取数据输出到DataStream数据流中
- */publicclass DataStreamSource {
- /**
- * 官方文档:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
- */publicstaticvoid main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- String key = "props";
- //实现RichSourceFunction抽象方法,加载数据源数据到流中
- DataStream<Tuple2<String, String>> dataStream = env.addSource(new RichSourceFunction<Tuple2<String, String>>(){
- private JedisPool jedisPool = null;
- @Overridepublicvoid run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
- jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379, Protocol.DEFAULT_TIMEOUT);
- Jedis jedis = jedisPool.getResource();
- try{
- ctx.collect(Tuple2.of(key, jedis.get(key)));
- }catch(Exception e){
- e.printStackTrace();
- }finally{
- if (jedis != null){
- //用完即关,内部会做判断,如果存在数据源与池,则回滚到池中
- jedis.close();
- }
- }
- }
- @Overridepublicvoid cancel() {
- try {
- super.close();
- }catch(Exception e){
- }
- if (jedisPool != null){
- jedisPool.close();
- jedisPool = null;
- }
- }
- });
- dataStream.print();
- env.execute("flink redis source");
- }
-
- }
数据流输出
DataStreamSink.java
- package com.flink.examples.redis;
-
- import org.apache.commons.lang3.RandomUtils;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.connectors.redis.RedisSink;
- import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
- import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
- import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
- import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
-
- /**
- * @Description 将数据流写入到redis中
- */publicclass DataStreamSink {
-
- /**
- * 官方文档:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
- */publicstaticvoid main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- //1.写入数据到流中String [] words = newString[]{"props","student","build","name","execute"};
- DataStream<Tuple2<String, Integer>> sourceStream = env.fromElements(words).map(new MapFunction<String, Tuple2<String, Integer>>() {
- @Overridepublic Tuple2<String, Integer> map(String v) throws Exception {
- return Tuple2.of(v, RandomUtils.nextInt(1000, 9999));
- }
- });
- sourceStream.print();
-
- //2.实例化FlinkJedisPoolConfig 配置redis
- FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();
-
- //3.写入到redis,实例化RedisSink,并通过flink的addSink的方式将flink计算的结果插入到redis
- sourceStream.addSink(new RedisSink<>(conf, new RedisMapper<Tuple2<String, Integer>>(){
- @Overridepublic RedisCommandDescription getCommandDescription() {
- returnnew RedisCommandDescription(RedisCommand.SET, null);
- //通过实例化传参,设置hash值的key//return new RedisCommandDescription(RedisCommand.HSET, key);
- }
- @OverridepublicString getKeyFromData(Tuple2<String, Integer> tuple2) {
- return tuple2.f0;
- }
- @OverridepublicString getValueFromData(Tuple2<String, Integer> tuple2) {
- return tuple2.f1.toString();
- }
- }));
- env.execute("flink redis sink");
- }
-
- }
数据展示
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。