Flink Series: Connecting to Redis with Connectors (flink-connector-redis)

flink-connector-redis
Uses the Flink DataStream Connectors to connect to the Redis cache database and provides stream input and output operations.

Example environment

  1. java.version: 1.8.x
  2. flink.version: 1.11.1
  3. redis: 3.2

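Example data source (project download on Gitee)

Flink Series: Setting Up the Development Environment and Data

Example module (pom.xml)

Flink Series: DataStream Connectors and Example Modules

Data stream input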

DataStreamSource.java

    package com.flink.examples.redis;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    import redis.clients.jedis.Protocol;

    /**
     * @Description Reads data from Redis and emits it into a DataStream.
     */
    public class DataStreamSource {

        /**
         * Official documentation: https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
         */
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            final String key = "props";
            // Implement the abstract methods of RichSourceFunction to load the source data into the stream.
            DataStream<Tuple2<String, String>> dataStream = env.addSource(new RichSourceFunction<Tuple2<String, String>>() {
                // Created in open() on the task side, so the pool is never serialized with the function.
                private transient JedisPool jedisPool;

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379, Protocol.DEFAULT_TIMEOUT);
                }

                @Override
                public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
                    // try-with-resources closes the connection after use; close() returns it to the pool.
                    try (Jedis jedis = jedisPool.getResource()) {
                        String value = jedis.get(key);
                        // Skip a missing key: Flink tuples do not support null fields.
                        if (value != null) {
                            ctx.collect(Tuple2.of(key, value));
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void cancel() {
                    // run() emits a single record and returns, so there is no loop to stop here.
                }

                @Override
                public void close() throws Exception {
                    // Release the pool when the source shuts down.
                    if (jedisPool != null) {
                        jedisPool.close();
                        jedisPool = null;
                    }
                    super.close();
                }
            });
            dataStream.print();
            env.execute("flink redis source");
        }
    }
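The source above reads the key once and then finishes. If the key should instead be re-read periodically, a common pattern is a loop in run() guarded by a flag that cancel() clears. The following is only a minimal sketch of that run/cancel contract using the plain JDK: `PollingSourceSketch`, its `fetch` supplier, and the `collector` callback are hypothetical stand-ins for the source function, the Redis read, and Flink's `SourceContext`, and are not part of Flink or Jedis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical JDK-only model of a polling source; not a Flink or Jedis class.
public class PollingSourceSketch {

    // volatile so that a cancel() issued from another thread is seen by the polling loop
    private volatile boolean running = true;
    private final Supplier<String> fetch;   // stands in for jedis.get(key)
    private final long intervalMillis;      // pause between two polls

    public PollingSourceSketch(Supplier<String> fetch, long intervalMillis) {
        this.fetch = fetch;
        this.intervalMillis = intervalMillis;
    }

    // Polls until cancel() is called; the collector stands in for ctx.collect(...).
    public void run(Consumer<String> collector) {
        while (running) {
            String value = fetch.get();
            if (value != null) {
                collector.accept(value);
            }
            if (!running) {
                break;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling.
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }

    // Clears the flag so that run() leaves its loop.
    public void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        PollingSourceSketch source = new PollingSourceSketch(() -> "value", 10L);
        // Cancel from inside the collector after three records to keep the demo short.
        source.run(v -> {
            out.add(v);
            if (out.size() == 3) {
                source.cancel();
            }
        });
        System.out.println(out);
    }
}
```

In a real source function the same idea would presumably apply: run() keeps the loop, cancel() only flips the flag, and resources such as the connection pool are released in close().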

Data stream output

DataStreamSink.java

    package com.flink.examples.redis;

    import org.apache.commons.lang3.RandomUtils;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

    /**
     * @Description Writes a data stream into Redis.
     */
    public class DataStreamSink {

        /**
         * Official documentation: https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
         */
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1. Write data into the stream: pair each word with a random integer.
            String[] words = new String[]{"props", "student", "build", "name", "execute"};
            DataStream<Tuple2<String, Integer>> sourceStream = env.fromElements(words).map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String v) throws Exception {
                    return Tuple2.of(v, RandomUtils.nextInt(1000, 9999));
                }
            });
            sourceStream.print();
            // 2. Instantiate FlinkJedisPoolConfig to configure the Redis connection.
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();
            // 3. Write to Redis: instantiate RedisSink and attach it with addSink so that the results computed by Flink are inserted into Redis.
            sourceStream.addSink(new RedisSink<>(conf, new RedisMapper<Tuple2<String, Integer>>() {
                @Override
                public RedisCommandDescription getCommandDescription() {
                    // To write into a hash instead, pass the hash key when instantiating:
                    // return new RedisCommandDescription(RedisCommand.HSET, key);
                    // SET needs no additional key, so null is passed here.
                    return new RedisCommandDescription(RedisCommand.SET, null);
                }

                @Override
                public String getKeyFromData(Tuple2<String, Integer> tuple2) {
                    return tuple2.f0;
                }

                @Override
                public String getValueFromData(Tuple2<String, Integer> tuple2) {
                    return tuple2.f1.toString();
                }
            }));
            env.execute("flink redis sink");
        }
    }
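The mapper above uses SET, and its commented-out line mentions HSET with an extra key. To make the difference in the resulting Redis layout concrete, here is a small JDK-only simulation. It does not call Redis or the connector: `RedisLayoutSketch`, `applySet`, `applyHset`, and the hash name "words" are hypothetical, and in-memory maps stand in for the Redis keyspace. It assumes that with HSET the connector treats the additional key as the hash name, the mapper's key as the field, and the mapper's value as the field value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of the Redis keyspace; not part of the connector.
public class RedisLayoutSketch {

    // Simulates SET: every record becomes its own top-level key.
    public static Map<String, String> applySet(String[][] records) {
        Map<String, String> keyspace = new LinkedHashMap<>();
        for (String[] record : records) {
            keyspace.put(record[0], record[1]); // SET record[0] record[1]
        }
        return keyspace;
    }

    // Simulates HSET: every record becomes a field of one hash stored under hashKey.
    public static Map<String, Map<String, String>> applyHset(String hashKey, String[][] records) {
        Map<String, Map<String, String>> keyspace = new LinkedHashMap<>();
        for (String[] record : records) {
            keyspace.computeIfAbsent(hashKey, k -> new LinkedHashMap<>())
                    .put(record[0], record[1]); // HSET hashKey record[0] record[1]
        }
        return keyspace;
    }

    public static void main(String[] args) {
        String[][] records = {{"props", "1234"}, {"student", "5678"}};
        System.out.println(applySet(records));
        System.out.println(applyHset("words", records));
    }
}
```

With SET, each word ends up as a separate key and can be read back with GET, which is what DataStreamSource.java does for "props". With HSET, all words share one key and would be read with HGET or HGETALL instead.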

Data display
