当前位置:   article > 正文

flink java_java flink

java flink

这里写目录标题

5. Flink流处理API

5.1 Environment执行环境

创建一个执行环境,表示当前执行程序的上下文。

有三种创建方式:

  • getExecutionEnvironment():如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • createLocalEnvironment(int): 返回本地执行环境,需要在调用时指定默认的并行度LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
  • createRemoteEnvironment(String,int,String):返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,以及要在集群中运行的Jar包。StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//WordCount.jar");

5.2 Source数据源

env.fromCollection 从集合读取数据

DataStream<SensorReading> sensorDataStream = env.fromCollection(
    Arrays.asList(
        new SensorReading("sensor_1", 1547718199L, 35.8),
        new SensorReading("sensor_6", 1547718201L, 15.4),
        new SensorReading("sensor_7", 1547718202L, 6.7),
        new SensorReading("sensor_10", 1547718205L, 38.1)
    )
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

env.readTextFile 从文件读取数据

env.readTextFile("YOUR_FILE_PATH ");

kafka读取数据

env.addSource( new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));

自定义数据源

需要实现SourceFunction 或者继承SourceFunction的富函数RichSourceFunction

env.addSource( new SourceFunction(){xxx});

DataStream<Tuple2<Integer, Integer>> inputStream= env.addSource(new RandomFibonacciSource());


private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
    private static final long serialVersionUID = 1L;
    private Random rnd = new Random();
    private int counter = 0;

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
        while (isRunning && counter < BOUND) {
            int first = rnd.nextInt(BOUND / 2 - 1) + 1;
            int second = rnd.nextInt(BOUND / 2 - 1) + 1;
            ctx.collect(new Tuple2<>(first, second));
            counter++;
            Thread.sleep(50L);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

5.3 Transform转换算子

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hpHXBCZw-1649949748146)(C:\Users\Simon\AppData\Roaming\Typora\typora-user-images\1648291788955.png)]

——使用lambda表达式时需提供返回类型,datastream.flatMap(xxx).returns(Types.STRING)。

一 单数据流基本转换:

map

对一个DataStream中的每个元素使用自定义函数处理,每个输入对应一个输出,输入输出类型可以不同,我们可以重写MapFunction或RichMapFunction来自定义map(),或使用lambda

