当前位置:   article > 正文

Pgsql数据库跨网络跨平台数据增量同步方案 - kettle全表数据同步_pg 通过网闸同步

pg 通过网闸同步

需求背景:

  1. 需要跨网络:从阿里云服务器上的数据库,通过网闸使用ftp传文件的方式,将数据同步到业主的专网中;阿里云跟业主专网不能直连;
  2. 定时数据增量同步,具体同步哪些表,需要可配置;
  3. 节约工作量,最大限度上不改变当前表结构、业务流程控制等等;
  4. 增量操作包含insert、update;delete使用逻辑删除,等价于update
  5. 常见应用场景:业主的应用基本都部署在专网中,但是部分业务,需要使用微信小程序、微信公众号等,需要部署应用到互联网上,这个时候,专网跟互联网进行数据同步就有需要了。常见的网闸,会支持mysql、Oracle直连,但是支持pgsql的就比较少了,小编就碰到了这样BT的业务场景。

方案概要:

       使用kettle,定时将需要同步的数据,生成insert语句的sql文件,通过ftp方式传入专网;专网使用kettle,定时执行sql文件,将数据写入数据库;

       使用触发器,实现pgsql的replace into操作,处理update数据;

       具体的方案,往下看,当前仅介绍数据的来源跟去向流程,一些监控手段就不做介绍了。

解决的问题:

  1. 跨网络进行增量同步数据,即隔着网闸,网络不能直连,使用dblink、主从库等方案解决不了;
  2. 源表跟目标表字段名称可以不一致,但表名称要一致(表名称不一致也可以实现);
  3. 自定义增量的控制字段,可以是create_time、update_time也可以是id等
  4. 方案通用数据库全表,定制化操作极少

备注:

  1. delete使用逻辑删除,当前方案,不支持物理删除操作;参考方案:可以创建删除操作触发器,记录所有物理删除数据的表名称、主键,然后进行同步即可。
  2. 处理update数据,使用自定义触发器,仿replace into操作,先删除数据,然后新增;

具体数据增量同步方案实现:

       kettle怎么安装,怎么用,请自行百度。

从数据库到文件:

  1. 要同步数据的表,放到一张配置表里面,在获取表名称的节点,查询出来
  2. 将查询出来的表名称list,放到kettle的全局变量里面
  3. 通过【检验字段的值】【js脚本控制循环变量】构建一个循环操作;遍历list,查询数据,成sql文件;生成sql文件前,先更新增量数据标志位;
  4. 循环结束之后,将生成的sql文件,通过ftp协议上传到指定位置;

从文件到数据库:

  1. 先通过ftp协议,下载sql文件,保存到指定位置;
  2. 读取sql文件,执行insert操作;在目标数据库中,添加插入操作触发器,每次插入前,将原有主键数据删除,用于处理update数据;

关键步骤解析

       获取表名称

ctrl_column: 控制增量的字段,用于拼接查询sql,where条件

ctrl_value: 控制增量字段的取值,用于拼接查询sql,where条件取值

column_type: 控制增量字段的类型,用于拼接查询sql,强转where条件取值的类型

column_list:  表的字段list,用于拼接查询sql,调整查询数据格式、字段名称等;非必须

例如拼接后查询语句: select id, ower_id …… from t_com_dictionary where id > ‘0’::integer

sync_flag: 控制是否同步该表数据

  1. -- pgsql获取所有表名称:
  2. select tablename from pg_tables where schemaname='模式名称' order by tablename
  3. -- pgsql获取指定表字段(也可以通过kettle获取表字段):
  4. SELECT col_description(a.attrelid,a.attnum) as comment,format_type(a.atttypid,a.atttypmod) as type,a.attname as name, a.attnotnull as notnull
  5. FROM pg_class as c,pg_attribute as a
  6. where c.relname = '表名称' and a.attrelid = c.oid and a.attnum>0

