当前位置:   article > 正文

快速轻巧的CQRS和事件源解决方案_c# cqrs

c# cqrs

目录

介绍

何苦呢?

优先级

1.可读性

2.性能

3.可调试性

4.最小的依赖

5. 单独的命令和事件

6.多租户

7. Sagas/流程经理

8. 调度

9. 聚合到期

10. Async/Await是邪恶的

整洁架构

Timeline项目

样例项目

总览

入门

用法

方案A:如何创建和更新联系人

数据流

方案B:如何制作聚合快照

方案C:如何使聚合脱机

方案D:如何创建具有唯一登录名的新用户

方案E:如何调度命令

方案F:如何使用一个命令更新多个聚合

方案G:如何实现自定义事件处理程序

方案H:如何使用自定义处理程序覆盖命令

展示

应用程序

写端(Write Side)

读端(Read Side)

领域

持久性

CQRS + ES主干

命令

事件

聚合

快照

指标


本文的目的是提供使用C#编程语言和.NET FrameworkCQRS + ES模式的快速、轻量级实现。此实现功能比较全面,包括对命令和事件的SQL Server序列化、调度的命令、快照、sagas(即流程管理器)以及用于多租户自定义的即插即用替代的支持。我将描述代码的结构,并说明如何与示例应用程序一起工作。

介绍

如果您正在阅读本文,那么您可能已经对命令查询责任隔离(CQRS)和事件源(ES)有所了解,所以我不会解释它是什么,为什么要使用它,或者为什么你想要避免它。有很多相关的文章可用参考,如下:

本文的目的是提供使用C#编程语言和.NET Framework的快速,轻量级的实现。

此实现功能比较全面,包括支持SQL Server命令和事件的持久性、调度的命令、快照、sagas(即流程管理器)以及用于多租户自定义的即插即用替代。

我将描述代码的结构,并说明如何使用遵循整洁架构(Clean Architectur)模式的示例应用程序。

何苦呢?

几乎可以肯定这是您的第一个问题。

已经有很多CQRS + ES框架和解决方案,这些框架和解决方案已经得到充分开发和充分验证。如果您正在研究选择方案并评估构建vs购买决策,那么您可以选择出色的商业产品和开源替代品。

例如:

为什么要实现另一种解决方案?为什么我要从头开始并发展自己的?

我研究和使用CQRSES模式已经有几年了。我既使用了商业解决方案,也使用了开源解决方案,并使用它们来构建和/或改进实际的生产系统。如果您的背景和经历与我的相似,那么您可能已经知道该问题的答案,但是您可能不想相信这是真的(因为我很长时间没有这样做):

如果您认真考虑采用CQRS + ES架构,那么除了自行构建之外,别无选择。

就像克里斯·基尔(Chris Kiehl)在他的文章中说的那样:事件很难 ”

...您可能会从头开始构建核心组件。就技术堆栈而言,该领域的框架往往是重量级,规范性强,缺乏灵活性的框架。如果您想启动并运行某事,则应该自己动手做(这是建议的方法)。

如果是这样,那么对我来说写这篇文章有什么帮助?

简单:这是另一个带有源代码的示例,因此您可以看到我如何解决CQRS + ES解决方案中出现的一些问题。如果您是从CQRS + ES项目开始的,那么您应该研究发现的所有示例。

我不希望(或建议)采用此源代码并将其合并到您开发的任何应用程序中。

相反,我的意图仅是提供另一个示例,您可以从中得出一些想法——也许(如果我做得不错的话)为您自己的项目提供一些小启发。

优先级

从驱动实现的优先级列表开始是很重要的,因为要做出的许多设计决策都需要进行重大的权衡。

CQRS + ES的纯粹主义者会反对我的某些决定,并坚决谴责其他决定。我可以忍受这一点。

我已经设计和开发软件很长时间了(比我准备在这里承认的时间还长)。我流血、流汗、流泪和染白头发不止一点——所以我敏锐地意识到,面对取舍,选择不当所带来的成本。

以下优先级有助于告知和指导这些决定。它们以重要性的高低顺序列出,但是所有都是必需的,以,给自己倒杯酒,安顿下来,因为这里的序言将是很长的...

1.可读性

该代码必须可读

代码越可读,它就越有用、可维护和可扩展。

在我使用的某些实现中(以及我自己开发的某些实现中),除了原始作者之外,几乎所有其他人都无法理解底层CQRS + ES主干的代码。我们不能在这里允许。一小组开发人员必须能够并且相对容易地共享并使用代码,并且充分了解其工作方式以及为什么以这种方式编写代码。

尤其重要的是,用于注册命令处理程序和事件处理程序的代码必须简单明了。

许多CQRS + ES框架使用反射和依赖注入的组合来自动注册用户,以处理命令和事件。尽管这通常非常聪明——并通常会减少项目中的代码行总数——但它隐藏了命令(或事件)与其订阅者之间的关系,从而将这些关系变成了不透明的,神奇的黑框。许多控制反转(IoC)容器使此操作变得容易,因此可以理解,但我相信这是一个错误。

需要明确的是:在项目中使用IoC容器不是错误。依赖注入是一种出色的最佳实践,也是一种完善的重要技术。但是,依赖项注入模式本身并不是发布-订阅模式,并且将两者混为一谈会导致很多痛苦和灾难。在IoC容器库中使用高度专业化的功能来自动化该库的预期用途之外的功能,然后将软件体系结构中最关键的组件紧密结合在一起是一个错误(我自己做过)。当您的应用程序中的某些行为异常时,这将使故障排除和调试异常困难且耗时。

