0

0

RabbitMQ与PHP

ringa_lee

ringa_lee

发布时间:2018-05-18 11:40:20

|

12231人浏览过

|

来源于php中文网

原创

你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用、通讯的问题而苦恼、挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题。消息服务擅长于解决多系统、异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(rpc)。本文将要介绍的rabbitmq就是当前最主流的消息中间件之一。

RabbitMQ简介

AMQP ,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。网站在: http://www.rabbitmq.com/ 上面有各种语言教程和实例代码

AMPQ协议为了能够满足各种消息队列需求,在概念上比较复杂,了解了这些概念,是使用好RabbitMQ的基础。

立即学习PHP免费学习笔记(深入)”;

vhosts : 虚拟主机

虚拟主机( virtual host ):一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢? RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止 A 组访问 B 组的交换机 / 队列 / 绑定,必须为 A 和 B 分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机 “/” 。

一个RabbitMQ的Server上可以有多个vhosts,用户与权限设置就是依附于vhosts。对一般PHP应用,不需要用户权限设定,直接使用默认就存在的”/”就可以了,用户可以使用默认就存在的”guest”。一个简单的配置示例:

$conn_args = array(   
 'host' => '127.0.0.1',    
 'port' => '5672',    
 'login' => 'guest',   
  'password' => 'guest',    
  'vhost'=>'/');

connection 与 channel : 连接与信道

connection是指物理的连接,一个client与一个server之间有一个连接;一个连接上可以建立多个channel,可以理解为逻辑上的连接。一般应用的情况下,有一个channel就够用了,不需要创建更多的channel。示例代码:

