博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Boot消息队列应用实践
阅读量:6410 次
发布时间:2019-06-23

本文共 30746 字,大约阅读时间需要 102 分钟。

消息队列是大型复杂系统解耦利器。本文根据应用广泛的消息队列RabbitMQ,介绍Spring Boot应用程序中队列中间件的开发和应用。

一、RabbitMQ基础

1、RabbitMQ简介

RabbitMQ是Spring所在公司Pivotal自己的产品,是基于AMQP高级队列协议的消息中间件,采用erlang开发,所以你的RabbitMQ队列服务器需要erlang环境。

可以直接参考官方的说法:RabbitMQ is the most widely deployed open source message broker.言简意赅,一目了然。

2、AMQP

高级消息队列协议(AMQP)是一个异步消息传递所使用的应用层协议规范。作为线路层协议(AMQP是一个抽象的协议,它不负责处理具体的数据),而不是API(例如Java消息系统JMS),AMQP客户端能够无视消息的来源任意发送和接受信息。

AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件(MOM)系统,例如发布/订阅队列,没有作为基本元素实现。反而通过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一部分,形成了在线路层协议顶端的一个层级:AMQP模型。这个模型统一了消息模式,诸如之前提到的发布/订阅,队列,事务以及流数据,并且添加了额外的特性,例如更易于扩展,基于内容的路由。

扩展阅读:既然有高级的消息协议,必然有简单的协议,STOMP(Simple (or Streaming) Text Orientated Messaging Protocol),也就是简单消息文本协议,猛击

3、MSMQ

这里附带介绍一下MSMQ。.NET开发者接触最多的可能还是这个消息队列,我知道有两个以.NET作为主要开发语言的公司都选择MSMQ来开发公共框架如ESB、日志组件等。

如果你有.NET下MSMQ(微软消息队列)开发和使用经验,一定不会对队列常用术语陌生。对比一下,对后面RabbitMQ的学习和理解非常有帮助。

逻辑结构如下:

4、基本术语  

安装好RabbitMQ后,可以启用插件,打开RabbitMQ自带的后台,一图胜千言,你会看到很多似曾相识的技术术语和名词。

当然你也可以参考的图片示例一个一个验证下面的名词。

(1)Broker:消息队列服务器实体。

(2)Producer:生产者。

(3)Consumer:消费者。

(4)Queue(队列):消息队列载体,每个消息都会被投入到一个或多个队列。Queue是 RabbitMQ 的内部对象,用于存储消息;消费者Consumer就是通过订阅队列来获取消息的,RabbitMQ 中的消息都只能存储在 Queue 中,生产者Producer生产消息并最终投递到 Queue 中,消费者可以从 Queue 中获取消息并消费;多个消费者可以订阅同一个 Queue。

(5)Connection(连接):Producer 和 Consumer 通过TCP 连接到 RabbitMQ Server。

(6)Channel(信道):基于 Connection创建,数据流动都是在 Channel 中进行。

(7)Exchange(交换器):生产者将消息发送到 Exchange(交换器),由Exchange 将消息路由到一个或多个 Queue 中(或者丢弃);Exchange 并不存储消息;Exchange Types 常用的有 Fanout、Direct、Topic 和Header四种类型,每种类型对应不同的路由规则:

Direct:完全匹配,消息路由到那些 Routing Key 与 Binding Key 完全匹配的 Queue 中。比如 Routing Key 为mq_cleint-key,只会转发mq_cleint-key,不会转发mq_cleint-key.1,也不会转发mq_cleint-key.1.2。
Topic:模式匹配,Exchange 会把消息发送到一个或者多个满足通配符规则的 routing-key 的 Queue。其中*表示匹配一个 word,#匹配多个 word 和路径,路径之间通过.隔开。如满足a.*.c的 routing-key 有a.hello.c;满足#.hello的 routing-key 有a.b.c.hello。
Fanout:忽略匹配,把所有发送到该 Exchange 的消息路由到所有与它绑定 的Queue 中。

Header:也根据规则匹配,相较于Direct和Topic固定地使用RoutingKey ,Headers 则是一个自定义匹配规则的类型。在队列与交换器绑定时, 会设定一组键值对(Key-Value)规则, 消息中也包括一组键值对( Headers 属性), 当这些键值对有一对,,或全部匹配时, 消息被投送到对应队列。

(8)Binding(绑定):是 Exchange(交换器)将消息路由给 Queue 所需遵循的规则。

(9)Routing Key(路由键):消息发送给 Exchange(交换器)时,消息将拥有一个路由键(默认为空), Exchange(交换器)根据这个路由键将消息发送到匹配的队列中。

(10)Binding Key(绑定键):指定当前 Exchange(交换器)下,什么样的 Routing Key(路由键)会被下派到当前绑定的 Queue 中。

5、应用场景

我们使用一个技术或组件或中间件,必须要非常理解它的适用场景,否则很容易误用。

RabbitMQ的经典应用场景包括:异步处理、应用解耦、流量削峰、日志处理、消息通讯。

已经有很多人总结了这5种场景下的RabbitMQ实际应用。

推荐阅读:猛击

到这里,RabbitMQ基础知识介绍结束,下面开始动手实践。

添加依赖

org.springframework.boot
spring-boot-starter-amqp
RabbitMQ

配置RabbitMQ

## RabbitMQ相关配置spring.application.name=springbootdemospring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=springbootmqspring.rabbitmq.password=123456
application.mq.properties

新增RabbitMQConfig类

package com.power.demo.messaging;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * RabbitMQ消息队列配置类 * 

* 注意:如果已在配置文件中声明了Queue对象,就不用在RabbitMQ的管理员页面创建队列(Queue)了 */@Configurationpublic class RabbitMQConfig { /** * 声明接收字符串的队列 Hello 默认 * * @return */ @Bean public Queue stringQueue() { //boolean isDurable = true;//是否持久化 //boolean isExclusive = false; //仅创建者可以使用的私有队列,断开后自动删除 //boolean isAutoDelete = false; //当所有消费客户端连接断开后,是否自动删除队列 //Queue queue = new Queue(MQField.HELLO_STRING_QUEUE, isDurable, isExclusive, isAutoDelete); //return queue; //return new Queue(MQField.HELLO_STRING_QUEUE); //默认支持持久化 return QueueBuilder.durable(MQField.HELLO_STRING_QUEUE) //.exclusive() //.autoDelete() .build(); } /** * 声明接收Goods对象的队列 Hello 支持持久化 * * @return */ @Bean public Queue goodsQueue() { return QueueBuilder.durable(MQField.HELLO_GOODS_QUEUE).build(); } /** * 声明WorkQueue队列 competing consumers pattern,多个消费者不会重复消费队列的相同消息 * * @return */ @Bean public Queue workQueue() { return QueueBuilder.durable(MQField.MY_WORKER_QUEUE).build(); } /** * 消息队列中最常见的模式:发布订阅模式 *

* 声明发布订阅模式队列 Publish/Subscribe *

* exchange类型包括:direct, topic, headers 和 fanout **/ /*fanout(广播)队列相关声明开始*/ @Bean public Queue fanOutAQueue() { return QueueBuilder.durable(MQField.MY_FANOUTA_QUEUE).build(); } @Bean public Queue fanOutBQueue() { return QueueBuilder.durable(MQField.MY_FANOUTB_QUEUE).build(); } @Bean FanoutExchange fanoutExchange() { return (FanoutExchange) ExchangeBuilder.fanoutExchange(MQField.MY_FANOUT_EXCHANGE).build(); //return new FanoutExchange(MQField.MY_FANOUT_EXCHANGE); } @Bean Binding bindingExchangeA(Queue fanOutAQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutAQueue).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue fanOutBQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutBQueue).to(fanoutExchange); } /*fanout队列相关声明结束*/ /*topic队列相关声明开始*/ @Bean public Queue topicAQueue() { return QueueBuilder.durable(MQField.MY_TOPICA_QUEUE).build(); } @Bean public Queue topicBQueue() { return QueueBuilder.durable(MQField.MY_TOPICB_QUEUE).build(); } @Bean TopicExchange topicExchange() { return (TopicExchange) ExchangeBuilder.topicExchange(MQField.MY_TOPIC_EXCHANGE).build(); } //绑定时,注意队列名称与上述方法名一致 @Bean Binding bindingTopicAExchangeMessage(Queue topicAQueue, TopicExchange topicExchange) { return BindingBuilder.bind(topicAQueue).to(topicExchange).with(MQField.MY_TOPIC_ROUTINGKEYA); } @Bean Binding bindingTopicBExchangeMessages(Queue topicBQueue, TopicExchange topicExchange) { return BindingBuilder.bind(topicBQueue).to(topicExchange).with(MQField.MY_TOPIC_ROUTINGKEYB); } /*topic队列相关声明结束*/ /*direct队列相关声明开始*/ @Bean public Queue directAQueue() { return QueueBuilder.durable(MQField.MY_DIRECTA_QUEUE).build(); } @Bean public Queue directBQueue() { return QueueBuilder.durable(MQField.MY_DIRECTB_QUEUE).build(); } /** * 声明Direct交换机 支持持久化. * * @return the exchange */ @Bean DirectExchange directExchange() { return (DirectExchange) ExchangeBuilder.directExchange(MQField.MY_DIRECT_EXCHANGE).durable(true).build(); } @Bean Binding bindingDirectAExchangeMessage(Queue directAQueue, DirectExchange directExchange) { return BindingBuilder.bind(directAQueue).to(directExchange).with(MQField.MY_DIRECT_ROUTINGKEYA); } @Bean Binding bindingDirectBExchangeMessage(Queue directBQueue, DirectExchange directExchange) { return BindingBuilder.bind(directBQueue).to(directExchange) //.with(MQField.MY_DIRECT_ROUTINGKEYB) .with(MQField.MY_DIRECT_ROUTINGKEYB); } /*direct队列相关声明结束*/}

RabbitMQConfig

RabbitMQConfig我将常用到的模式都配置在里面了,注释已经写得很清楚,在详细介绍模式的地方会用到这里定义的队列、绑定和交换器。

持久化配置

在RabbitMQConfig类中尤其注意这几个参数,包括是否可持久化durable;仅创建者可以使用的私有队列,断开后自动删除exclusive;当所有消费客户端连接断开后,是否自动删除队列autoDelete。其中durable和autoDelete对队列和交换器都可以配置。

RabbitMQ支持的消息的持久化(durable),也就是将数据写在磁盘上,为了数据安全考虑,绝大多数场景下我们都会选择持久化,可能记录一些不是业务必须的日志稍微例外。

消息队列持久化包括3个部分:

(1)、队列持久化,在声明时指定Queue.durable为1

(2)、交换器持久化,在声明时指定Exchange.durable为1

(3)、消息持久化,在投递时指定消息的delivery_mode为2(而1表示非持久化) 参考:

如果Exchange和Queue都是持久化的,那么它们之间的Binding也是持久化的;如果Exchange和Queue两者之间有一个持久化,另一个非持久化,就不允许建立绑定。

二、常见模式

在Spring Boot下使用RabbitMQ非常容易,直接调用AmqpTemplate类封装好的接口即可。

1、hello world

 

P为生产者,C为消费者,中间红色框表示消息队列。生产者P将消息发送到消息队列Queue,消费者C对消息进行处理。

生产者:

package com.power.demo.messaging.hello;import com.power.demo.entity.vo.GoodsVO;import com.power.demo.messaging.MQField;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;/** * Hello消息生产者 **/@Componentpublic class HelloSender {    @Autowired    private AmqpTemplate rabbitTemplate;    public boolean send(String message) throws Exception {        boolean isOK = false;        if (StringUtils.isEmpty(message)) {            System.out.println("消息为空");            return isOK;        }        rabbitTemplate.convertAndSend(MQField.HELLO_STRING_QUEUE, message);        isOK = true;        System.out.println(String.format("HelloSender发送字符串消息结果:%s", isOK));        return isOK;    }    public boolean send(GoodsVO goodsVO) throws Exception {        boolean isOK = false;        rabbitTemplate.convertAndSend(MQField.HELLO_GOODS_QUEUE, goodsVO);        isOK = true;        System.out.println(String.format("HelloSender发送对象消息结果:%s", isOK));        return isOK;    }}
HelloSender

消费者:

package com.power.demo.messaging.hello;import com.power.demo.entity.vo.GoodsVO;import com.power.demo.messaging.MQField;import com.power.demo.util.SerializeUtil;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/** * Hello消息消费者 **/@Componentpublic class HelloReceiver {    @RabbitListener(queues = MQField.HELLO_STRING_QUEUE)    @RabbitHandler    public void process(String message) {        try {            Thread.sleep(5000);        } catch (Exception e) {            e.printStackTrace();        }        System.out.println("HelloReceiver接收到的字符串消息是 => " + message);    }    @RabbitListener(queues = MQField.HELLO_GOODS_QUEUE)    @RabbitHandler    public void process(GoodsVO goodsVO) {        System.out.println("------ 接收实体对象 ------");        System.out.println("HelloReceiver接收到的实体对象是 => " + SerializeUtil.Serialize(goodsVO));    }}
HelloReceiver

这是最简单的一种模式,这个最简单示例,可以看到应用场景里的异步处理的影子。

在Controller中,新增一个接口:

@RequestMapping(value = "/hello/sendmsg", method = RequestMethod.GET)    @ApiOperation("简单字符串消息测试")    @ApiImplicitParams({            @ApiImplicitParam(paramType = "query", name = "message", required = true, value = "字符串消息", dataType = "String")    })    public String sendMsg(String message) throws Exception {        boolean isOK = helloSender.send(message);        return String.valueOf(isOK);    }
sendmsg

按照传统方式调用RPC接口,通常都是同步等待接口返回,而使用队列后,消息生产者直接向RabbitMQ服务器发送一条消息,不需要同步等待这个消息的处理结果。

示例代码中,消息消费者会刻意等待5秒(Thread.sleep(5000);)后才处理(打印出)消息,但是实际调用这个接口的时候,非常快就返回成功结果了,因为这个发送消息的动作不需要等待消费者消费消息的结果。

发送的消息,除了简单消息对象如字符串等,示例里你还看到有一个发送商品对象的消息,也就是说明RabbitMQ支持自定义的复杂对象消息。

2、work queues

P为生产者,C1、C2为消费者,中间红色框表示消息队列。生产者P将消息发送到消息队列Queue,消费者C1和C2对消息进行处理。

这种模式比较容易产生误解的地方是,多个消费者会不会消费队列里的同一条消息。答案是不会。

官方的说明是因为消费者根据竞争消费模式()分派任务(Distributing tasks among workers (the competing consumers pattern) )。

对于work queues这种模式,同一条消息M1,要么C1拉取到,要么C2拉取到,不会出现C1和C2同时拉取到并消费。

当然,这种模式还可以扩展,除了一个生产者,也可以有多个生产者。

生产者:

package com.power.demo.messaging.workqueues;import com.power.demo.messaging.MQField;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;@Componentpublic class WorkProducerA {    @Autowired    private AmqpTemplate rabbitTemplate;    public boolean send(String message) throws Exception {        boolean isOK = false;        if (StringUtils.isEmpty(message)) {            System.out.println("消息为空");            return isOK;        }        rabbitTemplate.convertAndSend(MQField.MY_WORKER_QUEUE, message);        isOK = true;        System.out.println(String.format("WorkProducerA发送字符串消息结果:%s", isOK));        return isOK;    }}
WorkProducerA

相同队列下另一个生产者:

package com.power.demo.messaging.workqueues;import com.power.demo.messaging.MQField;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;@Componentpublic class WorkProducerB {    @Autowired    private AmqpTemplate rabbitTemplate;    public boolean send(String message) throws Exception {        boolean isOK = false;        if (StringUtils.isEmpty(message)) {            System.out.println("消息为空");            return isOK;        }        rabbitTemplate.convertAndSend(MQField.MY_WORKER_QUEUE, message);        isOK = true;        System.out.println(String.format("WorkProducerB发送字符串消息结果:%s", isOK));        return isOK;    }}
WorkProducerB

消费者:

package com.power.demo.messaging.workqueues;import com.power.demo.messaging.MQField;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.concurrent.atomic.AtomicInteger;@Componentpublic class WorkConsumerA {    private static AtomicInteger atomicInteger = new AtomicInteger();    @RabbitListener(queues = MQField.MY_WORKER_QUEUE)    @RabbitHandler    public void process(String message) throws Exception {        int index = atomicInteger.getAndIncrement();        Thread.sleep(2000);        System.out.println("WorkConsumerA接收到的字符串消息是 => " + message);        System.out.println("WorkConsumerA自增序号 => " + index);    }}
WorkConsumerA

另一个消费者:

package com.power.demo.messaging.workqueues;import com.power.demo.messaging.MQField;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.concurrent.atomic.AtomicInteger;@Componentpublic class WorkConsumerB {    private static AtomicInteger atomicInteger = new AtomicInteger();    @RabbitListener(queues = MQField.MY_WORKER_QUEUE)    @RabbitHandler    public void process(String message) throws Exception {        int index = atomicInteger.getAndIncrement();        Thread.sleep(10);        System.out.println("WorkConsumerB接收到的字符串消息是 => " + message);        System.out.println("WorkConsumerB自增序号 => " + index);    }}
View Code

pub/sub

应用最广泛的发布/订阅模式。

官方的说法是:发送多个消息到多个消费者(Sending messages to many consumers at once.)

这个模式和work queues模式最明显的区别是,队列Queue前加了一层,多了Exchange(交换器)。

 P为生产者,X为交换器,C1、C2为消费者,中间红色框表示消息队列。生产者P将消息不是直接发送到队列Queue,而是发送到交换器X(注意:交换器Exchange并不存储消息),然后由交换机X发送给两个队列,两个消费者C1和C2各自监听一个队列,来消费消息。

根据交换器类型的不同,又可以分为Fanout、Direct和Topic这三种消费方式,Headers方式实际应用不是非常广泛,本文暂不讨论。

3、fanout

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。

(1)可以理解为路由表的模式

(2)这种模式不需要RoutingKey,即使配置了也忽略

(3)这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定

(4)如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃

Fanout广播模式实现同一个消息被多个消费者消费,而work queues是同一个消息只能有一个消费者(竞争去)消费。

生产者:

package com.power.demo.messaging.pubsub.fanout;import com.power.demo.entity.vo.GoodsVO;import com.power.demo.messaging.MQField;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;@Componentpublic class FanoutSender {    @Autowired    private AmqpTemplate rabbitTemplate;    public boolean send(GoodsVO goodsVO) throws Exception {        boolean isOK = false;        if (goodsVO == null) {            System.out.println("消息为空");            return isOK;        }        rabbitTemplate.convertAndSend(MQField.MY_FANOUT_EXCHANGE, "", goodsVO);        isOK = true;        System.out.println(String.format("FanoutSender发送对象消息结果:%s", isOK));        return isOK;    }}
FanoutSender

消费者:

package com.power.demo.messaging.pubsub.fanout;import com.power.demo.entity.vo.GoodsVO;import com.power.demo.messaging.MQField;import com.power.demo.util.SerializeUtil;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class FanoutReceiverA {    @RabbitListener(queues = MQField.MY_FANOUTA_QUEUE)    @RabbitHandler    public void process(GoodsVO goodsVO) {        System.out.println("FanoutReceiverA接收到的商品消息是 => " + SerializeUtil.Serialize(goodsVO));    }}
FanoutReceiverA

另一个消费者:

package com.power.demo.messaging.pubsub.fanout;import com.power.demo.entity.vo.GoodsVO;import com.power.demo.messaging.MQField;import com.power.demo.util.SerializeUtil;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class FanoutReceiverB {    @RabbitListener(queues = MQField.MY_FANOUTB_QUEUE)    @RabbitHandler    public void process(GoodsVO goodsVO) {        System.out.println("FanoutReceiverB接收到的商品消息是 => " + SerializeUtil.Serialize(goodsVO));    }}
FanoutReceiverB

4、direct

Fanout是1对多以广播的方式,发送给所有的消费者。

Direct则是创建消息队列的时候,指定一个BindingKey。当发送者发送消息的时候,指定对应的RoutingKey,当RoutingKey和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中。

Direct广播模式最明显不同于Fanout模式的地方是,消费者可以进行消息过滤,有选择的进行接收想要消费的消息,也就是队列绑定关键字,发送者将数据根据关键字发送到Exchange,Exchange根据关键字判定应该将数据发送(路由)到指定队列。

任何发送到Direct Exchange的消息都会被转发到RoutingKey中指定的Queue。

(1)消息传递时需要一个“RoutingKey”,可以简单的理解为要发送到的队列名字

(2)如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃

生产者:

package com.power.demo.messaging.pubsub.direct;import com.power.demo.messaging.MQField;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;@Componentpublic class DirectSender {    @Autowired    private AmqpTemplate rabbitTemplate;    public boolean sendDirectA(String message) throws Exception {        boolean isOK = false;        if (StringUtils.isEmpty(message)) {            System.out.println("消息为空");            return isOK;        }        rabbitTemplate.convertAndSend(MQField.MY_DIRECT_EXCHANGE, MQField.MY_DIRECT_ROUTINGKEYA, message);        isOK = true;        System.out.println(String.format("DirectSender发送DirectA字符串消息结果:%s", isOK));        return isOK;    }    public boolean sendDirectB(String message) throws Exception {        boolean isOK = false;        if (StringUtils.isEmpty(message)) {            System.out.println("消息为空");            return isOK;        }        rabbitTemplate.convertAndSend(MQField.MY_DIRECT_EXCHANGE, MQField.MY_DIRECT_ROUTINGKEYB, message);        isOK = true;        System.out.println(String.format("DirectSender发送DirectB字符串消息结果:%s", isOK));        return isOK;    }}
DirectSender

消费者:

package com.power.demo.messaging.pubsub.direct;import com.power.demo.messaging.MQField;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class DirectReceiverA {    @RabbitListener(queues = MQField.MY_DIRECTA_QUEUE)    @RabbitHandler    public void process(String message) {        System.out.println("DirectReceiverA接收到的字符串消息是 => " + message);    }}
DirectReceiverA

另一个消费者:

package com.power.demo.messaging.pubsub.direct;import com.power.demo.messaging.MQField;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class DirectReceiverB {    @RabbitListener(queues = MQField.MY_DIRECTB_QUEUE)    @RabbitHandler    public void process(String message) {        System.out.println("DirectReceiverB接收到的字符串消息是 => " + message);    }}
DirectReceiverB

5、topic

Topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的RoutingKey和该模式相匹配的时候,消息才会被发送到该消息队列中。

任何发送到Topic Exchange的消息都会被转发到所有关心RoutingKey中指定话题的Queue上

(1)每个队列都有其关心的主题,所有的消息都带有一个“标题”(RoutingKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列

(2)需要RoutingKey,也需要提前绑定Exchange与Queue

(3)在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RoutingKey为”mq.log.error”的消息会被转发到该队列)

(4)“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但“log.#”能与上述两者都匹配

(5)如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息

生产者:

package com.power.demo.messaging.pubsub.topic;import com.power.demo.messaging.MQField;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;@Componentpublic class TopicSender {    @Autowired    private AmqpTemplate rabbitTemplate;    public boolean sendTopicA(String message) throws Exception {        boolean isOK = false;        if (StringUtils.isEmpty(message)) {            System.out.println("消息为空");            return isOK;        }        rabbitTemplate.convertAndSend(MQField.MY_TOPIC_EXCHANGE, MQField.MY_TOPIC_ROUTINGKEYA, message);        isOK = true;        System.out.println(String.format("TopicSender发送TopicA字符串消息结果:%s", isOK));        return isOK;    }    public boolean sendTopicB(String message) throws Exception {        boolean isOK = false;        if (StringUtils.isEmpty(message)) {            System.out.println("消息为空");            return isOK;        }        rabbitTemplate.convertAndSend(MQField.MY_TOPIC_EXCHANGE, MQField.MY_TOPIC_ROUTINGKEYB, message);        isOK = true;        System.out.println(String.format("TopicSender发送TopicB字符串消息结果:%s", isOK));        return isOK;    }    public boolean sendToMatchedTopic() {        boolean isOK = false;        String routingKey = "my_topic_routingkeyA.16";//模糊匹配MQField.MY_TOPIC_ROUTINGKEYA        //String routingKey = "my_topic_routingkeyB.32";//模糊匹配MQField.MY_TOPIC_ROUTINGKEYB        String matchedKeys = "";        if (MQField.MY_TOPIC_ROUTINGKEYA.contains(routingKey.split("\\.")[0])) {            matchedKeys = "TopicReceiverA";        } else if (MQField.MY_TOPIC_ROUTINGKEYB.contains(routingKey.split("\\.")[0])) {            matchedKeys = "TopicReceiverB";        }        String msg = "message to matched receivers:" + matchedKeys;        rabbitTemplate.convertAndSend(MQField.MY_TOPIC_EXCHANGE, routingKey, msg);        isOK = true;        System.out.println(String.format("TopicSender发送字符串消息结果:%s", isOK));        return isOK;    }}
TopicSender

消费者:

package com.power.demo.messaging.pubsub.topic;import com.power.demo.messaging.MQField;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class TopicReceiverA {    @RabbitListener(queues = MQField.MY_TOPICA_QUEUE)    @RabbitHandler    public void process(String message) {        System.out.println("TopicReceiverA接收到的字符串消息是 => " + message);    }}
TopicReceiverA

另一个消费者:

package com.power.demo.messaging.pubsub.topic;import com.power.demo.messaging.MQField;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class TopicReceiverB {    @RabbitListener(queues = MQField.MY_TOPICB_QUEUE)    @RabbitHandler    public void process(String message) {        System.out.println("TopicReceiverB接收到的字符串消息是 => " + message);    }}
TopicReceiverB

示例代码中,定义了两个topic,生产者通过调用sendToMatchedTopic方法,根据RoutingKey模糊匹配,将消息发送到匹配的队列上。

到这里,发布订阅模式的介绍就结束了。我们再来总结下发布订阅模式下RabbitMQ消息队列主要工作流程。以Topic为例:

生产者

1、获取一个连接(Connection)
2、从连接(Connection)上获取一个信道( Channel)
3、声明一个交换器( Exchange)
4、声明1个或多个队列(Queue)
5、把队列(Queue)绑定到交换器(Exchange)上
6、向指定的交换器(Exchange)发送消息,消息路由到特定队列(Queue)

消费者

RabbitMQ消费者消费消息,支持推(push)模式和拉(pull)模式,这里以拉模式说明下流程。

1、创建一个连接(Connection)

2、启动MainLoop后台线程,通过连接(Connection)循环拉取消息
3、处理并确认消息被消费

6、rpc

RPC调用流程说明:

(1)当客户端启动的时候,它创建一个匿名独享的回调队列

(2)在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性

(3)将请求发送到一个 rpc_queue 队列中

(4)服务器等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。

(5)客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用

Callback queue回调队列,客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。

Correlation id关联标识,客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

服务端:

package com.power.demo.messaging.rpc;import com.power.demo.messaging.MQField;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Envelope;import java.io.IOException;import java.util.concurrent.TimeoutException;public class RPCServer {    private static int fib(int n) {        if (n == 0) {            return 0;        }        if (n == 1) {            return 1;        }        return fib(n - 1) + fib(n - 2);    }    //直接运行此方法    public static void main(String[] argv) {        ConnectionFactory factory = new ConnectionFactory();        //factory.setHost("localhost");        Connection connection = null;        try {            connection = factory.newConnection();            final Channel channel = connection.createChannel();            channel.queueDeclare(MQField.MY_RPC_QUEUE, false, false, false, null);            channel.basicQos(1);            System.out.println(" [x] Awaiting RPC requests");            Consumer consumer = new DefaultConsumer(channel) {                @Override                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties                            .Builder()                            .correlationId(properties.getCorrelationId())                            .build();                    String response = "";                    try {                        String message = new String(body, "UTF-8");                        int n = Integer.parseInt(message);                        System.out.println(" [.] fib(" + message + ")");                        response += fib(n);                        System.out.println(String.format("RPCServer计算fib数列应答:%s", response));                    } catch (RuntimeException e) {                        System.out.println(" [.] " + e.toString());                    } finally {                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));                        channel.basicAck(envelope.getDeliveryTag(), false);                        // RabbitMq consumer worker thread notifies the RPC server owner thread                        synchronized (this) {                            this.notify();                        }                    }                }            };            channel.basicConsume(MQField.MY_RPC_QUEUE, false, consumer);            // Wait and be prepared to consume the message from RPC client.            while (true) {                synchronized (consumer) {                    try {                        consumer.wait();                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            }        } catch (IOException | TimeoutException e) {            e.printStackTrace();        } finally {            if (connection != null)                try {                    connection.close();                } catch (IOException _ignore) {                }        }    }}
RPCServer

客户端:

package com.power.demo.messaging.rpc;import com.power.demo.messaging.MQField;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Envelope;import java.io.IOException;import java.util.UUID;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeoutException;public class RPCClient {    private Connection connection;    private Channel channel;    private String replyQueueName;    public RPCClient() throws IOException, TimeoutException {        ConnectionFactory factory = new ConnectionFactory();        //factory.setHost("localhost");        connection = factory.newConnection();        channel = connection.createChannel();        replyQueueName = channel.queueDeclare().getQueue();    }    public String call(String message) throws IOException, InterruptedException {        final String corrId = UUID.randomUUID().toString();        AMQP.BasicProperties props = new AMQP.BasicProperties                .Builder()                .correlationId(corrId)                .replyTo(replyQueueName)                .build();        channel.basicPublish("", MQField.MY_RPC_QUEUE, props, message.getBytes("UTF-8"));        final BlockingQueue
response = new ArrayBlockingQueue
(1); channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(corrId)) { response.offer(new String(body, "UTF-8")); } } }); return response.take(); } public void close() throws IOException { connection.close(); } //直接运行此方法 public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(10)"); response = fibonacciRpc.call("10"); System.out.println(" [.] Got '" + response + "'"); System.out.println(String.format("RPCClient得到计算fib数列应答:%s", response)); } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } finally { if (fibonacciRpc != null) { try { fibonacciRpc.close(); } catch (IOException _ignore) { } } } }}
RPCClient

示例代码我这里直接改造了一下官方的demo代码。启动RPCServer,再运行RPCClient就可以看到RPC调用结果了。

 三、常见问题

1、幂等性

生产环境各种业务系统出现重复消息是不可避免的,因为不能保证生产者不发送重复消息。

对于读操作而言,重复消息可能无害,但是对于写操作,重复消息容易造成业务灾难,比如相同消息多次扣减库存,多次支付请求扣款等。

有一种情况也会造成重复消息,就是RabbitMQ对设置autoAck=false之后没有被Ack的消息是不会清除掉的,消费者可以多次重复消费。

我个人认为RabbitMQ只是消息传递的载体,要保证幂等性,还是需要在消费者业务逻辑上下功夫。

2、有序消息

我碰到过某厂有一个开发团队通过Kafka来实现有序队列,因为发送的消息有先后依赖关系,需要消费者收到多个消息保存起来最后聚合后一起处理业务逻辑。

但是,其实大部分业务场景下我们都不需要消息有先后依赖关系,因为有序队列产生依赖关系,后续消费很容易造成各种处理难题。

归根结底,我认为需要有序消息的业务系统在设计上就是不合理的,争取在设计上规避才好。当然良好的设计需要丰富的经验和优化,以及妥协。

3、高可用

RabbitMQ支持集群,模式主要可分为三种:单一模式、普通模式和镜像模式。

RabbitMQ支持弹性部署,在业务高峰期间可通过集群弹性部署支撑业务系统。

RabbitMQ支持消息持久化,如果队列服务器出现问题,消息做了持久化,后续恢复正常,消息数据不丢失不会影响正常业务流程。

RabbitMQ还有很多高级特性,比如发布确认和事务等,虽然可能会降低性能,但是增强了可靠性。

 

参考:

转载地址:http://hpzra.baihongyu.com/

你可能感兴趣的文章
Flutter 云音乐
查看>>
RecyclerView实现多type页面
查看>>
个人的web商城网站
查看>>
debian fcitx
查看>>
排中律与实无穷问题的性质分析
查看>>
08/23 学习总结
查看>>
物理层
查看>>
linux多网卡路由设置
查看>>
八大监听器
查看>>
self.navigationController退出到指定页面,或者一次性pop出n个页面
查看>>
iptables 端口转发以及双向通信
查看>>
备战一线互联网公司Java工程师面试题 (1)
查看>>
jquery图片切换插件jquery.cycle.js参数详解
查看>>
JavaScript push() 方法
查看>>
Map集合
查看>>
JSP基础语法1
查看>>
elasticsearch Java API 之GET API & DELETE API
查看>>
《深入理解Java虚拟机》——GC基础概念
查看>>
微信小程序联盟:官方文档+精品教程+demo集合(5月31日更新,持续更新中……)...
查看>>
Fastjson 的 Set类型和 WriteClassName 选项引起的BUG
查看>>