
Syncing Data from SQL Server to StarRocks with Flink CDC (hundred-million-row tables)

Preface: Flink CDC keeps getting more capable and supports an ever-growing list of sources. This post walks through the full pipeline:

SQL Server -> (Flink CDC) -> Flink -> (Flink StarRocks connector) -> StarRocks

1. Prepare the SQL Server environment (you must use a SQL Server version below 16, i.e. SQL Server 2019 or earlier; Flink CDC currently only supports versions below 16)

I'm using a Docker environment:

  xiuchenggong@xiuchengdeMacBook-Pro ~ % docker images
  REPOSITORY                                          TAG                            IMAGE ID       CREATED         SIZE
  starrocks.docker.scarf.sh/starrocks/allin1-ubuntu   latest                         4d3c0066a012   3 days ago      4.71GB
  mcr.microsoft.com/mssql/server                      2019-latest                    e7fc0b49be3c   4 weeks ago     1.47GB
  mcr.microsoft.com/mssql/server                      2022-latest                    683d523cd395   5 weeks ago     2.9GB
  federatedai/standalone_fate                         latest                         6019ec787699   9 months ago    5.29GB
  milvusdb/milvus                                     v2.1.4                         d9a5c977c414   11 months ago   711MB
  starrocks/dev-env                                   main                           8f4edba3b115   16 months ago   7.65GB
  minio/minio                                         RELEASE.2022-03-17T06-34-49Z   239acc52a73a   17 months ago   228MB
  kicbase/stable                                      v0.0.29                        64d09634c60d   20 months ago   1.14GB
  quay.io/coreos/etcd                                 v3.5.0                         a7908fd5fb88   2 years ago     110MB

  docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=abc@123456' -p 30027:1433 --name sql_server_2019 -d mcr.microsoft.com/mssql/server:2019-latest
  docker exec -it --user root sql_server_2019 bash

Enable the SQL Server Agent, restart the SQL Server container, and connect:

  xiuchenggong@xiuchengdeMacBook-Pro ~ % docker exec -it --user root sql_server_2019 bash
  root@99e196828047:/# /opt/mssql/bin/mssql-conf set sqlagent.enabled true
  SQL Server needs to be restarted in order to apply this setting. Please run
  'systemctl restart mssql-server.service'.
  root@99e196828047:/# exit
  exit
  xiuchenggong@xiuchengdeMacBook-Pro ~ % docker restart sql_server_2019
  sql_server_2019
  xiuchenggong@xiuchengdeMacBook-Pro ~ % docker exec -it --user root sql_server_2019 bash
  root@99e196828047:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "abc@123456"

Enable CDC on the database and table:

  root@99e196828047:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "abc@123456"
  1> use cdc_test;
  2> go
  Changed database context to 'cdc_test'.
  1> EXEC sys.sp_cdc_enable_db;
  2> go
  1> SELECT is_cdc_enabled FROM sys.databases WHERE name = 'cdc_test';
  2> go
  is_cdc_enabled
  1> CREATE TABLE orders (id int,order_date date,purchaser int,quantity int,product_id int,PRIMARY KEY ([id]))
  2> go
  1>
  2>
  3> EXEC sys.sp_cdc_enable_table
  4> @source_schema = 'dbo',
  5> @source_name = 'orders',
  6> @role_name = 'cdc_role';
  7> go
  Job 'cdc.cdc_test_capture' started successfully.
  Job 'cdc.cdc_test_cleanup' started successfully.
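As a sanity check (not part of the original transcript), you can ask SQL Server's system catalog whether CDC is now active; a minimal sketch:

```sql
-- should return 1 once sys.sp_cdc_enable_db has run
SELECT is_cdc_enabled FROM sys.databases WHERE name = 'cdc_test';

-- should return 1 for the orders table once sys.sp_cdc_enable_table has run
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'orders';
```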

Insert some rows, then update a couple of them:

  1> select * from orders;
  2> go
  id          order_date       purchaser   quantity    product_id
  ----------- ---------------- ----------- ----------- -----------
  1           2023-07-07       1           1           1
  2           2023-07-07       2           2           2
  3           2023-07-07       3           3           3
  4           2023-07-07       4           4           4
  45          2023-07-07       5           5           5
  (5 rows affected)
  1> update orders set quantity = 100 where id = 1;
  2> go
  (1 rows affected)
  1> select * from orders;
  2> go
  id          order_date       purchaser   quantity    product_id
  ----------- ---------------- ----------- ----------- -----------
  1           2023-07-07       1           100         1
  2           2023-07-07       2           2           2
  3           2023-07-07       3           3           3
  4           2023-07-07       4           4           4
  45          2023-07-07       5           5           5
  (5 rows affected)
  1> update orders set quantity = 200 where id = 2;
  2> go

2. Prepare the Flink environment: in addition to Flink itself (1.16.2 is used below), place the SQL Server CDC connector jar (flink-sql-connector-sqlserver-cdc) and the StarRocks connector jar (flink-connector-starrocks), in versions matching your Flink release, into {FLINK_HOME}/lib.

3. Prepare the StarRocks Docker environment:

See the docs: Deploy StarRocks with Docker @ deploy_with_docker @ StarRocks Docs
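The Flink sink defined later writes into database `test`, table `orders` on StarRocks, and the transcript never shows that table being created, so it has to exist before the job starts. A minimal sketch of a matching StarRocks Primary Key table (schema inferred from the sink definition; adjust distribution/buckets to your data volume):

```sql
CREATE DATABASE IF NOT EXISTS test;

CREATE TABLE IF NOT EXISTS test.orders (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT
) PRIMARY KEY (id)
DISTRIBUTED BY HASH (id);
```

A Primary Key table is what allows the update and delete events coming from CDC to be applied in place rather than appended.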

4. Start the Flink cluster and SQL client (cd {FLINK_HOME}/bin):

  xiuchenggong@xiuchengdeMacBook-Pro bin % ./start-cluster.sh
  Starting cluster.
  Starting standalonesession daemon on host xiuchengdeMacBook-Pro.local.
  Starting taskexecutor daemon on host xiuchengdeMacBook-Pro.local.
  xiuchenggong@xiuchengdeMacBook-Pro bin % ./sql-client.sh embedded
  SLF4J: Class path contains multiple SLF4J bindings.
  SLF4J: Found binding in [jar:file:/Users/xiuchenggong/flink/flink-1.16.2/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: Found binding in [jar:file:/Users/xiuchenggong/flink/hadoop-3.3.1/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
  (Flink SQL client ASCII-art banner omitted)
  Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
  Command history file path: /Users/xiuchenggong/.flink-sql-history
  Flink SQL>

Create the Flink source table mapping the SQL Server table:

  Flink SQL> CREATE TABLE t_source_sqlserver (
  >   id INT,
  >   order_date DATE,
  >   purchaser INT,
  >   quantity INT,
  >   product_id INT,
  >   PRIMARY KEY (id) NOT ENFORCED    -- primary key definition (optional)
  > ) WITH (
  >   'connector' = 'sqlserver-cdc',   -- use the SQL Server CDC connector
  >   'hostname' = 'localhost',        -- SQL Server host
  >   'port' = '30027',                -- SQL Server port (mapped to 1433 in the container)
  >   'username' = 'sa',               -- SQL Server user
  >   'password' = 'abc@123456',       -- SQL Server password
  >   'database-name' = 'cdc_test',    -- database name
  >   'schema-name' = 'dbo',           -- schema name
  >   'table-name' = 'orders'          -- table to capture changes from
  > );
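One detail the transcript does not show: Flink CDC sources only commit their reading progress at checkpoints, so it is generally recommended to enable checkpointing in the SQL client before submitting the job. A sketch (the 10s interval is an arbitrary example value):

```sql
Flink SQL> SET 'execution.checkpointing.interval' = '10s';
```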

Then create the Flink sink table to StarRocks:

  Flink SQL>
  >
  > CREATE TABLE IF NOT EXISTS `orders_sink` (
  >   id int,
  >   order_date date,
  >   purchaser int,
  >   quantity int,
  >   product_id int,
  >   PRIMARY KEY(`id`) NOT ENFORCED
  > ) with (
  >   'load-url' = 'localhost:8030',
  >   'sink.buffer-flush.interval-ms' = '15000',
  >   'sink.properties.row_delimiter' = '\x02',
  >   'sink.properties.column_separator' = '\x01',
  >   'connector' = 'starrocks',
  >   'database-name' = 'test',
  >   'table-name' = 'orders',
  >   'jdbc-url' = 'jdbc:mysql://localhost:9030',
  >   'password' = '',
  >   'username' = 'root'
  > );
  [INFO] Execute statement succeed.

  Flink SQL> show tables;
  +--------------------+
  |         table name |
  +--------------------+
  |        orders_sink |
  | t_source_sqlserver |
  +--------------------+
  2 rows in set

Submit the job:

  Flink SQL> insert into orders_sink select * from t_source_sqlserver;
  [INFO] Submitting SQL update statement to the cluster...
  WARNING: An illegal reflective access operation has occurred
  WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/xiuchenggong/flink/flink-1.16.2/lib/flink-dist-1.16.2.jar) to field java.lang.Class.ANNOTATION
  WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
  WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
  WARNING: All illegal access operations will be denied in a future release
  [INFO] SQL update statement has been successfully submitted to the cluster:
  Job ID: 746cc173cd71133e96d080f25327e9bc

In the Flink Web UI you can now see the long-running streaming job.

5. Verify that the SQL Server data (inserts/updates/deletes) has been synced to StarRocks:

  StarRocks > select * from orders;
  +------+------------+-----------+----------+------------+
  | id   | order_date | purchaser | quantity | product_id |
  +------+------------+-----------+----------+------------+
  |    1 | 2023-07-07 |         1 |      100 |          1 |
  |    3 | 2023-07-07 |         3 |        3 |          3 |
  |    4 | 2023-07-07 |         4 |        4 |          4 |
  |   45 | 2023-07-07 |         5 |        5 |          5 |
  |    2 | 2023-07-07 |         2 |      200 |          2 |
  +------+------------+-----------+----------+------------+
  5 rows in set (0.016 sec)
  StarRocks >

Inserts, updates, and deletes are all synced over.
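The transcript above only exercises updates; to also see an insert and a delete flow through (a sketch, not run above), you could issue the following on the SQL Server side and re-query StarRocks after the flush interval:

```sql
1> INSERT INTO orders VALUES (6, '2023-07-07', 6, 6, 6);
2> go
1> DELETE FROM orders WHERE id = 6;
2> go
```

After the insert, the new row should appear in StarRocks; after the delete, it should disappear again.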
