1. 消息队列概述 1.1 定义 MQ(Message Queue):消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中 获取消息,因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦,别名为消息中间件。通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
1.2 应用场景
1.3 主流使用
2. 消息队列协议 2.1 AMQP协议 AMQP(Advanced Message Queuing Protocol)是高级消息队列协议,由摩根大通集团连接其他公司共同设计,是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端和消息中间件可传递消息,并不受客户端/中间件不同产品、不同的开发语言等条件的限制。
2.2 MQTT协议 MQTT(Meesgae Queuing Telemetry Transport)消息队列是IBM开放的一个即时通讯协议,物联网系统架构中的重要组成部分。
2.3 OpenMeesage协议 近几年由阿里巴巴、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。
3. RabbitMQ介绍 3.1 架构设计
3.2 其他概念
Vistual Host:虚拟主机,用于多用户和应用隔离。
Channel:如果每一次访问RabbitMQ都建立Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的Channel进行通讯。AMQP method包含了channel id帮助客户端和message broker和别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大地减少了操作系统建立TCP Conntection的开销。
3.3 交换机类型 RabbitMQ提供了三种交换机类型:
Direct Exchange:直连交换机,根据RoutingKey精确匹配,消息会被投递到相应队列。
Fanout Exchange:广播交换机,发送消息时不需要声明RoutingKey,消息会被投递到与交换机绑定的所有队列。
Topic Exchange:主题交换机,根据RoutingKey模糊匹配,消息会被投递到相应队列。
4. RabbitMQ安装 安装原生的RabbitMQ需要erlang环境,懒得装了。
4.1 docker安装 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 # 挂载目录 $ mkdir -p /home/rabbitmq/data # 启动容器 $ docker run -d --name rabbitmq \ -p 5672:5672 -p 15672:15672 \ -v /home/rabbitmq/data:/var/lib/rabbitmq \ -e RABBITMQ_DEFAULT_USER=Khighness \ -e RABBITMQ_DEFAULT_PASS=KAG1823 \ rabbitmq # 查看插件 $ rabbitmq-plugins list # 启用|禁用插件 $ rabbitmq-plugins enable | disable <plugin>
4.2 web管理 1 2 $ docker exec -it rabbitmq bash $ rabbitmq-plugins enable rabbitmq_management
5. RabbitMQ消息模型
,创建虚拟主机(vistual hosts)/learn
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <dependencies > <dependency > <groupId > com.rabbitmq</groupId > <artifactId > amqp-client</artifactId > <version > 5.7.3</version > </dependency > <dependency > <groupId > junit</groupId > <artifactId > junit</artifactId > <version > 4.13</version > </dependency > <dependency > <groupId > ch.qos.logback</groupId > <artifactId > logback-classic</artifactId > <version > 1.1.2</version > </dependency > </dependencies >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 <?xml version="1.0" encoding="UTF-8"?> <configuration debug ="false" > <property name ="LOG_HOME" value ="log/" /> <appender name ="STDOUT" class ="ch.qos.logback.core.ConsoleAppender" > <encoder class ="ch.qos.logback.classic.encoder.PatternLayoutEncoder" > <pattern > %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern > </encoder > </appender > <appender name ="FILE" class ="ch.qos.logback.core.rolling.RollingFileAppender" > <rollingPolicy class ="ch.qos.logback.core.rolling.TimeBasedRollingPolicy" > <FileNamePattern > ${LOG_HOME}/rabbitmq.%d{yyyy-MM-dd}.log</FileNamePattern > <MaxHistory > 30</MaxHistory > </rollingPolicy > <encoder class ="ch.qos.logback.classic.encoder.PatternLayoutEncoder" > <pattern > %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern > </encoder > <triggeringPolicy class ="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy" > <MaxFileSize > 10MB</MaxFileSize > </triggeringPolicy > </appender > <root level ="DEBUG" > <appender-ref ref ="STDOUT" /> <appender-ref ref ="FILE" /> </root > </configuration >
5.1 Hello World模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private static ConnectionFactory connectionFactory;public static void init () { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("" ); connectionFactory.setPort(5672 ); connectionFactory.setVirtualHost("/learn" ); connectionFactory.setUsername("parak" ); connectionFactory.setPassword("123456" ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static void testSendMessage () throws IOException, TimeoutException { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello" , false , false , false , null ); channel.basicPublish("" , "hello" , null , "hello, highness" .getBytes()); channel.close(); connection.close(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static void testConsumeMessage () throws IOException, TimeoutException { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello" , false , false , false , null ); channel.basicConsume("hello" , true , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { log.debug("消费消息:{}" , new String(body)); } }); }
5.2 Work Queues模型
Work Queues,任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用Work Queues:让多个消费者绑定到一个队列,共同消费消息队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
创建虚拟主机(vistual host)/work
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class RabbitMQUtil { private static volatile ConnectionFactory connectionFactory = null ; private static final String HOST = "" ; private static final int PORT = 5672 ; private static final String VIRTUAL_HOST = "/learn" ; private static final String USERNAME = "parak" ; private static final String PASSWORD = "123456" ; public static Connection getConnection () { if (connectionFactory == null ) { synchronized (RabbitMQUtil.class) { if (connectionFactory == null ) { connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setPort(PORT); connectionFactory.setVirtualHost(VIRTUAL_HOST); connectionFactory.setUsername(USERNAME); connectionFactory.setPassword(PASSWORD); } } } try { return connectionFactory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return null ; } public static void closeConnectionAndChannel (Channel channel, Connection connection) { try { if (connection != null ) channel.close(); if (connection != null ) connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class Provider { public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work" , true , false , false , null ); for (int i = 0 ; i < 10 ; i++) { channel.basicPublish("" , "work" , MessageProperties.PERSISTENT_TEXT_PLAIN, new String("message" + (i + 1 ) + " => hello, khighness" ).getBytes()); } RabbitMQUtil.closeConnectionAndChannel(channel, connection); } } public class Consumer1 { private static Logger log = LoggerFactory.getLogger(Consumer1.class); public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work" , true , false , false , null ); channel.basicConsume("work" , true , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { log.debug("消费者-1:{}" , new String(body)); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } public class Consumer2 { private static Logger log = LoggerFactory.getLogger(Consumer2.class); public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work" , true , false , false , null ); channel.basicConsume("work" , true , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { log.debug("消费者-2:{}" , new String(body)); } }); } }
1 2 3 4 5 6 7 8 9 10 11 12 2021 -05 -24 19 :32 :52.218 [pool -1 -thread -4 ] DEBUG top.parak.workqueue.Consumer1 - 消费者-1 :message1 => hello, khighness2021 -05 -24 19 :32 :53.225 [pool -1 -thread -4 ] DEBUG top.parak.workqueue.Consumer1 - 消费者-1 :message3 => hello, khighness2021 -05 -24 19 :32 :54.226 [pool -1 -thread -4 ] DEBUG top.parak.workqueue.Consumer1 - 消费者-1 :message5 => hello, khighness2021 -05 -24 19 :32 :55.229 [pool -1 -thread -4 ] DEBUG top.parak.workqueue.Consumer1 - 消费者-1 :message7 => hello, khighness2021 -05 -24 19 :32 :56.231 [pool -1 -thread -5 ] DEBUG top.parak.workqueue.Consumer1 - 消费者-1 :message9 => hello, khighness2021 -05 -24 19 :32 :52.219 [pool -1 -thread -4 ] DEBUG top.parak.workqueue.Consumer2 - 消费者-2 :message2 => hello, khighness2021 -05 -24 19 :32 :52.221 [pool -1 -thread -4 ] DEBUG top.parak.workqueue.Consumer2 - 消费者-2 :message4 => hello, khighness2021 -05 -24 19 :32 :52.221 [pool -1 -thread -4 ] DEBUG top.parak.workqueue.Consumer2 - 消费者-2 :message6 => hello, khighness2021 -05 -24 19 :32 :52.221 [pool -1 -thread -4 ] DEBUG top.parak.workqueue.Consumer2 - 消费者-2 :message8 => hello, khighness2021 -05 -24 19 :32 :52.222 [pool -1 -thread -5 ] DEBUG top.parak.workqueue.Consumer2 - 消费者-2 :message10 => hello, khighness
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class Consumer1 { private static Logger log = LoggerFactory.getLogger(Consumer1.class); public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); final Channel channel = connection.createChannel(); channel.basicQos(1 ); channel.queueDeclare("work" , true , false , false , null ); channel.basicConsume("work" , false , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { log.debug("消费者-1:{}" , new String(body)); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false ); } }); } } public class Consumer2 { private static Logger log = LoggerFactory.getLogger(Consumer2.class); public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); final Channel channel = connection.createChannel(); channel.basicQos(1 ); channel.queueDeclare("work" , true , false , false , null ); channel.basicConsume("work" , false , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { log.debug("消费者-2:{}" , new String(body)); channel.basicAck(envelope.getDeliveryTag(), false ); } }); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 2021 -05 -24 20 :58 :34.534 [pool -1 -thread -4 ] DEBUG top.parak.workqueue.Consumer1 - 消费者-1 :message1 => hello, khighness2021 -05 -24 20 :58 :33.531 [pool -1 -thread -4 ] DEBUG top.parak.workqueue.Consumer2 - 消费者-2 :message2 => hello, khighness2021 -05 -24 20 :58 :33.535 [pool -1 -thread -5 ] DEBUG top.parak.workqueue.Consumer2 - 消费者-2 :message3 => hello, khighness2021 -05 -24 20 :58 :33.536 [pool -1 -thread -6 ] DEBUG top.parak.workqueue.Consumer2 - 消费者-2 :message4 => hello, khighness2021 -05 -24 20 :58 :33.537 [pool -1 -thread -7 ] DEBUG top.parak.workqueue.Consumer2 - 消费者-2 :message5 => hello, khighness2021 -05 -24 20 :58 :33.538 [pool -1 -thread -8 ] DEBUG top.parak.workqueue.Consumer2 - 消费者-2 :message6 => hello, khighness2021 -05 -24 20 :58 :33.538 [pool -1 -thread -9 ] DEBUG top.parak.workqueue.Consumer2 - 消费者-2 :message7 => hello, khighness2021 -05 -24 20 :58 :33.539 [pool -1 -thread -10 ] DEBUG top.parak.workqueue.Consumer2 - 消费者-2 :message8 => hello, khighness2021 -05 -24 20 :58 :33.540 [pool -1 -thread -11 ] DEBUG top.parak.workqueue.Consumer2 - 消费者-2 :message9 => hello, khighness2021 -05 -24 20 :58 :33.541 [pool -1 -thread -12 ] DEBUG top.parak.workqueue.Consumer2 - 消费者-2 :message10 => hello, khighness
5.3 Publish/Subscribe模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public class Provider { public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("pubsub" , "fanout" ); channel.basicPublish("pubsub" , "" , null , "khighness's message" .getBytes()); RabbitMQUtil.closeConnectionAndChannel(channel, connection); } } public class Consumer1 { private static Logger log = LoggerFactory.getLogger(Consumer1.class); public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("pubsub" , "fanout" ); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "pubsub" , "" ); channel.basicConsume(queue, true , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { log.info("消费者-1:{}" , new String(body)); } }); } } public class Consumer2 { private static Logger log = LoggerFactory.getLogger(Consumer2.class); public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("pubsub" , "fanout" ); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "pubsub" , "" ); channel.basicConsume(queue, true , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { log.info("消费者-2:{}" , new String(body)); } }); } }
1 2 3 4 5 2021 -05 -24 21 :40 :58.365 [pool -1 -thread -4 ] INFO top.parak.pubsub.Consumer1 - 消费者-1 :khighness's message # 消费者2 2021-05-24 21:40:58.365 [pool-1-thread-4] INFO top.parak.pubsub.Consumer2 - 消费者-2:khighness' s message
5.4 Routing模型 在Fanout模型中,一条消息会被所有订阅的队列都消费,但是,在某些场景下,我们希望不同的消息被不同的队列消费,这时就要用Routing的Direct类型的交换机。
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key
X:交换机,接收生产者的消息,然后把消息递交给与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key为error的消息
C2:消费者,其所在队列指定了需要routing key为info、error、warning的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public class Provider { public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("routing" , "direct" ); channel.basicPublish("routing" , "info" , null , "direct => info" .getBytes()); channel.basicPublish("routing" , "debug" , null , "direct => debug" .getBytes()); channel.basicPublish("routing" , "warn" , null , "direct => warn" .getBytes()); channel.basicPublish("routing" , "error" , null , "direct => error" .getBytes()); RabbitMQUtil.closeConnectionAndChannel(channel, connection); } } public class Consumer1 { private static Logger log = LoggerFactory.getLogger(Consumer1.class); public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("routing" , "direct" ); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "routing" , "info" ); channel.basicConsume(queue, true , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { log.debug("消费者-1:{}" , new String(body)); } }); } } public class Consumer2 { private static Logger log = LoggerFactory.getLogger(Consumer2.class); public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("routing" , "direct" ); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "routing" , "debug" ); channel.queueBind(queue, "routing" , "warn" ); channel.queueBind(queue, "routing" , "error" ); channel.basicConsume(queue, true , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { log.debug("消费者-2:{}" , new String(body)); } }); } }
1 2 3 4 5 6 7 2021 -05 -24 22 :47 :26.753 [pool -1 -thread -4 ] DEBUG top.parak.direct.Consumer1 - 消费者-1 :direct => info2021 -05 -24 22 :47 :26.753 [pool -1 -thread -4 ] DEBUG top.parak.direct.Consumer2 - 消费者-2 :direct => debug2021 -05 -24 22 :47 :26.756 [pool -1 -thread -4 ] DEBUG top.parak.direct.Consumer2 - 消费者-2 :direct => warn2021 -05 -24 22 :47 :26.756 [pool -1 -thread -4 ] DEBUG top.parak.direct.Consumer2 - 消费者-2 :direct => error
5.5 Topics模型 Topics模型与Direct模型相比,都是可以根据RoutingKey把消息路由到不同的队列,只不过Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class Provider { public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("topics" , "topic" ); channel.basicPublish("topics" , "k.a" , null , "topics => k.a" .getBytes()); channel.basicPublish("topics" , "k.a.g" , null , "topics => k.a.g" .getBytes()); RabbitMQUtil.closeConnectionAndChannel(channel, connection); } } public class Consumer2 { private static Logger log = LoggerFactory.getLogger(Consumer2.class); public static void main (String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("topics" , "topic" ); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "topics" , "k.#" ); channel.basicConsume(queue, true , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { log.debug("消费者-2:{}" , new String(body)); } }); } } String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue, "routing" , "debug" ); channel.queueBind(queue, "routing" , "warn" ); channel.queueBind(queue, "routing" , "error" ); channel.basicConsume(queue, true , new DefaultConsumer(channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { log.debug("消费者-2:{}" , new String(body)); } }); } }
1 2 3 4 5 6 2021 -05 -24 23 :14 :53.574 [pool -1 -thread -4 ] DEBUG top.parak.topics.Consumer1 - 消费者-1 :topics => k.a2021 -05 -24 23 :14 :53.573 [pool -1 -thread -4 ] DEBUG top.parak.topics.Consumer2 - 消费者-2 :topics => k.a2021 -05 -24 23 :14 :53.578 [pool -1 -thread -4 ] DEBUG top.parak.topics.Consumer2 - 消费者-2 :topics => k.a.g
6. RabbitMQ整合SpringBoot 在使用SpringBoot的AMQP API之前,一定要理解消息的处理顺序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 <properties > <spring.boot.version > 2.2.2.RELEASE</spring.boot.version > </properties > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > <version > ${spring.boot.version}</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > <version > ${spring.boot.version}</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <version > ${spring.boot.version}</version > </dependency > </dependencies >
1 2 3 4 5 6 7 server.port =3333 spring.application.name =springboot-rabbitmq spring.rabbitmq.host = spring.rabbitmq.port =5672 spring.rabbitmq.virtual-host =/learn spring.rabbitmq.username =parak spring.rabbitmq.password =KAG1823
6.1 配置类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configuration public class RabbitMQConfig { public static final String DIRECT_EXCHANGE = "parak.exchange.direct" ; public static final String FANOUT_EXCHANGE = "parak.exchange.fanout" ; public static final String TOPIC_EXCHANGE = "parak.exchange.topic" ; public static final String HEADER_EXCHANGE = "parak.exchange.header" ; public static final String HELLO_QUEUE = "parak.queue.hello" ; public static final String WORK_QUEUE = "parak.queue.work1" ; public static final String DIRECT_QUEUE1 = "parak.queue.direct1" ; public static final String DIRECT_QUEUE2 = "parak.queue.direct2" ; public static final String FANOUT_QUEUE1 = "parak.queue.fanout1" ; public static final String FANOUT_QUEUE2 = "parak.queue.fanout2" ; public static final String TOPIC_QUEUE1 = "parak.queue.topic1" ; public static final String TOPIC_QUEUE2 = "parak.queue.topic2" ; public static final String HEADER_QUEUE = "parak.queue.header" ; public static final String DEFAULT_ROUTING_KEY = "" ; public static final String HELLO_ROUTING_KEY = "hello" ; public static final String WORK_ROUTING_KEY = "work" ; public static final String DIRECT_ROUTING_KEY1 = "k" ; public static final String DIRECT_ROUTING_KEY2 = "a" ; public static final String TOPIC_ROUTING_KEY1 = "k.a.*" ; public static final String TOPIC_ROUTING_KEY2 = "k.a.#" ; @Bean("directExchange") public DirectExchange directExchange () { return new DirectExchange(DIRECT_EXCHANGE); } @Bean("helloQueue") public Queue helloQueue () { return new Queue(HELLO_QUEUE, true ); } @Bean("workQueue") public Queue workQueue () { return new Queue(WORK_QUEUE, true ); } @Bean("directQueue1") public Queue directQueue1 () { return new Queue(DIRECT_QUEUE1, true ); } @Bean("directQueue2") public Queue directQueue2 () { return new Queue(DIRECT_QUEUE2, true ); } @Bean public Binding directBinding1 (@Qualifier("helloQueue") Queue queue, @Qualifier("directExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(HELLO_ROUTING_KEY); } @Bean public Binding directBinding2 (@Qualifier("workQueue") Queue queue, @Qualifier("directExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(WORK_ROUTING_KEY); } @Bean public Binding directBinding3 (@Qualifier("directQueue1") Queue queue, @Qualifier("directExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING_KEY1); } @Bean public Binding directBinding4 (@Qualifier("directQueue2") Queue queue, @Qualifier("directExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING_KEY2); } @Bean("fanoutExchange") public FanoutExchange fanoutExchange () { return new FanoutExchange(FANOUT_EXCHANGE); } @Bean("fanoutQueue1") public Queue fanoutQueue1 () { return new Queue(FANOUT_QUEUE1, true ); } @Bean("fanoutQueue2") public Queue fanoutQueue2 () { return new Queue(FANOUT_QUEUE2, true ); } @Bean public Binding fanoutBinding1 (@Qualifier("fanoutQueue1") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Bean public Binding fanoutBinding2 (@Qualifier("fanoutQueue2") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Bean("topicExchange") public TopicExchange topicExchange () { return new TopicExchange(TOPIC_EXCHANGE); } @Bean("topicQueue1") public Queue topicQueue1 () { return new Queue(TOPIC_QUEUE1, true ); } @Bean("topicQueue2") public Queue topicQueue2 () { return new Queue(TOPIC_QUEUE2, true ); } @Bean public Binding topicBinding1 (@Qualifier("topicQueue1") Queue queue, @Qualifier("topicExchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(topicExchange()).with(TOPIC_ROUTING_KEY1); } @Bean public Binding topicBinding2 (@Qualifier("topicQueue2") Queue queue, @Qualifier("topicExchange") TopicExchange exchange) { return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_ROUTING_KEY2); } @Bean("headersExchange") public HeadersExchange headersExchange () { return new HeadersExchange(HEADER_EXCHANGE); } @Bean("headerQueue") public Queue headerQueue () { return new Queue(HEADER_QUEUE, true ); } @Bean public Binding headerBinding (@Qualifier("headerQueue") Queue queue, @Qualifier("headersExchange") HeadersExchange exchange) { Map<String, Object> map = new HashMap<>(); map.put("signature" , "Khighness" ); return BindingBuilder.bind(queue).to(exchange).whereAny(map).match(); } }
6.2 生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.http.HttpStatus;import org.springframework.web.bind.annotation.*;@RestController @RequestMapping("/rabbitmq/send") public class MQController { private final Logger log = LoggerFactory.getLogger(this .getClass()); @Autowired private AmqpTemplate amqpTemplate; @RequestMapping("/hello") @ResponseStatus(HttpStatus.OK) public String sendHello (@RequestParam("msg") String message) { amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.HELLO_ROUTING_KEY, message); log.info("『Hello』 send message: [{}]" , message); return "success" ; } @RequestMapping("/work") @ResponseStatus(HttpStatus.OK) public String sendWork (@RequestParam("msg") String message) { amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.WORK_ROUTING_KEY, message); log.info("『Work』 send message: [{}]" , message); return "success" ; } @RequestMapping("/direct") @ResponseStatus(HttpStatus.OK) public String sendDirect (@RequestParam("key") String routingKey, @RequestParam("msg") String message) { amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, routingKey, message); log.info("『Direct』 send message: [{}], with key: [{}]" , message, routingKey); return "success" ; } @RequestMapping("/fanout") @ResponseStatus(HttpStatus.OK) public String sendFanout (@RequestParam("msg") String message) { amqpTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE, RabbitMQConfig.DEFAULT_ROUTING_KEY, message); log.info("『Fanout』 send message: [{}]" , message); return "success" ; } @RequestMapping("/topic") @ResponseStatus(HttpStatus.OK) public String sendTopic (@RequestParam("key") String routingKey, @RequestParam("msg") String message) { amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, routingKey, message); log.info("『Topic』 send message: [{}], with key: [{}]" , message, routingKey); return "success" ; } @RequestMapping("/header") @ResponseStatus(HttpStatus.OK) public String sendHeader (@RequestParam("sign") String signature, @RequestParam("msg") String message) { MessageProperties properties = new MessageProperties(); properties.setHeader("signature" , signature); Message obj = new Message(message.getBytes(), properties); amqpTemplate.convertAndSend(RabbitMQConfig.HEADER_EXCHANGE, RabbitMQConfig.DEFAULT_ROUTING_KEY, obj); log.info("『Header』 send message: [{}], with sign: [{}]" , message, signature); return "success" ; } }
6.3 消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component public class MQReceiver { private final Logger log = LoggerFactory.getLogger(this .getClass()); @RabbitListener(queues = RabbitMQConfig.HELLO_QUEUE) public void receiveHello (String message) { log.info("『Hello』 receive message: [{}]" , message); } @RabbitListener(queues = RabbitMQConfig.WORK_QUEUE) public void receiveWork1 (String message) { log.info("『Work-1』 receive message: [{}]" , message); } @RabbitListener(queues = RabbitMQConfig.WORK_QUEUE) public void receiveWork2 (String message) { log.info("『Work-2』 receive message: [{}]" , message); } @RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE1) public void receiveDirect1 (String message) { log.info("『Direct-1』 receive message: [{}]" , message); } @RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE1) public void receiveDirect2 (String message) { log.info("『Direct-2』 receive message: [{}]" , message); } @RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE1) public void receiveFanout1 (String message) { log.info("『Fanout-1』 receive message: [{}]" , message); } @RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE2) public void receiveFanout2 (String message) { log.info("『Fanout-2』 receive message: [{}]" , message); } @RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE1) public void receiveTopic1 (String message) { log.info("『Topic-1』 receive message: [{}]" , message); } @RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE2) public void receiveTopic2 (String message) { log.info("『Topic-2』 receive message: [{}]" , message); } @RabbitListener(queues = RabbitMQConfig.HEADER_QUEUE) public void receiveHeader (String message) { log.info("『Header』 receive message: [{}]" , message); } }
6.4 启动类 1 2 3 4 5 6 7 8 9 import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitMQApplication { public static void main (String[] args) { SpringApplication.run(RabbitMQApplication.class, args); } }
7. RabbitMQ死信队列 7.1 死信队列 消费消息时,如果出现以下情况,:
那么消息被称为死信(Dead Letter)。
7.2 实现原理 步骤:
7.3 具体编码 创建项目,引入依赖。
1 2 3 4 5 6 7 8 9 server.port=5555 spring.application.name=springboot-deadletter spring.rabbitmq.host= spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/learn spring.rabbitmq.username=parak spring.rabbitmq.password=KAG1823 spring.rabbitmq.listener.simple.default-requeue-rejected=false spring.rabbitmq.listener.simple.acknowledge-mode=manual
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configuration public class DeadLetterConfig { public static final String BUSINESS_EXCHANGE = "parak.dead.letter.business.exchange" ; public static final String BUSINESS_QUEUE1 = "parak.dead.letter.business.queue1" ; public static final String BUSINESS_QUEUE2 = "parak.dead.letter.business.queue2" ; public static final String DEAD_LETTER_EXCHANGE = "parak.dead.letter.death.exchange" ; public static final String DEAD_LETTER_QUEUE1 = "parak.dead.letter.death.queue1" ; public static final String DEAD_LETTER_QUEUE2 = "parak.dead.letter.death.queue2" ; public static final String DEAD_LETTER_ROUTING_KEY1 = "parak.dead.letter.death.key1" ; public static final String DEAD_LETTER_ROUTING_KEY2 = "parak.dead.letter.death.key2" ; @Bean("businessExchange") public FanoutExchange businessExchange () { return new FanoutExchange(BUSINESS_EXCHANGE); } @Bean("businessQueue1") public Queue businessQueue1 () { Map<String, Object> args = new HashMap<>(2 ); args.put("x-dead-letter-exchange" , DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key" , DEAD_LETTER_ROUTING_KEY1); return QueueBuilder.durable(BUSINESS_QUEUE1).withArguments(args).build(); } @Bean("businessQueue2") public Queue businessQueue2 () { Map<String, Object> args = new HashMap<>(2 ); args.put("x-dead-letter-exchange" , DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key" , DEAD_LETTER_ROUTING_KEY2); return QueueBuilder.durable(BUSINESS_QUEUE2).withArguments(args).build(); } @Bean public Binding businessBinding1 (@Qualifier("businessQueue1") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Bean public Binding businessBinding2 (@Qualifier("businessQueue2") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Bean("deadLetterExchange") public DirectExchange deadLetterExchange () { return new DirectExchange(DEAD_LETTER_EXCHANGE); } @Bean("deadLetterQueue1") public Queue deadLetterQueue1 () { return new Queue(DEAD_LETTER_QUEUE1); } @Bean("deadLetterQueue2") public Queue deadLetterQueue2 () { return new Queue(DEAD_LETTER_QUEUE2); } @Bean public Binding deadLetterBinding1 (@Qualifier("deadLetterQueue1") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY1); } @Bean public Binding deadLetterBinding2 (@Qualifier("deadLetterQueue2") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY1); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 import com.rabbitmq.client.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Component public class BusinessMessageReceiver { private final Logger log = LoggerFactory.getLogger(this .getClass()); @RabbitListener(queues = DeadLetterConfig.BUSINESS_QUEUE1) public void receiveBusiness1 (Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("『business-1』 receive message: [{}]" , msg); boolean ack = true ; Exception exception = null ; try { if (msg.contains("dead-letter" )) { throw new RuntimeException("Dead Letter" ); } } catch (Exception e) { ack = false ; exception = e; } if (!ack) { log.error("consume message occur exception: [{}]" , exception.getMessage(), exception); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false , false ); } else { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } } @RabbitListener(queues = DeadLetterConfig.BUSINESS_QUEUE2) public void reveiveBusiness2 (Message message, Channel channel) throws IOException { log.info("『business-2』 receive message: [{}]" , new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import com.rabbitmq.client.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Component public class DeadLetterMessageReceiver { private final Logger log = LoggerFactory.getLogger(this .getClass()); @RabbitListener(queues = DeadLetterConfig.DEAD_LETTER_QUEUE1) public void receiveDeadLetter1 (Message message, Channel channel) throws IOException { log.info("『dead letter-1』 receive message: [{}]" , new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } @RabbitListener(queues = DeadLetterConfig.DEAD_LETTER_QUEUE2) public void receiveDeadLetter2 (Message message, Channel channel) throws IOException { log.info("『dead letter-2』 receive message: [{}]" , new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import top.parak.rabbitmq.RabbitMQConfig;@Component public class BusinessMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg (String msg) { rabbitTemplate.convertAndSend(DeadLetterConfig.BUSINESS_EXCHANGE, RabbitMQConfig.DEFAULT_ROUTING_KEY, msg); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import org.springframework.beans.factory.annotation.Autowired;import org.springframework.http.HttpStatus;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.ResponseStatus;import org.springframework.web.bind.annotation.RestController;@RequestMapping("/rabbitmq/send") @RestController public class MessageController { @Autowired private BusinessMessageSender businessMessageSender; @RequestMapping("/delay") @ResponseStatus(HttpStatus.OK) public String sendMsg (@RequestParam("msg") String message, @RequestParam("type") Integer type) { DelayType delayType = DelayType.getDelayType(type); if (delayType == null ) throw new IllegalArgumentException("type is in [1, 2]" ); log.info("now: [{}], receive message: [{}], type: [{}]" , new Date(), message, delayType); delayMessageSender.sendMsg(message, delayType); return "success" ; } }
1 2 2021 -09 -19 20 :58 :58.857 INFO 10120 --- [ntContainer 2021 -09 -19 20 :58 :58.858 INFO 10120 --- [tContainer
1 2 3 4 5 6 7 8 9 10 2021 -09 -19 21 :07 :23.462 INFO 10120 --- [ntContainer 2021 -09 -19 21 :07 :23.462 INFO 10120 --- [tContainer 2021 -09 -19 21 :07 :23.462 ERROR 10120 --- [tContainer java.lang.RuntimeException : Dead Letter ... 2021 -09 -19 21 :07 :23.465 INFO 10120 --- [tContainer 2021 -09 -19 21 :07 :23.465 INFO 10120 --- [tContainer
7.4 消息变化 死信消息被丢到死信队列,会发生什么变化呢?
1 log.info("dead letter properties: [{}]", message.getMessageProperties());
1 2021-09-19 21:53:41.172 INFO 5628 --- [tContainer#12-1] t.parak.death.DeadLetterMessageReceiver : dead letter properties: [MessageProperties [headers={x-first-death-exchange=parak.dead.letter.business.exchange, x-death=[{reason=rejected, count=1, exchange=parak.dead.letter.business.exchange, time=Sun Sep 19 21:09:38 CST 2021, routing-keys=[], queue=parak.dead.letter.business.queue1}], x-first-death-reason=rejected, x-first-death-queue=parak.dead.letter.business.queue1}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=parak.dead.letter.death.exchange, receivedRoutingKey=parak.dead.letter.death.key1, deliveryTag=1, consumerTag=amq.ctag-DmBuSjljWVkbuaFJvspHyw, consumerQueue=parak.dead.letter.death.queue1]]
第一次成为死信的原因: (1)rejected
:消息过期 (3)maxlen
7.5 应用场景 一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在消息错误导致处理异常、处理过程中参数校验异常或者因网络波动导致的查询异常等等。当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息。
7.6 生命周期 总结一下死信消息的生命周期:
8. RabbitMQ延时队列 8.1 延时队列 延时队列:队列内部元素有序,并且每个元素带时间属性。
8.2 使用场景 考虑以下场景:
8.3 RabbitMQ-TTL 这里需要介绍一下RabbitMQ的高级特性:TTL
(Time to Live)。
1 2 3 Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl" , 6000 ); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
1 2 3 4 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("6000" ); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, message.getBytes());
8.4 实现原理 延时队列,即将消息延迟一定时间进行处理,TTL刚好能让消息在延迟一定时间成为死信,另一方面,成为死信的消息都会被投递到死信队列中,这样只需要消费者一直消费死信队列中的消息即可。
8.5 具体编码 创建项目,引入依赖。
1 2 3 4 5 6 7 8 9 server.port=6666 spring.application.name=springboot-delayqueue spring.rabbitmq.host= spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/learn spring.rabbitmq.username=parak spring.rabbitmq.password=KAG1823 spring.rabbitmq.listener.simple.default-requeue-rejected=false spring.rabbitmq.listener.simple.acknowledge-mode=manual
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configuration public class DelayQueueConfig { public static final String DELAY_EXCHANGE = "parak.delay.queue.business.exchange" ; public static final String DELAY_QUEUE1 = "parak.delay.queue.business.queue1" ; public static final String DELAY_QUEUE2 = "parak.delay.queue.business.queue2" ; public static final String DELAY_QUEUE_ROUTING_KEY1 = "parak.delay.queue.business.key1" ; public static final String DELAY_QUEUE_ROUTING_KEY2 = "parak.delay.queue.business.key2" ; public static final String DEAD_LETTER_EXCHANGE = "parak.delay.queue.deadletter.exchange" ; public static final String DEAD_LETTER_QUEUE1 = "parak.delay.queue.deadletter.queue1" ; public static final String DEAD_LETTER_QUEUE2 = "parak.delay.queue.deadletter.queue2" ; public static final String DEAD_LETTER_ROUTING_KEY1 = "parak.delay.queue.deadletter.key1" ; public static final String DEAD_LETTER_ROUTING_KEY2 = "parak.delay.queue.deadletter.key2" ; @Bean("delayExchange") public DirectExchange delayExchange () { return new DirectExchange(DELAY_EXCHANGE); } @Bean("delayQueue1") public Queue delayQueue1 () { Map<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange" , DEAD_LETTER_EXCHANGE); map.put("x-dead-letter-routing-key" , DEAD_LETTER_ROUTING_KEY1); map.put("x-message-ttl" , 3000 ); return QueueBuilder.durable(DELAY_QUEUE1).withArguments(map).build(); } @Bean("delayQueue2") public Queue delayQueue2 () { Map<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange" , DEAD_LETTER_EXCHANGE); map.put("x-dead-letter-routing-key" , DEAD_LETTER_ROUTING_KEY2); map.put("x-message-ttl" , 10000 ); return QueueBuilder.durable(DELAY_QUEUE2).withArguments(map).build(); } @Bean public Binding delayBinding1 (@Qualifier("delayQueue1") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY1); } @Bean public Binding delayBinding2 (@Qualifier("delayQueue2") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY2); } @Bean("deadLetterExchange") public DirectExchange deadLetterExchange () { return new DirectExchange(DEAD_LETTER_EXCHANGE); } @Bean("deadLetterQueue1") public Queue deadLetterQueue1 () { return new Queue(DEAD_LETTER_QUEUE1); } @Bean("deadLetterQueue2") public Queue deadLetterQueue2 () { return new Queue(DEAD_LETTER_QUEUE2); } @Bean public Binding deadLetterBinding1 (@Qualifier("deadLetterQueue1") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY1); } @Bean public Binding deadLetterBinding2 (@Qualifier("deadLetterQueue2") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY2); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import com.rabbitmq.client.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.Date;@Component public class DeadLetterMessageReceiver { private final Logger log = LoggerFactory.getLogger(this .getClass()); @RabbitListener(queues = DelayQueueConfig.DEAD_LETTER_QUEUE1) public void receiveDeadLetter1 (Message message, Channel channel) throws IOException { log.info("now: [{}], 『dead letter-1』 receive message: [{}]" , new Date(), new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } @RabbitListener(queues = DelayQueueConfig.DEAD_LETTER_QUEUE2) public void receiveDeadLetter2 (Message message, Channel channel) throws IOException { log.info("now: [{}], 『dead letter-2』 receive message: [{}]" , new Date(), new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public enum DelayType { DELAY_3_SECONDS(1 , 3000 ), DELAY_10_SECONDS(2 , 10000 ); private final int type; private final int delayTime; DelayType(int type, int delayTime) { this .type = type; this .delayTime = delayTime; } public int getType () { return type; } public int getDelayTime () { return delayTime; } public static DelayType getDelayType (int type) { for (DelayType delayType : DelayType.values()) { if (type == delayType.getType()) { return delayType; } } return null ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Component public class DelayMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg (String msg, DelayType type) { switch (type) { case DELAY_3_SECONDS: rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_QUEUE_ROUTING_KEY1, msg); break ; case DELAY_10_SECONDS: rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_QUEUE_ROUTING_KEY2, msg); break ; } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.http.HttpStatus;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.ResponseStatus;import org.springframework.web.bind.annotation.RestController;import java.util.Date;@RequestMapping("/rabbitmq/send") @RestController public class MessageController { private final Logger log = LoggerFactory.getLogger(this .getClass()); @Autowired private DelayMessageSender delayMessageSender; @RequestMapping("/delay") @ResponseStatus(HttpStatus.OK) public String sendMsg (@RequestParam("msg") String message, @RequestParam("type") Integer type) { DelayType delayType = DelayType.getDelayType(type); if (delayType == null ) throw new IllegalArgumentException("type is in [1, 2]" ); log.info("now: [{}], receive message: [{}], type: [{}]" , new Date(), message, delayType); delayMessageSender.sendMsg(message, delayType); return "success" ; } }
1 2 2021 -09 -21 10 :15 :18.662 INFO 14652 --- [nio -7777 -exec -2 ] top.parak.delay.MessageController : now: [Tue Sep 21 10 :15 :18 CST 2021 ], receive message: [Khighness ], type : [DELAY_3_SECONDS ]2021 -09 -21 10 :15 :21.666 INFO 14652 --- [ntContainer
1 2 3 2021 -09 -21 10 :15 :26.273 INFO 14652 --- [nio -7777 -exec -3 ] top.parak.delay.MessageController : now: [Tue Sep 21 10 :15 :26 CST 2021 ], receive message: [Khighness ], type : [DELAY_10_SECONDS ]2021 -09 -21 10 :15 :36.276 INFO 14652 --- [ntContainer
8.6 队列优化 为了实现一种更通用的方案,那么而只能将消息设置在消息属性里了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public static final String DELAY_QUEUE3 = "parak.delay.queue.business.queue3" ;public static final String DELAY_QUEUE_ROUTING_KEY3 = "parak.delay.queue.business.key3" ;public static final String DEAD_LETTER_QUEUE3 = "parak.delay.queue.deadletter.queue3" ;public static final String DEAD_LETTER_ROUTING_KEY3 = "parak.delay.queue.deadletter.key3" ;@Bean("delayQueue3") public Queue delayQueue3 () { Map<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange" , DEAD_LETTER_EXCHANGE); map.put("x-dead-letter-routing-key" , DEAD_LETTER_ROUTING_KEY3); return QueueBuilder.durable(DELAY_QUEUE3).withArguments(map).build(); } @Bean public Binding delayBinding3 (@Qualifier("delayQueue3") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY3); } @Bean("deadLetterQueue3") public Queue deadLetterQueue3 () { return new Queue(DEAD_LETTER_QUEUE3); } @Bean public Binding deadLetterBinding3 (@Qualifier("deadLetterQueue3") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY3); }
1 2 3 4 5 @RabbitListener(queues = DelayQueueConfig.DEAD_LETTER_QUEUE3) public void receiveDeadLetter3 (Message message, Channel channel) throws IOException { log.info("now: [{}], 『dead letter-3』 receive message: [{}]" , new Date(), new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); }
1 2 3 4 5 6 public void sendMsg (String msg, long delayTime) { rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_QUEUE_ROUTING_KEY3, msg, m -> { m.getMessageProperties().setExpiration(Long.toString(delayTime)); return m; }); }
1 2 3 4 5 6 7 8 9 10 11 12 @RequestMapping("/time") @ResponseStatus(HttpStatus.OK) public String sendMsg (@RequestParam("msg") String message, @RequestParam("time") Long time) { log.info("now: [{}], receive message: [{}], time: [{}]" , new Date(), message, time); delayMessageSender.sendMsg(message, time); return "success" ; }
重新运行启动类,访问浏览器进行测试: (1)访问:http://localhost:7777/rabbitmq/send/time?msg=khighness&time=1000
1 2 2021 -09 -21 12 :35 :56.532 INFO 13876 --- [nio -7777 -exec -6 ] top.parak.delay.MessageController : now: [Tue Sep 21 12 :35 :56 CST 2021 ], receive message: [khighness ], time: [1000 ]2021 -09 -21 12 :35 :57.539 INFO 13876 --- [ntContainer
1 2 2021 -09 -21 12 :36 :00.917 INFO 13876 --- [nio -7777 -exec -7 ] top.parak.delay.MessageController : now: [Tue Sep 21 12 :36 :00 CST 2021 ], receive message: [khighness ], time: [5000 ]2021 -09 -21 12 :36 :05.921 INFO 13876 --- [ntContainer
8.7 插件实现 如果不能实现在消息粒度上添加TTL,并使其在设置的TTL时间内即时即时死亡,就无法设计一个通用的延时队列。
1 $ docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq:/plugins
1 2 $ docker exec -it rabbitmq bash $ rabbitmq-plugins enable rabbitmq_delayed_message_exchange
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static final String DELAYED_MESSAGE_EXCHANGE = "parak.delay.queue.delayed.message.exchange" ;public static final String DELAYED_MESSAGE_QUEUE = "parak.delay.queue.delayed.message.queue" ;public static final String DELAYED_MESSAGE_ROUTING_KEY = "parak.delay.queue.delayed.message.key" ;@Bean("delayedMessageExchange") public CustomExchange delayedMessageExchange () { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type" , "direct" ); return new CustomExchange(DELAYED_MESSAGE_EXCHANGE, "x-delayed-message" , true , false , args); } @Bean("delayedMessageQueue") public Queue delayedMessageQueue () { return new Queue(DELAYED_MESSAGE_QUEUE); } @Bean public Binding delayedMessageBinding (@Qualifier("delayedMessageQueue") Queue queue, @Qualifier("delayedMessageExchange") CustomExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAYED_MESSAGE_ROUTING_KEY).noargs(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import com.rabbitmq.client.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.Date;@Component public class DelayedMessageReceiver { private final Logger log = LoggerFactory.getLogger(this .getClass()); @RabbitListener(queues = DelayQueueConfig.DELAYED_MESSAGE_QUEUE) public void receiveDeadLetter1 (Message message, Channel channel) throws IOException { log.info("now: [{}], 『delayed message』 receive message: [{}]" , new Date(), new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } }
1 2 3 4 5 6 public void sendMsgToDelayedExchange (String msg, long delayTime) { rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_MESSAGE_EXCHANGE, DelayQueueConfig.DELAYED_MESSAGE_ROUTING_KEY, msg, m -> { m.getMessageProperties().setExpiration(Long.toString(delayTime)); return m; }); }
1 2 3 4 5 6 7 8 9 10 11 12 @RequestMapping("/delayed") @ResponseStatus(HttpStatus.OK) public String sendMsgToDelayedExchange (@RequestParam("msg") String message, @RequestParam("time") Long time) { log.info("now: [{}], receive message: [{}], time: [{}]" , new Date(), message, time); delayMessageSender.sendMsgToDelayedExchange(message, time); return "success" ; }
8.8 一个小结 延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用、RabbitMQ的特性,如:消息可靠性发送、消息可靠性投递以及死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。
9. RabbitMQ可靠投递 9.1 可靠投递 在RabbitMQ中,一个消息从生产者发送到RabbitMQ的服务器,需要经历以下步骤:
9.2 实现原理 默认情况下,发送消息的操作是不会反悔任何消息给生产者的,也就是说,默认情况下生产者是不知道消息有没有正确地到达服务器。
9.3 事务机制 RabbitMQ是支持事务机制的,在生产者确认机制之前,事务是确保消息被成功投递的唯一办法。
1 2 3 4 5 6 7 8 9 10 server.port =8888 spring.application.name =springboot-reliable-transaction spring.rabbitmq.host = spring.rabbitmq.port =5672 spring.rabbitmq.virtual-host =/learn spring.rabbitmq.username =parak spring.rabbitmq.password =KAG1823 spring.rabbitmq.listener.type =simple spring.rabbitmq.listener.simple.default-requeue-rejected =false spring.rabbitmq.listener.simple.acknowledge-mode =manual
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration public class RabbitMQConfig { public static final String BUSINESS_EXCHANGE = "parak.reliable.transaction.business.exchange" ; public static final String BUSINESS_QUEUE = "parak.reliable.transaction.business.queue" ; public static final String DEFAULT_ROUTING_KEY = "" ; @Bean("businessExchange") public FanoutExchange businessExchange () { return new FanoutExchange(BUSINESS_EXCHANGE); } @Bean("businessQueue") public Queue businessQueue () { return QueueBuilder.durable(BUSINESS_QUEUE).build(); } @Bean public Binding businessBinding (@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Bean public RabbitTransactionManager rabbitTransactionManager (CachingConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import com.rabbitmq.client.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Component public class BusinessMessageReceiver { private final Logger log = LoggerFactory.getLogger(this .getClass()); @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE) public void sendMsg (Message message, Channel channel) throws IOException { log.info("『business』 receive message: [{}]" , new String(message.getBody())); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false , false ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.transaction.annotation.Transactional;import javax.annotation.PostConstruct;@Component public class BusinessMessageSender { private final Logger log = LoggerFactory.getLogger(this .getClass()); @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init () { rabbitTemplate.setChannelTransacted(true ); } @Transactional public void sendMsg (String msg) { rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE, RabbitMQConfig.DEFAULT_ROUTING_KEY, msg); log.info("message: [{}]" , msg); if (msg == null || msg.equals("" ) || msg.contains("exception" )) { throw new RuntimeException("error" ); } log.info("message sent successfully: [{}]" , msg); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import org.springframework.beans.factory.annotation.Autowired;import org.springframework.http.HttpStatus;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.ResponseStatus;import org.springframework.web.bind.annotation.RestController;@RequestMapping("/rabbitmq/send") @RestController public class MessageController { @Autowired private BusinessMessageSender businessMessageSender; @RequestMapping("/tx") @ResponseStatus(HttpStatus.OK) public String sendMsg (@RequestParam("msg") String msg) { businessMessageSender.sendMsg(msg); return "success" ; } }
1 2 3 2021 -09 -21 16 :51 :55.913 INFO 18096 --- [nio -8888 -exec -5 ] t.p.transaction.BusinessMessageSender : message: [Khighness ]2021 -09 -21 16 :51 :55.913 INFO 18096 --- [nio -8888 -exec -5 ] t.p.transaction.BusinessMessageSender : message sent successfully: [Khighness ]2021 -09 -21 16 :51 :55.915 INFO 18096 --- [ntContainer
1 2 3 4 2021 -09 -21 16 :51 :59.444 INFO 18096 --- [nio -8888 -exec -6 ] t.p.transaction.BusinessMessageSender : message: [Kexception ]2021 -09 -21 16 :51 :59.445 ERROR 18096 --- [nio -8888 -exec -6 ] o.a.c.c.C.[. [. [/]. [dispatcherServlet ] : Servlet.service () for servlet [dispatcherServlet ] in context with path [] threw exception [Request processing failed ; nested exception is java.lang.RuntimeException : error ] with root cause java.lang.RuntimeException : error
9.4 生产者确认机制 RabbitMQ中的生产者确认功能是AMQP规范的增强功能,当生产者发布给所有队列的已路由消息被消费者应用程序直接消费时,或者消息被放入队列并根据需要进行持久化时,一个Basic Ack请求会被发送到生产者,如果消息无法路由,代理服务器将发送一个Basic Nack RPC请求用于表示失败。然后由生产者决定该如何处理该消息。
1 spring.rabbitmq.publisher-confirms =true
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration public class RabbitMQConfig { public static final String BUSINESS_EXCHANGE = "parak.reliable.confirm.business.exchange" ; public static final String BUSINESS_QUEUE = "parak.reliable.confirm.business.queue" ; public static final String BUSINESS_KEY = "parak.reliable.confirm.business.key" ; @Bean("businessExchange") public DirectExchange businessExchange () { return new DirectExchange(BUSINESS_EXCHANGE); } @Bean("businessQueue") public Queue businessQueue () { return QueueBuilder.durable(BUSINESS_QUEUE).build(); } @Bean public Binding businessBinding (@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_KEY); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.transaction.annotation.Transactional;import javax.annotation.PostConstruct;import java.util.UUID;@Component public class BusinessMessageSender implements RabbitTemplate .ConfirmCallback { private final Logger log = LoggerFactory.getLogger(this .getClass()); @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init () { rabbitTemplate.setConfirmCallback(this ); } public void sendMsg (String msg) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("id: [{}], message: [{}]" , correlationData.getId(), msg); rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE, RabbitMQConfig.DEFAULT_ROUTING_KEY, msg, correlationData); } @Override public void confirm (CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : "" ; if (b) { log.info("message confirm succeed, id: [{}]" , id); } else { log.info("message confirm failed, id: [{}], cause: [{}]" , id, s); } } }
9.6 可靠投递到队列 上面使用了两种方式实现了消息被可靠地投递到RabbitMQ的交换机中,但是如果如果Broker无法将消息路由到队列,还是会被丢弃。
1 rabbitTemplate.setMandatory(true );
1 2 3 4 5 6 7 8 9 10 11 server.port =11111 spring.application.name =springboot-reliable-queue spring.rabbitmq.host = spring.rabbitmq.port =5672 spring.rabbitmq.virtual-host =/learn spring.rabbitmq.username =parak spring.rabbitmq.password =KAG1823 spring.rabbitmq.listener.type =simple spring.rabbitmq.listener.simple.default-requeue-rejected =false spring.rabbitmq.listener.simple.acknowledge-mode =manual spring.rabbitmq.publisher-confirms =true
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration public class RabbitMQConfig { public static final String BUSINESS_EXCHANGE = "parak.reliable.queue.business.exchange" ; public static final String BUSINESS_QUEUE = "parak.reliable.queue.business.queue" ; public static final String BUSINESS_KEY = "key" ; @Bean("businessExchange") public DirectExchange businessExchange () { return new DirectExchange(BUSINESS_EXCHANGE); } @Bean("businessQueue") public Queue businessQueue () { return QueueBuilder.durable(BUSINESS_QUEUE).build(); } @Bean public Binding businessBinding (@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_KEY); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import com.rabbitmq.client.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Component public class BusinessMessageReceiver { private final Logger log = LoggerFactory.getLogger(this .getClass()); @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE) public void sendMsg (Message message, Channel channel) throws IOException { log.info("『business』 receive message: [{}]" , new String(message.getBody())); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false , false ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import java.util.UUID;@Component public class BusinessMessageSender implements RabbitTemplate .ConfirmCallback { private final Logger log = LoggerFactory.getLogger(this .getClass()); @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init () { rabbitTemplate.setConfirmCallback(this ); rabbitTemplate.setMandatory(true ); } public void sendMsg (String routingKey, String message) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("id: [{}], message: [{}]" , correlationData.getId(), message); rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE, routingKey, message, correlationData); } @Override public void confirm (CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : "" ; if (b) { log.info("message confirm succeed, id: [{}]" , id); } else { log.info("message confirm failed, id: [{}], cause: [{}]" , id, s); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import org.springframework.beans.factory.annotation.Autowired;import org.springframework.http.HttpStatus;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.ResponseStatus;import org.springframework.web.bind.annotation.RestController;@RequestMapping("/rabbitmq/send") @RestController public class MessageController { @Autowired private BusinessMessageSender businessMessageSender; @RequestMapping("/confirm") @ResponseStatus(HttpStatus.OK) public String sendMsg (@RequestParam("key") String routingKey, @RequestParam("msg") String message) { businessMessageSender.sendMsg(routingKey, message); return "success" ; } }
1 2 3 2021 -09 -21 22 :58 :33.392 INFO 21368 --- [io -11111 -exec -5 ] top.parak.queue.BusinessMessageSender : id: [81 ff86f3 -0000 -4 edb -b6a5 -92494 f27be3c ], message: [Khighness ]2021 -09 -21 22 :58 :33.394 INFO 21368 --- [ntContainer 2021 -09 -21 22 :58 :33.394 INFO 21368 --- [nectionFactory2 ] top.parak.queue.BusinessMessageSender : message confirm succeed , id : [81 ff86f3 -0000 -4 edb -b6a5 -92494 f27be3c ]
1 2 3 2021 -09 -21 22 :58 :34.867 INFO 21368 --- [io -11111 -exec -9 ] top.parak.queue.BusinessMessageSender : id: [373 c027e -8 e8e -4 f2a -b59c -53 a8b7553767 ], message: [FlowerK ]2021 -09 -21 22 :58 :34.868 INFO 21368 --- [nectionFactory1 ] top.parak.queue.BusinessMessageSender : message confirm succeed, id: [373 c027e -8 e8e -4 f2a -b59c -53 a8b7553767 ]2021 -09 -21 22 :58 :34.868 WARN 21368 --- [nectionFactory2 ] o.s.amqp.rabbit.core.RabbitTemplate : Returned message but no callback available
可以看到第二个消息发送后的提示Returned message but no callback available
1 2 3 4 5 6 7 8 9 10 11 12 @PostConstruct private void init () { rabbitTemplate.setConfirmCallback(this ); rabbitTemplate.setMandatory(true ); rabbitTemplate.setReturnCallback(this ); } @Override public void returnedMessage (Message message, int relayCode, String replyText, String exchange, String routingKey) { log.info("message is returned. msg: [{}], replayCode: [{}], replyText: [{}]exchange: [{}], routingKey: [{}]" , new String(message.getBody()), relayCode, replyText, exchange, routingKey); }
1 2 3 2021 -09 -21 23 :17 :55.245 INFO 24700 --- [io -11111 -exec -1 ] top.parak.queue.BusinessMessageSender : id: [ad8a28ed -b21c -497 d -87 c4 -ff3c447204db ], message: [FlowerK ]2021 -09 -21 23 :17 :55.253 INFO 24700 --- [nectionFactory1 ] top.parak.queue.BusinessMessageSender : message is returned. msg: [FlowerK ], replayCode: [312 ], replyText: [NO_ROUTE ], exchange: [parak.reliable.queue.business.exchange ], routingKey: [kkk ]2021 -09 -21 23 :17 :55.254 INFO 24700 --- [nectionFactory1 ] top.parak.queue.BusinessMessageSender : message confirm succeed, id: [ad8a28ed -b21c -497 d -87 c4 -ff3c447204db ]
1 2 3 4 5 6 7 8 9 10 11 server.port=12222 spring.application.name=springboot-exchange-backup spring.rabbitmq.host=192.168 .117.155 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/learn spring.rabbitmq.username=parak spring.rabbitmq.password=KAG1823 spring.rabbitmq.listener.type=simple spring.rabbitmq.listener.simple.default -requeue-rejected=false spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.publisher-confirms=true
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration public class RabbitMQConfig { public static final String BUSINESS_EXCHANGE = "parak.exchange.backup.business.exchange" ; public static final String BUSINESS_QUEUE = "parak.exchange.backup.business.queue" ; public static final String BUSINESS_KEY = "key" ; public static final String BUSINESS_BACKUP_EXCHANGE = "parak.exchange.backup.backup.exchange" ; public static final String BUSINESS_BACKUP_QUEUE = "parak.exchange.backup.backup.queue" ; public static final String BUSINESS_BACKUP_WARNING_QUEUE = "parak.exchange.backup.backup.warning.queue" ; @Bean("businessExchange") public DirectExchange businessExchange () { ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(BUSINESS_EXCHANGE) .durable(true ) .withArgument("alternate-exchange" , BUSINESS_BACKUP_EXCHANGE); return (DirectExchange) exchangeBuilder.build(); } @Bean("businessQueue") public Queue businessQueue () { return QueueBuilder.durable(BUSINESS_QUEUE).build(); } @Bean public Binding businessBinding (@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_KEY); } @Bean("backupExchange") public FanoutExchange backupExchange () { ExchangeBuilder exchangeBuilder = ExchangeBuilder.fanoutExchange(BUSINESS_BACKUP_EXCHANGE) .durable(true ); return exchangeBuilder.build(); } @Bean("backupQueue") public Queue backupQueue () { return QueueBuilder.durable(BUSINESS_BACKUP_QUEUE).build(); } @Bean("warningQueue") public Queue warningQueue () { return QueueBuilder.durable(BUSINESS_BACKUP_WARNING_QUEUE).build(); } @Bean public Binding backupBinding (@Qualifier("backupQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Bean public Binding warningBinding (@Qualifier("warningQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import com.rabbitmq.client.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Component public class BusinessMessageReceiver { private final Logger log = LoggerFactory.getLogger(this .getClass()); @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE) public void sendMsg (Message message, Channel channel) throws IOException { log.info("『business』 receive message: [{}]" , new String(message.getBody())); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false , false ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import com.rabbitmq.client.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Component public class BusinessWarningMessageReceiver { private final Logger log = LoggerFactory.getLogger(this .getClass()); @RabbitListener(queues = RabbitMQConfig.BUSINESS_BACKUP_WARNING_QUEUE) public void sendMsg (Message message, Channel channel) throws IOException { log.error("『warning』 receive unroutable message: [{}]" , new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import java.util.UUID;@Component public class BusinessMessageSender /* implements RabbitTemplate .ConfirmCallback , RabbitTemplate .ReturnCallback */ { private final Logger log = LoggerFactory.getLogger(this .getClass()); @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg (String routingKey, String message) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("id: [{}], message: [{}]" , correlationData.getId(), message); rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE, routingKey, message, correlationData); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import org.springframework.beans.factory.annotation.Autowired;import org.springframework.http.HttpStatus;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.ResponseStatus;import org.springframework.web.bind.annotation.RestController;@RequestMapping("/rabbitmq/send") @RestController public class MessageController { @Autowired private BusinessMessageSender businessMessageSender; @RequestMapping("/backup") @ResponseStatus(HttpStatus.OK) public String sendMsg (@RequestParam("key") String routingKey, @RequestParam("msg") String message) { businessMessageSender.sendMsg(routingKey, message); return "success" ; } }
1 2 2021 -09 -22 11 :41 :58.976 INFO 16192 --- [io -12222 -exec -1 ] top.parak.backup.BusinessMessageSender : id: [f8896978 -db6c -47 bb -9 af7 -0 f0c15fb81f3 ], message: [Khighness ]2021 -09 -22 11 :41 :58.988 INFO 16192 --- [ntContainer
1 2 2021-09-22 11:42:02.200 INFO 16192 --- [io-12222-exec-3] top.parak.backup.BusinessMessageSender : id: [c56b5191-7810-4936-8c34-ef88ac16ccd2], message: [FlowerK] 2021-09-22 11:42:02.202 ERROR 16192 --- [ntContainer#1-1] t.p.b.BusinessWarningMessageReceiver : 『warning』 receive unroutable message: [FlowerK]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package top.parak.backup;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import java.util.UUID;@Component public class BusinessMessageSender implements RabbitTemplate .ConfirmCallback , RabbitTemplate .ReturnCallback { private final Logger log = LoggerFactory.getLogger(this .getClass()); @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init () { rabbitTemplate.setConfirmCallback(this ); rabbitTemplate.setMandatory(true ); rabbitTemplate.setReturnCallback(this ); } public void sendMsg (String routingKey, String message) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("id: [{}], message: [{}]" , correlationData.getId(), message); rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE, routingKey, message, correlationData); } @Override public void confirm (CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : "" ; if (b) { log.info("message confirm succeed, id: [{}]" , id); } else { log.info("message confirm failed, id: [{}], cause: [{}]" , id, s); } } @Override public void returnedMessage (Message message, int relayCode, String replyText, String exchange, String routingKey) { log.info("message is returned. msg: [{}], replayCode: [{}], replyText: [{}], exchange: [{}], routingKey: [{}]" , new String(message.getBody()), relayCode, replyText, exchange, routingKey); } }
1 2 3 4 5 2021 -09 -22 11 :54 :30.713 INFO 2688 --- [nectionFactory1 ] top.parak.backup.BusinessMessageSender : message confirm succeed, id: [ab911258 -3966 -4 e3e -9 cfb -0 b920bc97807 ]2021 -09 -22 11 :54 :30.715 INFO 2688 --- [ntContainer 2021 -09 -22 11 :54 :37.117 INFO 2688 --- [io -12222 -exec -2 ] top.parak.backup.BusinessMessageSender : id : [ebaa3643 -731 d -430 d -ac92 -73 e39439a262 ], message : [FlowerK ]2021 -09 -22 11 :54 :37.120 INFO 2688 --- [nectionFactory1 ] top.parak.backup.BusinessMessageSender : message confirm succeed , id : [ebaa3643 -731 d -430 d -ac92 -73 e39439a262 ]2021 -09 -22 11 :54 :37.120 ERROR 2688 --- [ntContainer
9.5 一个小结 事务机制和生产者确认机制保证消息成功发送到RabbitMQ的Broker,生产者确认机制跟事务是不能一起工作的,它是事务的轻量级替代方案。因为事务和发布者确认模式都是需要先跟服务器协商,对信道启用的一种模式,不能对同一个信道同时使用两种模式。在生产者确认模式中,消息的确认可以是异步和批量的,所以相比使用事务,性能会更好。