当前位置:   article > 正文

flink重温笔记(七):Flink 流批一体 API 开发—— Connector 连接器_flink connector

flink connector

Flink学习笔记

前言:今天是学习 flink 的第七天啦!学习了 flink 中 connector(数据连接器) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,主要学习了数据存储到以下三处:
1、关系型数据库 mysql ;
2、消息队列:kafka;
3、非关系型数据库:redis
我觉得还是比较有意思的,这些是以后工作要用到的技能,我一定要好好掌握!

Tips:“莫道春光难揽取,浮云过后艳阳天!”明天周一,又是新的一天,要深入学习 flink 的四大基石属性!

二、Flink 流批一体 API 开发

5. Connectors

5.1 JDBC Connector

该连接器可以向 JDBC 数据库写入数据。

5.1.1 依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.1</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
5.1.2 案例演示
  • 需求:从指定的socket读取数据,对单词进行计算,最后将结果写入到MySQL

  • Mysql创建表:

CREATE TABLE `t_wordcount` (
   `word` varchar(255) NOT NULL,
   `counts` int(11) DEFAULT '0',
    PRIMARY KEY (`word`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 代码:
package cn.itcast.day07.connectors;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * @author lql
 * @time 2024-02-18 20:25:31
 * @description TODO:从指定的socket读取数据,对单词进行计算,最后将结果写入到MySQL
 */
public class JDBCSinkDemo {
    public static void main(String[] args) throws Exception {
        // 1) 初始化flink流处理的环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 启动检查点机制,每隔 5 秒生成一个 slot
        // 栅栏会发送给job作业第一个算子(source),(source)收到栅栏后,会阻塞当前算子计算任务,将当前计算结果 state 数据持久化存储
        // 通过checkpoint将中间结果存储下来,只需要在处理完成后进行一次性写入,可以减少与数据库的交互次数
        // 而通过checkpoint将数据分批写入数据库,可以避免数据库锁,资源竞争等问题。
        // 一致性语义: exactly-once:就是操作失败时会进行回滚
        env.enableCheckpointing(5000);

        // 2)定义数据源
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // 词频统计
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });
        // 分组
        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(f -> f.f0);
        // 聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = grouped.sum(1);
        sumed.print();

        // 将聚合后的数据写入到数据库中
        sumed.addSink(JdbcSink.sink(
                "INSERT INTO t_wordcount(word, counts) VALUES(?,?) ON DUPLICATE KEY UPDATE counts = ?",
                (ps, t) -> {
                    ps.setString(1, t.f0);
                    ps.setInt(2, t.f1);
                    ps.setInt(3, t.f1);
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://node1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false")
                .withDriverName("com.mysql.jdbc.Driver")
                .withUsername("root")
                .withPassword("123456")
                .build()
        ));
        // 启动
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

结果:

终端输出:打印更新后的词频统计
8> (hadoop,1)
7> (flink,1)
1> (spark,1)
1> (kafak,1)
7> (flink,2)
7> (flume,1)

mysql 的表中实时更新词频统计数据
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

总结:

  • 1- checkpoint 机制能够生成 state,然后批量写入数据库
  • 2- 写入数据库操作中,此事不需要和之前数据源在 mysql 中要定义 java bean 类
  • 3- (ps, t) -> 这里是运用 lambda 表达式,preparedstatement 对象和 tuple 元组获取 sql 语句中的输入参数
  • 4- 写入到 mysql 中要 JdbcSink.sink 方法,四个参数,一个是 sql 语句,一个是 sql 传入参数,一个是 mysql 配置连接
  • 5- mysql 连接对象需要 new 一个 JdbcConnectionOptions.JdbcConnectionOptionsBuilder
5.2 Kafka Connector

Flink 的 Kafka consumer 称为 FlinkKafkaConsumer。它提供对一个或多个 Kafka topics 的访问。

Flink 的 Kafka Producer 称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。

5.2.1 依赖
# 要使用此反序列化 schema 必须添加以下依赖:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>{{site.version }}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
5.2.2 代码实现-Kafka Producer

将 socket 数据源写入到 kafka 的 test Topic 中

package cn.itcast.day07.connectors;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * @author lql
 * @time 2024-02-18 22:31:46
 * @description TODO:使用自定义sink官方提供flink-connector-kafka-2.11中的FlinkKafkaProducer实现数据的流的方式写入kafka数据
 */
public class KafkaProducer {
    public static void main(String[] args) throws Exception {
        //todo 1)初始化flink流处理环境
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);//设置webui的端口号
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        // 开启 checkpoint 保证一致性语义
        env.enableCheckpointing(5000);

        //todo 2)接入数据源
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        //todo 3)创建kafka的生产者实例
        //指定topic的名称
        String topicName = "test";

        // 实例化 FlinkKafkaProducer 对象
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(topicName, new SimpleStringSchema(), props);

        // todo 4) 将数据写入 kafka 中
        lines.addSink(myProducer);
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

结果:socket 数据能够源源不断写入到 kafka 中

总结:

  • 1- 开启 checkpoint 能够保证一致性语义
  • 2- FlinkKafkaProducer 能够担任 kafka 生产者角色
5.2.3 代码实现-Kafka Comsumer

消费 kafka 指定主题中的数据

package cn.itcast.day07.connectors;

/**
 * @author lql
 * @time 2024-02-18 22:37:50
 * @description TODO:使用flink-connector-kafka-2.11中的FlinkKafkaConsumer消费kafka中的数据进行wordcount计算
 */
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * 需求:使用flink-connector-kafka-2.11中的FlinkKafkaConsumer消费kafka中的数据进行wordcount计算
 * 需要设置的参数:
 * 1:主题名称
 * 2:反序列化的规则
 * 3:消费者属性-集群地址
 * 4:消费者属性-消费者组id(如果不设置,会有默认的消费者组id,但是默认的不方便管理)
 * 5:消费者属性-offset重置规则
 * 6:动态分区检测(当kafka的分区数量发生变化,flink能够感知到)
 * 7:如果没有开启checkpoint,那么可以设置自动递交offset
 *    如果开启了checkpoint,checkpoint会将kafka的offset随着checkpoint成功的时候递交到默认的主题中
 */
public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        //todo 1)初始化flink流处理环境
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8082);//设置webui的端口号
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        //todo 2)接入数据源
        //指定topic的名称
        String topicName = "test";
        //实例化kafkaConsumer对象
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 消费最新的数据
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交偏移量offset
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); // 提交偏移量的时间间隔
        props.setProperty("flink.partition-discovery.interval-millis", "5000");//开启一个后台线程每隔5s检测一次kafka的分区情况

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), props);
        //在开启checkpoint以后,offset的递交会随着checkpoint的成功而递交,从而实现一致性语义,默认就是true
        kafkaSource.setCommitOffsetsOnCheckpoints(true);

        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
        //todo 3)单词计数操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //todo 4)单词分组操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);

        //todo 5)打印计算结果
        result.print();

        //todo 6)启动作业
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

