flink-connector-redis
The simplest way to find a Flink connector is to search for the keyword
flink-connector-
Here Redis is used as the output target of the sink.
1. pom dependency
<!-- Flink Redis connector -->
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
Write the code
package com.zch.apitest.sink;

import com.zch.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Author: zhaoHui
 * Date: 2022/01/22
 * Time: 16:32
 * Description:
 */
public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the input file
        DataStream<String> inputStream = env.readTextFile("F:\\JAVA\\bigdata2107\\zch\\flink\\src\\main\\resources\\Sensor.txt");

        // Parse each comma-separated line into a SensorReading
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] split = line.split(",");
            return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
        });

        // Define the Jedis connection configuration
        FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig
                .Builder()
                .setHost("192.168.235.10")
                .setPort(6379)
                .build();

        dataStream.addSink(new RedisSink<>(flinkJedisPoolConfig, new MyRedisMapper()));

        env.execute();
    }

    // Custom RedisMapper
    public static class MyRedisMapper implements RedisMapper<SensorReading> {
        // Define the command used to save data to Redis; the data is stored
        // in a hash: hset sensor_temp id temperature
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        // The sensor id becomes the hash field
        @Override
        public String getKeyFromData(SensorReading sensorReading) {
            return sensorReading.getId();
        }

        // The temperature becomes the field's value
        @Override
        public String getValueFromData(SensorReading sensorReading) {
            return sensorReading.getTemperature().toString();
        }
    }
}
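The program above imports com.zch.apitest.beans.SensorReading, but the article does not show that class. The following is only a guess at what it might look like, inferred from the constructor call and the getId() / getTemperature() getters used above. The timestamp field name, the fromCsv helper, and the sample line format are assumptions made for illustration.

```java
// Hypothetical sketch of the SensorReading bean; the original class is not shown.
// In a real project it would be declared `public` in its own file so that
// Flink can treat it as a POJO.
class SensorReading {
    private String id;
    private Long timestamp;      // assumed name for the second CSV column
    private Double temperature;

    // Flink's POJO rules require a public no-argument constructor.
    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    public Double getTemperature() { return temperature; }
    public void setTemperature(Double temperature) { this.temperature = temperature; }

    // Illustrative helper (not in the original): parses one line that is
    // assumed to look like "sensor_1,1547718199,35.8".
    public static SensorReading fromCsv(String line) {
        String[] fields = line.split(",");
        return new SensorReading(
                fields[0].trim(),
                Long.valueOf(fields[1].trim()),
                Double.valueOf(fields[2].trim()));
    }

    @Override
    public String toString() {
        return "SensorReading{id='" + id + "', timestamp=" + timestamp
                + ", temperature=" + temperature + "}";
    }
}
```

With a bean shaped like this, MyRedisMapper would write the id as the hash field and the temperature's string form as its value.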
Start the Redis service (mine runs in Docker)
Start the Flink program
Check the data in Redis
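Because each new reading for a sensor id overwrites the previous value of that hash field, Redis ends up holding only the latest temperature for each sensor.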
localhost:6379> hgetall sensor_temp
1) "sensor_1"
2) "37.1"
3) "sensor_6"
4) "15.4"
5) "sensor_7"
6) "6.7"
7) "sensor_10"
8) "38.1"
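The overwrite behaviour described above can be imitated without a Redis server. The sketch below uses a plain in-memory map as a stand-in for the sensor_temp hash. The class name HashOverwriteDemo and the earlier reading 35.8 are made up for illustration; it is not how the connector talks to Redis.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory stand-in for one Redis hash. It only mimics how HSET replaces
// the value of an existing field; it does not connect to Redis.
class HashOverwriteDemo {
    private final Map<String, String> sensorTemp = new LinkedHashMap<>();

    // Like HSET: returns true when the field is new, false when an
    // existing field was overwritten.
    boolean hset(String field, String value) {
        return sensorTemp.put(field, value) == null;
    }

    // Like HGETALL: returns a copy of all field/value pairs.
    Map<String, String> hgetall() {
        return new LinkedHashMap<>(sensorTemp);
    }

    public static void main(String[] args) {
        HashOverwriteDemo hash = new HashOverwriteDemo();
        hash.hset("sensor_1", "35.8"); // hypothetical earlier reading
        hash.hset("sensor_6", "15.4");
        hash.hset("sensor_1", "37.1"); // later reading replaces 35.8
        System.out.println(hash.hgetall()); // prints {sensor_1=37.1, sensor_6=15.4}
    }
}
```

If every reading needs to be kept rather than only the latest one, a different Redis data structure or a field that includes the timestamp would be needed; the article's mapper intentionally keeps one value per sensor.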