当前位置:   article > 正文

Flink双流及多流Join 、IntervalJoin、coGroupJoin的区别与生产使用_双流join

双流join

1.Flink 三种Join的代码测试

1.1 数据源

(1)左流

订单表(orders)
id	  productName	      orderTime
1	    iphone	          2020-04-01 10:00:00.0
2	    mac               2020-04-01 10:02:00.0
3	    huawei	          2020-04-01 10:03:00.0
4	    pad	              2020-04-01 10:05:00.0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

(2)右流

物流表(shipments)
shipId	 orderId	status	      shiptime
0	      1	        shipped	      2020-04-01 11:00:00.0
1	      2	        delivered	    2020-04-01 17:00:00.0
2     	3	        shipped	      2020-04-01 12:00:00.0
3     	4	        shipped	      2020-04-01 11:30:00.0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

1.2 join

(1)代码

 //延迟0s
val delay = 0
//Window 4hour
val window=4
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

left.join(right)
  .where(_._1).equalTo(_._2)  //Join字段 left流的第一个字段(id) 等于 right流的第二个字段(orderId)
  .window(TumblingEventTimeWindows.of(Time.hours(window)))  //滑动窗口
  //IN1   (Int,String,Long)  id	 productName	orderTime
  //IN2   (Int, Int,String,Long)  shipId	orderId	status	shiptime
  //OUT   (Int,String,String,Long,Long)) orderId productName status orderTime shiptime
  .apply(new JoinFunction[(Int,String,Long),(Int, Int,String,Long),(Int,String,String,Long,Long)] {
    override def join(first: (Int,String,Long), second: (Int, Int,String,Long)):(Int,String,String,Long,Long) = {
      (first._1, first._2, second._3, first._3, second._4)
    }
  }).print()

 env.execute()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

(2)分析与输出结果

 WM=窗口内最大的时间-允许延迟执行的时间
VM是不断增大的
窗口触发条件:  WM 》=上一个窗口的结束边界
              窗口内最大的时间-允许延迟执行的时间 > = 上一个窗口的结束边界

                                     nowTimeStamp   nowTime     currentMaxT     WM         窗口                窗口转化为hour      WM转化为hour
订单表(orders)
(1,iphone,1585706400000)          -- 1585706400000 -- 10:00:00 -- 10:00:00 -- 10:00:00    10:00:00-12:00:00    [10-12)            10:00
(2,mac,1585706520000)             -- 1585706520000 -- 10:02:00 -- 10:02:00 -- 10:02:00    10:00:00-12:00:00    [10-12)            10:02
(3,huawei,1585706580000)          -- 1585706580000 -- 10:03:00 -- 10:03:00 -- 10:03:00    10:00:00-12:00:00    [10-12)            10:03
(4,pad,1585706700000)             -- 1585706700000 -- 10:05:00 -- 10:05:00 -- 10:05:00    10:00:00-12:00:00    [10-12)            10:05

物流表(shipments)
(0,1,shipped,1585710000000)       -- 1585710000000 -- 11:00:00 -- 11:00:00 -- 11:00:00    10:00:00-12:00:00    [10-12)            11
(1,2,delivered,1585731600000)     -- 1585731600000 -- 17:00:00 -- 17:00:00 -- 17:00:00    16:00:00-18:00:00    [16-18)            17
(2,3,shipped,1585713600000)       -- 1585713600000 -- 12:00:00 -- 17:00:00 -- 17:00:00    12:00:00-14:00:00    [12-14)            17
(3,4,shipped,1585711800000)       -- 1585711800000 -- 11:30:00 -- 17:00:00 -- 17:00:00    10:00:00-12:00:00    [10-12)            17      //窗口的WM为17,大于窗口的结束边界12,Window窗口触发
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

订单表(orders)的四条数据与物流表(shipments)的(0,1,shipped,1585710000000)、(3,4,shipped,1585711800000) 同窗口,

并在物流表中流(3,4,shipped,1585711800000)输入时,窗口的WM为17(hour),大于窗口的结束边界12(hour),Window窗口触发。

输出结果:

Window 4hour
(1,iphone,shipped,1585706400000,1585710000000)
(4,pad,shipped,1585706700000,1585711800000)
  • 1
  • 2
  • 3

1.3 intervalJoin

支持INNER JOIN、LEFT JOIN、RIGHT JOIN和FULL JOIN,如果直接使用JOIN,默认为INNER JOIN。

暂不支持SEMI JOIN和ANTI JOIN。

