Using Flink SQL 1.12: read MySQL data and store it in Kafka, and store the dimension data in HBase.
Note: everything below uses Flink SQL through the Table API in code, not the SQL Client.
This example took about two days of fiddling. I started with Flink 1.11.1, and connecting to MySQL and Kafka worked fine, but connecting to HBase ran into all kinds of problems. In the end I had to upgrade to Flink 1.12 before the HBase connection finally worked.
1. Use Flink CDC to read the MySQL business data and sync it to Kafka as the ODS-layer storage.
2. Extract the dimension data into HBase so that dimension-table (lookup) joins can be done later, as sketched right below.
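Once the dimension data sits in HBase, a later fact-to-dimension join in Flink SQL is a lookup (temporal) join. The following is only a rough sketch: the orders table, its topic and its columns are hypothetical and not part of this article; the point is that the probe side needs a processing-time attribute and the join uses FOR SYSTEM_TIME AS OF against the HBase table (hTable, defined further below).

-- Hypothetical fact table; the name, topic and columns are assumptions for illustration.
CREATE TABLE orders (
  order_id  STRING,
  name      STRING,
  proc_time AS PROCTIME()   -- processing-time attribute required for lookup joins
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- Lookup join against the HBase dimension table.
SELECT o.order_id, o.name, d.cf.cnt
FROM orders AS o
JOIN hTable FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.name = d.rowkey;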
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.12.1</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>2.2.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-format-changelog-json</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-queryable-state-client-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Note that connecting to HBase requires the hbase-common dependency; without it the job fails with class-loading errors.
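The pom also pulls in flink-format-changelog-json from Ververica. The job below does not use it, but it is the format that lets a CDC changelog be written to an ordinary 'kafka' connector table instead of 'upsert-kafka'. A minimal sketch, with a made-up topic and table name:

-- Alternative ODS sink using the changelog-json format (not used in the job below).
CREATE TABLE ods_products (
  id INT,
  name STRING,
  description STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_products',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'changelog-json'
);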
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object test {
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // Blink planner in streaming mode
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
    bsTableEnv.getConfig.getConfiguration.setString("parallelism.default", "8")

    // Source: MySQL binlog via the Flink CDC connector
    bsTableEnv.executeSql(
      """
        |CREATE TABLE mysql_binlog (
        |  id INT,
        |  name STRING,
        |  description STRING
        |) WITH (
        |  'connector' = 'mysql-cdc',
        |  'hostname' = 'localhost',
        |  'port' = '3306',
        |  'username' = 'root',
        |  'password' = '123456',
        |  'database-name' = 'mydb',
        |  'table-name' = 'products'
        |)
        |""".stripMargin)

    // ODS layer: upsert-kafka sink keyed by user_region
    bsTableEnv.executeSql(
      """
        |CREATE TABLE pageviews_per_region (
        |  user_region STRING,
        |  pv BIGINT,
        |  PRIMARY KEY (user_region) NOT ENFORCED
        |) WITH (
        |  'connector' = 'upsert-kafka',
        |  'topic' = 'pageviews_per_region',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'key.format' = 'json',
        |  'value.format' = 'json'
        |)
        |""".stripMargin)

    // Continuously aggregate the CDC stream and upsert the counts into Kafka
    bsTableEnv.executeSql(
      "insert into pageviews_per_region select name, count(*) as cnt from mysql_binlog group by name")

    // Dimension table in HBase (column family cf)
    bsTableEnv.executeSql(
      """
        |CREATE TABLE hTable (
        |  rowkey STRING,
        |  cf ROW<cnt BIGINT>,
        |  PRIMARY KEY (rowkey) NOT ENFORCED
        |) WITH (
        |  'connector' = 'hbase-2.2',
        |  'table-name' = 'myhbase',
        |  'zookeeper.quorum' = '127.0.0.1:2181'
        |)
        |""".stripMargin)

    // Write the aggregated counts into HBase
    bsTableEnv.executeSql(
      "INSERT INTO hTable select name, ROW(count(*)) from mysql_binlog group by name")

    // Read the HBase table back and print it as a retract stream
    val hbase = bsTableEnv.sqlQuery("select * from hTable")
    bsTableEnv.toRetractStream[Row](hbase).print().setParallelism(1)

    bsEnv.execute("hbase")
  }
}
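To actually run the job, the MySQL table mydb.products must exist (with binlog enabled, which the debezium/example-mysql image already takes care of), and the HBase table myhbase with a column family cf must be created beforehand, for example with create 'myhbase', 'cf' in the HBase shell. The DDL below is an assumed minimal schema matching the mysql_binlog declaration above, not something taken from the original setup:

-- Assumed MySQL source table matching the mysql_binlog schema declared above.
CREATE DATABASE IF NOT EXISTS mydb;
USE mydb;
CREATE TABLE products (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(255),
  description VARCHAR(512)
);
INSERT INTO products VALUES (1, 'scooter', 'small 2-wheel scooter');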
HBase runs in its own container (note the hostname hbase0 set with -h):

docker run -d -h hbase0 \
  -p 2181:2181 -p 8080:8080 -p 8085:8085 -p 9090:9090 -p 9095:9095 \
  -p 16000:16000 -p 16010:16010 -p 16201:16201 -p 16301:16301 -p 16020:16020 \
  --name hbase1.3 harisekhon/hbase:1.3

The remaining services come from docker-compose.yml (the hbase service there stays commented out because HBase is started separately above):

version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
#  hbase:
#    image: dajobe/hbase
#    ports:
#      - "2181:2181"
#      - "8080:8080"
#      - "8085:8085"
#      - "9090:9090"
#      - "9095:9095"
#      - "16000:16000"
#      - "16010:16010"
#      - "16201:16201"
#      - "16301:16301"
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "12181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
      - "9094:9094"
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://localhost:9092
      - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS="user_behavior:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
Because the HBase container's hostname is hbase0, add a mapping on the machine running the Flink job so the region server address resolves:

127.0.0.1 hbase0
I have tested this whole pipeline myself without any problems; feel free to reach out to discuss.