当前位置:   article > 正文

FlinkCDC基础篇章2-数据源 SqlServerCDC写入到ES中_flink cdc sqlserver op字段

flink cdc sqlserver op字段

接着 上期FlinkCDC基础篇章1-安装使用  

  1. 下载 Flink 1.17.0 并将其解压至目录 flink-1.17.0

  2. 下载下面列出的依赖包,并将它们放到目录 flink-1.17.0/lib/ 下:

    下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译

  1. 首先,开启 checkpoint,每隔3秒做一次 checkpoint
  2. -- Flink SQL
  3. Flink SQL> SET execution.checkpointing.interval = 3s
  1. -- 创建源表t_source_sqlserver,使用SQL Server Change Data Capture (CDC)连接器从SQL Server数据库读取数据
  2. CREATE TABLE t_source_sqlserver (
  3. id INT,
  4. order_date DATE,
  5. purchaser INT,
  6. quantity INT,
  7. product_id INT,
  8. PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
  9. ) WITH (
  10. 'connector' = 'sqlserver-cdc', -- 使用SQL Server CDC连接器
  11. 'hostname' = '10.194.183.120', -- SQL Server主机名
  12. 'port' = '30027', -- SQL Server端口
  13. 'username' = 'sa', -- SQL Server用户名
  14. 'password' = 'abc@123456', -- SQL Server密码
  15. 'database-name' = 'cdc_test', -- 数据库名称
  16. 'schema-name' = 'dbo', -- 模式名称
  17. 'table-name' = 'orders' -- 要捕获更改的表名
  18. );
  19. -- 创建目标表table_sink_mysql,使用JDBC连接器将数据写入MySQL数据库
  20. CREATE TABLE table_sink_mysql (
  21. id INT,
  22. order_date DATE,
  23. purchaser INT,
  24. quantity INT,
  25. product_id INT,
  26. PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
  27. )
  28. WITH (
  29. 'connector' = 'jdbc', -- 使用JDBC连接器
  30. 'url' = 'jdbc:mysql://10.194.183.120:30025/test', -- MySQL的JDBC URL
  31. 'username' = 'root', -- MySQL用户名
  32. 'password' = 'root', -- MySQL密码
  33. 'table-name' = 'orders' -- 要写入的MySQL表名
  34. );
  35. -- 从t_source_sqlserver表中选择数据,并将其插入到table_sink_mysql表中
  36. INSERT INTO table_sink_mysql SELECT * FROM t_source_sqlserver;
  1. CREATE TABLE income_distribution (
  2. serviceCode STRING,
  3. accountPeriod STRING,
  4. subjectCode STRING,
  5. subjectName STRING,
  6. amt DECIMAL(13,2),
  7. PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED
  8. ) WITH (
  9. 'connector' = 'elasticsearch-7',
  10. 'hosts' = 'http://xxxx:9200',
  11. 'index' = 'income_distribution',
  12. 'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL'
  13. );

可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:

参考文献:

使用 Flink CDC 构建 Streaming ETL | Apache Flink CDC

flink sqlserver cdc实时同步(含sqlserver安装配置等)_flink cdc sqlserver-CSDN博客

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/937861
推荐阅读
相关标签
  

闽ICP备14008679号