1. 消息队列概述

1.1 定义

MQ(Message Queue):消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中 获取消息,因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦,别名为消息中间件。通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

1.2 应用场景

  • 跨系统数据传递
  • 高并发流量削峰
  • 数据分发和异步处理
  • 大数据分析传递
  • 分布式事务

1.3 主流使用

  1. ActiveMQ

ActiveMQ是Apache出品,能力强劲的开源消息系统,它是一个完全支持JMS规范的消息中间件,丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎。

  1. Kafka

Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。

  1. RocketMQ

RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息和可靠传输及事务性做了优化,目前在阿里集团广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。

  1. RabbitMQ

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全、AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

2. 消息队列协议

2.1 AMQP协议

AMQP(Advanced Message Queuing Protocol)是高级消息队列协议,由摩根大通集团连接其他公司共同设计,是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端和消息中间件可传递消息,并不受客户端/中间件不同产品、不同的开发语言等条件的限制。

2.2 MQTT协议

MQTT(Meesgae Queuing Telemetry Transport)消息队列是IBM开放的一个即时通讯协议,物联网系统架构中的重要组成部分。

2.3 OpenMeesage协议

近几年由阿里巴巴、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。

为什么不使用http协议?

(1)因为http请求报文头和响应报文头是比较复杂的,包含了cookie、数据的加密解密、状态码等附加功能,但是对于一个消息而言,并不需要这么复杂,只需要数据传递、存储和分发就行,追求的是高性能,尽量简洁和快速。

(2)大部分情况下http是短连接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会进行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息进行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。

3. RabbitMQ介绍

3.1 架构设计


  • Broker:服务节点,接收和分发消息的应用。
  • Exchange:交换机,生产者将消息发送到交换机,由交换机将消息路由到一个或多个队列中。如果路由不到,或返回给生产者,或直接丢弃,或做其他处理。
  • Queue:队列,是RabbitMQ的内部对象,用于存储消息。生产者投递消息到队列,消费者从队列中获取消息并消费。
  • RoutingKey:路由key,Exchange将消息发送到Queue的路由匹配规则。

3.2 其他概念

  • Vistual Host:虚拟主机,用于多用户和应用隔离。
  • Connection:publisher/consumer和broker之间的TCP连接。
  • Channel:如果每一次访问RabbitMQ都建立Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的Channel进行通讯。AMQP method包含了channel id帮助客户端和message broker和别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大地减少了操作系统建立TCP Conntection的开销。

3.3 交换机类型

RabbitMQ提供了三种交换机类型:

  1. Direct Exchange:直连交换机,根据RoutingKey精确匹配,消息会被投递到相应队列。
  2. Fanout Exchange:广播交换机,发送消息时不需要声明RoutingKey,消息会被投递到与交换机绑定的所有队列。
  3. Topic Exchange:主题交换机,根据RoutingKey模糊匹配,消息会被投递到相应队列。

4. RabbitMQ安装

安装原生的RabbitMQ需要erlang环境,懒得装了。

4.1 docker安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 挂载目录
$ mkdir -p /home/rabbitmq/data

# 启动容器
$ docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-v /home/rabbitmq/data:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=Khighness \
-e RABBITMQ_DEFAULT_PASS=KAG1823 \
rabbitmq

# 查看插件
$ rabbitmq-plugins list
# 启用|禁用插件
$ rabbitmq-plugins enable | disable <plugin>

4.2 web管理

1
2
$ docker exec -it rabbitmq bash
$ rabbitmq-plugins enable rabbitmq_management

登录用户名和密码即为RABBITMQ_DEFAULT_USERRABBITMQ_DEFAULT_PASS

5. RabbitMQ消息模型

官方文档:https://www.rabbitmq.com/getstarted.html

MQ准备:

创建用户(Username)parak,密码(Password)123456,创建虚拟主机(vistual hosts)/learn

引入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.2</version>
</dependency>
</dependencies>

日志配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<property name="LOG_HOME" value="log/" />

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${LOG_HOME}/rabbitmq.%d{yyyy-MM-dd}.log</FileNamePattern>
<MaxHistory>30</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<MaxFileSize>10MB</MaxFileSize>
</triggeringPolicy>
</appender>

<root level="DEBUG">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE"/>
</root>

</configuration>

5.1 Hello World模型


在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者,消息的接受者,会一直等待消息到来
  • queue:消息队列,图中红色部分,生产者向其中投入消息,消费者在其中获取消息。

构建连接工厂:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static ConnectionFactory connectionFactory;

public static void init() {
// 创建MQ连接工厂
connectionFactory = new ConnectionFactory();
// 设置服务器IP:PORT
connectionFactory.setHost("192.168.117.155");
connectionFactory.setPort(5672);
// 设置连接虚拟主机
connectionFactory.setVirtualHost("/learn");
// 设置用户密码
connectionFactory.setUsername("parak");
connectionFactory.setPassword("123456");
}

生产消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void testSendMessage() throws IOException, TimeoutException {
// 获取连接对象
Connection connection = connectionFactory.newConnection();
// 获取连接通道
Channel channel = connection.createChannel();
// 通道绑定消息队列
// 参数1:队列名称,如果队列不存在则自动创建
// 参数2:用来定义队列特性是否要持久化
// 参数3:是否独占队列,只允许当前连接可用
// 参数4:是否在消费完成后自动删除队列
// 参数5:附加参数
channel.queueDeclare("hello", false, false, false, null);
// 发布消息
// 参数1:交换机名称
// 参数2:队列名称
// 参数3:传递的额外设置
// 参数4:消息的具体内容
channel.basicPublish("", "hello", null, "hello, highness".getBytes());
// 关闭通道
channel.close();
// 关闭连接
connection.close();
}

消费消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void testConsumeMessage() throws IOException, TimeoutException {
// 创建连接对象
Connection connection = connectionFactory.newConnection();
// 创建连接通道
Channel channel = connection.createChannel();
// 通道绑定队列
channel.queueDeclare("hello", false, false, false, null);
// 消费消息
// 参数1:消费的队列名称
// 参数2:开始消息的自动确认机制
// 参数3:消费时的回调接口
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
// 参数1:标签
// 参数2:信封
// 参数3:属性
// 参数4:消息队列中取出的消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费消息:{}", new String(body));
}
});
// 需要不断接收队列中的消息,所以不关闭通道和连接
}

让队列和消息都持久化

queueDeclare方法的第二个参数需要设置为true

basicPublish方法的第三个参数需要设置为带MessagePropertiesPERSISTENT的属性:

  • MINIMAL_PERSISTENT_BASIC
  • PERSISTENT_BASIC
  • PERSISTENT_TEXT_PLAIN

5.2 Work Queues模型


Work Queues,任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用Work Queues:让多个消费者绑定到一个队列,共同消费消息队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

MQ准备:

  • 创建虚拟主机(vistual host)/work

MQ工厂:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class RabbitMQUtil {
private static volatile ConnectionFactory connectionFactory = null;
private static final String HOST = "192.168.117.155";
private static final int PORT = 5672;
private static final String VIRTUAL_HOST = "/learn";
private static final String USERNAME = "parak";
private static final String PASSWORD = "123456";

public static Connection getConnection() {
if (connectionFactory == null) {
synchronized (RabbitMQUtil.class) {
if (connectionFactory == null) {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setVirtualHost(VIRTUAL_HOST);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
}
}
}
try {
return connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}

public static void closeConnectionAndChannel(Channel channel, Connection connection) {
try {
if (connection != null)
channel.close();
if (connection != null)
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}

模型演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 生产者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("", "work", MessageProperties.PERSISTENT_TEXT_PLAIN, new String("message" + (i + 1) + " => hello, khighness").getBytes());
}
RabbitMQUtil.closeConnectionAndChannel(channel, connection);
}
}

// 消费者1
public class Consumer1 {
private static Logger log = LoggerFactory.getLogger(Consumer1.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-1:{}", new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}

// 消费者2
public class Consumer2 {
private static Logger log = LoggerFactory.getLogger(Consumer2.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-2:{}", new String(body));
}
});
}
}

先运行消费者1和消费者2,最后运行生产者:

1
2
3
4
5
6
7
8
9
10
11
12
# 消费者1
2021-05-24 19:32:52.218 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer1 - 消费者-1:message1 => hello, khighness
2021-05-24 19:32:53.225 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer1 - 消费者-1:message3 => hello, khighness
2021-05-24 19:32:54.226 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer1 - 消费者-1:message5 => hello, khighness
2021-05-24 19:32:55.229 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer1 - 消费者-1:message7 => hello, khighness
2021-05-24 19:32:56.231 [pool-1-thread-5] DEBUG top.parak.workqueue.Consumer1 - 消费者-1:message9 => hello, khighness
# 消费者2
2021-05-24 19:32:52.219 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message2 => hello, khighness
2021-05-24 19:32:52.221 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message4 => hello, khighness
2021-05-24 19:32:52.221 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message6 => hello, khighness
2021-05-24 19:32:52.221 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message8 => hello, khighness
2021-05-24 19:32:52.222 [pool-1-thread-5] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message10 => hello, khighness

总结:默认情况下,RabbitMQ按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息,这种分发消息的方式成为循环。

