赞
踩
实战过程中,源算子为Kafka,sink为 需要将数据以HINCRBY
形式写入Redis
以下提供三种实现方案,推荐使用第二种。
以下方案在发生错误时恢复可能表现的不太如意。正在改进。
实现RedisMapper
接口
// 此处stream为源算子拿到数据的对象 DataStreamSource<T> stream.addSink(new RedisSink<>(buildSinkRedisPool(), new RedisMapper<Tuple3<String, String, Long>>() { @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HINCRBY, "temp"); } @Override public String getKeyFromData(Tuple3<String, String, Long> data) { return data.f1; } @Override public String getValueFromData(Tuple3<String, String, Long> data) { return String.valueOf(data.f2); } @Override public Optional<String> getAdditionalKey(Tuple3<String, String, Long> data) { return Optional.of(data.f0); } @Override public Optional<Integer> getAdditionalTTL(Tuple3<String, String, Long> data) { return Optional.of(120 * 60 * 1000); } }))
<dependency> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> <version>${lettuce.version}</version> <exclusions> <exclusion> <artifactId>netty-buffer</artifactId> <groupId>io.netty</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.84.Final</version> </dependency>
public class RedisPipLineSink extends ProcessFunction<Tuple3<String, String, Long>, String> { private static final long expire = 120 * 60; private Configuration configuration; private Long commFlag = 0L; private Long thresholdValue = 10000L; protected RedisClusterClient client = null; protected StatefulRedisClusterConnection<String, String> connect = null; protected RedisAdvancedClusterAsyncCommands<String, String> async = null; @Override public void open(Configuration parameters) throws Exception { Set<RedisURI> redisURIS = new HashSet<>(); // 此处 redisServer 应为从配置文件获取,配置文件由启动main()函数时指定 redisServer = ""; Arrays.asList(redisServer.split(",")).forEach(item -> { String[] info = item.split(":"); redisURIS.add(RedisURI.builder().withHost(info[0]).withPort(Integer.valueOf(info[1])).build()); }); client = RedisClusterClient.create(redisURIS); connect = client.connect(); async = connect.async(); async.setAutoFlushCommands(false); } @Override public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception { // 缓冲区为1W commFlag++; async.hincrby(value.f0, value.f1, value.f2); async.expire(value.f0, expire); // 达到1W提交命令并清空缓冲区 if (thresholdValue.equals(commFlag)) { commFlag = 0L; async.flushCommands(); } } @Override public void close() throws Exception { if (connect != null && connect.isOpen()) { connect.close(); } if (client != null) { client.shutdown(); } } }
jvm启动时需要增加参数-Dcom.datastax.driver.FORCE_NIO=true
如果不加入此命令将会报错
Caused by: java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.channel.EventLoopGroup
at com.datastax.driver.core.NettyUtil.newEventLoopGroupInstance(NettyUtil.java:134)
at com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:95)
at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:926)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1499)
at com.datastax.driver.core.Cluster.init(Cluster.java:208)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:376)
原因见:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.6</version>
</dependency>
public class RedisRedissonPipLineSink extends ProcessFunction<Tuple3<String, String, Long>, String> { private static final String PREFIX = "redis://"; private static final long expire = 120 * 60; private Configuration configuration; private Long commFlag = 0L; private Long thresholdValue = 10000L; private RedissonClient redisson = null; private RBatch batch = null; @Override public void open(Configuration parameters) throws Exception { // 此处 redisServer 应为从配置文件获取,配置文件由启动main()函数时指定 String redisServer = ""; if (redisServer == null || "".equals(redisServer)) { return; } Config config = new Config(); ClusterServersConfig clusterServersConfig = config.useClusterServers(); List<String> nodes = Arrays.asList(redisServer.split(",")); for (String node : nodes) { clusterServersConfig.addNodeAddress(PREFIX + node); } redisson = Redisson.create(config); batch = redisson.createBatch(); } @Override public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception { commFlag++; RMapAsync<Object, Object> map = batch.getMap(value.f0, StringCodec.INSTANCE); map.addAndGetAsync(value.f1, value.f2); map.expireAsync(2, TimeUnit.HOURS); batch.getMap(value.f0).addAndGetAsync(value.f1, value.f2); if (thresholdValue.equals(commFlag)) { batch.executeAsync(); batch = redisson.createBatch(); commFlag = 0L; } } @Override public void close() throws Exception { if (redisson != null && !redisson.isShutdown()) { redisson.shutdown(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。