In "A Guide to Flink in Java: Ingesting a Message Stream from a RabbitMQ Queue", we used the external component RabbitMQ as the source of an unbounded stream, so Flink processed the data in streaming mode. In "A Guide to Flink in Java: Writing Messages to a RabbitMQ Queue", we used Flink's built-in data generator to produce a finite data set, so Flink ran that job as a batch job.
In this article we will build a custom unbounded-stream generator to make later testing easier.
We create a new project named UnboundedStreamGenerator.
Archetype: org.apache.flink:flink-quickstart-java
Version: 1.19.1
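If you generate the project from the command line, a Maven call along these lines should work (shown as a sketch; the interactive prompts will ask for your own groupId and artifactId):

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.19.1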
Then create src/main/java/org/example/generator/UnBoundedStreamGenerator.java.
UnBoundedStreamGenerator extends the abstract class RichSourceFunction:
public abstract class RichSourceFunction<OUT> extends AbstractRichFunction
        implements SourceFunction<OUT> {

    private static final long serialVersionUID = 1L;
}
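Compared with implementing SourceFunction directly, extending RichSourceFunction also gives the generator the open()/close() lifecycle hooks inherited from AbstractRichFunction. If the source ever needs to set up resources before emitting, overrides along these lines could be added to UnBoundedStreamGenerator (a sketch only; the uses named in the comments are hypothetical, and open(Configuration) is the classic signature, deprecated in recent Flink versions in favor of open(OpenContext)):

import org.apache.flink.configuration.Configuration;

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // e.g. initialize a counter, a random seed, or an external connection here
}

@Override
public void close() throws Exception {
    // release anything acquired in open()
    super.close();
}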
For our generator, the main work is implementing the run and cancel methods of the SourceFunction interface: run produces the data, and cancel terminates the task.
package org.example.generator;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long count = 0L;
        while (isRunning) {
            Thread.sleep(1000); // Simulate delay
            ctx.collect(count++); // Emit data
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        System.out.println("UnBoundedStreamGenerator canceled");
    }
}
In the run method we emit one record per second, incrementing the value each time.
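If the source is later used in a job with checkpointing enabled, the usual recommendation for SourceFunction-based sources is to emit elements under the checkpoint lock, so that emitting a record and updating the counter stay atomic with respect to checkpoints. A variant of run along these lines (a sketch, not part of the original example) would look like:

@Override
public void run(SourceContext<Long> ctx) throws Exception {
    long count = 0L;
    while (isRunning) {
        Thread.sleep(1000); // Simulate delay
        // Emit under the checkpoint lock so a checkpoint cannot run
        // between emitting the element and advancing the counter.
        synchronized (ctx.getCheckpointLock()) {
            ctx.collect(count++);
        }
    }
}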
Back in the main job, we use the addSource method to register this unbounded-stream generator as the data source and print its output to the log.
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.generator.UnBoundedStreamGenerator;

/**
 * Skeleton for a Flink DataStream Job.
 *
 * <p>For a tutorial how to write a Flink application, check the
 * tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.
 *
 * <p>To package your application into a JAR file for execution, run
 * 'mvn clean package' on the command line.
 *
 * <p>If you change the name of the main class (with the public static void main(String[] args))
 * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
 */
public class DataStreamJob {

    public static void main(String[] args) throws Exception {
        // Sets up the execution environment, which is the main entry point
        // to building Flink applications.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new UnBoundedStreamGenerator()).name("Custom Stream Source")
                .setParallelism(1)
                .print(); // For demonstration, print the stream to stdout

        // Execute program, beginning computation.
        env.execute("Flink Java API Skeleton");
    }
}
Use the following command to watch the log output:
tail -f log/*
Then click Cancel Job in the Flink Web UI.
In the log you can see the "UnBoundedStreamGenerator canceled" message printed by the cancel method.
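For later tests, the generator does not have to feed only a print sink. Here is a minimal sketch of running a simple transformation on the generated stream (the DataStreamMapJob class name and the map step are assumptions for illustration, not from this article):

package org.example;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.generator.UnBoundedStreamGenerator;

public class DataStreamMapJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new UnBoundedStreamGenerator()).name("Custom Stream Source")
                .setParallelism(1)
                .map(value -> "tick-" + value) // hypothetical transformation for testing
                .returns(Types.STRING)         // declare the lambda's result type explicitly
                .print();

        env.execute("Unbounded Stream Map Demo");
    }
}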