当前位置:   article > 正文

基于Flink实时数仓——维表关联代码实现(4.2优化:异步查询)_flink 异步维表

flink 异步维表

Flink 流处理过程中,经常需要和外部系统进行交互,用维度表补全事实表中的字段。
例如:

  • 在电商场景中,需要一个商品的 skuid去关联商品的一些属性,例如商品所属行业、 商品的生产厂家、生产厂家的一些情况;
  • 在物流场景中,知道包裹 id,需要去关联包裹的行业属性、发货信息、收货信息。

默认情况下,在 Flink 的 MapFunction 中,单个并行只能用同步方式去交互: 将请求发送到外部存储,IO 阻塞,等待请求返回,然后继续发送下一个请求。这种同步交互的方式往往在网络等待上就耗费了大量时间。为了提高处理效率,可以增加 MapFunction 的并行度,但增加并行度就意味着更多的资源,并不是一种非常好的解决方式。

Flink 在 1.2 中引入了 Async I/O,在异步模式下,将 IO 操作异步化,单个并行可以连续发送多个请求,哪个请求先返回就先处理,从而在连续的请求间不需要阻塞式等待,大大提高了流处理效率。
Async I/O 解决与外部系统交互时网络延迟成为了系统瓶颈的问题。
在这里插入图片描述
异步查询实际上是把维表的查询操作托管给单独的线程池完成,这样不会因为某一个查询造成阻塞,单个并行可以连续发送多个请求,提高并发效率。这种方式特别针对涉及网络 IO 的操作,减少因为请求等待带来的消耗。

封装维度异步查询的函数类 DimAsyncFunction该类继承异步方法类 RichAsyncFunction,实现自定义维度查询接口
其中 RichAsyncFunction<IN,OUT>是 Flink 提供的异步方法类,此处因为是查询操作输入类和返回类一致,所以是<T,T>。 RichAsyncFunction 这个类要实现两个方法:

  1. open 用于初始化异步连接池。
  2. asyncInvoke 方法是核心方法,里面的操作必须是异步的,如果你查询的数据库有异步 api
    也可以用线程的异步方法,如果没有异步方法,就要自己利用线程池等方式实现异步查 询。

如何使用这个 DimAsyncFunction
核心的类是 AsyncDataStream,这个类有两个方法一个是有序等待(orderedWait),一个是无序等待(unorderedWait)。

无序等待(unorderedWait)

后来的数据,如果异步查询速度快可以超过先来的数据,这样性能会更好一些,但是会
有乱序出现。

有序等待(orderedWait)

严格保留先来后到的顺序,所以后来的数据即使先完成也要等前面的数据。所以性能会
差一些。

  1. 必须重写装配结果 join 方法(将维度数据注入宽表)和获取查询 rowkey 的 getKey 方法。
  2. 方法的最后两个参数 10, TimeUnit.SECONDS ,标识次异步查询最多执行 10 秒, 否则会报超时异常。

代码实现:

  //4.1 关联用户维度
        SingleOutputStreamOperator<OrderWide> orderWideWithUserDS = AsyncDataStream.unorderedWait(
                orderWideWithNoDimDS,
                new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
   
                    @Override
                    public String getKey(OrderWide orderWide) {
   
                        return orderWide.getUser_id().toString();
                    }

                    @Override
                    public void join(OrderWide orderWide, JSONObject dimInfo) throws ParseException {
   
                        orderWide.setUser_gender(dimInfo.getString("GENDER"));

                        String birthday = dimInfo.getString("BIRTHDAY");
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

                        long currentTs = System</
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/962928
推荐阅读
相关标签
  

闽ICP备14008679号