DataStream<Integer> mapStram = dataStream.map(new MapFunction<String, Integer>() {
    public Integer map(String value) throws Exception {
    	return value.length();
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5

flatMap

与map()类似,但每个输入可以对应多个输出

DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String,
String>() {
    public void flatMap(String value, Collector<String> out) throws Exception {
        String[] fields = value.split(",");
        for( String field: fields )
        out.collect(field);
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

filter

对每个元素进行过滤,返回False则该元素将被过滤,返回True则元素被保留。

可以使用lambda表达式或继承FilterFunction或RichFilterFunction

DataStream<Interger> filterStream = dataStream.filter(new FilterFunction<String>(){
    public boolean filter(String value) throws Exception {
    	return value == 1;
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5

二 基于key的分组转换

keyBy()

DataStream -> KeyedStream:keyBy()按照Key分组,逻辑地将一个流拆分成不相交的分区(子算子任务),每个分区包含具有相同key的元素,在内部以hash的形式实现的。

1、KeyBy会重新分区;

2、不同的key有可能分到一起,因为是通过hash原理实现的;

3、一旦按照Key分组后,我们便可以对每组数据进行时间窗口的处理以及状态的创建和更新。

指定key的三种方式

1 使用数字位置来指定Key:DataStream<Tuple2<Integer, Double>> keyedStream = dataStream.keyBy(0).sum(1);
2 使用字段名来指定key(适用pojo类型):DataStream<Word> fieldNameStream = wordStream.keyBy("word").sum("count");
3 实现KeySelector接口(推荐):重写getKey方法,IN是输入类型,KEY是输出类型,表示用于分组的key。前两种方式本质上都是实现KeySelector接口

// IN为数据流元素,KEY为所选择的Key 
@FunctionalInterface 
public interface KeySelector<IN, KEY> extends Function, Serializable { 
    // 选择一个字段作为Key 
    KEY getKey(IN value) throws Exception; 
} 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

聚合(Aggregation)算子

对于一个KeyedStream,一次只能使用一个聚合函数,无法链式使用多个。

聚合函数对流入的数据进行实时统计,并不断输出到下游。

这些算子可以针对KeyedStream的每一个支流做聚合。

使用聚合函数时,我们需要一个参数来指定按照哪个字段进行聚合。跟keyBy()相似,我们可以使用数字位置来指定对哪个字段进行聚合,也可以实现一个KeySelector

  • sum():对该字段进行加和,并将结果保存在该字段中,它无法确定其他字段的数值。也就是只对指定字段求和,不修改同个数据中的其他字段
  • max():对该字段求最大值,并将结果保存在该字段中。不影响同个数据的其他字段。
  • maxBy():返回指定字段最大值所在的整个数据,max()只将数据的指定字段改为最大值,mayby直接返回数据流中最大的整个元素。例如一条数据有a b c三个字段,max(a)会将当前数据的a字段更新为最大值,bc不变,然后返回;而maxBy(a),会将当前数据替换为a字段最大的那条数据,也就是bc也可能变化
  • min()
  • minBy()

reduce归并

reduce()在KeyedStream上生效,它接受两个输入,生成一个输出,即合并当前的元素和上次聚合的结果,生成一个同类型的新元素。(是否两个输入和一个输出都是相同数据类型?)

实现方式:实现ReduceFunction接口或使用lambda表达式。如:

keyedStream.reduce((curSensor,newSensor)->new SensorReading(curSensor.getId(),newSensor.getTimestamp(), Math.max(curSensor.getTemperature(), newSensor.getTemperature())));

其中curSensor表示上一条数据reduce的结果,newSensor当前数据

三 多数据流转换

Connect

连接两个数据流,两个输入流的数据类型可以不一致,返回一个ConnectedStreams。操作ConnectedStreams时,一般是map操作,我们需要重写CoMapFunction或CoFlatMapFunction,对两个输入流用两个分开的方法进行操作。stream1.connect(stream2)

  • Connect :
    • DataStream,DataStream → ConnectedStreams:连接两个数据流,它们的数据类型可以不一致,被 Connect 之后,两个输入流只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。stream1.connect(stream2)
  • CoMapFunction、CoFlatMapFunction:
    • ConnectedStreams → DataStream:相当于对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理,我们需要重写CoMapFunction或CoFlatMapFunction,对两个输入流用两个分开的方法进行操作。
// 2. 将高温流转换成二元组类型
DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
    @Override
    public Tuple2<String, Double> map(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), value.getTemperature());
    }
});
// 2. 合流 connect
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);

DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
    @Override
    public Object map1(Tuple2<String, Double> value) throws Exception {
        return new Tuple3<>(value.f0, value.f1, "high temp warning");
    }

    @Override
    public Object map2(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), "normal");
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

Union合流

DataStream -> DataStream:可以合并多个同类型的数据流,返回相同类型的新的DataStream。stream1.union(stream2,stream3)

和Connect的区别?
  1. Connect 只能合并两个流,流的数据类型可以不同;
  2. Union可以合并多条流,流的数据类型必须相同;

四 数据重分布

  1. datastream.shuffle()基于正态分布,将数据随机分布到下游各算子子任务上。
  2. datastream.rebalance()使用Round-Ribon方法轮询式地均匀分布到下游的所有的子任务上。
  3. datastream.rescale()与rebalance()很像,但它的传输开销更小,因为不是轮询给下游每个子任务,而是就近发送给下游子任务(下游均分为多组,每个上游算子对一组下游进行轮询分发)
  4. datastream.broadcast()数据可被复制并广播发送给下游的所有子任务上。
  5. datastream.global()会将所有数据发送给下游算子的第一个子任务上
  6. datastream.partitionCustom(Partitioner,xxx)用来自定义数据重分布逻辑。有两个参数:第一个参数是自定义的Partitioner(我们需要重写里面的partition(K key,int numPartitions)方法,参数1是用于分区的key,参数2是并行度,返回值是将元素分配到下游第几个任务);第二个参数表示对数据流哪个字段使用partiton()方法,为字段索引、字段名称、xx。
public class TransformTest6_Partition {
  public static void main(String[] args) throws Exception{

    // 创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 设置并行度 = 4
    env.setParallelism(4);

    // 从文件读取数据
    DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");

    // 转换成SensorReading类型
    DataStream<SensorReading> dataStream = inputStream.map(line -> {
      String[] fields = line.split(",");
      return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
    });

    // SingleOutputStreamOperator多并行度默认就rebalance,轮询方式分配
    dataStream.print("input");

    // 1. shuffle (并非批处理中的获取一批后才打乱,这里每次获取到直接打乱且分区)
    DataStream<String> shuffleStream = inputStream.shuffle();
    shuffleStream.print("shuffle");

    // 2. keyBy (Hash,然后取模)
    dataStream.keyBy(SensorReading::getId).print("keyBy");

    // 3. global (直接发送给第一个分区,少数特殊情况才用)
    dataStream.global().print("global");

    env.execute();
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

五 其他

Split 和 Select分流

注:新版Flink已经不存在Split和Select这两个API了(至少Flink1.12.1没有!)

  • Split :
    • DataStream -> SplitStream:根据某些特征把DataStream拆分成多个SplitStream;
    • SplitStream虽然看起来像是两个Stream,但是其实它是一个特殊的Stream;
  • Select:
    • SplitStream -> DataStream:从一个SplitStream中获取一个或者多个DataStream;
// 1. 分流,按照温度值30度为界分为两条流
SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
    @Override
    public Iterable<String> select(SensorReading value) {
        return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
    }
});

DataStream<SensorReading> highTempStream = splitStream.select("high");
DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

5.4 支持的数据类型

一 flink支持的数据类型

1 基本类型:

Java和Scala的所有基础数据类型,诸如int(包括Integer)、Double、Long、String,以及Date、BigDecimal和BigInteger

2 数组:

基础类型或其他对象类型组成的数组,如String[]。

3 复合类型:JAVA POJO(还有sclala class,这里忽略)

​ 类必须是public。
​ 必须有一个public的无参构造函数
​ 所有非静态(non-static)、非瞬态(non-transient)字段必须是public的
​ 非public字段必须有标准的getter和setter方法
​ 所有子字段必须是Flink支持的数据类型

4 元组Tuple0~Tuple25
5 辅助类型:

Java的ArrayList、HashMap和Enum(Scala的Either和Option)

二 TypeInformation

在Flink中,以上如此多的类型统一使用TypeInformation类表示。比如,POJO在Flink内部使用PojoTypeInfo来表示

TypeInformation的一个重要的功能就是创建TypeSerializer序列化器,为该类型的数据做序列化。

一般情况下,Flink会自动探测传入的数据类型,生成对应的TypeInformation,调用对应的序列化器。

当Flink的自动类型推断不起作用时,程序员就需要关注TypeInformation了。

三 注册类

如果传递给Flink算子的数据类型是父类,实际执行过程中使用的是子类,子类中有一些父类没有的数据结构和特性,将子类注册可以提高性能。在执行环境上调用env.registerType(clazz)来注册类。通常来说,POJO和元组等Flink内置类型的性能更好一些。

四 使用序列化框架定义类

Avro、Kryo、Thrift和Protobuf

5.5 UDF 用户自定义函数

User Defined Function即用户自定义函数。

Flink暴露了所有UDF函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。

也就是说,用户可以自定义函数内容,以实现流操作。

1 对于map()、flatMap()、reduce()等函数,我们可以实现MapFunction、FlatMapFunction、ReduceFunction等接口。
2 当不需要处理非常复杂的业务逻辑时,可以使用Lambda表达式
3 Rich函数类

自定义实现类

可以自定义参数传进去。示例:

DataStream<String> tweets = env.readTextFile("INPUT_FILE "); 
DataStream<String> flinkTweets = tweets.filter(new KeyWordFilter("flink")); 

public static class KeyWordFilter implements FilterFunction<String> { 
  private String keyWord; 

  KeyWordFilter(String keyWord) { 
    this.keyWord = keyWord; 
  } 

  @Override public boolean filter(String value) throws Exception { 
    return value.contains(this.keyWord); 
  } 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

支持lambda匿名函数

DataStream<String> tweets = env.readTextFile("INPUT_FILE"); 
DataStream<String> flinkTweets = tweets.filter( tweet -> tweet.contains("flink") );
  • 1
  • 2

!富函数(Rich Functions)

“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。

它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

Rich Function有一个生命周期的概念。典型的生命周期方法有:

  • open()方法:Flink在算子调用前会执行这个方法,可以用来进行一些初始化工作。
  • close()方法:算子生命周期中的最后一个调用的方法,做一些清理、释放工作。
  • getRuntimeContext()方法:获取运行时上下文。每个并行的算子子任务都有一个运行时上下文,上下文记录了这个算子执行过程中的一些信息,包括算子当前的并行度、算子子任务序号、广播数据、累加器、监控数据。最重要的是,我们可以从上下文里获取状态数据。
// 实现自定义富函数类(RichMapFunction是一个抽象类)
public static class MyMapper extends RichMapFunction<SensorReading, Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> map(SensorReading value) throws Exception {
        //            RichFunction可以获取State状态
        //            getRuntimeContext().getState();
        return new Tuple2<>(value.getId(), getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // 初始化工作,一般是定义状态,或者建立数据库连接
        System.out.println("open");
    }

    @Override
    public void close() throws Exception {
        // 一般是关闭连接和清空状态的收尾操作
        System.out.println("close");
    }
}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

5.6 累加器

累加器是大数据框架帮我们实现的一种机制,允许我们在多节点上进行累加统计

常见的有IntCounter、LongCounter、DoubleCounter等

例如在RichMapFunction中使用:

  • 1 先定义累加器IntCounter c=new IntCounter(0);
  • 2 在open方法中向上下文环境注册该累加器:getRuntimeContext().addAccumulator(“countername”,c);
  • 3 在执行过程中调用累加器:this.c.add(1);
  • 4 通过执行结果(或其他方式)获取累加器的值:JobExecutionResult res=env.execute(xxxx);res.getAccumulatorResult(“countname”);

5.7 sink

flink的所有对外的输出操作都要利用 Sink 完成:stream.addSink(new MySink(xxxx))
官方提供了一部分的框架的 sink,用户也可以自定义实现 sink。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lmHNEULJ-1649949748153)(C:\Users\Simon\AppData\Roaming\Typora\typora-user-images\1648356010639.png)]

自定义sink

实现RichSinkFunction接口

dataStream.addSink(new MyJdbcSink());

// 实现自定义的SinkFunction
public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
    // 声明连接和预编译语句
    Connection connection = null;
    PreparedStatement insertStmt = null;
    PreparedStatement updateStmt = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8&useSSL=false", "root", "example");
        insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
        updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
    }

    // 每来一条数据,调用连接,执行sql
    @Override
    public void invoke(SensorReading value, Context context) throws Exception {
        // 直接执行更新语句,如果没有更新那么就插入
        updateStmt.setDouble(1, value.getTemperature());
        updateStmt.setString(2, value.getId());
        updateStmt.execute();
        if (updateStmt.getUpdateCount() == 0) {
            insertStmt.setString(1, value.getId());
            insertStmt.setDouble(2, value.getTemperature());
            insertStmt.execute();
        }
    }

    @Override
    public void close() throws Exception {
        insertStmt.close();
        updateStmt.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

kafka:FlinkKafkaProducer

dataStream.addSink( new FlinkKafkaProducer<String>("localhost:9092", "sinktest", new SimpleStringSchema()));

redis:RedisSink

// 定义jedis连接配置(我这里连接的是docker的redis)
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
    .setHost("localhost")
    .setPort(6379)
    .setPassword("123456")
    .setDatabase(0)
    .build();

dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));