上面的消息消费时是平均分配,那么如果需要修改的话,就要关闭消息自动确认机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Consumer1 {
private static Logger log = LoggerFactory.getLogger(Consumer1.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
final Channel channel = connection.createChannel();
// 一次只能消费一条消息
channel.basicQos(1);
channel.queueDeclare("work", true, false, false, null);
// 第二个参数,关闭消息自动确认
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-1:{}", new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

public class Consumer2 {
private static Logger log = LoggerFactory.getLogger(Consumer2.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-2:{}", new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

重新运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
# consumer1
2021-05-24 20:58:34.534 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer1 - 消费者-1:message1 => hello, khighness

# consumer2
2021-05-24 20:58:33.531 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message2 => hello, khighness
2021-05-24 20:58:33.535 [pool-1-thread-5] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message3 => hello, khighness
2021-05-24 20:58:33.536 [pool-1-thread-6] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message4 => hello, khighness
2021-05-24 20:58:33.537 [pool-1-thread-7] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message5 => hello, khighness
2021-05-24 20:58:33.538 [pool-1-thread-8] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message6 => hello, khighness
2021-05-24 20:58:33.538 [pool-1-thread-9] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message7 => hello, khighness
2021-05-24 20:58:33.539 [pool-1-thread-10] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message8 => hello, khighness
2021-05-24 20:58:33.540 [pool-1-thread-11] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message9 => hello, khighness
2021-05-24 20:58:33.541 [pool-1-thread-12] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message10 => hello, khighness

5.3 Publish/Subscribe模型


Publish/Subscribe模型,也称为Fanout模型,即发布-订阅模型或者广播模型。

流程:

  • 每个消费者都有自己的队列
  • 每个队列都要绑定交换机
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  • 交换机把消息发送到绑定过的所有队列
  • 队列的消费者都能拿到消息,实现一条消息被多个消费者消费

模型演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 生产者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 指定交换机
// 参数1:交换机名称,不存在则自动创建
// 参数2:交换机类型
channel.exchangeDeclare("pubsub", "fanout");
// 发送消息
channel.basicPublish("pubsub", "", null, "khighness's message".getBytes());
RabbitMQUtil.closeConnectionAndChannel(channel, connection);
}
}

// 消费者-1
public class Consumer1 {
private static Logger log = LoggerFactory.getLogger(Consumer1.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 指定交换机
channel.exchangeDeclare("pubsub", "fanout");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queue, "pubsub", "");
// 消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("消费者-1:{}", new String(body));
}
});
}
}

// 消费者-2
public class Consumer2 {
private static Logger log = LoggerFactory.getLogger(Consumer2.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 指定交换机
channel.exchangeDeclare("pubsub", "fanout");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queue, "pubsub", "");
// 消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("消费者-2:{}", new String(body));
}
});
}
}

先运行消费者1和消费者2,最后运行生产者:

1
2
3
4
5
# 消费者1
2021-05-24 21:40:58.365 [pool-1-thread-4] INFO top.parak.pubsub.Consumer1 - 消费者-1:khighness's message

# 消费者2
2021-05-24 21:40:58.365 [pool-1-thread-4] INFO top.parak.pubsub.Consumer2 - 消费者-2:khighness's message

5.4 Routing模型

在Fanout模型中,一条消息会被所有订阅的队列都消费,但是,在某些场景下,我们希望不同的消息被不同的队列消费,这时就要用Routing的Direct类型的交换机。

在Direct模型中:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey。
  • 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会收到消息。

图解:


  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key
  • X:交换机,接收生产者的消息,然后把消息递交给与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key为error的消息
  • C2:消费者,其所在队列指定了需要routing key为info、error、warning的消息

模型演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 生产者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 指定交换机
channel.exchangeDeclare("routing", "direct");
// 发送消息
channel.basicPublish("routing", "info", null, "direct => info".getBytes());
channel.basicPublish("routing", "debug", null, "direct => debug".getBytes());
channel.basicPublish("routing", "warn", null, "direct => warn".getBytes());
channel.basicPublish("routing", "error", null, "direct => error".getBytes());
RabbitMQUtil.closeConnectionAndChannel(channel, connection);
}
}

// 消费者1
public class Consumer1 {
private static Logger log = LoggerFactory.getLogger(Consumer1.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 指定交换机
channel.exchangeDeclare("routing", "direct");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 基于route key绑定队列和交换机
channel.queueBind(queue, "routing", "info");
// 获取消费的消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-1:{}", new String(body));
}
});
}
}

// 消费者2
public class Consumer2 {
private static Logger log = LoggerFactory.getLogger(Consumer2.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 指定交换机
channel.exchangeDeclare("routing", "direct");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 基于route key绑定队列和交换机
channel.queueBind(queue, "routing", "debug");
channel.queueBind(queue, "routing", "warn");
channel.queueBind(queue, "routing", "error");
// 获取消费的消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-2:{}", new String(body));
}
});
}
}

先运行消费者1和消费者2,最后运行生产者:

1
2
3
4
5
6
7
# 消费者1
2021-05-24 22:47:26.753 [pool-1-thread-4] DEBUG top.parak.direct.Consumer1 - 消费者-1:direct => info

# 消费者2
2021-05-24 22:47:26.753 [pool-1-thread-4] DEBUG top.parak.direct.Consumer2 - 消费者-2:direct => debug
2021-05-24 22:47:26.756 [pool-1-thread-4] DEBUG top.parak.direct.Consumer2 - 消费者-2:direct => warn
2021-05-24 22:47:26.756 [pool-1-thread-4] DEBUG top.parak.direct.Consumer2 - 消费者-2:direct => error

5.5 Topics模型

Topics模型与Direct模型相比,都是可以根据RoutingKey把消息路由到不同的队列,只不过Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符。


Topics模型的RoutingKey是由一个或者多个单词组成,多个单词之间以.分隔,有两种通配符:

  • *:匹配一个单词
  • #:匹配多个单词

模型演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 生产者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics", "topic");
channel.basicPublish("topics", "k.a", null, "topics => k.a".getBytes());
channel.basicPublish("topics", "k.a.g", null, "topics => k.a.g".getBytes());
RabbitMQUtil.closeConnectionAndChannel(channel, connection);
}
}

// 消费者1
public class Consumer2 {
private static Logger log = LoggerFactory.getLogger(Consumer2.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics", "topic");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, "topics", "k.#");
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-2:{}", new String(body));
}
});
}
}
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 基于route key绑定队列和交换机
channel.queueBind(queue, "routing", "debug");
channel.queueBind(queue, "routing", "warn");
channel.queueBind(queue, "routing", "error");
// 获取消费的消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-2:{}", new String(body));
}
});
}
}

先运行消费者1和消费者2,最后运行生产者:

1
2
3
4
5
6
# 消费者1
2021-05-24 23:14:53.574 [pool-1-thread-4] DEBUG top.parak.topics.Consumer1 - 消费者-1:topics => k.a

# 消费者2
2021-05-24 23:14:53.573 [pool-1-thread-4] DEBUG top.parak.topics.Consumer2 - 消费者-2:topics => k.a
2021-05-24 23:14:53.578 [pool-1-thread-4] DEBUG top.parak.topics.Consumer2 - 消费者-2:topics => k.a.g

6. RabbitMQ整合SpringBoot

在使用SpringBoot的AMQP API之前,一定要理解消息的处理顺序:

生产的消息先到达交换机,然后经过routingKey匹配,到达绑定的队列中,等待消费。

我们使用SpringBoot整合RabbitMQ的过程中,一般顺序如下:

  1. 在配置类中注册相应的交换机、队列,同时通过路由进行绑定;
  2. 编写消息生产者,生产者发送消息时需要填入参数:交换机、路由和消息;
  3. 编写消息消费者,消费者需要监听相应的队列。

创建项目,引入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<properties>
<spring.boot.version>2.2.2.RELEASE</spring.boot.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring.boot.version}</version>
</dependency>
</dependencies>

项目配置application.properties

1
2
3
4
5
6
7
server.port=3333
spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=192.168.117.155
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/learn
spring.rabbitmq.username=parak
spring.rabbitmq.password=KAG1823

6.1 配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

public static final String DIRECT_EXCHANGE = "parak.exchange.direct";
public static final String FANOUT_EXCHANGE = "parak.exchange.fanout";
public static final String TOPIC_EXCHANGE = "parak.exchange.topic";
public static final String HEADER_EXCHANGE = "parak.exchange.header";

public static final String HELLO_QUEUE = "parak.queue.hello";
public static final String WORK_QUEUE = "parak.queue.work1";
public static final String DIRECT_QUEUE1 = "parak.queue.direct1";
public static final String DIRECT_QUEUE2 = "parak.queue.direct2";
public static final String FANOUT_QUEUE1 = "parak.queue.fanout1";
public static final String FANOUT_QUEUE2 = "parak.queue.fanout2";
public static final String TOPIC_QUEUE1 = "parak.queue.topic1";
public static final String TOPIC_QUEUE2 = "parak.queue.topic2";
public static final String HEADER_QUEUE = "parak.queue.header";

public static final String DEFAULT_ROUTING_KEY = "";
public static final String HELLO_ROUTING_KEY = "hello";
public static final String WORK_ROUTING_KEY = "work";
public static final String DIRECT_ROUTING_KEY1 = "k";
public static final String DIRECT_ROUTING_KEY2 = "a";
public static final String TOPIC_ROUTING_KEY1 = "k.a.*";
public static final String TOPIC_ROUTING_KEY2 = "k.a.#";

/**
* 1. Direct Exchange:直连交换机,精确匹配路由,推送到相应队列
*/
@Bean("directExchange")
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}

@Bean("helloQueue")
public Queue helloQueue() {
return new Queue(HELLO_QUEUE, true);
}

@Bean("workQueue")
public Queue workQueue() {
return new Queue(WORK_QUEUE, true);
}

@Bean("directQueue1")
public Queue directQueue1() {
return new Queue(DIRECT_QUEUE1, true);
}

