FlinkX has been renamed ChunJun, so its original official site no longer exists.
The FlinkX documentation is still available at https://gitee.com/lugela/flinkx#flinkx
1. Upload and extract
unzip flinkx-1.10.zip -d /usr/local/soft/
2. Configure environment variables
FLINKX_HOME=/usr/local/soft/flinkx-1.10
export PATH=$FLINKX_HOME/bin:$PATH
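Assuming the two lines above are appended to /etc/profile (an assumption; they could also live in ~/.bashrc), reload the configuration and verify:
source /etc/profile
echo $FLINKX_HOME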
3. Give the bin/flinkx file execute permission
chmod +x flinkx
4. Edit the configuration file and set the web port
vim flinkconf/flink-conf.yaml
## Web UI port; if not set, a random port is chosen
rest.bind-port: 8888
Startup
Command-line options
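In brief, the options used by the flinkx launcher in the commands later in this article are (only the ones this article uses, not the full option list):
-mode        execution mode; local mode is used throughout this article
-job         path to the job description JSON file
-pluginRoot  directory containing the reader/writer plugins (syncplugins)
-flinkconf   directory containing the Flink configuration (flink-conf.yaml)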
FlinkX is a Flink-based distributed data synchronization framework for both offline and real-time data, widely used internally at 袋鼠云 (DTStack). It enables efficient data migration between many kinds of heterogeneous data sources.
Each source system is abstracted as a Reader plugin and each target system as a Writer plugin. In principle, the FlinkX framework can synchronize data between data sources of any type, and as an ecosystem, every newly added data source immediately becomes interoperable with all existing ones.
FlinkX is a batch/stream-unified data synchronization tool built on Flink: it can collect static data such as MySQL tables or HDFS files, as well as continuously changing data such as MySQL binlog or Kafka topics.
Under the hood, FlinkX relies on Flink: a synchronization job is translated into a StreamGraph and executed on Flink.
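Each job is described by a JSON file with a reader, a writer, and some global settings. A minimal skeleton, distilled from the full configurations shown below (the names and parameters are placeholders), looks like this:
{
  "job": {
    "content": [
      {
        "reader": { "name": "...reader", "parameter": { } },
        "writer": { "name": "...writer", "parameter": { } }
      }
    ],
    "setting": {
      "speed": { "channel": 1 }
    }
  }
}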
Scenario
Write the data of the Student table in the MySQL database Y1 to a specified path on HDFS.
Reference documentation
mysqlreader:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/reader/mysqlreader.md)
hdfswriter:(https://gitee.com/lugela/flinkx/blob/1.10_release/docs/offline/writer/hdfswriter.md)
Create the file mysql2hdfs.json:
{ "job": { "content": [ { "reader": { "parameter": { "username": "root", "password": "123456", "connection": [ { "jdbcUrl": [ "jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false" ], "table": [ "Student" ] } ], "column": [ "*" ], "where": "Sid > 05 ", "requestAccumulatorInterval": 2 }, "name": "mysqlreader" }, "writer": { "name": "hdfswriter", "parameter": { "path": "hdfs://master:9000/bigdata30/flinkx/out1", "defaultFS": "hdfs://master:9000", "column": [ { "name": "col1", "index": 0, "type": "string" },{ "name": "col2", "index": 1, "type": "string" },{ "name": "col3", "index": 2, "type": "string" },{ "name": "col4", "index": 3, "type": "string" } ], "fieldDelimiter": ",", "fileType": "text", "writeMode": "append" } } } ], "setting": { "restore": { "isRestore": false, "isStream": false }, "errorLimit": {}, "speed": { "channel": 1 } } } }
Run mode
Run:
flinkx -mode local -job ./mysql2hdfs.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
Monitor the log:
After the flinkx job starts, a nohup.out file is created in the directory where the command was run.
tail -f nohup.out
Check the job status through the web UI:
http://master:8888
A file appears on HDFS:
View the file:
hdfs dfs -cat /bigdata30/flinkx/out1/0.44b7d6c8dcaadcc14ae55fb482f9fb27.0
The students with Sid greater than 05 are there:
hivewriter: (https://github.com/oceanos/flinkx/blob/1.8_release/docs/hivewriter.md)
Configuration file (mysql2hive.json):
{ "job": { "content": [ { "reader": { "parameter": { "username": "root", "password": "123456", "connection": [ { "jdbcUrl": [ "jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false" ], "table": [ "Student" ] } ], "column": [ "*" ], "where": "Sid > 05 ", "requestAccumulatorInterval": 2 }, "name": "mysqlreader" }, "writer": { "name": "hivewriter", "parameter": { "jdbcUrl": "jdbc:hive2://master:10000/bigdata30", "username": "", "password": "", "fileType": "text", "fieldDelimiter": ",", "writeMode": "overwrite", "compress": "", "charsetName": "UTF-8", "maxFileSize": 1073741824, "tablesColumn": "{\"Student\":[{\"key\":\"SId\",\"type\":\"string\"},{\"key\":\"Sname\",\"type\":\"string\"},{\"key\":\"Sage\",\"type\":\"string\"},{\"key\":\"Ssex\",\"type\":\"string\"}]}", "defaultFS": "hdfs://master:9000" } } } ], "setting": { "restore": { "isRestore": false, "isStream": false }, "errorLimit": {}, "speed": { "channel": 1 } } } }
Create the table in Hive:
CREATE TABLE `bigdata30`.`Student`(
`SId` STRING,
`Sname` STRING,
`Sage` STRING,
`Ssex` STRING
)
PARTITIONED BY (
`pt` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
Start hiveserver2
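Depending on the installation, hiveserver2 can typically be started along these lines (the nohup/log redirection is just one common way, an assumption rather than part of the original steps):
nohup hive --service hiveserver2 > hiveserver2.log 2>&1 &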
Start the job
flinkx -mode local -job ./mysql2hive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
Running it produced an error that could not be resolved.
Checking the ChunJun documentation, the hive-sink is stated to support only Hive 1.x and Hive 2.x; the Hive version here is 3.1.2, which is not supported, and this is the suspected cause of the error.
Try using ChunJun to solve it.
Scenario
Write the data of the Student table in the MySQL database Y1 into the HBase table flinkx_Student.
Configuration file (mysql2hbase.json):
{ "job": { "content": [ { "reader": { "parameter": { "username": "root", "password": "123456", "connection": [ { "jdbcUrl": [ "jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=utf8&useSSL=false" ], "table": [ "Student" ] } ], "column": [ "*" ], "where": "Sid > 05 ", "requestAccumulatorInterval": 2 }, "name": "mysqlreader" }, "writer": { "name": "hbasewriter", "parameter": { "hbaseConfig": { "hbase.zookeeper.property.clientPort": "2181", "hbase.rootdir": "hdfs://master:9000/hbase", "hbase.cluster.distributed": "true", "hbase.zookeeper.quorum": "master,node1,node2", "zookeeper.znode.parent": "/hbase" }, "table": "flinkx_Student", "rowkeyColumn": "$(cf1:SId)", "column": [ { "name": "cf1:SId", "type": "string" }, { "name": "cf1:Sname", "type": "string" }, { "name": "cf1:Sage", "type": "string" }, { "name": "cf1:Ssex", "type": "string" } ] } } } ], "setting": { "restore": { "isRestore": false, "isStream": false }, "errorLimit": {}, "speed": { "channel": 1 } } } }
Create the flinkx_Student table in HBase:
create 'flinkx_Student','cf1'
Start:
flinkx -mode local -job ./mysql2hbase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
Data appears in the flinkx_Student table in HBase.
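A quick way to verify the result from the HBase shell (assuming the shell is available on the cluster):
hbase shell
scan 'flinkx_Student'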
Scenario
Write the data of the Student table in the MySQL database Y1 into the Student2 table in the datax1 database.
Configuration file mysql2mysql.json:
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [ { "name": "SId", "type": "string" }, { "name": "Sname", "type": "string" }, { "name": "Sage", "type": "string" }, { "name": "Ssex", "type": "string" } ], "username": "root", "password": "123456", "connection": [ { "jdbcUrl": [ "jdbc:mysql://master:3306/Y1?useSSL=false" ], "table": [ "Student" ] } ] } }, "writer": { "name": "mysqlwriter", "parameter": { "username": "root", "password": "123456", "connection": [ { "jdbcUrl": "jdbc:mysql://master:3306/datax1?useSSL=false", "table": [ "Student2" ] } ], "writeMode": "insert", "column": [ { "name": "SId", "type": "string" }, { "name": "Sname", "type": "string" }, { "name": "Sage", "type": "string" }, { "name": "Ssex", "type": "string" } ] } } } ], "setting": { "speed": { "channel": 1, "bytes": 0 } } } }
Create the table in the MySQL datax1 database:
create table if not exists datax1.Student2(
SID varchar(10),
Sname varchar(100),
Sage varchar(100),
Ssex varchar(10)
)CHARSET = utf8 COLLATE utf8_general_ci;
Run:
flinkx -mode local -job ./mysql2mysql.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
Open the web UI to check:
master:8888
Check the Student2 table; the data has been imported:
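A quick verification from the MySQL command-line client (the credentials are the ones used in the job configuration):
mysql -uroot -p123456 -e "SELECT * FROM datax1.Student2;"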