1. 消息队列概述

1.1 定义

MQ(Message Queue):消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中 获取消息,因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦,别名为消息中间件。通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

1.2 应用场景

  • 跨系统数据传递
  • 高并发流量削峰
  • 数据分发和异步处理
  • 大数据分析传递
  • 分布式事务

1.3 主流使用

  1. ActiveMQ

ActiveMQ是Apache出品,能力强劲的开源消息系统,它是一个完全支持JMS规范的消息中间件,丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎。

  1. Kafka

Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。

  1. RocketMQ

RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息和可靠传输及事务性做了优化,目前在阿里集团广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。

  1. RabbitMQ

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全、AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

2. 消息队列协议

2.1 AMQP协议

AMQP(Advanced Message Queuing Protocol)是搞基消息队列协议,由摩根大通集团连接其他公司共同设计,是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端和消息中间件可传递消息,并不受客户端/中间件不同产品、不同的开发语言等条件的限制。

2.2 MQTT协议

MQTT(Meesgae Queuing Telemetry Transport)消息队列是IBM开放的一个即时通讯协议,物联网系统架构中的重要组成部分。

2.3 OpenMeesage协议

近几年由阿里巴巴、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。

为什么不使用http协议?

(1)因为http请求报文头和响应报文头是比较复杂的,包含了cookie、数据的加密解密、状态码等附加功能,但是对于一个消息而言,并不需要这么复杂,只需要数据传递、存储和分发就行,追求的是高性能,尽量简洁和快速。

(2)大部分情况下http是短连接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会进行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息进行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。

3. RabbitMQ安装

安装原生的RabbitMQ需要erlang环境,懒得装了。

3.1 docker安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 挂载目录
$ mkdir -p /home/rabbitmq/data

# 启动容器
$ docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-v /home/rabbitmq/data:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=Khighness \
-e RABBITMQ_DEFAULT_PASS=KAG1823 \
rabbitmq

# 查看插件
$ rabbitmq-plugins list
# 启用|禁用插件
$ rabbitmq-plugins enable | disable <plugin>

3.2 web管理

1
2
$ docker exec -it rabbitmq bash
$ rabbitmq-plugins enable rabbitmq_management

登录用户名和密码即为RABBITMQ_DEFAULT_USERRABBITMQ_DEFAULT_PASS

4. RabbitMQ消息模型

官方文档:https://www.rabbitmq.com/getstarted.html

MQ准备:

创建用户(Username)parak,密码(Password)123456,创建虚拟主机(vistual hosts)/learn

引入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.2</version>
</dependency>
</dependencies>

日志配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<property name="LOG_HOME" value="log/" />

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${LOG_HOME}/rabbitmq.%d{yyyy-MM-dd}.log</FileNamePattern>
<MaxHistory>30</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<MaxFileSize>10MB</MaxFileSize>
</triggeringPolicy>
</appender>

<root level="DEBUG">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE"/>
</root>

</configuration>

4.1 Hello World模型

特点:点到点

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者,消息的接受者,会一直等待消息到来
  • queue:消息队列,图中红色部分,生产者向其中投入消息,消费者在其中获取消息。

构建连接工厂:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static ConnectionFactory connectionFactory;

public static void init() {
// 创建MQ连接工厂
connectionFactory = new ConnectionFactory();
// 设置服务器IP:PORT
connectionFactory.setHost("192.168.117.155");
connectionFactory.setPort(5672);
// 设置连接虚拟主机
connectionFactory.setVirtualHost("/learn");
// 设置用户密码
connectionFactory.setUsername("parak");
connectionFactory.setPassword("123456");
}

生产消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void testSendMessage() throws IOException, TimeoutException {
// 获取连接对象
Connection connection = connectionFactory.newConnection();
// 获取连接通道
Channel channel = connection.createChannel();
// 通道绑定消息队列
// 参数1:队列名称,如果队列不存在则自动创建
// 参数2:用来定义队列特性是否要持久化
// 参数3:是否独占队列,只允许当前连接可用
// 参数4:是否在消费完成后自动删除队列
// 参数5:附加参数
channel.queueDeclare("hello", false, false, false, null);
// 发布消息
// 参数1:交换机名称
// 参数2:队列名称
// 参数3:传递的额外设置
// 参数4:消息的具体内容
channel.basicPublish("", "hello", null, "hello, highness".getBytes());
// 关闭通道
channel.close();
// 关闭连接
connection.close();
}