因此,作为此可读性目标的一部分,必须在代码中显式定义命令处理程序和事件处理程序的注册,而不是通过约定或自动化来隐式定义。

2.性能

代码必须是快速的

处理命令和事件是在CQRS + ES架构上开发的任何系统的核心,因此吞吐量优化是关键的性能指标。

就并发用户和系统发出命令并观察已发布事件的影响而言,实现必须处理最大可能的数量。

在我以前的一些实现中,很多痛苦和苦难是由于将命令发送到大型聚合(例如,具有大量事件流的长期聚合)时发生的并发冲突而引起的。根本原因通常是性能不佳的代码。因此,算法优化至关重要。

快照是满足此要求所不可或缺的,因此必须是解决方案所不可或缺的。该实现必须具有对每个聚合根上的自动快照的内置支持。

内存中缓存是运行时优化的另一个重要部分,因此,它也必须是解决方案不可或缺的一部分。

3.可调试性

使用标准调试器(如Visual Studio IDE调试器)来跟踪代码并跟踪其执行必须是很容易的。

我已经看到许多CQRS + ES实现依赖于复杂的算法来动态注册、查找和调用用于处理命令和事件的方法。

同样,这些算法中的许多算法都非常聪明:它们具有强大的功能和灵活性,并且可以显着减少解决方案中的代码行数。

例如,我在过去的一些项目中使用过DynamicInvoker类。这是一段巧妙的代码——少于150行——而且效果很好。(我没有写它,所以当我这么说的时候我并不自夸。)但是,如果代码中有些杂乱无章的东西,您已经编写了调用此类的方法的代码,并且如果需要使用调试器,然后你需要特别熟练地进行思维体操,以了解所发生的事情。我不是,所以如果使用任何动态调用,那么在使用调试器时,理解代码和跟踪其执行的线程必须非常容易。

4.最小的依赖

外部依赖性必须保持在绝对的最低限度。

过多的依赖性导致代码比您在系统的任何关键组件中可能需要的速度更慢,更重且更脆弱。最小化依赖关系有助于确保您的代码更快、更轻巧、更健壮。

最重要的是,最小化依赖性有助于确保解决方案不会与任何外部程序集、服务或组件紧密耦合,除非该依赖性至关重要。

如果软件的基本体系结构依赖于某些外部第三方组件,则必须做好准备,有可能在某天对其进行更改可能会影响您的项目。有时这是可以接受的风险,而其他时候则不是。

在该特定实现方式中,对该风险的容忍度非常低。

因此,您会注意到核心的Timeline程序集(实现CQRS + ES主干)仅具有一个外部依赖项:即System.NET Framework中的名称空间。

旁白一下,因为这是一篇有趣的文章,说明了我的观点:在撰写本文时,2018年,NPM JavaScript软件包单数在一周内有280万以上的安装。所有这些开发人员都没有编写基本的代码来让函数在数字为奇数时返回true的情况,而是选择将is-odd程序包与他们的300多个依赖项链合并到他们的解决方案中!

5. 单独的命令和事件

许多CQRS + ES框架都以共同的基类派生的方式实现一个Command类和一个Event类。

这样做的理由很明显:将命令和事件都视为通用消息的子类型是很自然的。两者都是使用某种形式的服务总线”“发送的,那么为什么不在共享基类中实现通用功能,而编写一个双重用途的类来路由消息——而不是编写大量重复代码呢?

这是我过去采用的方法,对此有很好的论据。

但是,我现在认为这可能是一个错误。引用罗伯特·马丁(Robert C. Martin)的话

软件开发人员经常陷入陷阱——陷阱取决于他们对重复的恐惧。在软件中,复制通常是一件坏事。但是有不同种类的重复。确实存在重复,其中对一个实例的每次更改都必须对该实例的每个副本进行相同的更改。然后有虚假或偶然的重复。如果两个明显重复的代码部分沿着不同的路径发展——如果它们以不同的速率变化并且由于不同的原因——那么它们就不是真正的重复...当您将用例彼此垂直分离时,就会遇到这个问题,您的诱惑是将用例耦合在一起,因为它们具有相似的用户界面,相似的算法或相似的数据库模式。小心。抵制诱惑,不要犯条件反射式消除重复的罪。确保重复是真实的。

命令和事件彼此之间有足够的不同,以保证它们可以沿着不同的路径发展并适应系统需求。

我还没有经历过通过消除A)发送/处理命令和B)发布/处理事件的重复代码来提高代码质量、性能或可读性的任何情况。

因此,命令和事件不得具有任何共享基类,并且用于发送/发布命令/事件的机制一定不能是共享队列。

6.多租户

多租户必须是解决方案不可或缺的组成部分,而不是事后必须附加的功能或设施。

这些天来,我专门构建和维护企业多租户系统。这意味着我只有一个应用程序的单个实例,该实例为具有多个并发用户的多个并发租户提供服务。

在此实现中将多租户作为优先级有几个原因:

  • 每个集合必须分配给一个租户。这使得数据的所有权清晰且定义明确。
  • 当需要扩大规模时,分片必须易于实现。分片是将聚合分布到多个写侧节点,而租户是划分聚合的最自然边界。
  • 特定于租户的自定义必须易于实现。每个应用程序对于每个命令和每个事件都有核心的默认行为,但是在为许多不同的组织和/或利益相关者服务的大型复杂应用程序中,不同的租户肯定具有各种特定的需求。有时差异很小。有时它们很重要。此处的解决方案必须允许开发人员使用针对特定租户定制的功能来覆盖命令和/或事件的默认处理。覆盖必须是明确的,因此易于识别、启用或禁用。