// 自定义RedisMapper
public static class MyRedisMapper implements RedisMapper<SensorReading> {
    // 定义保存数据到redis的命令,存成Hash表,hset sensor_temp id temperature
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
    }

    @Override
    public String getKeyFromData(SensorReading data) {
        return data.getId();
    }

    @Override
    public String getValueFromData(SensorReading data) {
        return data.getTemperature().toString();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

ES:ElasticsearchSink

dataStream.addSink( new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());


// 实现自定义的ES写入操作
public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
    @Override
    public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
        // 定义写入的数据source
        HashMap<String, String> dataSource = new HashMap<>();
        dataSource.put("id", element.getId());
        dataSource.put("temp", element.getTemperature().toString());
        dataSource.put("ts", element.getTimestamp().toString());

        // 创建请求,作为向es发起的写入命令(ES7统一type就是_doc,不再允许指定type)
        IndexRequest indexRequest = Requests.indexRequest()
            .index("sensor")
            .source(dataSource);

        // 用index发送请求
        indexer.add(indexRequest);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

7. 时间语义与 Wartermark

三种时间语义

  • **Event Time:事件创建时间;**数据流中元素自带的时间——要求每个元素自带时间,并进行watermark设置
  • Ingestion Time:数据到达Flink Source的时间
  • Processing Time:数据进入当前算子时的操作系统时间

flink默认使用ProcessingTime

设置指定时间语义

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  • 1

7.3 Watermark

Flink对于迟到数据有三层保障,先来后到的保障顺序是:

  • WaterMark => 约等于放宽窗口标准
  • allowedLateness => 允许迟到(ProcessingTime超时,但是EventTime没超时)
  • sideOutputLateData => 超过迟到时间,另外捕获,之后可以自己批处理合并先前的数据

基本概念

前言:流处理从事件产生,到流经source,再到operator,有可能由于网络、分布式等原因,导致Flink接收到的事件的先后顺序与事件的Event Time顺序不同,此时若使用了Event Time语义,则不能明确数据是否全部到位,但又不能无限期的等下去,于是有了Watermark。

  1. Watermark可以理解成一个延迟触发机制,设置Watermark的延时时长t,每次对比当前数据的时间事件和已收到数据的最大事件时间,若eventTime<maxEventTime - t则认为小于该水印的所有数据都已经到达
  2. Watermark = maxEventTime-延迟时间t
  3. 如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行。
  4. Watermark时间戳必须单调递增,一个时间戳为T的Watermark会假设后续到达事件的时间戳都大于T。假如Flink算子接收到一个违背上述规则的事件,该事件将被认定为迟到数据

分布式环境下Watermark的传播

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4SPtWYOS-1649949748155)(C:\Users\Simon\AppData\Roaming\Typora\typora-user-images\1649778902375.png)]