消费消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void testConsumeMessage() throws IOException, TimeoutException {
// 创建连接对象
Connection connection = connectionFactory.newConnection();
// 创建连接通道
Channel channel = connection.createChannel();
// 通道绑定队列
channel.queueDeclare("hello", false, false, false, null);
// 消费消息
// 参数1:消费的队列名称
// 参数2:开始消息的自动确认机制
// 参数3:消费时的回调接口
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
// 参数1:标签
// 参数2:信封
// 参数3:属性
// 参数4:消息队列中取出的消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费消息:{}", new String(body));
}
});
// 需要不断接收队列中的消息,所以不关闭通道和连接
}

让队列和消息都持久化

queueDeclare方法的第二个参数需要设置为true

basicPublish方法的第三个参数需要设置为带MessagePropertiesPERSISTENT的属性:

  • MINIMAL_PERSISTENT_BASIC
  • PERSISTENT_BASIC
  • PERSISTENT_TEXT_PLAIN

4.2 Work Queues模型

Work Queues,任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用Work Queues:让多个消费者绑定到一个队列,共同消费消息队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

演示:平均分配

MQ准备:

  • 创建虚拟主机(vistual host)/work

MQ工厂:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class RabbitMQUtil {
private static volatile ConnectionFactory connectionFactory = null;
private static final String HOST = "192.168.117.155";
private static final int PORT = 5672;
private static final String VIRTUAL_HOST = "/learn";
private static final String USERNAME = "parak";
private static final String PASSWORD = "123456";

public static Connection getConnection() {
if (connectionFactory == null) {
synchronized (RabbitMQUtil.class) {
if (connectionFactory == null) {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setVirtualHost(VIRTUAL_HOST);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
}
}
}
try {
return connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}

public static void closeConnectionAndChannel(Channel channel, Connection connection) {
try {
if (connection != null)
channel.close();
if (connection != null)
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}

模型演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 生产者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("", "work", MessageProperties.PERSISTENT_TEXT_PLAIN, new String("message" + (i + 1) + " => hello, khighness").getBytes());
}
RabbitMQUtil.closeConnectionAndChannel(channel, connection);
}
}

// 消费者1
public class Consumer1 {
private static Logger log = LoggerFactory.getLogger(Consumer1.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-1:{}", new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}

// 消费者2
public class Consumer2 {
private static Logger log = LoggerFactory.getLogger(Consumer2.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-2:{}", new String(body));
}
});
}
}

先运行消费者1和消费者2,最后运行生产者:

1
2
3
4
5
6
7
8
9
10
11
12
# 消费者1
2021-05-24 19:32:52.218 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer1 - 消费者-1:message1 => hello, khighness
2021-05-24 19:32:53.225 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer1 - 消费者-1:message3 => hello, khighness
2021-05-24 19:32:54.226 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer1 - 消费者-1:message5 => hello, khighness
2021-05-24 19:32:55.229 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer1 - 消费者-1:message7 => hello, khighness
2021-05-24 19:32:56.231 [pool-1-thread-5] DEBUG top.parak.workqueue.Consumer1 - 消费者-1:message9 => hello, khighness
# 消费者2
2021-05-24 19:32:52.219 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message2 => hello, khighness
2021-05-24 19:32:52.221 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message4 => hello, khighness
2021-05-24 19:32:52.221 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message6 => hello, khighness
2021-05-24 19:32:52.221 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message8 => hello, khighness
2021-05-24 19:32:52.222 [pool-1-thread-5] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message10 => hello, khighness

总结:默认情况下,RabbitMQ按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息,这种分发消息的方式成为循环。