@Bean("directQueue2")
public Queue directQueue2() {
return new Queue(DIRECT_QUEUE2, true);
}

@Bean
public Binding directBinding1(@Qualifier("helloQueue") Queue queue, @Qualifier("directExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(HELLO_ROUTING_KEY);
}

@Bean
public Binding directBinding2(@Qualifier("workQueue") Queue queue, @Qualifier("directExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(WORK_ROUTING_KEY);
}

@Bean
public Binding directBinding3(@Qualifier("directQueue1") Queue queue, @Qualifier("directExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING_KEY1);
}

@Bean
public Binding directBinding4(@Qualifier("directQueue2") Queue queue, @Qualifier("directExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING_KEY2);
}

/**
* 2. Fanout Exchange: 广播交换机,不需要路由,发送到所有绑定过的队列队列
*/
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}

@Bean("fanoutQueue1")
public Queue fanoutQueue1() {
return new Queue(FANOUT_QUEUE1, true);
}

@Bean("fanoutQueue2")
public Queue fanoutQueue2() {
return new Queue(FANOUT_QUEUE2, true);
}

@Bean
public Binding fanoutBinding1(@Qualifier("fanoutQueue1") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}

@Bean
public Binding fanoutBinding2(@Qualifier("fanoutQueue2") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}

/**
* 3. Topic Exchange:主题交换机,模糊匹配路由,推送到相应队列
*/
@Bean("topicExchange")
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}

@Bean("topicQueue1")
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE1, true);
}

@Bean("topicQueue2")
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE2, true);
}

@Bean
public Binding topicBinding1(@Qualifier("topicQueue1") Queue queue, @Qualifier("topicExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(topicExchange()).with(TOPIC_ROUTING_KEY1);
}

@Bean
public Binding topicBinding2(@Qualifier("topicQueue2") Queue queue, @Qualifier("topicExchange") TopicExchange exchange) {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_ROUTING_KEY2);
}

/**
* 4. Header Exchange: 头部交换机,根据header匹配,推送到相应队列
*/
@Bean("headersExchange")
public HeadersExchange headersExchange() {
return new HeadersExchange(HEADER_EXCHANGE);
}

@Bean("headerQueue")
public Queue headerQueue() {
return new Queue(HEADER_QUEUE, true);
}

@Bean
public Binding headerBinding(@Qualifier("headerQueue") Queue queue, @Qualifier("headersExchange") HeadersExchange exchange) {
Map<String, Object> map = new HashMap<>();
map.put("signature", "Khighness");
return BindingBuilder.bind(queue).to(exchange).whereAny(map).match();
}

}

6.2 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/rabbitmq/send")
public class MQController {
private final Logger log = LoggerFactory.getLogger(this.getClass());

@Autowired
private AmqpTemplate amqpTemplate;

/**
* 双端模型, 使用routing名称匹配queue名称,默认相等匹配
* @see <a href="http://localhost:3333/rabbitmq/send/hello?msg=Khighness">hello</a>
*/
@RequestMapping("/hello")
@ResponseStatus(HttpStatus.OK)
public String sendHello(@RequestParam("msg") String message) {
amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.HELLO_ROUTING_KEY, message);
log.info("『Hello』 send message: [{}]", message);
return "success";
}

/**
* 工作模型,多消费者,消费者轮询消费
* @see <a href="http://localhost:3333/rabbitmq/send/work?msg=Khighness">work</a>
*/
@RequestMapping("/work")
@ResponseStatus(HttpStatus.OK)
public String sendWork(@RequestParam("msg") String message) {
amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.WORK_ROUTING_KEY, message);
log.info("『Work』 send message: [{}]", message);
return "success";
}

/**
* 路由模型,根据routingKey精确匹配,routingKey未匹配的消息会丢失
* @see <a href="http://localhost:3333/rabbitmq/send/direct?key=k&msg=Khighness">k</a>
* @see <a href="http://localhost:3333/rabbitmq/send/direct?key=a&msg=Khighness">a</a>
*/
@RequestMapping("/direct")
@ResponseStatus(HttpStatus.OK)
public String sendDirect(@RequestParam("key") String routingKey, @RequestParam("msg") String message) {
amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, routingKey, message);
log.info("『Direct』 send message: [{}], with key: [{}]", message, routingKey);
return "success";
}

/**
* 广播模型,不需要routingKey,消息推送到交换机绑定的所有队列
* @see <a href="http://localhost:3333/rabbitmq/send/fanout?msg=Khighness">fanout</a>
*/
@RequestMapping("/fanout")
@ResponseStatus(HttpStatus.OK)
public String sendFanout(@RequestParam("msg") String message) {
amqpTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE, RabbitMQConfig.DEFAULT_ROUTING_KEY, message);
log.info("『Fanout』 send message: [{}]", message);
return "success";
}

/**
* 主题模型,根据routingKey模糊匹配,推送到相应队列,routingKey未匹配的消息会丢失
* @see <a href="http://localhost:3333/rabbitmq/send/topic?key=k.a.g&msg=Khighness">k.a.g</a>
* @see <a href="http://localhost:3333/rabbitmq/send/topic?key=k.a.g.c&msg=Khighness">k.a.g.c</a>
*/
@RequestMapping("/topic")
@ResponseStatus(HttpStatus.OK)
public String sendTopic(@RequestParam("key") String routingKey, @RequestParam("msg") String message) {
amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, routingKey, message);
log.info("『Topic』 send message: [{}], with key: [{}]", message, routingKey);
return "success";
}

/**
* 头部模型,不需要routingKey,根据header字段匹配,未匹配的消息会丢失
* @see <a href="http://localhost:3333/rabbitmq/send/header?sign=Khighness&msg=Khighness">Khighness</a>
* @see <a href="http://localhost:3333/rabbitmq/send/header?sign=follwerK&msg=Khighness">flowerK</a>
*/
@RequestMapping("/header")
@ResponseStatus(HttpStatus.OK)
public String sendHeader(@RequestParam("sign") String signature, @RequestParam("msg") String message) {
MessageProperties properties = new MessageProperties();
properties.setHeader("signature", signature);
Message obj = new Message(message.getBytes(), properties);
amqpTemplate.convertAndSend(RabbitMQConfig.HEADER_EXCHANGE, RabbitMQConfig.DEFAULT_ROUTING_KEY, obj);
log.info("『Header』 send message: [{}], with sign: [{}]", message, signature);
return "success";
}

}

6.3 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MQReceiver {
private final Logger log = LoggerFactory.getLogger(this.getClass());

@RabbitListener(queues = RabbitMQConfig.HELLO_QUEUE)
public void receiveHello(String message) {
log.info("『Hello』 receive message: [{}]", message);
}

@RabbitListener(queues = RabbitMQConfig.WORK_QUEUE)
public void receiveWork1(String message) {
log.info("『Work-1』 receive message: [{}]", message);
}

@RabbitListener(queues = RabbitMQConfig.WORK_QUEUE)
public void receiveWork2(String message) {
log.info("『Work-2』 receive message: [{}]", message);
}

@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE1)
public void receiveDirect1(String message) {
log.info("『Direct-1』 receive message: [{}]", message);
}

@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE1)
public void receiveDirect2(String message) {
log.info("『Direct-2』 receive message: [{}]", message);
}

@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE1)
public void receiveFanout1(String message) {
log.info("『Fanout-1』 receive message: [{}]", message);
}

@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE2)
public void receiveFanout2(String message) {
log.info("『Fanout-2』 receive message: [{}]", message);
}

@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE1)
public void receiveTopic1(String message) {
log.info("『Topic-1』 receive message: [{}]", message);
}

@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE2)
public void receiveTopic2(String message) {
log.info("『Topic-2』 receive message: [{}]", message);
}

@RabbitListener(queues = RabbitMQConfig.HEADER_QUEUE)
public void receiveHeader(String message) {
log.info("『Header』 receive message: [{}]", message);
}
}

6.4 启动类

1
2
3
4
5
6
7
8
9
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMQApplication.class, args);
}
}

7. RabbitMQ死信队列

7.1 死信队列

消费消息时,如果出现以下情况,:

  1. 消息被否定确认,使用channel.basicNack或者channel.basicReject,并且此时requeue被设置为false
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度。

那么消息被称为死信(Dead Letter)。

7.2 实现原理

步骤:

  1. 声明业务交换机,配置业务路由,绑定业务队列;
  2. 声明死信交换机,配置死信路由,绑定死信队列;
  3. 为业务队列绑定死信路由和死信交换机。

一个项目声明一个死信交换机即可,可以为任意类型【Direct、Fanout、Topic】绑定多个业务队列,为每个业务队列分配一个单独的路由key。

7.3 具体编码

创建项目,引入依赖。

项目配置application.properties

1
2
3
4
5
6
7
8
9
server.port=5555
spring.application.name=springboot-deadletter
spring.rabbitmq.host=192.168.117.155
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/learn
spring.rabbitmq.username=parak
spring.rabbitmq.password=KAG1823
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.acknowledge-mode=manual

