当前位置:   article > 正文

RisingWave 用户定义函数 (一) :概览

RisingWave 用户定义函数 (一) :概览

|作者:王润基 RisingWave Labs 内核开发工程师

用户定义函数(User Defined Function,以下简称 UDF)是数据系统中的常见功能。它允许用户使用各种语言定义自己的函数,作为内置函数的补充,以实现各种定制化执行逻辑。通过 UDF,我们可以将多个已有函数组合起来形成新函数,简化查询逻辑;也可以使用 Python 等语言编写代码,借用其它语言的生态,填补 SQL 语言和内置函数表达能力的不足;除了纯计算以外,我们还可以调用外部系统 API,将外部服务集成到统一的数据处理管线中。可以说,UDF 的存在极大提升了数据处理系统的灵活性和扩展性。

之前的文章中,我们介绍了 RisingWave 内部函数的开发框架,它主要面向内部开发者。接下来,我们将通过一系列文章介绍面向用户的 UDF 功能的设计与实现过程。它在技术上与前者一脉相承,但对易用性提出了更高的要求。作为该系列第一篇文章,我们先来概览一下 UDF 的种类、用户接口和使用场景。

UDF 的种类

UDF 可以从三个维度进行分类。

第一个维度是函数的输入输出。在 SQL 中,我们有以下几种常见的函数:

  • 标量函数(Scalar Function):输入一行,输出一行。例如 abs(-1) -> 1
  • 表值函数(Table Function):输入一行,输出多行。例如 generate_series(1,3) -> {1,2,3}
  • 聚合函数(Aggregate Function):输入多行,输出一行。例如 sum({1,2,3}) -> 6 。
  • 窗口函数(Window Function):输入多行,输出多行。例如 lag({1,2,3}) -> {NULL,1,2}

这里每一种函数都可以支持用户自定义。在 UDF 的语境下,我们通常将它们简称为 UDF、UDTF、UDAF 和 UDWF。RisingWave 目前支持 UDF 和 UDTF,它们可以覆盖绝大多数实际需求。我们也正在开发 UDAF 以支持更多场景。

第二个维度是编写函数的语言。目前 RisingWave 已经支持的语言包括 SQL、Python、Java、JavaScript 和 Rust。它们面向的用户和场景也略有不同:

  • SQL 是 RisingWave 的原生语言。SQL UDF 并没有提供超出 SQL 本身的表达能力,它只是将已有的函数进行组合,实现代码复用和简化查询的效果。
  • Python 是数据科学和人工智能领域的常用语言。它编写简单、生态丰富、用户众多,因此是我们最早支持的 UDF 语言。但它作为解释执行的脚本语言,运行速度很慢,不适合实现重计算任务。
  • Java 是大数据系统的主流语言。包括 Hadoop、Spark、Flink、Kafka 等在内的 Apache 生态都使用 Java 编写。因此 Java UDF 主要面向从以上系统迁移而来的用户。Java 语言性能较高,但其编写和部署方式较为繁琐。
  • JavaScript 是 Web 前后端的主流语言。近年来其生态迅猛发展,有赶超 Python 的趋势。JavaScript UDF 主要面向前后端开发者。
  • Rust 是高性能系统编程语言,也是 RisingWave 本身的开发语言。Rust UDF 适合编写对性能要求高的重计算任务。它会被编译为 WebAssembly 在 RisingWave 内置的容器中运行。

总的来说,我们的目标是让用户能够用任何喜欢的语言和适合的语言来编写函数,不让语言成为表达逻辑的障碍。

第三个维度是函数的执行方式。这决定了函数的性能和能力。目前我们有以下三种执行方式:

  • 内联执行:特指 SQL UDF。它会在前端直接展开 inline 到表达式中。因此和手动调用多个函数相比,性能几乎没有区别。
  • 嵌入式执行:函数运行在 RisingWave 计算节点内嵌的语言 runtime 中。由于直接在进程内调用,它几乎没有额外通信开销。但出于安全考虑,我们目前不开放 runtime 的外部访问,也就是说这些函数只能进行纯计算,不能访问网络。目前 Python、JavaScript 和 Rust 支持嵌入式执行。
  • 外部函数(External Function):函数运行在一个独立的进程中,以 RPC 的方式向 RisingWave 提供服务。目前外部函数支持 Python 和 Java 语言。这种方式提供了最大的灵活性,用户可以在这个进程中做任何事情。而它与 RisingWave 本身天然隔离,使得 RisingWave 无须担心用户定义函数与其争抢资源或干扰运行。但这种方式最大的问题在于性能,因为 RPC 会引入很大的延迟,导致整个数据流的阻塞(尤其是当在云上和 RisingWave 部署在不同的数据中心时)。此外,它对用户的部署和运维也带来不少麻烦。

在 RisingWave 的开发过程中,我们首先实现的是 Python 语言的外部函数。这也是目前用户使用最多的一种 UDF。为了解决它带来的性能和易用性的问题,我们又开发了 SQL UDF 和嵌入式执行的 UDF。在之后的文章中,我们将深入它们的设计与实现,并探讨相关问题与解决方案。

RisingWave UDF 用户接口

为了让大家对以上五花八门的 UDF 有一个具体的概念,我们来看一下用户在 RisingWave 中如何定义 UDF。这里我们举几个经典的使用场景,更多详细用法可以查看 RisingWave 官方文档

示例 1:使用 SQL UDF 简化查询

例如我们想将学生的百分制分数转换为成绩等级。这一逻辑可以使用 SQL 内置的 case when 语句实现,但是它表达起来比较繁琐,写在查询中可读性不佳。于是我们可以创建一个 SQL UDF 表达这一过程。在 RisingWave 中,使用 create function 语句即可定义一个函数:

  1. create function grade(score int) returns varchar language sql as $$
  2. select case score >= 100 then 'A+'
  3. when score >= 90 then 'A'
  4. when score >= 80 then 'B'
  5. when score >= 70 then 'C'
  6. when score >= 60 then 'D'
  7. else 'F'
  8. end;
  9. $$;

然后,我们就可以像内置函数一样在任何地方调用这个 UDF 了:

  1. select score, grade(score) from generate_series(50, 100, 10) t(score);
  2. score | grade
  3. -------+-------
  4. 50 | F
  5. 60 | D
  6. 70 | C
  7. 80 | B
  8. 90 | A
  9. 100 | A+
  10. (6 rows)

示例 2:使用 Rust UDF 进行数学计算

如果我们想实现一些内置函数不支持的数学运算,例如求最大公约数(GCD),可以使用 Rust UDF。和上面的 SQL UDF 类似,我们可以将 Rust 代码嵌入 create function 语句中。

  1. create function gcd(int, int) returns int language rust as $$
  2. fn gcd(mut a: i32, mut b: i32) -> i32 {
  3. while b != 0 {
  4. (a, b) = (b, a % b);
  5. }
  6. a
  7. }
  8. $$;

这段代码会在 RisingWave 前端被编译到 WebAssembly,然后在计算节点上以 JIT 方式运行,以达到接近原生的性能。其它一些需要高性能定制计算逻辑的场景,例如量化因子提取,也适合使用 Rust UDF 实现。

示例 3:使用 Rust 第三方库解析 protobuf 数据

解析 Protobuf 也可以使用 Rust UDF。只是此时我们需要使用一些第三方库(例如 prost),先从 .proto 文件生成 Rust 代码,然后再对数据进行提取。整个过程需要创建一个完整的 cargo 项目,仅仅在语句中嵌入代码是无法完成这一需求的。

  1. // lib.rs
  2. use arrow_udf::{function, types::StructType};
  3. use prost::{DecodeError, Message};
  4. // 导入从 .proto 生成的 Rust 代码
  5. pub mod proto {
  6. include!(concat!(env!("OUT_DIR"), "/proto.rs"));
  7. }
  8. // 定义返回结构体
  9. #[derive(StructType)]
  10. struct DataKey {
  11. stream: String,
  12. pan: String,
  13. }
  14. // 定义解析函数
  15. #[function("decode_proto(bytea) -> struct DataKey")]
  16. fn decode_proto(data: &[u8]) -> Result<DataKey, DecodeError> {
  17. let data_key = proto::DataKey::decode(data)?;
  18. Ok(DataKey {
  19. stream: data_key.stream,
  20. pan: data_key.pan,
  21. })
  22. }

对于这样的场景,我们允许用户自己创建 Rust 项目并编译成 WebAssembly 模块。然后通过 CREATE FUNCTION 语句将 WASM 模块以 BASE64 编码的方式直接导入 RisingWave 中。

  1. \\set wasm_binary `base64 -i target/release/decode.wasm`
  2. create function decode_proto(bytea) returns struct<stream varchar, pan varchar>
  3. language wasm using base64 :'wasm_binary';

具体操作方法可以参考 RisingWave 文档

示例 4:使用 JavaScript UDTF 处理 JSON 数据

