当前位置:   article > 正文

实践|用流式数据库 RisingWave 最大化停车场利用率

实践|用流式数据库 RisingWave 最大化停车场利用率

随着城市扩展和城市化率提高,如何高效管理停车已成为市政和企业共同面对的挑战。在空间有限,但需求不断增加的情况下,最大化利用停车场是减少拥堵、降低排放并提升驾驶员体验的关键。

RisingWave 可以助力您解决这个挑战。作为一个流式数据库,RisingWave 可以彻底改变您停车场的利用率。首先,RisingWave 能够实时处理和分析从各种传感器、摄像头和其他物联网设备中获取的实时数据;然后,通过数据转换功能,RisingWave 可以识别模式、预测需求并优化停车位分配,确保每个可用车位都得到有效利用。

RisingWave 构建的可用停车位看板

在本文中,我们将介绍使用 RisingWave 实时监控停车位利用情况的两个场景,并探讨如何优化 RisingWave 的性能。

1. 数据准备

在正式深入讲解两个场景及其优化前,让我们先进行一些数据准备,了解所用数据的 Schema 和产生过程。

1.1 Schema

我们假设信号数据存储在 Kafka 主题中,停车场数据则本地存储在 RisingWave 中。以下是示例数据的 Schema。

CREATE TABLE signals (
 start_at         timestamp, -- 何时起停车位被占用
 end_at           timestamp, -- 何时起停车位空出(值可以为空)
 last_updated_at  timestamp, -- 何时生成并发送本信号
 space_id         int, -- 停车位 ID
 level            int -- 停车位所在楼层
) with (
 connector = 'kafka',
 topic = 'signals'
);