Flink的算子一般分布在多个并行的算子子任务,每个并行算子子任务会维护针对该子任务的Event Time时钟,这个时钟记录了这个算子子任务的Watermark处理进度。每个分区维护一个水印,当上游有水印进入该分区时,若新水印大于旧水印,就以新换旧,然后对比更新了水印后的各个分区的水印,若最小的那个大于当前算子的水印,就更新当前算子水印,并把该水印发到下游算子——Flink某算子子任务根据上游流入的各Watermark来更新Partition Watermark列表。选取Partition Watermark列表中最小的时间戳作为该算子子任务的Event Time,并将Event Time发送给下游算子。

可能的问题

假如某个上游分区的Watermark一直不更新,Partition Watermark列表其他地方都在正常更新,唯独个别分区的Watermark停滞,这会导致算子的Event Time不更新,相应的时间窗口计算也不会被触发,大量的数据积压在算子内部得不到处理
在union()等多数据流处理时,一旦发现某个数据流不再生成新的Watermark,我们要在SourceFunction中的SourceContext里调用markAsTemporarilyIdle()来设置该数据流为空闲状态,避免空转。

watermark的引入

1 在source阶段设置:

在自定义SourceFunction或RichSourceFunction的SourceContext里重写void collectWithTimestamp(Telement,long timestamp)和void emitWatermark(Watermark mark)两个方法,前者给数据流中的每个元素赋值一个timestamp作为Event Time,后者生成Watermark。

ctx.collectWithTimestamp(obj, obj.eventTime);  
if (obj.hasWatermarkTime()) { 
    ctx.emitWatermark(new Watermark(obj.watermarkTime)); 
}
  • 1
  • 2
  • 3
  • 4
2 在source之后设置:

使用流的assignTimestampsAndWatermarks()方法,传入WatermarkStrategy.forGenerator(...).withTimestampAssigner(...)

forGenerator()方法用来生成Watermark,withTimestampAssigner()方法用来为数据流的每个元素设置时间戳。

Watermark生成策略

forGenerator()设置Watermark生成策略:

  1. 周期性地(Periodic)生成Watermark:如果想周期性地生产水印,可以在env设置周期,单位毫秒:env.getConfig.setAutoWatermarkInterval(5000L)
  2. 逐个式地(Punctuated)生成Watermark:假如数据流元素有一些特殊标记,标记某些元素是否要生成Watermark,可直接在接口中的onEvent()方法判断,需要就调用output.emitWatermark生成,此时onPeriodicEmit()方法里不需要做任何事情——假如每个元素都带有Watermark标记,Flink是允许为每个元素都生成一个Watermark的,但这种策略非常激进,大量的Watermark会增大下游计算的延迟,拖累整个Flink作业的性能。

两者都需要实现WatermarkGenerator接口

// Flink源码 
// 生成Watermark的接口 
@Public 
public interface WatermarkGenerator<T> { 
 
    // 数据流中的每个元素流入后都会调用onEvent()方法 
    // Punctunated方式下,一般根据数据流中的元素是否有特殊标记来判断是否需要生成Watermark 
    // Periodic 方式下,一般用于记录各元素的Event Time时间戳 
    void onEvent(T event, long eventTimestamp, WatermarkOutput output); 
 
    // 每隔固定周期调用onPeriodicEmit()方法 
    // 一般主要用于Periodic方式 
    // 固定周期用 ExecutionConfig#setAutoWatermarkInterval() 方法设置 
    void onPeriodicEmit(WatermarkOutput output); 
} 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

示例:

dataStream.assignTimestampsAndWatermarks( 
    WatermarkStrategy 
        .forGenerator((context -> new MyPeriodicGenerator())) 
        .withTimestampAssigner((event, recordTimestamp) -> event.f1)); 

public static class MyPeriodicGenerator implements WatermarkGenerator<Tuple2<String, 
Long>> { 
 
    private final long maxOutOfOrderness = 60 * 1000; // 1分钟 
    private long currentMaxTimestamp;                 // 已抽取的Timestamp最大值 
 
    @Override 
    public void onEvent(Tuple2<String, Long> event, long eventTimestamp,  
WatermarkOutput output) { 
        // 更新currentMaxTimestamp为当前遇到的最大值 
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp); 
} 
 
    @Override 
    public void onPeriodicEmit(WatermarkOutput output) { 
        // Watermark比currentMaxTimestamp最大值慢1分钟 
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness)); 
    } 
} 
currentMaxTimestamp存元素的时间戳最大值。当需要发射Watermark时,以时间戳最大值减1分钟作为Watermark发送出去。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
内置的Watermark生成策略

1 与上述自定义的示例一样,指定延迟时间t,以 最大事件时间-t 的值作为数据的watermark

// 第2个字段是时间戳 
env.addSource(new MySource()) 
    .assignTimestampsAndWatermarks( 
        WatermarkStrategy 
            .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)) 
            .withTimestampAssigner((event, timestamp) -> event.f1) 
); 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2 直接以时间戳最大值作为水印,不添加延迟:

env.addSource(new MySource()) 
    .assignTimestampsAndWatermarks( 
        WatermarkStrategy 
            .<Tuple2<String, Long>>forMonotonousTimestamps() 
            .withTimestampAssigner((event, timestamp) -> event.f1) 
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
flink1.10版本:

最常见的引用方式如下:

dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
    @Override
    public long extractTimestamp(SensorReading element) {
        return element.getTimestamp() * 1000L;
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

BoundedOutOfOrdernessTimestampExtractor是AssignerWithPeriodicWatermarks的实现类,还有一个接口AssignerWithPunctuatedWatermarks,这两个接口都可以自定义如何从事件数据中抽取时间戳。

AssignerWithPeriodicWatermarks

  • 周期性的生成 watermark:系统会周期性的将 watermark 插入到流中
  • 默认周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法进行设置
  • 升序和前面乱序的处理 BoundedOutOfOrderness ,都是基于周期性 watermark 的

AssignerWithPunctuatedWatermarks

  • 没有时间周期规律,可打断的生成 watermark(即可实现每次获取数据都更新watermark)

一般认为Watermark的设置代码,在里Source步骤越近的地方越合适。

即使如此,依然可能会有些事件数据在 Watermark 之后到达,这时可以用Late Elements 处理。

6. Flink 中的 Window

window是一种切割无限数据为有限块进行处理的手段,将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。

无限数据集是指一种不断增长的本质上无限的数据集。

Flink 默认的时间窗口根据 Processing Time 进行窗口的划分。

窗口程序骨架

// Keyed Window 
stream 
       .keyBy(<KeySelector>)           <-   按照一个Key进行分组 
       .window(<WindowAssigner>)       <-  将数据流中的元素分配到相应的窗口中 
      [.trigger(<Trigger>)]            <-  指定触发器Trigger(可选) 
      [.evictor(<Evictor>)]            <-  指定清除器Evictor(可选) 
       .reduce/aggregate/process()     <-  窗口处理函数Window Function 

// Non-Keyed Window 
stream 
       .windowAll(WindowAssigner)      <-   不分组,将数据流中的所有元素分配到相应的窗口中 
      [.trigger(<Trigger>)]            <-  指定触发器Trigger(可选) 
      [.evictor(<Evictor>)]            <-  指定清除器Evictor(可选) 
       .reduce/aggregate/process()     <-  窗口处理函数Window Function 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

6.1 Window类型与特点

  • 时间窗口(Time Window):按时间生成窗口

    • 滚动时间窗口(Tumbling Windows):按固定窗口长度对数据做切割,可以看做是滑动窗口的一种特殊情况(即窗口大小和滑动间隔相等)

      • 特点:时间对齐,窗口长度固定,没有重叠。

        适用场景:适合做 BI 统计等(做每个时间段的聚合计算)。

    • 滑动时间窗口(Sliding Windows):每隔一个滑动间隔时间,对过去固定窗口长度的数据做切割

      • 特点:时间对齐,窗口长度固定, 可以有重叠。
      • 适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是
        否要报警)。
    • 会话窗口(Session Windows):由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口,配置 session 间隔以定义非活跃周期的长度,当收不到数据的时间达到session间隔,则关闭当前session,后续数据会分配到新session窗口

      • 特点:时间无对齐。
  • 计数窗口(Count Window):按数据数量生成窗口

    • 滚动计数窗口
    • 滑动计数窗口

6.2 Window API

  • 窗口分配器——window()方法
  • Flink提供了更加简单的.timeWindow().countWindow()方法,用于定义时间窗口和计数窗口。
  • 除了.windowAll(),其他窗口定义方法必须在keyBy之后才能使用
  • Flink 默认的时间窗口根据 Processing Time 进行窗口的划分

6.2.1 时间窗口.window()和.timeWindow( xxx )

基于时间的窗口都有一个开始时间和结束时间,表示一个左闭右开的时间段。

timeWindow()是window()的简写,使用timeWindow()时,如果我们在执行环境设置了TimeCharacteristic.EventTime,那么Flink会自动调用TumblingEventTimeWindows;如果我们设置TimeCharacteristic.ProcessingTime,那么Flink会自动调用TumblingProcessingTimeWindows。

滚动窗口:

DataStream<T> input = ... 
//window
input 
    .keyBy(<KeySelector>) 
    .window(TumblingEventTimeWindows.of(Time.seconds(5))) //基于事件时间
//    .window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15))) //基于事件时间并设置offset(参数2)
//    .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) //基于处理时间
    .<window function>(...) 
    
//timeWindow
input.timeWindow( Time.seconds(15) ) 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

滑动窗口:(参数分别是窗口大小、滑动步长、延迟时间)

DataStream<T> input = ... 
//window
input 
    .keyBy(<KeySelector>) 
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) //基于事件时间
//    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))//基于处理时间
//    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) //带延迟
    .<window function>(...) 
    
//timeWindow
input.timeWindow( Time.seconds(15), Time.seconds(5) ) 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

时间间隔可以通过Time.milliseconds(x)Time.seconds(x)Time.minutes(x)等其中的一个来指定。

6.2.2 计数窗口.countWindow( xxx )

滚动窗口:.countWindow( long)

滑动窗口:.countWindow( long, long ) ,参数一个是window_size,一个是sliding_size。

CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。

6.2.3 会话窗口(session window)

.window(EventTimeSessionWindows.withGap(Time.minutes(10)))

该模式下,两个窗口之间有一个间隙,称为Session Gap当一个窗口在大于Session Gap的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的Size是可变的,每个窗口的开始和结束时间并不是确定的。
我们可以设置定长的Session Gap,也可以使用SessionWindowTimeGapExtractor动态地确定Session Gap的值。

DataStream<T> input = ... 

// event-time session windows with static gap 
input 
    .keyBy(<KeySelector>) 
    .window(EventTimeSessionWindows.withGap(Time.minutes(10))) 
    .<window function>(...) 