TIMEBOUND_EXPRESSION为左右两个流时间属性列上的区间条件表达式,支持以下三种条件表达式:

ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
  • 1
  • 2
  • 3

1.3.1 intervalJoin API用法

(1)代码

 //延迟0s
val delay = 0
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

left.print("orderStream=>")
right.print("shipMentStream=>")

left
  .keyBy(0)
  .intervalJoin(right.keyBy(1))
  // between 只支持 event time
  //时间间隔 -> leftStream 默认和 [left+0hour,left+4hour]的时间范围内的rightStream进行Join
  //订单流和 发送物流流延迟4个小时内的数据可以Join上
  .between(Time.hours(0), Time.hours(4))
  //不包含下界
  //.lowerBoundExclusive()
  //不包含上界
  //.upperBoundExclusive()
  .process(new ProcessJoinFunction[ (Int, String, Long),(Int,Int,String,Long) , (Int,String,String,Long,Long)]() {
    override def processElement(orders: (Int, String, Long), shipments:(Int,Int,String,Long), context: ProcessJoinFunction[ (Int, String, Long),(Int,Int,String,Long) , (Int,String,String,Long,Long)]#Context, out: Collector[(Int,String,String,Long,Long)]): Unit = {
      //orderId,ProductName, orderStatus ,TimeStamp ,TimeStamp
      out.collect( (orders._1, orders._2, shipments._3,orders._3, shipments._4))
    }
  })
  .print("IntervalJoin=>");

  env.execute("IntervalJoinTest")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

(2)分析与输出结果

时间间隔 -> leftStream 默认和 [left+0hour,left+4hour]的时间范围内的rightStream进行Join

订单流(leftStream)和 发送物流流(rightStream)延迟4个小时内的数据可以Join上

输出结果:

IntervalJoin=>> (1,iphone,shipped,1585706400000,1585710000000)
IntervalJoin=>> (3,huawei,shipped,1585706580000,1585713600000)
IntervalJoin=>> (4,pad,shipped,1585706700000,1585711800000)
  • 1
  • 2
  • 3

1.3.2 intervalJoin SQL用法

val delay = 0
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val left = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
   .map(ele=>Order(ele._1,ele._2,DateUtils.formatTime(ele._3)))
val right = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))
   .map(ele=>ShipMents(ele._1,ele._2,ele._3,DateUtils.formatTime(ele._4)))

val tableEnvironment = StreamTableEnvironment.create(env)

val orderTable:Table=tableEnvironment.fromDataStream(left)
val shipmentsTable:Table=tableEnvironment.fromDataStream(right)

val table: Table = tableEnvironment.sqlQuery(
  s"""
     |SELECT o.id, o.productName, s.status
     |FROM $orderTable AS o
     |JOIN $shipmentsTable AS s on o.id = s.orderId AND
     |     o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime
     |""".stripMargin)

tableEnvironment.toAppendStream[(Int,String,String)](table).print("IntervalJoinTest")

env.execute()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

注意 SQL与API,在写法上有点不一样,但是含义上都表示order流能够Join上shipMent流延迟4个小时之内的数据。

o.orderTime BETWEEN s.shipTime - INTERVAL '4' HOUR AND s.shipTime
  • 1

orderStream
   .keyBy(0) 
   .intervalJoin(shipTimeStream.keyBy(1))
   .between(Time.hours(0), Time.hours(4))
  • 1
  • 2
  • 3
  • 4

1.4 coGroup

(1)代码

//延迟0s
val delay = 0
//Window 4hour
val window=4
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val leftStream = env.addSource(new OrdersSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks(delay))
val rightStream = env.addSource(new ShipmentsSource).assignTimestampsAndWatermarks(new TimedataAssignerWithPeriodicWatermarks2(delay))

val leftJoinResult: DataStream[(Int,String,String,Long,Long)] = leftStream.
  coGroup(rightStream).where(_._1).equalTo(_._2) //leftJoin,以名字进行关联
  .window(TumblingEventTimeWindows.of(Time.hours(window))) //滚动窗口
  //IN1   (Int,String,Long)  id	 productName	orderTime
  //IN2   (Int, Int,String,Long)  shipId	orderId	status	shiptime
  //OUT   (Int,String,String,Long,Long)) orderId productName status orderTime shiptime
  .apply(new CoGroupFunction[(Int,String,Long),(Int, Int,String,Long),(Int,String,String,Long,Long)] {
    override def coGroup(first: lang.Iterable[(Int,String,Long)], second: lang.Iterable[(Int, Int,String,Long)], out: Collector[(Int,String,String,Long,Long)]): Unit = {
      for (firstEle <- first.asScala) {
        var flag = false
        for (secondEle <- second.asScala) {
          //left join: 可以join到
          out.collect((firstEle._1, firstEle._2, secondEle._3, firstEle._3, secondEle._4))
          flag = true
        }
        //left join: join不到
        if (!flag) {
          out.collect((firstEle._1, firstEle._2, "null", firstEle._3, -1L))
        }
      }
    }
  })