7. Sagas/流程经理

实现流程管理器所需的步骤数量必须相对较少,并且流程管理器的代码必须相对易于编写。

流程管理器(有时称为saga)是一个独立的组件,它以交叉聚合、最终一致的方式对域事件做出反应。流程管理器有时纯粹是反应性的,有时代表工作流。

从技术角度来看,流程管理器是一种状态机,受传入事件的驱动,这些事件可能是从多个聚合发布的。每个状态都可能有副作用(例如,发送命令,与外部Web服务通信,发送电子邮件)。

我曾使用过一些CQRS + ES框架,这些框架根本不支持流程管理器,而其他框架则支持该概念,但不以易于理解或配置的方式提供支持。

例如,在我自己过去的一种实现中,事件被附加到数据库日志之后,事件存储将立即发布事件。它不是由聚集或命令处理程序发布的。这甚至使实现最基本的工作流程也变得异常困难:我无法从事件处理程序中向聚合发送同步命令,因为事件存储的Save方法在同步锁(以维护线程安全)内执行,并且新事件不创建死锁就无法发布。

无论工作流程的状态机多么简单或复杂,要协调该流程中的事件,都需要具有副作用的代码,例如向其他聚合发送命令,向外部Web服务发送请求或发送电子邮件。因此,此处的解决方案必须具有本地的内置支持才能实现此目的。

8. 调度

命令调度必须是解决方案不可或缺的一部分。

使用计时器发送命令必须很容易,因此该命令会在计时器经过后执行。这使开发人员可以指示执行任何命令的特定日期和时间。

这对于必须依赖时间触发的命令很有用。

对于在正常执行流程之外的后台进程中必须脱机执行的命令,它也很有用。这种类型的完全异步操作非常适合需要较长时间才能完成的命令。

例如,假设您有一条命令要求在某些外部第三方Web服务上调用方法,并且该服务通常需要超过80万毫秒才能响应。必须安排此类命令在非高峰时间执行和/或在执行的主线程之外执行。

9. 聚合到期

该解决方案必须具有本机内置的聚合到期和清除支持。

我需要一个CQRS + ES解决方案,该解决方案可以轻松地将聚合事件流从联机结构化日志复制到脱机存储,并从事件存储中清除它。

事件源极简主义者将立即对此进行红色标记,并说绝不可更改或删除聚合事件流。他们会说事件(因此是聚集)从定义上是不可变的。

但是,在某些情况下,这是不可协商的业务需求。

  • 第一:当客户不续订对多租户应用程序的订阅时,托管该应用程序的服务提供商通常负有合同义务,要求从其系统中删除该客户的数据。
  • 第二:当项目团队进行频繁的集成测试以确认系统功能正常运行时,从定义上看,输入和输出这些测试的数据是临时的。用于测试聚合的事件流的永久存储是浪费磁盘空间,没有当前或将来的业务价值;我们需要一种删除它的机制。

因此,可以说,这里的解决方案必须提供一种简便的方法来将聚合移出操作系统并移入冷存储

10. Async/Await是邪恶的

我当然在开玩笑。

但事实并非如此。

C#中的async/await模式产生非常高性能的代码。毫无疑问。在某些情况下,我已经看到它将性能提高了一个数量级或更多。

async/await模式可以在此解决方案的将来迭代中应用,但是——尽管此列表中具有第二优先级——在此解决方案中它是不允许的,因为它会导致破坏第一优先级。

在将async/await引入方法后,您将被迫转换其调用方,以便它们使用asyncawait(或被迫开始将干净的代码包装在脏线程块中),然后被迫转换这些调用方的调用方,因此他们在整个代码库中使用async/await,依此类推。该async/await关键字蔓延像传染性僵尸病毒。由此产生的异步代码混乱几乎可以肯定会更快,但同时更难阅读,甚至更难调试。

可读性是这里的重中之重,因此,我一直避免async/await直到它是提高性能的唯一剩余选择(而且提高性能是不可商议的业务要求)。

整洁架构

