0

0

Python如何实现消息队列消费_幂等设计

舞姬之光

舞姬之光

发布时间:2026-03-18 11:54:30

|

515人浏览过

|

来源于php中文网

原创

消费端必须考虑幂等性,因为消息队列通常只保证“至少一次”投递,网络抖动、重启等会导致重复消费,若无幂等控制易引发重复扣款、超卖等问题;常见方案包括消息ID+数据库去重、Redis SETNX、业务状态机校验、唯一约束+乐观锁。

python如何实现消息队列消费_幂等设计

为什么消费端必须考虑幂等性

消息队列(如 RabbitMQ、Kafka、RocketMQ)本身不保证“恰好一次”投递,多数场景下是“至少一次”。网络抖动、消费者重启、手动重试、Broker 重发等都可能导致同一条消息被多次投递给消费者。若业务逻辑未做幂等控制,就可能引发重复扣款、重复下单、库存超卖等严重问题。

常见幂等实现方案与适用场景

核心思路:在消费前判断这条消息是否已被处理过。关键在于「唯一标识」和「状态存储」:

  • 消息ID + 数据库去重表:用消息自带的 message_id(或业务生成的全局唯一 ID,如订单号)作为主键,插入前先 INSERT IGNOREON CONFLICT DO NOTHING。适合 MySQL/PostgreSQL 等支持原子插入的数据库。
  • Redis SETNX / SET with NX+EX:以 "msg:{msg_id}" 为 key,设置带过期时间的锁。成功写入即代表首次消费。注意过期时间要略大于业务最大执行耗时,避免误删。
  • 业务状态机校验:不依赖外部存储,而是检查业务实体当前状态是否允许该操作。例如:“只有订单状态为‘待支付’时才允许执行支付”,重复消息到来时因状态已变而直接忽略。
  • 数据库唯一约束 + 乐观锁:在关键业务表中增加唯一字段(如 order_no),配合版本号或时间戳字段做更新校验。失败即说明已处理或冲突,无需重试。

Python 消费端幂等代码示例(以 Redis + Kafka 为例)

以下是一个轻量、可复用的装饰器模式实现:

Jamboss
Jamboss

Jamboss是一款简单的AI音乐生成App,可以一键生成歌曲。

下载
import redis
import json
from functools import wraps
<p>r = redis.Redis(host='localhost', port=6379, db=0)</p><p><span>立即学习</span>“<a href="https://pan.quark.cn/s/00968c3c2c15" style="text-decoration: underline !important; color: blue; font-weight: bolder;" rel="nofollow" target="_blank">Python免费学习笔记(深入)</a>”;</p><p>def idempotent(key_func, expire=300):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):</p><h1>生成幂等 key,例如基于消息体中的 order_id</h1><pre class='brush:python;toolbar:false;'>        msg = kwargs.get('message') or args[0] if args else None
        if not msg:
            raise ValueError("message not found in args/kwargs")

        key = key_func(msg)
        if not key:
            raise ValueError("idempotent key is empty")

        # 尝试加锁(SET if not exists + expire)
        ok = r.set(key, "1", nx=True, ex=expire)
        if not ok:
            print(f"[SKIP] message already processed: {key}")
            return None

        try:
            result = func(*args, **kwargs)
            return result
        except Exception as e:
            # 可选:记录异常但不重试,避免死循环
            print(f"[ERROR] processing {key}: {e}")
            raise
    return wrapper
return decorator

使用示例

@idempotent(key_func=lambda msg: f"consume:{json.loads(msg.value()).get('order_id')}", expire=600) def process_order_message(message): data = json.loads(message.value()) order_id = data["order_id"]

执行真实业务逻辑:创建订单、扣减库存等

print(f"Processing order {order_id}")

注意事项与避坑点

  • 幂等 key 必须全局唯一且稳定:不能依赖时间戳、随机数或临时变量;推荐使用业务主键(订单号、流水号)或消息自带的 message_id(需确认 Broker 支持且生产端未丢失)。
  • 存储介质要可靠且低延迟:Redis 是常用选择,但需注意集群模式下 key 分布与事务限制;数据库更持久,但性能开销大,适合对一致性要求极高的场景。
  • 不要在幂等校验后又抛异常导致重复触发:一旦通过幂等检查,后续业务异常应明确区分——是可重试错误(如 DB 连接超时)还是终态错误(如余额不足)。后者应记录并告警,而非反复重试。
  • 消费位点提交时机很重要:建议在业务逻辑执行成功、且幂等记录落库/写 Redis 后再提交 offset,否则可能造成“处理了但位点没提交”,下次重启重复消费。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
mysql修改数据表名
mysql修改数据表名

MySQL修改数据表:1、首先查看数据库中所有的表,代码为:‘SHOW TABLES;’;2、修改表名,代码为:‘ALTER TABLE 旧表名 RENAME [TO] 新表名;’。php中文网还提供MySQL的相关下载、相关课程等内容,供大家免费下载使用。

690

2023.06.20

MySQL创建存储过程
MySQL创建存储过程

存储程序可以分为存储过程和函数,MySQL中创建存储过程和函数使用的语句分别为CREATE PROCEDURE和CREATE FUNCTION。使用CALL语句调用存储过程智能用输出变量返回值。函数可以从语句外调用(通过引用函数名),也能返回标量值。存储过程也可以调用其他存储过程。php中文网还提供MySQL创建存储过程的相关下载、相关课程等内容,供大家免费下载使用。

554

2023.06.21

mongodb和mysql的区别
mongodb和mysql的区别

mongodb和mysql的区别:1、数据模型;2、查询语言;3、扩展性和性能;4、可靠性。本专题为大家提供mongodb和mysql的区别的相关的文章、下载、课程内容,供大家免费下载体验。

287

2023.07.18

mysql密码忘了怎么查看
mysql密码忘了怎么查看

MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS 应用软件之一。那么mysql密码忘了怎么办呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

520

2023.07.19

mysql创建数据库
mysql创建数据库

MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS 应用软件之一。那么mysql怎么创建数据库呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

267

2023.07.25

mysql默认事务隔离级别
mysql默认事务隔离级别

MySQL是一种广泛使用的关系型数据库管理系统,它支持事务处理。事务是一组数据库操作,它们作为一个逻辑单元被一起执行。为了保证事务的一致性和隔离性,MySQL提供了不同的事务隔离级别。php中文网给大家带来了相关的教程以及文章欢迎大家前来学习阅读。

392

2023.08.08

sqlserver和mysql区别
sqlserver和mysql区别

SQL Server和MySQL是两种广泛使用的关系型数据库管理系统。它们具有相似的功能和用途,但在某些方面存在一些显著的区别。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

543

2023.08.11

mysql忘记密码
mysql忘记密码

MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。那么忘记mysql密码我们该怎么解决呢?php中文网给大家带来了相关的教程以及其他关于mysql的文章,欢迎大家前来学习阅读。

668

2023.08.14

Python WebSocket实时通信与异步服务开发实践
Python WebSocket实时通信与异步服务开发实践

本专题聚焦 Python 在实时通信场景中的开发实践,系统讲解 WebSocket 协议原理、长连接管理、消息推送机制以及异步服务架构设计。内容包括客户端与服务端通信实现、连接稳定性优化、消息队列集成及高并发处理策略。通过完整案例,帮助开发者构建高效稳定的实时通信系统,适用于聊天应用、实时数据推送等场景。

3

2026.03.18

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.5万人学习

Django 教程
Django 教程

共28课时 | 5.1万人学习

SciPy 教程
SciPy 教程

共10课时 | 2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号