leftJoinResult.print()
env.execute()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

(2)分析及结果

与Join没有任何实质区别,只不过在输出的时候更加灵活,可以自定义输出。以上写法和SQL中的Left Join的含义类似。

输出结果:

(1,iphone,shipped,1585706400000,1585710000000)
(4,pad,shipped,1585706700000,1585711800000)
(3,huawei,null,1585706580000,-1)
(2,mac,null,1585706520000,-1)
  • 1
  • 2
  • 3
  • 4

2. intervalJoin源码解析

2.1 between方法进入类

     org.apache.flink.streaming.api.datastream. Class KeyedStream(java){
 Class IntervalJoin{
    Method between{
              //IntervalJoin仅支持EventTime	
	if (timeCharacteristic != TimeCharacteristic.EventTime) {
	throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
	}
     }	
 }
}

org.apache.flink.streaming.api.scala.KeyedStream(scala){
  Class IntervalJoin{
    //between方法注释leftElement.timestamp + lowerBound <= rightElement.timestamp<= leftElement.timestamp + upperBound
    Method between{
         new IntervalJoined[IN1, IN2, KEY](streamOne, streamTwo, lowerMillis, upperMillis){
	//下界默认包含,此方法是排除下界
	Method lowerBoundExclusive{ this.lowerBoundInclusive = false  }
	//上界默认包含,此方法是排除上界
	Method upperBoundExclusive{ this.upperBoundInclusive = false }
             //process方法中需传入用户自定义函数
	Method process((processJoinFunction: ProcessJoinFunction[IN1, IN2, OUT])){
                //Scala中的process方法跳转到Java中的process方法中
	  asScalaStream(javaJoined.process(processJoinFunction, outType)){
	      //Java中的process方法重点关注IntervalJoinOperator类、流的connect
	      SingleOutputStreamOperator<OUT> process{
		//【重要方法1】
		//An TwoInputStreamOperator operato to execute time-bounded stream inner joins.
		 operator=new IntervalJoinOperator<>{
		}
		//【重要方法2】
 	             //		
		return left
			//是将两个KeyedStream进行connect操作,得到ConnectedStreams,这样的两个数据流之间就可以实现状态共享,对于intervalJoin来说就是两个流相同key的数据可以相互访问	
			.connect(right)
			//Assigns keys to the elements,return The partitioned ConnectedStreams。
			.keyBy(keySelector1, keySelector2)
			//creating a transformed output stream.
			.transform("Interval Join", outputType, operator);

	      }
	  }	
	}
         }
      }
  }
 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

2.2 将上述重要方法1 IntervalJoinOperator单独拿出来解析

    class IntervalJoinOperator{
  //流的状态使用的是MapState,这属于Keyed State类型。状态可以理解为本地缓存。
  //分别用来存储两个流的数据,其中Long对应数据的时间戳,List<BufferEntry>对应相同时间戳的数据(其中BufferEntry有element与 hasBeenJoined两个属性)
  private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
  private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
  
  //初始化MapState
  Method initializeState{
 	 this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( ...... ))
	 this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(......))
  }

  //processElement1对左流进行处理,均调用processElement方法
  
   Method processElement1{
	processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
   }
   //processElement2对右流进行处理,均调用processElement方法
   Method processElement2(StreamRecord<T2> record) throws Exception {
	processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
  }
  
  //方法描述的是,当两个流达到之后,比如左边的流有数据到达之后,就去右边的流去查找对应上下界范围内的数据。这两个方法调用的都是processElement方法。
  Method processElement{
		//获取流的值
	              final THIS ourValue = record.getValue();
		//获取流的时间戳
	              final long ourTimestamp = record.getTimestamp();
		
 		//时间戳的值要有实际意义,一般使用EventTime
		if (ourTimestamp == Long.MIN_VALUE) {
			throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
					"interval stream joins need to have timestamps meaningful timestamps.");
		}
		
		//判断该条记录是否延迟,如果延迟,则直接跳出方法,不作任何处理。
		if (isLate(ourTimestamp)) {
                                //到达的记录的时间戳小于当前水位时,说明该条数据延迟,不对该条数据作任何处理
			return;
		}

		//将记录添加到对应流的MapState中,并给改记录打上未Join的标签flase
		addToBuffer(ourBuffer, ourValue, ourTimestamp);
		
		//遍历另一条流的MapStat: otherBuffer
		for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
			//另一条流记录的时间戳
			final long timestamp  = bucket.getKey();
			
			// 如过当前流的时间戳ourTimestamp与另一条流的时间戳timestamp满足如下关系
			//ourTimestamp + relativeLowerBound <=  timestamp <= ourTimestamp + relativeUpperBound
			//则进行Join操作,否则不作任何操作。
			if (timestamp < ourTimestamp + relativeLowerBound ||
					timestamp > ourTimestamp + relativeUpperBound) {
				continue;
			}
			
			//获取另一条流的值,并执行用户自定义函数的逻辑
			//取双流中时间戳较大者作为用户自定义函数ProcessJoinFunction中重写processElemen方法的输入
			for (BufferEntry<OTHER> entry: bucket.getValue()) {
				if (isLeft) {
					//左流执行Join逻辑
					collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
				} else {
					//右流执行Join逻辑
					collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
				}
			}
		}
		//当前流的清除时间
		long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
		
		//对状态的清理详细看本类onEventTime解析
                            //定时的清理时间,就是当下记录的时间+relativeUpperBound,当watermark大于该时间就需要清理
		//这里可以理解为加了relativeUpperBound延长了当下记录流从状态中删除的时间。
		if (isLeft) {
			//左流执行,注册定时清理时间
			internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
		} else {
			//右流执行,注册定时清理时间
			internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
		}

  }
 
 //判断记录时间戳是否延迟
 Method boolean isLate(long timestamp) {
              //获得当前eventTime的Watermark.水位是单调递增函数
	long currentWatermark = internalTimerService.currentWatermark();
	//如果记录中的时间戳小于currentWatermark则返回true,即当到达的记录的时间戳小于水位线时,说明该数据延时,不去处理,不去关联另一条流的数据。
	return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
 }
 //将记录添加到对应流的MapState中
 Method  void addToBuffer{			
	List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
	if (elemsInBucket == null) {
	     elemsInBucket = new ArrayList<>();
	}
              //给改条记录默认打上一个未Join的标签false: new BufferEntry<>(value, false)
	elemsInBucket.add(new BufferEntry<>(value, false));
	buffer.put(timestamp, elemsInBucket);
 }
 
 //collet方法,取双流中时间戳较大者作为用户自定义函数ProcessJoinFunction中重写processElemen方法的输入
 Method collect {
		final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

		collector.setAbsoluteTimestamp(resultTimestamp);
		context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

		userFunction.processElement(left, right, context, collector);
 }
 
 //清除watermark大于该记录EventTime记录
 Method onEventTime(InternalTimer<K, String> timer){

		//注册当前流的清除时间(而不是数据的时间戳)
 		long timerTimestamp = timer.getTimestamp();

		String namespace = timer.getNamespace();

		logger.trace("onEventTime @ {}", timerTimestamp);

		switch (namespace) {
			//假设: 假设目前流到达的数据的时间戳为10s,between传进去的时间分别为1s,5s。lowerBound为1s,upperBound为5s,
			//含义: 左流可以Join上右流时间范围在 [左流+1,左流+5]的数据,即 左边流时间戳+1s<=右边时间戳<=左边流时间戳+5s 
			             右流可以Join上左流时间范围在 [右流-5,右流-1]的数据,即 右边时间戳-5s<=左边流时间戳<=右边时间戳-1s

			//【重点】当前流的清除时间
			//long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;

			//对左流状态清除, 此时cleanupTime = 时间戳+5s,即15秒的时候可以清除左流中时间戳在10s的数据
			case CLEANUP_NAMESPACE_LEFT: {				
				//根据左边流时间戳+1s<=右边时间戳<=左边流时间戳+5s ;lowerBound为1s,upperBound为5s。
				//如果是左流,调用processElement1方法, relativeUpperBound为5即 relativeUpperBound>0, 此时的 timerTimestamp=10+5=15s。
				//此时清除左流的timestamp=timerTimestamp=15s.
				//当时间达到15s时,可以清除左边流时间戳为10s数据,即看右边流在15s时,需要查找的左边流时间范围10s<=左边流时间戳<=14s,所以watermark>15s时可清除左边流时间戳为10s数据。
				long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
				logger.trace("Removing from left buffer @ {}", timestamp);
				leftBuffer.remove(timestamp);
				break;
			}

			//对右流状态清除,此时cleanupTime = 时间戳,即10秒的时候可以清除右流中时间戳在10s的数据
			case CLEANUP_NAMESPACE_RIGHT: {
				//右边时间戳-5s<=左边流时间戳<=右边时间戳-1s;此时relativeLowerBound为-5,relativeUpperBound为-1。		
				//如果为右边流数据到达,调用processElement2方法 ,relativeLowerBound为-5即relativeLowerBound<0,此时的 timerTimestamp=10s。
				//此时清除右流的timestamp=timerTimestamp + lowerBound =10s-5=5s,实际上清除的是右流为5s的数据???
				//当时间到达10s,可以清除右边流时间戳为10s的数据,即看左边流在10s时,需要查找右边流时间范围11s<=右边时间戳<=16s,所以所以watermark>10s时可清除右边流时间戳为10s数据。
				long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
				logger.trace("Removing from right buffer @ {}", timestamp);
				rightBuffer.remove(timestamp);
				break;
			}
			default:
				throw new RuntimeException("Invalid namespace " + namespace);
		}

 }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161

