腾讯云海外购

深入剖析 RabbitMQ —— Spring 框架下实现 AMQP 高级消息队列协议(上)

前言

消息队列在现今数据量超大,并发量超高的系统中是十分常用的。本文将会对现时最常用到的几款消息队列框架 ActiveMQ、RabbitMQ、Kafka 进行分析对比。

详细介绍 RabbitMQ 在 Spring 框架下的结构及实现原理,从Producer 端的事务、回调函数(ConfirmCallback / ReturnCallback)到 Consumer 端的 MessageListenerContainer 信息接收容器进行详细的分析。通过对 RabbitTemplate、SimpleMessageListenerContainer、DirectMessageListenerContainer 等常用类型介绍,深入剖析在消息处理各个传输环节中的原理及注意事项。

并举以实例对死信队列、持久化操作进行一一介绍。

目录

一、RabbitMQ 与 AMQP 的关系

二、RabbitMQ 的实现原理

三、RabbitMQ 应用实例

四、Producer 端的消息发送与监控

五、Consumer 端的消息接收与监控

六、死信队列

七、持久化操作

一、RabbitMQ 与 AMQP 的关系

1.1 AMQP简介

AMQP(Advanced Message Queue Protocol 高级消息队列协议)是一个消息队列协议,它支持符合条件的客户端和消息代理中间件(message middleware broker)进行通讯。RabbitMQ 则是 AMQP 协议的实现者,主要用于在分布式系统中信息的存储发送与接收,RabbitMQ 的服务器端用 Erlang 语言编写,客户端支持多种开发语言:Python、.NET、Java、Ruby、C、PHP、ActionScript、XMPP、STOMP 等。

1.2 ActiveMQ、RabbitMQ、Kafka 对比

现在在市场上有 ActiveMQ、RabbitMQ、Kafka 等多个常用的消息队列框架,与其他框架对比起来,RabbitMQ 在易用性、扩展性、高可用性、多协议、支持多语言客户端等方面都有不俗表现。

添加描述

1.2.1 AcitveMQ 特点

ActiveMQ 是 Apache 以 Java 语言开发的消息模型,它完美地支持 JMS(Java Message Service)消息服务,客户端支持 Java、C、C++、C#、Ruby、Perl、Python、PHP 等多种开主发语言,支持OpenWire、Stomp、REST、XMPP、AMQP 等多种协议。ActiveMQ 采用异步消息传递方式,在设计上保证了多主机集群,客户端-服务器,点对点等模式的有效通信。从开始它就是按照 JMS 1.1 和 J2EE 1.4 规范进行开发,实现了消息持久化,XA,事务支撑等功能。经历多年的升级完善,现今已成为 Java 应用开发中主流的消息解决方案。但相比起 RabbitMQ、Kafka 它的主要缺点表现为资源消耗比较大,吞吐量较低,在高并发的情况下系统支撑能力较弱。如果系统全程使用 Java 开发,其并发量在可控范围内,或系统需要支持多种不同的协议,使用 ActiveMQ 可更轻便地搭建起消息队列服务。

1.2.2 Kafka 特点

Kafka 天生是面向分布式系统开发的消息队列,它具有高性能、容灾性、可动态扩容等特点。Kafka 与生俱来的特点在于它会把每个Partition 的数据都备份到不同的服务器当中,并与 ZooKeeper 配合,当某个Broker 故障失效时,ZooKeeper 服务就会将通知生产者和消费者,从备份服务器进行数据恢复。在性能上 Kafka 也大大超越了传统的 ActiveMQ、RabbitMQ ,由于 Kafka 集群可支持动态扩容,在负载量到达峰值时可动态增加新的服务器进集群而无需重启服务。但由于 Kafka 属于分布式系统,所以它只能在同一分区内实现消息有序,无法实现全局消息有序。而且它内部的监控机制不够完善,需要安装插件,依赖ZooKeeper 进行元数据管理。如果系统属于分布式管理机制,数据量较大且并发量难以预估的情况下,建议使用 Kafka 队列。