结果:一个模拟生产者,一个模拟消费者,做到实时生产数据和消费数据!

总结:

  • 1- 理解各类参数作用
  • 2- FlinkKafkaConsumer 是 kafka 消费者
  • 3- setCommitOffsetsOnCheckpoints 这里是 offset 要实现一致性语义

拓展:

  • 1- 首先要启动 zookeeper,三台机器一键启动脚本
[root@node1 ~]# vim startZk.sh
  
#!/bin/bash
hosts=(node1 node2 node3)
for host in ${hosts[*]}
do
 ssh $host "source /etc/profile;/export/server/zookeeper-3/bin/zkServer.sh start"
done

# 赋予脚本用户执行权限
[root@node1 ~]# chmod u+x startZk.sh
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 2- zookeeper 一键关闭脚本
[root@node1 ~]# vim stopZk.sh
  
#!/bin/bash
hosts=(node1 node2 node3)
for host in ${hosts[*]}
do
 ssh $host "/export/server/zookeeper-3.4.6/bin/zkServer.sh stop"
Done

# 赋予脚本用户执行权限
[root@node1 ~]# chmod u+x stopZk.sh
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 3- kafka 可视化工具 kafka Tool 很方便!
  • 4- kafka 基本命令:
	前台:
	cd /export/servers/kafka_2.11-0.10.0.0 
	bin/kafka-server-start.sh config/server.properties
	后台:
	cd /export/servers/kafka_2.11-0.10.0.0
	nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
	停止:
	cd /export/servers/kafka_2.11-0.10.0.0
	bin/kafka-server-stop.sh

	消费数据:
	bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic vehicledata-dev
	
	生产数据:
	bin/kafka-console-producer.sh --broker-list node01:9092 --topic vehicledata-dev

	创建topic:
	./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
	
	删除topic:
	./bin/kafka-topics.sh  --delete --zookeeper node01:2181  --topic vehicledata-dev
	bin/zkCli.sh -server node01:2181
	rmr /brokers/topics/vehicledata-dev
	
	罗列topic:
	bin/kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