创建配置类DeadLetterConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DeadLetterConfig {
public static final String BUSINESS_EXCHANGE = "parak.dead.letter.business.exchange";
public static final String BUSINESS_QUEUE1 = "parak.dead.letter.business.queue1";
public static final String BUSINESS_QUEUE2 = "parak.dead.letter.business.queue2";
public static final String DEAD_LETTER_EXCHANGE = "parak.dead.letter.death.exchange";
public static final String DEAD_LETTER_QUEUE1 = "parak.dead.letter.death.queue1";
public static final String DEAD_LETTER_QUEUE2 = "parak.dead.letter.death.queue2";
public static final String DEAD_LETTER_ROUTING_KEY1 = "parak.dead.letter.death.key1";
public static final String DEAD_LETTER_ROUTING_KEY2 = "parak.dead.letter.death.key2";

/**
* 声明业务交换机
*/
@Bean("businessExchange")
public FanoutExchange businessExchange() {
return new FanoutExchange(BUSINESS_EXCHANGE);
}

@Bean("businessQueue1")
public Queue businessQueue1() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY1);
return QueueBuilder.durable(BUSINESS_QUEUE1).withArguments(args).build();
}

@Bean("businessQueue2")
public Queue businessQueue2() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY2);
return QueueBuilder.durable(BUSINESS_QUEUE2).withArguments(args).build();
}

@Bean
public Binding businessBinding1(@Qualifier("businessQueue1") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}

@Bean
public Binding businessBinding2(@Qualifier("businessQueue2") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}

/**
* 声明死信交换机
*/
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}

@Bean("deadLetterQueue1")
public Queue deadLetterQueue1() {
return new Queue(DEAD_LETTER_QUEUE1);
}

@Bean("deadLetterQueue2")
public Queue deadLetterQueue2() {
return new Queue(DEAD_LETTER_QUEUE2);
}

@Bean
public Binding deadLetterBinding1(@Qualifier("deadLetterQueue1") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY1);
}

@Bean
public Binding deadLetterBinding2(@Qualifier("deadLetterQueue2") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY1);
}

}

上述类中配置了两个交换机:

  • 业务交换机
  • 死信交换机

并且分别为两个交换机绑定了两个队列。

