SpringBoot整合RabbitMQ

Lou.Chen
大约 10 分钟

Springboot整合RabbitMQ

一、安装RabbitMQ

这里我们使用docker

通信端口 : 5672 管理界面端口 :15672 默认账户密码都是 guest

1、拉取镜像

docker pull rabbitmq:3-management //下载带管理界面的消息队列

2、初始化容器

docker run -d -p 5672:5672 -p 15672:15672 --restart=always --name myrabbitmq 镜像id

3、修改rabbitmq密码
docker exec -it fd4f81cdd3e7 bash  //进入容器
rabbitmqctl  list_users   //查看所有用户 
rabbitmqctl  change_password  guest 'Newpassword' //修改指定用户密码
4、常见命令
新建用户:rabbitmqctl add_user username passwd
删除用户:rabbitmqctl delete_user username
改密码: rabbimqctl change_password {username} {newpassword}
设置用户角色:rabbitmqctl set_user_tags {username} {tag ...}
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"  //添加权限

二、springboot整合RabbitMQ

1、什么是RabbitMQ?

​ RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)协议的高级消息中间件,是应用层协议的一个开放标准。基于Erlang语言,可跨平台。它主要的技术特点是可用性,安全性,集群,多协议支持,可视化的客户端,活跃的社区。

​ AMQP为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

2、基本配置
pom.xml配置

spring-boot-starter-amqp

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.5.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>org.lc</groupId>
	<artifactId>rabbitmq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>rabbitmq</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

properties配置
#配置主机地址
spring.rabbitmq.host=47.96.141.44
#通信端口
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
①直连交换机(Direct)

直连型交换机,根据消息携带的路由键将消息投递给对应队列。

大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。 然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。

队列交换机配置:

/**
 * Direct策略 直接指定到发送的队列名称。
 */
@Configuration
public class RabbitDirectConfig {
    /**
     * 定义directExchange的名称
     */
    public final static String DIRECTNAME = "javaMr-direct";

    /**
     * 注意导的包  org.springframework.amqp.core.Queue;
     * 定义队列
     * @return
     */
    @Bean
    Queue queue() {
        return new Queue("hello.lc");
    }


    /**
     * 定义Direct交换机
     * @return
     */
    @Bean
    DirectExchange directExchange() {
        //定义的DirectExchange名称;  durable(持久性)重启后是否依然有效;   autodelete长期未使用是否自动删除
        return new DirectExchange(DIRECTNAME,true,false);
    }

    /**
     * 绑定指定的队列到交换机上并指定策略
     * @return
     */
    @Bean
    Binding binding() {
//        绑定队列到交换机上  并指定路由匹配规则
        return BindingBuilder.bind(queue()).to(directExchange()).with("direct");
    }


}

消费者:

/**
 * direct消费者
 */
@Component
public class DirectReceiver {

    /**
     * 监听指定队列(hello.lc)
     * @param msg
     */
    @RabbitListener(queues = "hello.lc")
    public void handler(String msg) {
        System.out.println(">>>>>>>:"+msg);
    }

}

生产者:

@SpringBootTest
class RabbitmqApplicationTests {

	@Autowired
	private RabbitTemplate rabbitTemplate;

	/**
	 * direct模式 发送到指定队列
	 */
	@Test
	void directTest() {
//		发送到指定路由规则(默认为队列名)队列;  发送的消息
		rabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECTNAME,"direct", "你好呀 direct!!!");
//		如果不指定发送的交换机名称则直接匹配的routingkey到队列名
//		rabbitTemplate.convertAndSend("hello.lc", "你好呀 direct!!!");
	}
}
②主题交换机(Topic)

前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

  • routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  • binding key与routing key一样也是句点号“. ”分隔的字符串
  • binding key中可以存在两种特殊字符**“ * ”与“#”**,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

队列交换机配置:

/**
 *topic策略  根据指定路径匹配符转发到指定队列
 */
@Configuration
public class RabbitTopicCofig {
    public static final String TOPICNAME = "javaMr-topic";

    @Bean
    Queue xiaomi() {
        return new Queue("xiaomi");
    }
    @Bean
    Queue huawei() {
        return new Queue("huawei");
    }
    @Bean
    Queue iphone() {
        return new Queue("iphone");
    }

    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(TOPICNAME, true, false);
    }

    @Bean
    Binding xiaomiBinding() {
//        绑定xiaomi队列到topic交换机,并指定队列访问规则
//        发送匹配的路由规格 以 xiao. 开头的路径
        return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");
    }

    @Bean
    Binding huaweiBinding() {
//        绑定huawei队列到topic交换机,并指定队列访问规则
//        发送匹配的路由规格 以 huawei. 开头的路径
        return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");
    }

    @Bean
    Binding iphoneBinding() {
//        绑定iphone队列到topic交换机,并指定队列访问规则
//        发送匹配的路由规格 以  .phone.  规则的路径
        return BindingBuilder.bind(iphone()).to(topicExchange()).with("#.iphone.#");
    }

}

消费者配置:

@Component
public class TopicReceiver {

    @RabbitListener(queues = "xiaomi")
    public void handlerXiaomi(String msg) {
        System.out.println("TopicReceiver:handlerXiaomi>>>>>"+msg);
    }
    @RabbitListener(queues = "huawei")
    public void handlerHuawei(String msg) {
        System.out.println("TopicReceiver:handlerHuawei>>>>>"+msg);
    }
    @RabbitListener(queues = "iphone")
    public void handlerIphone(String msg) {
        System.out.println("TopicReceiver:handlerIphone>>>>>"+msg);
    }

}

生成者配置:

@SpringBootTest
class RabbitmqApplicationTests {

	@Autowired
	private RabbitTemplate rabbitTemplate;
		/**
	 * topic模式 发送到指定交换机下的路径匹配的队列
	 */
	@Test
	void topicTest() {
//		发送到指定交换机;  匹配路由为 xiaomi.# 的路径-->指定到xiaomi队列;	   发送的消息
		rabbitTemplate.convertAndSend(RabbitTopicCofig.TOPICNAME, "xiaomi.news", "小米手机");
//		发送到指定交换机;  匹配路由为 #.iphone.# 的路径-->指定到iphone队列; 	发送的消息
		rabbitTemplate.convertAndSend(RabbitTopicCofig.TOPICNAME, "iphoneXR.iphone", "iphone手机");
//		发送到指定交换机;  匹配路由为 huawei.# 和  #.iphone.# 的路径-->指定到huawei和iphone队列;	 发送的消息
		rabbitTemplate.convertAndSend(RabbitTopicCofig.TOPICNAME, "huawei.iphone", "A12系列和麒麟系列芯片");
	}

}
③扇形交换机(Fanout)

扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

队列交换机配置:

/**
 * fanout策略 转发到该交换机下的所有队列(跟路由规则(routingkey)无关)
 */
@Configuration
public class RabbitFanoutConfig {
    /**
     * 定义交换机名称
     */
    public static final String FANOUTNAME = "javaMr-fanout";

    /**
     * 定义第一个队列
     * @return
     */
    @Bean
    Queue queueOne() {
        return new Queue("queue-one");
    }

    /**
     * 定义第二个队列
     * @return
     */
    @Bean
    Queue queueTwo() {
        return new Queue("queue-two");
    }

    /**
     * 定义Fanout交换机
     * @return
     */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUTNAME,true,false);
    }

    /**
     * 绑定第一个队列到交换机
     * @return
     */
    @Bean
    Binding bindingOne() {
        return BindingBuilder.bind(queueOne()).to(fanoutExchange());
    }

    /**
     * 绑定到第二个交换机
     * @return
     */
    @Bean
    Binding bindingTwo() {
        return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
    }
}

消费者配置:

/**
 * 定义消费者
 */
@Component
public class FanoutReceiver {

    @RabbitListener(queues = "queue-one")
    public void handlerOne(String msg) {
        System.out.println("queue-one>>>>>:"+msg);
    }

    @RabbitListener(queues = "queue-two")
    public void handlerTwo(String msg) {
        System.out.println("queue-two>>>>>:"+msg);
    }

}

生产者配置:

@SpringBootTest
class RabbitmqApplicationTests {

	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	/**
	 * fanout模式 发送到所有该交换机下的队列
	 */
	@Test
	void fanoutTest(){
		//发送到指定交换机;  这里的routingkey队列关键词会无效;  发送的消息
		rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME, null, "java fanout np!!!!");
	}
}
④头部交换机(Headers)

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。 在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

