当前位置:   article > 正文

DM8到KAFKA数据同步搭建

dm8到kafka数据同步

目录​​​​​​​

1 DM端准备工作

1.1数据库参数设置

1.1.1 启用归档

1.1.2 启用附加日志

1.2 创建同步DDL表

2 DMHS源端准备工作

2.1 规划DMHS安装

2.1.1 安装目录

2.1.2 安装DMHS

 2.1.3 设置DMHS环境

 2.1.4 配置服务脚本

3 KAFKA执行端准备工作

3.1 DMHS安装

3.2 检查Java环境

3.3 配置启动脚本

4 同步参数配置

4.1 源DMHS参数配置

4.2 KAFKA执行端配置

4.2.1 DMHS同步参数配置

4.2.2  KAFKA发送参数配置

4.2.3 json_format.ini配置

5 数据同步及验证

5.1 初始装载数据及字典

5.2  启动数据同步

5.3 数据同步验证


1 DM端准备工作

1.1数据库参数设置

1.1.1 启用归档

        DMHS同步源端数据库需要开启归档模式,使用如下SQL检查归档是否开启。

SQL> select arch_mode from v$database;

        查询结果:

         如果ARCH_MODE为“N”,需要手动开启归档。使用如下SQL语句。

  1. SQL> alter database mount;
  2. SQL> alter database add archivelog ‘dest=/dm8/arch,type=local,file_size=128,space_limit=0’;
  3. SQL> alter database archivelog;
  4. SQL> alter database open;

1.1.2 启用附加日志

        DM端需要开启附加日志参数,来支持数据实时同步。查询附加日志是否开启,用如下SQL语句。

SQL> select para_value from v$dm_ini where para_name=’RLOG_APPEND_LOGIC’;

        查询结果:

         如果PARA_VALUE值大于0,说明已经开启附加日志归档,否则需要开启日志。使用如下SQL语句。

SQL> call sp_set_para_value(2,’RLOG_APPEND_LOGIC’,2);

1.2 创建同步DDL表

        DMHS支持DDL同步功能,但是默认不启用该功能。对于同步源端为DM8数据库,有两种方式启用DDL同步功能:

  1. 无触发器方式:启用DM8数据库系统参数“RLOG_APPPEND_SYSTAB_LOGIC”,该参数启用在日志中记录系统表逻辑操作功能。该参数默认取值为0,即不启用;设置为1时,启用该功能。
  2. 触发器方式:在源端DM8数据库中以SYSDBA用户执行DDL同步脚本,在SYSDBA模式下创建相关DMHS系统表及触发器。

       同步脚本位于DMHS安装目录的scripts子目录下,命名为:ddl_sql_dm8.sql

2 DMHS源端准备工作

2.1 规划DMHS安装

2.1.1 安装目录

源端

目的端

安装路径

/dm8/dmhs

/dm8/dmhs

2.1.2 安装DMHS

./dmhs_V4.2.69_dm8-kafka_rev103299_rh6_64_veri_20211105.bin -i

Extract install files..........

1.English(English)

2.Simplified Chinese(简体中文)

Select the language to install[2.Simplified Chinese(简体中文)]:2

/tmp/DMHSInstall/install.log

1.免费试用达梦数据实时同步

2.使用已申请的Key文件

验证许可证文件[1.免费试用达梦数据实时同步]:1

1.精简版

2.完整版(web客户端)

3.自定义

安装类型[1.精简版]:1

1.实时同步软件服务器

2.远程部署工具

3.实时同步软件配置助手

4.手册

所需磁盘空间:536 MB

安装目录: [/home/dmdba/dmhs]/dm8/dmhs

1.统一部署

2.现在初始化

是否初始化达梦数据实时同步系统[1.统一部署]:1

正在安装

default start ...    default finished.

server start ...    server finished.

hs_agent start ...    hs_agent finished.

hsca start ...    hsca finished.

doc start ...    doc finished.

postinstall start ...    postinstall finished.

正在创建快捷方式

安装成功

远程部署工具配置

远程部署工具名称[HsAgent]:

主机Ip(外网)[29.229.20.7](29.229.20.7,xxx.xxx.xxx.182):

远程部署工具管理端口[5456](1000-65535):

内置数据库轮询间隔[3](1-60):

内置数据库IP[]:

输入有误,请重新输入!

内置数据库IP[]:xxx.xxx.xxx.182

