赞
踩
服务器环境:centos7
vim /etc/hosts
x.x.x.x jobmanager
x.x.x.x taskmanager
x.x.x.x dinky-mysql
. ├── conf │ ├── JobManager │ │ ├── flink-conf.yaml │ │ ├── log4j-cli.properties │ │ ├── log4j-console.properties │ │ ├── log4j.properties │ │ ├── log4j-session.properties │ │ ├── logback-console.xml │ │ ├── logback-session.xml │ │ ├── logback.xml │ │ ├── masters │ │ ├── workers │ │ └── zoo.cfg │ └── TaskManager │ ├── flink-conf.yaml │ ├── log4j-cli.properties │ ├── log4j-console.properties │ ├── log4j.properties │ ├── log4j-session.properties │ ├── logback-console.xml │ ├── logback-session.xml │ ├── logback.xml │ ├── masters │ ├── workers │ └── zoo.cfg ├── dinky │ ├── config │ │ └── application.yml │ └── lib ├── dinky-mysql │ ├── conf │ │ └── my.cnf │ └── data ├── docker-compose-flink-dinky.yml ├── jar ├── plugins └── sql
说明:
(建议第一次启动容器时不映射配置文件及lib包路径,将默认的配置文件和lib包复制到宿主机后,第二次启动时再映射配置文件及lib包路径,方便后期功能扩展)
docker-compose-flink-dinky.yml
version: '3' services: jobmanager: image: flink:1.14.0-scala_2.12-java8 container_name: jobmanager hostname: jobmanager expose: - "6123" ports: - "48081:8081" # Flink Web UI - "6123:6123" # JobManager RPC - "6124:6124" # JobManager REST - "8082:8082" # Metrics - "6125:6125" # Blob Server command: jobmanager networks: - flink-net restart: always volumes: - /opt/flink/conf/JobManager/:/opt/flink/conf/ #第一次启动不映射,将conf复制到宿主机后,第二次启动再映射 - /opt/flink/jar/:/opt/flink/lib/ #第一次启动不映射,将lib复制到宿主机后,第二次启动再映射 - /opt/flink/plugins/:/opt/flink/plugins/ - /opt/flink/sql/:/opt/flink/sql/ taskmanager: image: flink:1.14.0-scala_2.12-java8 container_name: taskmanager hostname: taskmanager depends_on: - jobmanager scale: 1 # 可以根据需要增加 TaskManagers 的数量 command: taskmanager networks: - flink-net restart: always environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager volumes: - /opt/flink/conf/TaskManager/:/opt/flink/conf/ #第一次启动不映射,将conf复制到宿主机后,第二次启动再映射 - /opt/flink/jar/:/opt/flink/lib/ #第一次启动不映射,将lib复制到宿主机后,第二次启动再映射 - /opt/flink/plugins/:/opt/flink/plugins/ - /opt/flink/sql/:/opt/flink/sql/ dinky: image: dinkydocker/dinky-standalone-server:0.7.0-flink14 container_name: dinky hostname: dinky ports: - "38081:8081" - "38888:8888" networks: - flink-net restart: always environment: - SPRING_PROFILES_ACTIVE=prod - MYSQL_HOST=dinky-mysql - MYSQL_PORT=33306 - MYSQL_DATABASE=dlink - MYSQL_USERNAME=dlink - MYSQL_PASSWORD=dlink depends_on: - dinky-mysql volumes: - /opt/flink/dinky/config/application.yml:/opt/dinky/config/application.yml #第一次启动不映射,将application.yml复制到宿主机后,第二次启动再映射 - /opt/flink/dinky/lib/:/opt/dinky/lib/ #第一次启动不映射,将lib复制到宿主机后,第二次启动再映射 dinky-mysql: image: dinkydocker/dinky-mysql-server:0.7.0 container_name: dinky-mysql hostname: dinky-mysql ports: - "33306:3306" networks: - flink-net restart: always environment: - MYSQL_ROOT_PASSWORD=dlink - MYSQL_DATABASE=dlink volumes: - /opt/flink/dinky-mysql/data/:/var/lib/mysql/ - /opt/flink/dinky-mysql/conf/:/etc/mysql/conf.d/ networks: flink-net: driver: bridge
创建容器
docker-compose -f docker-compose-flink-dinky.yml up -d
如果创建容器时,报错 iptables: No chain/target/match by that name. 需开启防火墙(systemctl start firewalld)
查看容器状态
如果容器都是up状态,但是web页面访问不到,注意一下两点:
flink:http://ip:48081/
dinky:http://ip:38888/
用户名/密码:admin/admin
在 dinky --> 注册中心 --> 集群管理 --> Flink实例管理 中配置 flink的JobManager地址
在测试数据库创建数据表user_source 和 target_source
在dinky创建sql并执行
-- 创建源表映射语句 DROP TABLE if exists user_source_1; CREATE TABLE IF NOT EXISTS user_source_1 ( `id` STRING, `name` STRING, `age` INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://ip:3306/test_database?characterEncoding=UTF-8&allowMultiQueries=true&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai&autoReconnect=true', 'table-name' = 'user_source', 'username' = 'root', 'password' = '123456' ); -- 创建目标表映射语句 DROP TABLE if exists user_target_1; CREATE TABLE IF NOT EXISTS user_target_1 ( `id` STRING, `name` STRING, `age` INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://ip:3306/test_database?characterEncoding=UTF-8&allowMultiQueries=true&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai&autoReconnect=true', 'table-name' = 'user_target', 'username' = 'root', 'password' = '123456' ); -- 同步数据语句 insert into user_target_1 select * from user_source_1;
配置执行模式,并执行sql。执行完成后在flink管理页面也可以看到执行的sql任务
如果在执行sql过程中有下面报错,是因为缺少对应的jar包,需要将包传入dinky的lib目录
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:587)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:561)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:146)
... 112 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:399)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:583)
... 114 more
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。