赞
踩
目录
flink中提供了DataGeneratorSource类,来创建一个`可并行的生成测试数据的数据源`,它支持自定义生成数据的类型、生成数据的行数、生成数据的速率,能够很好的模拟真实的数据源,常被用来做flink流任务测试和性能测试
- DataGeneratorSource(
- GeneratorFunction<Long, OUT> generatorFunction
- ,long count
- ,RateLimiterStrategy rateLimiterStrategy
- ,TypeInformation<OUT> typeInfo)
-
- 参数说明:
- @generatorFunction : 指定 GeneratorFunction 实现类(生成数据的具体实现类)
- @count : 指定输出数据的总行数(如果想一直输出,可以设置为Long.MAX_VALUE)
- @rateLimiterStrategy : 指定发射速率(每秒发射的记录数)
- @typeInfo : 指定返回值类型
开发语言:Java1.8
flink版本:Flink1.17.0
示例代码:
- package com.baidu.bean;
-
- public class FlinkUser {
- public Long id;
- public String name;
- public Long createtime;
-
- // 一定要提供一个 空参 的构造器(反射的时候要使用)
- public FlinkUser() {
- }
-
- public FlinkUser(Long id, String name, Long createtime) {
- this.id = id;
- this.name = name;
- this.createtime = createtime;
- }
-
- public Long getId() {
- return id;
- }
-
- public void setId(Long id) {
- this.id = id;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public Long getCreatetime() {
- return createtime;
- }
-
- public void setCreatetime(Long createtime) {
- this.createtime = createtime;
- }
-
- @Override
- public int hashCode() {
- return this.name.hashCode();
- }
-
- @Override
- public String toString() {
- return "FlinkUser{" +
- "id=" + id +
- ", name='" + name + '\'' +
- ", createtime=" + createtime +
- '}';
- }
- }
-
-
- package com.baidu.datastream.source;
-
- import com.baidu.bean.FlinkUser;
- import org.apache.commons.math3.random.RandomDataGenerator;
- import org.apache.flink.api.connector.source.SourceReaderContext;
- import org.apache.flink.connector.datagen.source.GeneratorFunction;
-
- /*
- *
- * TODO GeneratorFunction<T, O>接口说明:
- * 功能说明:
- * 数据生成器函数的基本接口,用来定义生成数据的核心逻辑
- * 泛型说明:
- * T : 输入数据的类型(默认为Long),表示数据生成器的自增ID
- * O : 输出数据的类型,要指定flink的数据类型(TypeInformation)
- * 实现方法:
- * open : 创建对象时,调用一次,用来做资源初始化
- * close : 销毁对象时,调用一次,用来做资源关闭
- * map : 数据的生成逻辑,每生成一次数据调用一次,参数为自增ID
- * */
- public class MyGeneratorFunction implements GeneratorFunction<Long, FlinkUser> {
-
- // 定义随机数数据生成器
- public RandomDataGenerator generator;
-
- // 初始化随机数数据生成器
- @Override
- public void open(SourceReaderContext readerContext) throws Exception {
- generator = new RandomDataGenerator();
- }
-
- @Override
- public FlinkUser map(Long value) throws Exception {
- // 使用 随机数数据生成器 来创建 FlinkUser实例
- FlinkUser flinkUser = new FlinkUser(value
- , generator.nextHexString(4) // 生成随机的4位字符串
- , System.currentTimeMillis()
- );
- return flinkUser;
- }
-
- }
-
-
- package com.baidu.datastream.source;
-
- import com.baidu.bean.FlinkUser;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
- import org.apache.flink.connector.datagen.source.DataGeneratorSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- // --------------------------------------------------------------------------------------------
- // TODO 从 数据生成器(DataGenerator) 中读取数据
- // --------------------------------------------------------------------------------------------
-
- public class ReadDataGeneratorSource {
- public static void main(String[] args) throws Exception {
- // 1.获取执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
-
- // 2.自定义数据生成器Source
- /*
- * TODO DataGeneratorSource(
- * GeneratorFunction<Long, OUT> generatorFunction
- * ,long count
- * ,RateLimiterStrategy rateLimiterStrategy
- * ,TypeInformation<OUT> typeInfo)
- * 参数说明:
- * @generatorFunction : 指定 GeneratorFunction 实现类(生成数据的具体实现类)
- * @count : 指定输出数据的总行数(如果想一直输出,可以设置为Long.MAX_VALUE)
- * @rateLimiterStrategy : 指定发射速率(每秒发射的记录数)
- * @typeInfo : 指定返回值类型
- * */
- DataGeneratorSource<FlinkUser> dataGeneratorSource = new DataGeneratorSource<>(
- // 指定 GeneratorFunction 实现类
- new MyGeneratorFunction(),
- // 指定 输出数据的总行数
- 100,
- // 指定 每秒发射的记录数
- RateLimiterStrategy.perSecond(100),
- // 指定返回值类型
- TypeInformation.of(FlinkUser.class) // 将java的FlinkUser封装成到TypeInformation
- );
-
- // 3.读取 dataGeneratorSource 中的数据
- env.fromSource(dataGeneratorSource
- , WatermarkStrategy.noWatermarks() // 指定水位线生成策略
- , "data-generator")
- .print();
- /*
- * 注意:生成的dataGeneratorSource为可并行算子
- * 生成的数据会均匀的分配到并行子任务中
- * */
-
- env.execute();
- }
- }
运行结果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。