创建业务消息的消费者BusinessMessageReceiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class BusinessMessageReceiver {
private final Logger log = LoggerFactory.getLogger(this.getClass());

@RabbitListener(queues = DeadLetterConfig.BUSINESS_QUEUE1)
public void receiveBusiness1(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("『business-1』 receive message: [{}]", msg);
boolean ack = true;
Exception exception = null;
try {
if (msg.contains("dead-letter")) {
throw new RuntimeException("Dead Letter");
}
} catch (Exception e) {
ack = false;
exception = e;
}
if (!ack) {
log.error("consume message occur exception: [{}]", exception.getMessage(), exception);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

@RabbitListener(queues = DeadLetterConfig.BUSINESS_QUEUE2)
public void reveiveBusiness2(Message message, Channel channel) throws IOException {
log.info("『business-2』 receive message: [{}]", new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

创建死信消息的消费者DeadLetterMessageReceiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class DeadLetterMessageReceiver {
private final Logger log = LoggerFactory.getLogger(this.getClass());

@RabbitListener(queues = DeadLetterConfig.DEAD_LETTER_QUEUE1)
public void receiveDeadLetter1(Message message, Channel channel) throws IOException {
log.info("『dead letter-1』 receive message: [{}]", new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

@RabbitListener(queues = DeadLetterConfig.DEAD_LETTER_QUEUE2)
public void receiveDeadLetter2(Message message, Channel channel) throws IOException {
log.info("『dead letter-2』 receive message: [{}]", new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

封装业务消息发送者BusinessMessageSender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.parak.rabbitmq.RabbitMQConfig;

@Component
public class BusinessMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMsg(String msg) {
rabbitTemplate.convertAndSend(DeadLetterConfig.BUSINESS_EXCHANGE, RabbitMQConfig.DEFAULT_ROUTING_KEY, msg);
}

}

创建接口用于生产消息MessageController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

@RequestMapping("/rabbitmq/send")
@RestController
public class MessageController {
@Autowired
private BusinessMessageSender businessMessageSender;

/**
* 发送延时消息,按照类型
* @see <a href="http://localhost:7777/rabbitmq/send/delay?msg=khighness&type=1">delay 3s</a>
* @see <a href="http://localhost:7777/rabbitmq/send/delay?msg=khighness&type=2">delay 10s</a>
*/
@RequestMapping("/delay")
@ResponseStatus(HttpStatus.OK)
public String sendMsg(@RequestParam("msg") String message, @RequestParam("type") Integer type) {
DelayType delayType = DelayType.getDelayType(type);
if (delayType == null)
throw new IllegalArgumentException("type is in [1, 2]");
log.info("now: [{}], receive message: [{}], type: [{}]", new Date(), message, delayType);
delayMessageSender.sendMsg(message, delayType);
return "success";
}
}

编写启动类并运行,访问浏览器进行测试:

(1)访问:http://localhost:5555/rabbitmq/send/dead?msg=khighness

1
2
2021-09-19 20:58:58.857  INFO 10120 --- [ntContainer#9-1] top.parak.death.BusinessMessageReceiver  : 『business-2』 receive message: [khighness]
2021-09-19 20:58:58.858 INFO 10120 --- [tContainer#10-1] top.parak.death.BusinessMessageReceiver : 『business-1』 receive message: [khighness]

(2)访问:http://localhost:5555/rabbitmq/send/dead?msg=dead-letter-kkk

1
2
3
4
5
6
7
8
9
10
2021-09-19 21:07:23.462  INFO 10120 --- [ntContainer#9-1] top.parak.death.BusinessMessageReceiver  : 『business-2』 receive message: [dead-letter]
2021-09-19 21:07:23.462 INFO 10120 --- [tContainer#10-1] top.parak.death.BusinessMessageReceiver : 『business-1』 receive message: [dead-letter]
2021-09-19 21:07:23.462 ERROR 10120 --- [tContainer#10-1] top.parak.death.BusinessMessageReceiver : consumer message occur exception: [Dead Letter]

java.lang.RuntimeException: Dead Letter
...

2021-09-19 21:07:23.465 INFO 10120 --- [tContainer#12-1] t.parak.death.DeadLetterMessageReceiver : 『dead letter-1』 receive message: [dead-letter]
2021-09-19 21:07:23.465 INFO 10120 --- [tContainer#11-1] t.parak.death.DeadLetterMessageReceiver : 『dead letter-2』 receive message: [dead-letter]

7.4 消息变化

死信消息被丢到死信队列,会发生什么变化呢?

如果队列配置了参数x-dead-letter-routing-key的话,死信的路由key将会被替换成该参数对应的值。如果没有配置,则保留该消息原有的路由key。

举个🌰:

如果原有消息的路由key是K,被发送到业务交换机中,然后被投递到业务队列Queue1中,如果该队列没有配置参数x-dead-letter-routing-key,则该消息成为死信后,将保留原有的路由keyK,如果配置了该参数,并且值设置为A,那么该消息成为死信后,路由key会被替换为A,然后被抛到死信交换机中。

另外,由于被抛到了死信交换机,所以消息的交换机名称也会被替换为死信交换机的名称。

而消息的Header中,也会添加很多奇奇怪怪的字段,修改死信队列的消费者代码,添加一行日志输出:

1
log.info("dead letter properties: [{}]", message.getMessageProperties());

再次测试,可以得到死信消息Header中被添加的信息:

1
2021-09-19 21:53:41.172  INFO 5628 --- [tContainer#12-1] t.parak.death.DeadLetterMessageReceiver  : dead letter properties: [MessageProperties [headers={x-first-death-exchange=parak.dead.letter.business.exchange, x-death=[{reason=rejected, count=1, exchange=parak.dead.letter.business.exchange, time=Sun Sep 19 21:09:38 CST 2021, routing-keys=[], queue=parak.dead.letter.business.queue1}], x-first-death-reason=rejected, x-first-death-queue=parak.dead.letter.business.queue1}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=parak.dead.letter.death.exchange, receivedRoutingKey=parak.dead.letter.death.key1, deliveryTag=1, consumerTag=amq.ctag-DmBuSjljWVkbuaFJvspHyw, consumerQueue=parak.dead.letter.death.queue1]]

简要说明一下Header中的值:

字段 含义
x-first-death-exchange 第一次被抛入的死信交换机的名称
x-first-death-reason 第一次成为死信的原因:
(1)rejected:消息在重新进入队列时被队列拒绝,由于default-requeue-rejected参数被设置为false
(2)expired:消息过期
(3)maxlen:队列内消息数量超过队列最大容量
x-first-death-queue 第一次称为死信前所在队列名称
x-death 历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新。

7.5 应用场景

一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在消息错误导致处理异常、处理过程中参数校验异常或者因网络波动导致的查询异常等等。当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息。

通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清除问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据好太多了。

7.6 生命周期

总结一下死信消息的生命周期:

  1. 业务消息被投入业务队列;
  2. 消费者消费业务队列的消息,由于处理过程中产生了异常,于是进行nack或者reject操作;
  3. 被nack或reject的消息由RabbitMQ投递到死信交换机中;
  4. 死信交换机将消息投递到相应的死信队列中;
  5. 死信队列的消费者消费死信消息。

死信队列并没有特殊的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,只不过专门用于处理死信。

8. RabbitMQ延时队列

8.1 延时队列

延时队列:队列内部元素有序,并且每个元素带时间属性。

元素需要在指定时间被取出和处理,通常来说是消息或任务。

8.2 使用场景

考虑以下场景:

  • 用户下单后,订单十分钟内未支付则自动取消;
  • 用户注册成功后,三天未登录则进行短信提醒;
  • 预定会议后,预约时间前十分钟通知人员参会。

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点内完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;发生用户注册事件,三天后检查新注册用户的活动数据,然后通知没有任何活动记录的用户。

以上场景很容易想到使用定时任务+轮询解决,一直轮询数据,每秒查询一次,取出需要被处理的数据。每天晚上跑个定时任务检查所有未支付的订单,在数据量比较少的情况这么做没有太大的毛病。但是对于数据量比较大,并且时效性较强的场景,比如:秒杀场景下订单十分钟未支付则关闭,在活动期间的订单数据量庞大的情况下进行去轮询数据库,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来巨大的压力,无法满足业务要求并且性能低下。

而将延时队列应用到上述场景,保证性能的同时还能保持优雅。

8.3 RabbitMQ-TTL

这里需要介绍一下RabbitMQ的高级特性:TTL(Time to Live)。

TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。

设置TTL属性有两种方式:

(1)创建队列的时候设置队列的x-message-ttl属性

1
2
3
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

这样所有被投递到该队列的消息都最多不会存活超过6s。

(2)针对每条消息设置TTL最大存活时间

1
2
3
4
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, message.getBytes());

但是这两种方式还是区别的,如果设置了队列的TTL属性,那么消息一旦过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

另外,还需要注意:

  • 如果不设置TTL,表示消息永远不会过期。
  • 如果设置TTL=0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

8.4 实现原理

延时队列,即将消息延迟一定时间进行处理,TTL刚好能让消息在延迟一定时间成为死信,另一方面,成为死信的消息都会被投递到死信队列中,这样只需要消费者一直消费死信队列中的消息即可。

消息流向如下:


生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingKey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingKey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。

8.5 具体编码

创建项目,引入依赖。

项目配置application.properties

1
2
3
4
5
6
7
8
9
server.port=6666
spring.application.name=springboot-delayqueue
spring.rabbitmq.host=192.168.117.155
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/learn
spring.rabbitmq.username=parak
spring.rabbitmq.password=KAG1823
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.acknowledge-mode=manual

编写配置类DelayQueueConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayQueueConfig {
public static final String DELAY_EXCHANGE = "parak.delay.queue.business.exchange";
public static final String DELAY_QUEUE1 = "parak.delay.queue.business.queue1";
public static final String DELAY_QUEUE2 = "parak.delay.queue.business.queue2";
public static final String DELAY_QUEUE_ROUTING_KEY1 = "parak.delay.queue.business.key1";
public static final String DELAY_QUEUE_ROUTING_KEY2 = "parak.delay.queue.business.key2";
public static final String DEAD_LETTER_EXCHANGE = "parak.delay.queue.deadletter.exchange";
public static final String DEAD_LETTER_QUEUE1 = "parak.delay.queue.deadletter.queue1";
public static final String DEAD_LETTER_QUEUE2 = "parak.delay.queue.deadletter.queue2";
public static final String DEAD_LETTER_ROUTING_KEY1 = "parak.delay.queue.deadletter.key1";
public static final String DEAD_LETTER_ROUTING_KEY2 = "parak.delay.queue.deadletter.key2";

/**
* 声明延时交换机
*/
@Bean("delayExchange")
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE);
}

@Bean("delayQueue1")
public Queue delayQueue1() {
Map<String, Object> map = new HashMap<>();
map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
map.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY1);
map.put("x-message-ttl", 3000);
return QueueBuilder.durable(DELAY_QUEUE1).withArguments(map).build();
}

@Bean("delayQueue2")
public Queue delayQueue2() {
Map<String, Object> map = new HashMap<>();
map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
map.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY2);
map.put("x-message-ttl", 10000);
return QueueBuilder.durable(DELAY_QUEUE2).withArguments(map).build();
}

@Bean
public Binding delayBinding1(@Qualifier("delayQueue1") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY1);
}

@Bean
public Binding delayBinding2(@Qualifier("delayQueue2") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY2);
}

/**
* 声明死信交换机
*/
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}

@Bean("deadLetterQueue1")
public Queue deadLetterQueue1() {
return new Queue(DEAD_LETTER_QUEUE1);
}

@Bean("deadLetterQueue2")
public Queue deadLetterQueue2() {
return new Queue(DEAD_LETTER_QUEUE2);
}

@Bean
public Binding deadLetterBinding1(@Qualifier("deadLetterQueue1") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY1);
}

@Bean
public Binding deadLetterBinding2(@Qualifier("deadLetterQueue2") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY2);
}

}

在配置类中注册了两个延迟队列和两个死信队列。

创建死信消息消费者DeadLetterMessageReceiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

@Component
public class DeadLetterMessageReceiver {
private final Logger log = LoggerFactory.getLogger(this.getClass());

@RabbitListener(queues = DelayQueueConfig.DEAD_LETTER_QUEUE1)
public void receiveDeadLetter1(Message message, Channel channel) throws IOException {
log.info("now: [{}], 『dead letter-1』 receive message: [{}]", new Date(), new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

@RabbitListener(queues = DelayQueueConfig.DEAD_LETTER_QUEUE2)
public void receiveDeadLetter2(Message message, Channel channel) throws IOException {
log.info("now: [{}], 『dead letter-2』 receive message: [{}]", new Date(), new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

}

创建延时类型DelayType

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public enum DelayType {

DELAY_3_SECONDS(1, 3000),
DELAY_10_SECONDS(2, 10000);

private final int type;
private final int delayTime;

DelayType(int type, int delayTime) {
this.type = type;
this.delayTime = delayTime;
}

public int getType() {
return type;
}

public int getDelayTime() {
return delayTime;
}

public static DelayType getDelayType(int type) {
for (DelayType delayType : DelayType.values()) {
if (type == delayType.getType()) {
return delayType;
}
}
return null;
}
}

创建消息发送者DelayMessageSender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class DelayMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMsg(String msg, DelayType type) {
switch (type) {
case DELAY_3_SECONDS:
rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_QUEUE_ROUTING_KEY1, msg);
break;
case DELAY_10_SECONDS:
rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_QUEUE_ROUTING_KEY2, msg);
break;
}
}
}

创建接口用于生产消息MessageController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RequestMapping("/rabbitmq/send")
@RestController
public class MessageController {
private final Logger log = LoggerFactory.getLogger(this.getClass());

@Autowired
private DelayMessageSender delayMessageSender;

/**
* 发送延时消息
* @see <a href="http://localhost:7777/rabbitmq/send/delay?msg=khighness&type=1">delay 3s</a>
* @see <a href="http://localhost:7777/rabbitmq/send/delay?msg=khighness&type=2">delay 10s</a>
*/
@RequestMapping("/delay")
@ResponseStatus(HttpStatus.OK)
public String sendMsg(@RequestParam("msg") String message, @RequestParam("type") Integer type) {
DelayType delayType = DelayType.getDelayType(type);
if (delayType == null)
throw new IllegalArgumentException("type is in [1, 2]");
log.info("now: [{}], receive message: [{}], type: [{}]", new Date(), message, delayType);
delayMessageSender.sendMsg(message, delayType);
return "success";
}
}

编写启动类并运行,访问浏览器进行测试:

(1)访问:http://localhost:7777/rabbitmq/send/delay?msg=khighness&type=1

1
2
2021-09-21 10:15:18.662  INFO 14652 --- [nio-7777-exec-2] top.parak.delay.MessageController        : now: [Tue Sep 21 10:15:18 CST 2021], receive message: [Khighness], type: [DELAY_3_SECONDS]
2021-09-21 10:15:21.666 INFO 14652 --- [ntContainer#0-1] t.parak.delay.DeadLetterMessageReceiver : now: [Tue Sep 21 10:15:21 CST 2021], 『dead letter-1』 receive message: [Khighness]

(2)访问:http://localhost:7777/rabbitmq/send/delay?msg=khighness&type=2

1
2
3

2021-09-21 10:15:26.273 INFO 14652 --- [nio-7777-exec-3] top.parak.delay.MessageController : now: [Tue Sep 21 10:15:26 CST 2021], receive message: [Khighness], type: [DELAY_10_SECONDS]
2021-09-21 10:15:36.276 INFO 14652 --- [ntContainer#1-1] t.parak.delay.DeadLetterMessageReceiver : now: [Tue Sep 21 10:15:36 CST 2021], 『dead letter-2』 receive message: [Khighness]

第一条消息在3S后变成死信消息,然后被消费者消费掉,第二条消息在10S之后变成死信消息。

如果这样使用的话,每增加一个新的时间需求,就要新增一个队列,那么对于预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

8.6 队列优化

为了实现一种更通用的方案,那么而只能将消息设置在消息属性里了。

DelayQueueConfig中增加一个延时队列,不设置TTL,用于接收任意延时市场的消息,增加一个相应的死信队列和路由Key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static final String DELAY_QUEUE3 = "parak.delay.queue.business.queue3";
public static final String DELAY_QUEUE_ROUTING_KEY3 = "parak.delay.queue.business.key3";
public static final String DEAD_LETTER_QUEUE3 = "parak.delay.queue.deadletter.queue3";
public static final String DEAD_LETTER_ROUTING_KEY3 = "parak.delay.queue.deadletter.key3";

// 不设置TTL
@Bean("delayQueue3")
public Queue delayQueue3() {
Map<String, Object> map = new HashMap<>();
map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
map.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY3);
return QueueBuilder.durable(DELAY_QUEUE3).withArguments(map).build();
}

@Bean
public Binding delayBinding3(@Qualifier("delayQueue3") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY3);
}

@Bean("deadLetterQueue3")
public Queue deadLetterQueue3() {
return new Queue(DEAD_LETTER_QUEUE3);
}

@Bean
public Binding deadLetterBinding3(@Qualifier("deadLetterQueue3") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY3);
}

DeadLetterMessageReceiver中增加一个死信队列的消费者:

1
2
3
4
5
@RabbitListener(queues = DelayQueueConfig.DEAD_LETTER_QUEUE3)
public void receiveDeadLetter3(Message message, Channel channel) throws IOException {
log.info("now: [{}], 『dead letter-3』 receive message: [{}]", new Date(), new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

DelayMessageSender中封装一个方法发送指定延时时间的消息:

1
2
3
4
5
6
public void sendMsg(String msg, long delayTime) {
rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_QUEUE_ROUTING_KEY3, msg, m -> {
m.getMessageProperties().setExpiration(Long.toString(delayTime));
return m;
});
}

MessageController中增加一个接口用于发送延时消息:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 发送延时消息,按照时间
* @see <a href="http://localhost:7777/rabbitmq/send/time?msg=khighness&time=1000">delay 3s</a>
* @see <a href="http://localhost:7777/rabbitmq/send/time?msg=khighness&time=5000">delay 5s</a>
*/
@RequestMapping("/time")
@ResponseStatus(HttpStatus.OK)
public String sendMsg(@RequestParam("msg") String message, @RequestParam("time") Long time) {
log.info("now: [{}], receive message: [{}], time: [{}]", new Date(), message, time);
delayMessageSender.sendMsg(message, time);
return "success";
}

重新运行启动类,访问浏览器进行测试:
(1)访问:http://localhost:7777/rabbitmq/send/time?msg=khighness&time=1000

1
2
2021-09-21 12:35:56.532  INFO 13876 --- [nio-7777-exec-6] top.parak.delay.MessageController        : now: [Tue Sep 21 12:35:56 CST 2021], receive message: [khighness], time: [1000]
2021-09-21 12:35:57.539 INFO 13876 --- [ntContainer#0-1] t.parak.delay.DeadLetterMessageReceiver : now: [Tue Sep 21 12:35:57 CST 2021], 『dead letter-3』 receive message: [khighness]

(2)访问:http://localhost:7777/rabbitmq/send/time?msg=khighness&time=5000

1
2
2021-09-21 12:36:00.917  INFO 13876 --- [nio-7777-exec-7] top.parak.delay.MessageController        : now: [Tue Sep 21 12:36:00 CST 2021], receive message: [khighness], time: [5000]
2021-09-21 12:36:05.921 INFO 13876 --- [ntContainer#0-1] t.parak.delay.DeadLetterMessageReceiver : now: [Tue Sep 21 12:36:05 CST 2021], 『dead letter-3』 receive message: [khighness]

8.7 插件实现

如果不能实现在消息粒度上添加TTL,并使其在设置的TTL时间内即时即时死亡,就无法设计一个通用的延时队列。

这时使用RabbitMQ的插件即可解决:https://www.rabbitmq.com/community-plugins.html

延时插件安装

  1. 下载插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

  2. 复制到容器:

    1
    $ docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq:/plugins
  3. 激活插件:

    1
    2
    $ docker exec -it rabbitmq bash
    $ rabbitmq-plugins enable rabbitmq_delayed_message_exchange

打开Web控制台,可以看到有新的交换机类型:

DelayQueueConfig中创建新的交换机和队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static final String DELAYED_MESSAGE_EXCHANGE = "parak.delay.queue.delayed.message.exchange";
public static final String DELAYED_MESSAGE_QUEUE = "parak.delay.queue.delayed.message.queue";
public static final String DELAYED_MESSAGE_ROUTING_KEY = "parak.delay.queue.delayed.message.key";

/**
* 声明自定义交换机
*/
@Bean("delayedMessageExchange")
public CustomExchange delayedMessageExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_MESSAGE_EXCHANGE, "x-delayed-message", true, false, args);
}

@Bean("delayedMessageQueue")
public Queue delayedMessageQueue() {
return new Queue(DELAYED_MESSAGE_QUEUE);
}

@Bean
public Binding delayedMessageBinding(@Qualifier("delayedMessageQueue") Queue queue, @Qualifier("delayedMessageExchange") CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAYED_MESSAGE_ROUTING_KEY).noargs();
}

创建延迟队列的消息消费者DelayedMessageReceiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

@Component
public class DelayedMessageReceiver {
private final Logger log = LoggerFactory.getLogger(this.getClass());

@RabbitListener(queues = DelayQueueConfig.DELAYED_MESSAGE_QUEUE)
public void receiveDeadLetter1(Message message, Channel channel) throws IOException {
log.info("now: [{}], 『delayed message』 receive message: [{}]", new Date(), new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

}

DelayMessageSender中封装新的方法用于向延时交换机中发送消息:

1
2
3
4
5
6
public void sendMsgToDelayedExchange(String msg, long delayTime) {
rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_MESSAGE_EXCHANGE, DelayQueueConfig.DELAYED_MESSAGE_ROUTING_KEY, msg, m -> {
m.getMessageProperties().setExpiration(Long.toString(delayTime));
return m;
});
}

MessageController中添加新的接口用于发送延时消息:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 发送延时消息,按照时间
* @see <a href="http://localhost:7777/rabbitmq/send/delayed?msg=khighness&time=1000">delay 3s</a>
* @see <a href="http://localhost:7777/rabbitmq/send/delayed?msg=khighness&time=5000">delay 5s</a>
*/
@RequestMapping("/delayed")
@ResponseStatus(HttpStatus.OK)
public String sendMsgToDelayedExchange(@RequestParam("msg") String message, @RequestParam("time") Long time) {
log.info("now: [{}], receive message: [{}], time: [{}]", new Date(), message, time);
delayMessageSender.sendMsgToDelayedExchange(message, time);
return "success";
}

8.8 一个小结

延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用、RabbitMQ的特性,如:消息可靠性发送、消息可靠性投递以及死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。

当然,那是队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis的ZSet,利用Quartz或者Kafka的时间轮,这些方式各有特点。了解的越多,遇到问题便能游刃有余。

9. RabbitMQ可靠投递

9.1 可靠投递

在RabbitMQ中,一个消息从生产者发送到RabbitMQ的服务器,需要经历以下步骤:

  1. 生产者准备好需要投递的消息;
  2. 生产者与RabbitMQ服务器建立连接;
  3. 生产者发送消息;
  4. RabbitMQ服务器接收到消息,并将其路由到指定队列;
  5. RabbitMQ服务器发起回调,告知生产者消息发送成功。

所谓可靠投递,就是确保消息能够保证从生产者发送到服务器。

说明:如果没有设置Mandatory,是不需要先路由消息才发起回调的,服务器收到消息后就会进行回调确认。

2、3、5步都是通过TCP连接进行交互,有网络调用的地方就会有事故,网络波动随时都有可能发生,不管是内部机房停电,还是外部光缆被切,网络事故无法预测,虽然这些都是小概率事件,但是对于敏感数据来说,这些情况下导致消息丢失都是不可接受的。

9.2 实现原理

默认情况下,发送消息的操作是不会反悔任何消息给生产者的,也就是说,默认情况下生产者是不知道消息有没有正确地到达服务器。

对此,RabbitMQ中有一些相关的解决方案:

  1. 使用事务机制来让生产者感知消息被成功投递到服务器;
  2. 通过生产者确认机制实现。

在RabbitMQ中,所有确保消息可靠投递都会对性能产生一定影响,如果使用不当,可能会对吞吐量造成重大影响,只有通过执行性能基准测试,才能在确定性能与可靠投递之间的平衡。

9.3 事务机制

RabbitMQ是支持事务机制的,在生产者确认机制之前,事务是确保消息被成功投递的唯一办法。

创建项目,引入依赖。

项目配置application.properties

1
2
3
4
5
6
7
8
9
10
server.port=8888
spring.application.name=springboot-reliable-transaction
spring.rabbitmq.host=192.168.117.155
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/learn
spring.rabbitmq.username=parak
spring.rabbitmq.password=KAG1823
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.acknowledge-mode=manual

创建配置类RabbitMQConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE = "parak.reliable.transaction.business.exchange";
public static final String BUSINESS_QUEUE = "parak.reliable.transaction.business.queue";
public static final String DEFAULT_ROUTING_KEY = "";

/**
* 声明业务交换机
*/
@Bean("businessExchange")
public FanoutExchange businessExchange() {
return new FanoutExchange(BUSINESS_EXCHANGE);
}

@Bean("businessQueue")
public Queue businessQueue() {
return QueueBuilder.durable(BUSINESS_QUEUE).build();
}

@Bean
public Binding businessBinding(@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}

/**
* 启用RabbitMQ事务
*/
@Bean
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}

}

创建业务消息消费者BusinessMessageReceiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class BusinessMessageReceiver {
private final Logger log = LoggerFactory.getLogger(this.getClass());

@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE)
public void sendMsg(Message message, Channel channel) throws IOException {
log.info("『business』 receive message: [{}]", new String(message.getBody()));
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}

}

创建业务消息发送者BusinessMessageSender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;

@Component
public class BusinessMessageSender {
private final Logger log = LoggerFactory.getLogger(this.getClass());

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
private void init() {
rabbitTemplate.setChannelTransacted(true);
}

@Transactional
public void sendMsg(String msg) {
rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE, RabbitMQConfig.DEFAULT_ROUTING_KEY, msg);
log.info("message: [{}]", msg);
if (msg == null || msg.equals("") || msg.contains("exception")) {
throw new RuntimeException("error");
}
log.info("message sent successfully: [{}]", msg);
}
}

创建接口用于生产消息MessageController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

@RequestMapping("/rabbitmq/send")
@RestController
public class MessageController {
@Autowired
private BusinessMessageSender businessMessageSender;

/**
* @see <a hred="http://localhost:8888/rabbitmq/send/tx?msg=Khighness">succeed</a>
* @see <a hred="http://localhost:8888/rabbitmq/send/tx?msg=Kexception">exception</a>
*/
@RequestMapping("/tx")
@ResponseStatus(HttpStatus.OK)
public String sendMsg(@RequestParam("msg") String msg) {
businessMessageSender.sendMsg(msg);
return "success";
}
}

编写启动类并运行,访问浏览器进行测试:

(1)访问:http://localhost:8888/rabbitmq/send/tx?msg=Khighness

1
2
3
2021-09-21 16:51:55.913  INFO 18096 --- [nio-8888-exec-5] t.p.transaction.BusinessMessageSender    : message: [Khighness]
2021-09-21 16:51:55.913 INFO 18096 --- [nio-8888-exec-5] t.p.transaction.BusinessMessageSender : message sent successfully: [Khighness]
2021-09-21 16:51:55.915 INFO 18096 --- [ntContainer#0-1] t.p.transaction.BusinessMessageReceiver : 『business』 receive message: [Khighness]

(2)访问:http://localhost:8888/rabbitmq/send/tx?msg=Kexception

1
2
3
4
2021-09-21 16:51:59.444  INFO 18096 --- [nio-8888-exec-6] t.p.transaction.BusinessMessageSender    : message: [Kexception]
2021-09-21 16:51:59.445 ERROR 18096 --- [nio-8888-exec-6] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: error] with root cause

java.lang.RuntimeException: error

编码过程中,有两个需要注意的地方:

  1. 在初始化消息发送者的时候,需要在初始化中使用rabbitTemplate.setChannelTrsansacted(true)来开启事务;
  2. 在发送消息的方法上加上@Transactional注解,这样在该方法发生异常时,消息将不会发送。

RabbitMQ中的事务使用起来虽然简单,但是对性能的形象是不可忽视的,因为每次事务的提交都是阻塞式的等待服务器返回结果,而默认模式下,客户端是不需要等待的,直接发送就完事了,除此之外,事务消息需要比普通消息多4次与服务器的交互,这就意味着会占用着更多的处理时间,所以如果对消息处理速度有较高要求时,尽量不要采用事务机制。

9.4 生产者确认机制

RabbitMQ中的生产者确认功能是AMQP规范的增强功能,当生产者发布给所有队列的已路由消息被消费者应用程序直接消费时,或者消息被放入队列并根据需要进行持久化时,一个Basic Ack请求会被发送到生产者,如果消息无法路由,代理服务器将发送一个Basic Nack RPC请求用于表示失败。然后由生产者决定该如何处理该消息。

简而言之,通过生产者确认机制,生产者可以在消息被代理服务器成功接收时得到反馈,并有机会处理未被成功接收的消息。

创建项目,引入依赖。

项目配置在上述配置上需要加一行开启生产者确认:

1
spring.rabbitmq.publisher-confirms=true

创建配置类RabbitMQConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE = "parak.reliable.confirm.business.exchange";
public static final String BUSINESS_QUEUE = "parak.reliable.confirm.business.queue";
public static final String BUSINESS_KEY = "parak.reliable.confirm.business.key";

@Bean("businessExchange")
public DirectExchange businessExchange() {
return new DirectExchange(BUSINESS_EXCHANGE);
}

@Bean("businessQueue")
public Queue businessQueue() {
return QueueBuilder.durable(BUSINESS_QUEUE).build();
}

@Bean
public Binding businessBinding(@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_KEY);
}

}

创建消息发送者BusinessMessageSender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import java.util.UUID;

@Component
public class BusinessMessageSender implements RabbitTemplate.ConfirmCallback{
private final Logger log = LoggerFactory.getLogger(this.getClass());

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback(this);
}

public void sendMsg(String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("id: [{}], message: [{}]", correlationData.getId(), msg);
// 设置消息ID,用于回调时的判断
rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE, RabbitMQConfig.DEFAULT_ROUTING_KEY, msg, correlationData);
}

@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("message confirm succeed, id: [{}]", id);
} else {
log.info("message confirm failed, id: [{}], cause: [{}]", id, s);
}
}
}