CREATE TABLE parking_lot (
  space_id         int, -- 停车位 ID
  level            int, -- 停车位所在楼层
  primary key (space_id)
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

1.2 数据产生过程

然后,我们需要先了解一下数据记录生成过程:假设在时间 T0 时,位于第 2 层的第 45 个停车位上方的传感器检测到有车辆停在其下方,这将生成一条记录: (T0, NULL, T0, 45, 2)

为应对可能的网络故障或临时服务中断,传感器需要不时重复传输记录到中央控制单元,以确保数据完整。例如,它可能在随机的时间间隔 T1 之后重复发送记录,此时,会生成又一条记录: (T0, NULL, T1, 45, 2)

当车辆在时间 T2 离开停车位时,传感器将再生成一个新记录:(T0, T2, T2, 45, 2),表示该车位现在空置。同样,为防止数据丢失,传感器会在短暂延迟后再次重新传输此信息。

现在,基于以上前期准备,我们将正式深入讲解使用 RisingWave 实时监控停车位利用情况的两个场景,并探讨如何优化。**

2. 「场景1」实时监测可用停车位

为了创建上文展示的“可用停车位看板“,RisingWave 需要确定当前空置的停车位数量,这需要检查最新停车位数据的 end_at 字段是否为空,以下是详细步骤。

首先,我们提取每个停车位的所有数据,保留其中具有最新 last_updated_at 时间戳的数据:

 create materialized view latest_signal_for_each_parking_space as
 with ranked as (
   select *, row_number() over (partition by space_id order by last_updated_at desc) as rn
   from signals
 )
 select * except (rn)
 from ranked
 where rn = 1;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

以上代码根据 space_id 字段对数据进行分区,并在每个分区内按 last_updated_at 排序,从而保留最新信号。关于其中用到的 row_number() 与 rank = 1,您可以参考我们的文档介绍[1]获取更多信息。

之后,我们根据最新信号,确定停车位是否被占用,并汇总停车场内每层被占用的车位总数。

create materialized view occupied_on_each_level as
select
 level,
 count(case when end_at is NULL then 1
           else 0
      end) as occupied
from
 latest_signal_for_each_parking_space
group by
 level;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

以上代码中,如果最新信号显示 end_at 字段为 NULL,则该停车位被占用;如果 end_at 字段不为 NULL,则停车位为空。occupied 代表了每层被占用车位的总数。

然后,我们从每层的总停车位数中减去被占用车位数,以确定可用停车位数:

create materialized view available_on_each_level as
with
 total_on_each_level as (
  select
   level,
   count(*) as total
  from parking_lot
  group by level
 )
select
 t.level as level,
 total - occupied as available
from
 total_on_each_level t
inner join
 occupied_on_each_level o
on t.level = o.level;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

3. 「场景2」每小时停车位的使用率

为了计算每个停车位每小时的使用率,我们需要考虑每个信号的处同一状态时的重叠时间间隔。

例如,如果一个停车位从 7:00 被占用到 7:10,然后从 7:40 被占用到 8:00,那么从 7:00 到 8:00 的 60 分钟中,有 30 分钟被占用。这代表 50% 的使用率。

同样,如果一个停车位从 7:50 开始被占用,并一直被占用到 8:30(假设当前时间是8:30),那么从 7:00 到 8:00 的一个小时内,我们仅考虑从 7:50 到 8:00 的时段,即 60 分钟中的 10 分钟被使用。这代表 16.7% 的使用率。

对于后续的小时,例如从 8:00 到 9:00,我们只考虑从这个小时开始(8:00)、到当前时间(8:30)的部分,也就是说,我们认为此时的小时利用率是 100%。

要将此逻辑用 SQL 体现,我们需要计算每个停车位在每小时内被占用的百分比,而不考虑时间的进展。以下是具体实现代码:

-- 首先,仍然先对信号去重,只选取最新数据

create materialized view deduplicated_signals as
with ranked as (
  -- 数据按 space_id、level_id 和 start_at 分区,
  -- 只保留每组中的最新信号。
   select *, row_number() over (partition by space_id, level_id, start_at order by last_updated_at desc) as rn
   from signals
 )
 select * except (rn)
 from ranked
 where rn = 1;

-- 然后我们将它们分为两组:
-- 1. 停车位被占用
-- 2. 停车位没被占用

create materialized view occupying as
-- 我们使用当前时间作为占用结束时间
select space_id, level_id, start_at, timestamp '2024-05-13 14:54:02.91714' as end_at from deduplicated_signals where end_at is NULL;

create materialized view occupied as
select space_id, level_id, start_at, end_at from deduplicated_signals where end_at is not NULL;

-- 接下来,我们将这两类事件合并在一起
create materialized view both as
select * from occupying
union all
select * from occupied;

-- 我们获取信号重叠的所有 1 小时窗口

create materialized view split_window
as
select
  *,
  generate_series(
    date_trunc('HOUR', start_at - interval '0 HOUR', 'Europe/Stockholm')::timestamp,
    date_trunc('HOUR', end_at - interval '0 HOUR', 'Europe/Stockholm')::timestamp,
    interval '1 HOUR'
   )::timestamptz as window_start
from both;

-- 上面的 SQL 缺少每个窗口的结束时间。我们将其添加。

create materialized view occupy_window
as
select *,

 window_start + '1 HOUR' as window_end
from split_window;

-- 如上例所示,
-- 对于 1:30~2:00 和 4:00~4:10 等时间段,
-- 它们属于 1:00~2:00 和 4:00~5:00 窗口。
-- 但是,我们计算使用率时,只计入 1:30~2:00 和 4:00~4:10 部分。

create materialized view occupy_start_end
as
select
  *, greatest(window_start, start_at) as occupy_start, least(window_end, end_at) as occupy_end
from occupy_window;

-- 最后,我们可以计算使用率

create materialized view unavailability
as
select
 space_id,
 level,
  window_start,
  window_end,
  'HOURLY' as window_type,
  extract(epoch from (window_end - window_start)) as total_seconds,
  sum(extract(epoch from (occupy_end - occupy_start))) as occupy_seconds
from
  occupy_start_end
group by 1, 2, 3, 4;

-- 如果您喜欢百分比表示,可以使用以下代码

create materialized view percentage
as
select
 occupy_seconds::double / total_seconds as percentage
from
 unavailability;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87

以上代码中,对于当前时间,我们用的是固定的一个值,如果想把它变成真正动态的当前时间,我们可以使用所有信号last_updated_at 的最大值来表示所有正在进行的占用的结束时间。这样就能动态确定占用的结束时间。

在 SQL 中,您可以通过使用子查询来实现这一点,我们通过示例讲解如何实现,首先,创建一个新的物化视图:

-- 这是一个只有一行和两列的物化视图。
-- 1 作为常量是一个虚拟变量,用于稍后连接记录。

create materialized view current_max_time
as
select max(last_updated_at) as now, 1 as constant from signals;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

然后我们修改 occupying 的定义,如下:

create materialized view occupying as
with tmp as (
  select space_id, level_id, start_at, 1 as constant
  from
  deduplicated_signals where end_at is NULL
)
select space_id, level_id, start_at, now as end_at from
-- 这里我们滥用了常量列来进行交叉积。
-- 这是因为 RisingWave 默认禁止嵌套循环连接,因为它的效率低下。
tmp inner join current_max_time on tmp.constant = current_max_time.constant;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

其余的物化视图保持不变即可。

4. 「优化1」减少重新计算频率

当 last_updated_at 列频繁更新时,依赖 max(last_updated_at) 可能会导致问题。这是因为 max(last_updated_at) 的每次更改都需要刷新所有后续物化视图,导致计算负担显著增加。

为了解决这个问题并最小化不必要的刷新,我们可以使用窗口函数,如下所示:

create materialized view current_max_time
as
select max(window_end) as now, 1 as constant
from
TUMBLE(signals, last_updated_at, INTERVAL '15 MINUTES')
  • 1
  • 2
  • 3
  • 4
  • 5

有关时间窗口的更多用法,请参阅我们的官方文档[2]。

现在,我们可以预期重新计算大约每 15 分钟触发一次。

5. 「优化2」抑制回填期间的重新计算

先前的优化对于处理新进数据是有效的。然而,对于历史数据可能还不够。新数据的 last_updated_at 值是实时推进的,而历史数据则以更快的速度推进。这可能导致频繁的重新计算,尤其是在处理大量历史数据时。

为了解决这个问题,我们可以对  current_max_time 物化视图进行额外修改,如下所示:

-- 假设当前时间戳为 2024-05-20 12:05:35.607001+00

create materialized view current_max_time
as
select max('2024-05-20 12:05:35.607001+00', max(window_end)) as now, 1 as constant
from
TUMBLE(signals, last_updated_at, INTERVAL '15 MINUTES');
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

注意,now 不会改变,直到 last_updated_at 赶上它。

6. 总结

在本文中,我们演示了如何应用 RisingWave 实时监控停车位利用情况。如果能够获取其他相应数据,还可以使用 RisingWave 实现更先进的功能。欢迎大家分享更多使用 RisingWave 进行实时监控和分析的解决方案。

7. 关于 RisingWave

RisingWave 是一款开源的分布式流处理数据库,旨在帮助用户降低实时应用的开发成本。RisingWave 采用存算分离架构,提供 Postgres-style 使用体验,具备比 Flink 高出 10 倍的性能以及更低的成本。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/791773
推荐阅读
相关标签