赞
踩
在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。
<flink.version>1.14.6</flink.version>
<spark.version>2.4.3</spark.version>
<hadoop.version>2.8.5</hadoop.version>
<hbase.version>1.4.9</hbase.version>
<hive.version>2.3.5</hive.version>
<java.version>1.8</java.version>
<scala.version>2.11.8</scala.version>
<mysql.version>8.0.22</mysql.version>
<scala.binary.version>2.11</scala.binary.version>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!--mysql连接器依赖-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
mysql> CREATE TABLE `ws` (
`id` varchar(100) NOT NULL
,`ts` bigint(20) DEFAULT NULL
,`vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
package com.flink.POJOs; import java.util.Objects; /** * TODO POJO类的特点 * 类是公有(public)的 * 有一个无参的构造方法 * 所有属性都是公有(public)的 * 所有属性的类型都是可以序列化的 */ public class WaterSensor { //类的公共属性 public String id; public Long ts; public Integer vc; //无参构造方法 public WaterSensor() { //System.out.println("调用了无参数的构造方法"); } public WaterSensor(String id, Long ts, Integer vc) { this.id = id; this.ts = ts; this.vc = vc; } //生成get和set方法 public void setId(String id) { this.id = id; } public void setTs(Long ts) { this.ts = ts; } public void setVc(Integer vc) { this.vc = vc; } public String getId() { return id; } public Long getTs() { return ts; } public Integer getVc() { return vc; } //重写toString方法 @Override public String toString() { return "WaterSensor{" + "id='" + id + '\'' + ", ts=" + ts + ", vc=" + vc + '}'; } //重写equals和hasCode方法 @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; WaterSensor that = (WaterSensor) o; return id.equals(that.id) && ts.equals(that.ts) && vc.equals(that.vc); } @Override public int hashCode() { return Objects.hash(id, ts, vc); } } //scala的case类?
package com.flink.POJOs;
import org.apache.flink.api.common.functions.MapFunction;
public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
}
package com.flink.DataStream.Sink; import com.flink.POJOs.WaterSensor; import com.flink.POJOs.WaterSensorMapFunction; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import java.sql.PreparedStatement; import java.sql.SQLException; /** * Flink 输出到 MySQL(JDBC) */ public class flinkSinkJdbc { public static void main(String[] args) throws Exception { //TODO 创建Flink上下文执行环境 StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); streamExecutionEnvironment.setParallelism(1); //TODO Source DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888); //TODO Transfer SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = dataStreamSource.map(new WaterSensorMapFunction()); /**TODO 写入 mysql * 1、只能用老的 sink 写法 * 2、JDBCSink 的 4 个参数: * 第一个参数: 执行的 sql,一般就是 insert into * 第二个参数: 预编译 sql, 对占位符填充值 * 第三个参数: 执行选项 ---->攒批、重试 * 第四个参数: 连接选项---->url、用户名、密码 */ SinkFunction<WaterSensor> sinkFunction = JdbcSink.sink("insert into ws values(?,?,?)", new JdbcStatementBuilder<WaterSensor>() { @Override public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException { preparedStatement.setString(1, waterSensor.getId()); preparedStatement.setLong(2, waterSensor.getTs()); preparedStatement.setInt(3, waterSensor.getVc()); System.out.println("数据写入成功:"+'('+waterSensor.getId()+","+waterSensor.getTs()+","+waterSensor.getVc()+")"); } } , JdbcExecutionOptions .builder() .withMaxRetries(3) // 重试次数 .withBatchSize(100) // 批次的大小:条数 .withBatchIntervalMs(3000) // 批次的时间 .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8") .withUsername("root") .withPassword("********") .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间 .build() ); //TODO 写入到Mysql waterSensorSingleOutputStreamOperator.addSink(sinkFunction); streamExecutionEnvironment.execute(); } }
nc -lk 9999 #启动necat、并监听8888端口,写入数据
启动Flink程序
查看数据库写入是否正常
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。