当前位置:   article > 正文

mysql全量到kafka_MySQL数据实时增量同步到Kafka Flume

mysql binlog全量+增量转发kafka

写在前面的话

需求,将MySQL里的数据实时增量同步到Kafka。接到活儿的时候,第一个想法就是通过读取MySQL的binlog日志,将数据写到Kafka。不过对比了一些工具,例如:Canel,Databus,Puma等,这些都是需要部署server和client的。其中server端是由这些工具实现,配置了就可以读binlog,而client端是需要我们动手编写程序的,远没有达到我即插即用的期望和懒人的标准。

同步的格式

原作者的插件flume-ng-sql-source只支持csv的格式,如果开始同步之后,数据库表需要增减字段,则会给开发者造成很大的困扰。所以我添加了一个分支版本,用来将数据以JSON的格式,同步到kafka,字段语义更加清晰。

将此jar包下载之后,和相应的数据库驱动包,一起放到flume的lib目录之下即可。

处理机制

flume-ng-sql-source在【status.file.name】文件中记录读取数据库表的偏移量,进程重启后,可以接着上次的进度,继续增量读表。

启动说明

说明:启动命令里的【YYYYMM=201711】,会传入到flume.properties里面,替换${YYYYMM}

[test@localhost~]$ YYYYMM=201711bin/flume-ng agent-c conf-f conf/flume.properties-n sync&

-c:表示配置文件的目录,在此我们配置了flume-env.sh,也在conf目录下;

-f:指定配置文件,这个配置文件必须在全局选项的--conf参数定义的目录下,就是说这个配置文件要在前面配置的conf目录下面;

-n:表示要启动的agent的名称,也就是我们flume.properties配置文件里面,配置项的前缀,这里我们配的前缀是【sync】;

flume的配置说明

flume-env.sh

# 配置JVM堆内存和java运行参数,配置-DpropertiesImplementation参数是为了在flume.properties配置文件中使用环境变量

exportJAVA_OPTS="

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/953514
推荐阅读
相关标签
  

闽ICP备14008679号