2.3 状态清理机制详解

2.3.1 状态清理时间cleanupTime

		long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
  • 1

2.3.2 执行状态清理操作 Buffer.remove(timestamp)

1.假设: 假设目前流到达的数据的时间戳为10s,between传进去的时间分别为1s,5s。lowerBound为1s,upperBound为5s,

2.含义:
左流可以Join上右流时间范围在 [左流+1,左流+5]的数据,即 左边流时间戳+1s<=右边时间戳<=左边流时间戳+5s
右流可以Join上左流时间范围在 [右流-5,右流-1]的数据,即 右边时间戳-5s<=左边流时间戳<=右边时间戳-1s

3.当左流时间戳为10s的数据进入
(1)左边流时间戳+1s<=右边时间戳<=左边流时间戳+5s ;lowerBound为1s,upperBound为5s
(2)如果是左流,调用processElement1方法, relativeUpperBound为5即 relativeUpperBound>0, 此时的 timerTimestamp=10+5=15s。

long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
  • 1

(3)此时 relativelowerBound为1即lowerBound>0; 清除左流的timestamp=timerTimestamp=15s.

long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
				logger.trace("Removing from right buffer @ {}", timestamp);
				rightBuffer.remove(timestamp);
  • 1
  • 2
  • 3

(4)结论:
当时间达到15s时,可以清除左边流时间戳为10s数据,即看右边流在15s时,需要查找的左边流时间范围10s<=左边流时间戳<=14s,所以watermark>15s时可清除左边流时间戳为10s数据。

