赞
踩
目录
DMHS同步源端数据库需要开启归档模式,使用如下SQL检查归档是否开启。
SQL> select arch_mode from v$database;
查询结果:
如果ARCH_MODE为“N”,需要手动开启归档。使用如下SQL语句。
- SQL> alter database mount;
- SQL> alter database add archivelog ‘dest=/dm8/arch,type=local,file_size=128,space_limit=0’;
- SQL> alter database archivelog;
- SQL> alter database open;
DM端需要开启附加日志参数,来支持数据实时同步。查询附加日志是否开启,用如下SQL语句。
SQL> select para_value from v$dm_ini where para_name=’RLOG_APPEND_LOGIC’;
查询结果:
如果PARA_VALUE值大于0,说明已经开启附加日志归档,否则需要开启日志。使用如下SQL语句。
SQL> call sp_set_para_value(2,’RLOG_APPEND_LOGIC’,2);
DMHS支持DDL同步功能,但是默认不启用该功能。对于同步源端为DM8数据库,有两种方式启用DDL同步功能:
同步脚本位于DMHS安装目录的scripts子目录下,命名为:ddl_sql_dm8.sql。
源端 | 目的端 | |
安装路径 | /dm8/dmhs | /dm8/dmhs |
./dmhs_V4.2.69_dm8-kafka_rev103299_rh6_64_veri_20211105.bin -i
Extract install files..........
1.English(English)
2.Simplified Chinese(简体中文)
Select the language to install[2.Simplified Chinese(简体中文)]:2
/tmp/DMHSInstall/install.log
1.免费试用达梦数据实时同步
2.使用已申请的Key文件
验证许可证文件[1.免费试用达梦数据实时同步]:1
1.精简版
2.完整版(web客户端)
3.自定义
安装类型[1.精简版]:1
1.实时同步软件服务器
2.远程部署工具
3.实时同步软件配置助手
4.手册
所需磁盘空间:536 MB
安装目录: [/home/dmdba/dmhs]/dm8/dmhs
1.统一部署
2.现在初始化
是否初始化达梦数据实时同步系统[1.统一部署]:1
正在安装
default start ... default finished.
server start ... server finished.
hs_agent start ... hs_agent finished.
hsca start ... hsca finished.
doc start ... doc finished.
postinstall start ... postinstall finished.
正在创建快捷方式
安装成功
远程部署工具配置
远程部署工具名称[HsAgent]:
主机Ip(外网)[29.229.20.7](29.229.20.7,xxx.xxx.xxx.182):
远程部署工具管理端口[5456](1000-65535):
内置数据库轮询间隔[3](1-60):
内置数据库IP[]:
输入有误,请重新输入!
内置数据库IP[]:xxx.xxx.xxx.182
内置数据库端口[15236](1000-65535):
内置数据库用户名[SYSDBA]:
内置数据库密码[SYSDBA]:
服务脚本环境变量设置
依赖库路径
提示:此配置项供用户配置源或目的数据库依赖库路径和odbc依赖库路径, 多个路径以":"隔开(例:/opt/dmdbms/bin:/usr/local/lib),此配置项会添加到服务脚本的NEED_LIB_PATH的变量值中。
请配置依赖库路径:
Oracle字符集
提示:注意此处配置为ORACLE数据库的NLS_LANG,此配置项由源端数据库字符集编码格式决定,需与源端字符集编码适配。
1.SIMPLIFIED CHINESE_CHINA.ZHS32GB18030
2.SIMPLIFIED CHINESE_CHINA.AL32UTF8
3.TRADITIONAL CHINESE_TAIWAN.ZHT16BIG5
4.TRADITIONAL CHINESE_TAIWAN.AL32UTF8
5.AMERICAN_AMERICA.AL32UTF8
6.AMERICAN_AMERICA.WE8ISO8859P1
7.AMERICAN_AMERICA.WE8ISO8859P15
8.AMERICAN_AMERICA.ZHS16GBK
9.不设置
请配置Oracle字符集[9.不设置]:
远程控制服务
1.自动
2.手动
启动方式:[2.手动]
正在创建远程控制服务
达梦数据实时同步V4.0安装完成
更多安装信息,请查看安装日志文件:
/home/dmdba/dmhs/source/log/install.log
1)检查DMHS是否缺少的so文件
- cd /dm8/dmhs/bin
- ldd ./libdmhs_ld_dm8.so
查询结果如下:
- #设置DM8的安装路径到LD_LIBRARY_PATH,因为需要用到libdmoci.so这个库。
- #如果查询到没有对应的so文件,需要我们从其他地方进行拷贝。如下例:
- find / -name libdmoci.so
- cp /dm8/dmhs/bin/stat/libdmoci.so /lib64/libdmoci.so
- #进入安装路径下service_template
- cd /dm8/dmhs/bin/service_template
- cp TemplateDmhsService ../DmhsService
具体修改如下:
DMHS的安装与源端安装相同,但需要检查如下组件是否存在。
组件名称 | 功能 |
dmga-dmhs-kafka-service.jar | Java同步程序,负责将json串发送至Kafka |
fastjson-1.2.21.jar | json格式校验包,进行json格式校验 |
dmhs_kafka.properties | Java同步程序的配置文件,进行Kafka发送相关参数设置 |
mgrddltest(可选) | Kafka执行测试程序,可以将Json串落地为文件,方便调试分析 |
start_dmhs_kafka.sh | DMHS的Kafka执行端启动脚本 |
kafka_1_0.ctl | 根据siteid命名的Kafka执行端控制文件,记录有同步起始点以及检查点的LSN值,不能删除 |
kafka执行端需要启动Java程序。环境如下图所示。
首先,需要查到Java安装路径。
- #查询java安装目录
- which java
- ll -ltr /bin/java
- ll -ltr /etc/alternatives/java
查询结果如下图所示:
#配置start_dmhs_kafka.sh
- #!/bin/sh
- export LANG=en_US.UTF-8
- /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-1.el7.x86_64/jre/bin/java -Djava.ext.dirs="/opt/kafka/kafka_2.11-2.4.0/libs:." com.dameng.dmhs.dmga.service.impl.ExecDMHSKafkaService /dm8/dmhs/bin/dmhs_kafka.properties
注意:
1)/opt/kafka/kafka_2.11-2.4.0/libs为kafka安装目录下的libs依赖包目录
2)/dm8/dmhs/bin/dmhs_kafka.properties为dmhs-kafka的同步配置
- <!--配置dmhs.hs-->
- <?xml version="1.0" encoding="GB2312" standalone="no"?>
- <dmhs>
- <base>
- <lang>ch</lang>
- <mgr_port>5347</mgr_port>
- <chk_interval>3</chk_interval>
- <ckpt_interval>60</ckpt_interval>
- <siteid>112</siteid>
- <version>2.0</version>
- </base>
- <cpt>
- <db_type>DM8</db_type>
- <db_server>192.168.61.206</db_server>
- <db_user>DMHS</db_user>
- <db_pwd>Dameng123</db_pwd>
- <db_port>5236</db_port>
- <idle_time>10</idle_time>
- <ddl_mask>OBJ:OP</ddl_mask> <!--DDL同步参数-->
- <cpt_mask>PARSE:POST:REG_OP2</cpt_mask>
- <char_code>PG_UTF8</char_code>
- <n2c>0</n2c>
- <update_fill_flag>3</update_fill_flag>
- <arch>
- <clear_interval>600</clear_interval>
- <clear_flag>0</clear_flag>
- </arch>
- <send>
- <ip>192.168.61.205</ip>
- <mgr_port>15345</mgr_port>
- <data_port>15346</data_port>
- <net_pack_size>256</net_pack_size>
- <net_turns>0</net_turns>
- <crc_check>0</crc_check>
- <trigger>0</trigger>
- <constraint>0</constraint>
- <identity>0</identity>
- <filter>
- <enable>
- <item>DMHS.*</item>
- </enable>
- </filter>
- <map>
- </map>
- </send>
- </cpt>
- </dmhs>
- <!--配置dmhs.hs-->
- <?xml version="1.0" encoding="GB2312" standalone="no"?>
- <dmhs>
- <base>
- <lang>ch</lang>
- <version>2.0</version>
- <mgr_port>15345</mgr_port>
- <chk_interval>3</chk_interval>
- <ckpt_interval>60</ckpt_interval>
- <siteid>2</siteid>
- </base>
- <exec>
- <recv>
- <data_port>15346</data_port>
- </recv>
- <enable>1</enable>
- <char_code>PG_UTF8</char_code>
- <level>0</level>
- <exec_thr>4</exec_thr>
- <exec_sql>1024</exec_sql>
- <exec_trx>5000</exec_trx>
- <exec_rows>250</exec_rows>
- <recv_caches>8</recv_caches>
- <trxid_tables>1</trxid_tables>
- <exec_policy>2</exec_policy>
- <is_kafka>1</is_kafka>
- <max_packet_size>16</max_packet_size>
- <json_format>file</json_format>
- </exec>
- </dmhs>
Kafka发送参数在配置文件dmhs_kafka.properties中进行配置,用于设置Kafka发送的相关属性,如配置发送Kafka的topic名称、是否进行json格式校验、Kafka发送确认、batch.size参数、linger.ms参数配置等。
- #配置dmhs_kafka.properties
- # DMHS config file path
- dmhs.conf.path=/dm8/dmhs/bin/dmhs.hs #实际dmhs安装路径
- # kafka broker list,such as ip1:port1,ip2:port2,...
- bootstrap.servers=192.168.61.205:9092,192.168.61.202:9092 #Kafka端口,可以有多个
- # kafka topic name
- kafka.topic.name=dm-test1 #根据具体项目创建topic
- # whether to enable JSON format check
- json.format.check=1
- # How many messages print cost time
- print.message.num=1000
- # How many messages batch to get
- dmhs.min.batch.size=20
- # kafka serializer class
- key.serializer=org.apache.kafka.common.serialization.StringSerializer
- value.serializer=org.apache.kafka.common.serialization.StringSerializer
- # kafka partitioner config
- partitioner.class=com.dameng.dmhs.dmga.service.impl.SimplePartitioner #采用多分区
- # kafka request acks config
- acks=-1
- max.request.size=5024000
- #batch.size=1048576
- #linger.ms=3
- #buffer.memory=134217728
- retries=3
- #enable.idempotence=true
- compression.type=none
- max.in.flight.requests.per.connection=1
- send.buffer.bytes=1048576
- metadata.max.age.ms=300000
- BATCH_COMMIT = 1
- OP_TIME_FORMAT = (yyyy-mm-dd hh:mi:ss.ff)
- CUR_TIME_FORMAT = (yyyy-mm-ddThh:mi:ss.ff)
- NEED_CRLF = 0
- OPCMD_LEN = 7
- SET_NULL = 1
- SET_QUOTA = 0
- CHAR_REPLACE = (",\"),(0x08,\\b),(0x0c,\\f),(0x0D,\\n),(0x0A,\\r),(0x09,\\t),(0x00,\\0)
- ADD_TABLE_TOPIC = 0
- NEW_VALUES = ALL
- SET_ROWID_COL = 1
- LOB_PIECE = 0
- CLOB_FORMAT = CHAR
-
- JSON_FORMAT_INS={"table":"#SCHEMA.#TABLE","op_type":"#OP_TYPE","op_ts":"#OP_TIME","current_ts":"#TIME","pos":"#OP_SCN","primary_keys":[#PRIMARY_KEY],#CUT"after":{#NEW_VALUES}}
- JSON_FORMAT_UPD={"table":"#SCHEMA.#TABLE","op_type":"#OP_TYPE","op_ts":"#OP_TIME","current_ts":"#TIME","pos":"#OP_SCN","primary_keys":[#PRIMARY_KEY],#CUT"before":{#OLD_VALUES},"after":{#NEW_VALUES}}
- JSON_FORMAT_DEL={"table":"#SCHEMA.#TABLE","op_type":"#OP_TYPE","op_ts":"#OP_TIME","current_ts":"#TIME","pos":"#OP_SCN","primary_keys":[#PRIMARY_KEY],#CUT"before":{#OLD_VALUES}
- JSON_FORMAT_DDL={"table":"#SCHEMA.#TABLE","op_type":"#OP_TYPE","op_ts":"#OP_TIME","current_ts":"#TIME","pos":"#OP_SCN","after":{"DB_TYPE":"DM8","OBJECT_TYPE":"#OBJ_TYPE","OBJECT_NAME":"#SCHEMA.#TABLE","DDL_TEXT":"#DDL_SQL"}}
1.启动kafka执行端
- cd /dm8/dmhs/bin
- ./start_dmhs_kafka.sh
结果如下图所示:
2.源端采用后台启动
./DmhsService start
3. 装载字典及初始化数据
- DMHS> connect 192.168.61.206:5347
- DMHS> clear exec lsn
- DMHS> copy 0 "sch.name='SYSDBA'" dict|insert
执行端日志:
上述字典装载时已启动Kafka执行端,并完成了字典初始装载操作。此时可以执行start cpt启动源端的捕获服务,进行增量数据实时同步。
源端日志:
执行端日志:
源端执行建表:
- create table dmhs.test2(id int,dt date);
-
- #插入数据
- declare
- i int:=0;
- V_SQL varchar2(200);
- V_TIME varchar2(20);
- begin
- for i in 1..100000 loop
- select to_char(sysdate,'YYYY-MM-DD HH:MI:SS') into V_TIME from dual;
- V_SQL :='insert into dmhs.test2 values('||i||','''||V_TIME||''');';
- execute immediate V_SQL;
- end loop;
- commit;
- end;
执行端查看消费验证:
./kafka-console-consumer.sh --bootstrap-server 192.168.61.202:9092,192.168.61.206:9092 --topic dm-test1 --from-beginning --group dm-test1
查看具体消费数据:
./kafka-consumer-groups.sh --bootstrap-server 192.168.61.205:9092,192.168.61.202:9092 --describe --group dm-test1
至此,DM到KAFKA搭建已经完成。
想要了解更多关于DM数据库的信息,可以到达梦官网查看相应的手册:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。