SpringBoot整合RabbitMQ
Springboot整合RabbitMQ
一、安装RabbitMQ
这里我们使用docker
通信端口 : 5672 管理界面端口 :15672 默认账户密码都是 guest
1、拉取镜像
docker pull rabbitmq:3-management //下载带管理界面的消息队列
2、初始化容器
docker run -d -p 5672:5672 -p 15672:15672 --restart=always --name myrabbitmq 镜像id
3、修改rabbitmq密码
docker exec -it fd4f81cdd3e7 bash //进入容器
rabbitmqctl list_users //查看所有用户
rabbitmqctl change_password guest 'Newpassword' //修改指定用户密码
4、常见命令
新建用户:rabbitmqctl add_user username passwd
删除用户:rabbitmqctl delete_user username
改密码: rabbimqctl change_password {username} {newpassword}
设置用户角色:rabbitmqctl set_user_tags {username} {tag ...}
rabbitmqctl set_permissions -p / username ".*" ".*" ".*" //添加权限
二、springboot整合RabbitMQ
1、什么是RabbitMQ?
RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)协议的高级消息中间件,是应用层协议的一个开放标准。基于Erlang语言,可跨平台。它主要的技术特点是可用性,安全性,集群,多协议支持,可视化的客户端,活跃的社区。
AMQP为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
2、基本配置
pom.xml配置
spring-boot-starter-amqp
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.lc</groupId>
<artifactId>rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
properties配置
#配置主机地址
spring.rabbitmq.host=47.96.141.44
#通信端口
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
①直连交换机(Direct)
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。 然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。
队列交换机配置:
/**
* Direct策略 直接指定到发送的队列名称。
*/
@Configuration
public class RabbitDirectConfig {
/**
* 定义directExchange的名称
*/
public final static String DIRECTNAME = "javaMr-direct";
/**
* 注意导的包 org.springframework.amqp.core.Queue;
* 定义队列
* @return
*/
@Bean
Queue queue() {
return new Queue("hello.lc");
}
/**
* 定义Direct交换机
* @return
*/
@Bean
DirectExchange directExchange() {
//定义的DirectExchange名称; durable(持久性)重启后是否依然有效; autodelete长期未使用是否自动删除
return new DirectExchange(DIRECTNAME,true,false);
}
/**
* 绑定指定的队列到交换机上并指定策略
* @return
*/
@Bean
Binding binding() {
// 绑定队列到交换机上 并指定路由匹配规则
return BindingBuilder.bind(queue()).to(directExchange()).with("direct");
}
}
消费者:
/**
* direct消费者
*/
@Component
public class DirectReceiver {
/**
* 监听指定队列(hello.lc)
* @param msg
*/
@RabbitListener(queues = "hello.lc")
public void handler(String msg) {
System.out.println(">>>>>>>:"+msg);
}
}
生产者:
@SpringBootTest
class RabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* direct模式 发送到指定队列
*/
@Test
void directTest() {
// 发送到指定路由规则(默认为队列名)队列; 发送的消息
rabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECTNAME,"direct", "你好呀 direct!!!");
// 如果不指定发送的交换机名称则直接匹配的routingkey到队列名
// rabbitTemplate.convertAndSend("hello.lc", "你好呀 direct!!!");
}
}
②主题交换机(Topic)
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:
- routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
- binding key与routing key一样也是句点号“. ”分隔的字符串
- binding key中可以存在两种特殊字符**“ * ”与“#”**,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
队列交换机配置:
/**
*topic策略 根据指定路径匹配符转发到指定队列
*/
@Configuration
public class RabbitTopicCofig {
public static final String TOPICNAME = "javaMr-topic";
@Bean
Queue xiaomi() {
return new Queue("xiaomi");
}
@Bean
Queue huawei() {
return new Queue("huawei");
}
@Bean
Queue iphone() {
return new Queue("iphone");
}
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TOPICNAME, true, false);
}
@Bean
Binding xiaomiBinding() {
// 绑定xiaomi队列到topic交换机,并指定队列访问规则
// 发送匹配的路由规格 以 xiao. 开头的路径
return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");
}
@Bean
Binding huaweiBinding() {
// 绑定huawei队列到topic交换机,并指定队列访问规则
// 发送匹配的路由规格 以 huawei. 开头的路径
return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");
}
@Bean
Binding iphoneBinding() {
// 绑定iphone队列到topic交换机,并指定队列访问规则
// 发送匹配的路由规格 以 .phone. 规则的路径
return BindingBuilder.bind(iphone()).to(topicExchange()).with("#.iphone.#");
}
}
消费者配置:
@Component
public class TopicReceiver {
@RabbitListener(queues = "xiaomi")
public void handlerXiaomi(String msg) {
System.out.println("TopicReceiver:handlerXiaomi>>>>>"+msg);
}
@RabbitListener(queues = "huawei")
public void handlerHuawei(String msg) {
System.out.println("TopicReceiver:handlerHuawei>>>>>"+msg);
}
@RabbitListener(queues = "iphone")
public void handlerIphone(String msg) {
System.out.println("TopicReceiver:handlerIphone>>>>>"+msg);
}
}
生成者配置:
@SpringBootTest
class RabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* topic模式 发送到指定交换机下的路径匹配的队列
*/
@Test
void topicTest() {
// 发送到指定交换机; 匹配路由为 xiaomi.# 的路径-->指定到xiaomi队列; 发送的消息
rabbitTemplate.convertAndSend(RabbitTopicCofig.TOPICNAME, "xiaomi.news", "小米手机");
// 发送到指定交换机; 匹配路由为 #.iphone.# 的路径-->指定到iphone队列; 发送的消息
rabbitTemplate.convertAndSend(RabbitTopicCofig.TOPICNAME, "iphoneXR.iphone", "iphone手机");
// 发送到指定交换机; 匹配路由为 huawei.# 和 #.iphone.# 的路径-->指定到huawei和iphone队列; 发送的消息
rabbitTemplate.convertAndSend(RabbitTopicCofig.TOPICNAME, "huawei.iphone", "A12系列和麒麟系列芯片");
}
}
③扇形交换机(Fanout)
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
队列交换机配置:
/**
* fanout策略 转发到该交换机下的所有队列(跟路由规则(routingkey)无关)
*/
@Configuration
public class RabbitFanoutConfig {
/**
* 定义交换机名称
*/
public static final String FANOUTNAME = "javaMr-fanout";
/**
* 定义第一个队列
* @return
*/
@Bean
Queue queueOne() {
return new Queue("queue-one");
}
/**
* 定义第二个队列
* @return
*/
@Bean
Queue queueTwo() {
return new Queue("queue-two");
}
/**
* 定义Fanout交换机
* @return
*/
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUTNAME,true,false);
}
/**
* 绑定第一个队列到交换机
* @return
*/
@Bean
Binding bindingOne() {
return BindingBuilder.bind(queueOne()).to(fanoutExchange());
}
/**
* 绑定到第二个交换机
* @return
*/
@Bean
Binding bindingTwo() {
return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
}
}
消费者配置:
/**
* 定义消费者
*/
@Component
public class FanoutReceiver {
@RabbitListener(queues = "queue-one")
public void handlerOne(String msg) {
System.out.println("queue-one>>>>>:"+msg);
}
@RabbitListener(queues = "queue-two")
public void handlerTwo(String msg) {
System.out.println("queue-two>>>>>:"+msg);
}
}
生产者配置:
@SpringBootTest
class RabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* fanout模式 发送到所有该交换机下的队列
*/
@Test
void fanoutTest(){
//发送到指定交换机; 这里的routingkey队列关键词会无效; 发送的消息
rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME, null, "java fanout np!!!!");
}
}
④头部交换机(Headers)
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。 在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
队列交换机配置:
/**
* headers策略 根据指定头部转发到指定队列(跟路由规则routingkey无关)
*/
@Configuration
public class RabbitHeadersConfig {
public static final String HEADERNAME = "javaMr-header";
@Bean
Queue queueName() {
return new Queue("name-queue");
}
@Bean
Queue queueAge() {
return new Queue("age-queue");
}
@Bean
HeadersExchange headersExchange() {
return new HeadersExchange(HEADERNAME, true, false);
}
@Bean
Binding bindingName() {
Map<String,Object> map=new HashMap<>();
map.put("name", "lcc");
// 需要满足发送的消息 携带头部 name为lcc才匹配到该队列
// 只需要匹配该map中的任意一个即可
return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match();
// 当有多个键值对时,必须必须匹配所有。
// return BindingBuilder.bind(queueName()).to(headersExchange()).whereAll(map).match();
}
@Bean
Binding bindingAge() {
// 满足只要有age属性存在 即可匹配到该队列
return BindingBuilder.bind(queueAge()).to(headersExchange()).where("age").exists();
}
}
消费者配置:
@Component
public class HeadersReceiver {
@RabbitListener(queues = "name-queue")
public void handlerName(byte[] msg) {
// 将字节数组解码为字符串对象
System.out.println("HeaderReceiver:handlerName>>>>>>"+new String(msg,0,msg.length));
}
@RabbitListener(queues = "age-queue")
public void handlerAge(byte[] msg) {
// 将字节数组解码为字符串对象
System.out.println("HeaderReceiver:handlerAge>>>>>>"+new String(msg,0,msg.length));
}
}
生产者配置:
@SpringBootTest
class RabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* header模式 发送消息时携带指定的头部信息 满足条件时即转发到指定队列
*/
@Test
void headerTest() {
// 构建消息对象
// 传入消息体转为字节数组; 设置匹配到路由的规则(map形式);
Message nameMsg= MessageBuilder.withBody("hello lcccccccc".getBytes()).setHeader("name", "lcc").build();
// 发送到指定交换机; 此时路由无效; 传入消息
rabbitTemplate.send(RabbitHeadersConfig.HEADERNAME, null, nameMsg);
// 构建消息对象
// 传入消息体转为字节数组; 设置匹配到路由的规则(map形式);
Message ageMsg=MessageBuilder.withBody("i am 20".getBytes()).setHeader("age", "22").build();
// 发送到指定交换机; 此时路由无效; 传入消息
rabbitTemplate.send(RabbitHeadersConfig.HEADERNAME, null, ageMsg);
}
}
三、RabbitMQ 选型和对比
1.从社区活跃度
按照目前网络上的资料,RabbitMQ
、activeM
、ZeroMQ
三者中,综合来看,RabbitMQ
是首选。
2.持久化消息比较
ZeroMq
不支持,ActiveMq
和RabbitMq
都支持。持久化消息主要是指我们机器在不可抗力因素等情况下挂掉了,消息不会丢失的机制。
3.综合技术实现
可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统等等。
RabbitMq
/ Kafka
最好,ActiveMq
次之,ZeroMq
最差。当然ZeroMq
也可以做到,不过自己必须手动写代码实现,代码量不小。尤其是可靠性中的:持久性、投递确认、发布者证实和高可用性。
4.高并发
毋庸置疑,RabbitMQ
最高,原因是它的实现语言是天生具备高并发高可用的erlang
语言。
5.比较关注的比较, RabbitMQ 和 Kafka
RabbitMq
比Kafka
成熟,在可用性上,稳定性上,可靠性上, RabbitMq 胜于 Kafka (理论上)。
另外,Kafka
的定位主要在日志等方面, 因为Kafka
设计的初衷就是处理日志的,可以看做是一个日志(消息)系统一个重要组件,针对性很强,所以 如果业务方面还是建议选择 RabbitMq
。
还有就是,Kafka
的性能(吞吐量、TPS
)比RabbitMq
要高出来很多。