4.当右流时间戳为10s的数据进入
(1)右边时间戳-5s<=左边流时间戳<=右边时间戳-1s;此时relativeLowerBound为-5,relativeUpperBound为-1。
(2)如果为右边流数据到达,调用processElement2方法 ,relativeLowerBound为-5即relativeLowerBound<0,此时的 timerTimestamp=10s。

	long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
  • 1

(3)此时 relativelowerBound为-5即lowerBound<0;清除右流的timestamp=timerTimestamp + lowerBound =10s-5=5s,实际上清除的是右流为5s的数据???

long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
				logger.trace("Removing from right buffer @ {}", timestamp);
				rightBuffer.remove(timestamp);
  • 1
  • 2
  • 3

(4)结论
当时间到达10s,可以清除右边流时间戳为10s的数据,即看左边流在10s时,需要查找右边流时间范围11s<=右边时间戳<=16s,所以所以watermark>10s时可清除右边流时间戳为10s数据。

2.4 看完源码后需要知道的

2.4.1 MapState存储状态

状态使用的是MapState,这属于Keyed State类型。状态可以理解为本地缓存,分别用来存储两个流的数据。其数据结构为 MapState<Long, List<BufferEntry>>,其中Long对应数据的时间戳,List对应相同时间戳的数据(其中BufferEntry有element与 hasBeenJoined两个属性)

2.4.2 状态清理时间

左流状态清理时间=ourTimestamp(数据流中的Event) + relativeUpperBound(时间范围上界)

右流状态清理时间=ourTimestamp(数据流中的Event)