马修· 伦兹(Matthew Renze在整洁架构主题方面开设了出色的Pluralsight。该解决方案的源代码包含五个程序集,并且遵循他所倡导的简洁架构模式。

Timeline项目

Timeline程序集实现了CQRS + ES主干。该程序集没有上游依赖性,因此它并不特定于任何应用程序。它可以从示例应用程序中断开,并集成到一个新的解决方案中,以开发完全不同的应用程序。

其他四个程序集(Sample*)使用Timeline程序集在控制台应用程序中实现这些层,以演示我在CQRS + ES软件系统中执行常见任务的方法。

项目依赖关系图如下所示:

样例项目

注意,Timeline程序集没有引用任何Sample程序集。

还要注意以领域为中心的方法:领域层不依赖于PresentationApplicationPersistence层。

示例领域的实体关系图如图2所示:

在此基本数据模型中:

  • 一个Person0..N个银行Account
  • 一个Transfer从一个账户取钱,然后存入另一个账户;
  • 一个User可能是没有个人数据的管理员,或者是拥有多个租户拥有的个人数据的某人

请记住:每个PersonAccountTransfer都是聚合根,因此这些实体中的每个都有一个Tenant属性。

总览

3展示了此解决方案中CQRS + ES的总体方法:

请注意,Write Side(命​​令)和Read Side(查询)已被很好地描述。

您还可以看到,事件源非常类似于Write Side的插件。尽管此解决方案中未进行演示,但您可以看到不带事件源的CQRS解决方案的外观,有时(取决于CQRS)是更好的模式,具体取决于项目的要求。

以下是该体系结构的关键特征:

  • 命令队列将命令(调度所需)保存在结构化日志中。
  • 命令订阅者在命令队列上侦听命令。
  • 命令订户负责创建聚合并在执行命令时在聚合上调用方法。
  • 命令订户将聚合(作为事件流)保存在结构化日志中。
  • 命令订户在事件队列上发布事件。
  • 已发布的事件由事件订阅者和流程管理器处理。
  • 流程管理器可以响应事件在命令队列上发送命令。
  • 事件订阅者在查询存储中创建和更新投影。
  • 查询搜索是用于读取投影的轻量级数据访问层。

入门

在编译和执行源代码之前:

  1. 执行脚本Create Database.sql以创建本地SQL Server数据库。
  2. 更新Web.config的连接字符串。
  3. OfflineStoragePath更新Web.configappSetting值。

用法

我将从顶部开始并演示如何使用它,然后从应用程序堆栈一直向下浏览到CQRS + ES主干的基本细节,而不是从底部开始描述Timeline程序集的工作方式。

如果我吸引了你这么长时间,那么我应该感谢你陪我到现在……

方案A:如何创建和更新联系人

这是最简单的用法。

在这里,我们创建一个新的联系人,然后进行名称更改,模拟Alice结婚的用例:

  1. public static void Run(ICommandQueue commander)
  2. {
  3. var alice = Guid.NewGuid();
  4. commander.Send(new RegisterPerson(alice, "Alice", "O'Wonderland"));
  5. commander.Send(new RenamePerson(alice, "Alice", "Cooper"));
  6. }

这样运行后,读端投影看起来很好,正如预期的那样:

数据流

下图说明了系统在这种情况下执行的步骤:

方案B:如何制作聚合快照

快照由“Timeline”程序集自动执行。默认情况下,每个聚合都启用了它们,因此您无需执行任何操作即可运行此功能。

在下一个测试运行中,Timeline程序集被配置为每10个事件后拍摄一次快照。我们注册一个新的联系人,然后将其重命名20次。这将在事件编号20上生成快照,这是倒数第二个重命名操作。

  1. public static void Run(ICommandQueue commander)
  2. {
  3. var henry = Guid.NewGuid();
  4. commander.Send(new RegisterPerson(henry, "King", "Henry I"));
  5. for (int i = 1; i <= 20; i++)
  6. commander.Send(new RenamePerson(henry, "King", "Henry " + (i+1).ToRoman()));
  7. }

不出所料,我们在版本20上有一个快照,并在事件编号后显示了当前状态21

方案C:如何使聚合脱机

术语装箱取消装箱用于使聚合脱机并使其重新联机。

当您发送命令将汇总框装箱时Timeline程序集:

  1. 创建快照;
  2. 将快照和整个聚合事件流复制到存储在文件系统目录中的JSON文件中;然后
  3. SQL Server结构化日志表中删除快照和聚合。

当然,这使它成为极具破坏性的操作,除非是强制性的业务/法律要求,否则永远不要使用它。

在下一个测试运行中,我们注册一个新的联系人,将其重命名7次,然后将聚合框起来。

  1. public static void Run(ICommandQueue commander)
  2. {
  3. var hatter = Guid.NewGuid();
  4. commander.Send(new RegisterPerson(hatter, "Mad", "Hatter One"));
  5. for (int i = 2; i <= 8; i++)
  6. commander.Send(new RenamePerson(hatter, "Mad", "Hatter " + i.ToWords().Titleize()));
  7. commander.Send(new BoxPerson(hatter));
  8. }

如您所见,聚合不再存在于事件存储中,并且最终快照的脱机副本(以及整个事件流)已在文件系统上进行了制作。

方案D:如何创建具有唯一登录名的新用户

在开发人员中,我最经常在线上尝试理解CQRS + ES的问题是:

如何强制执行参照完整性以确保新用户具有唯一的登录名?

在我对CQRS + ES模式进行研究的初期,我自己(不止一次)问过同样的问题。

来自经验丰富的从业人员的许多答案看起来像这样:

您的问题表明您不了解CQRS + ES

这是真的(我现在意识到),但是完全没有帮助——尤其是对于那些努力学习的人。

一些答案稍好一些,以摘要形式提供了高级建议,但使用了CQRS + ES术语,但这也不总是很有帮助。我最喜欢的建议之一是(来自Edument的好伙伴):

创建一个反应式的saga,以标记和停用仍然使用重复的用户名创建的帐户,无论是由于极端巧合还是恶意或由于客户端故障引起的。

我第一次读到我对它的含义只有一个模糊的认识,根本不知道如何开始执行这样的建议。

下一个测试运行以真实的工作代码为例,展示了一种使用唯一名称创建新用户的方法(但不是唯一方法)。

在这种情况下,诀窍是要意识到您实际上确实需要一个saga(或我更喜欢称呼它流程管理器)。创建新的用户帐户不是一步一步的操作。这是一个过程,因此需要协调。流程图(或状态机,如果您愿意的话)在您的应用程序中可能非常复杂,但是即使在所有可能的情况中最简单的情况下,它也会看起来像这样:

下图显示了依赖于流程管理器来实现此功能的代码:

  1. public void Run()
  2. {
  3. var login = "jack@example.com";
  4. var password = "Let_Me_In!";
  5. if (RegisterUser(Guid.NewGuid(), login, password)) // succeeds.
  6. System.Console.WriteLine($"User registration for {login} succeeded");
  7. if (!RegisterUser(Guid.NewGuid(), login, password)) // fails; duplicate login.
  8. System.Console.WriteLine($"User registration for {login} failed");
  9. }
  10. private bool RegisterUser(Guid id, string login, string password)
  11. {
  12. bool isComplete(Guid user) { return _querySearch.IsUserRegistrationCompleted(user); }
  13. const int waitTime = 200; // ms
  14. const int maximumRetries = 15; // 15 retries (~3 seconds)
  15. _commander.Send(new StartUserRegistration(id, login, password));
  16. for (var retry = 0; retry < maximumRetries && !isComplete(id); retry++)
  17. Thread.Sleep(waitTime);
  18. if (isComplete(id))
  19. {
  20. var summary = _querySearch.SelectUserSummary(id);
  21. return summary?.UserRegistrationStatus == "Succeeded";
  22. }
  23. else
  24. {
  25. var error = $"Registration for {login} has not completed after
  26. {waitTime * maximumRetries} ms";
  27. throw new IncompleteUserRegistrationException(error);
  28. }
  29. }

请注意,上面示例中的调用方未假定命令StartUserRegistration的同步处理。而是轮询注册的状态,等待注册完成。

知道Timeline程序集中的代码是同步的,我们可以重构方法RegisterUser,使其更加简单:

  1. private bool RegisterUserNoWait(Guid id, string login, string password)
  2. {
  3. bool isComplete(Guid user) { return _querySearch.IsUserRegistrationCompleted(user); }
  4. _commander.Send(new StartUserRegistration(id, login, password));
  5. Debug.Assert(isComplete(id));
  6. return _querySearch.SelectUserSummary(id).UserRegistrationStatus == "Succeeded";
  7. }

流程管理器本身的代码比您可能想象的要简单:

  1. public class UserRegistrationProcessManager
  2. {
  3. private readonly ICommandQueue _commander;
  4. private readonly IQuerySearch _querySearch;
  5. public UserRegistrationProcessManager
  6. (ICommandQueue commander, IEventQueue publisher, IQuerySearch querySearch)
  7. {
  8. _commander = commander;
  9. _querySearch = querySearch;
  10. publisher.Subscribe<UserRegistrationStarted>(Handle);
  11. publisher.Subscribe<UserRegistrationSucceeded>(Handle);
  12. publisher.Subscribe<UserRegistrationFailed>(Handle);
  13. }
  14. public void Handle(UserRegistrationStarted e)
  15. {
  16. // Registration succeeds only if no other user has the same login name.
  17. var status = _querySearch
  18. .UserExists(u => u.LoginName == e.Name
  19. && u.UserIdentifier != e.AggregateIdentifier)
  20. ? "Failed" : "Succeeded";
  21. _commander.Send(new CompleteUserRegistration(e.AggregateIdentifier, status));
  22. }
  23. public void Handle(UserRegistrationSucceeded e) { }
  24. public void Handle(UserRegistrationFailed e) { }
  25. }

那里有一个基本的反应式saga,它标记了使用重复用户名创建的不活动帐户。有很多的欣喜。

如预期的那样,第一次注册成功,而第二次失败:

方案E:如何调度命令

调度命令在将来的日期/时间运行很容易:

  1. public static void Run(ICommandQueue commander)
  2. {
  3. var alice = Guid.NewGuid();
  4. var tomorrow = DateTimeOffset.UtcNow.AddDays(1);
  5. commander.Schedule(new RegisterPerson(alice, "Alice", "O'Wonderland"), tomorrow);
  6. // After the above timer elapses, any call to Ping() executes the scheduled command.
  7. // commander.Ping();
  8. }

注意,这不会在事件日志中创建任何聚合,并且命令日志现在包含计划的条目:

方案F:如何使用一个命令更新多个聚合

这是试图了解如何实现CQRS + ES模式的开发人员提出的另一个常见问题。当我自己学习时,这是我(很多次)问过的另一个问题。

从业者经常回答:

你不能。

这不是很有启发性的。

有些人会提供更多指导,内容如下:

聚合和命令处理程序的分解将使这种想法无法在代码中表达。

最初几次阅读该语句似乎很神秘,最后发现它对于验证实现很有帮助,但是从一开始它并不是超级有用。

最有用的是一个带有实际工作代码的示例,该示例首先实现了引发问题的功能类型:

  • 假设我有两个银行帐户,每个银行帐户都是一个总根,我想将资金从一个帐户转移到另一个帐户。如何使用CQRS + ES做到这一点?

下一次测试运行显示了可以完成此操作的一种方法(而非唯一方法)。

在这种情况下,诀窍是要意识到您需要另一个聚合根——即汇款本身不是一个帐户——并且需要一个流程管理器来协调工作流。

下图说明了最简单的流程图。(会计系统显然需要比这更复杂的东西。)

一旦完成所有步骤,依靠流程管理器来实现上述工作流程的代码就很容易了:

  1. public void Run()
  2. {
  3. // Start one account with $100.
  4. var bill = Guid.NewGuid();
  5. CreatePerson(bill, "Bill", "Esquire");
  6. var blue = Guid.NewGuid();
  7. StartAccount(bill, blue, "Bill's Blue Account", 100);
  8. // Start another account with $100.
  9. var ted = Guid.NewGuid();
  10. CreatePerson(ted, "Ted", "Logan");
  11. var red = Guid.NewGuid();
  12. StartAccount(ted, red, "Ted's Red Account", 100);
  13. // Create a money transfer for Bill giving money to Ted.
  14. var tx = Guid.NewGuid();
  15. _commander.Send(new StartTransfer(tx, blue, red, 69));
  16. }
  17. private void StartAccount(Guid person, Guid account, string code, decimal deposit)
  18. {
  19. _commander.Send(new OpenAccount(account, person, code));
  20. _commander.Send(new DepositMoney(account, deposit));
  21. }
  22. private void CreatePerson(Guid person, string first, string last)
  23. {
  24. _commander.Send(new RegisterPerson(person, first, last));
  25. }

执行该测试后,Bill的蓝色帐户余额为31美元,Ted的红色帐户余额为169美元,这与预期的一样:

汇款流程管理器的代码也不太困难:

  1. public class TransferProcessManager
  2. {
  3. private readonly ICommandQueue _commander;
  4. private readonly IEventRepository _repository;
  5. public TransferProcessManager
  6. (ICommandQueue commander, IEventQueue publisher, IEventRepository repository)
  7. {
  8. _commander = commander;
  9. _repository = repository;
  10. publisher.Subscribe<TransferStarted>(Handle);
  11. publisher.Subscribe<MoneyDeposited>(Handle);
  12. publisher.Subscribe<MoneyWithdrawn>(Handle);
  13. }
  14. public void Handle(TransferStarted e)
  15. {
  16. var withdrawal = new WithdrawMoney(e.FromAccount, e.Amount, e.AggregateIdentifier);
  17. _commander.Send(withdrawal);
  18. }
  19. public void Handle(MoneyWithdrawn e)
  20. {
  21. if (e.Transaction == Guid.Empty)
  22. return;
  23. var status = new UpdateTransfer(e.Transaction, "Debit Succeeded");
  24. _commander.Send(status);
  25. var transfer = (Transfer) _repository.Get<TransferAggregate>(e.Transaction).State;
  26. var deposit = new DepositMoney(transfer.ToAccount, e.Amount, e.Transaction);
  27. _commander.Send(deposit);
  28. }
  29. public void Handle(MoneyDeposited e)
  30. {
  31. if (e.Transaction == Guid.Empty)
  32. return;
  33. var status = new UpdateTransfer(e.Transaction, "Credit Succeeded");
  34. _commander.Send(status);
  35. var complete = new CompleteTransfer(e.Transaction);
  36. _commander.Send(complete);
  37. }
  38. }

方案G:如何实现自定义事件处理程序

在下一个示例中,我将演示如何定义一个自定义事件处理程序,该事件处理程序旨在供多租户系统中的一个租户和仅一个租户使用。

在这种情况下,Umbrella Corporation是我们的租户之一,并且该组织希望我们系统中所有现有的核心功能。但是,该公司还需要其他自定义功能:

  • 当从任何一个Umbrella帐户开始进行资金转账或向其进行资金转账时,如果美元金额超过10,000美元,则必须将电子邮件通知直接发送给公司所有者。

为了满足此要求,我们为租户实现了流程管理器。依赖于此流程管理器的调用代码与之前的场景没有什么不同。

  1. public void Run()
  2. {
  3. // Start one account with $50,000.
  4. var ada = Guid.NewGuid();
  5. CreatePerson(ada, "Ada", "Wong");
  6. var a = Guid.NewGuid();
  7. StartAccount(ada, a, "Ada's Account", 50000);
  8. // Start another account with $25,000.
  9. var albert = Guid.NewGuid();
  10. CreatePerson(albert, "Albert", "Wesker");
  11. var b = Guid.NewGuid();
  12. StartAccount(albert, b, "Albert's Account", 100);
  13. // Create a money transfer for Ada giving money to Albert.
  14. var tx = Guid.NewGuid();
  15. _commander.Send(new StartTransfer(tx, a, b, 18000));
  16. }
  17. private void StartAccount(Guid person, Guid account, string code, decimal deposit)
  18. {
  19. _commander.Send(new OpenAccount(account, person, code));
  20. _commander.Send(new DepositMoney(account, deposit));
  21. }
  22. private void CreatePerson(Guid person, string first, string last)
  23. {
  24. _commander.Send(new RegisterPerson(person, first, last));
  25. }

这是Visual Studio调试器的快照,查看流程管理器的代码,在发送电子邮件通知的行上有一个断点。请注意,弹出窗口中的消息正文是我们期望的:

方案H:如何使用自定义处理程序覆盖命令

最后一个示例是前一个示例的变体。Umbrella Corporation希望完全禁用核心应用程序功能,并将其替换为完全自定义的行为。新的业务需求如下所示:

  • 不允许在我们的系统中更改联系人的姓名。曾经

为了满足此要求,我们对流程管理器进行了一些简单的更改。我们向构造函数添加一行代码,指定覆盖,然后添加替换函数:

  1. public class UmbrellaProcessManager
  2. {
  3. private IQuerySearch _querySearch;
  4. public UmbrellaProcessManager
  5. (ICommandQueue commander, IEventQueue publisher, IQuerySearch querySearch)
  6. {
  7. _querySearch = querySearch;
  8. publisher.Subscribe<TransferStarted>(Handle);
  9. commander.Override<RenamePerson>(Handle, Tenants.Umbrella.Identifier);
  10. }
  11. public void Handle(TransferStarted e) { }
  12. public void Handle(RenamePerson c)
  13. {
  14. // Do nothing. Umbrella does not permit renaming people.
  15. // Throw an exception to make the consequences even more severe
  16. // for any attempt to rename a person...
  17. // throw new DisallowRenamePersonException();
  18. }
  19. }

这是一个基本的测试运行,以证明此功能可以按预期进行:

  1. public static class Test08
  2. {
  3. public static void Run(ICommandQueue commander)
  4. {
  5. ProgramSettings.CurrentTenant = Tenants.Umbrella;
  6. var alice = Guid.NewGuid();
  7. commander.Send(new RegisterPerson(alice, "Alice", "Abernathy"));
  8. commander.Send(new RenamePerson(alice, "Alice", "Parks"));
  9. }
  10. }

请注意,日志中只有一个事件,并且此人的姓名没有变化:

展示

示例应用程序中的表示层是一个控制台应用程序,仅用于编写和运行测试用例场景。

这里没有什么值得特别注意的。您会注意到,我没有使用第三方组件进行依赖注入。相反,我写了一个非常基本的内存服务定位器。

这样做仅是为了使示例应用程序尽可能小且尽可能集中。在您自己的表示层中,将使用您喜欢的任何IoC容器,以最适合您的方式实吸纳依赖项注入。

应用程序

应用程序层分为两个不同的部分:用于命令的写端和用于查询的读端。这种划分有助于确保我们不会意外地混合使用写端和读端功能。

请注意,这里没有引用外部第三方程序集:

写端(Write Side)

命令是普通的旧C#对象(PO​​CO)类,因此它们可以轻松地用作数据传输对象(DTO)以便于序列化:

  1. public class RenamePerson : Command
  2. {
  3. public string FirstName { get; set; }
  4. public string LastName { get; set; }
  5. public RenamePerson(Guid id, string firstName, string lastName)
  6. {
  7. AggregateIdentifier = id;
  8. FirstName = firstName;
  9. LastName = lastName;
  10. }
  11. }

注意:与数据传输对象相比,我更喜欢数据包一词,并且我知道许多读者会反对,因此请选择适合您和您的团队的术语。

命令处理程序方法的注册在命令订户类的构造函数中是显式的,并且事件在保存到事件存储后才发布:

  1. public class PersonCommandSubscriber
  2. {
  3. private readonly IEventRepository _repository;
  4. private readonly IEventQueue _publisher;
  5. public PersonCommandSubscriber
  6. (ICommandQueue commander, IEventQueue publisher, IEventRepository repository)
  7. {
  8. _repository = repository;
  9. _publisher = publisher;
  10. commander.Subscribe<RegisterPerson>(Handle);
  11. commander.Subscribe<RenamePerson>(Handle);
  12. }
  13. private void Commit(PersonAggregate aggregate)
  14. {
  15. var changes = _repository.Save(aggregate);
  16. foreach (var change in changes)
  17. _publisher.Publish(change);
  18. }
  19. public void Handle(RegisterPerson c)
  20. {
  21. var aggregate = new PersonAggregate { AggregateIdentifier = c.AggregateIdentifier };
  22. aggregate.RegisterPerson(c.FirstName, c.LastName, DateTimeOffset.UtcNow);
  23. Commit(aggregate);
  24. }
  25. public void Handle(RenamePerson c)
  26. {
  27. var aggregate = _repository.Get<PersonAggregate>(c.AggregateIdentifier);
  28. aggregate.RenamePerson(c.FirstName, c.LastName);
  29. Commit(aggregate);
  30. }
  31. }

读端(Read Side)

查询也是POCO类,使其轻量且易于序列化。

  1. public class PersonSummary
  2. {
  3. public Guid TenantIdentifier { get; set; }
  4. public Guid PersonIdentifier { get; set; }
  5. public string PersonName { get; set; }
  6. public DateTimeOffset PersonRegistered { get; set; }
  7. public int OpenAccountCount { get; set; }
  8. public decimal TotalAccountBalance { get; set; }
  9. }

事件处理程序方法的注册在事件订阅者类的构造函数中也很明显

  1. public class PersonEventSubscriber
  2. {
  3. private readonly IQueryStore _store;
  4. public PersonEventSubscriber(IEventQueue queue, IQueryStore store)
  5. {
  6. _store = store;
  7. queue.Subscribe<PersonRegistered>(Handle);
  8. queue.Subscribe<PersonRenamed>(Handle);
  9. }
  10. public void Handle(PersonRegistered c)
  11. {
  12. _store.InsertPerson(c.IdentityTenant, c.AggregateIdentifier,
  13. c.FirstName + " " + c.LastName, c.Registered);
  14. }
  15. public void Handle(PersonRenamed c)
  16. {
  17. _store.UpdatePersonName(c.AggregateIdentifier, c.FirstName + " " + c.LastName);
  18. }
  19. }

领域

领域仅包含聚合和事件。再次,您将看到这里的参考列表尽可能裸机:

每个聚合根类都包含一个函数,用于接受其更改状态请求:

  1. public class PersonAggregate : AggregateRoot
  2. {
  3. public override AggregateState CreateState() => new Person();
  4. public void RegisterPerson(string firstName, string lastName, DateTimeOffset registered)
  5. {
  6. // 1. Validate command
  7. // Omitted for the sake of brevity.
  8. // 2. Validate domain.
  9. // Omitted for the sake of brevity.
  10. // 3. Apply change to aggregate state.
  11. var e = new PersonRegistered(firstName, lastName, registered);
  12. Apply(e);
  13. }
  14. public void RenamePerson(string firstName, string lastName)
  15. {
  16. var e = new PersonRenamed(firstName, lastName);
  17. Apply(e);
  18. }
  19. }

注意,聚合状态是在与聚合根分开的类中实现的。

这使得序列化和快照更易于管理,并有助于整体可读性,因为它在命令相关功能和事件相关功能之间进行了更强的划分:

  1. public class Person : AggregateState
  2. {
  3. public string FirstName { get; set; }
  4. public string LastName { get; set; }
  5. public DateTimeOffset Registered { get; set; }
  6. public void When(PersonRegistered @event)
  7. {
  8. FirstName = @event.FirstName;
  9. LastName = @event.LastName;
  10. Registered = @event.Registered;
  11. }
  12. public void When(PersonRenamed @event)
  13. {
  14. FirstName = @event.FirstName;
  15. LastName = @event.LastName;
  16. }
  17. }

事件(如命令和查询)是轻量级的POCO类:

  1. public class PersonRenamed : Event
  2. {
  3. public string FirstName { get; set; }
  4. public string LastName { get; set; }
  5. public PersonRenamed(string first, string last) { FirstName = first; LastName = last; }
  6. }

持久性

在持久层,我们开始看到对外部第三方组件的更多依赖关系。例如,在这里我们依靠:

该项目中的源代码实现了标准的常规数据访问层,并且在这一层中应该没有什么是新的,特别是创新的,或任何有经验的开发人员都感到惊讶的,因此不需要进行特殊讨论。

CQRS + ES主干

女士们,先生们,终于(漫长)终于来到了您一直在等待的夜晚:Timeline程序集实际上实现了CQRS + ES模式,这使得上述所有事情成为可能。

有趣的是...现在我们已经到了基本要点,剩下的谜团应该很少了。

您会注意到的第一件事是Timeline程序集依赖于外部第三方组件(显然,.NET Framework本身除外)。

命令

这里只有几件事要注意。

如您所料,Command基类包含用于聚合标识符和版本号的属性。它还包含用于租户和发送命令的用户身份的属性。

  1. /// <summary>
  2. /// Defines the base class for all commands.
  3. /// </summary>
  4. /// <remarks>
  5. /// A command is a request to change the domain. It is always are named with a verb in
  6. /// the imperative mood, such as Confirm Order. Unlike an event, a command is not a
  7. /// statement of fact; it is only a request, and thus may be refused. Commands are
  8. /// immutable because their expected usage is to be sent directly to the domain model for
  9. /// processing. They do not need to change during their projected lifetime.
  10. /// </remarks>
  11. public class Command : ICommand
  12. {
  13. public Guid AggregateIdentifier { get; set; }
  14. public int? ExpectedVersion { get; set; }
  15. public Guid IdentityTenant { get; set; }
  16. public Guid IdentityUser { get; set; }
  17. public Guid CommandIdentifier { get; set; }
  18. public Command() { CommandIdentifier = Guid.NewGuid(); }
  19. }

CommandQueue实现了ICommandQueue接口,该接口定义了一组用于注册订阅者和覆盖以及发送和调度命令的方法。您可以将其视为命令的服务总线

事件

Event基类包含用于集合标识符和版本号的属性,以及用于为其发起/发布事件的租户和用户的标识的属性。这样可以确保每个事件日志条目都与特定的租户和用户相关联。

您可以将其EventQueue视为事件的服务总线。

聚合

AggregateState类只有一点点黑魔法。Apply方法使用反射来确定将事件应用于聚合状态时要调用的方法。我不是特别喜欢这种方法,但是我找不到任何避免方法。幸运的是,这些代码易于阅读和理解:

  1. /// <summary>
  2. /// Represents the state (data) of an aggregate. A derived class should be a POCO
  3. /// (DTO/Packet) that includes a When method for each event type that changes its
  4. /// property values. Ideally, the property values for an instance of this class
  5. /// should be modified only through its When methods.
  6. /// </summary>
  7. public abstract class AggregateState
  8. {
  9. public void Apply(IEvent @event)
  10. {
  11. var when = GetType().GetMethod("When", new[] { @event.GetType() });
  12. if (when == null)
  13. throw new MethodNotFoundException(GetType(), "When", @event.GetType());
  14. when.Invoke(this, new object[] { @event });
  15. }
  16. }

快照

实现快照的源代码比我最初启动该项目时想象的更加整洁和简单。逻辑有些复杂,但是Snapshots命名空间中只有240行代码,因此在此不再赘述。

指标

我将以一些基本指标结束本文。(稍后再介绍。)

这是NDepend根据Timeline程序集生成的分析报告:

如您所见,源代码并不完美,但确实获得了“A”级评级,技术债务估计仅为1.3%。在我撰写本文时,该项目也非常紧凑,只有439行代码。

注意NDepend 从程序集.pdb符号文件中每个方法的序列点数中计算代码行(LOCVisual StudioLOC的计数不同;在Timeline项目上,它报告了1,916行源代码,以及277行可执行代码。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/179716
推荐阅读
相关标签
  

闽ICP备14008679号