赞
踩
最近公司要求上线实时数仓,在探索的过程中总结了部分经验,现在发布出来供大家参考
1. 修改my.cnf
log_bin = MYON // 任意名称均可,只要配置了名称即开启binlog
server-id = 123 // 配置与其它server不同的id
binlog-format = row // 设置binlog模式为row
// ------------可选配置项
binlog_ignore_db = *** //指定不记录二进制日志的数据库
binlog_do_db = *** //明确指定要记录日志的数据库
log_bin_index = *** //指定mysql-bin.index文件的路径
2. 重启mysql
3. 进入mysql修改binlog_row_image为full
set global binlog_row_image="full";
4. 查看binlog开启是否生效
show variables like '%log_bin%';(需要设置为ON)
5. 查看binlog模式和binlog_row_image配置
show global variables like '%binlog_format%';(需要设置为ROW模式)
show variables like '%binlog_row_image%';(需要设置为FULL)
6. 创建用户和赋权
> create user 'dp_test'@'%' identified by '12345678';
> grant SELECT on mysql.db to 'dp_test'@'%';
> grant SELECT on mysql.tables_priv to 'dp_test'@'%';
> grant SELECT on mysql.user to 'dp_test'@'%';
> grant SELECT on test.* to 'dp_test'@'%';
> grant LOCK TABLES on test.* to 'dp_test'@'%';
> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dp_test' IDENTIFIED BY '12345678';
> flush privileges;
1. 环境检查
1.1 版本检查
SELECT @@VERSION;
1.2 检查CDC服务开启状态
select is_cdc_enabled from sys.databases where name='dbname';
–0为关闭,1为开启。数据库名为dbname
-- 查看数据库是否启用cdc
SELECT name,is_cdc_enabled FROM sys.databases WHERE is_cdc_enabled = 1;
–0为关闭,1为开启。数据库名为dbname
-- 查看当前数据库表是否启用cdc
SELECT name,is_tracked_by_cdc FROM sys.tables WHERE is_tracked_by_cdc = 1;
2. 数据库创建与cdc功能启用
create database mydb collate Chinese_PRC_CI_AS;
-- 对当前数据库启用 CDC
USE mydb
GO
EXECUTE sys.sp_cdc_enable_db;
GO
-- 对当前数据库禁用 CDC
USE mydb
GO
EXEC sys.sp_cdc_disable_db
GO
-- 启用
USE mydb
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'sqlserver_cdc',
@source_name = N'employee',
@role_name = NULL,
@supports_net_changes = 1
GO
-- 禁用
USE mydb
GO
EXEC sys.sp_cdc_disable_table
@source_schema = N'sqlserver_cdc',
@source_name = N'employee',
@capture_instance = N'sqlserver_cdc_employee'
GO
3. 启动sqlserver代理
方法一:
在cmd命令提示符下,输入下列命令:
net start SQLSERVERAGENT
方法二:
这个也可以在电脑中手动启动,win7下操作,选择我的电脑==>右键==>管理,选中服务==>在右边名称中找到 SQL Server代理(MSSQLSERVER),如果状态栏显示已启动,就代表启动了,如果没有,选中,右键==>启动,SQL Server Agent服务开机启动.
找到 SQL Server代理(MSSQLSERVER)==>选中==>右键==>属性,将“启动类型”设置为 自动,那么在电脑重启或开机时, SQL Server代理(MSSQLSERVER)就会自动启动了。
如果“启动类型”为手动,那么在电脑重启时,这个服务就不会自动启动了,就需要手动启动了
1. 开启日志归档。
1.1 在命令行工具中执行以下命令以sys用户连接到数据库。
在实际使用过程中,可以有多种方式连接数据库,此处以命令行方式为例进行说明。
sqlplus /nolog
CONNECT sys/password@host:port AS SYSDBA;
其中:
password为数据库sys用户的密码,可向数据库管理员获取。
host为数据库实例所在服务器的IP地址,请根据实际情况设置。
port为数据库实例所使用的端口,请根据实际情况设置。
1.2 执行以下命令,检查日志归档是否已开启。
archive log list;
若回显打印“Database log mode: No Archive Mode”,说明日志归档未开启,继续执行下一步。
若回显打印“Database log mode: Archive Mode”,说明日志归档已开启,直接跳到1.6。
1.3 执行以下命令配置归档日志参数。
alter system set db_recovery_file_dest_size = 100G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
其中:
100G为日志文件存储空间的大小,请根据实际情况设置。
/opt/oracle/oradata/recovery_area为日志存储路径,请根据实际规划设置,但须确保路径提前创建。
1.4 执行以下命令开启日志归档。
须知:
开启日志归档功能需重启数据库,重启期间将导致业务中断,请谨慎操作。
归档日志会占用较多的磁盘空间,若磁盘空间满了会影响业务,请定期清理过期归档日志。
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
1.5 执行以下命令,确认日志归档是否已成功开启。
archive log list;
当回显打印“Database log mode: Archive Mode”,说明日志归档已开启。
1.6 执行以下命令退出数据库连接。
exit;
2. 安装LogMiner工具。
2.1 在命令行工具中执行以下命令以sys用户连接到数据库实例。
sqlplus sys/password@host:port/SID as sysdba
其中:
password为数据库sys用户的密码,请向数据库管理员获取。
host为数据库实例所在服务器的IP地址,请根据实际情况设置。
port为数据库实例所使用的端口,请根据实际情况设置。
SID为要同步数据所在实例的实例名,请根据实际情况设置。
2.2 执行以下命令,检查LogMiner工具是否已安装。
desc DBMS_LOGMNR
desc DBMS_LOGMNR_D
若无打印信息返回,说明LogMiner工具未安装,继续执行下一步。
若有打印信息返回,说明LogMiner工具已安装,直接跳到3。
2.3 执行以下命令,安装LogMiner工具。
@$ORACLE_HOME/rdbms/admin/dbmslm.sql
@$ORACLE_HOME/rdbms/admin/dbmslmd.sql
3. 创建LogMiner执行用户并给用户赋予权限。
3.1 执行以下命令创建LogMiner用户角色并配置权限。
create role roma_logminer_privs;
grant create session,
execute_catalog_role,
select any transaction,
flashback any table,
select any table,
lock any table,
select any dictionary to roma_logminer_privs;
grant select on SYSTEM.LOGMNR_COL$ to roma_logminer_privs;
grant select on SYSTEM.LOGMNR_OBJ$ to roma_logminer_privs;
grant select on SYSTEM.LOGMNR_USER$ to roma_logminer_privs;
grant select on SYSTEM.LOGMNR_UID$ to roma_logminer_privs;
grant select on V_$DATABASE to roma_logminer_privs;
grant select_catalog_role to roma_logminer_privs;
grant LOGMINING to roma_logminer_privs;
其中:
roma_logminer_privs为LogMiner用户角色名,请根据实际规划设置。
“grant LOGMINING to roma_logminer_privs;”仅当Oracle为12c版本时,才需要添加,否则删除此行内容。
3.2 执行以下命令创建LogMiner执行用户。
create user roma_logminer identified by password default tablespace users;
grant roma_logminer_privs to roma_logminer;
alter user roma_logminer quota unlimited on users;
其中:
roma_logminer为LogMiner用户名,请根据实际规划设置。
password为LogMiner用户密码,请根据实际规划设置。
roma_logminer_privs为LogMiner用户角色,在3.a中创建。
3.3 执行以下命令修改日志记录参数。
alter database add supplemental log data (all) columns;
3.4 执行以下命令退出数据库连接。
exit;
1. 开启日志归档。
1.1 在命令行工具中执行以下命令以sys用户连接到数据库。
在实际使用过程中,可以有多种方式连接数据库,此处以命令行方式为例进行说明。
sqlplus /nolog
CONNECT sys/password@host:port AS SYSDBA;
其中:
password为数据库sys用户的密码,可向数据库管理员获取。
host为数据库实例所在服务器的IP地址,请根据实际情况设置。
port为数据库实例所使用的端口,请根据实际情况设置。
1.2 执行以下命令,检查日志归档是否已开启。
archive log list;
若回显打印“Database log mode: No Archive Mode”,说明日志归档未开启,继续执行下一步。
若回显打印“Database log mode: Archive Mode”,说明日志归档已开启,直接跳到1.6。
1.3 执行以下命令配置归档日志参数。
alter system set db_recovery_file_dest_size = 100G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
其中:
100G为日志文件存储空间的大小,请根据实际情况设置。
/opt/oracle/oradata/recovery_area为日志存储路径,请根据实际规划设置,但须确保路径提前创建。
1.4 执行以下命令开启日志归档。
须知:
开启日志归档功能需重启数据库,重启期间将导致业务中断,请谨慎操作。
归档日志会占用较多的磁盘空间,若磁盘空间满了会影响业务,请定期清理过期归档日志。
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
1.5 执行以下命令,确认日志归档是否已成功开启。
archive log list;
当回显打印“Database log mode: Archive Mode”,说明日志归档已开启。
1.6 执行以下命令退出数据库连接。
exit;
2. 安装LogMiner工具。
2.1 在命令行工具中执行以下命令以sys用户连接到数据库实例。
sqlplus sys/password@host:port/SID as sysdba
其中:
password为数据库sys用户的密码,请向数据库管理员获取。
host为数据库实例所在服务器的IP地址,请根据实际情况设置。
port为数据库实例所使用的端口,请根据实际情况设置。
SID为要同步数据所在实例的实例名,请根据实际情况设置。
2.2 执行以下命令,检查LogMiner工具是否已安装。
desc DBMS_LOGMNR
desc DBMS_LOGMNR_D
若无打印信息返回,说明LogMiner工具未安装,继续执行下一步。
若有打印信息返回,说明LogMiner工具已安装,直接跳到3。
2.3 执行以下命令,安装LogMiner工具。
@$ORACLE_HOME/rdbms/admin/dbmslm.sql
@$ORACLE_HOME/rdbms/admin/dbmslmd.sql
3. 创建LogMiner执行用户并给用户赋予权限。
3.1 执行以下命令创建LogMiner用户角色并配置权限。
create role c##roma_logminer_privs container=all;
grant create session,
execute_catalog_role,
select any transaction,
flashback any table,
select any table,
lock any table,
logmining,
set container,
select any dictionary to c##roma_logminer_privs container=all;
grant select on SYSTEM.LOGMNR_COL$ to c##roma_logminer_privs container=all;
grant select on SYSTEM.LOGMNR_OBJ$ to c##roma_logminer_privs container=all;
grant select on SYSTEM.LOGMNR_USER$ to c##roma_logminer_privs container=all;
grant select on SYSTEM.LOGMNR_UID$ to c##roma_logminer_privs container=all;
grant select on V_$DATABASE to c##roma_logminer_privs container=all;
grant select_catalog_role to c##roma_logminer_privs container=all;
其中,c##roma_logminer_privs为LogMiner用户角色名,请根据实际规划设置。
3.2 执行以下命令创建LogMiner执行用户。
create user c##roma_logminer identified by password default tablespace users container=all;
grant c##roma_logminer_privs to c##roma_logminer container=all;
alter user c##roma_logminer quota unlimited on users container=all;
其中:
c##roma_logminer为LogMiner用户名,请根据实际规划设置。
password为LogMiner用户密码,请根据实际规划设置。
c##roma_logminer_privs为LogMiner用户角色,在3.a中创建。
3.3 执行以下命令修改日志记录参数。
alter database add supplemental log data (all) columns;
3.4 执行以下命令退出数据库连接。
exit;
---- 数据库创建与cdc功能启用
docker exec -it -u root 48ba43425e6b bash
su - oracle
sqlplus / as sysdba
-- sqlplus sys/system@helowin as sysdba
-- 启用日志归档
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/home/oracle/oracle-data-test' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
-- 检查日志归档是否开启
archive log list;
-- 为捕获的数据库启用补充日志记录,以便数据更改捕获更改的数据库行之前的状态,下面说明了如何在数据库级别进行配置。
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- 创建表空间
CREATE TABLESPACE logminer_tbs_cdc DATAFILE '/home/oracle/oradata/helowin/logminer_tbs_cdc.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS_CDC QUOTA UNLIMITED ON LOGMINER_TBS_CDC;
grant connect,resource,dba to flinkuser;
GRANT CREATE TRIGGER to flinkuser;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
-- 创建 FLINKUSER.EMPLOYEE 表
create table FLINKUSER.EMPLOYEE
(
EMPLOYEE_ID NUMBER not null constraint EMPLOYEE_PK primary key,
NAME VARCHAR2(64),
AGE NUMBER,
WEIGHT NUMBER(10,2),
CREATE_USER VARCHAR2(64),
CREATE_TIME DATE,
UPDATE_USER VARCHAR2(64),
UPDATE_TIME DATE
);
comment on table EMPLOYEE is '员工表';
comment on column EMPLOYEE.NAME is '员工名称';
comment on column EMPLOYEE.WEIGHT is '员工体重';
comment on column EMPLOYEE.CREATE_USER is '创建人';
comment on column EMPLOYEE.CREATE_TIME is '创建时间';
comment on column EMPLOYEE.UPDATE_USER is '更新人';
comment on column EMPLOYEE.UPDATE_TIME is '更新时间';
-- 创建序列
create sequence employee_id_seq
-- 从主键从1开始
minvalue 1
-- 不设置最大值
nomaxvalue
-- 每次增加1
increment by 1
nocache;
-- 创建触发器
create trigger employee_id_tri
before insert
on FLINKUSER.employee
for each row
when (new.employee_id is null)
begin
select employee_id_seq.nextval into :new.employee_id from dual;
end;
INSERT INTO FLINKUSER.EMPLOYEE (NAME, AGE, WEIGHT, CREATE_USER, CREATE_TIME, UPDATE_USER, UPDATE_TIME)
VALUES ('chatJD', 17, 72.50, 'creator', TO_DATE('2023-02-07 15:23:50', 'YYYY-MM-DD HH24:MI:SS'), 'creator',
TO_DATE('2023-02-07 15:24:10', 'YYYY-MM-DD HH24:MI:SS'));
insert into FLINKUSER.EMPLOYEE(name, age, weight, create_user, create_time, update_user, update_time)
values ('chatGPT', 19, 72.50, 'creator', SYSDATE, 'creator', SYSDATE);
ALTER TABLE FLINKUSER.EMPLOYEE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
-- ALTER TABLE FLINKUSER.EMPLOYEE DROP SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
sudo vim /etc/profile
# java env
export JAVA_HOME=/home/soft/jdk1.8.0_251
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
# hadoop (yarn 模式启动才需要配置)
HADOOP_HOME=/opt/module/hadoop-2.7.5
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
1、下载地址
https://archive.apache.org/dist/flink/
验证时选择的版本是:flink-1.13.6-bin-scala_2.12.tgz
2、解压文件
tar -zxvf flink-1.13.6-bin-scala_2.12.tgz -C /usr/local/myroom/
3、配置环境变量,在最后一行新增两个export
vi /etc/profile
export FLINK_HOME=/usr/local/myroom/flink-1.13.6
export PATH=$PATH:$FLINK_HOME/bin
4、重新加载,使配置立即生效
source /etc/profile
5、验证安装是否成功
flink --version
6、启动flink
start-cluster.sh
备注,停止命令是:stop-cluster.sh
7、查看相关进程是否启动
[root@single myroom]# jps
16931 TaskManagerRunner
16662 StandaloneSessionClusterEntrypoint
16952 Jps
8、页面访问
http://192.168.23.190:8081/
1、节点规划
JobManager: 192.168.23.151
TaskManager:192.168.23.152
TaskManager:192.168.23.153
TaskManager:192.168.23.154
2、安装配置
1)首先在一台机器上安装flink,参考flink本地开发环境安装与部署(单机)
2)修改flink-conf.yaml
vi flink-1.13.6/conf/flink-conf.yaml
jobmanager.rpc.address: 192.168.23.151(这一行修改成机器IP)
3)修改master
vi flink-1.13.6/conf/masters
192.168.23.151:8081(内容是JobManager的IP:端口)
4)修改workers
vi flink-1.13.6/conf/workers
192.168.23.152(内容是3个TaskManager的IP)
192.168.23.153
192.168.23.154
5)将151节点安装配置好的flink复制到其它3台机器
scp -r ./flink-1.13.6 192.168.23.152://usr/local/myroom/
scp -r ./flink-1.13.6 192.168.23.153://usr/local/myroom/
scp -r ./flink-1.13.6 192.168.23.154://usr/local/myroom/
3、部署测试
1)启动
start-cluster.sh
# Jobmanager的地址,taskmanager必须要识别并能连上。
# 只有standalone的集群模式起作用,当执行bin/jobmanager.sh --host<hostname>的时候将被覆盖
# 在YARN或者Mesos的集群模式下将自动替换为jobmanager所在节点的hostname
# JobMaster节点IP地址
jobmanager.rpc.address
# JobMaster端口号,默认6123
jobmanager.rpc.port:6123
# JobManager能够使用的内存的最大值
jobmanager.heap.size: 1024m
# taskmanager 能够使用的最大内存大小
taskmanager.memory.process.size: 2048m
# taskmanager最大内存,不包含虚拟机的metaspace和其他的开销(不能和taskmanager.memory.process.size同时设置)
# taskmanager.memory.flink.size: 2048m
# 这俩不能和taskmanager.memory.flink.size或taskmanager.memory.process.size同时设置
# 指定JVM堆内存大小
# taskmanager.memory.task.heap.size: 2048m
# 指定JVM托管内存大小
# taskmanager.memory.managed.size: 512m
# 每个TaskManager提供的slot数量,每个slot上能运行一个并行的pipeline任务。
taskmanager.numberOfTaskSlots: 1
# 没有指定并行度的情况下,默认的全局并行度
parallelism.default: 1
# 默认的文件系统的数据结构和授权
# 如果是本地文件,那么格式为: 'file:///'
# 如果是HDFS文件,那么格式为:'hdfs://namenode_name:9870'
# fs.default-scheme
#==============================================================================
# 高可用配置
#==============================================================================
# 高可用模型的选择有:'NONE' 或者 'zookeeper'
# high-availability: zookeeper
# 指定master恢复所需要的元数据存储的路径。这个路径需要存储像持久化数据流图一样的大对象
# high-availability.storageDir: hdfs:///flink/ha/
# # ZooKeeper集群的地址,用于协调高可用模型的建立
# high-availability.zookeeper.quorum: localhost:2181
# 如果zookeeper安全策略开启,那么下面这个值就应该是creator,默认是oepn
# high-availability.zookeeper.client.acl: open
#==============================================================================
# 容错和检查点配置
#==============================================================================
# Flink后端会默认存储算子的状态,当checkpointing是开启状态的时候
# Flink支持的后端存储有 'jobmanager(MemoryStateBackend)', 'filesystem', 'rocksdb', 或者自定的类<class-name-of-factory>.
# state.backend: filesystem
# 自动存储检查点的数据文件和元数据的默认目录,这个目录必须能被所有的进程、节点访问
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# 手动保存状态时 savepoints 的目录(可选),用于状态存储写文件时使用
state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# 是否开启增量存储(类似RocksDB存储转态的情况)
# state.backend.incremental: false
# 故障恢复的策略,怎么样从失败的任务重恢复计算。配置可选: full 或者 region
# full: 表示集群中的任务发生故障的时候,所有的task重新启动
# region: 表示集群中的任务发生故障的时候,仅重启被失败失败任务所影响的下游的任务和产生数据的上游任务。
jobmanager.execution.failover-strategy: region
#==============================================================================
# REST和前端设置
#==============================================================================
# Web端运行时监视器端口
rest.port: 8081
# Web Server的地址配置,用于REST api调用
#rest.address: 0.0.0.0
# 配置REST端口绑定的范围
#rest.bind-port: 8080-8090
# 配置Server地址并绑定server
#rest.bind-address: 0.0.0.0
# 配置任务提交是否在网页端开启,默认开启
#web.submit.enable: false
#==============================================================================
# 高级配置
#==============================================================================
# 指定默认的本地临时目录。Flink在运行时会把临时数据写到本地文件系统,
# 比如 Flink 接收到的JAR、应用程序状态(当用RocksDB存储应用程序状态时),要避免目录里的数据被服务器自动清空
# 否则 job 重启时可能因找不到元数据导致恢复失败
# 注意:推荐是添加多个文件夹地址(unix系统使用冒号隔开,windows使用逗号隔开),多个文件夹地址可以是相同的,多文件地址的配置致使Flink创建多个线程来出来,提高读写效率和吞度量
# io.tmp.dirs: /tmp
# 是否在 TM 启动时预先分配 TM 管理的内存
taskmanager.memory.preallocate: false
# 类加载顺序的设定,Flink默认是'child-first',还可以选:'parent-first'
# 'child-first'这种加载机制允许用户在自己的应用中使用不同的依赖或者JAR包的版本。
# classloader.resolve-order: child-first
# 网络堆栈能使用的内存总量的配置。这些配置一般不需要调优。
# 当遇到"Insufficient number of network buffers" 错误的时候就需要调整下面的参数了。
# 默认最小是64MB, 最大值是 1GB
# TM 节点为网络缓冲区分配的内存,默认占JVM堆内存10%
# taskmanager.memory.network.fraction: 0.1
# 网络缓冲区最小值
# taskmanager.memory.network.min: 64mb
# 网络缓冲区最大值
# taskmanager.memory.network.max: 1gb
#==============================================================================
# Flink集群安全配置
#==============================================================================
# Kerberos认证,主要用在Hadoop,Zookeeper, connectors上的安全组件
# 通常开启的步骤为:
# 1. 配置本地的krb5.conf文件
# 2. 提供Kerberos的凭证(可以是一个keytab或者一个ticket cache)
# 3. 针对不同的JAAS的登录上下文,激活Kerberos的凭证
# 4. 使用JAAS/SASL配置连接
# 下面这些配置是说明Kerberos怎么提供安全凭证的。
# 当用户提供了keytab path 和 principal,keytab将用于代贴ticket cache
# 是否从 Kerberos ticket 缓存中读取
# security.kerberos.login.use-ticket-cache: true
# 包含用户凭据的 Kerberos 密钥表文件的绝对路径
# security.kerberos.login.keytab: /path/to/kerberos/keytab
# 与 keytab 关联的 Kerberos 主体名称
# security.kerberos.login.principal: flink-user
# 以逗号分隔的登录上下文列表,用于提供 Kerberos 凭据
# 例如,Client,KafkaClient 使用凭证进行 ZooKeeper 身份验证和 Kafka 身份验证# security.kerberos.login.contexts: Client,KafkaClient
#==============================================================================
# Zookeeper 安全配置
#==============================================================================
# 当zookeeper开启了安全模式,下面的配置就是必要的。
# 如果zookeeper配置了服务名称,就需要修改下面这个配置
# zookeeper.sasl.service-name: zookeeper
# 该配置必须匹配 security.kerberos.login.contexts 配置中的列表(含有一个)
# zookeeper.sasl.login-context-name: Client
#==============================================================================
# 历史任务服务器配置
#==============================================================================
# HistoryServer 通过bin/historyserver.sh (start|stop)开启和关闭
# 用于上传执行完成的job。也是HistoryServer监控的目录。
#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
# 基于 Web 的 HistoryServer 的地址
#historyserver.web.address: 0.0.0.0
# 基于 Web 的 HistoryServer 的端口号
#historyserver.web.port: 8082
# 以逗号分隔的目录列表,指定监控已完成的job所在的地址
#historyserver.archive.fs.dir: hdfs:///completed-jobs/
# 刷新受监控目录的时间间隔(毫秒)
#historyserver.archive.fs.refresh-interval: 10000
# 集群的 master 节点
localhost:8081
# 集群中所有的工作节点
localhost
# 每个 tick 的毫秒数
tickTime=2000
# 初始同步阶段可以采用的 tick 数
initLimit=10
# 在发送请求和获取确认之间可以传递的 tick 数
syncLimit=5
# 存储快照的目录
# dataDir=/tmp/zookeeper
# 客户端将连接的端口
clientPort=2181
# ZooKeeper quorum peers
server.1=localhost:2888:3888
# server.2=host:peer-port:leader-port
# 在不同平台下Flink运行的日志文件
log4j-cli.properties
log4j-console.properties
log4j-yarn-session.properties
log4j.properties
logback-console.xml
logback-yarn.xml
logback.xml
classloader.resolve-order: parent-first
classloader.check-leaked-classloader: false
# 此配置按实际work数量来修改
taskmanager.numberOfTaskSlots: 4
state.backend: rocksdb
execution.checkpointing.interval: 30000
# 此配置按实际hdfs配置来修改
state.checkpoints.dir: hdfs://hadoop1:8020/ckps
state.backend.incremental: true
cp /opt/software/hudi-0.12.0/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.12.0.jar /opt/module/hive/lib/
cp /opt/software/hudi-0.12.0/packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.12.0.jar /opt/module/hive/lib/
cp /opt/software/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle_2.12-0.12.0.jar /opt/module/flink-1.13.6/lib/
cp /opt/module/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/flink-1.13.6/lib/
在/flink/lib中加入下列jar包
flink-connector-jdbc_2.12-1.13.6.jar
flink-sql-connector-mysql-cdc-2.2.1.jar
mysql-connector-java-8.0.13.jar
在/flink/lib中加入下列jar包
flink-sql-connector-sqlserver-cdc-2.2.1.jar
在/flink/lib中加入下列jar包
flink-sql-connector-oracle-cdc-2.2.1.jar
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。