修改:按劳分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Consumer1 {
private static Logger log = LoggerFactory.getLogger(Consumer1.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
final Channel channel = connection.createChannel();
// 一次只能消费一条消息
channel.basicQos(1);
channel.queueDeclare("work", true, false, false, null);
// 第二个参数,关闭消息自动确认
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-1:{}", new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 第二个参数,关闭自动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

public class Consumer2 {
private static Logger log = LoggerFactory.getLogger(Consumer2.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-2:{}", new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

重新运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
# consumer1
2021-05-24 20:58:34.534 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer1 - 消费者-1:message1 => hello, khighness

# consumer2
2021-05-24 20:58:33.531 [pool-1-thread-4] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message2 => hello, khighness
2021-05-24 20:58:33.535 [pool-1-thread-5] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message3 => hello, khighness
2021-05-24 20:58:33.536 [pool-1-thread-6] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message4 => hello, khighness
2021-05-24 20:58:33.537 [pool-1-thread-7] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message5 => hello, khighness
2021-05-24 20:58:33.538 [pool-1-thread-8] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message6 => hello, khighness
2021-05-24 20:58:33.538 [pool-1-thread-9] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message7 => hello, khighness
2021-05-24 20:58:33.539 [pool-1-thread-10] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message8 => hello, khighness
2021-05-24 20:58:33.540 [pool-1-thread-11] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message9 => hello, khighness
2021-05-24 20:58:33.541 [pool-1-thread-12] DEBUG top.parak.workqueue.Consumer2 - 消费者-2:message10 => hello, khighness

4.3 Publish/Subscribe模型

Publish/Subscribe模型,也称为fanout模型,即发布-订阅模型或者广播模型。

流程:

  • 每个消费者都有自己的队列
  • 每个队列都要绑定交换机
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送到绑定过的所有队列
  • 队列的消费者都能拿到消息,实现一条消息被多个消费者消费

模型演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 生产者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 指定交换机
// 参数1:交换机名称,不存在则自动创建
// 参数2:交换机类型
channel.exchangeDeclare("pubsub", "fanout");
// 发送消息
channel.basicPublish("pubsub", "", null, "khighness's message".getBytes());
RabbitMQUtil.closeConnectionAndChannel(channel, connection);
}
}

// 消费者-1
public class Consumer1 {
private static Logger log = LoggerFactory.getLogger(Consumer1.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 指定交换机
channel.exchangeDeclare("pubsub", "fanout");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queue, "pubsub", "");
// 消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("消费者-1:{}", new String(body));
}
});
}
}

// 消费者-2
public class Consumer2 {
private static Logger log = LoggerFactory.getLogger(Consumer2.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 指定交换机
channel.exchangeDeclare("pubsub", "fanout");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queue, "pubsub", "");
// 消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("消费者-2:{}", new String(body));
}
});
}
}

先运行消费者1和消费者2,最后运行生产者:

1
2
3
4
5
# 消费者1
2021-05-24 21:40:58.365 [pool-1-thread-4] INFO top.parak.pubsub.Consumer1 - 消费者-1:khighness's message

# 消费者2
2021-05-24 21:40:58.365 [pool-1-thread-4] INFO top.parak.pubsub.Consumer2 - 消费者-2:khighness's message

4.4 Routing模型

在fanout模型中,一条消息会被所有订阅的队列都消费,但是,在某些场景下,我们希望不同的消息被不同的队列消费,这时就要用Routing的Direct类型的交换机。

在Direct模型中:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey。
  • 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会收到消息。

图解:

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key
  • X:交换机,接收生产者的消息,然后把消息递交给与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key为error的消息
  • C2:消费者,其所在队列指定了需要routing key为info、error、warning的消息

模型演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 生产者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 指定交换机
channel.exchangeDeclare("routing", "direct");
// 发送消息
channel.basicPublish("routing", "info", null, "direct => info".getBytes());
channel.basicPublish("routing", "debug", null, "direct => debug".getBytes());
channel.basicPublish("routing", "warn", null, "direct => warn".getBytes());
channel.basicPublish("routing", "error", null, "direct => error".getBytes());
RabbitMQUtil.closeConnectionAndChannel(channel, connection);
}
}

// 消费者1
public class Consumer1 {
private static Logger log = LoggerFactory.getLogger(Consumer1.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 指定交换机
channel.exchangeDeclare("routing", "direct");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 基于route key绑定队列和交换机
channel.queueBind(queue, "routing", "info");
// 获取消费的消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-1:{}", new String(body));
}
});
}
}

// 消费者2
public class Consumer2 {
private static Logger log = LoggerFactory.getLogger(Consumer2.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 指定交换机
channel.exchangeDeclare("routing", "direct");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 基于route key绑定队列和交换机
channel.queueBind(queue, "routing", "debug");
channel.queueBind(queue, "routing", "warn");
channel.queueBind(queue, "routing", "error");
// 获取消费的消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-2:{}", new String(body));
}
});
}
}

先运行消费者1和消费者2,最后运行生产者:

1
2
3
4
5
6
7
# 消费者1
2021-05-24 22:47:26.753 [pool-1-thread-4] DEBUG top.parak.direct.Consumer1 - 消费者-1:direct => info

# 消费者2
2021-05-24 22:47:26.753 [pool-1-thread-4] DEBUG top.parak.direct.Consumer2 - 消费者-2:direct => debug
2021-05-24 22:47:26.756 [pool-1-thread-4] DEBUG top.parak.direct.Consumer2 - 消费者-2:direct => warn
2021-05-24 22:47:26.756 [pool-1-thread-4] DEBUG top.parak.direct.Consumer2 - 消费者-2:direct => error

4.5 Topics模型

Topics模型与Direct模型相比,都是可以根据RoutingKey把消息路由到不同的队列,只不过Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符。

Topics模型的RoutingKey是由一个或者多个单词组成,多个单词之间以.分隔,有两种通配符:

  • *:匹配一个单词
  • #:匹配多个单词

模型演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 生产者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics", "topic");
channel.basicPublish("topics", "k.a", null, "topics => k.a".getBytes());
channel.basicPublish("topics", "k.a.g", null, "topics => k.a.g".getBytes());
RabbitMQUtil.closeConnectionAndChannel(channel, connection);
}
}

// 消费者1
public class Consumer2 {
private static Logger log = LoggerFactory.getLogger(Consumer2.class);
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics", "topic");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, "topics", "k.#");
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-2:{}", new String(body));
}
});
}
}
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 基于route key绑定队列和交换机
channel.queueBind(queue, "routing", "debug");
channel.queueBind(queue, "routing", "warn");
channel.queueBind(queue, "routing", "error");
// 获取消费的消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.debug("消费者-2:{}", new String(body));
}
});
}
}

先运行消费者1和消费者2,最后运行生产者:

1
2
3
4
5
6
# 消费者1
2021-05-24 23:14:53.574 [pool-1-thread-4] DEBUG top.parak.topics.Consumer1 - 消费者-1:topics => k.a

# 消费者2
2021-05-24 23:14:53.573 [pool-1-thread-4] DEBUG top.parak.topics.Consumer2 - 消费者-2:topics => k.a
2021-05-24 23:14:53.578 [pool-1-thread-4] DEBUG top.parak.topics.Consumer2 - 消费者-2:topics => k.a.g

5. RabbitMQ整合SprigBoot

引入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
</dependencies>

配置文件:

1
2
3
4
5
6
7
server.port=3333
spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=192.168.117.155
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/learn
spring.rabbitmq.username=parak
spring.rabbitmq.password=123456

工具类:

1
2
3
4
5
6
7
8
9
public class TimeUtil {
private static class TimeFormatHolder {
static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
}

public static String getTime() {
return TimeFormatHolder.formatter.format(LocalDateTime.now());
}
}

启动类:

1
2
3
4
5
6
@SpringBootApplication
public class RabbitMQApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMQApplication.class, args);
}
}

测试类:

1
2
3
4
5
@SpringBootTest(classes = RabbitMQApplication.class)
public class RabbitMQApplicationTest {
@Autowired
private RabbitTemplate rabbitTemplate;
}

5.1 Hello World模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 生产消息
@Test void hello() {
rabbitTemplate.convertAndSend("hello", "hello, Khighness");
}

// 消费者
@Component
public class HelloConsumer {
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))
@RabbitHandler
public void receive(String message) {
System.out.println(String.format("%s INFO %d --- [consumer] %s receive message: %s", TimeUtil.getTime(), Thread.currentThread().getId(), this.getClass().getName(), message));
}
}

结果:

1
2021-05-25 11:50:31.703  INFO 17 --- [consumer] top.parak.consumer.HelloConsumer receive message: hello, Khighness

