0

0

如何用 Redis Streams 构建 .NET 事件存储?

煙雲

煙雲

发布时间:2025-10-10 16:12:01

|

782人浏览过

|

来源于php中文网

原创

使用 Redis Streams 可构建高性能 .NET 事件存储,支持按聚合根分隔离事件流、版本控制与消费者组分发。1. 以 events:{id} 为 Key 存储事件流,每条消息含 type、data、timestamp 和 version 字段。2. 利用 StackExchange.Redis 写入和读取事件,通过 StreamAddAsync 和 StreamRangeAsync 操作数据。3. 创建消费者组实现事件广播,StreamReadGroupAsync 拉取消息并用 StreamAcknowledgeAsync 确认处理,确保至少一次交付。4. 使用独立版本键或 Lua 脚本实现乐观锁,防止并发写冲突。该方案具备持久化、高吞吐与水平扩展能力,适用于事件溯源架构。

如何用 redis streams 构建 .net 事件存储?

用 Redis Streams 构建 .NET 事件存储是一种轻量、高性能的实现方式,特别适合需要持久化事件流并支持消费者组处理的场景。Redis Streams 天然支持消息持久化、多消费者组、消息确认机制,非常适合做事件溯源(Event Sourcing)的基础存储。

1. 设计事件存储的基本结构

事件存储的核心是将领域事件按聚合根(Aggregate Root)分类,追加写入事件流。在 Redis 中,可以用 Stream Key 表示一个聚合实例的事件流,例如:

events:order-12345

每个事件作为一条消息写入该 Stream,包含事件类型、时间戳和序列化后的数据。

每条消息字段建议包括:

  • type: 事件类型(如 OrderCreated)
  • data: JSON 序列化的事件内容
  • timestamp: 发生时间
  • version: 聚合版本号,用于乐观并发控制

2. 在 .NET 中使用 StackExchange.Redis 操作 Streams

安装 NuGet 包:

Install-Package StackExchange.Redis

连接 Redis 并写入事件:

var redis = ConnectionMultiplexer.Connect("localhost");
var db = redis.GetDatabase();

var streamKey = "events:order-12345";
var eventId = await db.StreamAddAsync(streamKey, new NameValueEntry[]
{
    new ("type", "OrderCreated"),
    new ("data", JsonSerializer.Serialize(new { OrderId = "12345", Amount = 100 })),
    new ("timestamp", DateTime.UtcNow.ToString("o")),
    new ("version", "1")
});

读取某个聚合的所有事件:

Question AI
Question AI

一款基于大模型的免费的AI问答助手、总结器、AI搜索引擎

下载
var entries = await db.StreamRangeAsync(streamKey, null, null);
var events = entries.Select(entry => new {
    Type = entry["type"],
    Data = entry["data"],
    Version = int.Parse(entry["version"])
}).ToList();

3. 支持消费者组处理事件(用于事件分发)

如果需要将事件通知给多个服务,可以创建消费者组:

// 创建消费者组(首次执行)
await db.StreamCreateConsumerGroupAsync(streamKey, "payments-service", "$");

// 拉取未处理的消息
var messages = await db.StreamReadGroupAsync(streamKey, "payments-service", "consumer-1", 10, ">");
foreach (var message in messages)
{
    // 处理业务逻辑
    Console.WriteLine($"Received: {message["type"]}");

    // 确认消息处理完成
    await db.StreamAcknowledgeAsync(streamKey, "payments-service", message.Id);
}

这种模式支持水平扩展,多个消费者可并行处理不同消息,同时保证每条消息只被组内一个消费者处理。

4. 添加版本控制与并发检查

写入事件前应检查当前版本,防止并发冲突:

可通过 StreamLength 或存储一个单独的版本键(如 version:order-12345)来管理版本号。

写入时先获取当前长度或版本:

var currentVersion = await db.StringGetAsync($"version:{aggregateId}");
if (expectedVersion != (long)currentVersion)
    throw new ConcurrencyException();

// 写入事件
await db.StreamAddAsync(streamKey, entries);

// 更新版本
await db.StringSetAsync($"version:{aggregateId}", expectedVersion + 1);

也可通过 Lua 脚本原子化操作:校验版本 + 写入事件 + 自增版本。

基本上就这些。Redis Streams 提供了足够基础的能力,配合 .NET 的序列化和异步处理,可以快速搭建一个高效、可靠的事件存储。关键是设计好 Key 结构、版本控制和消费者组的使用策略。

相关专题

更多
json数据格式
json数据格式

JSON是一种轻量级的数据交换格式。本专题为大家带来json数据格式相关文章,帮助大家解决问题。

411

2023.08.07

json是什么
json是什么

JSON是一种轻量级的数据交换格式,具有简洁、易读、跨平台和语言的特点,JSON数据是通过键值对的方式进行组织,其中键是字符串,值可以是字符串、数值、布尔值、数组、对象或者null,在Web开发、数据交换和配置文件等方面得到广泛应用。本专题为大家提供json相关的文章、下载、课程内容,供大家免费下载体验。

533

2023.08.23

jquery怎么操作json
jquery怎么操作json

操作的方法有:1、“$.parseJSON(jsonString)”2、“$.getJSON(url, data, success)”;3、“$.each(obj, callback)”;4、“$.ajax()”。更多jquery怎么操作json的详细内容,可以访问本专题下面的文章。

309

2023.10.13

go语言处理json数据方法
go语言处理json数据方法

本专题整合了go语言中处理json数据方法,阅读专题下面的文章了解更多详细内容。

74

2025.09.10

常用的数据库软件
常用的数据库软件

常用的数据库软件有MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、Redis、Cassandra、Hadoop、Spark和Amazon DynamoDB。更多关于数据库软件的内容详情请看本专题下面的文章。php中文网欢迎大家前来学习。

970

2023.11.02

内存数据库有哪些
内存数据库有哪些

内存数据库有Redis、Memcached、Apache Ignite、VoltDB、TimesTen、H2 Database、Aerospike、Oracle TimesTen In-Memory Database、SAP HANA和ache Cassandra。更多关于内存数据库相关问题,详情请看本专题下面的文章。php中文网欢迎大家前来学习。

631

2023.11.14

mongodb和redis哪个读取速度快
mongodb和redis哪个读取速度快

redis 的读取速度比 mongodb 更快。原因包括:1. redis 使用简单的键值存储,而 mongodb 存储 json 格式的数据,需要解析和反序列化。2. redis 使用哈希表快速查找数据,而 mongodb 使用 b-tree 索引。因此,redis 在需要高性能读取操作的应用程序中是一个更好的选择。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

475

2024.04.02

redis怎么做缓存服务器
redis怎么做缓存服务器

redis 作为缓存服务器的答案:redis 是一款开源、高性能、分布式的键值存储,可作为缓存服务器使用。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

398

2024.04.07

C++ 单元测试与代码质量保障
C++ 单元测试与代码质量保障

本专题系统讲解 C++ 在单元测试与代码质量保障方面的实战方法,包括测试驱动开发理念、Google Test/Google Mock 的使用、测试用例设计、边界条件验证、持续集成中的自动化测试流程,以及常见代码质量问题的发现与修复。通过工程化示例,帮助开发者建立 可测试、可维护、高质量的 C++ 项目体系。

8

2026.01.16

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
进程与SOCKET
进程与SOCKET

共6课时 | 0.3万人学习

Redis+MySQL数据库面试教程
Redis+MySQL数据库面试教程

共72课时 | 6.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号