赞
踩
|作者:王润基 RisingWave Labs 内核开发工程师
用户定义函数(User Defined Function,以下简称 UDF)是数据系统中的常见功能。它允许用户使用各种语言定义自己的函数,作为内置函数的补充,以实现各种定制化执行逻辑。通过 UDF,我们可以将多个已有函数组合起来形成新函数,简化查询逻辑;也可以使用 Python 等语言编写代码,借用其它语言的生态,填补 SQL 语言和内置函数表达能力的不足;除了纯计算以外,我们还可以调用外部系统 API,将外部服务集成到统一的数据处理管线中。可以说,UDF 的存在极大提升了数据处理系统的灵活性和扩展性。
在之前的文章中,我们介绍了 RisingWave 内部函数的开发框架,它主要面向内部开发者。接下来,我们将通过一系列文章介绍面向用户的 UDF 功能的设计与实现过程。它在技术上与前者一脉相承,但对易用性提出了更高的要求。作为该系列第一篇文章,我们先来概览一下 UDF 的种类、用户接口和使用场景。
UDF 可以从三个维度进行分类。
第一个维度是函数的输入输出。在 SQL 中,我们有以下几种常见的函数:
abs(-1) -> 1
。generate_series(1,3) -> {1,2,3}
。sum({1,2,3}) -> 6
。lag({1,2,3}) -> {NULL,1,2}
这里每一种函数都可以支持用户自定义。在 UDF 的语境下,我们通常将它们简称为 UDF、UDTF、UDAF 和 UDWF。RisingWave 目前支持 UDF 和 UDTF,它们可以覆盖绝大多数实际需求。我们也正在开发 UDAF 以支持更多场景。
第二个维度是编写函数的语言。目前 RisingWave 已经支持的语言包括 SQL、Python、Java、JavaScript 和 Rust。它们面向的用户和场景也略有不同:
总的来说,我们的目标是让用户能够用任何喜欢的语言和适合的语言来编写函数,不让语言成为表达逻辑的障碍。
第三个维度是函数的执行方式。这决定了函数的性能和能力。目前我们有以下三种执行方式:
在 RisingWave 的开发过程中,我们首先实现的是 Python 语言的外部函数。这也是目前用户使用最多的一种 UDF。为了解决它带来的性能和易用性的问题,我们又开发了 SQL UDF 和嵌入式执行的 UDF。在之后的文章中,我们将深入它们的设计与实现,并探讨相关问题与解决方案。
为了让大家对以上五花八门的 UDF 有一个具体的概念,我们来看一下用户在 RisingWave 中如何定义 UDF。这里我们举几个经典的使用场景,更多详细用法可以查看 RisingWave 官方文档。
例如我们想将学生的百分制分数转换为成绩等级。这一逻辑可以使用 SQL 内置的 case when
语句实现,但是它表达起来比较繁琐,写在查询中可读性不佳。于是我们可以创建一个 SQL UDF 表达这一过程。在 RisingWave 中,使用 create function
语句即可定义一个函数:
- create function grade(score int) returns varchar language sql as $$
- select case score >= 100 then 'A+'
- when score >= 90 then 'A'
- when score >= 80 then 'B'
- when score >= 70 then 'C'
- when score >= 60 then 'D'
- else 'F'
- end;
- $$;
然后,我们就可以像内置函数一样在任何地方调用这个 UDF 了:
- select score, grade(score) from generate_series(50, 100, 10) t(score);
- score | grade
- -------+-------
- 50 | F
- 60 | D
- 70 | C
- 80 | B
- 90 | A
- 100 | A+
- (6 rows)
如果我们想实现一些内置函数不支持的数学运算,例如求最大公约数(GCD),可以使用 Rust UDF。和上面的 SQL UDF 类似,我们可以将 Rust 代码嵌入 create function
语句中。
- create function gcd(int, int) returns int language rust as $$
- fn gcd(mut a: i32, mut b: i32) -> i32 {
- while b != 0 {
- (a, b) = (b, a % b);
- }
- a
- }
- $$;
这段代码会在 RisingWave 前端被编译到 WebAssembly,然后在计算节点上以 JIT 方式运行,以达到接近原生的性能。其它一些需要高性能定制计算逻辑的场景,例如量化因子提取,也适合使用 Rust UDF 实现。
解析 Protobuf 也可以使用 Rust UDF。只是此时我们需要使用一些第三方库(例如 prost),先从 .proto
文件生成 Rust 代码,然后再对数据进行提取。整个过程需要创建一个完整的 cargo 项目,仅仅在语句中嵌入代码是无法完成这一需求的。
- // lib.rs
- use arrow_udf::{function, types::StructType};
- use prost::{DecodeError, Message};
-
- // 导入从 .proto 生成的 Rust 代码
- pub mod proto {
- include!(concat!(env!("OUT_DIR"), "/proto.rs"));
- }
-
- // 定义返回结构体
- #[derive(StructType)]
- struct DataKey {
- stream: String,
- pan: String,
- }
-
- // 定义解析函数
- #[function("decode_proto(bytea) -> struct DataKey")]
- fn decode_proto(data: &[u8]) -> Result<DataKey, DecodeError> {
- let data_key = proto::DataKey::decode(data)?;
- Ok(DataKey {
- stream: data_key.stream,
- pan: data_key.pan,
- })
- }