队列交换机配置:

/**
 * headers策略 根据指定头部转发到指定队列(跟路由规则routingkey无关)
 */
@Configuration
public class RabbitHeadersConfig {
    public static final String HEADERNAME = "javaMr-header";

    @Bean
    Queue queueName() {
        return new Queue("name-queue");
    }

    @Bean
    Queue queueAge() {
        return new Queue("age-queue");
    }


    @Bean
    HeadersExchange headersExchange() {
        return new HeadersExchange(HEADERNAME, true, false);
    }

    @Bean
    Binding bindingName() {
        Map<String,Object> map=new HashMap<>();
        map.put("name", "lcc");
//        需要满足发送的消息 携带头部 name为lcc才匹配到该队列
//        只需要匹配该map中的任意一个即可
        return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match();
//       当有多个键值对时,必须必须匹配所有。
//        return BindingBuilder.bind(queueName()).to(headersExchange()).whereAll(map).match();
    }

    @Bean
    Binding bindingAge() {
//        满足只要有age属性存在 即可匹配到该队列
        return BindingBuilder.bind(queueAge()).to(headersExchange()).where("age").exists();
    }

}

消费者配置:

@Component
public class HeadersReceiver {

    @RabbitListener(queues = "name-queue")
    public void handlerName(byte[] msg) {
//        将字节数组解码为字符串对象
        System.out.println("HeaderReceiver:handlerName>>>>>>"+new String(msg,0,msg.length));
    }

    @RabbitListener(queues = "age-queue")
    public void handlerAge(byte[] msg) {
//        将字节数组解码为字符串对象
        System.out.println("HeaderReceiver:handlerAge>>>>>>"+new String(msg,0,msg.length));
    }
}

生产者配置:

@SpringBootTest
class RabbitmqApplicationTests {

	@Autowired
	private RabbitTemplate rabbitTemplate;
	/**
	 * header模式  发送消息时携带指定的头部信息 满足条件时即转发到指定队列
	 */
	@Test
	void headerTest() {
//		构建消息对象
//		传入消息体转为字节数组;	设置匹配到路由的规则(map形式);
		Message nameMsg= MessageBuilder.withBody("hello lcccccccc".getBytes()).setHeader("name", "lcc").build();
//		发送到指定交换机;	此时路由无效;	传入消息
		rabbitTemplate.send(RabbitHeadersConfig.HEADERNAME, null, nameMsg);

//		构建消息对象
//		传入消息体转为字节数组;	设置匹配到路由的规则(map形式);
		Message ageMsg=MessageBuilder.withBody("i am 20".getBytes()).setHeader("age", "22").build();
//		发送到指定交换机;	此时路由无效;	传入消息
		rabbitTemplate.send(RabbitHeadersConfig.HEADERNAME, null, ageMsg);
	}
}

三、RabbitMQ 选型和对比

1.从社区活跃度

按照目前网络上的资料,RabbitMQactiveMZeroMQ 三者中,综合来看,RabbitMQ 是首选。

2.持久化消息比较

ZeroMq 不支持,ActiveMqRabbitMq 都支持。持久化消息主要是指我们机器在不可抗力因素等情况下挂掉了,消息不会丢失的机制。

3.综合技术实现

可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统等等。

RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差。当然ZeroMq 也可以做到,不过自己必须手动写代码实现,代码量不小。尤其是可靠性中的:持久性、投递确认、发布者证实和高可用性。

4.高并发

毋庸置疑,RabbitMQ 最高,原因是它的实现语言是天生具备高并发高可用的erlang 语言。

5.比较关注的比较, RabbitMQ 和 Kafka

RabbitMqKafka 成熟,在可用性上,稳定性上,可靠性上, RabbitMqopen in new window 胜于 Kafkaopen in new window (理论上)。

另外,Kafka 的定位主要在日志等方面, 因为Kafka 设计的初衷就是处理日志的,可以看做是一个日志(消息)系统一个重要组件,针对性很强,所以 如果业务方面还是建议选择 RabbitMq

还有就是,Kafka 的性能(吞吐量、TPS )比RabbitMq 要高出来很多。