消息队列是大型复杂系统解耦利器。本文根据应用广泛的消息队列RabbitMQ,介绍Spring Boot应用程序中队列中间件的开发和应用。
一、RabbitMQ基础
1、RabbitMQ简介
RabbitMQ是Spring所在公司Pivotal自己的产品,是基于AMQP高级队列协议的消息中间件,采用erlang开发,所以你的RabbitMQ队列服务器需要erlang环境。
可以直接参考官方的说法:RabbitMQ is the most widely deployed open source message broker.言简意赅,一目了然。
2、AMQP
高级消息队列协议(AMQP)是一个异步消息传递所使用的应用层协议规范。作为线路层协议(AMQP是一个抽象的协议,它不负责处理具体的数据),而不是API(例如Java消息系统JMS),AMQP客户端能够无视消息的来源任意发送和接受信息。
AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件(MOM)系统,例如发布/订阅队列,没有作为基本元素实现。反而通过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一部分,形成了在线路层协议顶端的一个层级:AMQP模型。这个模型统一了消息模式,诸如之前提到的发布/订阅,队列,事务以及流数据,并且添加了额外的特性,例如更易于扩展,基于内容的路由。扩展阅读:既然有高级的消息协议,必然有简单的协议,STOMP(Simple (or Streaming) Text Orientated Messaging Protocol),也就是简单消息文本协议,猛击
3、MSMQ
这里附带介绍一下MSMQ。.NET开发者接触最多的可能还是这个消息队列,我知道有两个以.NET作为主要开发语言的公司都选择MSMQ来开发公共框架如ESB、日志组件等。
如果你有.NET下MSMQ(微软消息队列)开发和使用经验,一定不会对队列常用术语陌生。对比一下,对后面RabbitMQ的学习和理解非常有帮助。
逻辑结构如下:
4、基本术语
安装好RabbitMQ后,可以启用插件,打开RabbitMQ自带的后台,一图胜千言,你会看到很多似曾相识的技术术语和名词。
当然你也可以参考的图片示例一个一个验证下面的名词。
(1)Broker:消息队列服务器实体。
(2)Producer:生产者。
(3)Consumer:消费者。
(4)Queue(队列):消息队列载体,每个消息都会被投入到一个或多个队列。Queue是 RabbitMQ 的内部对象,用于存储消息;消费者Consumer就是通过订阅队列来获取消息的,RabbitMQ 中的消息都只能存储在 Queue 中,生产者Producer生产消息并最终投递到 Queue 中,消费者可以从 Queue 中获取消息并消费;多个消费者可以订阅同一个 Queue。
(5)Connection(连接):Producer 和 Consumer 通过TCP 连接到 RabbitMQ Server。
(6)Channel(信道):基于 Connection创建,数据流动都是在 Channel 中进行。
(7)Exchange(交换器):生产者将消息发送到 Exchange(交换器),由Exchange 将消息路由到一个或多个 Queue 中(或者丢弃);Exchange 并不存储消息;Exchange Types 常用的有 Fanout、Direct、Topic 和Header四种类型,每种类型对应不同的路由规则:
Direct:完全匹配,消息路由到那些 Routing Key 与 Binding Key 完全匹配的 Queue 中。比如 Routing Key 为mq_cleint-key,只会转发mq_cleint-key,不会转发mq_cleint-key.1,也不会转发mq_cleint-key.1.2。Topic:模式匹配,Exchange 会把消息发送到一个或者多个满足通配符规则的 routing-key 的 Queue。其中*表示匹配一个 word,#匹配多个 word 和路径,路径之间通过.隔开。如满足a.*.c的 routing-key 有a.hello.c;满足#.hello的 routing-key 有a.b.c.hello。Fanout:忽略匹配,把所有发送到该 Exchange 的消息路由到所有与它绑定 的Queue 中。Header:也根据规则匹配,相较于Direct和Topic固定地使用RoutingKey ,Headers 则是一个自定义匹配规则的类型。在队列与交换器绑定时, 会设定一组键值对(Key-Value)规则, 消息中也包括一组键值对( Headers 属性), 当这些键值对有一对,,或全部匹配时, 消息被投送到对应队列。
(8)Binding(绑定):是 Exchange(交换器)将消息路由给 Queue 所需遵循的规则。
(9)Routing Key(路由键):消息发送给 Exchange(交换器)时,消息将拥有一个路由键(默认为空), Exchange(交换器)根据这个路由键将消息发送到匹配的队列中。
(10)Binding Key(绑定键):指定当前 Exchange(交换器)下,什么样的 Routing Key(路由键)会被下派到当前绑定的 Queue 中。
5、应用场景
我们使用一个技术或组件或中间件,必须要非常理解它的适用场景,否则很容易误用。
RabbitMQ的经典应用场景包括:异步处理、应用解耦、流量削峰、日志处理、消息通讯。
已经有很多人总结了这5种场景下的RabbitMQ实际应用。
推荐阅读:猛击
到这里,RabbitMQ基础知识介绍结束,下面开始动手实践。
添加依赖
org.springframework.boot spring-boot-starter-amqp
配置RabbitMQ
## RabbitMQ相关配置spring.application.name=springbootdemospring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=springbootmqspring.rabbitmq.password=123456
新增RabbitMQConfig类
package com.power.demo.messaging;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * RabbitMQ消息队列配置类 ** 注意:如果已在配置文件中声明了Queue对象,就不用在RabbitMQ的管理员页面创建队列(Queue)了 */@Configurationpublic class RabbitMQConfig { /** * 声明接收字符串的队列 Hello 默认 * * @return */ @Bean public Queue stringQueue() { //boolean isDurable = true;//是否持久化 //boolean isExclusive = false; //仅创建者可以使用的私有队列,断开后自动删除 //boolean isAutoDelete = false; //当所有消费客户端连接断开后,是否自动删除队列 //Queue queue = new Queue(MQField.HELLO_STRING_QUEUE, isDurable, isExclusive, isAutoDelete); //return queue; //return new Queue(MQField.HELLO_STRING_QUEUE); //默认支持持久化 return QueueBuilder.durable(MQField.HELLO_STRING_QUEUE) //.exclusive() //.autoDelete() .build(); } /** * 声明接收Goods对象的队列 Hello 支持持久化 * * @return */ @Bean public Queue goodsQueue() { return QueueBuilder.durable(MQField.HELLO_GOODS_QUEUE).build(); } /** * 声明WorkQueue队列 competing consumers pattern,多个消费者不会重复消费队列的相同消息 * * @return */ @Bean public Queue workQueue() { return QueueBuilder.durable(MQField.MY_WORKER_QUEUE).build(); } /** * 消息队列中最常见的模式:发布订阅模式 *
* 声明发布订阅模式队列 Publish/Subscribe *
* exchange类型包括:direct, topic, headers 和 fanout **/ /*fanout(广播)队列相关声明开始*/ @Bean public Queue fanOutAQueue() { return QueueBuilder.durable(MQField.MY_FANOUTA_QUEUE).build(); } @Bean public Queue fanOutBQueue() { return QueueBuilder.durable(MQField.MY_FANOUTB_QUEUE).build(); } @Bean FanoutExchange fanoutExchange() { return (FanoutExchange) ExchangeBuilder.fanoutExchange(MQField.MY_FANOUT_EXCHANGE).build(); //return new FanoutExchange(MQField.MY_FANOUT_EXCHANGE); } @Bean Binding bindingExchangeA(Queue fanOutAQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutAQueue).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue fanOutBQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutBQueue).to(fanoutExchange); } /*fanout队列相关声明结束*/ /*topic队列相关声明开始*/ @Bean public Queue topicAQueue() { return QueueBuilder.durable(MQField.MY_TOPICA_QUEUE).build(); } @Bean public Queue topicBQueue() { return QueueBuilder.durable(MQField.MY_TOPICB_QUEUE).build(); } @Bean TopicExchange topicExchange() { return (TopicExchange) ExchangeBuilder.topicExchange(MQField.MY_TOPIC_EXCHANGE).build(); } //绑定时,注意队列名称与上述方法名一致 @Bean Binding bindingTopicAExchangeMessage(Queue topicAQueue, TopicExchange topicExchange) { return BindingBuilder.bind(topicAQueue).to(topicExchange).with(MQField.MY_TOPIC_ROUTINGKEYA); } @Bean Binding bindingTopicBExchangeMessages(Queue topicBQueue, TopicExchange topicExchange) { return BindingBuilder.bind(topicBQueue).to(topicExchange).with(MQField.MY_TOPIC_ROUTINGKEYB); } /*topic队列相关声明结束*/ /*direct队列相关声明开始*/ @Bean public Queue directAQueue() { return QueueBuilder.durable(MQField.MY_DIRECTA_QUEUE).build(); } @Bean public Queue directBQueue() { return QueueBuilder.durable(MQField.MY_DIRECTB_QUEUE).build(); } /** * 声明Direct交换机 支持持久化. * * @return the exchange */ @Bean DirectExchange directExchange() { return (DirectExchange) ExchangeBuilder.directExchange(MQField.MY_DIRECT_EXCHANGE).durable(true).build(); } @Bean Binding bindingDirectAExchangeMessage(Queue directAQueue, DirectExchange directExchange) { return BindingBuilder.bind(directAQueue).to(directExchange).with(MQField.MY_DIRECT_ROUTINGKEYA); } @Bean Binding bindingDirectBExchangeMessage(Queue directBQueue, DirectExchange directExchange) { return BindingBuilder.bind(directBQueue).to(directExchange) //.with(MQField.MY_DIRECT_ROUTINGKEYB) .with(MQField.MY_DIRECT_ROUTINGKEYB); } /*direct队列相关声明结束*/}
RabbitMQConfig我将常用到的模式都配置在里面了,注释已经写得很清楚,在详细介绍模式的地方会用到这里定义的队列、绑定和交换器。
持久化配置
在RabbitMQConfig类中尤其注意这几个参数,包括是否可持久化durable;仅创建者可以使用的私有队列,断开后自动删除exclusive;当所有消费客户端连接断开后,是否自动删除队列autoDelete。其中durable和autoDelete对队列和交换器都可以配置。
RabbitMQ支持的消息的持久化(durable),也就是将数据写在磁盘上,为了数据安全考虑,绝大多数场景下我们都会选择持久化,可能记录一些不是业务必须的日志稍微例外。
消息队列持久化包括3个部分:(1)、队列持久化,在声明时指定Queue.durable为1
(2)、交换器持久化,在声明时指定Exchange.durable为1
(3)、消息持久化,在投递时指定消息的delivery_mode为2(而1表示非持久化) 参考:
如果Exchange和Queue都是持久化的,那么它们之间的Binding也是持久化的;如果Exchange和Queue两者之间有一个持久化,另一个非持久化,就不允许建立绑定。
二、常见模式
在Spring Boot下使用RabbitMQ非常容易,直接调用AmqpTemplate类封装好的接口即可。
1、hello world
P为生产者,C为消费者,中间红色框表示消息队列。生产者P将消息发送到消息队列Queue,消费者C对消息进行处理。
生产者:
package com.power.demo.messaging.hello;import com.power.demo.entity.vo.GoodsVO;import com.power.demo.messaging.MQField;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;/** * Hello消息生产者 **/@Componentpublic class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public boolean send(String message) throws Exception { boolean isOK = false; if (StringUtils.isEmpty(message)) { System.out.println("消息为空"); return isOK; } rabbitTemplate.convertAndSend(MQField.HELLO_STRING_QUEUE, message); isOK = true; System.out.println(String.format("HelloSender发送字符串消息结果:%s", isOK)); return isOK; } public boolean send(GoodsVO goodsVO) throws Exception { boolean isOK = false; rabbitTemplate.convertAndSend(MQField.HELLO_GOODS_QUEUE, goodsVO); isOK = true; System.out.println(String.format("HelloSender发送对象消息结果:%s", isOK)); return isOK; }}
消费者:
package com.power.demo.messaging.hello;import com.power.demo.entity.vo.GoodsVO;import com.power.demo.messaging.MQField;import com.power.demo.util.SerializeUtil;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * Hello消息消费者 **/@Componentpublic class HelloReceiver { @RabbitListener(queues = MQField.HELLO_STRING_QUEUE) @RabbitHandler public void process(String message) { try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } System.out.println("HelloReceiver接收到的字符串消息是 => " + message); } @RabbitListener(queues = MQField.HELLO_GOODS_QUEUE) @RabbitHandler public void process(GoodsVO goodsVO) { System.out.println("------ 接收实体对象 ------"); System.out.println("HelloReceiver接收到的实体对象是 => " + SerializeUtil.Serialize(goodsVO)); }}
这是最简单的一种模式,这个最简单示例,可以看到应用场景里的异步处理的影子。
在Controller中,新增一个接口:
@RequestMapping(value = "/hello/sendmsg", method = RequestMethod.GET) @ApiOperation("简单字符串消息测试") @ApiImplicitParams({ @ApiImplicitParam(paramType = "query", name = "message", required = true, value = "字符串消息", dataType = "String") }) public String sendMsg(String message) throws Exception { boolean isOK = helloSender.send(message); return String.valueOf(isOK); }
按照传统方式调用RPC接口,通常都是同步等待接口返回,而使用队列后,消息生产者直接向RabbitMQ服务器发送一条消息,不需要同步等待这个消息的处理结果。
示例代码中,消息消费者会刻意等待5秒(Thread.sleep(5000);)后才处理(打印出)消息,但是实际调用这个接口的时候,非常快就返回成功结果了,因为这个发送消息的动作不需要等待消费者消费消息的结果。
发送的消息,除了简单消息对象如字符串等,示例里你还看到有一个发送商品对象的消息,也就是说明RabbitMQ支持自定义的复杂对象消息。
2、work queues
P为生产者,C1、C2为消费者,中间红色框表示消息队列。生产者P将消息发送到消息队列Queue,消费者C1和C2对消息进行处理。
这种模式比较容易产生误解的地方是,多个消费者会不会消费队列里的同一条消息。答案是不会。
官方的说明是因为消费者根据竞争消费模式()分派任务(Distributing tasks among workers (the competing consumers pattern) )。
对于work queues这种模式,同一条消息M1,要么C1拉取到,要么C2拉取到,不会出现C1和C2同时拉取到并消费。
当然,这种模式还可以扩展,除了一个生产者,也可以有多个生产者。
生产者:
package com.power.demo.messaging.workqueues;import com.power.demo.messaging.MQField;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;@Componentpublic class WorkProducerA { @Autowired private AmqpTemplate rabbitTemplate; public boolean send(String message) throws Exception { boolean isOK = false; if (StringUtils.isEmpty(message)) { System.out.println("消息为空"); return isOK; } rabbitTemplate.convertAndSend(MQField.MY_WORKER_QUEUE, message); isOK = true; System.out.println(String.format("WorkProducerA发送字符串消息结果:%s", isOK)); return isOK; }}
相同队列下另一个生产者:
package com.power.demo.messaging.workqueues;import com.power.demo.messaging.MQField;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;@Componentpublic class WorkProducerB { @Autowired private AmqpTemplate rabbitTemplate; public boolean send(String message) throws Exception { boolean isOK = false; if (StringUtils.isEmpty(message)) { System.out.println("消息为空"); return isOK; } rabbitTemplate.convertAndSend(MQField.MY_WORKER_QUEUE, message); isOK = true; System.out.println(String.format("WorkProducerB发送字符串消息结果:%s", isOK)); return isOK; }}
消费者:
package com.power.demo.messaging.workqueues;import com.power.demo.messaging.MQField;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.concurrent.atomic.AtomicInteger;@Componentpublic class WorkConsumerA { private static AtomicInteger atomicInteger = new AtomicInteger(); @RabbitListener(queues = MQField.MY_WORKER_QUEUE) @RabbitHandler public void process(String message) throws Exception { int index = atomicInteger.getAndIncrement(); Thread.sleep(2000); System.out.println("WorkConsumerA接收到的字符串消息是 => " + message); System.out.println("WorkConsumerA自增序号 => " + index); }}
另一个消费者:
package com.power.demo.messaging.workqueues;import com.power.demo.messaging.MQField;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.concurrent.atomic.AtomicInteger;@Componentpublic class WorkConsumerB { private static AtomicInteger atomicInteger = new AtomicInteger(); @RabbitListener(queues = MQField.MY_WORKER_QUEUE) @RabbitHandler public void process(String message) throws Exception { int index = atomicInteger.getAndIncrement(); Thread.sleep(10); System.out.println("WorkConsumerB接收到的字符串消息是 => " + message); System.out.println("WorkConsumerB自增序号 => " + index); }}
pub/sub
应用最广泛的发布/订阅模式。
官方的说法是:发送多个消息到多个消费者(Sending messages to many consumers at once.)
这个模式和work queues模式最明显的区别是,队列Queue前加了一层,多了Exchange(交换器)。
P为生产者,X为交换器,C1、C2为消费者,中间红色框表示消息队列。生产者P将消息不是直接发送到队列Queue,而是发送到交换器X(注意:交换器Exchange并不存储消息),然后由交换机X发送给两个队列,两个消费者C1和C2各自监听一个队列,来消费消息。
根据交换器类型的不同,又可以分为Fanout、Direct和Topic这三种消费方式,Headers方式实际应用不是非常广泛,本文暂不讨论。
3、fanout
任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
(1)可以理解为路由表的模式
(2)这种模式不需要RoutingKey,即使配置了也忽略
(3)这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定
(4)如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃
Fanout广播模式实现同一个消息被多个消费者消费,而work queues是同一个消息只能有一个消费者(竞争去)消费。
生产者:
package com.power.demo.messaging.pubsub.fanout;import com.power.demo.entity.vo.GoodsVO;import com.power.demo.messaging.MQField;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;@Componentpublic class FanoutSender { @Autowired private AmqpTemplate rabbitTemplate; public boolean send(GoodsVO goodsVO) throws Exception { boolean isOK = false; if (goodsVO == null) { System.out.println("消息为空"); return isOK; } rabbitTemplate.convertAndSend(MQField.MY_FANOUT_EXCHANGE, "", goodsVO); isOK = true; System.out.println(String.format("FanoutSender发送对象消息结果:%s", isOK)); return isOK; }}
消费者:
package com.power.demo.messaging.pubsub.fanout;import com.power.demo.entity.vo.GoodsVO;import com.power.demo.messaging.MQField;import com.power.demo.util.SerializeUtil;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class FanoutReceiverA { @RabbitListener(queues = MQField.MY_FANOUTA_QUEUE) @RabbitHandler public void process(GoodsVO goodsVO) { System.out.println("FanoutReceiverA接收到的商品消息是 => " + SerializeUtil.Serialize(goodsVO)); }}
另一个消费者:
package com.power.demo.messaging.pubsub.fanout;import com.power.demo.entity.vo.GoodsVO;import com.power.demo.messaging.MQField;import com.power.demo.util.SerializeUtil;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class FanoutReceiverB { @RabbitListener(queues = MQField.MY_FANOUTB_QUEUE) @RabbitHandler public void process(GoodsVO goodsVO) { System.out.println("FanoutReceiverB接收到的商品消息是 => " + SerializeUtil.Serialize(goodsVO)); }}
4、direct
Fanout是1对多以广播的方式,发送给所有的消费者。
Direct则是创建消息队列的时候,指定一个BindingKey。当发送者发送消息的时候,指定对应的RoutingKey,当RoutingKey和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中。
Direct广播模式最明显不同于Fanout模式的地方是,消费者可以进行消息过滤,有选择的进行接收想要消费的消息,也就是队列绑定关键字,发送者将数据根据关键字发送到Exchange,Exchange根据关键字判定应该将数据发送(路由)到指定队列。任何发送到Direct Exchange的消息都会被转发到RoutingKey中指定的Queue。
(1)消息传递时需要一个“RoutingKey”,可以简单的理解为要发送到的队列名字
(2)如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃
生产者:
package com.power.demo.messaging.pubsub.direct;import com.power.demo.messaging.MQField;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;@Componentpublic class DirectSender { @Autowired private AmqpTemplate rabbitTemplate; public boolean sendDirectA(String message) throws Exception { boolean isOK = false; if (StringUtils.isEmpty(message)) { System.out.println("消息为空"); return isOK; } rabbitTemplate.convertAndSend(MQField.MY_DIRECT_EXCHANGE, MQField.MY_DIRECT_ROUTINGKEYA, message); isOK = true; System.out.println(String.format("DirectSender发送DirectA字符串消息结果:%s", isOK)); return isOK; } public boolean sendDirectB(String message) throws Exception { boolean isOK = false; if (StringUtils.isEmpty(message)) { System.out.println("消息为空"); return isOK; } rabbitTemplate.convertAndSend(MQField.MY_DIRECT_EXCHANGE, MQField.MY_DIRECT_ROUTINGKEYB, message); isOK = true; System.out.println(String.format("DirectSender发送DirectB字符串消息结果:%s", isOK)); return isOK; }}
消费者:
package com.power.demo.messaging.pubsub.direct;import com.power.demo.messaging.MQField;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class DirectReceiverA { @RabbitListener(queues = MQField.MY_DIRECTA_QUEUE) @RabbitHandler public void process(String message) { System.out.println("DirectReceiverA接收到的字符串消息是 => " + message); }}
另一个消费者:
package com.power.demo.messaging.pubsub.direct;import com.power.demo.messaging.MQField;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class DirectReceiverB { @RabbitListener(queues = MQField.MY_DIRECTB_QUEUE) @RabbitHandler public void process(String message) { System.out.println("DirectReceiverB接收到的字符串消息是 => " + message); }}
5、topic
Topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的RoutingKey和该模式相匹配的时候,消息才会被发送到该消息队列中。
任何发送到Topic Exchange的消息都会被转发到所有关心RoutingKey中指定话题的Queue上
(1)每个队列都有其关心的主题,所有的消息都带有一个“标题”(RoutingKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列
(2)需要RoutingKey,也需要提前绑定Exchange与Queue
(3)在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RoutingKey为”mq.log.error”的消息会被转发到该队列)
(4)“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但“log.#”能与上述两者都匹配
(5)如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息
生产者:
package com.power.demo.messaging.pubsub.topic;import com.power.demo.messaging.MQField;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;@Componentpublic class TopicSender { @Autowired private AmqpTemplate rabbitTemplate; public boolean sendTopicA(String message) throws Exception { boolean isOK = false; if (StringUtils.isEmpty(message)) { System.out.println("消息为空"); return isOK; } rabbitTemplate.convertAndSend(MQField.MY_TOPIC_EXCHANGE, MQField.MY_TOPIC_ROUTINGKEYA, message); isOK = true; System.out.println(String.format("TopicSender发送TopicA字符串消息结果:%s", isOK)); return isOK; } public boolean sendTopicB(String message) throws Exception { boolean isOK = false; if (StringUtils.isEmpty(message)) { System.out.println("消息为空"); return isOK; } rabbitTemplate.convertAndSend(MQField.MY_TOPIC_EXCHANGE, MQField.MY_TOPIC_ROUTINGKEYB, message); isOK = true; System.out.println(String.format("TopicSender发送TopicB字符串消息结果:%s", isOK)); return isOK; } public boolean sendToMatchedTopic() { boolean isOK = false; String routingKey = "my_topic_routingkeyA.16";//模糊匹配MQField.MY_TOPIC_ROUTINGKEYA //String routingKey = "my_topic_routingkeyB.32";//模糊匹配MQField.MY_TOPIC_ROUTINGKEYB String matchedKeys = ""; if (MQField.MY_TOPIC_ROUTINGKEYA.contains(routingKey.split("\\.")[0])) { matchedKeys = "TopicReceiverA"; } else if (MQField.MY_TOPIC_ROUTINGKEYB.contains(routingKey.split("\\.")[0])) { matchedKeys = "TopicReceiverB"; } String msg = "message to matched receivers:" + matchedKeys; rabbitTemplate.convertAndSend(MQField.MY_TOPIC_EXCHANGE, routingKey, msg); isOK = true; System.out.println(String.format("TopicSender发送字符串消息结果:%s", isOK)); return isOK; }}
消费者:
package com.power.demo.messaging.pubsub.topic;import com.power.demo.messaging.MQField;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class TopicReceiverA { @RabbitListener(queues = MQField.MY_TOPICA_QUEUE) @RabbitHandler public void process(String message) { System.out.println("TopicReceiverA接收到的字符串消息是 => " + message); }}
另一个消费者:
package com.power.demo.messaging.pubsub.topic;import com.power.demo.messaging.MQField;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class TopicReceiverB { @RabbitListener(queues = MQField.MY_TOPICB_QUEUE) @RabbitHandler public void process(String message) { System.out.println("TopicReceiverB接收到的字符串消息是 => " + message); }}
示例代码中,定义了两个topic,生产者通过调用sendToMatchedTopic方法,根据RoutingKey模糊匹配,将消息发送到匹配的队列上。
到这里,发布订阅模式的介绍就结束了。我们再来总结下发布订阅模式下RabbitMQ消息队列主要工作流程。以Topic为例:
生产者
1、获取一个连接(Connection)2、从连接(Connection)上获取一个信道( Channel)3、声明一个交换器( Exchange)4、声明1个或多个队列(Queue)5、把队列(Queue)绑定到交换器(Exchange)上6、向指定的交换器(Exchange)发送消息,消息路由到特定队列(Queue)消费者
RabbitMQ消费者消费消息,支持推(push)模式和拉(pull)模式,这里以拉模式说明下流程。
1、创建一个连接(Connection)
2、启动MainLoop后台线程,通过连接(Connection)循环拉取消息3、处理并确认消息被消费6、rpc
RPC调用流程说明:
(1)当客户端启动的时候,它创建一个匿名独享的回调队列(2)在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性
(3)将请求发送到一个 rpc_queue 队列中
(4)服务器等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
(5)客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用
Callback queue回调队列,客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。
Correlation id关联标识,客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。服务端:
package com.power.demo.messaging.rpc;import com.power.demo.messaging.MQField;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Envelope;import java.io.IOException;import java.util.concurrent.TimeoutException;public class RPCServer { private static int fib(int n) { if (n == 0) { return 0; } if (n == 1) { return 1; } return fib(n - 1) + fib(n - 2); } //直接运行此方法 public static void main(String[] argv) { ConnectionFactory factory = new ConnectionFactory(); //factory.setHost("localhost"); Connection connection = null; try { connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(MQField.MY_RPC_QUEUE, false, false, false, null); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(properties.getCorrelationId()) .build(); String response = ""; try { String message = new String(body, "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); System.out.println(String.format("RPCServer计算fib数列应答:%s", response)); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(envelope.getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized (this) { this.notify(); } } } }; channel.basicConsume(MQField.MY_RPC_QUEUE, false, consumer); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized (consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (connection != null) try { connection.close(); } catch (IOException _ignore) { } } }}
客户端:
package com.power.demo.messaging.rpc;import com.power.demo.messaging.MQField;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Envelope;import java.io.IOException;import java.util.UUID;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeoutException;public class RPCClient { private Connection connection; private Channel channel; private String replyQueueName; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); //factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); } public String call(String message) throws IOException, InterruptedException { final String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", MQField.MY_RPC_QUEUE, props, message.getBytes("UTF-8")); final BlockingQueueresponse = new ArrayBlockingQueue (1); channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(corrId)) { response.offer(new String(body, "UTF-8")); } } }); return response.take(); } public void close() throws IOException { connection.close(); } //直接运行此方法 public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(10)"); response = fibonacciRpc.call("10"); System.out.println(" [.] Got '" + response + "'"); System.out.println(String.format("RPCClient得到计算fib数列应答:%s", response)); } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } finally { if (fibonacciRpc != null) { try { fibonacciRpc.close(); } catch (IOException _ignore) { } } } }}
示例代码我这里直接改造了一下官方的demo代码。启动RPCServer,再运行RPCClient就可以看到RPC调用结果了。
三、常见问题
1、幂等性
生产环境各种业务系统出现重复消息是不可避免的,因为不能保证生产者不发送重复消息。
对于读操作而言,重复消息可能无害,但是对于写操作,重复消息容易造成业务灾难,比如相同消息多次扣减库存,多次支付请求扣款等。
有一种情况也会造成重复消息,就是RabbitMQ对设置autoAck=false之后没有被Ack的消息是不会清除掉的,消费者可以多次重复消费。
我个人认为RabbitMQ只是消息传递的载体,要保证幂等性,还是需要在消费者业务逻辑上下功夫。
2、有序消息
我碰到过某厂有一个开发团队通过Kafka来实现有序队列,因为发送的消息有先后依赖关系,需要消费者收到多个消息保存起来最后聚合后一起处理业务逻辑。
但是,其实大部分业务场景下我们都不需要消息有先后依赖关系,因为有序队列产生依赖关系,后续消费很容易造成各种处理难题。
归根结底,我认为需要有序消息的业务系统在设计上就是不合理的,争取在设计上规避才好。当然良好的设计需要丰富的经验和优化,以及妥协。
3、高可用
RabbitMQ支持集群,模式主要可分为三种:单一模式、普通模式和镜像模式。
RabbitMQ支持弹性部署,在业务高峰期间可通过集群弹性部署支撑业务系统。
RabbitMQ支持消息持久化,如果队列服务器出现问题,消息做了持久化,后续恢复正常,消息数据不丢失不会影响正常业务流程。
RabbitMQ还有很多高级特性,比如发布确认和事务等,虽然可能会降低性能,但是增强了可靠性。
参考: