赞
踩
随着城市扩展和城市化率提高,如何高效管理停车已成为市政和企业共同面对的挑战。在空间有限,但需求不断增加的情况下,最大化利用停车场是减少拥堵、降低排放并提升驾驶员体验的关键。
RisingWave 可以助力您解决这个挑战。作为一个流式数据库,RisingWave 可以彻底改变您停车场的利用率。首先,RisingWave 能够实时处理和分析从各种传感器、摄像头和其他物联网设备中获取的实时数据;然后,通过数据转换功能,RisingWave 可以识别模式、预测需求并优化停车位分配,确保每个可用车位都得到有效利用。
在本文中,我们将介绍使用 RisingWave 实时监控停车位利用情况的两个场景,并探讨如何优化 RisingWave 的性能。
在正式深入讲解两个场景及其优化前,让我们先进行一些数据准备,了解所用数据的 Schema 和产生过程。
我们假设信号数据存储在 Kafka 主题中,停车场数据则本地存储在 RisingWave 中。以下是示例数据的 Schema。
CREATE TABLE signals ( start_at timestamp, -- 何时起停车位被占用 end_at timestamp, -- 何时起停车位空出(值可以为空) last_updated_at timestamp, -- 何时生成并发送本信号 space_id int, -- 停车位 ID level int -- 停车位所在楼层 ) with ( connector = 'kafka', topic = 'signals' ); CREATE TABLE parking_lot ( space_id int, -- 停车位 ID level int, -- 停车位所在楼层 primary key (space_id) );
然后,我们需要先了解一下数据记录生成过程:假设在时间 T0 时,位于第 2 层的第 45 个停车位上方的传感器检测到有车辆停在其下方,这将生成一条记录: (T0, NULL, T0, 45, 2)
。
为应对可能的网络故障或临时服务中断,传感器需要不时重复传输记录到中央控制单元,以确保数据完整。例如,它可能在随机的时间间隔 T1 之后重复发送记录,此时,会生成又一条记录: (T0, NULL, T1, 45, 2)
。
当车辆在时间 T2 离开停车位时,传感器将再生成一个新记录:(T0, T2, T2, 45, 2)
,表示该车位现在空置。同样,为防止数据丢失,传感器会在短暂延迟后再次重新传输此信息。
现在,基于以上前期准备,我们将正式深入讲解使用 RisingWave 实时监控停车位利用情况的两个场景,并探讨如何优化。**
为了创建上文展示的“可用停车位看板“,RisingWave 需要确定当前空置的停车位数量,这需要检查最新停车位数据的 end_at
字段是否为空,以下是详细步骤。
首先,我们提取每个停车位的所有数据,保留其中具有最新 last_updated_at
时间戳的数据:
create materialized view latest_signal_for_each_parking_space as
with ranked as (
select *, row_number() over (partition by space_id order by last_updated_at desc) as rn
from signals
)
select * except (rn)
from ranked
where rn = 1;
以上代码根据 space_id
字段对数据进行分区,并在每个分区内按 last_updated_at
排序,从而保留最新信号。关于其中用到的 row_number()
与 rank = 1
,您可以参考我们的文档介绍[1]获取更多信息。
之后,我们根据最新信号,确定停车位是否被占用,并汇总停车场内每层被占用的车位总数。
create materialized view occupied_on_each_level as
select
level,
count(case when end_at is NULL then 1
else 0
end) as occupied
from
latest_signal_for_each_parking_space
group by
level;
以上代码中,如果最新信号显示 end_at
字段为 NULL,则该停车位被占用;如果 end_at
字段不为 NULL,则停车位为空。occupied
代表了每层被占用车位的总数。
然后,我们从每层的总停车位数中减去被占用车位数,以确定可用停车位数:
create materialized view available_on_each_level as with total_on_each_level as ( select level, count(*) as total from parking_lot group by level ) select t.level as level, total - occupied as available from total_on_each_level t inner join occupied_on_each_level o on t.level = o.level;
为了计算每个停车位每小时的使用率,我们需要考虑每个信号的处同一状态时的重叠时间间隔。
例如,如果一个停车位从 7:00 被占用到 7:10,然后从 7:40 被占用到 8:00,那么从 7:00 到 8:00 的 60 分钟中,有 30 分钟被占用。这代表 50% 的使用率。
同样,如果一个停车位从 7:50 开始被占用,并一直被占用到 8:30(假设当前时间是8:30),那么从 7:00 到 8:00 的一个小时内,我们仅考虑从 7:50 到 8:00 的时段,即 60 分钟中的 10 分钟被使用。这代表 16.7% 的使用率。
对于后续的小时,例如从 8:00 到 9:00,我们只考虑从这个小时开始(8:00)、到当前时间(8:30)的部分,也就是说,我们认为此时的小时利用率是 100%。
要将此逻辑用 SQL 体现,我们需要计算每个停车位在每小时内被占用的百分比,而不考虑时间的进展。以下是具体实现代码:
-- 首先,仍然先对信号去重,只选取最新数据 create materialized view deduplicated_signals as with ranked as ( -- 数据按 space_id、level_id 和 start_at 分区, -- 只保留每组中的最新信号。 select *, row_number() over (partition by space_id, level_id, start_at order by last_updated_at desc) as rn from signals ) select * except (rn) from ranked where rn = 1; -- 然后我们将它们分为两组: -- 1. 停车位被占用 -- 2. 停车位没被占用 create materialized view occupying as -- 我们使用当前时间作为占用结束时间 select space_id, level_id, start_at, timestamp '2024-05-13 14:54:02.91714' as end_at from deduplicated_signals where end_at is NULL; create materialized view occupied as select space_id, level_id, start_at, end_at from deduplicated_signals where end_at is not NULL; -- 接下来,我们将这两类事件合并在一起 create materialized view both as select * from occupying union all select * from occupied; -- 我们获取信号重叠的所有 1 小时窗口 create materialized view split_window as select *, generate_series( date_trunc('HOUR', start_at - interval '0 HOUR', 'Europe/Stockholm')::timestamp, date_trunc('HOUR', end_at - interval '0 HOUR', 'Europe/Stockholm')::timestamp, interval '1 HOUR' )::timestamptz as window_start from both; -- 上面的 SQL 缺少每个窗口的结束时间。我们将其添加。 create materialized view occupy_window as select *, window_start + '1 HOUR' as window_end from split_window; -- 如上例所示, -- 对于 1:30~2:00 和 4:00~4:10 等时间段, -- 它们属于 1:00~2:00 和 4:00~5:00 窗口。 -- 但是,我们计算使用率时,只计入 1:30~2:00 和 4:00~4:10 部分。 create materialized view occupy_start_end as select *, greatest(window_start, start_at) as occupy_start, least(window_end, end_at) as occupy_end from occupy_window; -- 最后,我们可以计算使用率 create materialized view unavailability as select space_id, level, window_start, window_end, 'HOURLY' as window_type, extract(epoch from (window_end - window_start)) as total_seconds, sum(extract(epoch from (occupy_end - occupy_start))) as occupy_seconds from occupy_start_end group by 1, 2, 3, 4; -- 如果您喜欢百分比表示,可以使用以下代码 create materialized view percentage as select occupy_seconds::double / total_seconds as percentage from unavailability;
以上代码中,对于当前时间,我们用的是固定的一个值,如果想把它变成真正动态的当前时间,我们可以使用所有信号last_updated_at
的最大值来表示所有正在进行的占用的结束时间。这样就能动态确定占用的结束时间。
在 SQL 中,您可以通过使用子查询来实现这一点,我们通过示例讲解如何实现,首先,创建一个新的物化视图:
-- 这是一个只有一行和两列的物化视图。
-- 1 作为常量是一个虚拟变量,用于稍后连接记录。
create materialized view current_max_time
as
select max(last_updated_at) as now, 1 as constant from signals;
然后我们修改 occupying
的定义,如下:
create materialized view occupying as
with tmp as (
select space_id, level_id, start_at, 1 as constant
from
deduplicated_signals where end_at is NULL
)
select space_id, level_id, start_at, now as end_at from
-- 这里我们滥用了常量列来进行交叉积。
-- 这是因为 RisingWave 默认禁止嵌套循环连接,因为它的效率低下。
tmp inner join current_max_time on tmp.constant = current_max_time.constant;
其余的物化视图保持不变即可。
当 last_updated_at
列频繁更新时,依赖 max(last_updated_at)
可能会导致问题。这是因为 max(last_updated_at)
的每次更改都需要刷新所有后续物化视图,导致计算负担显著增加。
为了解决这个问题并最小化不必要的刷新,我们可以使用窗口函数,如下所示:
create materialized view current_max_time
as
select max(window_end) as now, 1 as constant
from
TUMBLE(signals, last_updated_at, INTERVAL '15 MINUTES')
有关时间窗口的更多用法,请参阅我们的官方文档[2]。
现在,我们可以预期重新计算大约每 15 分钟触发一次。
先前的优化对于处理新进数据是有效的。然而,对于历史数据可能还不够。新数据的 last_updated_at
值是实时推进的,而历史数据则以更快的速度推进。这可能导致频繁的重新计算,尤其是在处理大量历史数据时。
为了解决这个问题,我们可以对 current_max_time
物化视图进行额外修改,如下所示:
-- 假设当前时间戳为 2024-05-20 12:05:35.607001+00
create materialized view current_max_time
as
select max('2024-05-20 12:05:35.607001+00', max(window_end)) as now, 1 as constant
from
TUMBLE(signals, last_updated_at, INTERVAL '15 MINUTES');
注意,now
不会改变,直到 last_updated_at
赶上它。
在本文中,我们演示了如何应用 RisingWave 实时监控停车位利用情况。如果能够获取其他相应数据,还可以使用 RisingWave 实现更先进的功能。欢迎大家分享更多使用 RisingWave 进行实时监控和分析的解决方案。
RisingWave 是一款开源的分布式流处理数据库,旨在帮助用户降低实时应用的开发成本。RisingWave 采用存算分离架构,提供 Postgres-style 使用体验,具备比 Flink 高出 10 倍的性能以及更低的成本。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。