赞
踩
摘要
Faust是用python开发的一个分布式流式处理框架。在一个机器学习应用中,机器学习算法可能被用于数据流实时处理的各个环节,而不是仅仅在推理阶段,算法也不仅仅局限于常见的分类回归算法,而是会根据业务需要执行一个十分差异化的任务, 例如:在我们的时序异常检测应用中, 前处理阶段的变点检测算法。这就要求流处理框架除了具备进行常规的转换聚合操作之外,可以支持更加强大的任意自定义逻辑和更加复杂的自定义状态,能够更好地与原生的python算法代码紧密结合在一起。在主流的flink, spark streaming不能满足我们的个性化需求时, Faust为我们提供了一个选择.
本文将对faust框架的主要功能进行概要描述。
参考连接
https://faust.readthedocs.io/en/latest/
https://github.com/robinhood/faust
基本使用
app
faust库的一个实例,提供了Faust的核心API,通过app可定义kafka topic、流处理器等。
>>> app = faust.App(
... 'myid',
... broker='kafka://kafka.example.com',
... store='rocksdb://',
... )
创建topic
faust以kafka作为数据传输和自组织管理的媒介,可以直接在faust应用中定义kafka主题。
topic = app.topic('name_of_topic')
@app.agent(topic)
async def process(stream):
async for event in stream:
...
创建table
table是Faust中的分布式键值对数据表,可用于保存流处理过程中的中间状态。
transfer_counts = app.Table(
'transfer_counts',
default=int,
key_type=str,
value_type=int,
)
创建agent
agent是数据处理流中的一个基本处理单元,通过从kafka中摄取指定topic中的数据,并进行相应的处理。
import faust
app = faust.App('stream-example')
@app.agent()
async def myagent(stream):
"""Example agent."""
async for value in stream:
print(f'MYAGENT RECEIVED -- {value!r}')
yield value
if __name__ == '__main__':
app.main()
agent——分布式自组织流处理器
import faust
class Add(faust.Record):
a: int
b: int
app = faust.App('agent-example')
topic = app.topic('adding', value_type
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。