这里需要为消息设置消息ID,以便在回调时通过该ID来判断是对哪个消息的回调,因为在回调函数中,我们是无法直接获取到消息内容的,所以需要将消息先暂存起来,根据消息的重要程度,可以考虑使用本地缓存,或者存入Redis中,或者MySQL中,然后在回调时更新其状态或者从缓存中移除,最后使用定时任务对一段时间内未发送的消息进行重新投递。

9.6 可靠投递到队列

上面使用了两种方式实现了消息被可靠地投递到RabbitMQ的交换机中,但是如果如果Broker无法将消息路由到队列,还是会被丢弃。

RabbitMQ有两个机制可以实现可靠投递到队列:

  1. mandatory参数:将不可路由消息退回给生产者;
  2. 备份交换机:将不可路由消息转发给备份交换机。

第一种方式只需要在初始化rabbitTemplate的时候添加一行代码即可:

1
rabbitTemplate.setMandatory(true);

先演示消息丢弃情况,创建项目,引入依赖。

项目配置application.properties

1
2
3
4
5
6
7
8
9
10
11
server.port=11111
spring.application.name=springboot-reliable-queue
spring.rabbitmq.host=192.168.117.155
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/learn
spring.rabbitmq.username=parak
spring.rabbitmq.password=KAG1823
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.publisher-confirms=true

