0

0

Python怎样操作消息队列?pika连接RabbitMQ

蓮花仙者

蓮花仙者

发布时间:2025-08-04 14:37:01

|

271人浏览过

|

来源于php中文网

原创

使用python通过pika操作rabbitmq的核心步骤为:1. 建立连接(blockingconnection);2. 创建通道(channel);3. 声明持久化队列(queue_declare,durable=true);4. 发布消息时设置消息持久化(delivery_mode=2);5. 消费者手动确认消息(auto_ack=false,basic_ack)。选择rabbitmq因其基于amqp协议,具备高可靠性、丰富的交换机类型和成熟生态,适合需要复杂路由与消息不丢失的场景。pika的同步模式(blockingconnection)适用于简单脚本,逻辑直观但阻塞线程;异步模式(如selectconnection)适用于高并发服务,通过事件循环提升吞吐量,但编程复杂度更高。消息持久化需同时设置队列和消息的durable与delivery_mode=2,确保服务重启后消息可恢复;确认机制通过关闭auto_ack并手动调用basic_ack实现,保证消息被成功处理前不会丢失,支持“至少一次”投递,要求消费者具备幂等性。完整实现包括生产者发送5条消息并休眠,消费者接收后模拟处理耗时并发送确认,确保消息可靠传递与处理。

Python怎样操作消息队列?pika连接RabbitMQ

Python操作消息队列,Pika连接RabbitMQ,这组合在很多后端系统里简直是标配。它提供了一种可靠的异步通信机制,让不同服务间解耦,处理高并发任务变得游刃有余。通过Pika库,Python应用可以轻松地发布消息到队列,也能消费队列中的消息,实现服务间的有效协作。

解决方案

要用Python通过Pika操作RabbitMQ,核心步骤围绕着连接(Connection)、通道(Channel)、声明队列/交换机、发布消息和消费消息。最直接的方式是使用Pika的

BlockingConnection
,它简单易用,适合快速开发和非高并发场景。

生产者(发布消息)示例:

立即学习Python免费学习笔记(深入)”;

import pika
import time

# 连接RabbitMQ服务器
# 这里假设RabbitMQ运行在本地,没有用户名密码
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个队列,如果队列不存在则创建。durable=True表示队列持久化
# 即使RabbitMQ重启,队列也不会丢失
channel.queue_declare(queue='my_queue', durable=True)

message_count = 0
while message_count < 5:
    message = f"Hello World! Message number {message_count}"
    # 发布消息到默认交换机,路由键为队列名
    # delivery_mode=2表示消息持久化,即使RabbitMQ重启,消息也不会丢失
    channel.basic_publish(
        exchange='',
        routing_key='my_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # 使消息持久化
        )
    )
    print(f" [x] Sent '{message}'")
    message_count += 1
    time.sleep(1) # 模拟发送间隔

connection.close()

消费者(消费消息)示例:

import pika
import time

# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明相同的队列,确保消费者知道要从哪个队列取消息
channel.queue_declare(queue='my_queue', durable=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    """
    消息处理回调函数
    ch: channel对象
    method: 包含消息的 delivery tag 等信息
    properties: 消息属性
    body: 消息体
    """
    print(f" [x] Received '{body.decode()}'")
    time.sleep(body.count(b'.')) # 模拟处理消息的耗时
    # 消息处理完成后,发送确认回执
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(" [x] Done")

# 设置QoS (Quality of Service),每次只分发一条消息给消费者
# 这样可以防止一个消费者处理速度慢,导致所有消息堆积在它那里
channel.basic_qos(prefetch_count=1)

# 开始消费消息,no_ack=False表示需要手动发送确认回执
channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False)

# 启动消费循环,会一直阻塞直到连接关闭
channel.start_consuming()

为什么选择RabbitMQ作为消息队列?

我个人觉得,RabbitMQ就像是消息队列界的“老黄牛”,它稳定、可靠、功能全面,是很多企业级应用的首选。你可能听说过Kafka在高吞吐量大数据场景的优势,但对于需要复杂路由、高可靠性、并且消息处理量并非天文数字的业务,RabbitMQ的优势就凸显出来了。

