Versions: Flink 1.12.1 with the Scala 2.11 build of the Kafka connector.
Add the Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.1</version>
</dependency>
Testing the Kafka consumer: Flink's Kafka consumer is called FlinkKafkaConsumer.
package com.aikfk.flink.sql.kafka;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Properties;

/**
 * @author caizhengjie
 * @date 2021/4/10 12:53 PM
 */
public class FlinkConnectKafka {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Create the TableEnvironment (Blink planner)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        // Kafka consumer properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata-pro-m07:9092");
        // Consumer group: a group organizes several consumers that jointly consume the topic's data.
        // Every consumer must specify a group; consumers with the same group name belong to the same group.
        properties.setProperty("group.id", "kfk");

        // Kafka source
        DataStream<String> dataStream = env.addSource(
                new FlinkKafkaConsumer<String>("kfk", new SimpleStringSchema(), properties));

        dataStream.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String line) throws Exception {
                String[] words = line.split(",");
                return new Tuple2<>(words[0], words[1]);
            }
        }).print();

        env.execute("kafka");
    }
}
Create the topic:
bin/kafka-topics.sh --bootstrap-server bigdata-pro-m07:9092 --create --topic kfk --partitions 1 --replication-factor 1
Start a console producer:
bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk
Test data: typing a line such as hibve,dsd into the producer makes the Flink job print:
1> (hibve,dsd)
For more on integrating Flink's DataStream API with Kafka, see the official documentation.
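The example above only reads from Kafka. As a rough sketch of the write side (not part of the original example), the same connector also provides FlinkKafkaProducer, which can be added as a sink; the output topic name kfk-out below is a hypothetical name used only for illustration:

package com.aikfk.flink.sql.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class FlinkKafkaSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata-pro-m07:9092");
        properties.setProperty("group.id", "kfk");

        // Read from the kfk topic, exactly as in the consumer example above
        DataStream<String> dataStream = env.addSource(
                new FlinkKafkaConsumer<String>("kfk", new SimpleStringSchema(), properties));

        // Write every record unchanged to another topic; "kfk-out" is a hypothetical topic name
        dataStream.addSink(
                new FlinkKafkaProducer<String>("kfk-out", new SimpleStringSchema(), properties));

        env.execute("kafka-sink-sketch");
    }
}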
Add the jar flink-sql-connector-kafka_2.11-1.12.1.jar to the /opt/modules/flink/lib directory.
Download address:
See this article: https://blog.csdn.net/weixin_45366499/article/details/115576853
Place hive-site.xml in the /Users/caizhengjie/Desktop/hive-conf directory.
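For reference, a minimal hive-site.xml sketch: the HiveCatalog mainly needs to know how to reach the Hive metastore. The thrift URI below is an assumption (the standard metastore port 9083 on the cluster host used in this post); substitute the values from your own Hive installation:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Assumed remote metastore address; replace with your actual metastore URI -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://bigdata-pro-m07:9083</value>
    </property>
</configuration>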
package com.aikfk.flink.sql.kafka;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * @author caizhengjie
 * @date 2021/4/10 12:53 PM
 */
public class FlinkConnectKafkaDDL {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Create the TableEnvironment (Blink planner)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        String catalogName = "flink_hive";
        String hiveDataBase = "flink";
        String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";

        // Register and use the Hive catalog so the table's metadata is stored in the Hive metastore
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName, hiveDataBase, hiveConfDir);
        tableEnvironment.registerCatalog(catalogName, hiveCatalog);
        tableEnvironment.useCatalog(catalogName);

        // DDL: create a table backed by the Kafka topic
        String kafkaTable = "person";
        String dropSql = "DROP TABLE IF EXISTS " + kafkaTable;
        String sql = "CREATE TABLE " + kafkaTable + " (\n" +
                "  user_id String,\n" +
                "  user_name String,\n" +
                "  age Int\n" +
                ") WITH (\n" +
                "  'connector.type' = 'kafka',\n" +
                "  'connector.version' = 'universal',\n" +
                "  'connector.topic' = 'kfk',\n" +
                "  'connector.properties.bootstrap.servers' = 'bigdata-pro-m07:9092',\n" +
                "  'format.type' = 'csv',\n" +
                "  'update-mode' = 'append'\n" +
                ")";

        tableEnvironment.executeSql(dropSql);
        tableEnvironment.executeSql(sql);

        Table table = tableEnvironment.sqlQuery("select * from person");
        tableEnvironment.toAppendStream(table, Row.class).print();

        env.execute("kafka");
    }
}
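The CREATE TABLE above uses the older connector.type-style options. Flink 1.12 also accepts the newer option keys; as a sketch, the SQL string in the code could instead be built as below. The topic, broker, and CSV format match the original; the scan.startup.mode value is just one common choice and is an assumption, not necessarily what the legacy options default to:

String sql = "CREATE TABLE " + kafkaTable + " (\n" +
        "  user_id STRING,\n" +
        "  user_name STRING,\n" +
        "  age INT\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'kfk',\n" +
        "  'properties.bootstrap.servers' = 'bigdata-pro-m07:9092',\n" +
        "  'properties.group.id' = 'kfk',\n" +
        "  'scan.startup.mode' = 'latest-offset',\n" +
        "  'format' = 'csv'\n" +
        ")";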
At this point you may run into the following problem; if you do not, skip ahead.
MetaException(message:An exception was thrown while adding/validating class(es) : Column length too big for column 'PARAM_VALUE' (max = 21845); use BLOB or TEXT instead
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Column length too big for column 'PARAM_VALUE' (max = 21845); use BLOB or TEXT instead
The cause is the character set of Hive's metastore database in MySQL; the same character-set problem also breaks operations such as CREATE, INSERT, and LOAD.
Solution:
mysql> use metastore;

mysql> show variables like '%char%';
+--------------------------------------+----------------------------+
| Variable_name                        | Value                      |
+--------------------------------------+----------------------------+
| character_set_client                 | utf8                       |
| character_set_connection             | utf8                       |
| character_set_database               | utf8                       |
| character_set_filesystem             | binary                     |
| character_set_results                | utf8                       |
| character_set_server                 | utf8                       |
| character_set_system                 | utf8                       |
| character_sets_dir                   | /usr/share/mysql/charsets/ |
| validate_password_special_char_count | 1                          |
+--------------------------------------+----------------------------+

mysql> alter database metastore character set latin1;
Query OK, 1 row affected (0.00 sec)

mysql> show variables like '%char%';
+--------------------------------------+----------------------------+
| Variable_name                        | Value                      |
+--------------------------------------+----------------------------+
| character_set_client                 | utf8                       |
| character_set_connection             | utf8                       |
| character_set_database               | latin1                     |
| character_set_filesystem             | binary                     |
| character_set_results                | utf8                       |
| character_set_server                 | utf8                       |
| character_set_system                 | utf8                       |
| character_sets_dir                   | /usr/share/mysql/charsets/ |
| validate_password_special_char_count | 1                          |
+--------------------------------------+----------------------------+
9 rows in set (0.01 sec)
Changing the metastore database's character set to latin1 resolves the error above.
If there is no error, the job starts but prints nothing yet, because no data has been produced to the topic.
At this point, the person table is visible both in Hive and in the Flink SQL Client.
Flink SQL Client:
Flink SQL> show databases;
default
flink

Flink SQL> use flink;

Flink SQL> show tables;
person

Flink SQL> desc person;
+-----------+--------+------+-----+--------+-----------+
|      name |   type | null | key | extras | watermark |
+-----------+--------+------+-----+--------+-----------+
|   user_id | STRING | true |     |        |           |
| user_name | STRING | true |     |        |           |
|       age |    INT | true |     |        |           |
+-----------+--------+------+-----+--------+-----------+
3 rows in set
hive:
hive (default)> use flink;
OK
Time taken: 1.056 seconds
hive (flink)> show tables;
OK
tab_name
person
Time taken: 0.179 seconds, Fetched: 1 row(s)
hive (flink)> desc person;
OK
col_name data_type comment
Time taken: 0.117 seconds
hive (flink)>
The desc output shows no columns because person is a generic Flink table: its schema is stored as table properties that Hive itself does not interpret.
Start a console producer:
bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk
Test data:
>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10
Run result:
View the result through the Flink SQL Client:
bin/sql-client.sh embedded
select * from person;
If executing the SQL statement produces the following error, retry it a few more times; the missing class lives in the flink-sql-connector-kafka jar added to Flink's lib directory above, so also confirm that the jar is in place and that the cluster was restarted after adding it.
Flink SQL> select * from person;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
This completes the Flink and Kafka integration. For details on integrating Flink SQL with Kafka, see the official documentation.