赞
踩
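本文演示如何在 Flink 中连接 MySQL:通过自定义 Source 从 MySQL 读取数据,再通过自定义 Sink 将数据写回 MySQL。废话不多说,直接上代码。本文使用的数据库表名为 token,只有 code 和 state 两个字段;代码中的 TokenVO 是与该表对应的实体类(包含 code、state 两个字符串属性),原文未给出其定义,需要自行创建。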
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the flink-mysql module: a Flink streaming job that reads
  from and writes to MySQL over JDBC.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>flink-demo1</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink-mysql</artifactId>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <!-- Single source of truth so every Flink artifact stays on the same release. -->
        <flink.version>1.12.2</flink.version>
    </properties>

    <dependencies>
        <!-- MySQL JDBC driver: https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.17</version>
        </dependency>

        <!--
          Flink JDBC connector. The artifact was published as "flink-jdbc" up to
          Flink 1.10 and renamed to "flink-connector-jdbc" from 1.11 on; the old
          1.10.0 artifact did not match the 1.12.2 core artifacts below.
          https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- DataStream API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>

        <!-- Needed to execute the job from the IDE / a local environment -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; public class MysqlSource extends RichSourceFunction<TokenVO> { private static final Logger logger = LoggerFactory.getLogger(MysqlSource.class); private Connection connection = null; private PreparedStatement ps = null; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //加载数据库驱动 Class.forName("com.mysql.cj.jdbc.Driver"); //获取连接 connection = DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "123456"); //构建读取SQL ps = connection.prepareStatement("select * from token"); } @Override public void run(SourceContext<TokenVO> sourceContext) throws Exception { try { //执行读取操作 ResultSet resultSet = ps.executeQuery(); while (resultSet.next()) { TokenVO vo = new TokenVO(); vo.setCode(resultSet.getString("code")); vo.setState(resultSet.getString("state")); sourceContext.collect(vo); } } catch (Exception e) { logger.error("runException:{}", e); } } @Override public void cancel() { try { super.close(); if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } catch (Exception e) { logger.error("runException:{}", e); } } }
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class MysqlSink extends RichSinkFunction<TokenVO> { private Connection connection; private PreparedStatement preparedStatement; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //加载数据库驱动 Class.forName("com.mysql.cj.jdbc.Driver"); //获取连接 connection = DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "123"); //构建执行SQL preparedStatement = connection.prepareStatement("UPDATE token SET code = ?,state = ?"); } @Override public void close() throws Exception { super.close(); if (preparedStatement != null) { preparedStatement.close(); } if (connection != null) { connection.close(); } super.close(); } @Override public void invoke(TokenVO value, Context context) throws Exception { try { //添加新数据,执行SQL preparedStatement.setString(1, "new code 1"); preparedStatement.setString(2, "new state 1"); preparedStatement.executeUpdate(); } catch (Exception e) { e.printStackTrace(); } } }
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Entry point of the demo job: wires a {@code MysqlSource} to a
 * {@code MysqlSink} and runs the resulting pipeline.
 */
public class Test {

    /**
     * Builds and executes the streaming job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read from the source and hand every record straight to the sink.
        env.addSource(new MysqlSource())
                .addSink(new MysqlSink());

        // Nothing runs until execute() is called.
        env.execute();
    }
}
source
数据源(Source)是程序读取数据的来源,用户可以通过 env.addSource(SourceFunction) 将 SourceFunction 添加到程序中。Flink 内置了许多已实现的 SourceFunction,用户也可以自定义:实现 SourceFunction 接口(非并行)或 ParallelSourceFunction 接口(并行);如果还需要 open/close 生命周期方法和运行时上下文(例如用于状态管理),可以继承 RichSourceFunction(非并行,本文示例即采用这种方式)或 RichParallelSourceFunction(并行)。
sink
Data Sink 消费 DataStream 中的数据,并将其转发到文件、Socket、外部系统,或者直接打印出来。Flink 内置了多种输出格式,这些格式被封装在 DataStream 的相应操作之后;用户也可以像本文一样,通过 addSink(SinkFunction) 添加自定义的 Sink。
文末小彩蛋,自建摸鱼网站,各大网站热搜一览,上班和摸鱼很配哦!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。