赞
踩
- package com.example.flink;
-
- import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.api.java.ExecutionEnvironment;
- import org.apache.flink.api.java.operators.DataSource;
- import org.apache.flink.api.java.typeutils.RowTypeInfo;
- import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-
- /**
- * @Author ex-liujiwei
- * @Date 2022/3/11 23:20
- */
- public class Demo01 {
- public static void main(String[] args) throws Exception {
- //sql查询结果列类型
- TypeInformation[] fieldTypes = new TypeInformation[] {
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO
- };
-
- RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
- JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- //数据库连接信息
- .setDrivername("com.mysql.jdbc.Driver")
- .setDBUrl("jdbc:mysql://10.20.16.15:5102/bsmp?useSSL=false&useUnicode=true&characterEncoding=UTF-8")
- .setUsername("bsmpopr")
- .setPassword("Jcfgdasdf4#")
- .setQuery("SELECT user_id,name,phone_num,email FROM sys_user_info")
- .setRowTypeInfo(rowTypeInfo)
- .finish();
-
- //搭建flink
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- //获取数据源
- //datasource
- DataSource s = env.createInput(jdbcInputFormat);
- s.print();
- env.execute();
- }
- }
pom.xml dependencies
- <!--
-   All Flink artifacts below use the same Flink version (1.10.0) and the same
-   Scala binary suffix (_2.11). Mixing Flink versions or _2.11/_2.12 artifacts
-   on one classpath can lead to binary incompatibilities at runtime.
- -->
- <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
- <dependency>
-     <groupId>org.apache.flink</groupId>
-     <artifactId>flink-java</artifactId>
-     <version>1.10.0</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
- <dependency>
-     <groupId>org.apache.flink</groupId>
-     <artifactId>flink-streaming-java_2.11</artifactId>
-     <version>1.10.0</version>
-     <!-- <scope>provided</scope> -->
- </dependency>
- <!-- Provides org.apache.flink.api.java.io.jdbc.JDBCInputFormat -->
- <dependency>
-     <groupId>org.apache.flink</groupId>
-     <artifactId>flink-jdbc_2.11</artifactId>
-     <version>1.10.0</version>
- </dependency>
- <dependency>
-     <groupId>org.apache.flink</groupId>
-     <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
-     <version>1.10.0</version>
- </dependency>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。