当前位置:   article > 正文

Flink学习总结

Flink学习总结
Flink概述
  1. Apache Flink是一个计算框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务:
  2. DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。
  3. DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。
  4. Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。
Flink例子学习
  1. 场景:分析商品item的点击率,并关联输出商品的名称
  2. 准备item数据UserBehavior2.csv, itemInfo.csv数据

UserBehavior2.csv

    543462,1715,1464116,pv,1587635915932
    662867,1162383,570735,pv,1587635915932
    561558,3611281,965809,pv,1587635915932
    894923,3076029,1879194,pv,1587635915932
    834377,1256540,1451783,pv,1587635915932
    315321,942195,4339722,pv,1587635915932
    625915,1162383,570735,pv,1587635915932
    578814,534083,4203730,pv,1587635915932
    873335,1256540,1451783,pv,1587635915932
    429984,4625350,2355072,pv,1587635915932
    866796,534083,4203730,pv,1587635915932
    937166,534083,4203730,pv,1587635915932
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

itemInfo.csv

1715,aeeeeeeeee
1162383,beeeeeeee
3611281,9eee
3076029,18eee
1256540,14eee
942195,43ddd
534083,42bbb
4625350,23bbb
534083,42aaa


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  1. 我们采用Flink sql来演示例子,数据通过csv组件导入,并通过sql解析过滤,聚合并计算,最后输出。
批量例子代码
package myflink;


import lombok.Data;
import myflink.entity.ItemInfo;
import myflink.entity.UserBehavior;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

import java.net.URL;


public class TableJobJoinExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
//        env.
//        env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
//        env


        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
        Table topScore = getOrder(env, tableEnv);
        //将topScore注册为一个表 ,
        tableEnv.registerTable("topScore", topScore);

        // 商品信息表
        Table itemInfo = getItemInfo(env, tableEnv);
        tableEnv.registerTable("itemInfo", itemInfo);


        //查询item点击数,及itemId的名字
        String groupSql="select a.itemId,count(*) as sum_total_score,b.desc from topScore a join itemInfo b on a.itemId=b.itemId  group by a.itemId,categoryId,b.desc order by itemId desc";
        // 按照滚动时间窗口来处理
       String tumbleWinGroupSql="select a.itemId,count(*) as sum_total_score from topScore a join itemInfo b on a.itemId=b.itemId  group by TUMBLE(a.createTime,INTERVAL '2' MINUTE), a.itemId order by itemId desc";

        Table groupedByCountry = tableEnv.sqlQuery(groupSql);

        //转换回dataset
        DataSet<Result> result = tableEnv.toDataSet(groupedByCountry, Result.class);

        //将dataset map成tuple输出
        result.map(new MapFunction<Result, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(Result result) throws Exception {
                System.out.println("=============================" + result);
                Long country = result.itemId;
                Long sum_total_score = result.sum_total_score;
                return Tuple2.of(country, sum_total_score);
            }
        }).print();

    }

    /**
     * item点击
     *
     * @param env
     * @param tableEnv
     * @return
     */
    private static Table getOrder(ExecutionEnvironment env, BatchTableEnvironment tableEnv) {
        // UserBehavior.csv 的本地文件路径, 在 resources 目录下
        URL fileUrl = HotItemsSql.class.getClassLoader().getResource("UserBehavior2.csv");

        //source,这里读取CSV文件,并转换为对应的Class
        DataSet<UserBehavior> csvInput = env
                .readCsvFile(fileUrl.getPath())

                .ignoreFirstLine().pojoType(UserBehavior.class, "userId", "itemId", "categoryId", "behavior", "createTime");

        //将DataSet转换为Table
        return tableEnv.fromDataSet(csvInput);
    }


    /**
     * itemInfo
     *
     * @param env
     * @param tableEnv
     * @return
     */
    private static Table getItemInfo(ExecutionEnvironment env, BatchTableEnvironment tableEnv) {
        // UserBehavior.csv 的本地文件路径, 在 resources 目录下
        URL fileUrl = HotItemsSql.class.getClassLoader().getResource("ItemInfo.csv");

        //source,这里读取CSV文件,并转换为对应的Class
        DataSet<ItemInfo> csvInput = env
                .readCsvFile(fileUrl.getPath())
                .ignoreFirstLine().pojoType(ItemInfo.class, "itemId", "desc");

        //将DataSet转换为Table
        return tableEnv.fromDataSet(csvInput);
    }


    /**
     * 统计结果对应的类
     */
    @Data
    public static class Result {
        //            public String country;
        public Long sum_total_score;
        //            public String userId;
        public Long itemId;
        public String desc;

        public Result() {
        }
    }
}



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  1. 基础环境变量构建
  2. 通过csvInput读取对应的csv文件转换成对应的DataSet。
  3. 执行sql,按照item分组,统计点击数。
流式例子代码

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import myflink.entity.ItemInfo;
import myflink.entity.UserBehavior;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

import java.io.File;
import java.net.URL;

/**
 *
 *
 *
 */
@Slf4j
public class HotItemsSql {