内置数据库端口[15236](1000-65535):

内置数据库用户名[SYSDBA]:

内置数据库密码[SYSDBA]:

服务脚本环境变量设置

依赖库路径

提示:此配置项供用户配置源或目的数据库依赖库路径和odbc依赖库路径, 多个路径以":"隔开(例:/opt/dmdbms/bin:/usr/local/lib),此配置项会添加到服务脚本的NEED_LIB_PATH的变量值中。

请配置依赖库路径:

Oracle字符集

提示:注意此处配置为ORACLE数据库的NLS_LANG,此配置项由源端数据库字符集编码格式决定,需与源端字符集编码适配。

1.SIMPLIFIED CHINESE_CHINA.ZHS32GB18030

2.SIMPLIFIED CHINESE_CHINA.AL32UTF8

3.TRADITIONAL CHINESE_TAIWAN.ZHT16BIG5

4.TRADITIONAL CHINESE_TAIWAN.AL32UTF8

5.AMERICAN_AMERICA.AL32UTF8

6.AMERICAN_AMERICA.WE8ISO8859P1

7.AMERICAN_AMERICA.WE8ISO8859P15

8.AMERICAN_AMERICA.ZHS16GBK

9.不设置

请配置Oracle字符集[9.不设置]:

远程控制服务

1.自动

2.手动

启动方式:[2.手动]

正在创建远程控制服务

达梦数据实时同步V4.0安装完成

更多安装信息,请查看安装日志文件:

/home/dmdba/dmhs/source/log/install.log

 2.1.3 设置DMHS环境

        1)检查DMHS是否缺少的so文件

  1. cd /dm8/dmhs/bin
  2. ldd ./libdmhs_ld_dm8.so

          查询结果如下:

  1. #设置DM8的安装路径到LD_LIBRARY_PATH,因为需要用到libdmoci.so这个库。
  2. #如果查询到没有对应的so文件,需要我们从其他地方进行拷贝。如下例:
  3. find / -name libdmoci.so
  4. cp /dm8/dmhs/bin/stat/libdmoci.so /lib64/libdmoci.so

 2.1.4 配置服务脚本

  1. #进入安装路径下service_template
  2. cd /dm8/dmhs/bin/service_template
  3. cp TemplateDmhsService ../DmhsService

        具体修改如下:

3 KAFKA执行端准备工作

3.1 DMHS安装

        DMHS的安装与源端安装相同,但需要检查如下组件是否存在。

组件名称

功能

dmga-dmhs-kafka-service.jar

Java同步程序,负责将json串发送至Kafka

fastjson-1.2.21.jar

json格式校验包,进行json格式校验

dmhs_kafka.properties

Java同步程序的配置文件,进行Kafka发送相关参数设置

mgrddltest(可选)

Kafka执行测试程序,可以将Json串落地为文件,方便调试分析

start_dmhs_kafka.sh

DMHS的Kafka执行端启动脚本

kafka_1_0.ctl

根据siteid命名的Kafka执行端控制文件,记录有同步起始点以及检查点的LSN值,不能删除

 3.2 检查Java环境

        kafka执行端需要启动Java程序。环境如下图所示。

 3.3 配置启动脚本

        首先,需要查到Java安装路径。

  1. #查询java安装目录
  2. which java
  3. ll -ltr /bin/java
  4. ll -ltr /etc/alternatives/java

        查询结果如下图所示:

        #配置start_dmhs_kafka.sh 

  1. #!/bin/sh
  2. export LANG=en_US.UTF-8
  3. /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-1.el7.x86_64/jre/bin/java -Djava.ext.dirs="/opt/kafka/kafka_2.11-2.4.0/libs:." com.dameng.dmhs.dmga.service.impl.ExecDMHSKafkaService /dm8/dmhs/bin/dmhs_kafka.properties

        注意:
                1)/opt/kafka/kafka_2.11-2.4.0/libs为kafka安装目录下的libs依赖包目录
                2)/dm8/dmhs/bin/dmhs_kafka.properties为dmhs-kafka的同步配置

4 同步参数配置