// event-time session windows with dynamic gap 
    .window(EventTimeSessionWindows.withDynamicGap((element) -> { 

// processing-time session windows with static gap 
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) 

// processing-time session windows with dynamic gap 
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> { 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

6.3 window function相关API

function例子:http://www.javashuo.com/article/p-ftiqugmc-eu.html

window function 定义了要对窗口中收集的数据做的计算操作,在window开窗操作后调用

主要可以分为两类:

6.3.1 增量计算

指的是窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据,再保存到窗口中。典型的增量聚合函数有ReduceFunction, AggregateFunction。

  • .reduce(ReduceFunction); 合并当前的元素 和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果——ReduceFunction的输入、输出都是相同类型,且没有累加器
  • .aggregate(AggregateFunction):比reduce通用,AggregateFunction需要传入三个数据类型,分别代表流、累加器、返回结果的类型,三个类型可以不相同
  • .fold(FoldFunction);——已经被弃用,建议使用aggregate函数
ReduceFunction源码
public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T var1, T var2) throws Exception;
}
  • 1
  • 2
  • 3
AgregateFunction接口说明
public interface AggregateFunction<IN, ACC, OUT>  
    extends Function, Serializable { // ACC数据即中间状态数据
    
   // 以下函数一般在初始化时调用 
   ACC createAccumulator(); 
 
   // 当一个新元素流入时,将新元素与ACC数据合并,返回ACC数据 
   ACC add(IN value, ACC accumulator); 
 
   // 将两个ACC数据合并,一般在窗口融合时调用,比如,会话窗口模式下,窗口长短是不断变化的,多个窗口有可能合并为一个窗口,多个窗口内的ACC会合并为一个
   ACC merge(ACC a, ACC b); 
 
   // 将中间数据转换成结果数据,在窗口结束时调用
   OUT getResult(ACC accumulator); 
 
} 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
AggregateFunction示例
dataStream.keyBy("id")
      //                .timeWindow(Time.seconds(15)) // 已经不建议使用@Deprecated
      .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
      .aggregate(new myAggregateFunction());

AggregateFunction myAggregateFunction=new AggregateFunction<UserActionLog, Tuple2<Long,Long>, Double>() {

    // 一、初始值
    // 定义累加器初始值
    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L,0L);
    }

    // 二、累加
    // 定义累加器如何基于输入数据进行累加
    @Override
    public Tuple2<Long, Long> add(UserActionLog value, Tuple2<Long, Long> accumulator) {
        accumulator.f0 += 1;
        accumulator.f1 += value.getProductPrice();
        return accumulator;
    }

    // 三、合并
    // 定义累加器如何和State中的累加器进行合并
    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
        acc1.f0+=acc2.f0;
        acc1.f1+=acc2.f1;
        return acc1;
    }

    // 四、输出
    // 定义如何输出数据
    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        return accumulator.f1 / (accumulator.f0 * 1.0);
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

6.3.2 全量计算

指的是窗口先缓存所有元素,等触发条件后才对窗口内的全量元素执行计算。WindowFunction和ProcessWindowFunction

  • .apply(WindowFunction)
  • .process(ProcessWindowFunction)
ProcessWindowFunction接口说明
/** 
 * 函数接收4个泛型 
 * IN:输入类型 
 * OUT:输出类型 
 * KEY:keyBy()中按照Key分组,Key类型 
 * W:窗口类型 
 */ 
 
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends  
AbstractRichFunction { 
 
  /** 
   * 对一个窗口内的元素进行处理,窗口内的元素缓存在Iterable<IN>,进行处理后输出到 
Collector<OUT>中 
   * 我们可以输出一到多个结果 
   */ 
    public abstract void process(KEY key, Context context, Iterable<IN> elements,  
Collector<OUT> out) throws Exception; 
 
  /**  
    * 当窗口执行完毕被清理时,删除各类状态数据 
      */ 
    public void clear(Context context) throws Exception {} 
 
  /** 
   * 一个窗口的Context,包含窗口的一些元数据、状态数据等。 
   */ 
    public abstract class Context implements java.io.Serializable { 
 
    // 返回当前正在处理的窗口 
        public abstract W window(); 
 
    // 返回当前Process Time 
        public abstract long currentProcessingTime(); 
 
    // 返回当前Event Time对应的Watermark 
        public abstract long currentWatermark(); 
 
    // 返回某个Key下的某个窗口的状态 
        public abstract KeyedStateStore windowState(); 
    // 返回某个Key下的全局状态 
        public abstract KeyedStateStore globalState(); 
 
    // 将迟到数据发送到其他位置 
        public abstract <X> void output(OutputTag<X> outputTag, X value); 
    } 
} 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
ProcessWindowFunction的Context

可以获取窗口的更多信息。

Context中有两种状态:

  1. 针对Key的全局状态,它是跨多个窗口的,多个窗口都可以访问,通过Context.globalState()获取;
  2. 该Key下单窗口的状态,通过Context.windowState()获取。单窗口的状态只保存该窗口的数据,主要是针对process()函数多次被调用的场景,比如处理迟到数据或自定义Trigger等场景。当使用单窗口状态时,要在clear()方法中清理状态。

6.3.3 Trigger与Evictor

Trigger触发器

触发器(Trigger)本质上是一种Timer,我们需要注册合适的时间,当到达这个时间时,flink会启动窗口处理函数来处理窗口中的数据,以及将窗口内的数据清除。

使用方式
stream.keyBy(s -> s.symbol) 
    .timeWindow(Time.seconds(60)) 
    .trigger(new MyTrigger()) 
    .aggregate(new AverageAggregate()); 
  • 1
  • 2
  • 3
  • 4
TriggerResult

Trigger会返回一个TriggerResult类型的结果。

TriggerResult是一个枚举类型,有下面几种情况。

  • CONTINUE:什么都不做。
  • FIRE:启动计算并将结果发送给下游算子,不清除窗口数据。
  • PURGE:清除窗口数据但不执行计算。
  • FIRE_AND_PURGE:启动计算,发送结果然后清除窗口数据。

每个窗口都有一个默认的Trigger。比如基于Event Time的窗口会有一个EventTimeTrigger,每当窗口的Watermark到达窗口的结束时间,Trigger会发送FIRE。

自定义Trigger

当这些已有的Trigger无法满足我们的需求时,我们需要自定义Trigger(自定义Trigger,如果使用了状态,一定要使用clear()方法将状态数据清除)

/** 
    * T为元素类型 
    * W为窗口类型 
  */ 
public abstract class Trigger<T, W extends Window> implements Serializable { 
 
    // 窗口增加一个元素时调用,返回一个TriggerResult,可以在这里进行逻辑判断,在合适的时候调用ctx的registerxxxTimer注册timer
    public abstract TriggerResult onElement(T element, long timestamp, W window,  
TriggerContext ctx) throws Exception; 
 
