当前位置:   article > 正文

Flink本地idea运行环境配置webui_flink idea webui

flink idea webui

Flink本地idea运行环境配置webui

1.添加依赖
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>1.13.6</version>
            <scope>provided</scope>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
2. 代码如下
public class FlinkWithLocalWebui {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081"); // 设置WebUI端口为8081
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); // 创建带有WebUI的本地流执行环境
        env.setParallelism(1); // 设置并行度为1
        DataStream<Map<String, String>> stream = env.addSource(new SourceFunction<Map<String, String>>() {
                    @Override
                    public void run(SourceContext<Map<String, String>> ctx) throws Exception {
                        while (true) {
                            HashMap<String, String> hashMap = new HashMap<>();
                            hashMap.put("ID", new Random().nextInt(3) + 1 + ""); // 随机生成ID
                            hashMap.put("AMT", "1"); // 设置AMT为1
                            System.out.println("生产数据:" + hashMap); // 打印生产的数据
                            ctx.collect(hashMap); // 发射数据
                            Thread.sleep(1000); // 每隔1秒发送一次数据
                        }
                    }

                    @Override
                    public void cancel() {
                    }
                })
                // 按照ID字段进行分区
                .keyBy(new KeySelector<Map<String, String>, String>() {
                    @Override
                    public String getKey(Map<String, String> value) throws Exception {
                        return value.get("ID");
                    }
                })
                // 对AMT字段进行累加
                .reduce(new ReduceFunction<Map<String, String>>() {
                    @Override
                    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) throws Exception {
                        HashMap<String, String> hashMap = new HashMap<>();
                        hashMap.put("ID", value1.get("ID"));
                        hashMap.put("AMT", Integer.valueOf(value1.get("AMT")) + Integer.valueOf(value2.get("AMT")) + "");
                        return hashMap;
                    }
                });
        // 输出数据流
        stream.print();
        // 执行作业并指定作业名称
        env.execute("job-" + FlinkWithLocalWebui.class.getSimpleName());
    }
}
//这段代码是一个基于Apache Flink的实时数据处理程序。
//程序创建了一个带有WebUI的本地流执行环境,设置了并行度为1。
//通过自定义的SourceFunction生成随机数据流,数据包含ID和AMT字段,每秒发送一次数据。
//然后对数据流按照ID字段进行分区,并对AMT字段进行累加操作。
//最后,将处理后的数据流打印输出,并执行作业。
//整体流程是一个简单的实时数据处理流水线,用于生成、处理和输出数据流。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
3. 执行结果:
生产数据:{AMT=1, ID=3}
{AMT=1, ID=3}
生产数据:{AMT=1, ID=1}
{AMT=1, ID=1}
生产数据:{AMT=1, ID=2}
{AMT=1, ID=2}
...
生产数据:{AMT=1, ID=2}
{AMT=9, ID=2}
生产数据:{AMT=1, ID=3}
{AMT=13, ID=3}
生产数据:{AMT=1, ID=3}
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
4. 访问Webui,截图如下:

地址:http://localhost:8081/#/overview

http://localhost:8081/#/overview

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/941634
推荐阅读
相关标签
  

闽ICP备14008679号