4.1 源DMHS参数配置

  1. <!--配置dmhs.hs-->
  2. <?xml version="1.0" encoding="GB2312" standalone="no"?>
  3. <dmhs>
  4. <base>
  5. <lang>ch</lang>
  6. <mgr_port>5347</mgr_port>
  7. <chk_interval>3</chk_interval>
  8. <ckpt_interval>60</ckpt_interval>
  9. <siteid>112</siteid>
  10. <version>2.0</version>
  11. </base>
  12. <cpt>
  13. <db_type>DM8</db_type>
  14. <db_server>192.168.61.206</db_server>
  15. <db_user>DMHS</db_user>
  16. <db_pwd>Dameng123</db_pwd>
  17. <db_port>5236</db_port>
  18. <idle_time>10</idle_time>
  19. <ddl_mask>OBJ:OP</ddl_mask> <!--DDL同步参数-->
  20. <cpt_mask>PARSE:POST:REG_OP2</cpt_mask>
  21. <char_code>PG_UTF8</char_code>
  22. <n2c>0</n2c>
  23. <update_fill_flag>3</update_fill_flag>
  24. <arch>
  25. <clear_interval>600</clear_interval>
  26. <clear_flag>0</clear_flag>
  27. </arch>
  28. <send>
  29. <ip>192.168.61.205</ip>
  30. <mgr_port>15345</mgr_port>
  31. <data_port>15346</data_port>
  32. <net_pack_size>256</net_pack_size>
  33. <net_turns>0</net_turns>
  34. <crc_check>0</crc_check>
  35. <trigger>0</trigger>
  36. <constraint>0</constraint>
  37. <identity>0</identity>
  38. <filter>
  39. <enable>
  40. <item>DMHS.*</item>
  41. </enable>
  42. </filter>
  43. <map>
  44. </map>
  45. </send>
  46. </cpt>
  47. </dmhs>

4.2 KAFKA执行端配置

4.2.1 DMHS同步参数配置

  1. <!--配置dmhs.hs-->
  2. <?xml version="1.0" encoding="GB2312" standalone="no"?>
  3. <dmhs>
  4. <base>
  5. <lang>ch</lang>
  6. <version>2.0</version>
  7. <mgr_port>15345</mgr_port>
  8. <chk_interval>3</chk_interval>
  9. <ckpt_interval>60</ckpt_interval>
  10. <siteid>2</siteid>
  11. </base>
  12. <exec>
  13. <recv>
  14. <data_port>15346</data_port>
  15. </recv>
  16. <enable>1</enable>
  17. <char_code>PG_UTF8</char_code>
  18. <level>0</level>
  19. <exec_thr>4</exec_thr>
  20. <exec_sql>1024</exec_sql>
  21. <exec_trx>5000</exec_trx>
  22. <exec_rows>250</exec_rows>
  23. <recv_caches>8</recv_caches>
  24. <trxid_tables>1</trxid_tables>
  25. <exec_policy>2</exec_policy>
  26. <is_kafka>1</is_kafka>
  27. <max_packet_size>16</max_packet_size>
  28. <json_format>file</json_format>
  29. </exec>
  30. </dmhs>

4.2.2  KAFKA发送参数配置

        Kafka发送参数在配置文件dmhs_kafka.properties中进行配置,用于设置Kafka发送的相关属性,如配置发送Kafka的topic名称、是否进行json格式校验、Kafka发送确认、batch.size参数、linger.ms参数配置等。

  1. #配置dmhs_kafka.properties
  2. # DMHS config file path
  3. dmhs.conf.path=/dm8/dmhs/bin/dmhs.hs #实际dmhs安装路径
  4. # kafka broker list,such as ip1:port1,ip2:port2,...
  5. bootstrap.servers=192.168.61.205:9092,192.168.61.202:9092 #Kafka端口,可以有多个
  6. # kafka topic name
  7. kafka.topic.name=dm-test1 #根据具体项目创建topic
  8. # whether to enable JSON format check
  9. json.format.check=1
  10. # How many messages print cost time
  11. print.message.num=1000
  12. # How many messages batch to get
  13. dmhs.min.batch.size=20
  14. # kafka serializer class
  15. key.serializer=org.apache.kafka.common.serialization.StringSerializer
  16. value.serializer=org.apache.kafka.common.serialization.StringSerializer
  17. # kafka partitioner config
  18. partitioner.class=com.dameng.dmhs.dmga.service.impl.SimplePartitioner #采用多分区
  19. # kafka request acks config
  20. acks=-1
  21. max.request.size=5024000
  22. #batch.size=1048576
  23. #linger.ms=3
  24. #buffer.memory=134217728
  25. retries=3
  26. #enable.idempotence=true
  27. compression.type=none
  28. max.in.flight.requests.per.connection=1
  29. send.buffer.bytes=1048576
  30. metadata.max.age.ms=300000

