赞
踩
首先,预祝大家2020年多福多寿,少宰少难!!!
作为2020年的第一篇博文,再不写的话就对不起大家了!!!
好,废话少说,今天这篇文章主要是解决你在做实时计算的时候,将数据sink到redis的种种问题
实时计算流程框架其实比较简单,目前比较流行的也就是kafka+flink+redis或者kafka+flink+hbase了
前面kafka+flink的流程稍后会专门来写,本篇主要写flink sink redis技术
Redis的数据结构常用的无非就是两种:
这里并不展开说这两种数据结构的具体定义和使用场景,主要针对这两种数据类型具体说明flink sink的不同用法;
目前市场上的私服绝大部分都是这个包
org.apache.flink:flink-connector-redis_2.11:1.1.5
如果你使用的SET结构类型,那这个包完全可以满足你的需求;
如果你使用的是HSET类型,并且不需要从数据中提取additionalKey,那这个也可以满足,例如:
hget 1000038 20200216:$action:exchange:311::
你的1000038是固定的,不随数据变化而变化,那就不用接着往下看了
但是,如果redisKey是需要从数据里面提取的话,那就有必要看一下官方文档了~~~
参考flink的官方文档:Flink Redis Connector
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1-SNAPSHOT</version>
</dependency>
可是你是不是在哪个私服都找不到这个jar包,恭喜你,请继续往下看:
打眼一看,这个jar包是SNAPSHOT的,所以一般仓库也没有,需要从源码编译了
/opt/maven/apache-maven-3.5.4/bin/mvn deploy:deploy-file -DgroupId=org.apache.bahir -DartifactId=flink-connector-redis_2.11 -Dversion=1.1-SNAPSHOT -Dpackaging=jar -Dfile=/opt/software/bahir-flink-master/flink-connector-redis/target/flink-connector-redis_2.11-1.1-SNAPSHOT.jar -Durl=http://hadoop03:8081/nexus/content/repositories/snapshots/ -DrepositoryId=snapshots
/opt/maven/apache-maven-3.5.4/bin/mvn deploy:deploy-file -DgroupId=org.apache.bahir -DartifactId=flink-connector-redis_2.11 -Dversion=1.1-SNAPSHOT -Dclassifier=sources -Dpackaging=jar -Dfile=/opt/software/bahir-flink-master/flink-connector-redis/target/flink-connector-redis_2.11-1.1-SNAPSHOT-sources.jar -Durl=http://hadoop03:8081/nexus/content/repositories/snapshots/ -DrepositoryId=snapshots
/opt/maven/apache-maven-3.5.4/bin/mvn deploy:deploy-file -DgroupId=org.apache.bahir -DartifactId=flink-connector-redis_2.11 -Dversion=1.1-SNAPSHOT -Dclassifier=test-sources -Dpackaging=jar -Dfile=/opt/software/bahir-flink-master/flink-connector-redis/target/flink-connector-redis_2.11-1.1-SNAPSHOT-test-sources.jar -Durl=http://hadoop03:8081/nexus/content/repositories/snapshots/ -DrepositoryId=snapshots
将命令中的路径修改为你自己环境的路径,并依次执行完成上传动作;
如果出现下面的错误:
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-deploy-plugin:2.7:deploy-file (default-cli) on project standalone-pom: Failed to deploy artifacts: Could not transfer artifact org.apache.bahir:flink-connector-redis_2.11:jar:1.1-20200216.012547-1 from/to snapshots (http://hadoop03:8081/nexus/content/repositories/snapshots/): Failed to transfer file: http://hadoop03:8081/nexus/content/repositories/snapshots/org/apache/bahir/flink-connector-redis_2.11/1.1-SNAPSHOT/flink-connector-redis_2.11-1.1-20200216.012547-1.jar. Return code is: 401, ReasonPhrase: Unauthorized. -> [Help 1]
[ERROR]
解决方案
在maven的setting文件中添加下面的代码即可,注意将密码修改为私服的密码,再次执行上面的命令即可
<server>
<id>snapshots</id>
<username>admin</username>
<password>123</password>
</server>
compile 'org.apache.bahir:flink-connector-redis_2.11:1.1-SNAPSHOT'
compile 'redis.clients:jedis:3.1.0'
/** * @ClassName Redis * @Description * @Author HuZhongJin * @Date 2020/2/15 14:09 * @Version 1.0 */ class RedisHSetSink extends RedisMapper[(String, String, Long, Long, String)] { override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.HSET, "enbrands_") } override def getKeyFromData(data: (String, String, Long, Long, String)): String = { data._2 } override def getValueFromData(data: (String, String, Long, Long, String)): String = { val json = new JSONObject() json.put("pv", String.valueOf(data._3)) json.put("uv", String.valueOf(data._4)) json.put("date", data._5) json.toJSONString } override def getAdditionalKey(data: (String, String, Long, Long, String)): util.Optional[String] = { util.Optional.of("enbrands_" + data._1) } }
注意:
其实这个看源码比较方便,我大概列一下差别吧,主要说org.apache.bahir有的,而org.apache.flink没有的
/** * Extracts the additional key from data as an {@link Optional<String>}. * The default implementation returns an empty Optional. * * @param data * @return Optional */ default Optional<String> getAdditionalKey(T data) { return Optional.empty(); } /** * Extracts the additional time to live (TTL) for data as an {@link Optional<Integer>}. * The default implementation returns an empty Optional. * * @param data * @return Optional */ default Optional<Integer> getAdditionalTTL(T data) { return Optional.empty(); }
其实就多了上面的两个方法,一个是为了提取redis key,一个是为了指定ttl,具体使用的时候大家再仔细领会
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。