当前位置:   article > 正文

4.4、Flink任务中如何使用DataGeneratorSource生成测试数据源

datageneratorsource

目录

1、功能说明

2、API使用说明

3、代码示例


1、什么是 flink的DataGeneratorSource

        flink中提供了DataGeneratorSource类,来创建一个`可并行的生成测试数据的数据源`,它支持自定义生成数据的类型、生成数据的行数、生成数据的速率,能够很好的模拟真实的数据源,常被用来做flink流任务测试和性能测试


2、使用 DataGeneratorSource 生成测试数据

2.1 API使用说明:

  1. DataGeneratorSource(
  2. GeneratorFunction<Long, OUT> generatorFunction
  3. ,long count
  4. ,RateLimiterStrategy rateLimiterStrategy
  5. ,TypeInformation<OUT> typeInfo)
  6. 参数说明:
  7. @generatorFunction : 指定 GeneratorFunction 实现类(生成数据的具体实现类)
  8. @count : 指定输出数据的总行数(如果想一直输出,可以设置为Long.MAX_VALUE)
  9. @rateLimiterStrategy : 指定发射速率(每秒发射的记录数)
  10. @typeInfo : 指定返回值类型

2.2 这是一个完整的入门案例

开发语言:Java1.8

flink版本:Flink1.17.0

示例代码:

  1. package com.baidu.bean;
  2. public class FlinkUser {
  3. public Long id;
  4. public String name;
  5. public Long createtime;
  6. // 一定要提供一个 空参 的构造器(反射的时候要使用)
  7. public FlinkUser() {
  8. }
  9. public FlinkUser(Long id, String name, Long createtime) {
  10. this.id = id;
  11. this.name = name;
  12. this.createtime = createtime;
  13. }
  14. public Long getId() {
  15. return id;
  16. }
  17. public void setId(Long id) {
  18. this.id = id;
  19. }
  20. public String getName() {
  21. return name;
  22. }
  23. public void setName(String name) {
  24. this.name = name;
  25. }
  26. public Long getCreatetime() {
  27. return createtime;
  28. }
  29. public void setCreatetime(Long createtime) {
  30. this.createtime = createtime;
  31. }
  32. @Override
  33. public int hashCode() {
  34. return this.name.hashCode();
  35. }
  36. @Override
  37. public String toString() {
  38. return "FlinkUser{" +
  39. "id=" + id +
  40. ", name='" + name + '\'' +
  41. ", createtime=" + createtime +
  42. '}';
  43. }
  44. }
  45. package com.baidu.datastream.source;
  46. import com.baidu.bean.FlinkUser;
  47. import org.apache.commons.math3.random.RandomDataGenerator;
  48. import org.apache.flink.api.connector.source.SourceReaderContext;
  49. import org.apache.flink.connector.datagen.source.GeneratorFunction;
  50. /*
  51. *
  52. * TODO GeneratorFunction<T, O>接口说明:
  53. * 功能说明:
  54. * 数据生成器函数的基本接口,用来定义生成数据的核心逻辑
  55. * 泛型说明:
  56. * T : 输入数据的类型(默认为Long),表示数据生成器的自增ID
  57. * O : 输出数据的类型,要指定flink的数据类型(TypeInformation)
  58. * 实现方法:
  59. * open : 创建对象时,调用一次,用来做资源初始化
  60. * close : 销毁对象时,调用一次,用来做资源关闭
  61. * map : 数据的生成逻辑,每生成一次数据调用一次,参数为自增ID
  62. * */
  63. public class MyGeneratorFunction implements GeneratorFunction<Long, FlinkUser> {
  64. // 定义随机数数据生成器
  65. public RandomDataGenerator generator;
  66. // 初始化随机数数据生成器
  67. @Override
  68. public void open(SourceReaderContext readerContext) throws Exception {
  69. generator = new RandomDataGenerator();
  70. }
  71. @Override
  72. public FlinkUser map(Long value) throws Exception {
  73. // 使用 随机数数据生成器 来创建 FlinkUser实例
  74. FlinkUser flinkUser = new FlinkUser(value
  75. , generator.nextHexString(4) // 生成随机的4位字符串
  76. , System.currentTimeMillis()
  77. );
  78. return flinkUser;
  79. }
  80. }
  81. package com.baidu.datastream.source;
  82. import com.baidu.bean.FlinkUser;
  83. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  84. import org.apache.flink.api.common.typeinfo.TypeInformation;
  85. import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
  86. import org.apache.flink.connector.datagen.source.DataGeneratorSource;
  87. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  88. // --------------------------------------------------------------------------------------------
  89. // TODO 从 数据生成器(DataGenerator) 中读取数据
  90. // --------------------------------------------------------------------------------------------
  91. public class ReadDataGeneratorSource {
  92. public static void main(String[] args) throws Exception {
  93. // 1.获取执行环境
  94. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  95. env.setParallelism(3);
  96. // 2.自定义数据生成器Source
  97. /*
  98. * TODO DataGeneratorSource(
  99. * GeneratorFunction<Long, OUT> generatorFunction
  100. * ,long count
  101. * ,RateLimiterStrategy rateLimiterStrategy
  102. * ,TypeInformation<OUT> typeInfo)
  103. * 参数说明:
  104. * @generatorFunction : 指定 GeneratorFunction 实现类(生成数据的具体实现类)
  105. * @count : 指定输出数据的总行数(如果想一直输出,可以设置为Long.MAX_VALUE)
  106. * @rateLimiterStrategy : 指定发射速率(每秒发射的记录数)
  107. * @typeInfo : 指定返回值类型
  108. * */
  109. DataGeneratorSource<FlinkUser> dataGeneratorSource = new DataGeneratorSource<>(
  110. // 指定 GeneratorFunction 实现类
  111. new MyGeneratorFunction(),
  112. // 指定 输出数据的总行数
  113. 100,
  114. // 指定 每秒发射的记录数
  115. RateLimiterStrategy.perSecond(100),
  116. // 指定返回值类型
  117. TypeInformation.of(FlinkUser.class) // 将java的FlinkUser封装成到TypeInformation
  118. );
  119. // 3.读取 dataGeneratorSource 中的数据
  120. env.fromSource(dataGeneratorSource
  121. , WatermarkStrategy.noWatermarks() // 指定水位线生成策略
  122. , "data-generator")
  123. .print();
  124. /*
  125. * 注意:生成的dataGeneratorSource为可并行算子
  126. * 生成的数据会均匀的分配到并行子任务中
  127. * */
  128. env.execute();
  129. }
  130. }

运行结果:

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/391189
推荐阅读
相关标签
  

闽ICP备14008679号