
Integrating Flink SQL with Kafka (flink-sql-connector-kafka)

Versions:

  • flink-1.12.1
  • kafka_2.12-2.4.1

(1) Integrating Flink Streaming with Kafka

Add the Maven dependency:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_2.11</artifactId>
	<version>1.12.1</version>
</dependency>

Test the Kafka consumer. Flink's Kafka consumer is called FlinkKafkaConsumer.

package com.aikfk.flink.sql.kafka;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Properties;

/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/10 12:53 PM
 */
public class FlinkConnectKafka {
    public static void main(String[] args) throws Exception {

        // 1. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Create the TableEnvironment (Blink planner); not actually used in this DataStream-only example
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);

        // Kafka Consumer
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","bigdata-pro-m07:9092");
        // Consumer group: a group organizes several consumers to consume a Kafka topic together.
        // Each consumer must specify a group; consumers with the same group name belong to the same group.
        properties.setProperty("group.id","kfk");

        // Kafka source
        DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>
                ("kfk", new SimpleStringSchema(), properties));

        dataStream.map(new MapFunction<String, Tuple2<String,String>>() {
            @Override
            public Tuple2<String,String> map(String line) throws Exception {
                String[] words = line.split(",");
                return new Tuple2<>(words[0],words[1]);
            }
        }).print();

        env.execute("kafka");

    }
}
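
By default, FlinkKafkaConsumer starts from the committed group offsets in Kafka. For reproducible tests, the start position can be set explicitly; a minimal sketch using the same topic and properties as above:

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("kfk", new SimpleStringSchema(), properties);

// Replay the topic from the beginning on every run;
// setStartFromLatest() and setStartFromGroupOffsets() are the other common choices.
consumer.setStartFromEarliest();

DataStream<String> dataStream = env.addSource(consumer);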

Create the topic:

bin/kafka-topics.sh --bootstrap-server bigdata-pro-m07:9092 --create --topic kfk --partitions 1 --replication-factor 1
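
Optionally, verify that the topic was created (a quick check against the same broker):

bin/kafka-topics.sh --bootstrap-server bigdata-pro-m07:9092 --describe --topic kfk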

Start a console producer:

bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk

Test data (the Flink job prints each record as a tuple):

1> (hibve,dsd)

For more on integrating Flink streaming with Kafka, see the official documentation.

(2) Integrating Flink SQL with Kafka

Add the following jar to the /opt/modules/flink/lib directory:
flink-sql-connector-kafka_2.11-1.12.1.jar

Download link:

(2.1) Step 1: Configure the Flink SQL Client

See this article: https://blog.csdn.net/weixin_45366499/article/details/115576853

(2.2) Step 2: Create the hiveConfDir

Place hive-site.xml in the /Users/caizhengjie/Desktop/hive-conf directory.
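
The exact contents depend on your Hive setup. A minimal sketch of hive-site.xml, assuming the Hive metastore runs as a thrift service (the host and port below are hypothetical; adjust to your environment):

<configuration>
  <!-- Metastore service address; HiveCatalog uses this to connect -->
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://bigdata-pro-m07:9083</value>
  </property>
</configuration>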

(2.3) Step 3: Test the Flink SQL and Kafka integration code

package com.aikfk.flink.sql.kafka;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.table.catalog.hive.HiveCatalog;


/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/10 12:53 PM
 */
public class FlinkConnectKafkaDDL {
    public static void main(String[] args) throws Exception {

        // 1. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Create the TableEnvironment (Blink planner)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);

        String catalogName = "flink_hive";
        String hiveDataBase = "flink";
        String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";

        // Catalog
        HiveCatalog hiveCatalog =
                new HiveCatalog(catalogName,hiveDataBase,hiveConfDir);

        tableEnvironment.registerCatalog(catalogName , hiveCatalog);
        tableEnvironment.useCatalog(catalogName);

        // DDL: create a table backed by the Kafka topic
        String kafkaTable = "person";
        String dropsql = "DROP TABLE IF EXISTS "+kafkaTable;
        String sql
                = "CREATE TABLE "+kafkaTable+" (\n" +
                "    user_id String,\n" +
                "    user_name String,\n" +
                "    age Int\n" +
                ") WITH (\n" +
                "   'connector.type' = 'kafka',\n" +
                "   'connector.version' = 'universal',\n" +
                "   'connector.topic' = 'kfk',\n" +
                "   'connector.properties.bootstrap.servers' = 'bigdata-pro-m07:9092',\n" +
                "   'format.type' = 'csv',\n" +
                "   'update-mode' = 'append'\n" +
                ")";
        tableEnvironment.executeSql(dropsql);
        tableEnvironment.executeSql(sql);

        Table table = tableEnvironment.sqlQuery("select * from person");

        tableEnvironment.toAppendStream(table , Row.class).print();

        env.execute("kafka");
    }
}
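
The WITH clause above uses the legacy 'connector.type' property keys, which Flink 1.12 still accepts. Flink 1.12 also supports a newer option style; a sketch of the equivalent DDL under the same topic and broker assumptions (the 'scan.startup.mode' value is a choice for illustration, not from the original):

String newStyleSql =
        "CREATE TABLE person (\n" +
        "    user_id STRING,\n" +
        "    user_name STRING,\n" +
        "    age INT\n" +
        ") WITH (\n" +
        "   'connector' = 'kafka',\n" +
        "   'topic' = 'kfk',\n" +
        "   'properties.bootstrap.servers' = 'bigdata-pro-m07:9092',\n" +
        "   'properties.group.id' = 'kfk',\n" +
        "   'format' = 'csv',\n" +
        "   'scan.startup.mode' = 'latest-offset'\n" +
        ")";
tableEnvironment.executeSql(newStyleSql);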

A problem may appear here; if it doesn't, skip ahead!

MetaException(message:An exception was thrown while adding/validating class(es) : Column length too big for column 'PARAM_VALUE' (max = 21845); use BLOB or TEXT instead
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Column length too big for column 'PARAM_VALUE' (max = 21845); use BLOB or TEXT instead

The cause is the character set of Hive's metastore database in MySQL; this same character-set problem also breaks operations such as CREATE, INSERT, and LOAD.

Solution:

mysql> use metastore;

mysql> show variables like '%char%';
+--------------------------------------+----------------------------+
| Variable_name                        | Value                      |
+--------------------------------------+----------------------------+
| character_set_client                 | utf8                       |
| character_set_connection             | utf8                       |
| character_set_database               | utf8                       |
| character_set_filesystem             | binary                     |
| character_set_results                | utf8                       |
| character_set_server                 | utf8                       |
| character_set_system                 | utf8                       |
| character_sets_dir                   | /usr/share/mysql/charsets/ |
| validate_password_special_char_count | 1                          |
+--------------------------------------+----------------------------+

mysql> alter database metastore character set latin1;
Query OK, 1 row affected (0.00 sec)

mysql> show variables like '%char%';
+--------------------------------------+----------------------------+
| Variable_name                        | Value                      |
+--------------------------------------+----------------------------+
| character_set_client                 | utf8                       |
| character_set_connection             | utf8                       |
| character_set_database               | latin1                     |
| character_set_filesystem             | binary                     |
| character_set_results                | utf8                       |
| character_set_server                 | utf8                       |
| character_set_system                 | utf8                       |
| character_sets_dir                   | /usr/share/mysql/charsets/ |
| validate_password_special_char_count | 1                          |
+--------------------------------------+----------------------------+
9 rows in set (0.01 sec)

Setting the database's character set to latin1 resolves the problem above.

If there is no error, the job runs but prints nothing yet, because no data has been produced. At this point, both Hive and the Flink SQL Client show the person table.

Flink SQL Client:

Flink SQL> show databases;
default
flink

Flink SQL> use flink;

Flink SQL> show tables;
person
Flink SQL> desc person;
+-----------+--------+------+-----+--------+-----------+
|      name |   type | null | key | extras | watermark |
+-----------+--------+------+-----+--------+-----------+
|   user_id | STRING | true |     |        |           |
| user_name | STRING | true |     |        |           |
|       age |    INT | true |     |        |           |
+-----------+--------+------+-----+--------+-----------+
3 rows in set

Hive:

hive (default)> use flink;
OK
Time taken: 1.056 seconds
hive (flink)> show tables;
OK
tab_name
person
Time taken: 0.179 seconds, Fetched: 1 row(s)
hive (flink)> desc person;
OK
col_name	data_type	comment
Time taken: 0.117 seconds
hive (flink)> 
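
Notice that desc person shows no columns in Hive: the table was created through Flink and is stored in the metastore as a generic Flink table, so its schema is readable by Flink but not by Hive itself.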

(2.4) Step 4: Test with Kafka data

Start a console producer:

bin/kafka-console-producer.sh --broker-list bigdata-pro-m07:9092 --topic kfk

Test data (each line is CSV matching the table schema: user_id,user_name,age):

>100,alex,10
>100,alex,10
>100,alex,10
>100,alex,10

Running the job prints the incoming rows. To view the results through the Flink SQL Client:

bin/sql-client.sh embedded
select * from person;

If the following error appears when executing the SQL statement, try a few more times; it usually means the flink-sql-connector-kafka jar has not been picked up, so make sure it is in the lib directory and restart the SQL Client if necessary:

Flink SQL> select * from person;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

This completes the Flink and Kafka integration. For more on integrating Flink SQL with Kafka, see the official documentation.


