当前位置:   article > 正文

数据集成工具——FlinkX_集成flinkx

集成flinkx

FlinkX的安装与简单使用

1、FlinkX的安装

1)、上传并解压

unzip flinkx-1.10.zip -d /usr/local/soft/
  • 1

2)、配置环境变量

3)、给bin/flinkx这个文件加上执行权限

chmod a+x flinkx
  • 1

4)、修改配置文件,设置运行端口

vim flinkconf/flink-conf.yaml
  • 1
## web服务端口,不指定的话会随机生成一个
rest.bind-port: 8888
  • 1
  • 2

2、FlinkX的简单使用

1)、MySQLToHDFS

  • 配置文件
{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student"
                                ],
                                "table": [
                                    "student"
                                ]
                            }
                        ],
                        "column": [
                            "*"
                        ],
                        "customSql": "",
                        "where": "clazz = '理科二班'",
                        "splitPk": "",
                        "queryTimeOut": 1000,
                        "requestAccumulatorInterval": 2
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "path": "hdfs://master:9000/data/flinkx/student",
                        "defaultFS": "hdfs://master:9000",
                        "column": [
                            {
                                "name": "col1",
                                "index": 0,
                                "type": "string"
                            },
                            {
                                "name": "col2",
                                "index": 1,
                                "type": "string"
                            },
                            {
                                "name": "col3",
                                "index": 2,
                                "type": "string"
                            },
                            {
                                "name": "col4",
                                "index": 3,
                                "type": "string"
                            },
                            {
                                "name": "col5",
                                "index": 4,
                                "type": "string"
                            },
                            {
                                "name": "col6",
                                "index": 5,
                                "type": "string"
                            }
                        ],
                        "fieldDelimiter": ",",
                        "fileType": "text",
                        "writeMode": "overwrite"
                    }
                }
            }
        ],
        "setting": {
            "restore": {
                "isRestore": false,
                "isStream": false
            },
            "errorLimit": {},
            "speed": {
                "channel": 1
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 启动任务
flinkx -mode local -job /usr/local/soft/flinkx-1.10/jsonConf/mysqlToHDFS.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
  • 1
  • 监听日志

flinkx 任务启动后,会在执行命令的目录下生成一个nohup.out文件

tail -f nohup.out
  • 1
  • 通过web界面查看任务运行情况
http://master:8888
  • 1

2)、MySQLToHive

  • 配置文件
{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student"
                                ],
                                "table": [
                                    "student"
                                ]
                            }
                        ],
                        "column": [
                            "*"
                        ],
                        "customSql": "",
                        "where": "clazz < '文科二班'",
                        "splitPk": "",
                        "queryTimeOut": 1000,
                        "requestAccumulatorInterval": 2
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hivewriter",
                    "parameter": {
                        "jdbcUrl": "jdbc:hive2://master:10000/testflinkx",
                        "username": "",
                        "password": "",
                        "fileType": "text",
                        "fieldDelimiter": ",",
                        "writeMode": "overwrite",
                        "compress": "",
                        "charsetName": "UTF-8",
                        "maxFileSize": 1073741824,
                        "tablesColumn": "{\"student\":[{\"key\":\"id\",\"type\":\"string\"},{\"key\":\"name\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"string\"}]}",
                        "defaultFS": "hdfs://master:9000"
                    }
                }
            }
        ],
        "setting": {
            "restore": {
                "isRestore": false,
                "isStream": false
            },
            "errorLimit": {},
            "speed": {
                "channel": 1
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 在hive中创建testflinkx数据库,并创建student分区表
create database testflinkx;
use testflinkx;
CREATE TABLE `student`(
  `id` string, 
  `name` string, 
  `age` string)
PARTITIONED BY ( 
  `pt` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',';
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 启动hiveserver2
# 第一种方式:
hiveserver2
# 第二种方式:
hive --service hiveserver2
  • 1
  • 2
  • 3
  • 4
  • 启动任务
flinkx -mode local -job /usr/local/soft/flinkx-1.10/jsonConf/mysqlToHive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
  • 1
  • 查看日志及运行情况同上

3)、MySQLToHBase

  • 配置文件
{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student"
                                ],
                                "table": [
                                    "score"
                                ]
                            }
                        ],
                        "column": [
                            "*"
                        ],
                        "customSql": "",
                        "splitPk": "",
                        "queryTimeOut": 1000,
                        "requestAccumulatorInterval": 2
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hbasewriter",
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.zookeeper.property.clientPort": "2181",
                            "hbase.rootdir": "hdfs://master:9000/hbase",
                            "hbase.cluster.distributed": "true",
                            "hbase.zookeeper.quorum": "master,node1,node2",
                            "zookeeper.znode.parent": "/hbase"
                        },
                        "table": "testFlinkx",
                        "rowkeyColumn": "$(info:student_id)_$(info:course_id)",
                        "column": [
                            {
                                "name": "info:student_id",
                                "type": "string"
                            },
                            {
                                "name": "info:course_id",
                                "type": "string"
                            },
                            {
                                "name": "info:score",
                                "type": "string"
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "restore": {
                "isRestore": false,
                "isStream": false
            },
            "errorLimit": {},
            "speed": {
                "channel": 1
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 启动hbase 并创建testflinkx表
create 'testFlinkx','info'
  • 1
  • 启动任务
flinkx -mode local -job /usr/local/soft/flinkx-1.10/jsonConf/mysqlToHBase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
  • 1
  • 查看日志及运行情况同上
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/745530
推荐阅读
相关标签
  

闽ICP备14008679号