表名称放入变量

       通过js脚本,获取前一个节点查询到的表名称list,将数据放到kettle内存变量里面

  1. var tableRows=previous_result.getRows();
  2. if (tableRows == null && (tableRows.size()==0)){
  3. false; //提示参数异常
  4. } else {
  5. parent_job.setVariable("tables", tableRows);//ArrayList存储表名变量
  6. parent_job.setVariable("size", tableRows.size());//存储执行表的总数量
  7. parent_job.setVariable("i", 0);//循环控制变量
  8. var tableRow = tableRows.get(0); //将第一行数据放到内存变量
  9. parent_job.setVariable("table_name", tableRow.getString("table_name",""));
  10. parent_job.setVariable("ctrl_column", tableRow.getString("ctrl_column",""));
  11. parent_job.setVariable("ctrl_value", tableRow.getString("ctrl_value",""));
  12. parent_job.setVariable("column_type", tableRow.getString("column_type",""));
  13. parent_job.setVariable("column_list", tableRow.getString("column_list",""));
  14. true;
  15. }

检验字段值

更新标志位

生成sql文件

控制循环

  1. var tableRows=previous_result.getRows();
  2. var size = new Number(parent_job.getVariable("size"));
  3. var i = new Number(parent_job.getVariable("i"))+1;
  4. if(i < size){
  5. var tableRow = tableRows.get(i); //循环将每一行数据放到内存变量
  6. parent_job.setVariable("table_name", tableRow.getString("table_name",""));
  7. parent_job.setVariable("ctrl_column", tableRow.getString("ctrl_column",""));
  8. parent_job.setVariable("ctrl_value", tableRow.getString("ctrl_value",""));
  9. parent_job.setVariable("column_type", tableRow.getString("column_type",""));
  10. parent_job.setVariable("column_list", tableRow.getString("column_list",""));
  11. true;
  12. }
  13. parent_job.setVariable("i",i); //重置i的值,用于校验字段值节点,控制循环
  14. true;

sql文件写入数据库

replace into 触发器

  1. CREATE OR REPLACE FUNCTION fn_replace_into() RETURNS TRIGGER AS $BODY$
  2. DECLARE
  3. strSQL VARCHAR;
  4. BEGIN
  5. strSQL = 'DELETE FROM '||TG_TABLE_NAME||' WHERE ID = ' || NEW.ID;
  6. EXECUTE strSQL;
  7. RETURN NEW;
  8. END;
  9. $BODY$ LANGUAGE plpgsql;
  10. CREATE TRIGGER trg_t_system_user BEFORE INSERT ON t_system_user FOR EACH ROW EXECUTE PROCEDURE fn_replace_into();

下载地址:

https://download.csdn.net/download/weixin_42686388/11224766

遇到的问题:

  1. 生成sql文件的时候,特殊类型字段数据,如timestamp、geometry类型等,会导致insert语句不能直接运行,在pgsql中可以将数据转成varchar类型,然后插入;
  2. 使用copy命令生成csv文本文件,该方案也是可行的,需要将文件放到数据库所在服务器;使用copy命令方式,性能更佳,而且不需要考虑数据类型导致insert语句异常问题;
  3. Linux定时任务,建议开启队列运行,防止同步时间间隔较短,单任务执行时间较长,导致并行任务过多,服务器内存爆掉问题;
  4. 编码问题,生成sql文件,建议使用gbk编码方式,因为执行sql的时候,没地方设置编码(至少我没找到),默认gbk;
  5. 理论上,当前方案也能实现跨数据库类型的数据同步,时间关系未测试,有兴趣的同学可以实践一下;

 

部署Linux需要自行修改一些配置、去掉文件中文名称等;可能这并不是最好的方案,但是我咨询过专业的DBA,网上了查了很多资料,都没找到更适合这种需求的方案,所以整理发出来大家一些交流学习下吧。这个方案目前已经在生产环境运行了将近半年多,还算稳定

觉得写的好的给个赞;觉得有待改进的,请留言一起学习,非喜勿喷。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/155926
推荐阅读
相关标签
  

闽ICP备14008679号