它基于AMQP(Advanced Message Queuing Protocol)协议,这个协议本身就为消息的可靠传输、事务性、路由等提供了强大的保障。这意味着,当你的系统对消息丢失零容忍时,RabbitMQ能给你足够的信心。它的交换机(Exchange)类型非常丰富,比如直连(Direct)、扇出(Fanout)、主题(Topic)、头部(Headers),可以满足各种复杂的路由需求。想象一下,你有一个订单系统,新订单消息可能需要同时通知库存、物流和客户服务部门,通过RabbitMQ的Topic交换机,一条消息就能精准地分发给所有相关方,这可比你手动维护多个HTTP请求或者数据库触发器要优雅和高效得多。

而且,RabbitMQ的社区非常活跃,文档也相当完善,遇到问题很容易找到解决方案。对于Python开发者来说,Pika库的支持也很好,虽然Pika的API有时候看起来有点“原生”,需要对AMQP概念有一定理解,但这正是它强大和灵活的体现。它的成熟度,让它在很多关键业务场景下,成为一个让人放心的选择。

Pika库的异步与同步模式有何不同?

Pika库提供了两种主要的工作模式:同步模式(

BlockingConnection
)和异步模式(如
SelectConnection
TornadoConnection
等)。这两种模式的选择,很大程度上取决于你的应用场景和对性能、并发处理的需求。

想象一下,你是个餐厅服务员。

同步模式 (

BlockingConnection
) 就像你一次只服务一位客人。你接到一个点餐请求,就一直等到菜做好、客人吃完、结账,你才去接下一个客人的请求。这种模式简单直接,逻辑清晰,不容易出错。对于那些一次只处理少量消息、或者在脚本中一次性发送一批消息然后退出的场景,
BlockingConnection
是完美的。它会阻塞当前线程,直到操作完成。比如,一个简单的日志收集脚本,把日志发到RabbitMQ,用
BlockingConnection
就足够了,写起来也很顺手。

异步模式 (

SelectConnection
,
TornadoConnection
等)
则像你同时服务多位客人。你接到点餐请求后,不是傻等,而是把点餐单交给厨房,然后立刻去接其他客人的请求,或者去处理其他事务(比如倒水、收拾桌子)。当厨房通知你菜好了,你再回来处理之前的点餐。这种模式复杂一些,因为你需要管理多个并发的事件,但它的效率非常高,不会因为一个耗时操作而阻塞整个应用。对于Web服务(如Django、Flask应用)、长连接服务、或者需要处理大量并发消息的消费者来说,异步模式是必不可少的。它能让你的应用在等待I/O(比如网络传输)的时候,去处理其他事情,大大提升了吞吐量和响应速度。当然,这也意味着你需要更深入地理解Python的异步编程模型,比如回调函数、事件循环等。虽然学习曲线可能陡峭一点,但一旦掌握,它能让你的Python应用在处理消息队列时如虎添翼。

消息的持久化与确认机制在Pika中如何实现?

在生产环境中,消息的持久化和确认机制是确保消息不丢失的关键。这两点在Pika中都有明确的实现方式,它们共同构筑了RabbitMQ“至少一次”消息投递的可靠性保障。

消息持久化:

InstantMind
InstantMind

AI思维导图生成器,支持30+文件格式一键转换,包括PDF、Word、视频等。

下载

消息持久化分为两个层面:队列持久化消息持久化

  1. 队列持久化: 当你声明队列时,将

    durable
    参数设为
    True

    channel.queue_declare(queue='my_queue', durable=True)

    这样做是为了防止RabbitMQ服务器重启后,你创建的队列消失。如果队列是非持久化的,服务器一重启,队列就不在了,即使里面有持久化的消息,也无处可寻了。

  2. 消息持久化: 当你发布消息时,通过

    pika.BasicProperties
    设置
    delivery_mode=2

    channel.basic_publish(
        exchange='',
        routing_key='my_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # 2表示消息持久化
        )
    )

    这告诉RabbitMQ,这条消息需要写入磁盘。这样,即使在消息到达消费者并被确认之前,RabbitMQ服务器突然崩溃,重启后这条消息也能从磁盘中恢复,并重新投递。

