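Apache Flink also provides tools for testing application code at multiple levels of the testing pyramid. The examples in this article use the following Maven properties and dependencies: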
- <properties>
- <encoding>UTF-8</encoding>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
- <java.version>1.8</java.version>
- <scala.version>2.12</scala.version>
- <flink.version>1.17.0</flink.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-csv</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-json</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.13</version>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <version>4.0.0</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
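It is safe to assume that Flink produces correct results outside of user-defined functions. It is therefore recommended to cover the classes that contain the main business logic with unit tests as far as possible.
1. Unit testing stateless, timeless UDFs
Take the following stateless MapFunction as an example: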
- public class IncrementMapFunction implements MapFunction<Long, Long> {
- @Override
- public Long map(Long record) throws Exception {
- return record + 1;
- }
- }
Such a function is easy to unit test with any test framework: pass suitable arguments and verify the output.
- import static org.junit.Assert.assertEquals;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.junit.Test;
- /**
- * @author alanchan
- *
- */
- public class TestDemo {
- public class IncrementMapFunction implements MapFunction<Long, Long> {
- @Override
- public Long map(Long record) throws Exception {
- return record + 1;
- }
- }
- @Test
- public void testIncrement() throws Exception {
- IncrementMapFunction incrementer = new IncrementMapFunction();
- assertEquals((Long) 3L, incrementer.map(2L));
- }
- }
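User-defined functions that use an org.apache.flink.util.Collector (for example a FlatMapFunction or ProcessFunction) can be tested easily by passing a mock object instead of a real collector. The FlatMapFunction below, which sums the comma-separated numbers in its input string, can be unit tested as follows.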
- import static org.mockito.Mockito.mock;
- import static org.mockito.Mockito.times;
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.util.Collector;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.mockito.Mockito;
- import org.mockito.junit.MockitoJUnitRunner;
- /**
- * @author alanchan
- *
- */
- @RunWith(MockitoJUnitRunner.class)
- public class TestDemo2 {
- public static class IncrementFlatMapFunction implements FlatMapFunction<String, Long> {
- @Override
- public void flatMap(String value, Collector<Long> out) throws Exception {
- Long sum = 0L;
- for (String num : value.split(",")) {
- sum += Long.valueOf(num);
- }
- out.collect(sum);
- }
- }
- @Test
- public void testSum() throws Exception {
- IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
- Collector<Long> collector = mock(Collector.class);
- incrementer.flatMap("1,2,3,4,5", collector);
- Mockito.verify(collector, times(1)).collect(15L);
- }
- }
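As an alternative to a Mockito mock, the same check can be written with a small hand-written collector that simply records what it receives. The following is only a minimal sketch: SketchCollector is a local stand-in for org.apache.flink.util.Collector so that the snippet compiles without Flink on the classpath, and RecordingCollector and SumFlatMapSketch are hypothetical names introduced for illustration. In a real test, the fake would implement Flink's Collector interface directly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local stand-in for org.apache.flink.util.Collector (assumption: same two methods).
interface SketchCollector<T> {
    void collect(T record);

    void close();
}

// Hypothetical hand-written fake: records every element passed to collect().
final class RecordingCollector<T> implements SketchCollector<T> {
    private final List<T> records = new ArrayList<>();

    @Override
    public void collect(T record) {
        records.add(record);
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }

    List<T> getRecords() {
        return Collections.unmodifiableList(records);
    }
}

// Same logic as IncrementFlatMapFunction.flatMap, written against the stand-in interface.
final class SumFlatMapSketch {
    static void flatMap(String value, SketchCollector<Long> out) {
        long sum = 0L;
        for (String num : value.split(",")) {
            sum += Long.parseLong(num);
        }
        out.collect(sum);
    }
}
```

A test would then call SumFlatMapSketch.flatMap("1,2,3,4,5", collector) and compare collector.getRecords() with the expected list. Compared with a mock, a recording fake makes it easy to assert on the full sequence of emitted values rather than on individual calls.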
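2. Unit testing stateful or timely UDFs and custom operators
Testing the functionality of a user-defined function that uses managed state or timers is more difficult, because it involves testing the interaction between the user code and the Flink runtime. For this purpose, Flink provides a set of so-called test harnesses that can be used to test user-defined functions as well as custom operators:
· OneInputStreamOperatorTestHarness (for operators on DataStreams)
· KeyedOneInputStreamOperatorTestHarness (for operators on KeyedStreams)
· TwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two DataStreams)
· KeyedTwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two KeyedStreams)
1) DataStream API test dependencies
To develop tests for a job built with the DataStream API, add the following dependency: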
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>1.17.2</version>
- <scope>test</scope>
- </dependency>
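Among other test utilities, this module provides MiniCluster, a lightweight, configurable Flink cluster that runs inside a JUnit test and can execute jobs directly.
2) Table API test dependencies
To test Table API and SQL programs locally in your IDE, add the following dependency in addition to the flink-test-utils dependency mentioned above: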
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-test-utils</artifactId>
- <version>1.17.2</version>
- <scope>test</scope>
- </dependency>
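The flink-table-test-utils module was introduced in Flink 1.15 and is still considered experimental as of Flink 1.17.
3) Unit testing a flatMap function
The test harnesses can now be used to push records and watermarks into a user-defined function or custom operator, to control processing time, and finally to assert on the operator's output (including side outputs).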
- /*
- * @Author: alanchan
- * @LastEditors: alanchan
- * @Description: unit test for a flatMap that, for even inputs, emits the original value and its square
- */
- import java.util.concurrent.ConcurrentLinkedQueue;
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.streaming.api.operators.StreamFlatMap;
- import org.apache.flink.streaming.api.watermark.Watermark;
- import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
- import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
- import org.apache.flink.streaming.util.TestHarnessUtil;
- import org.apache.flink.util.Collector;
- import org.junit.Before;
- import org.junit.Test;
- public class TestStatefulFlatMapDemo3 {
- static class AlanFlatMapFunction implements FlatMapFunction<Integer, Integer> {
- @Override
- public void flatMap(Integer value, Collector<Integer> out) throws Exception {
- if (value % 2 == 0) {
- out.collect(value);
- out.collect(value * value);
- }
- }
- }
- OneInputStreamOperatorTestHarness<Integer, Integer> testHarness;
- @Before
- public void setupTestHarness() throws Exception {
- StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<Integer, Integer>(new AlanFlatMapFunction());
- testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
- testHarness.open();
- }
- @Test
- public void testFlatMap2() throws Exception {
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
- testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
- testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
- testHarness.processWatermark(new Watermark(initialTime + 2));
- testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
- testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
- testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
- testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
- testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));
- testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));
- expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
- expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));
- expectedOutput.add(new Watermark(initialTime + 2));
- expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
- expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));
- expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));
- expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));
- expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));
- expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));
- TestHarnessUtil.assertOutputEquals("Unexpected output", expectedOutput, testHarness.getOutput());
- }
- }
KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally providing a KeySelector together with the TypeInformation for the class of the key.
- /*
- * @Author: alanchan
- * @LastEditors: alanchan
- * @Description: key the stream by city and convert the city abbreviation to upper case
- */
- import com.google.common.collect.Lists;
- import org.apache.flink.api.common.functions.RichFlatMapFunction;
- import org.apache.flink.api.common.state.ValueState;
- import org.apache.flink.api.common.state.ValueStateDescriptor;
- import org.apache.flink.api.common.typeinfo.Types;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.operators.StreamFlatMap;
- import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
- import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
- import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
- import org.apache.flink.util.Collector;
- import org.junit.Assert;
- import org.junit.Before;
- import org.junit.Test;
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.NoArgsConstructor;
- public class TestStatefulFlatMapDemo2 {
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- static class User {
- private int id;
- private String name;
- private int age;
- private String city;
- }
- static class AlanFlatMapFunction extends RichFlatMapFunction<User, User> {
- // The state is only accessible by functions applied on a {@code KeyedStream}
- ValueState<User> previousInput;
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- previousInput = getRuntimeContext()
- .getState(new ValueStateDescriptor<User>("previousInput", User.class));
- }
- @Override
- public void flatMap(User input, Collector<User> out) throws Exception {
- previousInput.update(input);
- input.setCity(input.getCity().toUpperCase());
- out.collect(input);
- }
- }
- AlanFlatMapFunction alanFlatMapFunction = new AlanFlatMapFunction();
- OneInputStreamOperatorTestHarness<User, User> testHarness;
- @Before
- public void setupTestHarness() throws Exception {
- alanFlatMapFunction = new AlanFlatMapFunction();
- testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(alanFlatMapFunction),
- new KeySelector<User, String>() {
- @Override
- public String getKey(User value) throws Exception {
- return value.getCity();
- }
- }, Types.STRING);
- testHarness.open();
- }
- @Test
- public void testFlatMap() throws Exception {
- testHarness.processElement(new User(1, "alanchan", 18, "sh"), 10);
- ValueState<User> previousInput = alanFlatMapFunction.getRuntimeContext().getState(
- new ValueStateDescriptor<>("previousInput", User.class));
- User stateValue = previousInput.value();
- Assert.assertEquals(
- Lists.newArrayList(new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10)),
- testHarness.extractOutputStreamRecords());
- Assert.assertEquals(new User(1, "alanchan", 18, "sh".toUpperCase()), stateValue);
- testHarness.processElement(new User(2, "alan", 19, "bj"), 10000);
- Assert.assertEquals(
- Lists.newArrayList(
- new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10),
- new StreamRecord<>(new User(2, "alan", 19, "bj".toUpperCase()), 10000)),
- testHarness.extractOutputStreamRecords());
- Assert.assertEquals(new User(2, "alan", 19, "bj".toUpperCase()), previousInput.value());
- }
- }
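The test above reads previousInput.value() after each element and gets the value for the key of the element that was processed last. The reason is that keyed state is scoped to the current key, which the keyed harness derives from the KeySelector before each element is handed to the function. The following is a deliberately simplified model of that scoping, not Flink's implementation: KeyedValueStateSketch and KeyedHarnessSketch are hypothetical names, and a plain HashMap stands in for the state backend.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified model of a keyed ValueState: one value per key, accessed through a "current key".
final class KeyedValueStateSketch<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    V value() {
        return valuesByKey.get(currentKey);
    }

    void update(V value) {
        valuesByKey.put(currentKey, value);
    }
}

// Simplified model of a keyed harness: it sets the current key before the function logic runs.
final class KeyedHarnessSketch<K, IN> {
    private final Function<IN, K> keySelector;
    final KeyedValueStateSketch<K, IN> previousInput = new KeyedValueStateSketch<>();

    KeyedHarnessSketch(Function<IN, K> keySelector) {
        this.keySelector = keySelector;
    }

    void processElement(IN element) {
        // 1. scope the state to the key of the incoming element
        previousInput.setCurrentKey(keySelector.apply(element));
        // 2. run the "function" logic, here: remember the last input per key
        previousInput.update(element);
    }
}
```

In this model, processing an element keyed by "sh" and then one keyed by "bj" leaves two independent state entries, and value() returns the "bj" entry until the current key changes again.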
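4) Unit testing a ProcessFunction
In addition to the test harnesses above, which can be used directly to test a ProcessFunction, Flink provides a test harness factory named ProcessFunctionTestHarnesses that makes instantiating a harness easier.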
- import com.google.common.collect.Lists;
- import org.apache.flink.api.common.typeinfo.Types;
- import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
- import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
- import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
- import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
- import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
- import org.apache.flink.util.Collector;
- import org.junit.Assert;
- import org.junit.Before;
- import org.junit.Test;
- /*
- * @Author: alanchan
- * @LastEditors: alanchan
- * @Description:
- */
- public class TestProcessOperatorDemo1 {
- // public abstract class KeyedProcessFunction<K, I, O>
- static class AlanProcessFunction extends KeyedProcessFunction<String, String, String> {
- @Override
- public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
- Collector<String> out) throws Exception {
- ctx.timerService().registerProcessingTimeTimer(50);
- out.collect("vx->" + value);
- }
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
- // invoked when the registered timer fires
- out.collect(String.format("定时器在 %d 被触发", timestamp));
- }
- }
- private OneInputStreamOperatorTestHarness<String, String> testHarness;
- private AlanProcessFunction processFunction;
- @Before
- public void setupTestHarness() throws Exception {
- processFunction = new AlanProcessFunction();
- testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
- new KeyedProcessOperator<>(processFunction),
- x -> "1",
- Types.STRING);
- // Function time is initialized to 0
- testHarness.open();
- }
- @Test
- public void testProcessElement() throws Exception {
- testHarness.processElement("alanchanchn", 10);
- Assert.assertEquals(
- Lists.newArrayList(
- new StreamRecord<>("vx->alanchanchn", 10)),
- testHarness.extractOutputStreamRecords());
- }
- @Test
- public void testOnTimer() throws Exception {
- // test first record
- testHarness.processElement("alanchanchn", 10);
- Assert.assertEquals(1, testHarness.numProcessingTimeTimers());
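- // advance processing time to 100, which fires the timer registered for 50
- testHarness.setProcessingTime(100);
- // onTimer receives the timer's own timestamp (50); the record emitted from it carries no timestamp
- Assert.assertEquals(
- Lists.newArrayList(
- new StreamRecord<>("vx->alanchanchn", 10),
- new StreamRecord<>("定时器在 50 被触发")),
- testHarness.extractOutputStreamRecords());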
- }
- }
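When reasoning about testOnTimer, it helps to keep in mind that onTimer is called with the timestamp the timer was registered for, not with the time the harness clock was advanced to. The following is a small, Flink-free model of that behaviour, given only as a sketch: ManualTimerServiceSketch is a hypothetical class, and the real timer service additionally scopes timers by key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Simplified model of processing-time timers driven by a manually advanced clock.
final class ManualTimerServiceSketch {
    // Sorted set: registering the same timestamp twice keeps a single timer.
    private final TreeSet<Long> timers = new TreeSet<>();

    void registerProcessingTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    int numProcessingTimeTimers() {
        return timers.size();
    }

    // Advances the clock and returns the timestamps passed to the timer callback, in firing order.
    List<Long> setProcessingTime(long newTime) {
        List<Long> firedTimestamps = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= newTime) {
            // the callback sees the registered timestamp, not newTime
            firedTimestamps.add(timers.pollFirst());
        }
        return firedTimestamps;
    }
}
```

Registering a timer for 50 and then advancing the clock to 100 yields a single callback with timestamp 50, after which no timers remain registered.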
- import java.util.Arrays;
- import java.util.Collections;
- import org.apache.flink.api.common.state.MapStateDescriptor;
- import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
- import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
- import org.apache.flink.streaming.api.functions.ProcessFunction;
- import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
- import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
- import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
- import org.apache.flink.streaming.util.BroadcastOperatorTestHarness;
- import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
- import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
- import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
- import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
- import org.apache.flink.util.Collector;
- import org.junit.Assert;
- import org.junit.Test;
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.NoArgsConstructor;
- /*
- * @Author: alanchan
- *
- * @LastEditors: alanchan
- *
- * @Description:
- */
- public class TestProcessOperatorDemo3 {
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- static class User {
- private int id;
- private String name;
- private int age;
- private String city;
- }
- // test processElement of a ProcessFunction
- @Test
- public void testProcessFunction() throws Exception {
- // public abstract class ProcessFunction<I, O>
- ProcessFunction<String, String> function = new ProcessFunction<String, String>() {
- @Override
- public void processElement(
- String value, Context ctx, Collector<String> out) throws Exception {
- out.collect("vx->" + value);
- }
- };
- OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
- .forProcessFunction(function);
- harness.processElement("alanchanchn", 10);
- Assert.assertEquals(harness.extractOutputValues(), Collections.singletonList("vx->alanchanchn"));
- }
- // test processElement of a KeyedProcessFunction
- @Test
- public void testKeyedProcessFunction() throws Exception {
- // public abstract class KeyedProcessFunction<K, I, O>
- KeyedProcessFunction<String, String, String> function = new KeyedProcessFunction<String, String, String>() {
- @Override
- public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
- Collector<String> out) throws Exception {
- out.collect("vx->" + value);
- }
- };
- OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
- .forKeyedProcessFunction(function, x -> "name", BasicTypeInfo.STRING_TYPE_INFO);
- harness.processElement("alanchan", 10);
- Assert.assertEquals(Collections.singletonList("vx->alanchan"), harness.extractOutputValues());
- }
- // test processElement1 and processElement2 of a CoProcessFunction
- @Test
- public void testCoProcessFunction() throws Exception {
- // public abstract class CoProcessFunction<IN1, IN2, OUT>
- CoProcessFunction<String, User, User> function = new CoProcessFunction<String, User, User>() {
- @Override
- public void processElement1(String value, CoProcessFunction<String, User, User>.Context ctx,
- Collector<User> out) throws Exception {
- String[] userStr = value.split(",");
- out.collect(
- new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
- }
- @Override
- public void processElement2(User value, CoProcessFunction<String, User, User>.Context ctx,
- Collector<User> out) throws Exception {
- out.collect(value);
- }
- };
- TwoInputStreamOperatorTestHarness<String, User, User> harness = ProcessFunctionTestHarnesses
- .forCoProcessFunction(function);
- harness.processElement2(new User(2, "alan", 19, "bj"), 100);
- harness.processElement1("1,alanchan,18,sh", 10);
- // output is collected in processing order: the element from input 2 was processed first
- Assert.assertEquals(
- Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")),
- harness.extractOutputValues());
- }
- // test processElement1 and processElement2 of a KeyedCoProcessFunction
- @Test
- public void testKeyedCoProcessFunction() throws Exception {
- // public abstract class KeyedCoProcessFunction<K, IN1, IN2, OUT>
- KeyedCoProcessFunction<String, String, User, User> function = new KeyedCoProcessFunction<String, String, User, User>() {
- @Override
- public void processElement1(String value, KeyedCoProcessFunction<String, String, User, User>.Context ctx,
- Collector<User> out) throws Exception {
- String[] userStr = value.split(",");
- out.collect(
- new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
- }
- @Override
- public void processElement2(User value, KeyedCoProcessFunction<String, String, User, User>.Context ctx,
- Collector<User> out) throws Exception {
- out.collect(value);
- }
- };
- // public static <K,IN1,IN2,OUT>
- // KeyedTwoInputStreamOperatorTestHarness<K,IN1,IN2,OUT>
- // forKeyedCoProcessFunction(
- // KeyedCoProcessFunction<K,IN1,IN2,OUT> function,
- // KeySelector<IN1,K> keySelector1,
- // KeySelector<IN2,K> keySelector2,
- // TypeInformation<K> keyType)
- KeyedTwoInputStreamOperatorTestHarness<String, String, User, User> harness = ProcessFunctionTestHarnesses
- .forKeyedCoProcessFunction(function, new KeySelector<String, String>() {
- @Override
- public String getKey(String value) throws Exception {
- return value.split(",")[3];
- }
- }, new KeySelector<User, String>() {
- @Override
- public String getKey(User value) throws Exception {
- return value.getCity();
- }
- }, TypeInformation.of(String.class));
- harness.processElement2(new User(2, "alan", 19, "bj"), 100);
- harness.processElement1("1,alanchan,18,sh", 10);
- // output is collected in processing order: the element from input 2 was processed first
- Assert.assertEquals(
- Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")),
- harness.extractOutputValues());
- }
- // test processElement and processBroadcastElement of a BroadcastProcessFunction
- @Test
- public void testBroadcastOperator() throws Exception {
- // define the broadcast state descriptor
- // broadcast data format (city abbreviation,city name):
- // sh,上海
- // bj,北京
- // public class MapStateDescriptor<UK, UV>
- MapStateDescriptor<String, String> broadcastDesc = new MapStateDescriptor<>("Alan_RulesBroadcastState",
- String.class,
- String.class);
- // public abstract class BroadcastProcessFunction<IN1, IN2, OUT>
- // * @param <IN1> The input type of the non-broadcast side.
- // * @param <IN2> The input type of the broadcast side.
- // * @param <OUT> The output type of the operator.
- BroadcastProcessFunction<User, String, User> function = new BroadcastProcessFunction<User, String, User>() {
- // handles the elements of the broadcast stream
- @Override
- public void processBroadcastElement(String value, BroadcastProcessFunction<User, String, User>.Context ctx,
- Collector<User> out) throws Exception {
- System.out.println("Received broadcast element: " + value);
- // store the mapping in the broadcast state
- ctx.getBroadcastState(broadcastDesc).put(value.split(",")[0], value.split(",")[1]);
- }
- // handles the non-broadcast stream and enriches it with the broadcast dimension data
- @Override
- public void processElement(User value, BroadcastProcessFunction<User, String, User>.ReadOnlyContext ctx,
- Collector<User> out) throws Exception {
- // read the broadcast state (read-only on this side)
- ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastDesc);
- value.setCity(state.get(value.getCity()));
- out.collect(value);
- }
- };
- BroadcastOperatorTestHarness<User, String, User> harness = ProcessFunctionTestHarnesses
- .forBroadcastProcessFunction(function, broadcastDesc);
- harness.processBroadcastElement("sh,上海", 10);
- harness.processBroadcastElement("bj,北京", 20);
- harness.processElement(new User(2, "alan", 19, "bj"), 10);
- harness.processElement(new User(1, "alanchan", 18, "sh"), 30);
- // output is collected in processing order: the user from bj was processed first
- Assert.assertEquals(
- Arrays.asList(new User(2, "alan", 19, "北京"), new User(1, "alanchan", 18, "上海")),
- harness.extractOutputValues());
- }
- }
III. Testing Flink jobs
1. The JUnit rule MiniClusterWithClientResource
Apache Flink provides a JUnit rule called MiniClusterWithClientResource for testing complete jobs against a local, embedded mini cluster.
To use MiniClusterWithClientResource, one additional dependency (test scope) is needed.
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>1.17.2</version>
- <scope>test</scope>
- </dependency>
Let us take the same simple MapFunction as in the previous sections as the example.
- /*
- * @Author: alanchan
- * @LastEditors: alanchan
- * @Description:
- */
- package com.win;
- import static org.junit.Assert.assertFalse;
- import static org.junit.Assert.assertTrue;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Collections;
- import java.util.List;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.sink.SinkFunction;
- import org.apache.flink.test.util.MiniClusterResourceConfiguration;
- import org.apache.flink.test.util.MiniClusterWithClientResource;
- import org.junit.ClassRule;
- import org.junit.Test;
- public class TestExampleIntegrationDemo {
- static class AlanIncrementMapFunction implements MapFunction<Long, Long> {
- @Override
- public Long map(Long record) throws Exception {
- return record + 1;
- }
- }
- @ClassRule
- public static MiniClusterWithClientResource flinkCluster = new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberSlotsPerTaskManager(2)
- .setNumberTaskManagers(1)
- .build());
- @Test
- public void testIncrementPipeline() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // configure your test environment
- env.setParallelism(2);
- // values are collected in a static variable
- CollectSink.values.clear();
- // create a stream of custom elements and apply transformations
- env.fromElements(1L, 21L, 22L)
- .map(new AlanIncrementMapFunction())
- .addSink(new CollectSink());
- // execute
- env.execute();
- // verify your results
- assertTrue(CollectSink.values.containsAll(Arrays.asList(2L, 22L, 23L)));
- }
- // create a testing sink
- private static class CollectSink implements SinkFunction<Long> {
- // must be static
- public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());
- @Override
- public void invoke(Long value, SinkFunction.Context context) throws Exception {
- values.add(value);
- }
- }
- }
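A few remarks on integration testing with MiniClusterWithClientResource:
· To avoid copying the entire pipeline code from production to test, make the sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.
· The static variable in CollectSink is used here because Flink serializes all operators before distributing them across the cluster. Communicating with the operators instantiated by a local Flink mini cluster via static variables is one way around this issue. Alternatively, your test sink could write the data to files in a temporary directory.
· If your job uses event-time timers, you can implement a custom parallel source function to emit watermarks.
· It is recommended to always test your pipelines locally with a parallelism > 1, in order to identify bugs that only surface when the pipeline is executed in parallel.
· Prefer @ClassRule over @Rule so that multiple tests can share the same Flink cluster. This saves a significant amount of time, since the startup and shutdown of a Flink cluster usually dominate the execution time of the actual tests.
· If your pipeline contains custom state handling, you can test its correctness by enabling checkpointing and restarting the job within the mini cluster. To do so, trigger a failure by throwing an exception from a (test-only) user-defined function in your pipeline.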
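The remark about the static variable in CollectSink can be reproduced without Flink. The sketch below uses plain Java serialization to imitate what happens when an operator instance is serialized and a copy is instantiated for execution: results written to an instance field end up in the copy, while results written to a static field stay visible to the test, provided the copy runs in the same JVM (as it does with an embedded mini cluster). All class names here are hypothetical and exist only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sink that collects into an instance field: the deserialized copy gets its own list.
final class InstanceFieldSink implements Serializable {
    private static final long serialVersionUID = 1L;
    final List<Long> values = new ArrayList<>();

    void invoke(Long value) {
        values.add(value);
    }
}

// Sink that collects into a static field: shared by every instance in the same JVM.
final class StaticFieldSink implements Serializable {
    private static final long serialVersionUID = 1L;
    static final List<Long> VALUES = Collections.synchronizedList(new ArrayList<>());

    void invoke(Long value) {
        VALUES.add(value);
    }
}

final class SerializationSketch {
    // Serializes and deserializes an object, imitating how an operator is shipped to a task.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

Calling invoke on the copy returned by roundTrip(new InstanceFieldSink()) leaves the original's list empty, which is why a test that inspects the sink object it created would see no results. With StaticFieldSink the values are visible through StaticFieldSink.VALUES regardless of which instance received them; the list should be cleared before each test, as the example above does with CollectSink.values.clear().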