    // 当一个基于Processing Time的Timer触发了FIRE时被调用 
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext  ctx) throws Exception; 
 
    // 当一个基于Event Time的Timer触发了FIRE时被调用 
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) 
throws Exception; 
 
    // 当窗口数据被清除时,调用clear()方法来清除所有的Trigger状态数据 
    public abstract void clear(W window, TriggerContext ctx) throws Exception 
 
    /** 
     * TriggerContext,保存了时间、状态、监控以及定时器等信息
     */ 
    public interface TriggerContext { 
 
        // 返回当前Processing Time 
        long getCurrentProcessingTime(); 
 
        // 返回MetricGroup对象  
        MetricGroup getMetricGroup(); 
 
        // 返回当前Watermark时间 
        long getCurrentWatermark(); 
 
        // 将某个time值注册为一个Timer,当操作系统时间到达time值这个时间点时, 
onProcessingTime方法会被调用 
        void registerProcessingTimeTimer(long time); 
 
        // 将某个time值注册为一个Timer,当Watermark到达time值这个时间点时,onEventTime 
方法会被调用 
        void registerEventTimeTimer(long time); 
 
        void deleteProcessingTimeTimer(long time); 
 
        void deleteEventTimeTimer(long time); 
 
        // 获取状态 
        <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor); 
    } 
 
    ... 
} 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
Evictor清除器

清除器(Evictor)是在WindowAssigner和Trigger的基础上的一个可选选项,用来清除一些数据。

可以在窗口处理函数执行前或执行后调用Evictor。

主要针对全量计算,对于增量计算的ReduceFunction和AggregateFunction没必要使用Evictor

内置evictor有:
  • CountEvictor可以保留一定数量的元素,多余的元素按照从前到后的顺序先后清除。
  • TimeEvictor可以保留一个时间段内的元素,早于这个时间段的元素会被清除。
自定义evictor

如下,窗口的所有元素被放在了Iterable<TimestampedValue>中,我们要实现自己的清除逻辑。

/** 
    * T为元素类型 
    * W为窗口类型 
  */ 
public interface Evictor<T, W extends Window> extends Serializable { 
 
    // 在窗口处理函数前调用 
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window,  
EvictorContext evictorContext); 
 
    // 在窗口处理函数后调用 
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window,  
EvictorContext evictorContext); 
 
    // EvictorContext 
    interface EvictorContext { 
        long getCurrentProcessingTime(); 
        MetricGroup getMetricGroup(); 
        long getCurrentWatermark(); 
    } 
} 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

6.4 两个数据流的window join、coGroup与apply,intervalJoin与process

两种Join:窗口连接(Window Join)和时间间隔连接(IntervalJoin)。

1 Window Join

主要在Flink的窗口上进行操作,它将两个数据流中落在相同窗口的元素按照某个Key进行Inner Join,当窗口的时间结束,Flink会调用JoinFunction来对窗口内的数据对进行处理。

输入数据流input1中的某个元素与输入数据流input2中的所有元素逐个配对,当数据流中某个窗口内没数据时,Inner Join的结果为空。

使用方式:
input1.join(input2) 
    .where(<KeySelector>)      <- input1 使用哪个字段作为Key 
    .equalTo(<KeySelector>)    <- input2 使用哪个字段作为Key 
    .window(<WindowAssigner>)  <- 指定WindowAssigner 
    [.trigger(<Trigger>)]      <- 指定Trigger(可选) 
    [.evictor(<Evictor>)]      <- 指定Evictor(可选) 
    .apply(<JoinFunction>)     <- 指定JoinFunction 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
JoinFunction与示例

JoinFunction需事先join方法,该方法对两两配对的数据进行操作,并返回join的处理结果。

除了JoinFunction,Flink还提供了FlatJoinFunction,其功能是输出零到多个结果。

public static class MyJoinFunction  
    implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> { 
 
    @Override 
    public String join(Tuple2<String, Integer> input1, Tuple2<String, Integer> input2) 
{ 
        return "input 1 :" + input1.f1 + ", input 2 :" + input2.f1; 
    } 
} 
 
DataStream<Tuple2<String, Integer>> input1 = ... 
DataStream<Tuple2<String, Integer>> input2 = ... 
 
DataStream<String> joinResult = input1.join(input2) 
    .where(i1 -> i1.f0) 
    .equalTo(i2 -> i2.f0) 
    .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) 
    .apply(new MyJoinFunction()); 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

2 Window coGroup

与join使用方式一致,在.apply()中作为传参,不同的是coGroup对应的是CoGroupFunction

可以获得两个数据流中的所有元素,元素以Iterable的形式供开发者自行处理

使用方式:

与window join类似,不过join换成coGroup,apply的参数改成CoGroupFunction

input1.coGroup(input2) 
    .where(<KeySelector>)      <- input1 使用哪个字段作为Key 
    .equalTo(<KeySelector>)    <- input2 使用哪个字段作为Key 
    .window(<WindowAssigner>)  <- 指定WindowAssigner 
    [.trigger(<Trigger>)]      <- 指定Trigger(可选) 
    [.evictor(<Evictor>)]      <- 指定Evictor(可选) 
    .apply(<CoGroupFunction>)     <- CoGroupFunction
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
CoGroupFunction与示例

如果第一个数据流中的某些Key是空的,那么CoGroupFunction被触发时,该Key上的Iterable为空,开发者自行决定如何处理空数据。

public static class MyCoGroupFunction  
    implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>,  
String> { 
    @Override 
    public void coGroup(Iterable<Tuple2<String, Integer>> input1,  
Iterable<Tuple2<String, Integer>> input2, Collector<String> out) { 
        input1.forEach(element -> System.out.println("input1 :" + element.f1)); 
        input2.forEach(element -> System.out.println("input2 :" + element.f1));
    } 
} 

input1.coGroup(input2).where(<KeySelector>).equalTo(<KeySelecotr>)。 
DataStream<Tuple2<String, Integer>> input1 = ... 
DataStream<Tuple2<String, Integer>> input2 = ... 
 