创建配置类RabbitMQConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE = "parak.reliable.queue.business.exchange";
public static final String BUSINESS_QUEUE = "parak.reliable.queue.business.queue";
public static final String BUSINESS_KEY = "key";

/**
* 声明业务交换机
*/
@Bean("businessExchange")
public DirectExchange businessExchange() {
return new DirectExchange(BUSINESS_EXCHANGE);
}

@Bean("businessQueue")
public Queue businessQueue() {
return QueueBuilder.durable(BUSINESS_QUEUE).build();
}

@Bean
public Binding businessBinding(@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_KEY);
}

}

创建消息消费者BusinessMessageReceiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class BusinessMessageReceiver {
private final Logger log = LoggerFactory.getLogger(this.getClass());

@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE)
public void sendMsg(Message message, Channel channel) throws IOException {
log.info("『business』 receive message: [{}]", new String(message.getBody()));
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}

}

创建消息发送者BusinessMessageSender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.UUID;

@Component
public class BusinessMessageSender implements RabbitTemplate.ConfirmCallback{
private final Logger log = LoggerFactory.getLogger(this.getClass());

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setMandatory(true);
}

public void sendMsg(String routingKey, String message) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("id: [{}], message: [{}]", correlationData.getId(), message);
// 设置消息ID,用于回调时的判断
rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE, routingKey, message, correlationData);
}

@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("message confirm succeed, id: [{}]", id);
} else {
log.info("message confirm failed, id: [{}], cause: [{}]", id, s);
}
}

}

创建接口发送消息MessageController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

@RequestMapping("/rabbitmq/send")
@RestController
public class MessageController {
@Autowired
private BusinessMessageSender businessMessageSender;

/**
* @see <a hred="http://localhost:11111/rabbitmq/send/confirm?msg=Khighness">succeed</a>
* @see <a hred="http://localhost:11111/rabbitmq/send/confirm?msg=Kexception">exception</a>
*/
@RequestMapping("/confirm")
@ResponseStatus(HttpStatus.OK)
public String sendMsg(@RequestParam("key") String routingKey, @RequestParam("msg") String message) {
businessMessageSender.sendMsg(routingKey, message);
return "success";
}
}

编写启动类并运行,访问浏览器进行测试:

(1)访问:http://localhost:11111/rabbitmq/send/queue?key=key&msg=Khighness

1
2
3
2021-09-21 22:58:33.392  INFO 21368 --- [io-11111-exec-5] top.parak.queue.BusinessMessageSender    : id: [81ff86f3-0000-4edb-b6a5-92494f27be3c], message: [Khighness]
2021-09-21 22:58:33.394 INFO 21368 --- [ntContainer#0-1] top.parak.queue.BusinessMessageReceiver : 『business』 receive message: [Khighness]
2021-09-21 22:58:33.394 INFO 21368 --- [nectionFactory2] top.parak.queue.BusinessMessageSender : message confirm succeed, id: [81ff86f3-0000-4edb-b6a5-92494f27be3c]

(2)访问:http://localhost:11111/rabbitmq/send/queue?key=kkk&msg=FlowerK

1
2
3
2021-09-21 22:58:34.867  INFO 21368 --- [io-11111-exec-9] top.parak.queue.BusinessMessageSender    : id: [373c027e-8e8e-4f2a-b59c-53a8b7553767], message: [FlowerK]
2021-09-21 22:58:34.868 INFO 21368 --- [nectionFactory1] top.parak.queue.BusinessMessageSender : message confirm succeed, id: [373c027e-8e8e-4f2a-b59c-53a8b7553767]
2021-09-21 22:58:34.868 WARN 21368 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate : Returned message but no callback available

可以看到第二个消息发送后的提示Returned message but no callback available

设置mandatory参数后,如果消息无法被路由,则会返回给生产者,是通过回调的方式进行的,所以,生产者需要设置相应的回调函数才能接受消息。

为了进行回调,消息发送者需要实现回调接口RabbitTemplate.ReturnCallback,实现returnedMessage方法,同时在初始化时设置回调接口为自己:

1
2
3
4
5
6
7
8
9
10
11
12
@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(this);
}