需要注意的是,即使消息和队列都持久化了,也不能保证100%不丢消息。比如,在消息到达RabbitMQ并写入磁盘的极短时间内,如果服务器崩溃,消息可能还是会丢失。对于极端高可靠性的场景,你可能还需要结合发布者确认(Publisher Confirms)机制。

消息确认机制(Acknowledgements):

这是消费者端确保消息被成功处理的关键。当消费者从队列中获取一条消息后,它需要向RabbitMQ发送一个“确认回执”(Acknowledgement)。

  1. 关闭自动确认:

    basic_consume
    时,将
    auto_ack
    参数设为
    False

    channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False)

    默认情况下,

    auto_ack
    True
    ,这意味着RabbitMQ一旦把消息发给消费者,就认为消息已经被成功处理并从队列中删除。这显然是不安全的。

  2. 手动发送确认回执: 在你的消息回调函数中,当消息被成功处理后,调用

    channel.basic_ack()

    def callback(ch, method, properties, body):
        # ... 处理消息的逻辑 ...
        ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息

    delivery_tag
    是RabbitMQ分配给每条消息的唯一标识。通过发送确认回执,RabbitMQ就知道这条消息已经被消费者成功处理,可以安全地从队列中删除了。

如果消费者在处理消息过程中崩溃,或者没有发送确认回执,RabbitMQ会认为这条消息没有被成功处理,并在消费者重新连接或有其他消费者可用时,将这条消息重新投递给其他消费者。这保证了消息的“至少一次”投递:消息可能被投递多次,但绝不会丢失。当然,这也意味着你的消费者需要具备幂等性,即多次处理同一条消息不会产生副作用。

你也可以使用

basic_nack
(否定确认)或
basic_reject
来拒绝消息。
basic_nack
更灵活,可以指定是否将消息重新放回队列(
requeue=True/False
),而
basic_reject
通常用于彻底拒绝一条消息,且只能拒绝一条。在实际应用中,根据业务逻辑选择合适的确认方式,是构建健壮消息系统的关键。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

202

2024.02.23

Python Flask框架
Python Flask框架

本专题专注于 Python 轻量级 Web 框架 Flask 的学习与实战,内容涵盖路由与视图、模板渲染、表单处理、数据库集成、用户认证以及RESTful API 开发。通过博客系统、任务管理工具与微服务接口等项目实战,帮助学员掌握 Flask 在快速构建小型到中型 Web 应用中的核心技能。

86

2025.08.25

Python Flask Web框架与API开发
Python Flask Web框架与API开发

本专题系统介绍 Python Flask Web框架的基础与进阶应用,包括Flask路由、请求与响应、模板渲染、表单处理、安全性加固、数据库集成(SQLAlchemy)、以及使用Flask构建 RESTful API 服务。通过多个实战项目,帮助学习者掌握使用 Flask 开发高效、可扩展的 Web 应用与 API。

72

2025.12.15

kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

167

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

150

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

202

2024.02.23

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

502

2023.08.10

Golang channel原理
Golang channel原理

本专题整合了Golang channel通信相关介绍,阅读专题下面的文章了解更多详细内容。

248

2025.11.14

Python 自然语言处理(NLP)基础与实战
Python 自然语言处理(NLP)基础与实战

本专题系统讲解 Python 在自然语言处理(NLP)领域的基础方法与实战应用,涵盖文本预处理(分词、去停用词)、词性标注、命名实体识别、关键词提取、情感分析,以及常用 NLP 库(NLTK、spaCy)的核心用法。通过真实文本案例,帮助学习者掌握 使用 Python 进行文本分析与语言数据处理的完整流程,适用于内容分析、舆情监测与智能文本应用场景。

10

2026.01.27

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.3万人学习

Django 教程
Django 教程

共28课时 | 3.6万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号