赞
踩
使用flink消费kafka中事件,计算规则以及维度数据保存在达梦数据库中,将计算结果存放在MemFireDB中。
达梦数据库管理系统是达梦公司推出的具有完全自主知识产权的高性能数据库管理系统,简称DM。
将达梦的jdbc驱动安装到maven local仓库中
start cmd /k "%mvn% install:install-file -Dfile=Dm7JdbcDriver17.jar -DgroupId=com.dm -DartifactId=Dm7JdbcDriver -Dversion=1.7 -Dpackaging=jar"
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.7.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.dm</groupId>
<artifactId>Dm7JdbcDriver</artifactId>
<version>1.7</version>
<scope>compile</scope>
</dependency>
public class DmRichMapFunction<IN, OUT> extends RichFlatMapFunction<IN, OUT> { private static final Logger logger = LoggerFactory.getLogger(DmRichMapFunction.class); private static final String dmjdbcString = "dm.jdbc.driver.DmDriver"; protected Connection connect = null; public void loadJdbcDriver() throws SQLException { try { System.out.println("Loading JDBC Driver..."); // 加载 JDBC 驱动程序 //DriverManager.registerDriver(new dm.jdbc.driver.DmDriver()); Class.forName(dmjdbcString); } catch (ClassNotFoundException e) { throw new SQLException("Load JDBC Driver Error1: " + e.getMessage()); } catch (Exception ex) { throw new SQLException("Load JDBC Driver Error : " + ex.getMessage()); } } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); String user = parameterTool.get("u", ""); String passwd = parameterTool.get("p", ""); String address = parameterTool.get("a", "localhost:3306"); String url = "jdbc:dm://" + address; logInfo("url:" + url + ",user:" + user ); loadJdbcDriver(); connect = DriverManager.getConnection(url, user, passwd); } @Override public void close() throws Exception { if (connect != null) { connect.close(); } super.close(); } @Override public void flatMap(IN value, Collector<OUT> out) throws Exception { } }
environment.getConfig().setGlobalJobParameters(parameterTool);
ParameterTool parameterTool = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
数据分析小白入门指南
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。