@Override
public void returnedMessage(Message message, int relayCode, String replyText, String exchange, String routingKey) {
log.info("message is returned. msg: [{}], replayCode: [{}], replyText: [{}]exchange: [{}], routingKey: [{}]",
new String(message.getBody()), relayCode, replyText, exchange, routingKey);
}

重新运行启动类,访问浏览器后日志如下:

1
2
3
2021-09-21 23:17:55.245  INFO 24700 --- [io-11111-exec-1] top.parak.queue.BusinessMessageSender    : id: [ad8a28ed-b21c-497d-87c4-ff3c447204db], message: [FlowerK]
2021-09-21 23:17:55.253 INFO 24700 --- [nectionFactory1] top.parak.queue.BusinessMessageSender : message is returned. msg: [FlowerK], replayCode: [312], replyText: [NO_ROUTE], exchange: [parak.reliable.queue.business.exchange], routingKey: [kkk]
2021-09-21 23:17:55.254 INFO 24700 --- [nectionFactory1] top.parak.queue.BusinessMessageSender : message confirm succeed, id: [ad8a28ed-b21c-497d-87c4-ff3c447204db]

可以看到,我们接收到了被退回的消息,并带上了原因。但是要注意的是,mandatory参数仅仅是在当消息无法北路由的时候,让消息生产者感知到这一点,只要开启了生产者确认机制,无论设置了mandatory参数,都会在交换机接收到消息时进行消息确认回调,而且通常消息的退回会在消息的确认回调之前。

通过mandatory参数我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打印个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务器有多台机器的时候,手动复制会更加麻烦而且容易出错。

而且设置mandatory参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

RabbitMQ有一种备份交换机的机制存在,可以很好的应对这个问题。

备份交换机可以理解为RabbitMQ中的交换机的备胎,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会将这条消息转发到备份交换机中,由备份交换机来进行钻发和处理,通常备份交换机的类型为Fanout,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

草图如下:


创建项目,引入依赖,配置如下:

1
2
3
4
5
6
7
8
9
10
11
server.port=12222
spring.application.name=springboot-exchange-backup
spring.rabbitmq.host=192.168.117.155
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/learn
spring.rabbitmq.username=parak
spring.rabbitmq.password=KAG1823
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.publisher-confirms=true

创建配置类RabbitMQConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE = "parak.exchange.backup.business.exchange";
public static final String BUSINESS_QUEUE = "parak.exchange.backup.business.queue";
public static final String BUSINESS_KEY = "key";
public static final String BUSINESS_BACKUP_EXCHANGE = "parak.exchange.backup.backup.exchange";
public static final String BUSINESS_BACKUP_QUEUE = "parak.exchange.backup.backup.queue";
public static final String BUSINESS_BACKUP_WARNING_QUEUE = "parak.exchange.backup.backup.warning.queue";

/**
* 声明业务交换机
*/
@Bean("businessExchange")
public DirectExchange businessExchange() {
ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(BUSINESS_EXCHANGE)
.durable(true)
// 设置备份交换机
.withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE);
return (DirectExchange) exchangeBuilder.build();
}

@Bean("businessQueue")
public Queue businessQueue() {
return QueueBuilder.durable(BUSINESS_QUEUE).build();
}

@Bean
public Binding businessBinding(@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_KEY);
}

/**
* 声明备份交换机
*/
@Bean("backupExchange")
public FanoutExchange backupExchange() {
ExchangeBuilder exchangeBuilder = ExchangeBuilder.fanoutExchange(BUSINESS_BACKUP_EXCHANGE)
.durable(true);
return exchangeBuilder.build();
}

@Bean("backupQueue")
public Queue backupQueue() {
return QueueBuilder.durable(BUSINESS_BACKUP_QUEUE).build();
}

@Bean("warningQueue")
public Queue warningQueue() {
return QueueBuilder.durable(BUSINESS_BACKUP_WARNING_QUEUE).build();
}

@Bean
public Binding backupBinding(@Qualifier("backupQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}

@Bean
public Binding warningBinding(@Qualifier("warningQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}

}

创建业务消息消费者BusinessMessageReceiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class BusinessMessageReceiver {
private final Logger log = LoggerFactory.getLogger(this.getClass());

@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE)
public void sendMsg(Message message, Channel channel) throws IOException {
log.info("『business』 receive message: [{}]", new String(message.getBody()));
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}

创建报警消息消费者BusinessWarningMessageReceiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class BusinessWarningMessageReceiver {
private final Logger log = LoggerFactory.getLogger(this.getClass());

@RabbitListener(queues = RabbitMQConfig.BUSINESS_BACKUP_WARNING_QUEUE)
public void sendMsg(Message message, Channel channel) throws IOException {
log.error("『warning』 receive unroutable message: [{}]", new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

创建消息发送者BusinessMessageSender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.UUID;

@Component
public class BusinessMessageSender/* implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback */ {
private final Logger log = LoggerFactory.getLogger(this.getClass());

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMsg(String routingKey, String message) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("id: [{}], message: [{}]", correlationData.getId(), message);
// 设置消息ID,用于回调时的判断
rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE, routingKey, message, correlationData);
}
}

创建接口用于发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

@RequestMapping("/rabbitmq/send")
@RestController
public class MessageController {
@Autowired
private BusinessMessageSender businessMessageSender;

/**
* @see <a href="http://localhost:12222/rabbitmq/send/backup?key=key&msg=Khighness">succeed</a>
* @see <a href="http://localhost:12222/rabbitmq/send/backup?key=kkk&msg=FlowerK">fail</a>
*/
@RequestMapping("/backup")
@ResponseStatus(HttpStatus.OK)
public String sendMsg(@RequestParam("key") String routingKey, @RequestParam("msg") String message) {
businessMessageSender.sendMsg(routingKey, message);
return "success";
}
}

编写启动类并运行,访问浏览器进行测试:

(1)访问:http://localhost:12222/rabbitmq/send/backup?key=key&msg=Khighness

1
2
2021-09-22 11:41:58.976  INFO 16192 --- [io-12222-exec-1] top.parak.backup.BusinessMessageSender   : id: [f8896978-db6c-47bb-9af7-0f0c15fb81f3], message: [Khighness]
2021-09-22 11:41:58.988 INFO 16192 --- [ntContainer#0-1] t.parak.backup.BusinessMessageReceiver : 『business』 receive message: [Khighness]

(2)访问:http://localhost:12222/rabbitmq/send/backup?key=kkk&msg=FlowerK

1
2
2021-09-22 11:42:02.200  INFO 16192 --- [io-12222-exec-3] top.parak.backup.BusinessMessageSender   : id: [c56b5191-7810-4936-8c34-ef88ac16ccd2], message: [FlowerK]
2021-09-22 11:42:02.202 ERROR 16192 --- [ntContainer#1-1] t.p.b.BusinessWarningMessageReceiver : 『warning』 receive unroutable message: [FlowerK]

那么,如果mandatory参数与备份交换机一起使用,消息会被退回到生产者还是被转发给交换机呢?

修改消息发送者BusinessMessageSender的代码,测试一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package top.parak.backup;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.UUID;

@Component
public class BusinessMessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private final Logger log = LoggerFactory.getLogger(this.getClass());

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(this);
}

public void sendMsg(String routingKey, String message) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("id: [{}], message: [{}]", correlationData.getId(), message);
// 设置消息ID,用于回调时的判断
rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE, routingKey, message, correlationData);
}

@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("message confirm succeed, id: [{}]", id);
} else {
log.info("message confirm failed, id: [{}], cause: [{}]", id, s);
}
}

@Override
public void returnedMessage(Message message, int relayCode, String replyText, String exchange, String routingKey) {
log.info("message is returned. msg: [{}], replayCode: [{}], replyText: [{}], exchange: [{}], routingKey: [{}]",
new String(message.getBody()), relayCode, replyText, exchange, routingKey);
}

}

重新运行启动类,重新访问浏览器:

1
2
3
4
5
2021-09-22 11:54:30.713  INFO 2688 --- [nectionFactory1] top.parak.backup.BusinessMessageSender   : message confirm succeed, id: [ab911258-3966-4e3e-9cfb-0b920bc97807]
2021-09-22 11:54:30.715 INFO 2688 --- [ntContainer#0-1] t.parak.backup.BusinessMessageReceiver : 『business』 receive message: [Khighness]
2021-09-22 11:54:37.117 INFO 2688 --- [io-12222-exec-2] top.parak.backup.BusinessMessageSender : id: [ebaa3643-731d-430d-ac92-73e39439a262], message: [FlowerK]
2021-09-22 11:54:37.120 INFO 2688 --- [nectionFactory1] top.parak.backup.BusinessMessageSender : message confirm succeed, id: [ebaa3643-731d-430d-ac92-73e39439a262]
2021-09-22 11:54:37.120 ERROR 2688 --- [ntContainer#1-1] t.p.b.BusinessWarningMessageReceiver : 『warning』 receive unroutable message: [FlowerK]

可以看到,两条消息都可以收到成功回调,但是不可路由消息不会回退给生产者,而是直接转发到备份交换机,可见备份交换机的处理优先级更高。

9.5 一个小结

事务机制和生产者确认机制保证消息成功发送到RabbitMQ的Broker,生产者确认机制跟事务是不能一起工作的,它是事务的轻量级替代方案。因为事务和发布者确认模式都是需要先跟服务器协商,对信道启用的一种模式,不能对同一个信道同时使用两种模式。在生产者确认模式中,消息的确认可以是异步和批量的,所以相比使用事务,性能会更好。

想要保证消息成功被exchange路由到队列,可以采用mandatory参数和备份交换机两种方式,两者可以同时启用,备份交换机的处理优先级会更高。