
Flink in Practice: CDC (Part 1), an example Flink job in IDEA that consumes MySQL CDC


References

As always, thanks to everyone who shares their work online!

CDC

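Detailed Steps

Enable the MySQL Binlog

First, check whether you need to enable the binlog yourself at all. In production you usually don't: the operations team will typically have enabled it already for data recovery, and MySQL 8.0 enables it by default. The steps below are intended for personal testing only.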

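  • Linux: edit my.cnf

  • Windows: edit my.ini (to find it, open the MySQL service in the Services console; its "Path to executable" shows the --defaults-file argument pointing at my.ini)

  • Under the [mysqld] section, add the following lines (the same applies to my.cnf on Linux):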

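#log-bin: directory and base name of the binlog files
#binlog_do_db: database to log; omit it to log all databases
log-bin="D:/mysql/data/log"
#binlog_do_db=test
#binlog_format: Flink CDC (Debezium) needs row-based events
binlog_format=ROW
#server-id: MySQL 5.7 refuses to start with log-bin enabled unless this is set
server-id=1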
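Before starting the Flink job, it can help to confirm that the server actually picked up these settings. The sketch below runs SHOW VARIABLES over JDBC (using the mysql-connector-java dependency from the POM in this post) and checks the values that Debezium-based CDC expects: log_bin=ON, binlog_format=ROW and binlog_row_image=FULL. The class name BinlogPrecheck is hypothetical, and the URL, user and password are assumptions copied from the example job; adjust them to your server.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: checks binlog-related server variables before running a CDC job.
public class BinlogPrecheck {

    /** Returns the problems found in the given server variables; an empty list means they look usable. */
    public static List<String> findProblems(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) {
            problems.add("log_bin is " + vars.get("log_bin") + ", expected ON");
        }
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) {
            problems.add("binlog_format is " + vars.get("binlog_format") + ", expected ROW");
        }
        // binlog_row_image does not exist on very old servers, so only check it when present
        String rowImage = vars.get("binlog_row_image");
        if (rowImage != null && !"FULL".equalsIgnoreCase(rowImage)) {
            problems.add("binlog_row_image is " + rowImage + ", expected FULL");
        }
        return problems;
    }

    public static void main(String[] args) throws SQLException {
        // Assumed connection settings, copied from the Flink example in this post
        String url = "jdbc:mysql://localhost:3306/?useSSL=false";
        Map<String, String> vars = new HashMap<>();
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW VARIABLES WHERE Variable_name IN "
                     + "('log_bin', 'binlog_format', 'binlog_row_image')")) {
            while (rs.next()) {
                // Column 1 is Variable_name, column 2 is Value
                vars.put(rs.getString(1), rs.getString(2));
            }
        }
        List<String> problems = findProblems(vars);
        System.out.println(problems.isEmpty()
                ? "Binlog settings look fine for CDC: " + vars
                : "Binlog problems: " + problems);
    }
}
```

Keeping the check in a pure findProblems method means the rules can be tested without a running MySQL server; only main needs a live connection.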

IDEA Flink Project

POM

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.13.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.3</version>
            <!-- provided: expected to come from the Flink cluster at runtime; see the IDEA note below for local runs -->
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc -->
        <!-- Note: the 1.1.x line was built against Flink 1.11. For Flink 1.13 the project's compatibility
             table lists 1.4.0 or 2.0.x (2.x moves to groupId com.ververica and package com.ververica.cdc);
             check it before upgrading. -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>
        <!-- keep the same version as the other Flink artifacts to avoid classpath conflicts -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.3</version>
        </dependency>
    </dependencies>

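Troubleshooting in IDEA

  • Running the job in IDEA fails with java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/source/SourceFunction
  • Solution: add Flink's dist jar to the project as a library, step by step (File > Project Structure > Modules > Dependencies > + > JARs or directories). In a Flink 1.13.3 binary distribution for Scala 2.12 this is lib/flink-dist_2.12-1.13.3.jar.
  • Likely cause: the POM declares flink-streaming-java with <scope>provided</scope>, and IDEA leaves provided dependencies off the classpath when you run main() directly. Enabling "Include dependencies with 'Provided' scope" in the run configuration (called "Add dependencies with 'provided' scope to classpath" in newer IDEA versions), or removing the provided scope for local runs, should also make the error go away.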
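To confirm which classes are actually visible at runtime, a tiny main run from the same IDEA run configuration can help. A minimal sketch, assuming nothing beyond the JDK; the class name ClasspathCheck is hypothetical, and the class names it probes are the ones used in this post.

```java
// Hypothetical diagnostic: reports whether the Flink and CDC classes resolve on the runtime classpath.
public class ClasspathCheck {

    /** Returns true if the class can be loaded (without initializing it) by this class loader. */
    public static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] required = {
            "org.apache.flink.streaming.api.functions.source.SourceFunction",
            "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment",
            "com.alibaba.ververica.cdc.connectors.mysql.MySQLSource"
        };
        for (String name : required) {
            System.out.println((isOnClasspath(name) ? "found   " : "MISSING ") + name);
        }
    }
}
```

If SourceFunction shows up as MISSING here but is present after switching on the provided-scope option, the scope explanation above is the culprit.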

Basic Example

The code below is similar to the example in the referenced article, so that one is worth consulting as well.

package com.example.flinkcdc;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySqlBinlogSource {
    public static void main(String[] args) throws Exception {

        // 1. Local environment with the Flink web UI (the UI itself needs flink-runtime-web on the classpath)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // 2. CDC source: reads a snapshot of 20210906_test.city first, then follows the binlog
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("20210906_test")
                .tableList("20210906_test.city")   // entries are "db.table"
                .username("root")
                .password("123456")
                .serverTimeZone("Asia/Shanghai")
                // A custom deserializer can be plugged in here to shape each change record as needed;
                // StringDebeziumDeserializationSchema simply emits SourceRecord.toString()
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();
//        // 3. Checkpoint configuration (optional; uncomment for fault tolerance)
//        // 3.1 Checkpoint every 5 s with exactly-once semantics
//        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
//        // 3.2 Checkpoint timeout: 60 s
//        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);
//        // 3.3 Restart strategy: at most 2 restarts, 2 s apart
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2 * 1000));
//        // 3.4 Keep externalized checkpoints after the job is cancelled
//        env.getCheckpointConfig().enableExternalizedCheckpoints(
//                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        // 3.5 State backend: store checkpoints on HDFS
//        env.setStateBackend(new FsStateBackend("hdfs://192.168.231.121:8020/ck"));
//        // 3.6 User name for HDFS access
//        System.setProperty("HADOOP_USER_NAME", "gaogc");

        // 4. Read change records from the source
        DataStreamSource<String> sourceDS = env.addSource(sourceFunction);
        // 5. Print them for testing
        sourceDS.print();
        // 6. Run the job
        env.execute();

    }
}

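About the .tableList(...) call: the flink-connector-mysql-cdc 1.x builder documents its entries as regular expressions matched against fully-qualified "databaseName.tableName" identifiers (they are handed to Debezium's table whitelist), and Debezium matches the whole name rather than a substring. The sketch below only mimics that matching with java.util.regex to show its effect; it is not the connector's code, the class name TableFilterDemo is hypothetical, and city_01 stands for a hypothetical sharded table.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration of regex-based "db.table" filtering (not the connector's implementation).
public class TableFilterDemo {

    /** True if "db.table" fully matches at least one regex (whole-string match, not substring). */
    public static boolean isCaptured(String qualifiedTable, List<String> tableRegexes) {
        for (String regex : tableRegexes) {
            if (Pattern.compile(regex).matcher(qualifiedTable).matches()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> exact = List.of("20210906_test\\.city");        // only the city table
        List<String> prefixed = List.of("20210906_test\\.city.*");   // city plus e.g. city_01, city_02
        for (String table : List.of("20210906_test.city", "20210906_test.city_01", "20210906_test.country")) {
            System.out.printf("%-24s exact=%b prefixed=%b%n",
                    table, isCaptured(table, exact), isCaptured(table, prefixed));
        }
    }
}
```

Escaping the dot (\\. in a Java string) keeps it literal; a trailing .* is what lets one entry pick up several sharded tables.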
  • Sample output: query (initial snapshot read of an existing row)
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=binlog.000001, pos=1572}} ConnectRecord{topic='mysql_binlog_source.20210906_test.city', kafkaPartition=null, key=Struct{ID=110011}, keySchema=Schema{mysql_binlog_source.20210906_test.city.Key:STRUCT}, value=Struct{after=Struct{ID=110011,Name=TIANJIN,CountryCode=IDN,District=WEST JAVA,Population=123123123},source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=20210906_test,table=city,server_id=0,file=binlog.000001,pos=1572,row=0},op=c,ts_ms=1639388695105}, valueSchema=Schema{mysql_binlog_source.20210906_test.city.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  • Sample output: update (op=u, with before and after images)
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1639388718, file=binlog.000001, pos=1637, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.20210906_test.city', kafkaPartition=null, key=Struct{ID=110011}, keySchema=Schema{mysql_binlog_source.20210906_test.city.Key:STRUCT}, value=Struct{before=Struct{ID=110011,Name=TIANJIN,CountryCode=IDN,District=WEST JAVA,Population=123123123},after=Struct{ID=110011,Name=beijing,CountryCode=IDN,District=WEST JAVA,Population=123123123},source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1639388718000,db=20210906_test,table=city,server_id=1,file=binlog.000001,pos=1784,row=0,thread=5},op=u,ts_ms=1639388718331}, valueSchema=Schema{mysql_binlog_source.20210906_test.city.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  • Sample output: insert (op=c)
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1639388757, file=binlog.000001, pos=1978, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.20210906_test.city', kafkaPartition=null, key=Struct{ID=10012}, keySchema=Schema{mysql_binlog_source.20210906_test.city.Key:STRUCT}, value=Struct{after=Struct{ID=10012,Name=dongbei,CountryCode=IDN,District=WEST JAVA,Population=1231231231},source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1639388757000,db=20210906_test,table=city,server_id=1,file=binlog.000001,pos=2125,row=0,thread=5},op=c,ts_ms=1639388757341}, valueSchema=Schema{mysql_binlog_source.20210906_test.city.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
  • Sample output: delete (op=d, before image only)
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1639388768, file=binlog.000001, pos=2287, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.20210906_test.city', kafkaPartition=null, key=Struct{ID=10012}, keySchema=Schema{mysql_binlog_source.20210906_test.city.Key:STRUCT}, value=Struct{before=Struct{ID=10012,Name=dongbei,CountryCode=IDN,District=WEST JAVA,Population=1231231231},source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1639388768000,db=20210906_test,table=city,server_id=1,file=binlog.000001,pos=2434,row=0,thread=5},op=d,ts_ms=1639388768361}, valueSchema=Schema{mysql_binlog_source.20210906_test.city.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
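In these records the op field tells inserts (c), updates (u) and deletes (d) apart; Debezium also defines r for snapshot reads. Note that in the first sample the snapshot row is reported with op=c, so only source.snapshot=last distinguishes it from a real insert. The sketch below pulls both fields out of the toString() lines printed by StringDebeziumDeserializationSchema with regular expressions. It assumes only the JDK, the class name ChangeOpParser is hypothetical, and parsing toString() output is fragile (a column literally named op could confuse it), so for real jobs the custom deserializer mentioned in the code comment is the better route.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical quick-inspection helper for StringDebeziumDeserializationSchema output.
public class ChangeOpParser {

    /** Debezium operation codes as they appear in the "op" field of a change event. */
    public enum Op {
        CREATE('c'), UPDATE('u'), DELETE('d'), READ('r');

        private final char code;

        Op(char code) {
            this.code = code;
        }

        public static Op fromCode(char c) {
            for (Op op : values()) {
                if (op.code == c) {
                    return op;
                }
            }
            throw new IllegalArgumentException("Unknown op code: " + c);
        }
    }

    // "op=<letter>" as a standalone field; \b keeps it from matching inside names such as "stop="
    private static final Pattern OP = Pattern.compile("\\bop=([a-z])\\b");
    // snapshot=true or snapshot=last in the source struct marks rows read by the initial snapshot
    private static final Pattern SNAPSHOT = Pattern.compile("\\bsnapshot=(true|last)\\b");

    /** Extracts the operation from one printed change record. */
    public static Op parseOp(String record) {
        Matcher m = OP.matcher(record);
        if (!m.find()) {
            throw new IllegalArgumentException("No op field in: " + record);
        }
        return Op.fromCode(m.group(1).charAt(0));
    }

    /** True if the record came from the initial snapshot rather than from a binlog event. */
    public static boolean isSnapshot(String record) {
        return SNAPSHOT.matcher(record).find();
    }

    public static void main(String[] args) {
        // Shortened fragments of the sample records above
        String[] samples = {
            "source=Struct{snapshot=last,db=20210906_test,table=city,row=0},op=c,ts_ms=1639388695105}",
            "source=Struct{db=20210906_test,table=city,row=0,thread=5},op=u,ts_ms=1639388718331}",
            "source=Struct{db=20210906_test,table=city,row=0,thread=5},op=d,ts_ms=1639388768361}"
        };
        for (String s : samples) {
            System.out.println(parseOp(s) + (isSnapshot(s) ? " (snapshot)" : " (binlog)"));
        }
    }
}
```

For the three fragments, main prints CREATE (snapshot), UPDATE (binlog) and DELETE (binlog).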

Source: https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/581616