
Flink Unit Testing: Introduction and Examples


This article gives a detailed introduction to unit testing in Flink, covering stateless functions, stateful functions, and complete jobs, with common usage examples for the stateless case in particular.

Apart from the Maven dependencies listed below, nothing else is required.

I. Flink Testing Overview

As with other systems, Apache Flink provides tools for testing application code at multiple levels of the testing pyramid.

The Maven dependencies for the examples in this article:

  <properties>
      <encoding>UTF-8</encoding>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <maven.compiler.source>1.8</maven.compiler.source>
      <maven.compiler.target>1.8</maven.compiler.target>
      <java.version>1.8</java.version>
      <scala.version>2.12</scala.version>
      <flink.version>1.17.0</flink.version>
  </properties>

  <dependencies>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-csv</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-json</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.13</version>
      </dependency>
      <dependency>
          <groupId>org.mockito</groupId>
          <artifactId>mockito-core</artifactId>
          <version>4.0.0</version>
          <scope>test</scope>
      </dependency>
  </dependencies>

 

II. Testing User-Defined Functions

It is safe to assume that Flink produces correct results outside of your user-defined functions. Therefore, it is recommended to cover the classes that contain your main business logic with as many unit tests as possible.

1. Unit Testing Stateless, Timeless UDFs

1) Example: MapFunction

Take the following stateless MapFunction as an example:

  public class IncrementMapFunction implements MapFunction<Long, Long> {

      @Override
      public Long map(Long record) throws Exception {
          return record + 1;
      }
  }

By passing suitable arguments and verifying the output, it is easy to unit test such a function with your favorite testing framework.

  import static org.junit.Assert.assertEquals;

  import org.apache.flink.api.common.functions.MapFunction;
  import org.junit.Test;

  /**
   * @author alanchan
   */
  public class TestDemo {

      public class IncrementMapFunction implements MapFunction<Long, Long> {

          @Override
          public Long map(Long record) throws Exception {
              return record + 1;
          }
      }

      @Test
      public void testIncrement() throws Exception {
          IncrementMapFunction incrementer = new IncrementMapFunction();
          assertEquals((Long) 3L, incrementer.map(2L));
      }
  }

2) Example: FlatMapFunction

User-defined functions that use org.apache.flink.util.Collector (such as a FlatMapFunction or ProcessFunction) can easily be tested by providing a mock object instead of a real collector. A FlatMapFunction that sums up a comma-separated list of numbers can be unit tested as follows:

  import static org.mockito.Mockito.mock;
  import static org.mockito.Mockito.times;

  import org.apache.flink.api.common.functions.FlatMapFunction;
  import org.apache.flink.util.Collector;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.mockito.Mockito;
  import org.mockito.junit.MockitoJUnitRunner;

  /**
   * @author alanchan
   */
  @RunWith(MockitoJUnitRunner.class)
  public class TestDemo2 {

      public static class IncrementFlatMapFunction implements FlatMapFunction<String, Long> {

          @Override
          public void flatMap(String value, Collector<Long> out) throws Exception {
              Long sum = 0L;
              for (String num : value.split(",")) {
                  sum += Long.valueOf(num);
              }
              out.collect(sum);
          }
      }

      @Test
      public void testSum() throws Exception {
          IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
          Collector<Long> collector = mock(Collector.class);
          incrementer.flatMap("1,2,3,4,5", collector);
          Mockito.verify(collector, times(1)).collect(15L);
      }
  }

2. Unit Testing Stateful or Timely UDFs and Custom Operators

Functional testing of a user-defined function that uses managed state or timers is more difficult because it involves the interaction between the user code and the Flink runtime. For this purpose, Flink provides a collection of so-called test harnesses that can be used to test user-defined functions as well as custom operators:

· OneInputStreamOperatorTestHarness (for operators on a DataStream)

· KeyedOneInputStreamOperatorTestHarness (for operators on a KeyedStream)

· TwoInputStreamOperatorTestHarness (for operators on the ConnectedStreams of two DataStreams)

· KeyedTwoInputStreamOperatorTestHarness (for operators on the ConnectedStreams of two KeyedStreams)

Using the test harnesses requires a set of additional dependencies, covered below for the DataStream API and the Table API.

1) DataStream API Test Dependencies

If you want to develop test cases for a job built with the DataStream API, add the following dependency:

  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-test-utils</artifactId>
      <version>1.17.2</version>
      <scope>test</scope>
  </dependency>

 

Among the various test utilities, this module provides MiniCluster, a configurable lightweight Flink cluster that runs inside a JUnit test and can execute jobs directly.

2) Table API Test Dependencies

If you want to test Table API and SQL programs locally in your IDE, add the following dependency in addition to the aforementioned flink-test-utils:

  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-test-utils</artifactId>
      <version>1.17.2</version>
      <scope>test</scope>
  </dependency>

This automatically pulls in the query planner and the runtime, which are needed for planning and executing queries respectively.

The flink-table-test-utils module was introduced in Flink 1.15 and is still considered experimental as of Flink 1.17.
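To show what such a test can look like, here is a minimal sketch (the test class name and the query are illustrative, not from this article; TableEnvironment.create, EnvironmentSettings.inStreamingMode, and TableResult#collect are the standard Table API entry points):

  import static org.junit.Assert.assertEquals;

  import java.util.ArrayList;
  import java.util.List;

  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.TableEnvironment;
  import org.apache.flink.types.Row;
  import org.apache.flink.util.CloseableIterator;
  import org.junit.Test;

  public class TableApiSmokeTest {

      @Test
      public void testSimpleQuery() throws Exception {
          // Planner and runtime are supplied by flink-table-test-utils
          TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

          // Run a bounded query and collect its result locally
          List<Row> rows = new ArrayList<>();
          try (CloseableIterator<Row> it = tEnv.executeSql("SELECT 1 + 1 AS two").collect()) {
              it.forEachRemaining(rows::add);
          }

          assertEquals(1, rows.size());
          assertEquals(2, rows.get(0).getField("two"));
      }
  }

Since the query is bounded, collect() terminates on its own and the test can assert on the materialized rows.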

3) Unit Testing a FlatMap Function

Now you can use the harness to push records and watermarks into the user-defined function or custom operator, control the processing time, and finally assert on the output of the operator, including side output (see the sketch after the flatMap example below).

Example:

  /*
   * @Author: alanchan
   * @LastEditors: alanchan
   * @Description: Unit test for flatMap: for even inputs, emit the original value and its square
   */
  import java.util.concurrent.ConcurrentLinkedQueue;

  import org.apache.flink.api.common.functions.FlatMapFunction;
  import org.apache.flink.streaming.api.operators.StreamFlatMap;
  import org.apache.flink.streaming.api.watermark.Watermark;
  import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
  import org.apache.flink.streaming.util.TestHarnessUtil;
  import org.apache.flink.util.Collector;
  import org.junit.Before;
  import org.junit.Test;

  public class TestStatefulFlatMapDemo3 {

      static class AlanFlatMapFunction implements FlatMapFunction<Integer, Integer> {

          @Override
          public void flatMap(Integer value, Collector<Integer> out) throws Exception {
              if (value % 2 == 0) {
                  out.collect(value);
                  out.collect(value * value);
              }
          }
      }

      OneInputStreamOperatorTestHarness<Integer, Integer> testHarness;

      @Before
      public void setupTestHarness() throws Exception {
          StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<>(new AlanFlatMapFunction());
          testHarness = new OneInputStreamOperatorTestHarness<>(operator);
          testHarness.open();
      }

      @Test
      public void testFlatMap2() throws Exception {
          long initialTime = 0L;
          ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

          testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
          testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
          testHarness.processWatermark(new Watermark(initialTime + 2));
          testHarness.processElement(new StreamRecord<>(3, initialTime + 3));
          testHarness.processElement(new StreamRecord<>(4, initialTime + 4));
          testHarness.processElement(new StreamRecord<>(5, initialTime + 5));
          testHarness.processElement(new StreamRecord<>(6, initialTime + 6));
          testHarness.processElement(new StreamRecord<>(7, initialTime + 7));
          testHarness.processElement(new StreamRecord<>(8, initialTime + 8));

          expectedOutput.add(new StreamRecord<>(2, initialTime + 2));
          expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
          expectedOutput.add(new Watermark(initialTime + 2));
          expectedOutput.add(new StreamRecord<>(4, initialTime + 4));
          expectedOutput.add(new StreamRecord<>(16, initialTime + 4));
          expectedOutput.add(new StreamRecord<>(6, initialTime + 6));
          expectedOutput.add(new StreamRecord<>(36, initialTime + 6));
          expectedOutput.add(new StreamRecord<>(8, initialTime + 8));
          expectedOutput.add(new StreamRecord<>(64, initialTime + 8));

          TestHarnessUtil.assertOutputEquals("output mismatch", expectedOutput, testHarness.getOutput());
      }
  }

KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally providing a KeySelector together with the TypeInformation of the key's class.

Example:

  /*
   * @Author: alanchan
   * @LastEditors: alanchan
   * @Description: Key by city and upper-case the city abbreviation
   */
  import com.google.common.collect.Lists;

  import org.apache.flink.api.common.functions.RichFlatMapFunction;
  import org.apache.flink.api.common.state.ValueState;
  import org.apache.flink.api.common.state.ValueStateDescriptor;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.api.java.functions.KeySelector;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.streaming.api.operators.StreamFlatMap;
  import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
  import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
  import org.apache.flink.util.Collector;
  import org.junit.Assert;
  import org.junit.Before;
  import org.junit.Test;

  import lombok.AllArgsConstructor;
  import lombok.Data;
  import lombok.NoArgsConstructor;

  public class TestStatefulFlatMapDemo2 {

      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      static class User {
          private int id;
          private String name;
          private int age;
          private String city;
      }

      static class AlanFlatMapFunction extends RichFlatMapFunction<User, User> {

          // The state is only accessible by functions applied on a {@code KeyedStream}
          ValueState<User> previousInput;

          @Override
          public void open(Configuration parameters) throws Exception {
              super.open(parameters);
              previousInput = getRuntimeContext()
                      .getState(new ValueStateDescriptor<User>("previousInput", User.class));
          }

          @Override
          public void flatMap(User input, Collector<User> out) throws Exception {
              previousInput.update(input);
              input.setCity(input.getCity().toUpperCase());
              out.collect(input);
          }
      }

      AlanFlatMapFunction alanFlatMapFunction = new AlanFlatMapFunction();
      OneInputStreamOperatorTestHarness<User, User> testHarness;

      @Before
      public void setupTestHarness() throws Exception {
          alanFlatMapFunction = new AlanFlatMapFunction();
          testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(alanFlatMapFunction),
                  new KeySelector<User, String>() {
                      @Override
                      public String getKey(User value) throws Exception {
                          return value.getCity();
                      }
                  }, Types.STRING);

          testHarness.open();
      }

      @Test
      public void testFlatMap() throws Exception {
          testHarness.processElement(new User(1, "alanchan", 18, "sh"), 10);

          ValueState<User> previousInput = alanFlatMapFunction.getRuntimeContext().getState(
                  new ValueStateDescriptor<>("previousInput", User.class));
          User stateValue = previousInput.value();

          Assert.assertEquals(
                  Lists.newArrayList(new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10)),
                  testHarness.extractOutputStreamRecords());
          Assert.assertEquals(new User(1, "alanchan", 18, "sh".toUpperCase()), stateValue);

          testHarness.processElement(new User(2, "alan", 19, "bj"), 10000);
          Assert.assertEquals(
                  Lists.newArrayList(
                          new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10),
                          new StreamRecord<>(new User(2, "alan", 19, "bj".toUpperCase()), 10000)),
                  testHarness.extractOutputStreamRecords());
          Assert.assertEquals(new User(2, "alan", 19, "bj".toUpperCase()), previousInput.value());
      }
  }

4) Unit Testing a Process Function

In addition to the harnesses shown above, which can be used to test a ProcessFunction directly, Flink provides a test-harness factory class named ProcessFunctionTestHarnesses that simplifies instantiating a harness.

· Example: OneInputStreamOperatorTestHarness

  import com.google.common.collect.Lists;

  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
  import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
  import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
  import org.apache.flink.util.Collector;
  import org.junit.Assert;
  import org.junit.Before;
  import org.junit.Test;

  /*
   * @Author: alanchan
   * @LastEditors: alanchan
   * @Description:
   */
  public class TestProcessOperatorDemo1 {

      // public abstract class KeyedProcessFunction<K, I, O>
      static class AlanProcessFunction extends KeyedProcessFunction<String, String, String> {

          @Override
          public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
                  Collector<String> out) throws Exception {
              ctx.timerService().registerProcessingTimeTimer(50);
              out.collect("vx->" + value);
          }

          @Override
          public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
              // Fired once processing time reaches the registered point
              out.collect(String.format("Timer triggered at %d", timestamp));
          }
      }

      private OneInputStreamOperatorTestHarness<String, String> testHarness;
      private AlanProcessFunction processFunction;

      @Before
      public void setupTestHarness() throws Exception {
          processFunction = new AlanProcessFunction();
          testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                  new KeyedProcessOperator<>(processFunction),
                  x -> "1",
                  Types.STRING);
          // Function time is initialized to 0
          testHarness.open();
      }

      @Test
      public void testProcessElement() throws Exception {
          testHarness.processElement("alanchanchn", 10);
          Assert.assertEquals(
                  Lists.newArrayList(
                          new StreamRecord<>("vx->alanchanchn", 10)),
                  testHarness.extractOutputStreamRecords());
      }

      @Test
      public void testOnTimer() throws Exception {
          // Process the first record
          testHarness.processElement("alanchanchn", 10);
          Assert.assertEquals(1, testHarness.numProcessingTimeTimers());

          // Advance the function time to 100
          testHarness.setProcessingTime(100);
          Assert.assertEquals(
                  Lists.newArrayList(
                          new StreamRecord<>("vx->alanchanchn", 10),
                          new StreamRecord<>("Timer triggered at 100")),
                  testHarness.extractOutputStreamRecords());
      }
  }

· Example: ProcessFunctionTestHarnesses

This example uses ProcessFunctionTestHarnesses to verify ProcessFunction, KeyedProcessFunction, CoProcessFunction, KeyedCoProcessFunction, and BroadcastProcessFunction, essentially covering all the variants.

  import java.util.Arrays;
  import java.util.Collections;

  import org.apache.flink.api.common.state.MapStateDescriptor;
  import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
  import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.api.java.functions.KeySelector;
  import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  import org.apache.flink.streaming.api.functions.ProcessFunction;
  import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
  import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
  import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
  import org.apache.flink.streaming.util.BroadcastOperatorTestHarness;
  import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
  import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
  import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
  import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
  import org.apache.flink.util.Collector;
  import org.junit.Assert;
  import org.junit.Test;

  import lombok.AllArgsConstructor;
  import lombok.Data;
  import lombok.NoArgsConstructor;

  /*
   * @Author: alanchan
   * @LastEditors: alanchan
   * @Description:
   */
  public class TestProcessOperatorDemo3 {

      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      static class User {
          private int id;
          private String name;
          private int age;
          private String city;
      }

      // Test ProcessFunction#processElement
      @Test
      public void testProcessFunction() throws Exception {
          // public abstract class ProcessFunction<I, O>
          ProcessFunction<String, String> function = new ProcessFunction<String, String>() {
              @Override
              public void processElement(
                      String value, Context ctx, Collector<String> out) throws Exception {
                  out.collect("vx->" + value);
              }
          };

          OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
                  .forProcessFunction(function);
          harness.processElement("alanchanchn", 10);
          Assert.assertEquals(Collections.singletonList("vx->alanchanchn"), harness.extractOutputValues());
      }

      // Test KeyedProcessFunction#processElement
      @Test
      public void testKeyedProcessFunction() throws Exception {
          // public abstract class KeyedProcessFunction<K, I, O>
          KeyedProcessFunction<String, String, String> function = new KeyedProcessFunction<String, String, String>() {
              @Override
              public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
                      Collector<String> out) throws Exception {
                  out.collect("vx->" + value);
              }
          };

          OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
                  .forKeyedProcessFunction(function, x -> "name", BasicTypeInfo.STRING_TYPE_INFO);
          harness.processElement("alanchan", 10);
          // The function prefixes the input with "vx->"
          Assert.assertEquals(Collections.singletonList("vx->alanchan"), harness.extractOutputValues());
      }

      // Test CoProcessFunction#processElement1 / #processElement2
      @Test
      public void testCoProcessFunction() throws Exception {
          // public abstract class CoProcessFunction<IN1, IN2, OUT>
          CoProcessFunction<String, User, User> function = new CoProcessFunction<String, User, User>() {
              @Override
              public void processElement1(String value, CoProcessFunction<String, User, User>.Context ctx,
                      Collector<User> out) throws Exception {
                  String[] userStr = value.split(",");
                  out.collect(
                          new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
              }

              @Override
              public void processElement2(User value, CoProcessFunction<String, User, User>.Context ctx,
                      Collector<User> out) throws Exception {
                  out.collect(value);
              }
          };

          TwoInputStreamOperatorTestHarness<String, User, User> harness = ProcessFunctionTestHarnesses
                  .forCoProcessFunction(function);
          harness.processElement2(new User(2, "alan", 19, "bj"), 100);
          harness.processElement1("1,alanchan,18,sh", 10);
          // Records are emitted in processing order: the element2 record was processed first
          Assert.assertEquals(
                  Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")),
                  harness.extractOutputValues());
      }

      // Test KeyedCoProcessFunction#processElement1 / #processElement2
      @Test
      public void testKeyedCoProcessFunction() throws Exception {
          // public abstract class KeyedCoProcessFunction<K, IN1, IN2, OUT>
          KeyedCoProcessFunction<String, String, User, User> function = new KeyedCoProcessFunction<String, String, User, User>() {
              @Override
              public void processElement1(String value, KeyedCoProcessFunction<String, String, User, User>.Context ctx,
                      Collector<User> out) throws Exception {
                  String[] userStr = value.split(",");
                  out.collect(
                          new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
              }

              @Override
              public void processElement2(User value, KeyedCoProcessFunction<String, String, User, User>.Context ctx,
                      Collector<User> out) throws Exception {
                  out.collect(value);
              }
          };

          // public static <K, IN1, IN2, OUT>
          // KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
          // forKeyedCoProcessFunction(
          //     KeyedCoProcessFunction<K, IN1, IN2, OUT> function,
          //     KeySelector<IN1, K> keySelector1,
          //     KeySelector<IN2, K> keySelector2,
          //     TypeInformation<K> keyType)
          KeyedTwoInputStreamOperatorTestHarness<String, String, User, User> harness = ProcessFunctionTestHarnesses
                  .forKeyedCoProcessFunction(function, new KeySelector<String, String>() {
                      @Override
                      public String getKey(String value) throws Exception {
                          return value.split(",")[3];
                      }
                  }, new KeySelector<User, String>() {
                      @Override
                      public String getKey(User value) throws Exception {
                          return value.getCity();
                      }
                  }, TypeInformation.of(String.class));
          harness.processElement2(new User(2, "alan", 19, "bj"), 100);
          harness.processElement1("1,alanchan,18,sh", 10);
          // Records are emitted in processing order: the element2 record was processed first
          Assert.assertEquals(
                  Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")),
                  harness.extractOutputValues());
      }

      // Test BroadcastProcessFunction#processElement and #processBroadcastElement
      @Test
      public void testBroadcastOperator() throws Exception {
          // Descriptor for the broadcast state; broadcast records look like:
          // sh,上海
          // bj,北京
          // public class MapStateDescriptor<UK, UV>
          MapStateDescriptor<String, String> broadcastDesc = new MapStateDescriptor<>("Alan_RulesBroadcastState",
                  String.class,
                  String.class);

          // public abstract class BroadcastProcessFunction<IN1, IN2, OUT>
          // * @param <IN1> The input type of the non-broadcast side.
          // * @param <IN2> The input type of the broadcast side.
          // * @param <OUT> The output type of the operator.
          BroadcastProcessFunction<User, String, User> function = new BroadcastProcessFunction<User, String, User>() {
              // Handles the elements of the broadcast stream
              @Override
              public void processBroadcastElement(String value, BroadcastProcessFunction<User, String, User>.Context ctx,
                      Collector<User> out) throws Exception {
                  System.out.println("Received broadcast element: " + value);
                  // Update the broadcast state
                  ctx.getBroadcastState(broadcastDesc).put(value.split(",")[0], value.split(",")[1]);
              }

              // Handles the non-broadcast stream, joining against the broadcast dimension data
              @Override
              public void processElement(User value, BroadcastProcessFunction<User, String, User>.ReadOnlyContext ctx,
                      Collector<User> out) throws Exception {
                  // Read the broadcast state
                  ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastDesc);
                  value.setCity(state.get(value.getCity()));
                  out.collect(value);
              }
          };

          BroadcastOperatorTestHarness<User, String, User> harness = ProcessFunctionTestHarnesses
                  .forBroadcastProcessFunction(function, broadcastDesc);
          harness.processBroadcastElement("sh,上海", 10);
          harness.processBroadcastElement("bj,北京", 20);
          harness.processElement(new User(2, "alan", 19, "bj"), 10);
          harness.processElement(new User(1, "alanchan", 18, "sh"), 30);
          // Records are emitted in processing order
          Assert.assertEquals(
                  Arrays.asList(new User(2, "alan", 19, "北京"), new User(1, "alanchan", 18, "上海")),
                  harness.extractOutputValues());
      }
  }

III. Testing Flink Jobs

1. The JUnit Rule MiniClusterWithClientResource

Apache Flink provides a JUnit rule called MiniClusterWithClientResource for testing complete jobs against a local embedded mini cluster.

Using MiniClusterWithClientResource requires one additional (test-scoped) dependency:

  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-test-utils</artifactId>
      <version>1.17.2</version>
      <scope>test</scope>
  </dependency>

Let's reuse the same kind of simple MapFunction from the earlier sections as an example:

  /*
   * @Author: alanchan
   * @LastEditors: alanchan
   * @Description:
   */
  package com.win;

  import static org.junit.Assert.assertTrue;

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collections;
  import java.util.List;

  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.sink.SinkFunction;
  import org.apache.flink.test.util.MiniClusterResourceConfiguration;
  import org.apache.flink.test.util.MiniClusterWithClientResource;
  import org.junit.ClassRule;
  import org.junit.Test;

  public class TestExampleIntegrationDemo {

      static class AlanIncrementMapFunction implements MapFunction<Long, Long> {

          @Override
          public Long map(Long record) throws Exception {
              return record + 1;
          }
      }

      @ClassRule
      public static MiniClusterWithClientResource flinkCluster = new MiniClusterWithClientResource(
              new MiniClusterResourceConfiguration.Builder()
                      .setNumberSlotsPerTaskManager(2)
                      .setNumberTaskManagers(1)
                      .build());

      @Test
      public void testIncrementPipeline() throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          // configure your test environment
          env.setParallelism(2);

          // values are collected in a static variable
          CollectSink.values.clear();

          // create a stream of custom elements and apply transformations
          env.fromElements(1L, 21L, 22L)
                  .map(new AlanIncrementMapFunction())
                  .addSink(new CollectSink());

          // execute
          env.execute();

          // verify your results
          assertTrue(CollectSink.values.containsAll(Arrays.asList(2L, 22L, 23L)));
      }

      // create a testing sink
      private static class CollectSink implements SinkFunction<Long> {

          // must be static
          public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

          @Override
          public void invoke(Long value, SinkFunction.Context context) throws Exception {
              values.add(value);
          }
      }
  }

A few remarks on integration testing with MiniClusterWithClientResource:

· To avoid copying your whole pipeline code from production to test, make your sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests (see the sketch after this list).

· The static variable in CollectSink is used here because Flink serializes all operators before distributing them across the cluster. Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue. Alternatively, you could write the data to files in a temporary directory with your test sink.

· If your job uses event-time timers, you can implement a custom parallel source function for emitting watermarks.

· It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs that only surface when the pipeline is executed in parallel.

· Prefer @ClassRule over @Rule so that multiple tests can share the same Flink cluster. Doing so saves a significant amount of time, since the startup and shutdown of a Flink cluster usually dominates the execution time of the actual tests.

· If your pipeline contains custom state handling, you can test its correctness by enabling checkpointing and restarting the job within the mini cluster. For this, you need to trigger a failure by throwing an exception from a (test-only) user-defined function somewhere in your pipeline.
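As a sketch of the first remark (the IncrementPipeline class and its build method are illustrative names, not from this article): keep the transformation logic in one method and let the caller inject the source stream and the sink.

  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.functions.sink.SinkFunction;

  public class IncrementPipeline {

      // Only the transformation logic lives here; the caller injects source and sink,
      // so tests can substitute fromElements(...) and a collecting test sink.
      public static void build(DataStream<Long> input, SinkFunction<Long> sink) {
          input.map(new MapFunction<Long, Long>() {
              @Override
              public Long map(Long record) {
                  return record + 1;
              }
          }).addSink(sink);
      }
  }

With this shape, the integration test above reduces to IncrementPipeline.build(env.fromElements(1L, 21L, 22L), new CollectSink()) followed by env.execute(), while the production entry point passes streams and sinks backed by real connectors.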

That concludes this article's detailed introduction to Flink unit testing, covering stateless functions, stateful functions, and complete jobs, with common usage examples for the stateless case in particular.