1.2.3 RabbitMQ 对比

由于 ActiveMQ 过于依赖 JMS 的规范而限制了它的发展,所以 RabbitMQ 在性能和吞吐量上明显会优于 ActiveMQ。

由于上市时间较长,在可用性、稳定性、可靠性上 RabbitMq 会比 Kafka 技术成熟,而且 RabbitMq 使用 Erlang 开发,所以天生具备高并发高可用的特点。而 Kafka 属于分布式系统,它的性能、吞吐量、TPS 都会比 RabbitMq 要强。

二、RabbitMQ 的实现原理

2.1 生产者(Producer)、消费者(Consumer)、服务中心(Broker)之间的关系

首先简单介绍 RabbitMQ 的运行原理,在 RabbitMQ 使用时,系统会先安装并启动 Broker Server,也就是 RabbitMQ 的服务中心。无论是生产者 (Producer),消费者(Consumer)都会通过连接池(Connection)使用 TCP/IP 协议(默认)来与 BrokerServer 进行连接。然后 Producer 会把 Exchange / Queue 的绑定信息发送到 Broker Server,Broker Server 根据 Exchange 的类型逻辑选择对应 Queue ,最后把信息发送到与 Queue 关联的对应 Consumer 。

2.2 交换器(Exchange)、队列(Queue)、信道(Channel)、绑定(Binding)的概念

2.2.1 交换器 Exchange

Producer 建立连接后,并非直接将消息投递到队列 Queue 中,而是把消息发送到交换器 Exchange,由 Exchange 根据不同逻辑把消息发送到一个或多个对应的队列当中。目前 Exchange 提供了四种不同的常用类型:Fanout、Direct、Topic、Header。

·Fanout类型

此类型是最为常见的交换器,它会将消息转发给所有与之绑定的队列上。比如,有N个队列与 Fanout 交换器绑定,当产生一条消息时,Exchange 会将该消息的N个副本分别发给每个队列,类似于广播机制。

·Direct类型

此类型的 Exchange 会把消息发送到 Routing_Key 完全相等的队列当中。多个 Cousumer 可以使用相同的关键字进行绑定,类似于数据库的一对多关系。比如,Producer 以 Direct 类型的 Exchange 推送 Routing_Key 为 direct.key1 的队列,系统再指定多个 Cousumer 绑定 direct.key1。如此,消息就会被分发至多个不同的 Cousumer 当中。

·Topic类型

此类型是最灵活的一种方式配置方式,它可以使用模糊匹配,根据 Routing_Key 绑定到包含该关键字的不同队列中。比如,Producer 使用 Topic类型的 Exchange 分别推送 Routing_Key 设置为 topic.guangdong.guangzhou 、topic.guangdong.shenzhen 的不同队列,Cousumer 只需要把 Routing_Key 设置为 topic.guangdong.# ,就可以把所有消息接收处理。

·Headers类型

该类型的交换器与前面介绍的稍有不同,它不再是基于关键字 Routing_Key 进行路由,而是基于多个属性进行路由的,这些属性比路由关键字更容易表示为消息的头。也就是说,用于路由的属性是取自于消息 Header 属性,当消息 Header 的值与队列绑定时指定的值相同时,消息就会路由至相应的队列中。

2.2.2 Queue 队列

Queue 队列是消息的载体,每个消息都会被投入到 Queue 当中,它包含 name,durable,arguments 等多个属性,name 用于定义它的名称,当 durable(持久化)为 true 时,队列将会持久化保存到硬盘上。反之为 false 时,一旦 Broker Server 被重启,对应的队列就会消失,后面还会有例子作详细介绍。

2.2.3 Channel 通道

当 Broker Server 使用 Connection 连接 Producer / Cousumer 时会使用到信道(Channel),一个 Connection上可以建立多个 Channel,每个 Channel 都有一个会话任务,可以理解为逻辑上的连接。主要用作管理相关的参数定义,发送消息,获取消息,事务处理等。

2.2.4 Binding 绑定

Binding 主要用于绑定交换器 Exchange 与 队列 Queue 之间的对应关系,并记录路由的 Routing-Key。Binding 信息会保存到系统当中,用于 Broker Server 信息的分发依据。

三、RabbitMQ 应用实例

3.1 Rabbit 常用类说明

3.1.1 RabbitTemplate 类

Spring 框架已经封装了 RabbitTemplate 对 RabbitMQ 的绑定、队列发送、接收进行简化管理

方法

说明

void setExchange(String exchange)

设置绑定的 exchange 名称

String getExchange()

获取已绑定的 exchange 名称

void setRoutingKey(String routingKey)

设置绑定的 routingKey

String getRoutingKey()

获取已绑定的 routingKey

void send(String exchange, String routingKey, Message message,CorrelationData data)

以Message方式发送信息到 Broken Server,CorrelationData 为标示符可为空

void convertAndSend(String exchange, String routingKey, Object object, CorrelationData data)

以自定义对象方式发送信息到 Broken Server,系统将自动把 object转换成 Message,CorrelationData 为标示符可为空

Message receive(String queueName, long timeoutMillis)

根据queueuName接收队列发送Message信息

Object receiveAndConvert(String queueName, long timeoutMillis)

根据queueuName接收队列对象信息

void setReceiveTimeout(long receiveTimeout)

设置接收过期时间

void setReplyTimeout(long replyTimeout)

设置重发时间

void setMandatory(boolean mandatory)

开启强制委托模式(下文会详细说明)

void setConfirmCallback(confirmCallback)

绑定消息确认回调方法(下文会详细说明)

void setReturnCallback(returnCallback)

绑定消息退出回调方法(下文会详细说明)

3.2 初探 RabbitMQ

在官网下载并成功安装完 RabbitMQ 后,打开默认路径 http://localhost:15672/#/ 即可看到 RabbitMQ 服务中心的管理界面

3.2.1 Producer 端开发

先在 pom 中添加 RabbitMQ 的依赖,并在 application.yml 中加入 RabbitMQ 帐号密码等信息。此例子,我们尝试使用 Direct 交换器把队列发送到不同的 Consumer。

1 **********************pom *************************  2 <project>  3 .............  4 <dependency>  5 <groupId>org.springframework.boot</groupId>  6 <artifactId>spring-boot-starter-amqp</artifactId>  7 <version>2.0.5.RELEASE</version>  8 </dependency>  9 </project> 10  11 **************** application.yml **************** 12 spring: 13 application: 14 name: rabbitMqProducer 15 rabbitmq: 16 host: localhost  17 port: 5672 18 username: admin 19 password: 12345678 20 virtual-host: /LeslieHost

首先使用 CachingConnectionFactory 建立链接,通过 BindingBuilder 绑定 Exchange、Queue、RoutingKey之间的关系。