3.三种Join的区别及使用场景

Join方式特点生产使用
Join类似SQL中的Inner Join,对同窗口同key的数据Join的上对双流/多流Join基本上不用Join,因为这种方式基本上会丢数据;对Processing Time和Event Time都支持。
IntervalJoin和上面的Join类似,不同的是左流和右流中每条记录只关联另外一条流上同一时间段内的数据,这个数据保留在State中(Flink自身维护)对于双流Join这种方式经常使用,不过值得注意的是这种方式仍然会存在丢数据的情况,比如超过时间范围而数据是同窗口同Key的情况下,这条数据还是会丢失;仅支持Event Time的计算。
CoGroupJoin类似于SQL中的Left Join,和Join的区别在于输出数据流的方式可以自定义对于双流Join,在写API方式中经常使用,会结合sideOutputLateData(侧流)将延迟数据拿出来,对延迟数据进一步处理;对Processing Time和Event Time都支持。

参考资料

Flink DataStream Join && IntervalJoin && coGroup的区别
https://blog.csdn.net/qq_33689414/article/details/93875881

(阿里云实时flink版本)IntervalJoin语句
https://help.aliyun.com/document_detail/195298.html


(原理)Apache Flink 漫谈系列 - Time Interval JOIN
https://enjoyment.cool/2019/03/22/Apache%20Flink%20%E6%BC%AB%E8%B0%88%E7%B3%BB%E5%88%97%20-%20Time%20Interval%20JOIN/#more


(状态清理机制)Flink1.11 intervalJoin watermark生成,状态清理机制源码理解&Demo分析
https://blog.csdn.net/qq_34864753/article/details/111183556

(源码分析)Flink Interval Join 使用和原理分析
 https://blog.csdn.net/tzs_1041218129/article/details/109475489?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_baidulandingword-1&spm=1001.2101.3001.4242
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

4.多个流Join

    上面说到了两个流的JOIN,但是实际场景中可能涉及到四个流,甚至六个流的JOIN,该如何实现呢?

4.1 场景1:多个流更新不频繁,需要实时join成一张表(多个维表JOIN成一张维表)

在这里插入图片描述
    实现步骤:
    1.用Canal实时同步MySQL binlog到Kafka,形成相应的流。
    2.将表A流与原MySQL中其他表(表B、C、D)异步JOIN。对表E进行相应的增删改。
    注意此处前提条件:
    (1)MySQL的四张表更新不频繁,因为如果更新频繁,使用MySQL进行异步Join可能QPS要求达不到。
    (2)表A去Join表B、C、D是根据情况选择,只需要Join对标E有增删改的表。

4.2 两个流Join(事实表与维表JOIN)

在这里插入图片描述

    实现步骤:
    1.用Canal实时同步MySQL binlog到Kafka,事实表A流。
    2.用Canal实时同步MySQ维表B到Phoenix。
    3.使用Kafka中A流异步Join Phoenix中的维表B,将结果写入到Phoenix中结果表C。
    注意:
    (1)此处将维表B实时同步到Phoenix中,是维表B的QPS比较高(这里的维表是一个广泛概念),如果QPS比较低,可以直接使用MySQL中的维表B。

4.3 两个事实表Join(不使用TimeWindowJoin)

    两个实时表Join如果使用TimeWindowJoin就是将数据状态保存在Flink的Operate State中。首先,这里使用第三方存储Phoenix。其次IntervalJoin的缺点是其中一个流如果有延迟,而且延迟超过State的过期时间,就会存在数据丢失的情况。此处使用CoGroupJoin+侧流输出解决此问题。

在这里插入图片描述

    实现步骤:
    1.用Canal实时同步MySQL binlog到Kafka,事实表A流和B流。
    2.使用A流 coGroup B流
    3.A流late,sideputTag+API/DB(使用API从数据库中异步JoinB表数据)。同理B流late,sideputTag+API/DB(使用API从数据库中异步JoinA表数据)。
    4.UNION。 将所有流UNION起来并写入到Phoenix表C
    注意:
    此处与IntervalJoin的不同是,没有使用Flink的状态,而是将延迟的数据直接通过SideOutPutTag拿出来,并异步Join MySQL中的数据。

5.拓展

5.1 Canal的Phoenix插件

由作者开发,并提交到开源社区。详细见

4.5 Canal的替代Flink CDC

实时流批一体是实时数据处理趋势,使用Flink CDC也势在必行。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/734628
推荐阅读
相关标签
  

闽ICP备14008679号