
Flink Join Operations: LEFT JOIN after windowing in Flink SQL

Contents

DataStream API (functional)

window join

join

coGroup

interval join

Table API (Flink SQL)

regular join

inner join

left join / right join

full join

interval join

lookup join

window join

INNER/LEFT/RIGHT/FULL OUTER


DataStream API (functional)

window Join

join

Joins elements of the two streams that fall into the same window.

Time semantics: processing time or event time.

Limitations:

1. Elements that fall into different windows can never be joined.
2. Only inner join is supported.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)    // key selector for the left stream
    .equalTo(<KeySelector>)  // key selector for the right stream
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)) // window assigner: tumbling/sliding/session
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
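To make the first limitation concrete, here is a plain-Java sketch of tumbling-window join semantics (no Flink dependency; the timestamps, keys, and 10-unit window size are made up for illustration). Two elements only two time units apart still never join when a window boundary falls between them:

```java
import java.util.ArrayList;
import java.util.List;

public class WindowJoinSketch {
    // one keyed, timestamped element of either stream
    record Event(long ts, String key, String value) {}

    // start of the tumbling window of the given size that ts falls into
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // inner-joins events whose key AND tumbling window both match
    static List<String> windowJoin(List<Event> left, List<Event> right, long size) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key().equals(r.key())
                        && windowStart(l.ts(), size) == windowStart(r.ts(), size)) {
                    out.add(l.value() + "," + r.value());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> orange = List.of(new Event(9, "k", "a9"));
        List<Event> green = List.of(new Event(11, "k", "b11"), new Event(5, "k", "b5"));
        // a9 (t=9) and b5 (t=5) share window [0,10); b11 (t=11) is in [10,20)
        // and never joins, even though it is only 2 units away from a9
        System.out.println(windowJoin(orange, green, 10)); // prints [a9,b5]
    }
}
```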

coGroup

coGroup is the lower-level primitive behind join; with coGroup all four join types (inner/left/right/full) can be implemented.

Time semantics: processing time or event time.

Limitation: elements that fall into different windows still cannot be joined.

latedata.coGroup(stream)
    .where(a -> a.getStr("a"))    // key selector for the left stream
    .equalTo(a -> a.getStr("a"))  // key selector for the right stream
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .apply(new CoGroupFunction<JSONObject, JSONObject, Object>() {
        @Override
        public void coGroup(Iterable<JSONObject> left, Iterable<JSONObject> right, Collector<Object> collector) throws Exception {
            // all left-side and right-side elements for this key/window arrive here;
            // check for empty sides to implement outer joins
        }
    });
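As a sketch of how coGroup enables outer joins (plain Java, no Flink; the row values are made up): the callback receives all left rows and all right rows for one key and window, so a left join simply pads with null when the right group is empty.

```java
import java.util.ArrayList;
import java.util.List;

public class CoGroupLeftJoinSketch {
    // A coGroup callback receives ALL left rows and ALL right rows for one
    // key and window; a left outer join emits every left row, padded with
    // "null" when the right group is empty.
    static List<String> leftJoin(List<String> lefts, List<String> rights) {
        List<String> out = new ArrayList<>();
        for (String l : lefts) {
            if (rights.isEmpty()) {
                out.add(l + ",null");
            } else {
                for (String r : rights) {
                    out.add(l + "," + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(leftJoin(List.of("a1"), List.of()));           // [a1,null]
        System.out.println(leftJoin(List.of("a1"), List.of("b1", "b2"))); // [a1,b1, a1,b2]
    }
}
```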

interval Join

Created to solve the window join problem: elements in different windows cannot be joined.

Time semantics: event time.

An interval join takes the timestamp of each left-stream element as the anchor and joins right-stream elements whose timestamps fall within a configured range around it (a lower and an upper bound).

Limitation: because the timeline is always anchored on the left stream, only inner join is supported.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });
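A plain-Java sketch of the between(-2, 1) semantics (no Flink dependency; timestamps and values are illustrative): each left element at time t joins right elements whose timestamps lie in [t - 2, t + 1].

```java
import java.util.ArrayList;
import java.util.List;

public class IntervalJoinSketch {
    // one timestamped element of either stream
    record Event(long ts, String value) {}

    // Emulates keyedLeft.intervalJoin(keyedRight).between(lower, upper):
    // each left event joins every right event whose timestamp lies in
    // [left.ts + lower, left.ts + upper].
    static List<String> intervalJoin(List<Event> left, List<Event> right,
                                     long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (r.ts() >= l.ts() + lower && r.ts() <= l.ts() + upper) {
                    out.add(l.value() + "," + r.value());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> orange = List.of(new Event(5, "o5"));
        List<Event> green = List.of(new Event(2, "g2"), new Event(4, "g4"), new Event(7, "g7"));
        // between(-2, 1) => right timestamps in [3, 6]; only g4 qualifies
        System.out.println(intervalJoin(orange, green, -2, 1)); // prints [o5,g4]
    }
}
```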

Table API (Flink SQL)

Regular Join

By default a regular join has no time bound: a row can join with any matching row across the whole history of both streams, so state grows without limit.

A state retention (expiry) time can be configured:

tableEnv.getConfig().setIdleStateRetention(xx)

Once retention is set, the four join types expire their state in different ways.
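A minimal sketch of that retention setting, assuming an existing Flink StreamTableEnvironment named tableEnv (the one-hour value is illustrative):

```java
import java.time.Duration;

// Join state that has been idle for one hour is dropped; how each join
// type refreshes its state's timer differs per join type.
tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
```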

inner join

With inner join, rows from both streams start their expiry countdown as soon as their state is created.

SELECT *
FROM Orders
INNER JOIN Product
ON Orders.product_id = Product.id

left join / right join

left: left-stream rows start their expiry countdown when created, but every successful join resets the timer.

right: right-stream rows start their expiry countdown when created, but every successful join resets the timer.

SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id;

SELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id;

full join

Rows on both sides start their expiry countdown when created, but every successful join resets the timer.

SELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id

interval join

The SQL interval join is an upgraded version of the DataStream API interval join: it supports both processing-time and event-time semantics.

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

lookup join

The effect is similar to CDC-based enrichment, but every incoming row fires a query against the external database, so throughput is poor.

A cache can be configured: once a key has been looked up, the result is reused for a set time. While cached, updates to the MySQL table are not picked up, so within that window it behaves like a regular join.

Lookup join therefore suits dimension (dictionary) data that does change, but where some delay in picking up the change is acceptable.

Its use cases are limited.

Key clause:

FOR SYSTEM_TIME AS OF o.proc_time

Cache options (JDBC connector):

lookup.cache.max-rows (optional, default: none, type: Integer)
The maximum number of rows in the lookup cache; beyond this, the oldest rows are expired. The lookup cache is disabled by default. In short: how many rows to cache at most.

lookup.cache.ttl (optional, default: none, type: Duration)
The maximum time-to-live of each row in the lookup cache; rows older than this are expired. The lookup cache is disabled by default. In short: the TTL of cached rows, e.g. 1 DAY or 1 HOUR.

CREATE TEMPORARY TABLE Orders (
    id INT,
    order_id INT,
    customer_id INT,
    total INT,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    ...
);

-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
    id INT,
    name STRING,
    country STRING,
    zip STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
    'table-name' = 'customers',
    'lookup.cache.max-rows' = '10',
    'lookup.cache.ttl' = '1 hour'
);

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

Window Join

A window join requires both input tables to be windowed first with a windowing TVF, e.g.:

TABLE(TUMBLE(TABLE tablegreen, DESCRIPTOR(rt), INTERVAL '5' MINUTES))

Time semantics: processing time or event time.

INNER/LEFT/RIGHT/FULL OUTER 

SELECT ...
FROM L [LEFT|RIGHT|FULL OUTER] JOIN R -- L and R are relations applied windowing TVF
ON L.window_start = R.window_start AND L.window_end = R.window_end AND ...
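For the title case, a LEFT JOIN after windowing both sides, a sketch could look like this (the table names LeftT/RightT, key column id, value columns, and event-time column rt are illustrative, not from the original):

```sql
SELECT L.id, L.window_start, L.window_end, L.lval, R.rval
FROM (
    SELECT * FROM TABLE(TUMBLE(TABLE LeftT, DESCRIPTOR(rt), INTERVAL '5' MINUTES))
) L
LEFT JOIN (
    SELECT * FROM TABLE(TUMBLE(TABLE RightT, DESCRIPTOR(rt), INTERVAL '5' MINUTES))
) R
ON L.id = R.id
AND L.window_start = R.window_start
AND L.window_end = R.window_end;
```

Rows of LeftT with no match in the same five-minute window come out with NULL on the R side.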