//创建连接和
channel$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {    
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

Exchange 与 routingkey : 交换机 与 路由键

为了将不同类型的message进行区分,设置了Exchange交换机与Route路由两个概念。比如,将A类型的message发送到名为‘C1’的交换机,将类型为B的发送到’C2′的交换机。当客户端连接C1处理队列消息时,取到的就只是A类型message。进一步的,如果A类型message也非常多,需要进一步细化区分,比如某个客户端只处理A类型message中针对K用户的message,routingkey就是来做这个用途的。

$e_name = 'e_linvo'; //交换机名
$k_route = array(0=> 'key_1', 1=> 'key_2'); //路由key
//创建交换机
$ex = new AMQPExchange($channel);$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$ex->declare()."\n";for($i=0; 
$ipublish($message . date('H:i:s'), $k_route[i%2])."\n";
}

由以上代码可以看到,发送消息时,只要有“交换机”就够了。至于交换机后面有没有对应的处理队列,发送方是不用管的。routingkey可以是空的字符串。在示例中,我使用了两个key交替发送消息,是为了下面更便于理解routingkey的作用。

对于交换机,有两个重要的概念:

交换机( Exchange ):可以理解成具有路由表的路由程序。每个消息都有一个路由键( routing key ),就是一个简单的字符串。交换机中有一系列的绑定( binding ),即路由规则( routes )。交换机可以有多个。多个队列可以和同一个交换机绑定,同时多个交换机也可以和同一个队列绑定。(多对多的关系)

A,类型。有三种类型:

1. Fanout Exchange (不处理路由键):一个发送到交换机上的消息都会被转发到与该交换机绑定的所有队列上。 Fanout 交换机发消息是最快的。

2. Direct Exchange (处理路由键):如果一个队列绑定到该交换机上,并且当前要求路由键为 X ,只有路由键是 X 的消息才会被这个队列转发。

3. Topic Exchange (将路由键和某模式进行匹配,可以理解成模糊处理):路由键的词由 “.” 隔开,符号 “#” 表示匹配 0 个或多个词,符号 “*” 表示匹配不多不少一个词。

类型总结:Fanout类型最简单,这种模型忽略routingkey;Direct类型是使用最多的,使用确定的routingkey。这种模型下,接收消息时绑定’key_1′则只接收key_1的消息;最后一种是Topic,这种模式与Direct类似,但是支持通配符进行匹配,比如: ‘key_*’,就会接受key_1和key_2。Topic貌似美好,但是有可能导致不严谨,所以还是推荐使用Direct。

win8扁平风格返回顶部和在线客服网站侧边栏代码
win8扁平风格返回顶部和在线客服网站侧边栏代码

win8扁平风格返回顶部和在线客服网站侧边栏代码,经常用于企业网站或者商城网站,实现与客户的在线沟通,php中文网推荐下载!

下载

B,持久化。指定了持久化的交换机,在重新启动时才能重建,否则需要客户端重新声明生成才行。

需要特别明确的概念:交换机的持久化,并不等于消息的持久化。只有在持久化队列中的消息,才能持久化;如果没有队列,消息是没有地方存储的;消息本身在投递时也有一个持久化标志的,PHP中默认投递到持久化交换机就是持久的消息,不用特别指定。

4,queue: 队列

讲了这么多,才讲到队列呀。事实上,队列仅是针对接收方(consumer)的,由接收方根据需求创建的。只有队列创建了,交换机才会将新接受到的消息送到队列中,交换机是不会在队列创建之前的消息放进来的。换句话说,在建立队列之前,发出的所有消息都被丢弃了。下面这个图比RabbitMQ官方的图更清楚——Queue是属于ReceiveMessage的一部分。

接下来看一下创建队列及接收消息的示例:

$e_name = 'e_linvo'; //交换机名
$q_name = 'q_linvo'; //队列名
$k_route = ''; //路由key 
//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect())
 {    
 die("Cannot connect to the broker!\n");
 }
 $channel = new AMQPChannel($conn);   //创建交换机
 $ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$ex->declare()."\n";  
 //创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);$q->setFlags(AMQP_DURABLE); //持久化  
//绑定交换机与队列,并指定路由键
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n"; //阻塞模式接收消息
echo "Message:\n";$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答  $conn->disconnect();
/** * 消费回调函数 * 处理消息 */
function processMessage($envelope, $queue) 
{    
var_dump($envelope->getRoutingKey);    
$msg = $envelope->getBody();    
echo $msg."\n"; //处理消息}

从上述示例中可以看到,交换机既可以由消息发送端创建,也可以由消息消费者创建。

创建一个队列(line:20)后,需要将队列绑定到交换机上(line:25)队列才能工作,routingkey也是在这里指定的。有的资料上写成bindingkey,其实一回事儿,弄两个名词反倒容易混淆。

消息的处理,是有两种方式:

A,一次性。用 $q->get([...]),不管取到取不到消息都会立即返回,一般情况下使用轮询处理消息队列就要用这种方式;

B,阻塞。用 $q->consum( callback, [...] ) 程序会进入持续侦听状态,每收到一个消息就会调用callback指定的函数一次,直到某个callback函数返回FALSE才结束。

关于callback,这里多说几句: PHP的call_back是支持使用数组的,比如: $c = new MyClass(); $c->counter = 100; $q->consume( array($c,’myfunc’) ) 这样就可以调用自己写的处理类。MyClass中myfunc的参数定义,与上例中processMessage一样就行。

在上述示例中,使用的$routingkey = ”, 意味着接收全部的消息。我们可以将其改为 $routingkey = ‘key_1′,可以看到结果中仅有设置routingkey为key_1的内容了。

注意: routingkey = ‘key_1′ 与 routingkey = ‘key_2′ 是两个不同的队列。假设: client1 与 client2 都连接到 key_1 的队列上,一个消息被client1处理之后,就不会被client2处理。而 routingkey = ” 是另类,client_all绑定到 ” 上,将消息全都处理后,client1和client2上也就没有消息了。

在程序设计上,需要规划好exchange的名称,以及如何使用key区分开不同类型的标记,在消息产生的地方插入发送消息代码。后端处理,可以针对每一个key启动一个或多个client,以提高消息处理的实时性。如何使用PHP进行多线程的消息处理,将在下一节中讲述。

安装erlang依赖的基本环境

#操作系统:CentOS release 6.2yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel java-devel  unixODBC-devel;

Erlang安装方式一:源码编译

访问 官网下载页

wget http://www.erlang.org/download/otp_src_R16B03.tar.gz;tar -zxvf otp_src_R16B03.tar.gz;cd otp_src_R16B03;./configure --prefix=/usr/local/erlang --with-ssl -enable-threads -enable-smmp-support -enable-kernel-poll --enable-hipe --without-javac;#不用java编译,故去掉java避免错误make && make install;

配置erlang环境

#vi /etc/profile在文件最后加入:PATH=$PATH:/usr/local/erlang/binexport PATH#source /etc/profile

Erlang安装方式二:YUM安装

安装erlang的YUM源

访问 官网YUM安装教程

#自动安装erlang的YUM源
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpmrpm -Uvh erlang-solutions-1.0-1.noarch.rpm
#或手动安装YUM源
rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.ascAdd the following lines to some file in /etc/yum.repos.d/:[erlang-solutions]name=Centos $releasever - $basearch - Erlang Solutionsbaseurl=http://packages.erlang-solutions.com/rpm/centos/$releasever/$basearchgpgcheck=1gpgkey=http://packages.erlang-solutions.com/rpm/erlang_solutions.ascenabled=1

yum erlang

yum -y install erlang;

安装成功检测

安装完后输入“erl”以下提示即为安装成功:

[root@localhost ~]# erlErlang/OTP 18 [erts-7.2] [source-e6dd627] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]Eshell V7.2  (abort with ^G)

 

相关文章

PHP速学教程(入门到精通)
PHP速学教程(入门到精通)

PHP怎么学习?PHP怎么入门?PHP在哪学?PHP怎么学才快?不用担心,这里为大家提供了PHP速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
erlang语言是什么
erlang语言是什么

erlang是一种并发、容错、分布式和动态类型的编程语言。它专门用于构建并发系统,并提供了一个轻量级进程模型来实现并发性。想了解更多erlang的相关内容,可以阅读本专题下面的文章。

408

2024.06.19

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

206

2024.02.23

Java 消息队列与异步架构实战
Java 消息队列与异步架构实战

本专题系统讲解 Java 在消息队列与异步系统架构中的核心应用,涵盖消息队列基本原理、Kafka 与 RabbitMQ 的使用场景对比、生产者与消费者模型、消息可靠性与顺序性保障、重复消费与幂等处理,以及在高并发系统中的异步解耦设计。通过实战案例,帮助学习者掌握 使用 Java 构建高吞吐、高可靠异步消息系统的完整思路。

46

2026.01.28

什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

402

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

249

2023.10.07

什么是中间件
什么是中间件

中间件是一种软件组件,充当不兼容组件之间的桥梁,提供额外服务,例如集成异构系统、提供常用服务、提高应用程序性能,以及简化应用程序开发。想了解更多中间件的相关内容,可以阅读本专题下面的文章。

181

2024.05.11

Golang 中间件开发与微服务架构
Golang 中间件开发与微服务架构

本专题系统讲解 Golang 在微服务架构中的中间件开发,包括日志处理、限流与熔断、认证与授权、服务监控、API 网关设计等常见中间件功能的实现。通过实战项目,帮助开发者理解如何使用 Go 编写高效、可扩展的中间件组件,并在微服务环境中进行灵活部署与管理。

224

2025.12.18

ajax教程
ajax教程

php中文网为大家带来ajax教程合集,Ajax是一种用于创建快速动态网页的技术。通过在后台与服务器进行少量数据交换,Ajax可以使网页实现异步更新。这意味着可以在不重新加载整个网页的情况下,对网页的某部分进行更新。php中文网还为大家带来ajax的相关下载资源、相关课程以及相关文章等内容,供大家免费下载使用。

166

2023.06.14

Golang 测试体系与代码质量保障:工程级可靠性建设
Golang 测试体系与代码质量保障:工程级可靠性建设

Go语言测试体系与代码质量保障聚焦于构建工程级可靠性系统。本专题深入解析Go的测试工具链(如go test)、单元测试、集成测试及端到端测试实践,结合代码覆盖率分析、静态代码扫描(如go vet)和动态分析工具,建立全链路质量监控机制。通过自动化测试框架、持续集成(CI)流水线配置及代码审查规范,实现测试用例管理、缺陷追踪与质量门禁控制,确保代码健壮性与可维护性,为高可靠性工程系统提供质量保障。

24

2026.02.28

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
进程与SOCKET
进程与SOCKET

共6课时 | 0.4万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号