赞
踩
前言:今天是第二天啦!开始学习 Flink 流批一体化开发知识点,重点学习了各类数据源的导入操作,我发现学习编程需要分类记忆,一次一次地猜想 api 作用,然后通过敲代码印证自己的想法,以此理解知识点,加深对api的理解和应用。
Tips:我觉得学习 Flink 还是挺有意思的,虽然学习进度有点慢,但是数据源已经理解清楚了,我相信接下来一切会越来越好的!
# 两种输入方式:一种是直接传入多个元素(每个元素是一条记录),一种是传入元组列表(注意:fromElements(tuple2List) 会把整个 List 当作一条记录,见输出-2)
// fromElements with plain values: every argument becomes one record of the stream.
// The element type is String, so the stream is typed as String rather than Object.
DataStreamSource<String> ds1 = env.fromElements("hadoop", "spark", "spark", "flink");

// fromElements with a single List argument: the list itself is the one and only record,
// which is why the declared element type is List<Tuple2<String, Long>>.
List<Tuple2<String, Long>> tuple2List = new ArrayList<>();
tuple2List.add(Tuple2.of("hadoop", 1L));
tuple2List.add(Tuple2.of("spark", 2L));
tuple2List.add(Tuple2.of("flink", 3L));
DataStreamSource<List<Tuple2<String, Long>>> ds2 = env.fromElements(tuple2List);
# 输出-1
6> spark
4> hadoop
5> spark
7> flink
# 输出-2
6> [(hadoop,1), (spark,2), (flink,3)]
# 传入列表
// fromCollection: each element of the given collection becomes one record.
List<String> frameworks = Arrays.asList("spark", "flink", "hadoop");
DataStreamSource<String> ds3 = env.fromCollection(frameworks);
# 输出-3
8> hadoop
6> spark
7> flink
# fromParallelCollection 并行度队列(0-10闭区间)
// Iterator over the numbers 0..10 (both ends included, as the printed output shows).
NumberSequenceIterator zeroToTen = new NumberSequenceIterator(0L, 10L);
// Build the source from the iterator and run it with a parallelism of 3.
DataStreamSource<Long> parallelCollection = env
        .fromParallelCollection(zeroToTen, TypeInformation.of(Long.TYPE))
        .setParallelism(3);
# 乱序输出 -parallelCollection
8> 8
2> 10
8> 7
6> 3
6> 5
3> 0
7> 6
1> 9
5> 2
5> 4
4> 1
# 传入序列(闭区间,包含 1 和 10)
// Emits the numbers 1..10, both bounds included (see the printed output).
// NOTE(review): generateSequence appears to be deprecated in newer Flink releases in favor of
// fromSequence - confirm against the Flink version in use.
DataStreamSource<Long> ds4 = env.generateSequence(1L, 10L);
# 输出 -4
8> 8
3> 3
2> 2
5> 5
1> 1
1> 9
7> 7
6> 6
4> 4
2> 10
# 传入序列(闭区间,包含 1 和 10)
// Emits the numbers 1..10, both bounds included (see the printed output).
DataStreamSource<Long> ds5 = env.fromSequence(1L, 10L);
# 输出 -5
1> 8
7> 6
6> 10
2> 5
3> 1
3> 2
8> 7
4> 9
5> 3
5> 4
package cn.itcast.day02.source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Reads a text file with {@code readTextFile}, reports the parallelism of the resulting
 * source and prints every line.
 *
 * @author lql
 * @time 2024-02-12 23:47:53
 * @description Read a file in batch fashion
 */
public class BatchFromFile {

    /**
     * Builds a local environment with the web UI, wires the file source to a print sink
     * and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        // Local environment whose web UI / REST endpoint is configured on port 8081.
        Configuration webUiConf = new Configuration();
        webUiConf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(webUiConf);

        // File source: the input file location is hard-coded for this demo.
        DataStreamSource<String> lines = env.readTextFile(
                "D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\wordcount.txt");

        // Report the parallelism of the source before the job is submitted.
        System.out.println("ReadTextFileDemo创建的DataStream的并行度为:" + lines.getParallelism());

        lines.print();
        env.execute();
    }
}
package cn.itcast.day02.source;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
/**
 * Reads the same text file twice with {@code readFile}: once with
 * {@code PROCESS_ONCE} and once with {@code PROCESS_CONTINUOUSLY}; only the continuous
 * stream is printed.
 *
 * @author lql
 * @time 2024-02-13 15:34:11
 * @description Read a file source as a stream (unbounded)
 */
public class StreamFromFile {

    /**
     * Creates both file sources, prints their parallelism and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        Configuration webUiConf = new Configuration();
        webUiConf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(webUiConf);

        String path = "./data/input/wordcount.txt";

        // Each source gets its own TextInputFormat, created with a null constructor argument.
        // NOTE(review): that argument looks like the format's file path (the actual path is
        // handed to readFile separately), not a charset - confirm against the Flink API docs.
        // The last argument (2000) is the processing interval in milliseconds.

        // PROCESS_ONCE: the path is processed a single time.
        DataStreamSource<String> onceLines = env.readFile(
                new TextInputFormat(null), path, FileProcessingMode.PROCESS_ONCE, 2000);

        // PROCESS_CONTINUOUSLY: the path keeps being processed; the job does not stop by itself.
        DataStreamSource<String> continuousLines = env.readFile(
                new TextInputFormat(null), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 2000);

        // Report the parallelism of both sources.
        System.out.println("lines1的并行度:" + onceLines.getParallelism());
        System.out.println("lines2的并行度:" + continuousLines.getParallelism());

        // Only the continuous stream is printed; the one-shot stream has no sink here.
        // onceLines.print();
        continuousLines.print();

        env.execute();
    }
}
package cn.itcast.day02.source;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Reads lines from a socket, splits them into words and prints the words, reporting the
 * parallelism of the environment, the socket source and the flatMap result along the way.
 *
 * @author lql
 * @time 2024-02-13 16:00:47
 * @description Socket-based data source
 */
public class StreamSocketSource {

    /**
     * Builds and runs the socket word-splitting job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        System.out.println("执行环境默认的并行度:" + env.getParallelism());

        // Text lines arriving from the socket at 192.168.88.161:8888.
        DataStreamSource<String> lines = env.socketTextStream("192.168.88.161", 8888);
        System.out.println("SocketSource的并行度:" + lines.getParallelism());

        // Split each line on a single space and emit every token as its own record.
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                for (String token : line.split(" ")) {
                    out.collect(token);
                }
            }
        });
        System.out.println("调用完FlatMap后DataStream的并行度:" + words.getParallelism());

        words.print();
        env.execute();
    }
}
package cn.itcast.day02.source.custom;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
import java.util.UUID;
/**
 * Demo of a custom source that implements the plain {@link SourceFunction} interface and
 * emits randomly generated orders.
 *
 * @author lql
 * @time 2024-02-13 16:21:31
 * @description Custom source without parallelism
 */
public class CustomerSourceWithoutParallelDemo {

    /**
     * Order bean emitted by the source.
     *
     * <p>Lombok annotations used here: {@code @Data} generates getters, setters,
     * {@code toString}, {@code equals} and {@code hashCode}; {@code @AllArgsConstructor}
     * generates a constructor taking every field; {@code @NoArgsConstructor} generates a
     * no-argument constructor.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        // Order id
        private String id;
        // User id
        private String userId;
        // Order amount
        private int money;
        // Creation timestamp (epoch milliseconds)
        private Long timestamp;
    }

    /**
     * Entry point: wires the custom source to a stderr print sink and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        // 1) Local streaming environment with the web UI / REST endpoint configured on port 8081.
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        System.out.println("初始化环境的并行度:" + env.getParallelism());

        // 2) Attach the custom source.
        DataStreamSource<Order> streamSource = env.addSource(new MySource());
        System.out.println("streamSource并行度: " + streamSource.getParallelism());

        // 3) Print every order to stderr.
        streamSource.printToErr();
        env.execute();
    }

    /**
     * Custom source that generates one random order per second until cancelled.
     */
    private static class MySource implements SourceFunction<Order> {

        // Loop flag. cancel() is invoked from a different thread than run(), so the flag
        // must be volatile for the change to become visible to the emitting loop.
        private volatile boolean isRunning = true;

        /**
         * Emits random orders until {@link #cancel()} clears the running flag.
         *
         * @param sourceContext context used to emit the generated orders
         * @throws Exception if emitting fails or the thread is interrupted while sleeping
         */
        @Override
        public void run(SourceContext<Order> sourceContext) throws Exception {
            Random random = new Random();
            while (isRunning) {
                // Random order id
                String orderId = UUID.randomUUID().toString();
                // User id in [0, 3)
                String userId = String.valueOf(random.nextInt(3));
                // Order amount in [0, 1000)
                int money = random.nextInt(1000);
                // Current wall-clock time
                long time = System.currentTimeMillis();

                sourceContext.collect(new Order(orderId, userId, money, time));

                // Throttle to one order per second instead of busy-looping.
                Thread.sleep(1000);
            }
        }

        /** Requests the emitting loop in {@link #run} to stop. */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
结果:默认运行环境的并行度:8, 自定义streamSource的并行度为:1
总结:
1- env.addSource(new MySource()),接入自定义数据源 [MySource 是私有静态内部类,而不是方法]
2- 认识了 java bean 类,@Data,@NoArgsConstructor,@AllArgsConstructor 的作用
3- UUID 这个工具类可以随机生成 id,随机数使用需要先 new 一个,random.nextInt() 是左闭右开
4- String.valueOf() 可以把其他类型的值转换成字符串,while 循环需要有 boolean 标记(cancel() 时把标记置为 false 才能退出循环)
5- collect()可以返回对象数据
(2) 实现ParallelSourceFunction创建可并行Source
// MySource emits Order records, so the stream must be typed as Order.
// Requesting a parallelism of 6 on this non-parallel SourceFunction is rejected by Flink.
DataStreamSource<Order> mySource = env.addSource(new MySource()).setParallelism(6);
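# 上述只实现 SourceFunction(非 Parallel)的自定义 mySource 数据源并行度只能为 1,setParallelism(6) 会抛出 IllegalArgumentException;要支持多并行度需实现 ParallelSourceFunction 或继承 RichParallelSourceFunction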
package cn.itcast.day02.source.custom;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Random;
import java.util.UUID;
/**
 * Demo of a custom source that extends {@link RichParallelSourceFunction} and emits
 * randomly generated orders.
 *
 * @author lql
 * @time 2024-02-13 16:58:49
 * @description Custom source that supports multiple parallel instances
 */
public class RichParallelismDemo {

    /**
     * Order bean emitted by the source.
     *
     * <p>Lombok annotations used here: {@code @Data} generates getters, setters,
     * {@code toString}, {@code equals} and {@code hashCode}; {@code @AllArgsConstructor}
     * generates a constructor taking every field; {@code @NoArgsConstructor} generates a
     * no-argument constructor.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        // Order id
        private String id;
        // User id
        private String userId;
        // Order amount
        private int money;
        // Creation timestamp (epoch milliseconds)
        private Long timestamp;
    }

    /**
     * Entry point: wires the custom source to a stderr print sink and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        // 1) Local streaming environment with the web UI / REST endpoint configured on port 8081.
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        System.out.println("初始化环境的并行度:" + env.getParallelism());

        // 2) Attach the custom source. The previous no-op self-assignment was removed;
        //    call streamSource.setParallelism(n) here to override the parallelism explicitly.
        DataStreamSource<Order> streamSource = env.addSource(new MySource());
        System.out.println("streamSource并行度: " + streamSource.getParallelism());

        // 3) Print every order to stderr.
        streamSource.printToErr();
        env.execute();
    }

    /**
     * Custom source that generates one random order per second until cancelled.
     */
    private static class MySource extends RichParallelSourceFunction<Order> {

        // Loop flag. cancel() is invoked from a different thread than run(), so the flag
        // must be volatile for the change to become visible to the emitting loop.
        private volatile boolean isRunning = true;

        /** Lifecycle hook; nothing to initialise in this demo beyond the superclass work. */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }

        /** Lifecycle hook; nothing to release in this demo beyond the superclass work. */
        @Override
        public void close() throws Exception {
            super.close();
        }

        /** Requests the emitting loop in {@link #run} to stop. */
        @Override
        public void cancel() {
            // Without clearing the flag the loop in run() would never terminate.
            isRunning = false;
        }

        /**
         * Emits random orders until {@link #cancel()} clears the running flag.
         *
         * @param sourceContext context used to emit the generated orders
         * @throws Exception if emitting fails or the thread is interrupted while sleeping
         */
        @Override
        public void run(SourceContext<Order> sourceContext) throws Exception {
            Random random = new Random();
            while (isRunning) {
                // Random order id
                String orderId = UUID.randomUUID().toString();
                // User id in [0, 3)
                String userId = String.valueOf(random.nextInt(3));
                // Order amount in [0, 1000)
                int money = random.nextInt(1000);
                // Current wall-clock time
                long time = System.currentTimeMillis();

                sourceContext.collect(new Order(orderId, userId, money, time));

                // Throttle to one order per second instead of busy-looping.
                Thread.sleep(1000);
            }
        }
    }
}
结果:自定义RichParallelSourceFunction支持多个并行度
总结:继承 RichParallelSourceFunction 类,必须实现 run 和 cancel 方法;open 和 close 是可选重写的生命周期方法,分别用于初始化和释放资源!
-- Demo table for the MySQL source example (queried there as test.user).
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id`       int(11)      NOT NULL,
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `name`     varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Seed records of user
-- ----------------------------
INSERT INTO `user` (`id`, `username`, `password`, `name`) VALUES
  (10, 'dazhuang', '123456', '大壮'),
  (11, 'erya',     '123456', '二丫'),
  (12, 'sanpang',  '123456', '三胖');

-- Turns foreign key checks on; the matching "= 0" statement is not part of this excerpt.
SET FOREIGN_KEY_CHECKS = 1;
package cn.itcast.day02.source.custom;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;
/**
 * Demo of a custom source that periodically reads every row of the MySQL table
 * {@code test.user} and emits it as a {@link UserInfo}.
 *
 * @author lql
 * @time 2024-02-13 17:14:06
 * @description Custom MySQL data source
 */
public class MysqlSource {

    /**
     * Entry point: wires the MySQL source to a print sink and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        // 1) Streaming environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2) Attach the custom source.
        DataStreamSource<UserInfo> streamSource = env.addSource(new MysqlSourceFunction());
        System.out.println("MysqlSourceFunction的并行度为:" + streamSource.getParallelism());

        // 3) Print every row.
        streamSource.print();

        // 4) Run the job.
        env.execute();
    }

    /** One row of the {@code user} table. */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserInfo {
        private int id;
        private String username;
        private String password;
        private String name;
    }

    /**
     * Custom source that re-runs {@code select * from test.user} once per second and emits
     * every row, until cancelled.
     *
     * <p>Every instance of this function opens its own connection and runs the same query,
     * so each instance emits the full table.
     */
    private static class MysqlSourceFunction extends RichParallelSourceFunction<UserInfo> {

        // Loop flag. cancel() is invoked from a different thread than run(), so the flag
        // must be volatile for the change to become visible to the polling loop.
        private volatile boolean isRunning = true;

        // JDBC resources are created in open() and released in close(); they are runtime-only
        // state and therefore excluded from serialization of the function object.
        private transient Connection connection = null;
        private transient PreparedStatement statement = null;

        /**
         * Opens the JDBC connection and prepares the query. Executed once per function
         * instance, so several parallel instances open several connections.
         *
         * @param parameters configuration passed in by the runtime
         * @throws Exception if the driver cannot be loaded or the connection fails
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Register the JDBC driver.
            Class.forName("com.mysql.jdbc.Driver");
            // Connection settings are hard-coded for this demo.
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "root");
            statement = connection.prepareStatement("select * from test.user");
        }

        /**
         * Releases the statement and the connection; each step runs even if the previous
         * one throws, and the superclass close is always invoked.
         *
         * @throws Exception if closing a resource fails
         */
        @Override
        public void close() throws Exception {
            try {
                if (statement != null) {
                    statement.close();
                }
            } finally {
                try {
                    if (connection != null) {
                        connection.close();
                    }
                } finally {
                    super.close();
                }
            }
        }

        /**
         * Polls the table once per second and emits every row until {@link #cancel()}
         * clears the running flag.
         *
         * @param sourceContext context used to emit the rows
         * @throws Exception if the query fails or the thread is interrupted while sleeping
         */
        @Override
        public void run(SourceContext<UserInfo> sourceContext) throws Exception {
            while (isRunning) {
                // try-with-resources closes the result set even when reading or emitting fails.
                try (ResultSet resultSet = statement.executeQuery()) {
                    while (resultSet.next()) {
                        int id = resultSet.getInt("id");
                        String username = resultSet.getString("username");
                        String password = resultSet.getString("password");
                        String name = resultSet.getString("name");
                        sourceContext.collect(new UserInfo(id, username, password, name));
                    }
                }
                TimeUnit.SECONDS.sleep(1);
            }
        }

        /** Requests the polling loop in {@link #run} to stop. */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
结果:mysql 的自定义 source 可以设置多并行度;但要注意每个并行实例都会各自执行同一条查询,并行度大于 1 时同一行数据会被重复输出
总结:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。