5.2 Work Queue模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 生产消息
@Test void work() {
for (int i = 1; i <= 10; i++)
rabbitTemplate.convertAndSend("work", "work message-" + i);
}

// 消费者
@Component
public class WorkConsumer {
@RabbitListener(queuesToDeclare = @Queue("work"))
@RabbitHandler
public void receive1(String message) {
System.out.println(String.format("%s INFO %d --- [consumer-1] %s receive message: %s", TimeUtil.getTime(), Thread.currentThread().getId(), this.getClass().getName(), message));
}

@RabbitListener(queuesToDeclare = @Queue("work"))
@RabbitHandler
public void receive2(String message) {
System.out.println(String.format("%s INFO %d --- [consumer-2] %s receive message: %s", TimeUtil.getTime(), Thread.currentThread().getId(), this.getClass().getName(), message));
}
}

结果:

1
2
3
4
5
6
7
8
9
10
2021-05-25 11:53:06.618  INFO 21 --- [consumer-2] top.parak.consumer.WorkConsumer receive message: work message-2
2021-05-25 11:53:06.618 INFO 19 --- [consumer-1] top.parak.consumer.WorkConsumer receive message: work message-1
2021-05-25 11:53:06.619 INFO 21 --- [consumer-2] top.parak.consumer.WorkConsumer receive message: work message-4
2021-05-25 11:53:06.619 INFO 19 --- [consumer-1] top.parak.consumer.WorkConsumer receive message: work message-3
2021-05-25 11:53:06.619 INFO 21 --- [consumer-2] top.parak.consumer.WorkConsumer receive message: work message-6
2021-05-25 11:53:06.620 INFO 19 --- [consumer-1] top.parak.consumer.WorkConsumer receive message: work message-5
2021-05-25 11:53:06.620 INFO 21 --- [consumer-2] top.parak.consumer.WorkConsumer receive message: work message-8
2021-05-25 11:53:06.620 INFO 19 --- [consumer-1] top.parak.consumer.WorkConsumer receive message: work message-7
2021-05-25 11:53:06.620 INFO 21 --- [consumer-2] top.parak.consumer.WorkConsumer receive message: work message-10
2021-05-25 11:53:06.620 INFO 19 --- [consumer-1] top.parak.consumer.WorkConsumer receive message: work message-9

5.3 Publish/Subscribe模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 生产消息
@Test void fanout() {
for (int i = 1; i <= 3; i++)
rabbitTemplate.convertAndSend("fanout", "", "fanout message-" + i);
}

// 消费者
@Component
public class FanoutConsumer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, // 未指定名称,创建临时队列
exchange = @Exchange(value = "fanout", type = "fanout") // 绑定交换机
)
})
@RabbitHandler
public void receive1(String message) {
System.out.println(String.format("%s INFO %d --- [consumer-1] %s receive message: %s", TimeUtil.getTime(), Thread.currentThread().getId(), this.getClass().getName(), message));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue, // 未指定名称,创建临时队列
exchange = @Exchange(value = "fanout", type = "fanout") // 绑定交换机
)
})
@RabbitHandler
public void receive2(String message) {
System.out.println(String.format("%s INFO %d --- [consumer-2] %s receive message: %s", TimeUtil.getTime(), Thread.currentThread().getId(), this.getClass().getName(), message));
}
}

结果:

1
2
3
4
5
6
2021-05-25 12:21:17.283  INFO 19 --- [consumer-2] top.parak.consumer.FanoutConsumer receive message: fanout message-1
2021-05-25 12:21:17.283 INFO 17 --- [consumer-1] top.parak.consumer.FanoutConsumer receive message: fanout message-1
2021-05-25 12:21:17.284 INFO 17 --- [consumer-1] top.parak.consumer.FanoutConsumer receive message: fanout message-2
2021-05-25 12:21:17.284 INFO 19 --- [consumer-2] top.parak.consumer.FanoutConsumer receive message: fanout message-2
2021-05-25 12:21:17.284 INFO 17 --- [consumer-1] top.parak.consumer.FanoutConsumer receive message: fanout message-3
2021-05-25 12:21:17.284 INFO 19 --- [consumer-2] top.parak.consumer.FanoutConsumer receive message: fanout message-3