假设我们想获得 RisingWave 项目的贡献者名单。我们访问 Github REST API 获取到一个 JSON 列表,希望从中提取出想要的信息。于是我们用 JavaScript 定义一个 UDTF,输入 API 返回的 JSON,然后逐行输出开发者昵称和贡献数。

  1. create function contributors(response jsonb)
  2. returns table (name varchar, contributions int)
  3. language javascript as $$
  4. for (let user of response) {
  5. yield { name: user.login, contributions: user.contributions };
  6. }
  7. $$;
  8. select * from contributors('<response>') limit 5;
  9. name | contributions
  10. -----------------+---------------
  11. BugenZhao | 616
  12. skyzh | 482
  13. TennyZhuang | 465
  14. xxchan | 435
  15. kwannoel | 410

示例 5:使用 Python 外部函数调用 LLM 服务

如果你想在流处理中调用时下最流行的大语言模型服务,可以使用 Python 定义外部函数。下面的脚本创建了一个 UDF Server,其中定义了一个函数,内部调用 OpenAI API 生成文本:

  1. from risingwave.udf import udf, UdfServer
  2. from openai import OpenAI
  3. client = OpenAI()
  4. @udf(input_types=["VARCHAR"], result_type="VARCHAR", io_threads=8)
  5. def ask_ai(content):
  6. response = client.chat.completions.create(
  7. model="gpt-3.5-turbo",
  8. messages=[
  9. {"role": "system", "content": "You are a helpful assistant."},
  10. {"role": "user", "content": content},
  11. ],
  12. )
  13. return response.choices[0].message.content
  14. if __name__ == "__main__":
  15. server = UdfServer(location="localhost:8815")
  16. server.add_function(ask_ai)
  17. server.serve()

当 UDF 服务启动后,我们可以在 RisingWave 中创建函数,与 Python 服务器建立连接。

  1. create function ask_ai(varchar) returns varchar
  2. as ask_ai using link '<http://localhost:8815>';

总结

用户可以通过 CREATE FUNCTION 语句在 RisingWave 中创建自定义函数。函数需要指定参数和返回值类型,然后嵌入实现代码,或者指定外部函数的地址。用户可以根据实际使用场景,选择适合的语言实现 UDF。

使用场景适合的 UDF
组合函数,简化查询SQL
高性能计算Rust
数据格式转换Rust / JS / Python
调用外部服务Python 外部函数
从 Flink 等系统迁移Java 外部函数

实现方式

RisingWave UDF 使用 Apache Arrow 作为数据接口格式。这是因为 UDF 涉及跨语言、跨进程的数据交换,而 Arrow 已经成为这一领域的事实标准。使用 Arrow 一方面可以直接复用其现有生态,例如我们基于 Arrow Flight RPC、pyarrow 和 Arrow Java API 实现了 Python 和 Java 的外部函数框架;另一方面,我们实现的 UDF 开发框架也可以被其它基于 Arrow 的项目所使用。

最近,我们将所有 UDF 实现代码从 RisingWave 中抽离出来,作为一个独立项目 arrow-udf 发布到社区。很快,隔壁 Databend 就基于这一框架为他们的系统实现了 Python、JavaScript 和 WASM 的 UDF 支持。还有一位热心的社区开发者主动为 JavaScript UDF 添加了 Deno 后端支持。 目前,这一 UDF 生态已经初现雏形,相信借助社区的力量未来能够获得更好的发展。如果你恰好也在用 Rust 开发数据系统,不妨尝试集成一下 arrow-udf,无论是 UDF 还是内置函数都可以用哦。

RisingWave 各种语言 UDF 的模块结构

说完了接口,后面的实现工作其实就比较 trivial 了。对于嵌入式 UDF,包括 Rust (WebAssembly)、JavaScript 和 Python,我们分别嵌入了 wasmtime、quickjs / deno 和 CPython 作为语言运行时。而对于外部函数,我们使用 Arrow Flight RPC 将数据发送给远端 UDF 进程,UDF 进程则使用我们提供的 Python SDK 和 Java SDK 进行开发。无论何种形式,都会有一段胶水代码负责将 Arrow 中数据转换为语言的原生类型,最终调用到用户定义的函数中。本质上这是一个 FFI 的工作。

我们会在接下来的两篇文章中展开介绍 Python 外部函数 和 基于 WebAssembly 的 Rust UDF 的具体实现。敬请期待!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/791767
推荐阅读
相关标签
  

闽ICP备14008679号