当前位置:   article > 正文

Flink流式框架过程问题

Flink流式框架过程问题

注:博主使用的版本就是:<flink.version>1.16.1</flink.version>

前提环境:

因公司业务需要,使用Flink对数据进行流式处理,具体处理流程就是,从kafka接到数据,然后连续请求十多个接口(算法)对数据进行打标;

主程序:
在这里插入图片描述
具体的异步IO代码(随便找一个展示):

package com.wenge.datagroup.storage.process;

import com.alibaba.fastjson.JSONObject;
import com.wenge.datagroup.storage.bean.ParamConfig;
import com.wenge.datagroup.storage.common.ArgsConstants;
import com.wenge.datagroup.storage.process.base.BaseETL;
import com.wenge.datagroup.storage.service.YaYiService.YaYiPolarityService;
import com.wenge.datagroup.storage.utils.ConfigUtil;
import com.wenge.datagroup.storage.utils.Funnel;
import com.wenge.datagroup.storage.utils.YaYiUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@Slf4j
public class AnalyerAsyncIOProcessPolarity {

    public static DataStream<JSONObject> process(DataStream<JSONObject> dataStream) {

        log.error("----------------------------开始异步IO处理----------------");
        String topic = Funnel.contains(ArgsConstants.TOPIC) ? Funnel.getString(ArgsConstants.TOPIC) : "";
        String configFile = Funnel.contains(ArgsConstants.CONFIG) ? Funnel.getString(ArgsConstants.CONFIG) : "config.properties";
        int asyncNum = Funnel.contains(ArgsConstants.ASYNC_NUM) ? Funnel.getInt(ArgsConstants.ASYNC_NUM) : ConfigUtil.getInteger(ArgsConstants.ASYNC_NUM);
        int mapParallelism = Funnel.contains(ArgsConstants.MAP_PARALLELISM) ? Funnel.getInt(ArgsConstants.MAP_PARALLELISM) : ConfigUtil.getInteger(ArgsConstants.MAP_PARALLELISM);
        int filterParallelism = Funnel.contains(ArgsConstants.FILTER_PARALLELISM) ? Funnel.getInt(ArgsConstants.FILTER_PARALLELISM) : ConfigUtil.getInteger(ArgsConstants.FILTER_PARALLELISM);
        int TranslateParallelism = (Funnel.contains(ArgsConstants.Translate_MAP_PARALLELISM) ? Funnel.getInt(ArgsConstants.Translate_MAP_PARALLELISM) : ConfigUtil.getInteger(ArgsConstants.Translate_MAP_PARALLELISM));

        // 异步IO
        RichAsyncFunction richAsyncFunction = new RichAsyncFunction<JSONObject, JSONObject>() {
            private transient ExecutorService executorService;
            private ParamConfig paramConfig;
            private YaYiUtil yaYiUtil;

            @Override
            public void open(Configuration parameters) {
                // 重新加载配置文件
                log.error("重新加载配置文件");
                ConfigUtil.setConfigFile(configFile);
                ConfigUtil.setTopic(topic);
                ConfigUtil.init();
                this.executorService = Executors.newFixedThreadPool(asyncNum);
                paramConfig = new ParamConfig(ConfigUtil.getString("YaYiappKey"), ConfigUtil.getString("YaYiappSecret"));
                yaYiUtil = new YaYiUtil(paramConfig);
            }

            @Override
            public void close() throws Exception {
                // 关闭线程池
                if (executorService != null) {
                    executorService.shutdown();
                }
                log.error("----------------------------情感分析-线程池关闭----------------------");
            }

            @Override
            public void timeout(JSONObject input, ResultFuture<JSONObject> resultFuture) {
                JSONObject data = input;
                String uuid = data.getString("uuid");
                log.error("-----------------------数据超时----------------------:{}", uuid);
                //对超时数据进行处理
                resultFuture.complete(Collections.singleton(data));
            }

            @Override
            public void asyncInvoke(JSONObject json, ResultFuture<JSONObject> resultFuture) {

                CompletableFuture.supplyAsync(new Supplier<JSONObject>() {

                    @Override
                    public JSONObject get() {
                        String uuid = json.getString("uuid");
                        long start =System.currentTimeMillis();
                        try {
                            //TODO: 根据业务逻辑进行处理
                            String title = json.getString("title");
                            String content = json.getString("content");
                            String translate_title = json.getString("translate_title");
                            String translate_content = json.getString("translate_content");
                            String languageRecognition = json.getJSONObject("analysis").getString("language");
                            String dataSourceType = json.getJSONObject("platform").getString("data_source_type");

                            
                            if (StringUtils.isNotBlank(translate_content)) {
                                String polarity = new String();
                                Integer polaritySum = 0;
								//具体算法调用
                                YaYiPolarityService yaYiPolarityService = new YaYiPolarityService();
                                polarity = yaYiPolarityService.yaYiPolarity(translate_content);
                                if (StringUtils.isNotBlank(polarity)) {
                                    polaritySum = StringUtils.equals(polarity, "A") ? 0 : StringUtils.equals(polarity, "B") ? 1 : 2;
                                    JSONObject analysis = json.getJSONObject("analysis");
                                    if (Objects.nonNull(analysis)) {
                                        analysis.put("polarity", polaritySum);
                                        json.put("analysis", analysis);
                                    } else {
                                        JSONObject analysisJson = new JSONObject();
                                        analysisJson.put("polarity", polaritySum);
                                        json.put("analysis", analysisJson);
                                    }
                                }
                                log.error("uuid:{},分析后数据:{}", uuid, polarity);
                                long end =System.currentTimeMillis();
                                log.error("uuid:{},分析,耗时:{} ms", uuid,(end-start));
                            }
                            return json;
                        } catch (Exception e) {
                            log.error("--------分析异常:{},数据:{}",uuid, e);
                            return json;
                        }
                    }
                }, executorService).thenAccept((JSONObject dbResult) -> {
                    resultFuture.complete(Collections.singleton(dbResult));
                });
            }
        };
        DataStream<JSONObject> downloadStream = AsyncDataStream.unorderedWait(
                dataStream,
                richAsyncFunction,
                50000,
                TimeUnit.MILLISECONDS,
                asyncNum).name("qinggan").setParallelism(TranslateParallelism);

        return downloadStream;
    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140

问题

1:异步io访问接口直接引发关闭

在这里插入图片描述
在这里插入图片描述

解决方案:

方法中传递参数不要使用 Set

具体的原因我没深究,只是经过验证在异步IO中使用如下就会导致线程关闭:
在这里插入图片描述
改成如下就行:
在这里插入图片描述

2:数据格式问题直接引发关闭

在这里插入图片描述
解决方案:
在这里插入图片描述
在这里插入图片描述

整体来说,flink中如果数据格式传输导致错误,就会引发线程关闭,
所以 DataStream 改为DataStream 一定要通过map和filter 筛选

3:flink 消费kafka不提交offset

虽然在new FlinkKafkaConsumer<> 中设置了自动提交间隔,如下:

 properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");//自动提交的时间间隔
 properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "20000");//自动提交的时间间隔
  • 1
  • 2

但是在实际应用过程中发现,到了设置的20000ms既20s,依然不提交offset,以为flink读取kafka失败。
实际上是因为我们再代码中开启了Checkpoint,就会覆盖kafka的配置,所以是经过规定的Checkpoint时间后才会提交offset
在这里插入图片描述

4:flink本地运行,发现默认并行度为6

当我们在本地中可以通过一下开启本地webui模式

// 使用本地模式并开启WebUI
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8083);
streamEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
  • 1
  • 2
  • 3
  • 4

当时我们本地打开web UI发现,默认的并行度是6
在这里插入图片描述
这样有时候影响我们异步调用外部接口的qps设置,

解决方案:
在开发环境中,没有配置文件,默认并行度就是当前机器的 CPU 核心数(巨坑!)
所以我们需要自己手动指定每个算子的并行度,不要使用默认的,可以通过.setParallelism(2)来指定某一个算子的并行度

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/937852
推荐阅读
相关标签
  

闽ICP备14008679号