Key terms: Flink / PyFlink / Alink (PyAlink)
- Source, Sink; Batch, Stream
- DataFrame, DataStream
- Operator (算子)
- Connector
- benv - BatchExecutionEnvironment; btenv - BatchTableEnvironment
- senv - StreamExecutionEnvironment; stenv - StreamTableEnvironment
- UDF - User-Defined Function
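A UDF is just an ordinary Python function that the engine applies to each record. A minimal pure-Python sketch of the idea (no Flink dependency; `apply_udf` is a hypothetical stand-in for what the Table API does after you register a scalar UDF, not a Flink API):

```python
# A scalar UDF: a plain function applied to one column of every row.
def add_one(x: int) -> int:
    return x + 1

def apply_udf(rows, fn):
    """Apply a scalar UDF to the value column of each (key, value) row."""
    return [(key, fn(val)) for key, val in rows]

rows = [('a', 1), ('b', 2)]
print(apply_udf(rows, add_one))  # → [('a', 2), ('b', 3)]
```

In PyFlink the wrapping and registration are handled by the Table API's `udf` decorator; the essential idea, user code invoked per record by the engine, is the same.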
The 2015 Google Dataflow paper: "A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing".
Unbounded vs. bounded data: a stream is unbounded, a batch is bounded; batch processing is a special case of stream processing.
From the start, Flink's stream and batch applications have shared the same streaming runtime underneath, but their programming APIs were kept separate.
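The "batch is a special case of streaming" point can be sketched without Flink at all: a batch aggregate is just a streaming aggregate run over a stream that happens to end. A minimal pure-Python illustration (the generator here stands in for a streaming operator; it is not a Flink API):

```python
# A running (streaming) sum that emits an updated result per record.
def streaming_sum(stream):
    total = 0
    for x in stream:      # works for any iterator, bounded or unbounded
        total += x
        yield total       # incremental result after each record

data = [1, 2, 3, 4]       # a bounded stream, i.e. a batch
results = list(streaming_sum(data))
print(results[-1])        # the final streaming result equals sum(data) → 10
```

When the input is bounded, the last emitted value coincides with the batch result; when it is unbounded, the same operator simply keeps emitting updated results forever.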
Notes:
1. In PyFlink 1.12, the Table API reached essential feature parity with the Java Table API.
2. The DataStream API will have most of its remaining gaps filled in Flink 1.13.
3. If your project is AI-focused, Python is the recommended development language, with Flink 1.13 as the supporting engine.
```python
from pyflink.common import WatermarkStrategy, Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor


class MyMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        # Declare a per-key ValueState holding the running count
        state_desc = ValueStateDescriptor('cnt', Types.LONG())
        self.cnt_state = runtime_context.get_state(state_desc)

    def map(self, value):
        cnt = self.cnt_state.value()
        if cnt is None:
            cnt = 0
        new_cnt = cnt + 1
        self.cnt_state.update(new_cnt)
        return value[0], new_cnt


def state_access_demo():
    # 1. Create the StreamExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()

    # 2. Create the data source
    seq_num_source = NumberSequenceSource(1, 100)
    ds = env.from_source(
        source=seq_num_source,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name='seq_num_source',
        type_info=Types.LONG())

    # 3. Define the processing logic
    ds = ds.map(lambda a: Row(a % 4, 1),
                output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
           .key_by(lambda a: a[0]) \
           .map(MyMapFunction(),
                output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))

    # 4. Print the results
    ds.print()

    # 5. Submit the job
    env.execute()


if __name__ == '__main__':
    state_access_demo()
```
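What `MyMapFunction` computes can be checked with a pure-Python model of keyed ValueState: one counter per key, incremented for every record with that key. This is a sketch of the semantics only, not Flink itself:

```python
# Pure-Python model of keyed ValueState: the dict holds one 'cnt' value
# per key, mirroring what Flink scopes automatically after key_by().
def keyed_count(records):
    state = {}                       # key -> current count
    out = []
    for key, _ in records:
        new_cnt = state.get(key, 0) + 1
        state[key] = new_cnt
        out.append((key, new_cnt))
    return out

# Numbers 1..8 keyed by n % 4, as in the pipeline above:
records = [(n % 4, 1) for n in range(1, 9)]
print(keyed_count(records))
# → [(1, 1), (2, 1), (3, 1), (0, 1), (1, 2), (2, 2), (3, 2), (0, 2)]
```

The difference in real Flink is that the state is fault-tolerant and partitioned across parallel instances; the per-key counting logic is the same.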