EventFlow 是一个基于 .NET 的轻量级 CQRS + Event Sourcing 框架,提供了完善且合理的默认实现,帮助开发者以较低成本构建事件驱动的领域模型。
核心概念
Aggregates(聚合根)
聚合是领域模型的核心,它负责保证业务规则和状态变更的一致性。 在 EventFlow 中,所有对业务状态的修改都必须通过聚合完成,聚合内部通过产生 领域事件(Event) 来表达“发生了什么”,而不是直接改数据。
Command Bus & Commands(命令总线与命令)
命令是“我要做什么” 的表达,比如“创建订单”“支付订单”。 Command Bus 是所有命令的统一入口,负责把命令路由到对应的命令处理器,再由处理器调用聚合执行业务逻辑。
Event Store(事件存储)
Event Store 用来持久化聚合产生的事件流,而不是直接存状态。 聚合的当前状态,是通过“从头回放事件”得到的。
EventFlow 支持多种事件存储方式:
In-memory / Files(仅测试)
SQL Server / EF Core
SQLite / PostgreSQL
EventStore(专业事件存储)
Subscribers(事件订阅者)
订阅者用于监听已经提交成功的领域事件,并在事件发生后执行额外动作,例如:
发送通知
更新外部系统
触发异步流程
它不会影响聚合本身的一致性。
Read Models(读模型)
读模型是为“查询”而存在的,通常是事件的非规范化结果,结构更适合快速读取。 它和写模型(聚合)完全分离,是 CQRS 的关键组成部分。
支持的存储包括:
Elasticsearch
SQL Server / EF Core
SQLite / PostgreSQL
In-memory(测试)
Snapshots(快照)
如果一个聚合的事件非常多,每次都从头回放会变慢。 Snapshot 会定期保存聚合的“中间状态”,下次加载时只需从快照开始回放后续事件。
EventFlow 的快照是可选的,并支持快照升级。
Sagas(Saga / 进程管理器)
Saga 用来处理跨聚合、跨限界上下文的长事务流程。 它通过监听事件、发送命令,来协调多个聚合之间的协作。
Queries(查询)
Query 是对“我要查什么”的抽象描述,本身不关心怎么查。 具体执行逻辑由 Query Handler 决定,通常直接作用于读模型。
Jobs(作业 / 延时任务)
Jobs 用于执行延迟或定时任务,例如:
几分钟后发送命令
定时补偿操作
EventFlow 内置支持与 Hangfire 等调度器集成。
Event Upgrade(事件升级)
事件一旦写入 Event Store 就永远不会被修改。 当事件结构演进时,EventFlow 通过 Event Upgrader 在加载聚合时,把旧事件升级为新事件。
Event Publishing(事件发布)
有时领域事件不仅要在系统内部使用,还需要被其他系统或服务消费。 EventFlow 支持将事件发布到外部消息系统,如:
RabbitMQ
Metadata(元数据)
元数据是附加在事件上的上下文信息,例如:
用户 IP
操作人
请求来源
EventFlow 提供多种内置元数据提供器,开箱即用。
Value Objects(值对象)
值对象用于表达没有身份、只有值的领域概念,如:
用户名
金额
它们通常是不可变的,并负责自身的合法性校验。
使用示例
[Test]
public async Task Example()
{
// 将 EventFlow 与示例中用到的所有类型进行注册。
// 这里手动添加了事件、命令、命令处理器等,
// 实际上也可以使用更简单的 AddDefaults(Assembly) 一次性注册。
var serviceCollection = new ServiceCollection()
.AddLogging()
.AddEventFlow(o => o
.AddEvents(typeof(ExampleEvent))
.AddCommands(typeof(ExampleCommand))
.AddCommandHandlers(typeof(ExampleCommandHandler))
.UseInMemoryReadStoreFor<ExampleReadModel>());
using (var serviceProvider = serviceCollection.BuildServiceProvider())
{
// 为聚合根创建一个新的唯一标识
var exampleId = ExampleId.New;
// 从容器中解析命令总线,并通过它发布一个命令
var commandBus = serviceProvider.GetRequiredService<ICommandBus>();
await commandBus.PublishAsync(
new ExampleCommand(exampleId, 42), CancellationToken.None);
// 从容器中解析查询处理器,
// 使用内置的“按 ID 查询读模型”的查询,
// 获取表示当前聚合状态的读模型
var queryProcessor = serviceProvider.GetRequiredService<IQueryProcessor>();
var exampleReadModel = await queryProcessor.ProcessAsync(
new ReadModelByIdQuery<ExampleReadModel>(exampleId), CancellationToken.None);
// 验证读模型中的数据是否符合预期
exampleReadModel.MagicNumber.Should().Be(42);
}
}
// 聚合根
publicclassExampleAggregate : AggregateRoot<ExampleAggregate, ExampleId>,
IEmit<ExampleEvent>
{
privateint? _magicNumber;
public ExampleAggregate(ExampleId id) : base(id) { }
// 由命令触发调用的业务方法
public void SetMagicNumber(int magicNumber)
{
if (_magicNumber.HasValue)
throw DomainError.With("Magic number already set");
// 通过产生领域事件来表达状态变更
Emit(new ExampleEvent(magicNumber));
}
// 事件溯源过程中用于应用事件的方法
// EventFlow 提供了多种方式来应用事件(例如使用状态对象),
// Apply 方法是其中最简单、最直接的一种
public void Apply(ExampleEvent aggregateEvent)
{
_magicNumber = aggregateEvent.MagicNumber;
}
}
总结
EventFlow 核心全面接口化,具备良好的可配置性与可扩展性,同时不依赖后台线程或工作进程,行为清晰可控。采用 MIT 许可证,对企业级项目友好,既适合快速上手学习 CQRS/ES,也适用于对架构演 进有要求的实际业务系统。

