当前位置:   article > 正文

一、一文搞懂什么是flink?flink版本怎么选?flink与pylink、alink之间的关系?

一、一文搞懂什么是flink?flink版本怎么选?flink与pylink、alink之间的关系?

关键字

flink pyflink alink(pyalink)
Source Sink Batch Stream
DataFrame DataStream
Oprator算子 -
Connector
benv - BatchExecutionEnvironment btenv - BatchTableEnvironment
senv - StreamExecutionEnvironment stenv - StreamTableEnvironment

UDF - User Define Function


理念

2015年Google Dataflow 论文:a practical approach to balancing correctness, latency, and cost in massive-scale, unbounded, out-of-order data processing
unbounded/bounded 边界 批处理是流处理的特例
在 Flink 设计之初流处理应用和批处理应用尽管底层都是流处理,但在编程 API 上是分开的。

在这里插入图片描述
在这里插入图片描述


版本演进

在这里插入图片描述


使用场景

在这里插入图片描述

Pyflink系统架构

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
注:
1、pyflink 1.12中 Table API 基本对齐了Java的所有功能
2、DataStream API 到 1.13 将补齐缺少的大步幅功能
3、如果项目重点偏向AI,建议使用python作为开发语言,选择flink1.13作为支持引擎
在这里插入图片描述

Pyflink Table API

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Pyflink DataStream API

from pyflink.common import WatermarkStrategy, Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor


class MyMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        state_desc = ValueStateDescriptor('cnt', Types.LONG())
        # 定义value state
        self.cnt_state = runtime_context.get_state(state_desc)

    def map(self, value):
        cnt = self.cnt_state.value()
        if cnt is None:
            cnt = 0

        new_cnt = cnt + 1
        self.cnt_state.update(new_cnt)
        return value[0], new_cnt


def state_access_demo():
    # 1. 创建 StreamExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()

    # 2. 创建数据源
    seq_num_source = NumberSequenceSource(1, 100)
    ds = env.from_source(
        source=seq_num_source,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name='seq_num_source',
        type_info=Types.LONG())

    # 3. 定义执行逻辑
    ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
           .key_by(lambda a: a[0]) \
           .map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))

    # 4. 将打印结果数据
    ds.print()

    # 5. 执行作业
    env.execute()


if __name__ == '__main__':
    state_access_demo()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

在这里插入图片描述
在这里插入图片描述

Alink功能架构

在这里插入图片描述

Alink系统架构

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/619723
推荐阅读
相关标签
  

闽ICP备14008679号