赞
踩
在 Flink 流处理过程中,经常需要和外部系统进行交互,用维度表补全事实表中的字段。
例如:
默认情况下,在 Flink 的 MapFunction 中,单个并行只能用同步方式去交互: 将请求发送到外部存储,IO 阻塞,等待请求返回,然后继续发送下一个请求。这种同步交互的方式往往在网络等待上就耗费了大量时间。为了提高处理效率,可以增加 MapFunction 的并行度,但增加并行度就意味着更多的资源,并不是一种非常好的解决方式。
Flink 在 1.2 中引入了 Async I/O,在异步模式下,将 IO 操作异步化,单个并行可以连续发送多个请求,哪个请求先返回就先处理,从而在连续的请求间不需要阻塞式等待,大大提高了流处理效率。
Async I/O 解决与外部系统交互时网络延迟成为了系统瓶颈的问题。
异步查询实际上是把维表的查询操作托管给单独的线程池完成,这样不会因为某一个查询造成阻塞,单个并行可以连续发送多个请求,提高并发效率。这种方式特别针对涉及网络 IO 的操作,减少因为请求等待带来的消耗。
封装维度异步查询的函数类 DimAsyncFunction该类继承异步方法类 RichAsyncFunction,实现自定义维度查询接口
其中 RichAsyncFunction<IN,OUT>是 Flink 提供的异步方法类,此处因为是查询操作输入类和返回类一致,所以是<T,T>。 RichAsyncFunction 这个类要实现两个方法:
如何使用这个 DimAsyncFunction
核心的类是 AsyncDataStream,这个类有两个方法一个是有序等待(orderedWait),一个是无序等待(unorderedWait)。
无序等待(unorderedWait)
后来的数据,如果异步查询速度快可以超过先来的数据,这样性能会更好一些,但是会
有乱序出现。
有序等待(orderedWait)
严格保留先来后到的顺序,所以后来的数据即使先完成也要等前面的数据。所以性能会
差一些。
代码实现:
//4.1 关联用户维度 SingleOutputStreamOperator<OrderWide> orderWideWithUserDS = AsyncDataStream.unorderedWait( orderWideWithNoDimDS, new DimAsyncFunction<OrderWide>("DIM_USER_INFO") { @Override public String getKey(OrderWide orderWide) { return orderWide.getUser_id().toString(); } @Override public void join(OrderWide orderWide, JSONObject dimInfo) throws ParseException { orderWide.setUser_gender(dimInfo.getString("GENDER")); String birthday = dimInfo.getString("BIRTHDAY"); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); long currentTs = System</
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。