对于这样的场景,我们允许用户自己创建 Rust 项目并编译成 WebAssembly 模块。然后通过 CREATE FUNCTION
语句将 WASM 模块以 BASE64 编码的方式直接导入 RisingWave 中。
- \\set wasm_binary `base64 -i target/release/decode.wasm`
- create function decode_proto(bytea) returns struct<stream varchar, pan varchar>
- language wasm using base64 :'wasm_binary';
具体操作方法可以参考 RisingWave 文档。
假设我们想获得 RisingWave 项目的贡献者名单。我们访问 Github REST API 获取到一个 JSON 列表,希望从中提取出想要的信息。于是我们用 JavaScript 定义一个 UDTF,输入 API 返回的 JSON,然后逐行输出开发者昵称和贡献数。
- create function contributors(response jsonb)
- returns table (name varchar, contributions int)
- language javascript as $$
- for (let user of response) {
- yield { name: user.login, contributions: user.contributions };
- }
- $$;
- select * from contributors('<response>') limit 5;
- name | contributions
- -----------------+---------------
- BugenZhao | 616
- skyzh | 482
- TennyZhuang | 465
- xxchan | 435
- kwannoel | 410
如果你想在流处理中调用时下最流行的大语言模型服务,可以使用 Python 定义外部函数。下面的脚本创建了一个 UDF Server,其中定义了一个函数,内部调用 OpenAI API 生成文本:
- from risingwave.udf import udf, UdfServer
- from openai import OpenAI
-
- client = OpenAI()
-
- @udf(input_types=["VARCHAR"], result_type="VARCHAR", io_threads=8)
- def ask_ai(content):
- response = client.chat.completions.create(
- model="gpt-3.5-turbo",
- messages=[
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": content},
- ],
- )
- return response.choices[0].message.content
-
- if __name__ == "__main__":
- server = UdfServer(location="localhost:8815")
- server.add_function(ask_ai)
- server.serve()

当 UDF 服务启动后,我们可以在 RisingWave 中创建函数,与 Python 服务器建立连接。
- create function ask_ai(varchar) returns varchar
- as ask_ai using link '<http://localhost:8815>';
用户可以通过 CREATE FUNCTION
语句在 RisingWave 中创建自定义函数。函数需要指定参数和返回值类型,然后嵌入实现代码,或者指定外部函数的地址。用户可以根据实际使用场景,选择适合的语言实现 UDF。
使用场景 | 适合的 UDF |
---|---|
组合函数,简化查询 | SQL |
高性能计算 | Rust |
数据格式转换 | Rust / JS / Python |
调用外部服务 | Python 外部函数 |
从 Flink 等系统迁移 | Java 外部函数 |
RisingWave UDF 使用 Apache Arrow 作为数据接口格式。这是因为 UDF 涉及跨语言、跨进程的数据交换,而 Arrow 已经成为这一领域的事实标准。使用 Arrow 一方面可以直接复用其现有生态,例如我们基于 Arrow Flight RPC、pyarrow 和 Arrow Java API 实现了 Python 和 Java 的外部函数框架;另一方面,我们实现的 UDF 开发框架也可以被其它基于 Arrow 的项目所使用。
最近,我们将所有 UDF 实现代码从 RisingWave 中抽离出来,作为一个独立项目 arrow-udf 发布到社区。很快,隔壁 Databend 就基于这一框架为他们的系统实现了 Python、JavaScript 和 WASM 的 UDF 支持。还有一位热心的社区开发者主动为 JavaScript UDF 添加了 Deno 后端支持。 目前,这一 UDF 生态已经初现雏形,相信借助社区的力量未来能够获得更好的发展。如果你恰好也在用 Rust 开发数据系统,不妨尝试集成一下 arrow-udf,无论是 UDF 还是内置函数都可以用哦。
RisingWave 各种语言 UDF 的模块结构
说完了接口,后面的实现工作其实就比较 trivial 了。对于嵌入式 UDF,包括 Rust (WebAssembly)、JavaScript 和 Python,我们分别嵌入了 wasmtime、quickjs / deno 和 CPython 作为语言运行时。而对于外部函数,我们使用 Arrow Flight RPC 将数据发送给远端 UDF 进程,UDF 进程则使用我们提供的 Python SDK 和 Java SDK 进行开发。无论何种形式,都会有一段胶水代码负责将 Arrow 中数据转换为语言的原生类型,最终调用到用户定义的函数中。本质上这是一个 FFI 的工作。
我们会在接下来的两篇文章中展开介绍 Python 外部函数 和 基于 WebAssembly 的 Rust UDF 的具体实现。敬请期待!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。