然后通过 void convertAndSend (String exchange, String routingKey, Object object, CorrelationData data) 方法把信息发送到 Broken Server

 1 @Configuration  2 public class ConnectionConfig {  3 @Value("${spring.rabbitmq.host}")  4 public String host;  5   6 @Value("${spring.rabbitmq.port}")  7 public int port;  8   9 @Value("${spring.rabbitmq.username}") 10 public String username; 11  12 @Value("${spring.rabbitmq.password}") 13 public String password; 14  15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17  18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29  30 @Configuration 31 public class BindingConfig { 32 public final static String first="direct.first"; 33 public final static String second="direct.second"; 34 public final static String Exchange_NAME="directExchange"; 35 public final static String RoutingKey1="directKey1"; 36 public final static String RoutingKey2="directKey2"; 37  38 @Bean 39 public Queue queueFirst(){ 40 return new Queue(first); 41 } 42  43 @Bean 44 public Queue queueSecond(){ 45 return new Queue(second); 46 } 47  48 @Bean 49 public DirectExchange directExchange(){ 50 return new DirectExchange(Exchange_NAME,true,true); 51 } 52  53 //利用BindingBuilder绑定Direct与queueFirst 54 @Bean 55 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 56 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 57 } 58  59 //利用BindingBuilder绑定Direct与queueSecond 60 @Bean 61 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){  62 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 63 }  64 } 65  66 @Controller 67 @RequestMapping("/producer") 68 public class ProducerController { 69 @Autowired 70 private RabbitTemplate template; 71  72 @RequestMapping("/send") 73 public void send() { 74 for(int n=0;n<100;n++){  75  76 template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"I'm the first queue! "+String.valueOf(n),getCorrelationData()); 77 template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey2,"I'm the second queue! "+String.valueOf(n),getCorrelationData()); 78 } 79 } 80  81 private CorrelationData getCorrelationData(){ 82 return new CorrelationData(UUID.randomUUID().toString()); 83 } 84 }

此时,打开 RabbitMQ 管理界面,可看到 Producer 已经向 Broken Server 的 direct.first / direct.second 两个 Queue 分别发送100 个 Message

3.2.2 Consumer 端开发

分别建立两个不同的 Consumer ,一个绑定 direct.first 别一个绑定 direct.second , 然后通过注解 @RabbitListener 监听不同的 queue,当接到到 Producer 推送队列时,显示队列信息。

 1 @Configuration  2 public class ConnectionConfig {  3 @Value("${spring.rabbitmq.host}")  4 public String host;  5   6 @Value("${spring.rabbitmq.port}")  7 public int port;  8   9 @Value("${spring.rabbitmq.username}") 10 public String username; 11  12 @Value("${spring.rabbitmq.password}") 13 public String password; 14  15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17  18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29  30 @Configuration 31 public class BindingConfig { 32 public final static String first="direct.first"; 33 public final static String Exchange_NAME="directExchange"; 34 public final static String RoutingKey1="directKey1"; 35  36 @Bean 37 public Queue queueFirst(){ 38 return new Queue(first); 39 } 40  41 @Bean 42 public DirectExchange directExchange(){ 43 return new DirectExchange(Exchange_NAME); 44 } 45  46 //利用BindingBuilder绑定Direct与queueFirst 47 @Bean 48 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 49 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 50 }  51 } 52  53 @Configuration 54 @RabbitListener(queues="direct.first") 55 public class RabbitMqListener { 56  57 @RabbitHandler 58 public void handler(String message){ 59 System.out.println(message); 60 } 61 } 62  63 @SpringBootApplication 64 public class App { 65  66 public static void main(String[] args){ 67 SpringApplication.run(App.class, args); 68 } 69 }

运行后可以观察到不同的 Consumer 会收到不同队列的消息

如果觉得使用 Binding 代码绑定过于繁琐,还可以直接在监听类RabbitMqListener中使用 @QueueBinding 注解绑定

 1 @Configuration  2 public class ConnectionConfig {  3 @Value("${spring.rabbitmq.host}")  4 public String host;  5   6 @Value("${spring.rabbitmq.port}")  7 public int port;  8   9 @Value("${spring.rabbitmq.username}") 10 public String username; 11  12 @Value("${spring.rabbitmq.password}") 13 public String password; 14  15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17  18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29  30 @Configuration 31 @RabbitListener(bindings=@QueueBinding( 32 exchange=@Exchange(value="directExchange"), 33 value=@Queue(value="direct.second"), 34 key="directKey2")) 35 public class RabbitMqListener { 36  37 @RabbitHandler 38 public void handler(String message){ 39 System.out.println(message); 40 } 41 } 42  43 @SpringBootApplication 44 public class App { 45  46 public static void main(String[] args){ 47 SpringApplication.run(App.class, args); 48 } 49 }

运行结果

由于受到篇幅限制,关于 Producer 与 Consumer 的信息监控及死信队列,持久化操作等内容,将在下面章节详细介绍,敬请期待