    public static void main(String[] args) {

        try {

            // 创建 execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 告诉系统按照 EventTime 处理
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            // 为了打印到控制台的结果不乱序,我们配置全局的并发为1,改变并发对结果正确性没有影响
            env.setParallelism(1);
//        env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))
//        env.setStateBackend(new FsStateBackend("file://D:\\img\\data"));
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            Table topScore = getOrder(env, tableEnv);
            //将topScore注册为一个表 ,
            tableEnv.registerTable("topScore", topScore);
            // 商品信息表
            Table itemInfo = getItemInfo(env, tableEnv);
            tableEnv.registerTable("itemInfo", itemInfo);
            //查询item点击数,及itemId的名字
            String groupSql = "select a.itemId,count(*) as sum_total_score,b.desc from topScore a join itemInfo b on a.itemId=b.itemId  group by a.itemId,categoryId,b.desc order by itemId desc";

//            groupSql = "select a.itemId,b.desc from topScore a join itemInfo b on a.itemId=b.itemId  group by a.itemId,categoryId,b.desc ";

            // 按照滚动时间窗口来处理
            String tumbleWinGroupSql = "select a.itemId,count(*) as sum_total_score from topScore a join itemInfo b on a.itemId=b.itemId  group by TUMBLE(a.createTime,INTERVAL '2' MINUTE), a.itemId order by itemId desc";

            Table groupedByCountry = tableEnv.sqlQuery(groupSql);


            DataStream<Tuple2<Boolean, Result>> result = tableEnv.toRetractStream(groupedByCountry, Result.class);


//        //将dataset map成tuple输出
//        result.map(new MapFunction<Tuple2<Boolean, Result>, String>() {
//
//            @Override
//            public String map(Tuple2<Boolean, Result> booleanResultTuple2) throws Exception {
//                System.out.println("=============================" + result);
//                return null;
//            }
//        }).print();
//
//

            result.process(new TopNHotItems2())
                    .print();
            env.execute("Hot Items Job");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    public static class TopNHotItems2 extends ProcessFunction<Tuple2<Boolean, Result>, String> {


        @Override
        public void processElement(Tuple2<Boolean, Result> value, Context context, Collector<String> collector) throws Exception {
            System.out.println("0:" + value.f0 + "1:" + value.f1);
        }
    }

    /**
     * item点击
     *
     * @param env
     * @param tableEnv
     * @return
     */
    private static Table getOrder(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) throws Exception {
        // UserBehavior.csv 的本地文件路径, 在 resources 目录下
        URL fileUrl = HotItemsSql.class.getClassLoader().getResource("UserBehavior2.csv");
        Path filePath = Path.fromLocalFile(new File(fileUrl.toURI()));
        //source,这里读取CSV文件,并转换为对应的Class
        // 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo
        PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class);
        // 由于 Java 反射抽取出的字段顺序是不确定的,需要显式指定下文件中字段的顺序
        String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "createTime"};
        // 创建 PojoCsvInputFormat
        PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<UserBehavior>(filePath, pojoType, fieldOrder);


        DataStream<UserBehavior> dataStream = env
                // 创建数据源,得到 UserBehavior 类型的 DataStream
                .createInput(csvInput, pojoType)
                // 抽取出时间和生成 watermark
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(UserBehavior userBehavior) {
                        // 原始数据单位秒,将其转成毫秒
                        return userBehavior.createTime;
                    }
                });

        //将DataSet转换为Table
        return tableEnv.fromDataStream(dataStream);
    }


    private static Table getItemInfo(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) throws Exception {
        // UserBehavior.csv 的本地文件路径, 在 resources 目录下
        URL fileUrl = HotItemsSql.class.getClassLoader().getResource("ItemInfo.csv");
        Path filePath = Path.fromLocalFile(new File(fileUrl.toURI()));
        //source,这里读取CSV文件,并转换为对应的Class
        // 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo
        PojoTypeInfo<ItemInfo> pojoType = (PojoTypeInfo<ItemInfo>) TypeExtractor.createTypeInfo(ItemInfo.class);
        // 由于 Java 反射抽取出的字段顺序是不确定的,需要显式指定下文件中字段的顺序
        String[] fieldOrder = new String[]{"itemId", "desc"};
        // 创建 PojoCsvInputFormat
        PojoCsvInputFormat<ItemInfo> csvInput = new PojoCsvInputFormat<ItemInfo>(filePath, pojoType, fieldOrder);


        DataStream<ItemInfo> dataStream = env
                // 创建数据源,得到 UserBehavior 类型的 DataStream
                .createInput(csvInput, pojoType);
        // 抽取出时间和生成 watermark


        //将DataSet转换为Table
        return tableEnv.fromDataStream(dataStream);
    }


    /**
     * 统计结果对应的类
     */
    @Data
    public static class Result {
        //            public String country;
//        public String sum_total_score;
        //            public String userId;
        public Long itemId;
        public String desc;

        public Result() {
        }

        public Result(Long itemId, String desc) {
            this.itemId = itemId;
            this.desc = desc;
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/413138?site
推荐阅读
相关标签
  

闽ICP备14008679号