DataStream<String> coGroupResult = input1.coGroup(input2) 
      .where(i1 -> i1.f0) 
      .equalTo(i2 -> i2.f0) 
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) 
      .apply(new MyCoGroupFunction()); 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

3 Interval Join

主要概念:

  1. Interval Join不依赖Flink的WindowAssigner,也就是不根据窗口进行配对,而是根据时间间隔(Interval),对同个间隔内的数据进行配对处理
  2. 时间间隔由时间下界(lowerBound)和上界(upperBound)界定,上、下界可以是正数也可以是负数默认的时间间隔是包含上、下界的,我们可以使用**.lowerBoundExclusive().upperBoundExclusive()**来取消上、下界。
  3. 作用于KeyedStream,而join、coGroup是作用于DataStream的

其他:

  1. Interval Join内部是用缓存来存储所有数据的,因此需要注意缓存数据不能过多,以免对内存造成太大压力。
  2. 目前Flink(1.11)的Interval Join只支持Event Time语义。

如果我们对input1和input2进行Interval Join,input1中的某个元素为input1.element1,时间戳为input1.element1.ts,那么Interval就是[input1.element1.ts+lowerBound,input1.element1.ts+upperBound],input2中落在这个时间段内的元素将会和input1.element1组成一个数据对。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BtqPqfvr-1649949748158)(C:\Users\Simon\AppData\Roaming\Typora\typora-user-images\1649942778369.png)]

使用方式:

对两个KeyedStream进行intervalJoin操作,用between指定上下界,用process指定ProcessJoinFunction

input1.keyBy(i -> i.f0) 
    .intervalJoin(input2.keyBy(i -> i.f0)) 
    .between(Time.milliseconds(-5), Time.milliseconds(10)) 
    .process(new MyProcessFunction());
  • 1
  • 2
  • 3
  • 4
ProcessJoinFunction与示例
public static class MyProcessFunction extends ProcessJoinFunction<Tuple3<String, 
Long, Integer>, Tuple3<String, Long, Integer>, String> { 
    @Override 
    public void processElement(Tuple3<String, Long, Integer> input1, 
                               Tuple3<String, Long, Integer> input2, 
                               Context context, 
                               Collector<String> out) { 
          out.collect("input 1: " + input1.toString() + ", input 2: " +  
input2.toString()); 
    } 
} 
 
// 使用Event Time 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
// 数据流有3个字段:(Key, 时间戳, 数值) 
DataStream<Tuple3<String, Long, Integer>> input1 = ... 
DataStream<Tuple3<String, Long, Integer>> input2 = ... 
 
DataStream<String> intervalJoinResult = input1.keyBy(i -> i.f0) 
      .intervalJoin(input2.keyBy(i -> i.f0)) 
      .between(Time.milliseconds(-5), Time.milliseconds(10)) 
      .process(new MyProcessFunction());

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

6.5 处理迟到数据

对于Event Time,我们使用Watermark来判断数据是否迟到。一个迟到数据是指数据到达窗口算子时,该数据本该被分配到某个窗口,但由于延迟,窗口已经触发计算。

三种处理方式:

1 直接将迟到数据丢弃。(默认)

allowedLateness(

.allowedLateness() —— 允许处理迟到的数据

2 将迟到数据发送到另一个数据流。

.sideOutputLateData(OutputTag)

使用ProcessFunction系列函数的侧输出功能存储迟到数据:

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){}; 
 
DataStream<T> input = ... 
 
SingleOutputStreamOperator<T> result = input 
    .keyBy(<key selector>) 
    .window(<window assigner>) 
    .allowedLateness(<time>) 
    .sideOutputLateData(lateOutputTag) 
    .<windowed transformation>(<window function>); 
 
DataStream<T> lateStream = result.getSideOutput(lateOutputTag); 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
3 重新执行一次计算,将迟到数据考虑进来,更新计算结果。(针对Event Time)

allowedLateness()设置窗口结束后的等待时间,若某个迟到元素的Event Time大于窗口结束时间但是小于“窗口结束时间+lateness”,迟到数据就会被加入ProcessWindowFunction的缓存中,窗口的Trigger会触发一次FIRE,重新计算结果。
使用这个功能时需要注意,原来窗口中的状态数据在窗口已经触发的情况下仍然会被保留
会话窗口依赖Session Gap来切分窗口,使用allowedLateness()可能会导致两个窗口合并成一个窗口。

/** 
  * ProcessWindowFunction 接收的泛型参数分别为:输入、输出、Key、窗口 
  */ 
public static class AllowedLatenessFunction extends  
ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple4<String, String, Integer, 
String>, String, TimeWindow> { 
    @Override 
    public void process(String key, 
                        Context context, 
                        Iterable<Tuple3<String, Long, Integer>> elements, 
                        Collector<Tuple4<String, String, Integer, String>> out) throws 
Exception { 
        ValueState<Boolean> isUpdated = context.windowState().getState( 
          new ValueStateDescriptor<Boolean>("isUpdated", Types.BOOLEAN)); 
 
        int count = 0; 
        for (Object i : elements) { 
              count++; 
        } 
 
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
 
        if (null == isUpdated.value() || isUpdated.value() == false) { 
            // 第一次使用process()函数时,isUpdated默认初始化为false,因此窗口处理函数第一次
//被调用时会进入这里 
            out.collect(Tuple4.of(key, 
format.format(Calendar.getInstance().getTime()), count, "first")); 
              isUpdated.update(true); 
        } else { 
            // 之后isUpdated被设置为true,窗口处理函数因迟到数据被调用时会进入这里 
            out.collect(Tuple4.of(key, 
format.format(Calendar.getInstance().getTime()), count, "updated")); 
        } 
    }
} 
 
// 使用Event Time 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
 
// 数据流有3个字段:key、时间戳、数值 
DataStream<Tuple3<String, Long, Integer>> input = ... 
 
DataStream<Tuple4<String, String, Integer, String>> allowedLatenessStream = input 
          .keyBy(item -> item.f0) 
      .timeWindow(Time.seconds(5)) 
      .allowedLateness(Time.seconds(5)) 
      .process(new AllowedLatenessFunction()); 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/972370
推荐阅读
相关标签
  

闽ICP备14008679号