直接用数据库事务发消息会出问题,因为SaveChanges()后调用消息发送若失败,业务已提交但消息丢失,破坏一致性;Transactional Outbox通过将消息写入同事务的outbox表,再由独立幂等投递器轮询发送来解决。

为什么直接用数据库事务发消息会出问题
在 C# 应用中,你可能写过类似这样的代码:SaveChanges() 之后立刻调用 bus.Publish() 或 producer.SendAsync()。表面看是“先存库再发消息”,但一旦消息发送失败(网络抖动、Broker 不可用、序列化异常),业务已提交,消息却丢了——违反了“要么都成功,要么都不发生”的一致性要求。
Transactional Outbox 的核心思路是:把要发的消息也当作业务数据,写进同一个数据库事务里。消息不是“发出去”,而是“记下来”,后续由一个独立的、幂等的投递器(Outbox Processor)去轮询并转发。
如何在 Entity Framework Core 中建 outbox 表并自动写入
你需要一张 OutboxMessages 表,字段至少包含:Id(GUID)、TypeName(事件全名)、Content(JSON 字符串)、ProcessedAt(NULL 表示未处理)、CreatedAt。关键在于:它必须和你的业务实体共享同一个 DbContext 实例,并在同一个 SaveChanges() 中被插入。
推荐做法是封装一个 OutboxService,在业务逻辑中调用 AddOutboxMessage,内部只是 new 一个 OutboxMessage 并 context.OutboxMessages.Add()。EF Core 会把它当成普通实体参与事务。
public class OutboxMessage
{
public Guid Id { get; set; }
public string TypeName { get; set; } = null!;
public string Content { get; set; } = null!;
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public DateTime? ProcessedAt { get; set; }
}- 确保
OutboxMessagesDbSet 在OnModelCreating中配置了HasIndex(x => x.ProcessedAt).IsDescending(),方便后续查询未处理项 - 不要手动调用
SaveChanges()多次;所有操作(业务实体 + outbox 记录)必须在一次SaveChanges()中完成 - 如果使用 EF Core 7+,可考虑用
SaveChangesAsync(cancellationToken)配合超时控制,避免事务卡死
怎么安全地轮询并投递 outbox 消息
投递器不能和业务应用跑在同一个进程里(否则进程崩溃会导致消息丢失),建议作为独立后台服务(如 .NET Worker Service),或用 Quartz.NET / Hangfire 定时触发。每次只取少量(例如 100 条)ProcessedAt IS NULL 的记录,按 CreatedAt 排序,逐条尝试发送到消息队列(如 RabbitMQ、Kafka)。
重点在于“发送成功后才更新 ProcessedAt”——这步更新也必须走数据库事务,且必须是**同一个数据库连接**(不能新开 DbContext)。否则会出现消息已发、但 DB 更新失败,导致重复投递。
- 使用
SELECT ... FOR UPDATE(PostgreSQL)或UPDLOCK, ROWLOCK(SQL Server)锁定待处理行,防止多个投递器实例并发处理同一条消息 - 投递失败时,应记录日志并跳过该条(不更新
ProcessedAt),下次轮询重试;不要 throw 异常中断整个批次 - Kafka 场景下,可利用事务性 Producer(
InitTransactions+SendOffsetsToTransaction)将 offset 提交与ProcessedAt更新绑定,但实现复杂,多数场景用 DB 事务更稳
常见坑:序列化、重试、幂等性怎么处理
Outbox 表里的 Content 是 JSON,必须保证序列化前后完全一致。别用 System.Text.Json 默认设置——它会忽略 null 字段、按字母序排序属性。务必显式配置 JsonSerializerOptions,并全局复用同一实例:
var options = new JsonSerializerOptions
{
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false
};投递失败后的重试天然带来重复风险。解决方案不在 outbox 层,而在消费者端:每条消息带唯一 MessageId(通常就是 outbox 表的 Id),消费者需维护已处理 ID 的去重表(或 Redis Set),收到重复 ID 直接丢弃。
- 不要在 outbox 投递层做“最多一次”或“最少一次”的语义包装——那是传输层的事;outbox 只负责“至少一次”持久化
- 如果业务要求强顺序(如账户余额变更必须严格 FIFO),需在 outbox 查询时加
ORDER BY CreatedAt,并在消息队列端确保单分区/单队列消费 - 清理已投递记录?可以,但必须在确认下游 100% 消费成功(如 Kafka commit offset 后)再删,否则删早了就真丢了
最易被忽略的是:投递器的数据库连接字符串是否启用了连接池?是否设置了合理的 Max Pool Size?高吞吐下连接耗尽会导致投递停滞,而业务库仍在持续写入 outbox,最终填满磁盘。










