RabbitMQ详解
1.常见消息中间件大PK
1.1 几种协议
1.1.1 JMS
1.1.1.1 JMS 介绍
JMS 全称 Java Message Service
,类似于 JDBC,不同于 JDBC,JMS 是 JavaEE 的消息服务接口
JMS 主要有两个版本:
- 1.1
- 2.0
两者相比,后者主要是简化了收发消息的代码。
考虑到消息中间件是一个非常常用的工具,所以 JavaEE 为此制定了专门的规范 JMS。
不过和 JDBC 一样,JMS 作为规范,他只是一套接口,并不包含具体的实现,如果要使用 JMS,那么一般还需要对应的实现,这就像使用 JDBC 需要对应的驱动一样。
1.1.1.2 JMS 模型
JMS 消息服务支持两种消息模型:
- 点对点或队列模型
- 发布/订阅模型
在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到对应的队列。这是一种点对点的消息模型,这种模式被概括为:
- 只有一个消费者将获得消息。
- 生产者不需要在消费者消费该消息期间处于运行状态,消费者也同样不需要在消息发送时处于运行状态,即消息的生产者和消费者是完全解耦的。
- 每一个成功处理的消息都由消息消费者签收。
发布者/订阅者模型支持向一个特定的消息主题发布消息,消费者则可以定义自己感兴趣的主题,这是一种点对面的消息模型,这种模式可以被概括为:
- 多个消费者可以消费消息。
- 在发布者和订阅者之间存在时间依赖性,发布者需要创建一个订阅(subscription),以便客户能够订阅;订阅者必须保持在线状态以接收消息;当然,如果订阅者创建了持久的订阅,那么在订阅者未连接时,消息生产者发布的消息将会在订阅者重新连接时重新发布。
1.1.1.3 JMS 实现
开源的支持 JMS 的消息中间件有:
- Kafka
- Apache ActiveMQ
- JBoss 社区的 HornetQ
- Joram
- Coridan 的 MantaRay
- OpenJMS
一些商用的支持 JMS 的消息中间件有:
- WebLogic Server JMS
- EMS
- GigaSpaces
- iBus
- IONA JMS
- IQManager(2005 年 8 月被Sun Microsystems并购)
- JMS+
- Nirvana
- SonicMQ
- WebSphere MQ
其实对于日常开发接触较多的,可能就是 Kafka 和 ActiveMQ。
1.1.2 AMQP
1.1.2.1 AMQP 简介
Message Queue 的需求由来已久,80 年代最早在金融交易中,高盛等公司采用 Teknekron 公司的产品,当时的 Message Queue 软件叫做:the information bus(TIB)。TIB 被电信和通讯公司采用,路透社收购了 Teknekron 公司。之后,IBM 开发了 MQSeries,微软开发了 Microsoft Message Queue(MSMQ)。这些商业 MQ 供应商的问题是厂商锁定,价格高昂。2001 年,Java Message Service 试图解决锁定和交互性的问题,但对应用来说反而更加麻烦了。
于是 2004 年,摩根大通和 iMatrix 开始着手 **Advanced Message Queuing Protocol (AMQP)**开放标准的开发。2006 年,AMQP 规范发布。2007 年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。
目前 RabbitMQ 的最新版本为 3.5.7,基于 AMQP 0-9-1。
在 AMQP 协议中,消息收发涉及到如下一些概念:
- Broker: 接收和分发消息的应用,日常所用的 RabbitMQ 就是一个 Message Broker。
- Virtual host: 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 中创建
exchange/queue
等,这个松哥之前写过专门的文章,传送门:RabbitMQ 中的 VirtualHost 该如何理解。 - Connection: publisher/consumer 和 broker 之间的 TCP 连接,断开连接的操作只会在 client 端进行,Broker 不会断开连接,除非出现网络故障或 broker 服务出现问题。
- Channel: 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 Connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 Thread 创建单独的 Channel 进行通讯,AMQP method 包含了 Channel id 帮助客户端和 Message Broker 识别 Channel,所以 Channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP Connection 的开销
- Exchange: Message 到达 Broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (点对点), topic(发布订阅) 以及 fanout (广播)。
- Queue: 消息最终被送到这里等待 Consumer 取走,一个 Message 可以被同时拷贝到多个 queue 中。
- Binding: Exchange 和 Queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 Exchange 中的查询表中,作为 Message 的分发依据。
1.1.2.2 AMQP 实现
来看看实现了 AMQP 协议的一些具体的消息中间件产品都有哪些。
- Apache Qpid
- Apache ActiveMQ
- RabbitMQ
ActiveMQ 不仅支持 JMS,也支持 AMQP
另外还有大家熟知的阿里出品的 RocketMQ,这个是自定义了一套协议,社区也提供了 JMS,但是不太成熟
1.1.3 MQTT
做物联网开发的小伙伴应该会经常接触这个协议,MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是 IBM 开发的一个即时通讯协议,目前看来算是物联网开发中比较重要的协议之一了,该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和 Actuator(比如通过 Twitter 让房屋联网)的通信协议,它的优点是格式简洁、占用带宽小、支持移动端通信、支持 PUSH、适用于嵌入式系统。
1.1.4 XMPP
XMPP(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是一个基于 XML 的协议,多用于即时消息(IM)以及在线现场探测,适用于服务器之间的准即时操作。核心是基于 XML 流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。 它的优点是通用公开、兼容性强、可扩展、安全性高,缺点是 XML 编码格式占用带宽大。
1.1.5 JMS Vs AMQP
对于 Java 工程师而言,大家日常接触较多的应该是 JMS 和 AMQP 协议,既然 JMS 和 AMQP 都是协议,那么两者有什么区别呢?来看下面一张图:
项目 | AMQP | JMS |
---|---|---|
定义 | 线级协议 | Java API |
跨平台 | 是 | 否 |
跨语言 | 是 | 否 |
消息收发模型 | 4种消息收发模型:
| 2种消息收发模型
|
消息类型 | 二进制数据类型 | 5种消息类型
|
消息流 | Producer.将消息发送到 Exchange, Exchange将消息路由到 Queue, Consumer从 Queue中消费消息。 | Producer.将消息发送到 Queue或者 Topic, Consumer从 Queue或Topc中消费消息。 |
1.2 重要产品
1.2.1 ActiveMQ
ActiveMQ 是 Apache 下的一个子项目,使用完全支持 JMS1.1 和 J2EE1.4 规范的 JMS Provider 实现,少量代码就可以高效地实现高级应用场景,并且支持可插拔的传输协议,如:in-VM
, TCP
, SSL
, NIO
, UDP
, multicast
, JGroups and JXTA transports
。
ActiveMQ 支持常用的多种语言客户端如 C++、Java、.Net,、Python、 Php、 Ruby 等。
现在的 ActiveMQ 分为两个版本:
- ActiveMQ Classic
- ActiveMQ Artemis
这里的 ActiveMQ Classic 就是原来的 ActiveMQ,而 ActiveMQ Artemis 是在 RedHat 捐赠的 HornetQ 服务器代码的基础上开发的,两者代码完全不同,后者支持 JMS2.0,使用基于 Netty 的异步 IO,大大提升了性能,更为神奇的是,后者不仅支持 JMS 协议,还支持 AMQP 协议、STOMP 以及 MQTT,可以说后者的玩法相当丰富。
因此大家在使用时,建议直接选择 ActiveMQ Artemis。
1.2.2 RabbitMQ
RabbitMQ 算是 AMQP 体系下最为重要的产品了,它基于 Erlang 语言开发实现,估计很多人被 RabbitMQ 的安装折磨过,松哥建议安装 RabbitMQ 直接用 Docker,省心省力(公号后台回复 docker 有教程)。
RabbitMQ 支持 AMQP、XMPP、SMTP、STOMP 等多种协议,功能强大,适用于企业级开发。
来看一张 RabbitMQ 的结构图:
1.2.3 RocketMQ
RocketMQ 是阿里开源的一款分布式消息中间件,原名 Metaq,从 3.0 版本开始改名为 RocketMQ,是阿里参照 Kafka 设计思想使用 Java 语言实现的一套 MQ。RocketMQ 将阿里内部多款 MQ 产品(Notify、Metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下 MQ 的架构,目前主要用于订单交易系统。
RocketMQ 具有以下特点:
- 保证严格的消息顺序。
- 提供针对消息的过滤功能。
- 提供丰富的消息拉取模式。
- 高效的订阅者水平扩展能力。
- 实时的消息订阅机制。
- 亿级消息堆积能力
对于 Java 工程师而言,这也是一种经常会用到的 MQ。
1.2.4 Kafka
Kafka 是 Apache 下的一个开源流处理平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作(网页浏览,搜索和其他用户的行动)流数据。Kafka 的目的是通过 Hadoop 的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
Kafka 具有以下特性:
- 快速持久化:通过磁盘顺序读写与零拷贝机制,可以在O(1)的系统开销下进行消息持久化。
- 高吞吐:在一台普通的服务器上既可以达到 10W/s 的吞吐速率。
- 高堆积:支持 topic 下消费者较长时间离线,消息堆积量大。
- 完全的分布式系统:Broker、Producer、Consumer 都原生自动支持分布式,通过 Zookeeper 可以自动实现更加复杂的负载均衡。
- 支持 Hadoop 数据并行加载。
大数据开发中大家可能会经常接触 Kafka,Java 开发中也会接触,但是相对来说可能接触的少一些。
1.2.5 ZeroMQ
ZeroMQ 号称最快的消息队列系统,它专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常使用,偏重于实时数据通信场景。ZeroMQ 不是单独的服务,而是一个嵌入式库,它封装了网络通信、消息队列、线程调度等功能,向上层提供简洁的 API,应用程序通过加载库文件,调用 API 函数来实现高性能网络通信。
ZeroMQ 的特性:
- 无锁的队列模型:对于跨线程间的交互(用户端和 session)之间的数据交换通道 pipe,采用无锁的队列算法 CAS,在 pipe 的两端注册有异步事件,在读或者写消息到 pipe 时,会自动触发读写事件。
- 批量处理的算法:对于批量的消息,进行了适应性的优化,可以批量的接收和发送消息。
- 多核下的线程绑定,无须 CPU 切换:区别于传统的多线程并发模式,信号量或者临界区,ZeroMQ 充分利用多核的优势,每个核绑定运行一个工作者线程,避免多线程之间的 CPU 切换开销。
1.2.6 其他
另外还有如 Redis 也能做消息队列,Redis 做普通消息队列和延迟消息队列
1.3. 比较
最后,再来通过一张图来比较下各个消息中间件。
2. RabbitMQ管理界面
2.0 安装
2.0.1 docker安装
拉取镜像
docker pull rabbitmq:3-management
初始化容器
docker run -d -p 5672:5672 -p 15672:15672 --restart=always --name rabbitmq01 rabbitmq:3-management
通信端口 : 5672 管理界面端口 :15672 默认账户密码都是 guest
进入容器
docker exec -it rabbitmq01 /bin/bash
进入容器后的操作:
重置rabbitmq
rabbitmqctl reset
停止rabbitmq
rabbitmqctl stop_app
启动rabbitmq
rabbitmqctl start_app
2.1 概览
- Overview:这里可以概览 RabbitMQ 的整体情况,如果是集群,也可以查看集群中各个节点的情况。包括 RabbitMQ 的端口映射信息等,都可以在这个选项卡中查看。
- Connections:这个选项卡中是连接上 RabbitMQ 的生产者和消费者的情况。
- Channels:这里展示的是“通道”信息,关于“通道”和“连接”的关系
- Exchange:这里展示所有的交换机信息。
- Queue:这里展示所有的队列信息。
- Admin:这里展示所有的用户信息。
2.2 Overview
Totals:
Totals 里面有 准备消费的消息数、待确认的消息数、消息总数以及消息的各种处理速率(发送速率、确认速率、写入硬盘速率等等)。
Nodes:
Nodes 其实就是支撑 RabbitMQ 运行的一些机器,相当于集群的节点。点击每个节点,可以查看节点的详细信息。
Churn statistics:
这个不好翻译,里边展示的是 Connection、Channel 以及 Queue 的创建/关闭速率。
Ports and contexts:
这个里边展示了端口的映射信息以及 Web 的上下文信息。
- 5672 是 RabbitMQ 通信端口。
- 15672 是 Web 管理页面端口。
- 25672 是集群通信端口。
Export definitions && Import definitions:
最后面这两个可以导入导出当前实例的一些配置信息
2.3 Connections
这里主要展示的是当前连接上 RabbitMQ 的信息,无论是消息生产者还是消息消费者,只要连接上来了这里都会显示出来。
注意协议中的 AMQP 0-9-1 指的是 AMQP 协议的版本号。
其他属性含义如下:
- User name:当前连接使用的用户名。
- State:当前连接的状态,running 表示运行中;idle 表示空闲。
- SSL/TLS:表示是否使用 ssl 进行连接。
- Channels:当前连接创建的通道总数。
- From client:每秒发出的数据包。
- To client:每秒收到的数据包。
点击连接名称可以查看每一个连接的详情。
在详情中可以查看每一个连接的通道数以及其他详细信息,也可以强制关闭一个连接。
2.4 Channels
这个地方展示的是通道的信息:
--- 图片
那么什么是通道呢?
一个连接(IP)可以有多个通道,如上图,一共是两个连接,但是一共有 12 个通道。
一个连接可以有多个通道,这个多个通道通过多线程实现,一般情况下,在通道中创建队列、交换机等。
生产者的通道一般会立马关闭;消费者是一直监听的,通道几乎是会一直存在。
上面各项参数含义分别如下:
- Channel:通道名称。
- User name:该通道登录使用的用户名。
- Model:通道确认模式,C 表示 confirm;T 表示事务。
- State:通道当前的状态,running 表示运行中;idle 表示空闲。
- Unconfirmed:待确认的消息总数。
- Prefetch:Prefetch 表示每个消费者最大的能承受的未确认消息数目,简单来说就是用来指定一个消费者一次可以从 RabbitMQ 中获取多少条消息并缓存在消费者中,一旦消费者的缓冲区满了,RabbitMQ 将会停止投递新的消息到该消费者中直到它发出有消息被 ack 了。总的来说,消费者负责不断处理消息,不断 ack,然后只要 unAcked 数少于 prefetch * consumer 数目,RabbitMQ 就不断将消息投递过去。
- Unacker:待 ack 的消息总数。
- publish:消息生产者发送消息的速率。
- confirm:消息生产者确认消息的速率。
- unroutable (drop):表示未被接收,且已经删除了的消息。
- deliver/get:消息消费者获取消息的速率。
- ack:消息消费者 ack 消息的速率。
2.5 Exchange
这个地方展示交换机信息:
-- 图片
这里会展示交换机的各种信息。
Type 表示交换机的类型。
Features 有两个取值 D 和 I。
D 表示交换机持久化,将交换机的属性在服务器内部保存,当 MQ 的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新手动或执行代码去建立交换机,交换机会自动建立,相当于一直存在。
I 表示这个交换机不可以被消息生产者用来推送消息,仅用来进行交换机和交换机之间的绑定。
Message rate in 表示消息进入的速率。Message rate out 表示消息出去的速率。
点击下方的 Add a new exchange 可以创建一个新的交换机。
2.6 Queue
这个选项卡就是用来展示消息队列的:
-- 图片
各项含义如下:
- Name:表示消息队列名称。
- Type:表示消息队列的类型,除了上图的 classic,另外还有一种消息类型是 Quorum。两个区别如下图:
- Features:表示消息队列的特性,D 表示消息队列持久化。
- State:表示当前队列的状态,running 表示运行中;idle 表示空闲。
- Ready:表示待消费的消息总数。
- Unacked:表示待应答的消息总数。
- Total:表示消息总数 Ready+Unacked。
- incoming:表示消息进入的速率。
- deliver/get:表示获取消息的速率。
- ack:表示消息应答的速率。
点击下方的 Add a new queue 可以添加一个新的消息队列。
点击每一个消息队列的名称,可以进入到消息队列中。进入到消息队列后,可以完成对消息队列的进一步操作,例如:
- 将消息队列和某一个交换机进行绑定。
- 发送消息。
- 获取一条消息。
- 移动一条消息(需要插件的支持)。
- 删除消息队列。
- 清空消息队列中的消息。
如下图:
-- 图片
2.7 Admin
这里是做一些用户管理操作,如下图:
-- 图片
各项属性含义如下:
- Name:表示用户名称。
- Tags:表示角色标签,只能选取一个。
- Can access virtual hosts:表示允许进入的虚拟主机。
- Has password:表示这个用户是否设置了密码。
常见的两个操作时管理用户和虚拟主机。
点击下方的 Add a user 可以添加一个新的用户,添加用户的时候需要给用户设置 Tags,其实就是用户角色,如下:
- none:不能访问 management plugin
- management:用户可以通过 AMQP 做的任何事 列出自己可以通过 AMQP 登入的 virtual hosts 查看自己的 virtual hosts 中的 queues, exchanges 和 bindings 查看和关闭自己的 channels 和 connections 查看有关自己的 virtual hosts 的“全局”的统计信息,包含其他用户在这些 virtual hosts 中的活动
- policymaker:management 可以做的任何事 查看、创建和删除自己的 virtual hosts 所属的 policies 和 parameters
- monitoring:management 可以做的任何事 列出所有 virtual hosts,包括他们不能登录的 virtual hosts 查看其他用户的 connections 和 channels 查看节点级别的数据如 clustering 和 memory 使用情况 查看真正的关于所有 virtual hosts 的全局的统计信息
- administrator:policymaker 和 monitoring 可以做的任何事 创建和删除 virtual hosts 查看、创建和删除 users 查看创建和删除 permissions 关闭其他用户的 connections
- impersonator(模拟者) 模拟者,无法登录管理控制台。
另外,这里也可以进行虚拟主机 virtual host 的操作,后面小节会和大家介绍虚拟主机。
3. RabbitMQ 七种消息收发方式
3.1 RabbitMQ 架构简介
这张图中涉及到如下一些概念:
- 生产者(Publisher):发布消息到 RabbitMQ 中的交换机(Exchange)上。
- 交换机(Exchange):和生产者建立连接并接收生产者的消息。
- 消费者(Consumer):监听 RabbitMQ 中的 Queue 中的消息。
- 队列(Queue):Exchange 将消息分发到指定的 Queue,Queue 和消费者进行交互。
- 路由(Routes):交换机转发消息到队列的规则。
3.2 七种消息分发策略
大家知道,RabbitMQ 是 AMQP 阵营里的产品,Spring Boot 为 AMQP 提供了自动化配置依赖 spring-boot-starter-amqp,因此首先创建 Spring Boot 项目并添加该依赖,如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
在 application.properties 中配置 RabbitMQ 的基本连接信息,如下:
spring.rabbitmq.host=101.43.23.183
# 5672 为通信端口
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
接下来进行 RabbitMQ 配置,在 RabbitMQ 中,所有的消息生产者提交的消息都会交由 Exchange 进行再分配,Exchange 会根据不同的策略将消息分发到不同的 Queue 中。
RabbitMQ 官网介绍了如下七种消息分发的形式:
3.3 (1)基本消息收发 Hello World
这个其实是默认的交换机,需要提供一个生产者一个队列以及一个消费者。消息传播图如下:
消费者:
队列配置:
@Configuration
public class RabbitConfig {
public static final String QUEUE_NAME = "lc_queue";
@Bean
public Queue myQueue() {
//参数解释:
//1、队列的名称
//2、消息是否持久化
//3、该队列是否具有排他性,如果有排他性则只有被其创建的Connection处理
//4、是否自动删除,若改队列没有被监听,则删除
return new Queue(QUEUE_NAME, true, false, false);
}
}
队列监听配置
@Component
public class MsgReceiver {
/**
* queues:监听指定队列名称
*/
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handleMsg(String msg) {
System.out.println("消息消费:"+msg);
}
}
生产者:
向队列发送消息
@SpringBootTest
class DemoApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("lc_queue","hello world!");
}
}
这个时候使用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key
相同的 Queue 上,例如消息队列名为 “lc_queue”,则 routingkey 为 “lc_queue” 的消息会被该消息队列接收。
3.4 (2)WorkQueues
一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者,如下图:
一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。
3.4.1 并发能力
先来看并发能力的配置,如下:
消费者配置:
队列配置
@Configuration
public class RabbitConfig {
public static final String QUEUE_NAME = "lc_queue";
@Bean
public Queue myQueue() {
//参数解释:
//1、队列的名称
//2、消息是否持久化,重启该连接,消息依然存在。
//3、该队列是否具有排他性,如果有排他性则只有被其创建的Connection处理
//4、是否自动删除,若该队列没有被监听,则删除
return new Queue(QUEUE_NAME, true, false, false);
}
}
- 关于排他性,如果设置为 true,则该消息队列只有创建它的 Connection 才能访问,其他的 Connection 都不能访问该消息队列,如果试图在不同的连接中重新声明或者访问排他性队列,那么系统会报一个资源被锁定的错误。另一方面,对于排他性队列而言,当连接断掉的时候,该消息队列也会自动删除(无论该队列是否被声明为持久性队列都会被删除)。
队列监听配置
/**
* queues:监听指定队列名称
*/
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handleMsg(String msg) {
System.out.println("handleMsg消息消费:"+msg);
}
/**
* queues:监听指定队列名称
* concurrency: 指定消费消息的线程数,即开启20个子线程消费消息
*/
@RabbitListener(queues = RabbitConfig.QUEUE_NAME, concurrency = "20")
public void handleMsg01(String msg) {
System.out.println("handleMsg01消息消费:" + msg+"===>"+Thread.currentThread().getName());
}
生产者配置:
向队列发送消息10条消息
@SpringBootTest
class Publisher01ApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void Test(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("lc_queue", "hello world!"+i);
}
}
}
消息处理结果:
当监听队列没有配置
concurrency
时,两个监听线程同时争抢消费消息,几率几乎相同。当监听队列配置
concurrency="20"
,即开启20个线程争抢消息,此时该监听的队列处理的消息几率大大提高- 该连接下总共有
21
个线程(通道
)去消费
- 该连接下总共有
3.4.2 手动确认
修改配置文件
# 手动确认ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
修改监听消息队列
package com.example.consumer_01.receiver;
import com.example.consumer_01.config.RabbitConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class MsgReceiver {
/**
* queues:监听指定队列名称
*/
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handleMsg(Message msg, Channel channel) throws IOException {
System.out.println("handleMsg消息消费:"+msg.getPayload());
//确认消息,告诉RabbitMQ,这条消息已确认消费。
channel.basicAck((Long) msg.getHeaders().get(AmqpHeaders.DELIVERY_TAG), false);
}
/**
* queues:监听指定队列名称
* concurrency: 指定消费消息的线程数,即开启20个子线程消费消息
*/
@RabbitListener(queues = RabbitConfig.QUEUE_NAME, concurrency = "20")
public void handleMsg01(Message msg, Channel channel) throws IOException {
//拒接消费该消息。
// requeue参数:true 表示拒绝的消息是重新进入队列,而不是丢弃
channel.basicReject((Long) msg.getHeaders().get(AmqpHeaders.DELIVERY_TAG), true);
}
}
消费结果:
所有的消息均被handleMsg监听队列消费
handleMsg消息消费:hello world!0
handleMsg消息消费:hello world!2
handleMsg消息消费:hello world!8
handleMsg消息消费:hello world!6
handleMsg消息消费:hello world!5
handleMsg消息消费:hello world!4
handleMsg消息消费:hello world!9
handleMsg消息消费:hello world!3
handleMsg消息消费:hello world!7
handleMsg消息消费:hello world!1
3.5 (3)Publish/Subscribe
再来看发布订阅模式,这种情况是这样:
一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力,如下图:
这种情况下,有四种交换机可供选择,分别是:
- Direct
- Fanout
- Topic
- Header
3.5.1 Direct
DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key
相同的 Queue 上,一般交换机-路由-队列
是一对一的。
如果只创建队列,而不创建交换机和绑定关系,则默认使用的是直连交换机,并且绑定的路由键为队列名称
消费者
队列-交换机-路由 配置
@Configuration
public class RabbitConfig {
//定义队列名称
public static final String DIRECT_QUEUE_NAME1 = "direct_name_1";
public static final String DIRECT_QUEUE_NAME2 = "direct_name_2";
//定义交换机名称
public static final String DIRECT_EXCHANGE_NAME = "direct_exchange";
//定义路由
public static final String ROUTEING_NAME1 = "direct_routing1";
public static final String ROUTEING_NAME2 = "direct_routing2";
//创建队列
@Bean
public Queue queue1() {
return new Queue(DIRECT_QUEUE_NAME1, true, false, false);
}
//创建队列
@Bean
public Queue queue2() {
return new Queue(DIRECT_QUEUE_NAME2, true, false, false);
}
//创建直连交换机
@Bean
public DirectExchange directExchange() {
//name 交换机名称
//durable 是否持久化,即重启服务,交换机是否存在
//autoDelete 是否自动删除。没有队列绑定时,是否自动删除
return new DirectExchange(DIRECT_EXCHANGE_NAME,true,false);
}
//创建 队列-交换机-路由 绑定关系
@Bean
public Binding directBinding1() {
return BindingBuilder
//绑定队列
.bind(queue1())
//绑定交换机
.to(directExchange())
//绑定路由
.with(ROUTEING_NAME1);
}
@Bean
public Binding directBinding2() {
return BindingBuilder
//绑定队列
.bind(queue2())
//绑定交换机
.to(directExchange())
//绑定路由
.with(ROUTEING_NAME2);
}
}
监听队列配置
@Component
public class MsgReceiver {
/**
* queues:监听指定队列名称
*/
@RabbitListener(queues = RabbitConfig.DIRECT_QUEUE_NAME1)
public void handleMsg1(String msg){
System.out.println("handleMsg1消息消费:"+msg);
}
/**
* queues:监听指定队列名称
*/
@RabbitListener(queues = RabbitConfig.DIRECT_QUEUE_NAME2)
public void handleMsg2(String msg){
System.out.println("handleMsg2消息消费:"+msg);
}
}
生产者配置
@SpringBootTest
class Publisher01ApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void Test1(){
rabbitTemplate.convertAndSend("direct_exchange","direct_routing1","hello world!!!");
rabbitTemplate.convertAndSend("direct_exchange","direct_routing2","hello world!!!");
}
}
结果
handleMsg1消息消费:hello world! handleMsg2消息消费:hello world!
3.5.2 Fanout
FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中routingkey 将不起任何作用
消费者
队列-交换机 绑定关系配置
@Configuration
public class FanoutConfig {
//定义队列名称
public static final String QUEUE_NAME1 = "fanout_queue_1";
public static final String QUEUE_NAME2 = "fanout_queue_2";
//定义交换机名称
public static final String FANOUT_EXCHANGE_NAME = "fanout_exchange";
@Bean
public Queue fanoutQueue1() {
return new Queue(QUEUE_NAME1, true, false, false);
}
@Bean
public Queue fanoutQueue2() {
return new Queue(QUEUE_NAME2, true, false, false);
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE_NAME, true, false);
}
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
}
监听队列配置
@Component
public class FanoutListener {
/**
* queues:监听指定队列名称
*/
@RabbitListener(queues = FanoutConfig.QUEUE_NAME1)
public void handleMsg1(String msg){
System.out.println("handleMsg1消息消费:"+msg);
}
/**
* queues:监听指定队列名称
*/
@RabbitListener(queues = FanoutConfig.QUEUE_NAME2)
public void handleMsg2(String msg){
System.out.println("handleMsg2消息消费:"+msg);
}
}
生产者
@SpringBootTest
class Publisher01ApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void Test2(){
rabbitTemplate.convertAndSend("fanout_exchange", null, "hello world!");
}
}
结果
handleMsg1消息消费:hello world! handleMsg2消息消费:hello world!
3.5.3 Topic
TopicExchange 是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上,当消息到达 TopicExchange 后,TopicExchange 根据消息的 routingkey 将消息路由到一个或者多个 Queue 上。
消费者
队列-交换机-路由规则 绑定配置
@Configuration
public class TopicConfig {
//定义队列名称
public static final String XIAOMI_QUEUE_NAME = "xiaomi_queue_name";
public static final String HUAWEI_QUEUE_NAME = "huawei_queue_name";
public static final String PHONE_QUEUE_NAME = "phone_queue_name";
//定义交换机名称
public static final String TOPIC_EXCHANGE_NAME = "topic_exchange";
@Bean
public Queue xiaomiQueue() {
return new Queue(XIAOMI_QUEUE_NAME, true, false, false);
}
@Bean
public Queue huaweiQueue() {
return new Queue(HUAWEI_QUEUE_NAME, true, false, false);
}
@Bean
public Queue phoneQueue() {
return new Queue(PHONE_QUEUE_NAME, true, false, false);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE_NAME,true,false);
}
@Bean
public Binding xiomiBinding() {
return BindingBuilder.bind(xiaomiQueue())
.to(topicExchange())
//表示消息的 routingkey 凡是以 “xiaomi” 或者 “xiaomi.” 开头的,都将被路由到名称为 “xiaomi_queue_name” 的 Queue 上
.with("xiaomi.#");
}
@Bean
public Binding huaweiBinding() {
return BindingBuilder.bind(huaweiQueue())
.to(topicExchange())
//表示消息的 routingkey 凡是以 “huawei” 或者 “huawei.” 开头的,都将被路由到名称为 “huawei_queue_name” 的 Queue 上
.with("huawei.#");
}
@Bean
public Binding phoneBinding() {
return BindingBuilder.bind(phoneQueue())
.to(topicExchange())
//表示消息的 routingkey 中凡是包含 “.phone” 或者 “phone.” 或者 “phone” 的,都将被路由到名称为 “phone_queue_name” 的 Queue 上
.with("#.phone.#");
}
}
监听队列配置
@Component
public class TopicListener {
/**
* queues:监听指定队列名称
*/
@RabbitListener(queues = TopicConfig.XIAOMI_QUEUE_NAME)
public void xiaomiHandleMsg(String msg){
System.out.println("xiaomi消息消费:"+msg);
}
/**
* queues:监听指定队列名称
*/
@RabbitListener(queues = TopicConfig.HUAWEI_QUEUE_NAME)
public void huaweiHandleMsg(String msg){
System.out.println("huawei消息消费:"+msg);
}
/**
* queues:监听指定队列名称
*/
@RabbitListener(queues = TopicConfig.PHONE_QUEUE_NAME)
public void phoneHandleMsg(String msg){
System.out.println("phone消息消费:"+msg);
}
}
生产者
@SpringBootTest
class Publisher01ApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void Test3(){
//xiaomi消息消费:小米
rabbitTemplate.convertAndSend("topic_exchange","xiaomi","小米");
//phone消息消费:手机
rabbitTemplate.convertAndSend("topic_exchange","phone","手机");
//phone消息消费:华为—手机
//huawei消息消费:华为—手机
rabbitTemplate.convertAndSend("topic_exchange","huawei.phone","华为—手机");
}
}
3.5.4 Header
HeadersExchange 是一种使用较少的路由策略,HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey无关
消费者
队列-交换机-绑定关系配置
package com.example.consumer_02.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class HeaderConfig {
//定义队列名称
public static final String AGE_HEADER_QUEUE_NAME = "age_header_queue_name";
public static final String CITY_HEADER_QUEUE_NAME = "city_header_queue_name";
//定义交换机名称
public static final String HEADER_EXCHANGE_NAME = "header_exchange";
@Bean
public Queue ageQueue() {
return new Queue(AGE_HEADER_QUEUE_NAME, true, false, false);
}
@Bean
public Queue cityQueue() {
return new Queue(CITY_HEADER_QUEUE_NAME, true, false, false);
}
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange(HEADER_EXCHANGE_NAME, true, false);
}
@Bean
public Binding ageBinding() {
return BindingBuilder.bind(ageQueue())
.to(headersExchange())
//如果头部包含键为 age 的则匹配
.where("age").exists();
}
@Bean
public Binding cityBinding() {
return BindingBuilder.bind(cityQueue())
.to(headersExchange())
//如果头部包含键为 city,值为 wuhan 的则匹配
.where("city").matches("wuhan");
}
}
监听队列配置
@Component
public class HeaderListener {
@RabbitListener(queues = HeaderConfig.AGE_HEADER_QUEUE_NAME)
public void ageHandleMsg(byte[] msg){
System.out.println("ageHandleMsg消息消费:"+new String(msg, 0,msg.length));
}
@RabbitListener(queues = HeaderConfig.CITY_HEADER_QUEUE_NAME)
public void cityHandleMsg(byte[] msg){
System.out.println("cityHandleMsg消息消费:"+new String(msg, 0,msg.length));
}
}
生产者配置
package com.example.publisher_01;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class Publisher01ApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void Test4(){
Message ageMsg = MessageBuilder.withBody("age hello world!!!".getBytes()).setHeader("age", "3").build();
//ageHandleMsg消息消费:age hello world!!!
rabbitTemplate.convertAndSend("header_exchange", null, ageMsg);
Message cityMsg = MessageBuilder.withBody("city hello world!!!".getBytes()).setHeader("city", "wuhan").build();
//cityHandleMsg消息消费:city hello world!!!
rabbitTemplate.convertAndSend("header_exchange", null, cityMsg);
}
}
3.6 (4)Routing
这种情况是这样:
一个生产者,一个交换机,两个队列,两个消费者,生产者在创建 Exchange 后,根据 RoutingKey 去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey 即可。
如下图:
这个就是按照 routing key 去路由消息.参考 3.5.1
3.7 (5)Topics
这种情况是这样:
一个生产者,一个交换机,两个队列,两个消费者,生产者创建 Topic 的 Exchange 并且绑定到队列中,这次绑定可以通过 *
和 #
关键字,对指定 RoutingKey
内容,编写时注意格式 xxx.xxx.xxx
去编写。
如下图:
参考3.5.3
3.8 (6)RPC
参考 4.1
3.9 (7)Publisher Confirm
参考后续目录
4.RabbitMQ 实现 RPC
4.1 架构
这张图把问题说的很明白了:
- 首先 Client 发送一条消息,和普通的消息相比,这条消息多了两个关键内容:一个是 correlation_id,这个表示这条消息的唯一 id,还有一个内容是 reply_to,这个表示消息回复队列的名字。
- Server 从消息发送队列获取消息并处理相应的业务逻辑,处理完成后,将处理结果发送到 reply_to 指定的回调队列中。
- Client 从回调队列中读取消息,就可以知道消息的执行情况是什么样子了。
这种情况其实非常适合处理异步调用。
4.2 案例
核心依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
4.2.1 客户端
基础配置
spring.rabbitmq.host=101.43.23.183
# 5672 为通信端口
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#开启发送消息确认机制,将来将消息到达交换机之后会有一个回调
spring.rabbitmq.publisher-confirm-type=correlated
#消息到达队列的回调(消息如果没有成功到达队列,会触发回调方法)
spring.rabbitmq.publisher-returns=true
server.port=8080
队列-交换机-绑定配置
@Configuration
public class RabbitConfig {
//定义发送消息的队列
public static final String RPC_SEND_QUEUE = "rpc_send_queue";
//定义接收消息的队列
public static final String RPC_REPLY_QUEUE = "rpc_reply_queue";
//定义交换机
public static final String RPC_EXCHANGE = "rpc_exchange";
@Bean
public Queue sendQueue() {
return new Queue(RPC_SEND_QUEUE, true, false, false);
}
@Bean
public Queue reply() {
return new Queue(RPC_REPLY_QUEUE, true, false, false);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(RPC_EXCHANGE, true, false);
}
@Bean
public Binding sendBinding() {
return BindingBuilder.bind(sendQueue())
.to(topicExchange())
//路由直接为队列名
.with(RPC_SEND_QUEUE);
}
@Bean
public Binding replyBinding() {
return BindingBuilder.bind(reply())
.to(topicExchange())
//路由直接为队列名
.with(RPC_REPLY_QUEUE);
}
/**
* 重写 RabbitTemplate中的属性对象
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template=new RabbitTemplate(connectionFactory);
//设置回复消息的队列
template.setReplyAddress(RPC_REPLY_QUEUE);
//设置回复的超时时间 6s
template.setReplyTimeout(6000);
return template;
}
/**
* 给返回队列设置监听器
* @param connectionFactory
* @return
*/
@Bean
public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(RPC_REPLY_QUEUE);
container.setMessageListener(rabbitTemplate(connectionFactory));
return container;
}
}
发送消息客户端配置
@RestController
public class HelloController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/{msg}")
public Object sendMsg(@PathVariable(value = "msg") String msg) {
//要发送的消息对象
Message sendMsg = MessageBuilder.withBody(msg.getBytes()).build();
//收到的消息对象 注意:使用sendAndReceive方法,才会给消息生成id
Message replyMsg = rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE,RabbitConfig.RPC_SEND_QUEUE, sendMsg);
if (replyMsg != null) {
//发送的消息id
String correlationId = sendMsg.getMessageProperties().getCorrelationId();
//接收的消息id
String replyId = (String) replyMsg.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
if (correlationId.equals(replyId)) {
return "收到服务端的消息:" + new String(replyMsg.getBody());
}
}
return "暂未收到服务端消息";
}
}
4.2.2 服务端配置
基础配置
spring.rabbitmq.host=101.43.23.183
# 5672 为通信端口
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#开启发送消息确认机制,将来将消息到达交换机之后会有一个回调
spring.rabbitmq.publisher-confirm-type=correlated
#消息到达队列的回调(消息如果没有成功到达队列,会触发回调方法)
spring.rabbitmq.publisher-returns=true
server.port=8081
队列名称
@Configuration
public class RabbitConfig {
//定义发送消息的队列
public static final String RPC_SEND_QUEUE = "rpc_send_queue";
//定义接收消息的队列
public static final String RPC_REPLY_QUEUE = "rpc_reply_queue";
}
服务端监听配置
@Component
public class ServerListener {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 监听客户端向服务端发生消息的队列
*/
@RabbitListener(queues = RabbitConfig.RPC_SEND_QUEUE)
public void process(Message message) {
//客户端发送的消息
byte[] body = message.getBody();
//构建服务端消息
Message build = MessageBuilder.withBody(("服务端已收到消息,客户端发送的消息为:" + new String(body)).getBytes()).build();
//构建服务端消息id对象
CorrelationData correlationData = new CorrelationData(message.getMessageProperties().getCorrelationId());
//必须使用sendAndReceive,不然客户端收不到消息id
rabbitTemplate.sendAndReceive(RabbitConfig.RPC_REPLY_QUEUE, build, correlationData);
}
}
4.2.3 请求测试
http://localhost:8080/send/HelloWorld!
5. RabbitMQ消息有效期
5.1 默认情况
首先来看看默认情况。
默认情况下,消息是不会过期的,也就是平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直存储在队列中。
5.2 TTL 过期时间
TTL(Time-To-Live),消息存活的时间,即消息的有效期。如果希望消息能够有一个存活时间,那么可以通过设置 TTL 来实现这一需求。如果消息的存活时间超过了 TTL 并且还没有被消息,此时消息就会变成死信
TTL 的设置有两种不同的方式:
- 在声明队列的时候,可以在队列属性中设置消息的有效期,这样所有进入该队列的消息都会有一个相同的有效期。
- 在发送消息的时候设置消息的有效期,这样不同的消息就具有不同的有效期。
那如果两个都设置了呢?
以时间短的为准。
当设置了消息有效期后,消息过期了就会被从队列中删除了(进入到死信队列),但是两种方式对应的删除时机有一些差异:
给队列设置过期时间,当消息队列设置过期时间的时候,那么消息过期了就会被删除,因为消息进入 RabbitMQ 后是存在一个消息队列中,队列的头部是最早要过期的消息,所以 RabbitMQ 只需要一个定时任务,从头部开始扫描是否有过期消息,有的话就直接删除。
给消息设置过期时间,当消息过期后并不会立马被删除,而是当消息要投递给消费者的时候才会去删除,因为第二种方式,每条消息的过期时间都不一样,想要知道哪条消息过期,必须要遍历队列中的所有消息才能实现,当消息比较多时这样就比较耗费性能,因此对于第二种方式,当消息要投递给消费者的时候才去删除。
下面创建SpringBoot工程
基础配置
spring.rabbitmq.host=101.43.23.183 # 5672 为通信端口 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
5.2.1 给单条消息设置过期时间
队列配置
这里使用默认直连交换机,并且默认以队列名为路由
@Configuration
public class RabbitConfig {
//定义队列名称
public static final String TTL_QUEUE_NAME = "ttl_queue_name";
@Bean
public Queue queueName() {
return new Queue(TTL_QUEUE_NAME, true, false, false);
}
}
这里不配置消费者,只向队列发送消息
@RestController
public class HelloController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/{msg}")
public String sendMsg(@PathVariable String msg) {
Message newMsg = MessageBuilder.withBody(msg.getBytes())
//设置消息过期时间为10s
.setExpiration("10000")
.build();
rabbitTemplate.convertAndSend(RabbitConfig.TTL_QUEUE_NAME, newMsg);
return "发送成功";
}
}
在创建 Message 对象的时候可以设置消息的过期时间,这里设置消息的过期时间为 10 秒。
5.2.2 给队列设置过期时间
队列配置
@Configuration
public class RabbitConfig {
//定义队列名称
public static final String TTL_QUEUE_NAME = "ttl_queue_name";
@Bean
public Queue queueName() {
Map<String, Object> arguments=new HashMap<>();
//设置队列过期时间8s
arguments.put("x-message-ttl", 8000);
return new Queue(TTL_QUEUE_NAME, true, false, false, arguments);
}
}
直接向队列发送消息
@RestController
public class HelloController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendQueue/{msg}")
public String sendQueueMsg(@PathVariable String msg) {
rabbitTemplate.convertAndSend(RabbitConfig.TTL_QUEUE_NAME, msg);
return "发送成功";
}
}
可以看到,消息队列的 Features 属性为 D 和 TTL,D 表示消息队列中消息持久化,TTL 则表示消息会过期。
8s 之后刷新页面,发现消息数量已经恢复为 0。
这就是给消息队列设置消息过期时间,一旦设置了,所有进入到该队列的消息都有一个过期时间了。
5.2.3 特殊情况
还有一种特殊情况,就是将消息的过期时间 TTL 设置为 0,这表示如果消息不能立马消费则会被立即丢掉,这个特性可以部分替代 RabbitMQ3.0 以前支持的 immediate 参数,之所以所部分代替,是因为 immediate 参数在投递失败会有 basic.return 方法将消息体返回(这个功能可以利用死信队列来实现)。
5.3 死信队列
5.3.1 死信交换机
死信交换机,Dead-Letter-Exchange 即 DLX。
死信交换机用来接收死信消息(Dead Message)的,那什么是死信消息呢?一般消息变成死信消息有如下几种情况:
- 消息被拒绝(Basic.Reject/Basic.Nack) ,并且设置requeue 参数为false
- 消息过期
- 队列达到最大长度
当消息在一个队列中变成了死信消息后,此时就会被发送到 DLX,绑定 DLX 的消息队列则称为死信队列。
DLX 本质上也是一个普普通通的交换机,可以为任意队列指定 DLX,当该队列中存在死信时,RabbitMQ 就会自动的将这个死信发布到 DLX 上去,进而被路由到另一个绑定了 DLX 的队列上(即死信队列)。
5.3.2 死信队列
这个好理解,绑定了死信交换机的队列就是死信队列。
5.3.3 案例
基础配置
spring.rabbitmq.host=101.43.23.183
# 5672 为通信端口
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
定义死信队列和正常队列
package com.example.dlx_demo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@Configuration
public class RabbitConfig {
//定义队列名称
public static final String DLX_QUEUE_NAME = "dlx_queue_name";
public static final String NORMAL_DEMO = "normal_demo";
//定义交换机名称
public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";
/**
* 正常的队列 使用默认的直连交换机
* @return
*/
@Bean
public Queue normalQueue() {
HashMap<String, Object> arguments = new HashMap<>();
//设置死信交换机
arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
//设置死信的路由键,这里为队列名
arguments.put("x-dead-letter-routing-key", DLX_QUEUE_NAME);
return new Queue(NORMAL_DEMO, true, false, false,arguments);
}
/**
* 死信队列
* @return
*/
@Bean
public Queue dlxQueue() {
return new Queue(DLX_QUEUE_NAME, true, false, false);
}
/**
* 死信交换机
* @return
*/
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
}
/**
* 死信队列和交换机的绑定
* @return
*/
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue())
.to(dlxExchange())
.with(DLX_QUEUE_NAME);
}
}
监听死信队列
@Component
@Log4j2
public class DlxListener {
@RabbitListener(queues = RabbitConfig.DLX_QUEUE_NAME)
public void listenerDLX(Message message) {
byte[] body = message.getBody();
log.info("死信队列收到消息>>>>:" + new String(body));
}
}
生产者发送消息到正常队列,正常队列中的消息过期后进入死信队列
@Log4j2
@RestController
public class HelloController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/{msg}")
public String sendMsg(@PathVariable String msg) {
Message newMsg = MessageBuilder.withBody(msg.getBytes())
//设置过期时间为3s
.setExpiration("5000")
.build();
log.info("发送时间>>>>>>>>>>>>");
rabbitTemplate.convertAndSend(RabbitConfig.NORMAL_DEMO, newMsg);
return "发送成功";
}
}
请求: http://localhost:8080/send/louchen1111
日志信息
2022-01-27 14:45:05.969 INFO 4632 --- [nio-8080-exec-4] c.e.dlx_demo.controller.HelloController : 发送时间>>>>>>>>>>>> 2022-01-27 14:45:10.986 INFO 4632 --- [ntContainer#0-1] c.example.dlx_demo.listener.DlxListener : 死信队列收到消息>>>>:louchen1111
6. RabbitMQ 实现延迟队列
定时任务各种各样,常见的定时任务例如日志备份,可能在每天凌晨 3 点去备份,这种固定时间的定时任务一般采用 cron 表达式就能轻松的实现,还有一些比较特殊的定时任务,向大家看电影中的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,因为开始时间不确定,开发中有的时候也会遇到类似的需求,例如:
- 在电商项目中,当下单之后,一般需要 20 分钟之内或者 30 分钟之内付款,否则订单就会进入异常处理逻辑中,被取消,那么进入到异常处理逻辑中,就可以当成是一个延迟队列。
- 我买了一个智能砂锅,可以用来煮粥,上班前把素材都放到锅里,然后设置几点几分开始煮粥,这样下班后就可以喝到香喷喷的粥了,那么这个煮粥的指令也可以看成是一个延迟任务,放到一个延迟队列中,时间到了再执行。
- 公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户。
- 安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人。
- 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。
很多场景下都需要延迟队列。
本文以 RabbitMQ 为例来和大家聊一聊延迟队列的玩法。
整体上来说,在 RabbitMQ 上实现定时任务有两种方式:
- 利用 RabbitMQ 自带的消息过期和私信队列机制,实现定时任务。
- 使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时任务,这种方案较简单。
6.1 用插件
6.1.1 安装插件
注意插件版本和rabbitmq版本需要一致
下载
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
将下载的拷贝到容器中的plugins目录下
docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq01:/plugins
进入到容器
docker exec -it rabbitmq01 /bin/bash
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看插件列表
rabbitmq-plugins list
Listing plugins with pattern ".*" ... Configured: E = explicitly enabled; e = implicitly enabled | Status: * = running on rabbit@dd068a88e7b8 |/ [ ] rabbitmq_amqp1_0 3.9.13 [ ] rabbitmq_auth_backend_cache 3.9.13 [ ] rabbitmq_auth_backend_http 3.9.13 [ ] rabbitmq_auth_backend_ldap 3.9.13 [ ] rabbitmq_auth_backend_oauth2 3.9.13 [ ] rabbitmq_auth_mechanism_ssl 3.9.13 [ ] rabbitmq_consistent_hash_exchange 3.9.13 [E*] rabbitmq_delayed_message_exchange 3.9.0 [ ] rabbitmq_event_exchange 3.9.13 [ ] rabbitmq_federation 3.9.13 [ ] rabbitmq_federation_management 3.9.13 [ ] rabbitmq_jms_topic_exchange 3.9.13 [E*] rabbitmq_management 3.9.13 [e*] rabbitmq_management_agent 3.9.13 [ ] rabbitmq_mqtt 3.9.13 [ ] rabbitmq_peer_discovery_aws 3.9.13 [ ] rabbitmq_peer_discovery_common 3.9.13 [ ] rabbitmq_peer_discovery_consul 3.9.13 [ ] rabbitmq_peer_discovery_etcd 3.9.13 [ ] rabbitmq_peer_discovery_k8s 3.9.13 [E*] rabbitmq_prometheus 3.9.13 [ ] rabbitmq_random_exchange 3.9.13 [ ] rabbitmq_recent_history_exchange 3.9.13 [ ] rabbitmq_sharding 3.9.13 [ ] rabbitmq_shovel 3.9.13 [ ] rabbitmq_shovel_management 3.9.13 [ ] rabbitmq_stomp 3.9.13 [ ] rabbitmq_stream 3.9.13 [ ] rabbitmq_stream_management 3.9.13 [ ] rabbitmq_top 3.9.13 [ ] rabbitmq_tracing 3.9.13 [ ] rabbitmq_trust_store 3.9.13 [e*] rabbitmq_web_dispatch 3.9.13 [ ] rabbitmq_web_mqtt 3.9.13 [ ] rabbitmq_web_mqtt_examples 3.9.13 [ ] rabbitmq_web_stomp 3.9.13 [ ] rabbitmq_web_stomp_examples 3.9.13
6.1.2 案例
基础配置
spring.rabbitmq.host=101.43.23.183
# 5672 为通信端口
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
延迟队列配置
@Configuration
public class RabbitConfig {
//定义队列名
public static final String DELAY_PLUGIN_QUEUE_NAME = "delay_plugin_queue_name";
//定义交换机名
public static final String DELAY_PLUGIN_EXCHANGE = "delay_plugin_exchange";
@Bean
public Queue delayPluginQueue() {
return new Queue(DELAY_PLUGIN_QUEUE_NAME, true, false, false);
}
/**
* 自定义交换机
* @return
*/
@Bean
public CustomExchange delayPluginExchange() {
Map<String, Object> arguments=new HashMap<>();
//定义交换机类型为直连交换机
arguments.put("x-delayed-type", "direct");
//type定义为延迟消息交换机
return new CustomExchange(DELAY_PLUGIN_EXCHANGE, "x-delayed-message", true, false, arguments);
}
@Bean
public Binding delayPluginBinding() {
return BindingBuilder.bind(delayPluginQueue())
.to(delayPluginExchange())
.with(DELAY_PLUGIN_QUEUE_NAME)
//没有参数
.noargs();
}
}
监听延迟队列
@Component
public class DelayPluginListener {
private static final Logger logger= LoggerFactory.getLogger(DelayPluginListener.class);
@RabbitListener(queues = RabbitConfig.DELAY_PLUGIN_QUEUE_NAME)
public void listenerDelayPlugin(Message message) {
logger.info(new String(message.getBody()));
}
}
生产者发送消息到延迟队列并设置消息延迟发送时间
@Controller
public class HelloController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send/{msg}")
public void send(@PathVariable(value = "msg") String msg) {
//设置消息
Message message = MessageBuilder.withBody((msg + new Date()).getBytes())
//设置延迟时间为5s
.setHeader("x-delay", 5000).build();
rabbitTemplate.send(RabbitConfig.DELAY_PLUGIN_EXCHANGE, RabbitConfig.DELAY_PLUGIN_QUEUE_NAME, message);
}
}
http://localhost:8080/send/发送的消息
日志
022-01-27 15:49:19.067 INFO 28476 --- [ntContainer#0-1] c.e.m.listener.DelayPluginListener : 发送的消息Thu Jan 27 15:49:14 GMT+08:00 2022
6.2 DLX 实现延迟队列
6.2.1 延迟队列实现思路
延迟队列实现的思路也很简单,就是 DLX(死信交换机)+TTL(消息超时时间)。
可以把死信队列就当成延迟队列。
具体来说是这样:
假如一条消息需要延迟 30 分钟执行,就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key
,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。
参考上述死信队列例子:
@Configuration public class RabbitConfig { //定义队列名称 public static final String DLX_QUEUE_NAME = "dlx_queue_name"; public static final String NORMAL_DEMO = "normal_demo"; //定义交换机名称 public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name"; /** * 正常的队列 使用默认的直连交换机 * @return */ @Bean public Queue normalQueue() { HashMap<String, Object> arguments = new HashMap<>(); //设置队列过期时间为30分钟 即进入死信队列 arguments.put("x-message-ttl", 30000); //设置死信交换机 arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME); //设置死信的路由键,这里为队列名 arguments.put("x-dead-letter-routing-key", DLX_QUEUE_NAME); return new Queue(NORMAL_DEMO, true, false, false,arguments); } /** * 死信队列 * @return */ @Bean public Queue dlxQueue() { return new Queue(DLX_QUEUE_NAME, true, false, false); } /** * 死信交换机 * @return */ @Bean public DirectExchange dlxExchange() { return new DirectExchange(DLX_EXCHANGE_NAME, true, false); } /** * 死信队列和交换机的绑定 * @return */ @Bean public Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()) .to(dlxExchange()) .with(DLX_QUEUE_NAME); } }
7. RabbitMQ 发送可靠性
微服务可以设计成消息驱动的微服务,响应式系统也可以基于消息中间件来做,从这个角度来说,在互联网应用开发中,消息中间件真的是太重要了。
7.1 RabbitMQ 消息发送机制
RabbitMQ 中的消息发送引入了 Exchange(交换机)的概念,消息的发送首先到达交换机上,然后再根据既定的路由规则,由交换机将消息路由到不同的 Queue(队列)中,再由不同的消费者去消费。
大致的流程就是这样,所以要确保消息发送的可靠性,主要从两方面去确认:
消息成功到达 Exchange(
主要确保消息成功到达交换机
)消息成功到达 Queue
如果能确认这两步,那么就可以认为消息发送成功了。
如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时可能要通过重试等方式去重新发送消息,多次重试之后,如果消息还是不能到达,则可能就需要人工介入了。
经过上面的分析,可以确认,要确保消息成功发送,只需要做好三件事就可以了:
确认消息到达 Exchange。
确认消息到达 Queue。
开启定时任务,定时投递那些发送失败的消息。
7.2 确保消息成功到达
如何确保消息成功到达 RabbitMQ?RabbitMQ 给出了两种方案:
开启事务机制
发送方确认机制
⚠️这是两种不同的方案,不可以同时开启,只能选择其中之一.
7.2.1 开启事务
基础配置
spring.rabbitmq.host=101.43.23.183
# 5672 为通信端口
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
队列和事务配置
@Configuration
public class RabbitConfig {
//定义队列名
public static final String TX_QUEUE_NAME = "tx_queue_name";
//定义交换机名
public static final String TX_EXCHANGE_NAME = "tx_exchange";
@Bean
public Queue txQueue() {
return new Queue(TX_QUEUE_NAME, true, false, false);
}
@Bean
public DirectExchange txExchange() {
return new DirectExchange(TX_EXCHANGE_NAME, true, false);
}
@Bean
public Binding txBinding() {
return BindingBuilder.bind(txQueue())
.to(txExchange())
.with(TX_QUEUE_NAME);
}
/**
* 事务配置
* @param connectionFactory
* @return
*/
@Bean
public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
//开启事务模式
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
}
模拟请求服务发送数据
@Service
public class HelloService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void send(String msg) {
rabbitTemplate.convertAndSend(RabbitConfig.TX_EXCHANGE_NAME, RabbitConfig.TX_QUEUE_NAME, msg);
//抛出异常,消息回滚,不会将消息发送到消息队列
int i = 1 / 0;
}
}
@RestController
public class HelloController {
@Autowired
private HelloService helloService;
@GetMapping("/send/{msg}")
public void send(@PathVariable("msg")String msg) {
helloService.send(msg);
}
}
这里注意两点:
- 发送消息的方法上添加
@Transactional
注解标记事务。 - 调用
setChannelTransacted
方法设置为true
开启事务模式。
结果
- 抛出异常,消息未进入消息队列
当开启事务模式之后,RabbitMQ 生产者发送消息会多出四个步骤:
- 客户端发出请求,将信道设置为事务模式。
- 服务端给出回复,同意将信道设置为事务模式。
- 客户端发送消息。
- 客户端提交事务。
- 服务端给出响应,确认事务提交。
上面的步骤,除了第三步是本来就有的,其他几个步骤都是平白无故多出来的。所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。可以想想,什么项目会用到消息中间件?一般来说都是一些高并发的项目,这个时候并发性能尤为重要。
7.2.2 发送方确认机制
相比于事务,这种模式下的消息吞吐量会得到极大的提升。
7.2.2.1 单条消息处理
application.properties 配置
spring.rabbitmq.host=101.43.23.183
# 5672 为通信端口
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#1、开启发送消息确认机制,将来将消息到达交换机之后会有一个回调
spring.rabbitmq.publisher-confirm-type=correlated
#2、消息到达队列的回调(消息如果没有成功到达队列,会触发回调方法)
spring.rabbitmq.publisher-returns=true
1是配置消息到达交换器的确认回调,2则是配置消息到达队列的回调。
1属性的配置有三个取值:
- none:表示禁用发布确认模式,默认即此。
- correlated:表示成功发布消息到交换器后会触发的回调方法。
- simple:类似 correlated,并且支持
waitForConfirms()
和waitForConfirmsOrDie()
方法的调用。
队列交换机配置
@Log4j2
@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
//定义队列名
public static final String CONFIRM_QUEUE = "confirm_queue";
//定义交换机名
public static final String CONFIRM_EXCHANGE = "confirm_exchange";
@Bean
public Queue confirmQueue() {
return new Queue(CONFIRM_QUEUE, true, false, false);
}
@Bean
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE, true, false);
}
@Bean
public Binding confirmBinding() {
return BindingBuilder.bind(confirmQueue())
.to(confirmExchange())
.with(CONFIRM_QUEUE);
}
/**
* 重写RabbitTemplate,并设设置确认回调和返回回调
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer,ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
//这个方法内部中会设置setMandatory为true
configurer.configure(rabbitTemplate,connectionFactory);
//true,消息通过交换器无法匹配到队列会返回给生产者
//false,匹配不到会直接被丢弃
// rabbitTemplate.setMandatory(true);//默认为false
return rabbitTemplate;
}
//或者重新注入,以上任选其一即可
// @Autowired
// private RabbitTemplate rabbitTemplate;
// //本类RabbitConfig初始化完成后调用此init方法
// @PostConstruct
// public void init() {
// rabbitTemplate.setConfirmCallback(this);
// rabbitTemplate.setReturnCallback(this);
// }
/**
* 消息是否到达交换机到回调
* @param correlationData 消息id相关
* @param ack 是否到达交换机
* @param cause 原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息成功发送到交换机,消息id为:{}", correlationData.getId());
} else {
log.info("消息发送到交换机失败,消息id为:{},失败原因为:{}", correlationData.getId(),cause);
}
}
/**
* 消息未到达队列的回调
* @param message 消息对象
* @param replyCode 回复状态码
* @param replyText 回复文本
* @param exchange 交换机
* @param routingKey 路由
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息未成功到达队列,消息id为:{},replyCode:{},replyText:{},exchange:{},routingKey:{}"+message.getMessageProperties().getMessageId(),replyCode,replyText,exchange,routingKey);
}
}
生产者配置
package com.example.confirm.controller;
import com.example.confirm.config.RabbitConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@RestController
public class HelloController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/{msg}")
public void send(@PathVariable("msg")String msg) {
//设置消息id对象
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());
//正常发送消息并设置消息属性
rabbitTemplate.convertAndSend(RabbitConfig.CONFIRM_EXCHANGE, RabbitConfig.CONFIRM_QUEUE, msg, correlationData);
//1、模拟向交换机发送消息失败
// rabbitTemplate.convertAndSend("error_exchange", RabbitConfig.CONFIRM_QUEUE, msg, correlationData);
//2、模拟路由到队列失败
//rabbitTemplate.convertAndSend(RabbitConfig.CONFIRM_EXCHANGE, "error_routingKey", msg, correlationData);
}
}
测试不同条件的生产者发送消息的结果:
http://localhost:8080/send/xiaoxi
正常消费
- 消息正常到队列,等待消费
模拟向交换机发送消息失败
2022-02-11 16:52:52.254 ERROR 14820 --- [.43.23.183:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'error_exchange' in vhost '/', class-id=60, method-id=40) 2022-02-11 16:52:52.322 INFO 14820 --- [nectionFactory2] com.example.confirm.config.RabbitConfig : 消息发送到交换机失败,消息id为:2f4f1301-387d-4572-a984-a27b11a5a294,失败原因为:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'error_exchange' in vhost '/', class-id=60, method-id=40)
模拟交换机路由到队列失败
2022-02-11 16:54:19.811 INFO 18904 --- [nectionFactory1] com.example.confirm.config.RabbitConfig : 消息未成功到达队列,消息id为:312,replyCode:NO_ROUTE,replyText:confirm_exchange,exchange:error_routingKey,routingKey:312null 2022-02-11 16:54:19.812 INFO 18904 --- [nectionFactory2] com.example.confirm.config.RabbitConfig : 消息成功发送到交换机,消息id为:148f037b-a487-4fff-b989-d7ce7d5ba1d0
7.2.2.2 消息批量处理
如果是消息批量处理,那么发送成功的回调监听是一样的,这里不再赘述。
7.3 失败重试
失败重试分两种情况,一种是压根没找到 MQ 导致的失败重试,另一种是找到 MQ 了,但是消息发送失败了。
7.3.1 自带重试机制
前面所说的事务机制和发送方确认机制,都是发送方确认消息发送成功的办法。如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,这是利用 Spring
中的retry
机制来完成的,具体配置如下:
#开启重试机制
spring.rabbitmq.template.retry.enabled=true
#重试起始间隔时间
spring.rabbitmq.template.retry.initial-interval=1000ms
#最大重试次数
spring.rabbitmq.template.retry.max-attempts=10
#最大重试间隔时间
spring.rabbitmq.template.retry.max-interval=10000ms
#间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)
spring.rabbitmq.template.retry.multiplier=2
配置完成后,再次启动 Spring Boot 项目,然后关掉 MQ,此时尝试发送消息,就会发送失败,进而导致自动重试。
这里正常发送消息,这里只是模拟RabbitMQ服务挂断,或者网络抖动导致无法连接MQ
2022-02-11 17:22:27.793 INFO 29652 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [101.43.23.183:5672]
2022-02-11 17:22:30.923 INFO 29652 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [101.43.23.183:5672]
2022-02-11 17:22:35.035 INFO 29652 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [101.43.23.183:5672]
2022-02-11 17:22:41.146 INFO 29652 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [101.43.23.183:5672]
2022-02-11 17:22:51.229 INFO 29652 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [101.43.23.183:5672]
2022-02-11 17:23:03.346 INFO 29652 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [101.43.23.183:5672]
2022-02-11 17:23:15.461 INFO 29652 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [101.43.23.183:5672]
2022-02-11 17:23:27.534 INFO 29652 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [101.43.23.183:5672]
2022-02-11 17:23:39.653 INFO 29652 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [101.43.23.183:5672]
2022-02-11 17:23:51.746 INFO 29652 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [101.43.23.183:5672]
2022-02-11 17:23:53.825 ERROR 29652 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect] with root cause
7.3.2 业务重试(待处理)
业务重试主要是针对消息没有到达交换器的情况。
整体思路是这样:
首先创建一张表,用来记录发送到中间件上的消息,像下面这样:
- 每次发送消息的时候,就往数据库中添加一条记录。这里的字段都很好理解,有三个额外说下:
- status:表示消息的状态,有三个取值,0,1,2 分别表示消息发送中、消息发送成功以及消息发送失败。
- tryTime:表示消息的第一次重试时间(消息发出去之后,在 tryTime 这个时间点还未显示发送成功,此时就可以开始重试了)。
- count:表示消息重试次数。
- 在消息发送的时候,就往该表中保存一条消息发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后。
- 在 confirm 回调方法中,如果收到消息发送成功的回调,就将该条消息的 status 设置为1(在消息发送时为消息设置 msgId,在消息发送成功回调时,通过 msgId 来唯一锁定该条消息)。
- 另外开启一个定时任务,定时任务每隔 10s 就去数据库中捞一次消息,专门去捞那些 status 为 0 并且已经过了 tryTime 时间记录,把这些消息拎出来后,首先判断其重试次数是否已超过 3 次,如果超过 3 次,则修改该条消息的 status 为 2,表示这条消息发送失败,并且不再重试。对于重试次数没有超过 3 次的记录,则重新去发送消息,并且为其 count 的值+1。
当然这种思路有两个弊端:
- 去数据库走一遭,可能拖慢 MQ 的 Qos,不过有的时候并不需要 MQ 有很高的 Qos(Quality of Service 服务质量),所以这个应用时要看具体情况。
- 按照上面的思路,可能会出现同一条消息重复发送的情况,不过这都不是事,在消息消费时,解决好幂等性问题就行了。
当然,大家也要注意,消息是否要确保 100% 发送成功,也要看具体情况。
8. RabbitMQ 消费可靠性
8.1 两种消费思路
RabbitMQ 的消息消费,整体上来说有两种不同的思路:
- 推(push):MQ 主动将消息推送给消费者,这种方式需要消费者设置一个缓冲区去缓存消息,对于消费者而言,内存中总是有一堆需要处理的消息,所以这种方式的效率比较高,这也是目前大多数应用采用的消费方式。
- 拉(pull):消费者主动从 MQ 拉取消息,这种方式效率并不是很高,不过有的时候如果服务端需要批量拉取消息,倒是可以采用这种方式。
推(push):
这种方式大家比较常见,就是通过
@RabbitListener
注解去标记消费者,如下:@Component public class MyListener { @RabbitListener(queues = RabbitConfig.CONFIRM_QUEUE) public void handler(String msg) { System.out.println("msg is :" + msg); } }
拉(pull):(不能成功消费unacked的消息)
向队列发送两条消息,每次执行手动拉取消息只会消费一条消息
@SpringBootTest class ConfirmApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { Object receiveMsg = rabbitTemplate.receiveAndConvert(RabbitConfig.CONFIRM_QUEUE); System.out.println("手动拉取的消息:"+receiveMsg); } }
调用 receiveAndConvert 方法,方法参数为队列名称,方法执行完成后,会从 MQ 上拉取一条消息下来,如果该方法返回值为 null,表示该队列上没有消息了。receiveAndConvert 方法有一个重载方法,可以在重载方法中传入一个等待超时时间,例如 3 秒。此时,假设队列中没有消息了,则 receiveAndConvert 方法会阻塞 3 秒,3 秒内如果队列中有了新消息就返回,3 秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0。
总结:
如果需要从消息队列中持续获得消息,就可以使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。切忌将拉模式放到一个死循环中,变相的订阅消息,这会严重影响 RabbitMQ 的性能。
8.2 确保消费成功两种思路
为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式。
- 当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除。
- 当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者。
如上图所示,在 RabbitMQ 的 web 管理页面:
- Ready 表示待消费的消息数量。(Ready状态的消息会一直进行投递消费)
- Unacked 表示已经发送给消费者但是还没收到消费者 ack 的消息数量。(unacked的消息不能通过手动拉取的方式进行消费,必须通过ack的方式进行消息的确认。)
当将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分:
- 待消费的消息
- 已经投递给消费者,但是还没有被消费者确认的消息
换句话说,当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。
综上所述,确保消息被成功消费,无非就是手动 Ack 或者自动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说还需要在处理消息时,解决幂等性问题。
8.3 消息拒绝
当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。来看下拒绝的方式:
@Component
public class MyListener {
@RabbitListener(queues = RabbitConfig.CONFIRM_QUEUE)
public void handler(Channel channel, Message message) {
//获取消息标签
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag"+deliveryTag);
try {
//1、传入消息标签
//2、true:消息重新入队 false:消息直接丢弃(若设置死信队列则进入死信队列)
channel.basicReject(deliveryTag,true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
若拒绝改消息,消息重新入队后,消息变为
Ready
状态,并且一直没有其它队列消费该消息,消息会再次进入该监听器消费,那么会进入死循环。
消费者收到消息之后,可以选择拒绝消费该条消息,拒绝的步骤分两步:
- 获取消息编号 deliveryTag。
- 调用 basicReject 方法拒绝消息。
调用 basicReject 方法时,第二个参数是 requeue,即是否重新入队。如果第二个参数为 true,则这条被拒绝的消息会重新进入到消息队列中,等待下一次被消费;如果第二个参数为 false,则这条被拒绝的消息就会被丢掉,不会有新的消费者去消费它了。
需要注意的是,basicReject 方法一次只能拒绝一条消息。
8.4 消息确认
8.4.1 自动确认
先来看看自动确认,在 Spring Boot 中,默认情况下,消息消费就是自动确认的。
@Component
public class MyListener {
@RabbitListener(queues = RabbitConfig.CONFIRM_QUEUE)
public void handler(String msg) {
System.out.println("msg is :" + msg);
int i=1/0;
}
}
若消费消息时出现异常,消息默认会重新回到队列,消息变为Ready状态,若异常一直存在,则消息再次进入该监听器,那么会一直出现死循环异常
通过 @Componet 注解将当前类注入到 Spring 容器中,然后通过 @RabbitListener 注解来标记一个消息消费方法,默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了。
8.4.2 手动确认
要开启手动确认,需要首先关闭自动确认,关闭方式如下:
#消息确认模式
#auto 自动确认(默认)
#manual 手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
package com.example.confirm.listener;
import com.example.confirm.config.RabbitConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class MyListener {
@RabbitListener(queues = RabbitConfig.CONFIRM_QUEUE)
public void handler(Channel channel, Message message) {
//获取消息标签
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String msg = new String(message.getBody());
System.out.println("msg:" + msg);
// int i=1/0;
//消费完成后,手动ack
//1、消息标签参数
//2、是否批量处理。ture:表示之前所有的消息都确认消费成功 false:表示仅仅确认当前消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
try {
//手动Nack,告诉mq这条消息消费失败
//1、消息标签参数
//2、是否批量处理。ture:表示之前所有的消息都确认消费成功 false:表示仅仅确认当前消息
//3、是否重新进入队列, true:重新进入队列(消息变为Ready状态) false:丢弃消息(若配置死信队列,则会进入死信队列)
channel.basicNack(deliveryTag, false, true);
} catch (Exception exception) {
exception.printStackTrace();
}
e.printStackTrace();
}
}
}
若确认消息之前发生异常,则手动Nack时把消息重新入队,消息变成Ready,那么会再次进入消费,若还出现异常,则再次通过Nack重新入队,导致死循环。
若消息未进行确认,之后消息未重新入队或者丢弃此消息,那么此消息会变为Unacked状态,未确认的消息不能进行正常消费,必须通过重启应用后消息重新消费的时候进行手动确认(确认之前不能发生异常,否则也是确认失败的)。
将消费者要做的事情放到一个 try..catch
代码块中。
如果消息正常消费成功,则执行 basicAck
完成确认。
如果消息消费失败,则执行 basicNack
方法,告诉 RabbitMQ 消息消费失败。
这里涉及到两个方法:
- basicAck:这个是手动确认消息已经成功消费,该方法有两个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅确认当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功。
- basicNack:这个是告诉 RabbitMQ 当前消息未被成功消费,该方法有三个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅拒绝当前消息的消费,如果为 true,则表示拒绝当前消息之前所有未被当前消费者确认的消息;第三个参数 requeue 含义和前面所说的一样,被拒绝的消息是否重新入队。
8.4.2.2 拉模式手动确认
拉模式手动 ack 比较麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,得用原生的办法,如下:
package com.example.confirm;
import com.example.confirm.config.RabbitConfig;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
@SpringBootTest
class ConfirmApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void Test1(){
long deliveryTag=0;
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
try {
GetResponse getResponse = channel.basicGet(RabbitConfig.CONFIRM_QUEUE, false);
deliveryTag = getResponse.getEnvelope().getDeliveryTag();
String msg = new String(getResponse.getBody());
System.out.println("msg is :" + msg);
//消息确认之前抛出异常
int i=1/0;
//消息确认
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
try {
//抛出异常,消息重新入队
channel.basicNack(deliveryTag,false,true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
e.printStackTrace();
}
}
}
手动拉取确认模式下只能拉取Ready的消息,如果拉取Unacked的消息,则会出现空指针。
8.5 幂等性问题(待处理)
消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,此时由于网络断开或者其他原因导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条消息删除,当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次。种种原因导致在消费消息时,一定要处理好幂等性问题。
采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下:
- id-0(正在执行业务)
- id-1(执行业务成功)
如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在(说明之前有人消费过该消息),获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId。
9. 理解 VirtualHost
第一次安装好一个 RabbitMQ 之后,可能都会通过 Web 页面去管理这个 RabbitMQ,默认情况下,第一次使用的默认用户是 guest。
登录成功后,在 admin 选项卡可以查看所有用户:
9.1 多租户
RabbitMQ 中有一个概念叫做多租户,怎么理解呢?
安装一个 RabbitMQ 服务器,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是所说的虚拟主机(virtual host),一般简称为 vhost。
本质上,每一个 vhost 都是一个独立的小型 RabbitMQ 服务器,这个 vhost 中会有自己的消息队列、消息交换机以及相应的绑定关系等等,并且拥有自己独立的权限,不同的 vhost 中的队列和交换机不能互相绑定,这样技能保证运行安全又能避免命名冲突。
并不需要特别的去看待 vhost,他就跟普通的物理 RabbitMQ 一样,不同的 vhost 能够提供逻辑上的分离,确保不同的应用消息队列能够安全独立运行。
要我来说,该怎么看待 vhost 和 RabbitMQ 的关系呢?RabbitMQ 相当于一个 Excel 文件,而 vhost 则是 Excel 文件中的一个个 sheet,所有的操作都是在某一个 sheet 上进行操作。
本质上来说,vhost 算是 AMQP 协议中的概念
创建一个用户,并为用户设置一个vhost
- 创建用户并添加权限
设置vhost
点击用户名,进去后设置
Set permission
设置完成
vhost对应的用户信息
9.2 命令行创建vhost
这里使用docker,进入容器
docker exec -it rabbitmq01 bash
创建一个名为
/myvh
的vhostroot@dd068a88e7b8:/# rabbitmqctl add_vhost myvh Adding vhost "myvh" ...
查看已有的vhost
root@dd068a88e7b8:/# rabbitmqctl list_vhosts Listing vhosts ... name / myvh
当然这个命令也可以添加两个选项 name 和 tracing,name 表示 vhost 的名称,tracing 则表示是否使用了 tracing 功能(tracing 可以帮助追踪 RabbitMQ 中消息的流入流出情况)
root@dd068a88e7b8:/# rabbitmqctl list_vhosts name tracing Listing vhosts ... name tracing / false myvh false
删除一个 vhost
root@dd068a88e7b8:/# rabbitmqctl delete_vhost myvh Deleting vhost "myvh" ...
当删除一个 vhost 的时候,与这个 vhost 相关的消息队列、交换机以及绑定关系等,统统都会被删除。
给一个用户设置 vhost
注意:设置前改vhost必须存在
给
lisi
用户设置myvh
主机,并设置可配置,可读,可写的权限
root@dd068a88e7b8:/# rabbitmqctl set_permissions -p myvh lisi ".*" ".*" ".*" Setting permissions for user "lisi" in vhost "myvh" ...
最后面三个
".*"
含义分别如下:- 用户在所有资源上都拥有可配置权限(创建/删除消息队列、创建/删除交换机等)。
- 用户在所有资源上都拥有写权限(发消息)。
- 用户在所有资源上都拥有读权限(消息消费,清空队列等)。
移除用户访问某个vhost的权限
移除
myvh
主机下的lisi
用户权限root@dd068a88e7b8:/# rabbitmqctl clear_permissions -p myvh lisi Clearing permissions for user "lisi" in vhost "myvh" ...
9.3 命令行用户相关操作
添加一个用户
添加用户名为
lc
,密码为123
的用户root@dd068a88e7b8:/# rabbitmqctl add_user lc 123 Adding user "lc" ... Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.
修改用户密码
修改
lc
用户密码为123456
root@dd068a88e7b8:/# rabbitmqctl change_password lc 123456 Changing password for user "lc" ...
验证用户密码
root@dd068a88e7b8:/# rabbitmqctl authenticate_user lc 123456 Authenticating user "lc" ... Success
查看所有用户列表
root@dd068a88e7b8:/# rabbitmqctl list_users Listing users ... user tags lisi [administrator] lc [] guest [administrator] zhangsan [administrator]
设置用户权限
设置
lc
用户权限为adminstrator
root@dd068a88e7b8:/# rabbitmqctl set_user_tags lc adminstrator Setting tags for user "lc" to [adminstrator] ...
删除用户
删除
lc
用户root@dd068a88e7b8:/# rabbitmqctl delete_user lc Deleting user "lc" ...
10. 集群搭建
10.1 两种模式
- 普通集群
- 镜像集群
10.1.1 普通集群
普通集群模式,就是将 RabbitMQ 部署到多台服务器上,每个服务器启动一个 RabbitMQ 实例,多个实例之间进行消息通信。
此时我们创建的队列 Queue,它的元数据(主要就是 Queue 的一些配置信息)会在所有的 RabbitMQ 实例中进行同步,但是队列中的消息只会存在于一个 RabbitMQ 实例上,而不会同步到其他队列。
当我们消费消息的时候,如果连接到了另外一个实例,那么那个实例会通过元数据定位到 Queue 所在的位置,然后访问 Queue 所在的实例,拉取数据过来发送给消费者。
这种集群可以提高 RabbitMQ 的消息吞吐能力,但是无法保证高可用,因为一旦一个 RabbitMQ 实例挂了,消息就没法访问了,如果消息队列做了持久化,那么等 RabbitMQ 实例恢复后,就可以继续访问了;如果消息队列没做持久化,那么消息就丢了。
大致的流程图如下图:
10.1.2 镜像集群
它和普通集群最大的区别在于 Queue 数据和原数据不再是单独存储在一台机器上,而是同时存储在多台机器上。也就是说每个 RabbitMQ 实例都有一份镜像数据(副本数据)。每次写入消息的时候都会自动把数据同步到多台实例上去,这样一旦其中一台机器发生故障,其他机器还有一份副本数据可以继续提供服务,也就实现了高可用。
大致流程图如下图:
10.1.3 节点类型
RabbitMQ 中的节点类型有两种:
- RAM node:内存节点将所有的队列、交换机、绑定、用户、权限和 vhost 的元数据定义存储在内存中,好处是可以使得交换机和队列声明等操作速度更快。
- Disk node:将元数据存储在磁盘中,单节点系统只允许磁盘类型的节点,防止重启 RabbitMQ 的时候,丢失系统的配置信息
RabbitMQ 要求在集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或者离开集群时,必须要将该变更通知到至少一个磁盘节点。如果集群中唯一的一个磁盘节点崩溃的话,集群仍然可以保持运行,但是无法进行其他操作(增删改查),直到节点恢复。为了确保集群信息的可靠性,或者在不确定使用磁盘节点还是内存节点的时候,建议直接用磁盘节点。
10.2 搭建普通集群
10.2.1 预备知识
大致的结构了解了,接下来我们就把集群给搭建起来。先从普通集群开始,我们就使用 docker 来搭建。
搭建之前,有两个预备知识需要大家了解:
- 搭建集群时,节点中的 Erlang Cookie 值要一致,默认情况下,文件在 /var/lib/rabbitmq/.erlang.cookie,我们在用 docker 创建 RabbitMQ 容器时,可以为之设置相应的 Cookie 值。
- RabbitMQ 是通过主机名来连接服务,必须保证各个主机名之间可以 ping 通。可以通过编辑 /etc/hosts 来手工添加主机名和 IP 对应关系。如果主机名 ping 不通,RabbitMQ 服务启动会失败(如果我们是在不同的服务器上搭建 RabbitMQ 集群,大家需要注意这一点,接下来将通过 Docker 的容器连接 link 来实现容器之间的访问,略有不同)。
10.2.2 开始搭建
docker run -d --hostname rabbit01 --name mq01 -p 5671:5672 -p 15671:15672 -e RABBITMQ_ERLANG_COOKIE="lc_rabbitmq_cookie" rabbitmq:3-management
docker run -d --hostname rabbit02 --name mq02 --link mq01:mylink01 -p 5672:5672 -p 15672:15672 -e RABBITMQ_ERLANG_COOKIE="lc_rabbitmq_cookie" rabbitmq:3-management
docker run -d --hostname rabbit03 --name mq03 --link mq01:mylink02 --link mq02:mylink03 -p 5673:5672 -p 15673:15672 -e RABBITMQ_ERLANG_COOKIE="lc_rabbitmq_cookie" rabbitmq:3-management
三个节点现在就启动好了
在 mq02 中,使用 --link 来连接 mq01,并且连接名为 mylink01
在 mq03 中,使用 --link 来连接 mq01,并且连接名为 mylink02。连接mq02,连接名为 mylink03
进入mq02容器,加入集群:
docker exec -it mq02 bash
将mq02容器加入到mq01集群中
rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@rabbit01 rabbitmqctl start_app
进入mq03容器,加入集群:
docker exec -it mq03 bash
将mq03容器加入到mq01集群中
rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@rabbit01 rabbitmqctl start_app
查看集群状态信息
在任一容器中,查看集群状态信息
rabbitmqctl cluster_status
root@rabbit01:/# rabbitmqctl cluster_status RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead. Cluster status of node rabbit@rabbit01 ... Basics Cluster name: rabbit@rabbit01 Disk Nodes rabbit@rabbit01 rabbit@rabbit02 rabbit@rabbit03 Running Nodes rabbit@rabbit01 rabbit@rabbit02 rabbit@rabbit03 Versions rabbit@rabbit01: RabbitMQ 3.9.13 on Erlang 24.2 rabbit@rabbit02: on Erlang rabbit@rabbit03: on Erlang Maintenance status Node: rabbit@rabbit01, status: not under maintenance Node: rabbit@rabbit02, status: unknown Node: rabbit@rabbit03, status: unknown Alarms (none) Network Partitions (none) Listeners Node: rabbit@rabbit01, interface: [::], port: 15672, protocol: http, purpose: HTTP API Node: rabbit@rabbit01, interface: [::], port: 15692, protocol: http/prometheus, purpose: Prometheus exporter API over HTTP Node: rabbit@rabbit01, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication Node: rabbit@rabbit01, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0 Feature flags Flag: drop_unroutable_metric, state: enabled Flag: empty_basic_get_metric, state: enabled Flag: implicit_default_bindings, state: enabled Flag: maintenance_mode_status, state: enabled Flag: quorum_queue, state: enabled Flag: stream_queue, state: enabled Flag: user_limits, state: enabled Flag: virtual_host_metadata, state: enabled
10.2.3 代码测试
applicaiton.properties
spring.rabbitmq.addresses=101.43.23.183:5671,101.43.23.183:5672,101.43.23.183:5673
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
- 队列交换机相关配置
@Configuration
public class RabbitConfig {
//定义队列名称
public static final String CLUSTER_QUEUE = "cluster_queue";
//定义交换机
public static final String CLUSTER_EXCHANGE="cluster_exchange";
@Bean
public Queue queue() {
return new Queue(CLUSTER_QUEUE, true, false, false);
}
@Bean
public DirectExchange exchange() {
return new DirectExchange(CLUSTER_EXCHANGE, true, false);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue())
.to(exchange())
.with(CLUSTER_QUEUE);
}
}
- 向队列发送消息
@SpringBootTest
class DemoApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend(RabbitConfig.CLUSTER_QUEUE, "hello world!");
}
}
这条消息发送成功之后,在 RabbitMQ 的 Web 管理端,我们会看到三个 RabbitMQ 实例上都会显示有一条消息,但是实际上消息本身只存在于一个 RabbitMQ 实例。
从上图可以知道,消息存在节点
rabbit02
中
- 若停掉mq01节点,再次发送消息,发现节点mq02和mq03节点均能看到发送的消息,即消息同步成功
- 若停掉mq02节点,那么节点
mq01
和节点mq02
均无法使用,消息发送失败
- 重启mq02节点,集群恢复正常,可以发送消息
配置消费者
@Configuration public class RabbitConsumer { @RabbitListener(queues = RabbitConfig.CLUSTER_QUEUE) public void comsumer(String msg) { System.out.println("msg is:" + msg); } }
因为队列中有两条消息,当消息消费者启动成功后,这个方法中只收到2条消息。
10.3 搭建镜像集群
所谓的镜像集群模式并不需要额外搭建,只需要我们将队列配置为镜像队列即可。
这个配置可以通过网页配置,也可以通过命令行配置,我们分别来看。
10.3.1 网页配置镜像队列
先来看看网页上如何配置镜像队列。
在任意节点上,点击 Admin 选项卡,然后点击右边的 Policies,再点击 Add/update a policy
,如下图:
各参数含义如下:
Name: policy 的名称。
Pattern: queue 的匹配模式(正则表达式)。
Definition:镜像定义,主要有三个参数:ha-mode, ha-params, ha-sync-mode。
- ha-mode:指明镜像队列的模式,有效值为 all、exactly、nodes。其中 all 表示在集群中所有的节点上进行镜像(默认即此);exactly 表示在指定个数的节点上进行镜像,节点的个数由 ha-params 指定;nodes 表示在指定的节点上进行镜像,节点名称通过 ha-params 指定。
- 7ha-params:ha-mode 模式需要用到的参数。
- ha-sync-mode:进行队列中消息的同步方式,有效值为 automatic 和 manual。
priority 为可选参数,表示 policy 的优先级。
配置完成后,点击下面的 add/update policy
按钮,完成策略的添加,如下:
首先确认三个 RabbitMQ 都启动了,然后用上面的 provider 向消息队列发送一条消息,发现三个节点均有消息
可以发现此时节点存储位置为rabbit01
- 若此时停掉mq01,那么节点的存储会从其它节点选举一个新的存储节点(但剩下的节点至少要有一个),不影响其他节点的正常使用。
因为队列中有1条消息,当消息消费者启动成功后,这个方法中只收到1条消息。