5.4 Routing模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 生产消息
@Test void routing() {
rabbitTemplate.convertAndSend("routing", "info", "routing info");
rabbitTemplate.convertAndSend("routing", "debug", "routing debug");
rabbitTemplate.convertAndSend("routing", "warn", "routing warn");
rabbitTemplate.convertAndSend("routing", "error", "routing error");
}

// 消费者
@Component
public class RoutingConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 临时队列
exchange = @Exchange(value = "routing", type = "direct"),
key = {"info", "debug"}
)
})
@RabbitHandler
public void receive1(String message) {
System.out.println(String.format("%s INFO %d --- [consumer-1] %s receive message: %s", TimeUtil.getTime(), Thread.currentThread().getId(), this.getClass().getName(), message));
}

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 临时队列
exchange = @Exchange(value = "routing", type = "direct"),
key = {"warn", "error"}
)
})
@RabbitHandler
public void receive2(String message) {
System.out.println(String.format("%s INFO %d --- [consumer-2] %s receive message: %s", TimeUtil.getTime(), Thread.currentThread().getId(), this.getClass().getName(), message));
}
}

结果:

1
2
3
4
2021-05-25 12:32:38.151  INFO 23 --- [consumer-1] top.parak.consumer.RoutingConsumer receive message: routing info
2021-05-25 12:32:38.151 INFO 25 --- [consumer-2] top.parak.consumer.RoutingConsumer receive message: routing warn
2021-05-25 12:32:38.152 INFO 23 --- [consumer-1] top.parak.consumer.RoutingConsumer receive message: routing debug
2021-05-25 12:32:38.152 INFO 25 --- [consumer-2] top.parak.consumer.RoutingConsumer receive message: routing error

5.5 Topics模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 生产消息
@Test void topics() {
rabbitTemplate.convertAndSend("topics", "k.a", "topics k.a");
rabbitTemplate.convertAndSend("topics", "k.a.g", "topics k.a.g");
}

// 消费者
@Component
public class TopicsConsumer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(name = "topics", type = "topic"),
key = {"k.*"}
)
})
@RabbitHandler
public void receive1(String message) {
System.out.println(String.format("%s INFO %d --- [consumer-1] %s receive message: %s", TimeUtil.getTime(), Thread.currentThread().getId(), this.getClass().getName(), message));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(name = "topics", type = "topic"),
key = {"k.#"}
)
})
@RabbitHandler
public void receive2(String message) {
System.out.println(String.format("%s INFO %d --- [consumer-2] %s receive message: %s", TimeUtil.getTime(), Thread.currentThread().getId(), this.getClass().getName(), message));
}
}

结果:

1
2
3
2021-05-25 13:33:11.243  INFO 29 --- [consumer-1] top.parak.consumer.TopicsConsumer receive message: topics k.a
2021-05-25 13:33:11.243 INFO 27 --- [consumer-2] top.parak.consumer.TopicsConsumer receive message: topics k.a
2021-05-25 13:33:11.244 INFO 27 --- [consumer-2] top.parak.consumer.TopicsConsumer receive message: topics k.a.g

6. RabbitMQ应用场景

6.1 异步处理

场景说明:用户注册,需要发送短信通知和验证邮件,传统的做法有串行和并行的方式。

  • 串行方式:将注册信息写入数据库后,发送短信通知,再发送验证邮件。
  • 并行方式:将注册信息写入数据库后,发送短信通知同时发送验证邮件。

缺点:注册后都需要等待任务处理完成才能返回。

消息队列:将注册信息写入数据库后,将短信和邮件写入消息队列后直接返回。

6.2 应用解耦

场景说明:用户下单,订单系统需要通知库存系统,传统做法就是订单系统调用库存系统的接口。

缺点:当库存系统出现故障时,订单就会失败。

消息队列:

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
  • 库存系统:订阅下单的消息,获取下单消息,进行库存操作。就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。

6.3 流量削峰

场景说明:秒杀活动,一般会因为流量过大,导致应用挂掉。

消息队列:

  1. 控制活动人数,超过阈值的订单直接丢弃。
  2. 缓解短时间的高流量压垮服务。

未完结,占坑,集群留着,待更~