赞
踩
我们非常高兴地宣布:RisingWave 1.8 版本正式发布!此版本包含了新增功能、现有功能更新以及错误修复。我们还对 Python UDF、Rust UDF 和连接器做了进一步改进,使用户能够更加灵活地使用数据库。此外,解耦 Sink 方面有了重大变更,请继续阅读以了解如何避免兼容性问题。
自 1.8 版开始,RisingWave 改用了一种更轻量级的状态清理方法,这会影响到已启用解耦功能的 Sink。1.6 版本首次实现了这一状态清理方法,但我们此后将不再进行 Sink 解耦兼容性的维护。因此,我们建议已启用解耦 Sink 的用户避免直接更新至 1.8 版本,或者在更新前删除所有已启用解耦功能的 Sink。
要检查是否有已启用解耦功能的 Sink,请先更新至 1.7 版本。1.7 版本支持内部表 rw_sink_decouple
,可检查所有 Sink 的解耦状态。1.6 更新至 1.7 时不会出现兼容性问题。接下来,运行以下查询。
- SELECT * FROM rw_sink_decouple
- WHERE is_decouple AND watermark_vnode_count < 256;
该查询将返回更新时可能会遇到兼容性问题的所有 Sink。如果查询返回空结果,则可以放心更新至 1.8 版本。
上个月的 1.7 版本新增了对其他语言 UDF 的支持,本月的新版本则继续改进了 UDF 的功能。现在,您可以在 Python 和 Rust 中创建嵌入式 UDF,这意味着这些 UDF 可在 RisingWave 内部定义、编译和运行。这样,就无需安装外部 API。不过,您在使用外部库或文件系统时会受到限制。
创建嵌入式 Python UDF 时,请使用 CREATE FUNCTION
命令。下面是一个示例。
- CREATE FUNCTION gcd(a int, b int) RETURNS int LANGUAGE python AS $$
- def gcd(a, b):
- while b != 0:
- a, b = b, a % b
- return a
- $$;
函数定义使用 Python 句法编写。您还可以创建表函数,并让函数返回 struct
类型。
目前嵌入式 UDF 只能使用纯计算逻辑,但如果您需要重复使用复杂计算,它还是很有用的。不过,您可以访问 json
、decimal
、re
、math
和 datetime
库。创建函数后,您可以像调用其他内置函数一样对其进行调用。
某些内置函数是不允许的。有关完整列表,请参阅下方官方文档。
同样,可使用 CREATE FUNCTION
命令创建嵌入式 Rust UDF。
- CREATE FUNCTION gcd(int, int) RETURNS int LANGUAGE rust AS $$
- fn gcd(mut x: i32, mut y: i32) -> i32 {
- while y != 0 {
- (x, y) = (y, x % y);
- }
- return x;
- }
- $$;
函数体使用 Rust 句法定义。像嵌入式 Python UDF 一样,您可以创建表函数并让其返回 struct
类型。虽然不支持其他外部库,但您仍然可以使用标准库 chrono
, rust_decimal
和 serde_json
。定义函数后,就可以像使用其他内置函数一样对其进行使用。
有关句法的详细信息,请参阅下方文档。
更多详细信息,请参阅:
过去要刷新使用 Schema Registry 所定义 Source 的 Schema,需要使用 ALTER SOURCE
命令重新定义其 Schema Registry,过程相当繁琐。此外,也不支持刷新表的 Schema。
此版本新增了 REFRESH SCHEMA
句法,使 RisingWave 中表或 Source 的 Schema 更新变得更加容易。请注意,更新 Schema 时不能更改数据 FORMAT
和 ENCODE
选项。刷新 Source Schema 的句法如下。
ALTER SOURCE s REFRESH SCHEMA;
现在,Source s1
的 Schema 将根据对其所做的更改进行更新。
同样,刷新使用外部连接器所创建表的 Schema 的句法如下。
ALTER TABLE t REFRESH SCHEMA;
如果在刷新 Schema 时删除了某些列,而这些列又被其他下游 Fragment(如物化视图)引用,则该命令无效。
更多详细信息,请参阅:
[ALTER SOURCE](https://docs.risingwave.com/docs/current/sql-alter-source/)
命令[ALTER TABLE
命令](https://docs.risingwave.com/docs/current/sql-alter-table/#refresh-schema)RANGE
在 SQL 查询中使用窗口函数时,可使用 RANGE
子句来指定窗口帧中所包含的相对于当前行的行范围。该子句可根据行的排序对一系列值进行操作。可按如下方式指定 RANGE
子句。
RANGE BETWEEN frame_start AND frame_end
这里,frame_start
和 frame_end
描述了要对哪些行进行计算。
frame_start
可以是 UNBOUNDED PRECEDING
、CURRENT ROW
、特定数量的行或特定的时间间隔。例如,以下子句包括从前一天到当前行的所有行。
RANGE BETWEEN 1 DAY PRECEDING AND CURRENT ROW
frame_end
可以是 UNBOUNDED FOLLOWING
、CURRENT ROW
、特定数量的行或时间间隔。以下子句包括当前行之后的所有行。
RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
更多详细信息,请参阅:
如果您有一个外部 Ruby 应用,现在就可以在该应用中使用任何第三方 PostgreSQL 驱动程序与 RisingWave 进行交互。我们的官方文档介绍了如何使用 ruby-pg
驱动程序。下面几行 Ruby 代码能让您与 RisingWave 数据库建立连接(假定使用默认的数据库凭据)。
- require 'pg'
-
- conn = PG.connect(host: '127.0.0.1', port: 4566, dbname: 'dev', user: 'root')
建立连接后就可以像在 RisingWave 中一样运行 SQL 查询,也可使用 ruby-pg
提供的任何内置功能。
ruby-pg
驱动程序可确保最佳性能,并与 PostgreSQL 的功能和数据库操作兼容。这使其适用于构建具有高性能要求的 Ruby 应用。
更多详细信息,请参阅:
除了对现有的 Source 连接器和 Sink 连接器进行了大量改进外,新版本还新增了两个 Source 连接器,让您可以更灵活地构建流处理管道。
您现在可以使用新增的 Iceberg Source 连接器从 Iceberg Source 中批量读取数据。与 RisingWave 中的所有其他 Source 连接器一样,您可以使用 CREATE SOURCE
命令开始摄取数据。请注意,Iceberg 中的源表必须是 COW 表,不能是已删除的文件。
- CREATE SOURCE iceberg_source (
- id bigint,
- user_name varchar
- ) WITH (
- connector = 'iceberg',
- catalog.type = 'storage',
- warehouse.path = 's3a://hummock001/',
- s3.endpoint = 'http://127.0.0.1:9301',
- s3.access.key = 'admin',
- s3.secret.key = 'admin',
- s3.region = 'us-east-1',
- database.name='db_name',
- table.name='table_name'
- );
支持的目录类型包括 storage
、jdbc
、hive
和 rest
。
此外,创建 Iceberg Source 时可选择是否指定列。在这种情况下,表中的所有列都会自动派生。
过去要从 RisingWave 摄取 MongoDB 的 CDC 数据,需要设置一个包括 Debezium 连接器的管道(用于 MongoDB 跟踪数据库变更并将其记录到 Kafka Topic 中)和一个 Kafka 连接器(用于连接到 RisingWave)。而新增的 MongoDB CDC 连接器简化了这一过程,可让您从 RisingWave 直接连接到 MongoDB。只需执行 CREATE TABLE
命令,即可与 MongoDB 建立直接连接。
- CREATE TABLE mongocdc(
- _id varchar PRIMARY KEY,
- payload jsonb
- ) WITH (
- connector = 'mongodb_cdc',
- mongodb.url = 'mongodb://localhost:27017/?replicaSet=rs0',
- collection.name = 'dbname.*, foo.*'
- );
您可以选择在 [collection.name](http://collection.name)
参数中从多个数据库的集合中摄取数据,也可从特定集合中摄取数据。
更多详细信息,请参阅:
在确保 etcd 的向后兼容的同时, 此次更新将为您带来 PostgreSQL、MySQL 和 SQLite 的技术预览,作为元数据存储的新选项。
PostgreSQL 在处理大量元数据时具有更强的稳健性。例如,当创建超过 1000 个物化视图和 Sink 时,etcd 容易不堪重负且内存不足,而 PostgreSQL 则更为稳定。
在生产环境中,我们建议为 PostgreSQL 部署一个至少配备 2 个 CPU 内核和 4 GB 内存的实例,并进行主动复制以实现高可用性。
更多详细信息,请参阅:
以上只是 RisingWave 1.8 版本新增的部分功能。要查看本次更新的完整列表,包括有关数据格式和系统目录的更多更新,请参阅更为详细的发布说明。
敬请期待下个月的版本及其新功能。查看 RisingWave GitHub repository 即可随时了解最新功能和计划发布的版本。
如果您想了解 RisingWave 的最新动态,请订阅我们的邮件月刊。同时,欢迎关注我们的 Twitter 和 LinkedIn,并加入我们的 Slack 社区,与我们的工程师还有全球各地的流处理爱好者交流!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。