5.3 Redis Connector

数据输出到Redis数据库中,Redis是一个基于内存、性能极高的NoSQL数据库,数据还可以持久化到磁盘,读写速度快,适合存储key-value类型的数据。

/**
 * 从指定的socket读取数据,对单词进行计算,将结果写入到Redis中
 */
public class RedisSinkDemo {
    public static void main(String[] args) throws Exception {
        //创建Flink流计算执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //创建DataStream
        //Source
        DataStreamSource<String> lines = env.socketTextStream("node01", 9999);
        //调用Transformation开始
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    //new Tuple2<String, Integer>(word, 1)
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        //分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });

        //聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        //Transformation结束

        //调用Sink
        //summed.addSink()
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("node03").setPassword("123456").setDatabase(8).build();

        summed.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
        //启动执行
        env.execute("StreamingWordCount");

    }

    // 声明一个公共的静态内部类 RedisWordCountMapper,它实现了 RedisMapper 接口。  
	// RedisMapper 接口可能是用于将数据映射到Redis命令和键值对的自定义接口。  
	public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {  
  
    	// 重写 getCommandDescription 方法,该方法返回一个 RedisCommandDescription 对象,  
    	// 该对象描述了要执行的Redis命令和相关的参数。  
    	// 在这里,它配置为执行 HSET 命令(用于设置哈希表中的字段的值)在名为 "WORD_COUNT" 的Redis哈希上。  
    	@Override  
    	public RedisCommandDescription getCommandDescription() {  
        	return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");  
    }  
  
    	// 重写 getKeyFromData 方法,该方法从给定的 Tuple2 数据中提取键(在这里是字符串)。  
    	// Tuple2 是一个包含两个元素(在这里是一个字符串和一个整数)的元组,f0 是其第一个元素。  
    	@Override  
    	public String getKeyFromData(Tuple2<String, Integer> data) {  
    	    return data.f0;  // 返回元组的第一个元素,即字符串,作为Redis的键。  
    }  
  	
    	// 重写 getValueFromData 方法,该方法从给定的 Tuple2 数据中提取值(在这里是整数),并将其转换为字符串。  
    	// Tuple2 的 f1 是其第二个元素,即整数。这个方法将这个整数转换为字符串,因为Redis通常存储字符串值。  
    	@Override  
    	public String getValueFromData(Tuple2<String, Integer> data) {  
    	    return data.f1.toString();  // 返回元组的第二个元素(整数)的字符串表示形式,作为Redis的值。  
    }  
}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/886323
推荐阅读
相关标签
  

闽ICP备14008679号