4.2.3 json_format.ini配置

  1. BATCH_COMMIT = 1
  2. OP_TIME_FORMAT = (yyyy-mm-dd hh:mi:ss.ff)
  3. CUR_TIME_FORMAT = (yyyy-mm-ddThh:mi:ss.ff)
  4. NEED_CRLF = 0
  5. OPCMD_LEN = 7
  6. SET_NULL = 1
  7. SET_QUOTA = 0
  8. CHAR_REPLACE = (",\"),(0x08,\\b),(0x0c,\\f),(0x0D,\\n),(0x0A,\\r),(0x09,\\t),(0x00,\\0)
  9. ADD_TABLE_TOPIC = 0
  10. NEW_VALUES = ALL
  11. SET_ROWID_COL = 1
  12. LOB_PIECE = 0
  13. CLOB_FORMAT = CHAR
  14. JSON_FORMAT_INS={"table":"#SCHEMA.#TABLE","op_type":"#OP_TYPE","op_ts":"#OP_TIME","current_ts":"#TIME","pos":"#OP_SCN","primary_keys":[#PRIMARY_KEY],#CUT"after":{#NEW_VALUES}}
  15. JSON_FORMAT_UPD={"table":"#SCHEMA.#TABLE","op_type":"#OP_TYPE","op_ts":"#OP_TIME","current_ts":"#TIME","pos":"#OP_SCN","primary_keys":[#PRIMARY_KEY],#CUT"before":{#OLD_VALUES},"after":{#NEW_VALUES}}
  16. JSON_FORMAT_DEL={"table":"#SCHEMA.#TABLE","op_type":"#OP_TYPE","op_ts":"#OP_TIME","current_ts":"#TIME","pos":"#OP_SCN","primary_keys":[#PRIMARY_KEY],#CUT"before":{#OLD_VALUES}
  17. JSON_FORMAT_DDL={"table":"#SCHEMA.#TABLE","op_type":"#OP_TYPE","op_ts":"#OP_TIME","current_ts":"#TIME","pos":"#OP_SCN","after":{"DB_TYPE":"DM8","OBJECT_TYPE":"#OBJ_TYPE","OBJECT_NAME":"#SCHEMA.#TABLE","DDL_TEXT":"#DDL_SQL"}}

5 数据同步及验证

5.1 初始装载数据及字典

         1.启动kafka执行端

  1. cd /dm8/dmhs/bin
  2. ./start_dmhs_kafka.sh

        结果如下图所示:

         2.源端采用后台启动

 ./DmhsService start

         3. 装载字典及初始化数据

  1. DMHS> connect 192.168.61.206:5347
  2. DMHS> clear exec lsn
  3. DMHS> copy 0 "sch.name='SYSDBA'" dict|insert

        执行端日志:

5.2  启动数据同步

        上述字典装载时已启动Kafka执行端,并完成了字典初始装载操作。此时可以执行start cpt启动源端的捕获服务,进行增量数据实时同步。

        源端日志:

        执行端日志:

 5.3 数据同步验证

          源端执行建表:

  1. create table dmhs.test2(id int,dt date);
  2. #插入数据
  3. declare
  4. i int:=0;
  5. V_SQL varchar2(200);
  6. V_TIME varchar2(20);
  7. begin
  8. for i in 1..100000 loop
  9. select to_char(sysdate,'YYYY-MM-DD HH:MI:SS') into V_TIME from dual;
  10. V_SQL :='insert into dmhs.test2 values('||i||','''||V_TIME||''');';
  11. execute immediate V_SQL;
  12. end loop;
  13. commit;
  14. end;

           执行端查看消费验证:

./kafka-console-consumer.sh --bootstrap-server 192.168.61.202:9092192.168.61.2069092 --topic dm-test1  --from-beginning --group dm-test1

        查看具体消费数据: 

./kafka-consumer-groups.sh --bootstrap-server 192.168.61.205:9092,192.168.61.202:9092 --describe --group  dm-test1

         至此,DM到KAFKA搭建已经完成。


想要了解更多关于DM数据库的信息,可以到达梦官网查看相应的手册:

达梦数据库 - 新一代大型通用关系型数据库 | 达梦在线服务平台

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/538279
推荐阅读
相关标签
  

闽ICP备14008679号