赞
踩
关于 Apache Pulsar
Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。
GitHub 地址:http://github.com/apache/pulsar/
传智教育是第一个实现 A 股 IPO 的教育企业,公司致力于培养高精尖数字化人才,主要培养人工智能、大数据、智能制造、软件、互联网、区块链等数字化专业人才及数据分析、网络营销、新媒体等数字化应用人才。
为了将更优质的教育资源惠及更多学子,传智教育在全国开设了 19 所分校,培养了 30+ 万 IT 从业者;出版书籍 111 本,覆盖全国 200+ 万高校生;发布视频教程 12+ 万节,年均下载及播放量 4000+ 万次;开展免费直播公开课 1500+ 场,年均听课人数近百万。
博学谷于 2016 年 7 月正式成立。依托传智教育 15 年 IT 教育沉淀,以就业课为核心,采用个性化、随到随学的自适应学习模式,为学员提供零基础入门、技能提升及职业生涯规划为一体的 IT 在线学习服务。专注整合优势 IT 教学资源,打造更适合在线学习的优质教学产品和服务。
2020 年,疫情为我们的生活和工作带来了巨大变化,因疫情防控需要,很多线下课程无法正常开展,更多用户选择通过线上学习来提升知识储备、拓展专业能力。博学谷提供线上教学服务,成为了更多用户的最佳选择。随着用户咨询和学习行为的大幅上涨,博学谷在线系统的压力有所增加,对原有系统提出了新的挑战:
•原有系统只支持离线同步,响应较慢。•需要对原有系统收集的旧数据进行同步,对新数据进行离线收集和实时收集,并且基于所有数据进行链路式的数据清洗和聚合分析。•目前采用阿里云 DTS(Data Transmission Service,数据传输服务)同步方式对业务表进行同步,成本较大且无法在同步过程中对数据进行清洗、转换等操作。
面对规模的增长和模式的调整,博学谷需要一套更加灵活、高效的系统,以处理规模化增长的业务数据,确保业务系统的正常运行,支撑业务模式的调整,同时将数据更多地用于决策分析。
我们希望借助消息中间件来解决遇到的这些挑战。我们团队成员有使用 RabbitMQ 和 Kafka 的经验:RabbitMQ 比较适合轻量化场景,Apache Kafka 适用于日志量大的场景。我们需要一套应用场景和源码阅读都更全面的解决方案。在调研的过程中,我们知道市场上还有另一个深受欢迎的消息系统 Apache Pulsar。对于运维团队来说,学习这三种消息中间件的都有一定的学习成本问题,而且基础设施一旦落地再改变并不容易,所以在传智教育的中间件选型上我们进行了充分调研。调研角度主要包括:
•支持消息流式处理,确保消息处理顺序•支持“仅一次”语义消息处理•支持消息永久持久化,存储规模易于扩展•云原生部署友好,运维成本较低•源代码质量好,社区活跃度高
我们发现 Pulsar 是一款云原生的消息和事件流平台,内置的很多特性正好满足我们的需求。例如:Pulsar 采用计算与存储分离的架构设计,在 Apache BookKeeper 上存储数据,在 broker 上进行 Pub/Sub 相关的计算,具备 IO 隔离的特性。和传统的消息平台(如 Kafka)相比,Pulsar 的架构有明显的优势:
•Broker 和 bookie 相互独立,可以独立扩展和容错,提升系统的可用性。•分区存储不受单个节点存储容量的限制,数据分布更均匀。•BookKeeper 存储安全可靠,保证消息不丢失,同时支持批量刷盘以获得更高吞吐量。•读取的峰值不会影响写入性能,读取和写入使用不同的物理存储,数据的可持久化变得更加方便和廉价。
2020 年 4~9 月,我们对 Pulsar 进行了功能测试,包括消息的顺序消费、数据一致性和丢失率等。测试结果证明,Pulsar 能够有序消费消息、数据保持一致,并且不丢失。在不考虑有序性的应用场景下,Pulsar 可以直接当消息队列使用,多种订阅模式和订阅级别不作用于Topic,这样多个Consumer就可以同时有序或无序消费该Topic。
在运维方面,我们可以利用 K8S (Helm) 部署 Pulsar 、Pulsar IO、Pulsar Functions;利用 pulsar-admin 简化运维团队部署和管理的复杂度。
在一家商业公司,采用任何一项新技术(包括开源技术)都有一定的风险,即使这项技术具有显著的优势。经过了深思熟虑和充分调研,我们最终决定引入 Apache Pulsar。
作为一个在线教育平台,我们需要和外部交换大量的数据。我们使用第三方消息系统容联七陌做在线客服数据收集,采用诸葛 IO 系统采集用户行为数据进行分析。因此我们需要一套系统将外部数据汇总起来,二次处理后,持久化到数据仓库中,并最终得到一套符合业务分析的数据。
我们基于 Apache Pulsar 搭建了博学谷数据处理系统,通过多个命名空间隔离各应用的数据和配置,通过 Pulsar IO、Pulsar Functions 实现数据采集、处理等。根据业务需要,部分命名空间被配置为消息永不过期、永久保留。得益于 Pulsar 消息系统中计算与存储分离的设计,系统得以灵活扩展存储容量。目前生产环境部署的 Pulsar 为基于官方 v2.6.1 的修改版本,所有涉及问题修复代码已通过 GitHub 与社区分享,将在未来的发布版本中修复。
通过搭建 Source 集群对数据进行多维度采集,使用 Pulsar Functions 对采集的数据进行实时清洗等操作,在整个链路过程中 Pulsar Topic 采用持久化存储,使用 Pulsar SQL[1]方便对每个阶段的数据进行数据回溯。Sink 集群对清洗后的数据进行持久化操作。
上面这个链路中,我们使用了 Pulsar 的 Delay Topic 来标识会话的完成状态,Dead Letter Topic 记录 Sink 端消费失败的消息。
在开发过程中,我们发现 Pulsar Functions 在实时流(有序)场景中,Receive Fail 响应后不会中断进程。随后我们联系 Pulsar 社区,提交了 issue 和 PR ,得到了 StreamNative 团队的快速响应和支持。这个问题目前标记在 Pulsar 2.8.0 修复,我们内部则基于 Pulsar 2.6.1 进行了补丁修复。
博学谷系统使用第三方在线客服系统实现网页端和移动端的在线咨询功能。在此前,受到第三方服务接口的限制,在线咨询会话数据的使用受到限制。随着业务的增长和模式的调整,团队希望能够将这部分数据与客户管理系统(CMS)相结合,更好地挖掘客户需求、提高咨询和反馈效率。
该第三方系统使用 HTTP API 向接入方提供数据查询接口,并对接口访问进行了限流处理,影响了 CMS 系统对会话数据的使用。
经分析和讨论,我们设计并开发了基于 Pulsar IO 的 HTTP Polling Source 组件和 Common JDBC Sink 组件,将会话数据高效地抓取至内部 MySQL 数据库中持久存储,同时支持在数据采集过程中对数据进行清洗和转换,大大提高了会话数据利用效率和使用场景。
HTTP Polling Source 是基于 HTTP 轮询机制的数据采集消息源,循环执行基于配置模板的 HTTP 请求,在每次请求后更新同步状态(Offset State)至 State Storage 并将请求结果写入下游 Pulsar Topic。
Common JDBC Sink 使用 JDBC 接口持久化结构化对象数据,支持多种 JDBC 驱动的通用结构化文档存储处理,不仅覆盖H2、MySQL、MariaDB、PostgreSQL 数据库的所有数据类型,还支持 INSERT、UPDATE、UPSERT、DELETE 和 Schema Migration 操作。
博学谷系统使用第三方系统实现客户端用户行为分析功能,该商业系统的用户行为分析功能有限,且不便于将分析维度与业务系统中的概念相结合,博学谷系统需要让用户行为数据产生更大的价值,以便为客户提供更好的服务。
该商业系统提供基于 Apache Kafka 早期版本(v0.8)的数据订阅服务,Pulsar 内置的 Kafka Source 并不支持这一 Kafka 版本。通过方案评估,我们将现有的支持 Kafka v0.8 版本的订阅程序包装为 Pulsar IO Source 接口,即 Legacy Kafka Source。该接口支持 Kafka v0.8 版本的日志消息源,用于将从 Kafka 中订阅到的数据高效地保存至 Pulsar Topic 中,以支持下游灵活处理数据,支撑如异常行为研判、学习效果评估等功能。
随着业务系统的演进,采集业务变更日志逐渐成为研发团队的负担。目前,研发团队通过额外的数据库表来记录业务数据的变更历史,如订单变更记录、流程流转记录等。开发人员需要熟悉数据库表的设计,并在表结构发生变更时谨慎地调整日志记录功能;为保证关键数据完整性,还需将数据变更和日志写入在同一个事务中,对系统性能产生了一定的影响。
通过基于 MySQL Replication 协议的 MySQL Binlog Connector,可以将业务系统数据库中的数据变更事件实时同步到 Pulsar Topic 中,利用 Pulsar 的流式消息处理机制,确保在下游按顺序处理消息一次。通过这种方式自动生成数据变更日志,支持 DDL 变更自动迁移,支持下游使用多种日志存储机制(MySQL、ElasticSearch 等)持久化业务日志,减少对业务系统代码的侵入,降低对业务系统性能的影响。
MySQL Binlog Connector 有两个组件:MySQL Binlog Source 和 MySQL Binlog Sink。MySQL Binlog Source 用于采集原始 Binlog Event 数据,以事务为单位向下游发送消息,使用 Binlog Filename/Position 或 GTID Set 作为同步数据的偏移量保存至 State Storage 中。MySQL Binlog Sink 通过在下游数据库中回放(以事务为单位) Binlog Event 消息,处理这些数据,将 DML 或 DDL 变更同步至下游数据库实例中。
开发数据处理系统时,数据安全始终是研发团队的工作重点,如何在确保敏感信息不被非法访问的前提下更好地挖掘数据的价值成为亟需处理的问题。目前我们团队使用阿里云 DTS 或内部 ETL 工具将业务数据同步至分析型数据库(OLAP)中,实现数据分析类需求,但此类方案并不能很好地在同步过程中将敏感信息进行脱敏处理。
基于数据变更日志采集模块的工作积累,设计并实现了基于 MySQL Binlog Source 的数据实时脱敏同步方案。该方案利用已保存至 Pulsar Topic 中的 Binlog Event 信息,基于 Pulsar Functions 开发脱敏处理 function,根据规则引擎匹配脱敏处理方法,再通过 Common JDBC Sink 将脱敏后的数据持久化到分析型数据库中,提高了数据同步方案的扩展性和灵活性。
我们使用 Pulsar 解决了原有收集系统采集效率低、延迟率高的问题,并且针对多数据源兼容了不同的收集方式;同时在同步生产业务库方面,使用 Pulsar 替换掉了原有成本较大的 DTS 方案,链路式地进行数据脱敏,保证数据安全的同时也方便了数据分析团队能够更好更高效地利用数据。
基于传智教育信息建设的总体规划,集合博学谷的实际需求,未来我们会继续挖掘数据处理系统的价值,更好地利用 Apache Pulsar 这一优秀的消息系统,支撑系统运行和业务发展。
•通过数据变更日志采集方案简化业务日志功能的开发•使用数据实时脱敏同步方案替换阿里云 DTS•实现用户异常行为研判、学习效果评估、操作历史回放•建设跨部门的数据交换系统
感谢 Apache Pulsar 社区和 StreamNative 团队的支持,博学谷数据处理系统的建设和未来发展离不开开源社区的优秀贡献。博学谷研发团队将继续推进 Apache Pulsar 系统在公司业务系统建设中的应用,并鼓励团队成员更多地参与到开源社区活动中,与大家共同成长。
在调研、使用 Pulsar 过程中,我们充分利用了 Pulsar Functions、Pulsar IO 等诸多 Pulsar 的原生特性,也根据需求进行了部分优化。作为下一代云原生分布式消息流平台,Pulsar 的社区十分活跃,日益壮大。未来,我们计划基于 Pulsar 构建多维度的数据流规则引擎,使用 Pulsar 搭建集团电商平台基础中间件服务,增加 Pulsar 在传智教育的应用场景。
孙长宇 传智教育博学谷研发主管
刘梓霖 博学谷基础架构研发工程师
•Apache Pulsar 在能源互联网领域的落地实践•Apache Pulsar 在腾讯 Angel PowerFL 联邦学习平台上的实践•Apache Pulsar 在 BIGO 的性能调优实战(下)
[1]
Pulsar SQL: https://pulsar.apache.org/docs/en/sql-overview/[2]
传智教育官网: http://www.itcast.cn/[3]
Pulsar官方文档: https://pulsar.apache.org/docs/en/standalone/[4]
Debezium 官网: https://debezium.io/[5]
Trino官网: https://trino.io/[6]
Binlog Connector: https://github.com/shyiko/mysql-binlog-connector-java[7]
容联七陌: https://www.7moor.com/[8]
诸葛IO: https://zhugeio.com/[9]
DTS: https://help.aliyun.com/product/26590.html
点击“阅读原文”,获取 Apache Pulsar 硬核干货资料!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。