赞
踩
前言:今天是学习 flink 的第七天啦!学习了 flink 中 connector(数据连接器) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,主要学习了数据存储到以下三处:
1、关系型数据库 mysql ;
2、消息队列:kafka;
3、非关系型数据库:redis
我觉得还是比较有意思的,这些是以后工作要用到的技能,我一定要好好掌握!
Tips:“莫道春光难揽取,浮云过后艳阳天!”明天周一,又是新的一天,要深入学习 flink 的四大基石属性!
该连接器可以向 JDBC 数据库写入数据。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.13.1</version>
</dependency>
需求:从指定的socket读取数据,对单词进行计算,最后将结果写入到MySQL
Mysql创建表:
CREATE TABLE `t_wordcount` (
`word` varchar(255) NOT NULL,
`counts` int(11) DEFAULT '0',
PRIMARY KEY (`word`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
package cn.itcast.day07.connectors;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author lql
* @time 2024-02-18 20:25:31
* @description TODO:从指定的socket读取数据,对单词进行计算,最后将结果写入到MySQL
*/
public class JDBCSinkDemo {
public static void main(String[] args) throws Exception {
// 1) 初始化flink流处理的环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启动检查点机制,每隔 5 秒生成一个 slot
// 栅栏会发送给job作业第一个算子(source),(source)收到栅栏后,会阻塞当前算子计算任务,将当前计算结果 state 数据持久化存储
// 通过checkpoint将中间结果存储下来,只需要在处理完成后进行一次性写入,可以减少与数据库的交互次数
// 而通过checkpoint将数据分批写入数据库,可以避免数据库锁,资源竞争等问题。
// 一致性语义: exactly-once:就是操作失败时会进行回滚
env.enableCheckpointing(5000);
// 2)定义数据源
DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
// 词频统计
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
collector.collect(Tuple2.of(word, 1));
}
}
});
// 分组
KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(f -> f.f0);
// 聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = grouped.sum(1);
sumed.print();
// 将聚合后的数据写入到数据库中
sumed.addSink(JdbcSink.sink(
"INSERT INTO t_wordcount(word, counts) VALUES(?,?) ON DUPLICATE KEY UPDATE counts = ?",
(ps, t) -> {
ps.setString(1, t.f0);
ps.setInt(2, t.f1);
ps.setInt(3, t.f1);
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://node1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("123456")
.build()
));
// 启动
env.execute();
}
}
结果:
终端输出:打印更新后的词频统计
8> (hadoop,1)
7> (flink,1)
1> (spark,1)
1> (kafak,1)
7> (flink,2)
7> (flume,1)
mysql 的表中实时更新词频统计数据
总结:
Flink 的 Kafka consumer 称为 FlinkKafkaConsumer。它提供对一个或多个 Kafka topics 的访问。
Flink 的 Kafka Producer 称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。
# 要使用此反序列化 schema 必须添加以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>{{site.version }}</version>
</dependency>
将 socket 数据源写入到 kafka 的 test Topic 中
package cn.itcast.day07.connectors;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
/**
* @author lql
* @time 2024-02-18 22:31:46
* @description TODO:使用自定义sink官方提供flink-connector-kafka-2.11中的FlinkKafkaProducer实现数据的流的方式写入kafka数据
*/
public class KafkaProducer {
public static void main(String[] args) throws Exception {
//todo 1)初始化flink流处理环境
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8081);//设置webui的端口号
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
// 开启 checkpoint 保证一致性语义
env.enableCheckpointing(5000);
//todo 2)接入数据源
DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
//todo 3)创建kafka的生产者实例
//指定topic的名称
String topicName = "test";
// 实例化 FlinkKafkaProducer 对象
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(topicName, new SimpleStringSchema(), props);
// todo 4) 将数据写入 kafka 中
lines.addSink(myProducer);
env.execute();
}
}
结果:socket 数据能够源源不断写入到 kafka 中
总结:
消费 kafka 指定主题中的数据
package cn.itcast.day07.connectors;
/**
* @author lql
* @time 2024-02-18 22:37:50
* @description TODO:使用flink-connector-kafka-2.11中的FlinkKafkaConsumer消费kafka中的数据进行wordcount计算
*/
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
/**
* 需求:使用flink-connector-kafka-2.11中的FlinkKafkaConsumer消费kafka中的数据进行wordcount计算
* 需要设置的参数:
* 1:主题名称
* 2:反序列化的规则
* 3:消费者属性-集群地址
* 4:消费者属性-消费者组id(如果不设置,会有默认的消费者组id,但是默认的不方便管理)
* 5:消费者属性-offset重置规则
* 6:动态分区检测(当kafka的分区数量发生变化,flink能够感知到)
* 7:如果没有开启checkpoint,那么可以设置自动递交offset
* 如果开启了checkpoint,checkpoint会将kafka的offset随着checkpoint成功的时候递交到默认的主题中
*/
public class KafkaConsumer {
public static void main(String[] args) throws Exception {
//todo 1)初始化flink流处理环境
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8082);//设置webui的端口号
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
//todo 2)接入数据源
//指定topic的名称
String topicName = "test";
//实例化kafkaConsumer对象
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 消费最新的数据
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交偏移量offset
props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); // 提交偏移量的时间间隔
props.setProperty("flink.partition-discovery.interval-millis", "5000");//开启一个后台线程每隔5s检测一次kafka的分区情况
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), props);
//在开启checkpoint以后,offset的递交会随着checkpoint的成功而递交,从而实现一致性语义,默认就是true
kafkaSource.setCommitOffsetsOnCheckpoints(true);
DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
//todo 3)单词计数操作
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
});
//todo 4)单词分组操作
SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);
//todo 5)打印计算结果
result.print();
//todo 6)启动作业
env.execute();
}
}
结果:一个模拟生产者,一个模拟消费者,做到实时生产数据和消费数据!
总结:
拓展:
[root@node1 ~]# vim startZk.sh
#!/bin/bash
hosts=(node1 node2 node3)
for host in ${hosts[*]}
do
ssh $host "source /etc/profile;/export/server/zookeeper-3/bin/zkServer.sh start"
done
# 赋予脚本用户执行权限
[root@node1 ~]# chmod u+x startZk.sh
[root@node1 ~]# vim stopZk.sh
#!/bin/bash
hosts=(node1 node2 node3)
for host in ${hosts[*]}
do
ssh $host "/export/server/zookeeper-3.4.6/bin/zkServer.sh stop"
Done
# 赋予脚本用户执行权限
[root@node1 ~]# chmod u+x stopZk.sh
前台:
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-server-start.sh config/server.properties
后台:
cd /export/servers/kafka_2.11-0.10.0.0
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
停止:
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-server-stop.sh
消费数据:
bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic vehicledata-dev
生产数据:
bin/kafka-console-producer.sh --broker-list node01:9092 --topic vehicledata-dev
创建topic:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
删除topic:
./bin/kafka-topics.sh --delete --zookeeper node01:2181 --topic vehicledata-dev
bin/zkCli.sh -server node01:2181
rmr /brokers/topics/vehicledata-dev
罗列topic:
bin/kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
数据输出到Redis数据库中,Redis是一个基于内存、性能极高的NoSQL数据库,数据还可以持久化到磁盘,读写速度快,适合存储key-value类型的数据。
/**
* 从指定的socket读取数据,对单词进行计算,将结果写入到Redis中
*/
public class RedisSinkDemo {
public static void main(String[] args) throws Exception {
//创建Flink流计算执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//创建DataStream
//Source
DataStreamSource<String> lines = env.socketTextStream("node01", 9999);
//调用Transformation开始
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
//new Tuple2<String, Integer>(word, 1)
collector.collect(Tuple2.of(word, 1));
}
}
});
//分组
KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tp) throws Exception {
return tp.f0;
}
});
//聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);
//Transformation结束
//调用Sink
//summed.addSink()
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("node03").setPassword("123456").setDatabase(8).build();
summed.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
//启动执行
env.execute("StreamingWordCount");
}
// 声明一个公共的静态内部类 RedisWordCountMapper,它实现了 RedisMapper 接口。
// RedisMapper 接口可能是用于将数据映射到Redis命令和键值对的自定义接口。
public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
// 重写 getCommandDescription 方法,该方法返回一个 RedisCommandDescription 对象,
// 该对象描述了要执行的Redis命令和相关的参数。
// 在这里,它配置为执行 HSET 命令(用于设置哈希表中的字段的值)在名为 "WORD_COUNT" 的Redis哈希上。
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
}
// 重写 getKeyFromData 方法,该方法从给定的 Tuple2 数据中提取键(在这里是字符串)。
// Tuple2 是一个包含两个元素(在这里是一个字符串和一个整数)的元组,f0 是其第一个元素。
@Override
public String getKeyFromData(Tuple2<String, Integer> data) {
return data.f0; // 返回元组的第一个元素,即字符串,作为Redis的键。
}
// 重写 getValueFromData 方法,该方法从给定的 Tuple2 数据中提取值(在这里是整数),并将其转换为字符串。
// Tuple2 的 f1 是其第二个元素,即整数。这个方法将这个整数转换为字符串,因为Redis通常存储字符串值。
@Override
public String getValueFromData(Tuple2<String, Integer> data) {
return data.f1.toString(); // 返回元组的第二个元素(整数)的字符串表示形式,作为Redis的值。
}
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。