当前位置:   article > 正文

python流处理框架_Faust——python分布式流式处理框架

faust 架构 python

摘要

Faust是用python开发的一个分布式流式处理框架。在一个机器学习应用中,机器学习算法可能被用于数据流实时处理的各个环节,而不是仅仅在推理阶段,算法也不仅仅局限于常见的分类回归算法,而是会根据业务需要执行一个十分差异化的任务, 例如:在我们的时序异常检测应用中, 前处理阶段的变点检测算法。这就要求流处理框架除了具备进行常规的转换聚合操作之外,可以支持更加强大的任意自定义逻辑和更加复杂的自定义状态,能够更好地与原生的python算法代码紧密结合在一起。在主流的flink, spark streaming不能满足我们的个性化需求时, Faust为我们提供了一个选择.

本文将对faust框架的主要功能进行概要描述。

参考连接

https://faust.readthedocs.io/en/latest/

https://github.com/robinhood/faust

基本使用

app

faust库的一个实例,提供了Faust的核心API,通过app可定义kafka topic、流处理器等。

>>> app = faust.App(

... 'myid',

... broker='kafka://kafka.example.com',

... store='rocksdb://',

... )

创建topic

faust以kafka作为数据传输和自组织管理的媒介,可以直接在faust应用中定义kafka主题。

topic = app.topic('name_of_topic')

@app.agent(topic)

async def process(stream):

async for event in stream:

...

创建table

table是Faust中的分布式键值对数据表,可用于保存流处理过程中的中间状态。

transfer_counts = app.Table(

'transfer_counts',

default=int,

key_type=str,

value_type=int,

)

创建agent

agent是数据处理流中的一个基本处理单元,通过从kafka中摄取指定topic中的数据,并进行相应的处理。

import faust

app = faust.App('stream-example')

@app.agent()

async def myagent(stream):

"""Example agent."""

async for value in stream:

print(f'MYAGENT RECEIVED -- {value!r}')

yield value

if __name__ == '__main__':

app.main()

agent——分布式自组织流处理器

import faust

class Add(faust.Record):

a: int

b: int

app = faust.App('agent-example')

topic = app.topic('adding', value_type

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/927149
推荐阅读
相关标签
  

闽ICP备14008679号