一、初识MQ
1、同步和异步通讯
- 微服务间通讯的两种方式:
- 同步通讯:就像打电话,需要实时响应。
- 异步通讯:就像发邮件,不需要马上回复。
(1)同步调用
- 基于OpenFeign的调用都属于是同步调用,以余额支付功能为例:
- 支付服务先调用用户服务完成余额扣减,然后支付服务要更新支付流水单的状态,然后支付服务调用交易服务,更新业务订单状态为已支付。
- 缺点:
- 拓展性差。业务规模扩大,出现新的通知需求,这时不可能一个一个服务通知。
- 性能下降。采用同步调用,每次只能等待返回结果才继续下一个。
- 级联失败。当服务服务出现故障时,整个事务都会回滚,交易失败。
- 优点:
- 时效性较强,可以立即得到结果
(2)异步调用
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
- 消息发送者:服务调用者,发送消息。
- 消息Broker:中介,暂存,转发,代理。
- 消息接收者:收消息。
异步调用中,消息会发给消息Broker,然后接收者去消息Broker中获取。
- 好处:
- 耦合度更低
- 性能更好
- 业务拓展性强
- 故障隔离,避免级联失败
- 缺点:
- 完全依赖于Broker的可靠性、安全性和性能
- 架构复杂,后期维护和调试麻烦
(3)技术选型
MQ:消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
- 比较常见的MQ实现:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
- 几种常见MQ的对比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
追求可用性:Kafka、 RocketMQ 、RabbitMQ
注:国内消息队列使用最多的还是RabbitMQ,再加上其各方面都比较均衡,稳定性也好。
二、RabbitMQ
(1)安装
官网上找个erlang+RabbitMQ直接安装。
访问 http://192.168.150.101:15672即可看到管理控制台。
- RabbitMQ中的一些角色:
- publisher:生产者
- consumer:消费者
- exchange:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
(2)收发消息
首次使用流程:消息发送到交换机,交换机转发到队列,消费者从队列中获取。
(3)数据隔离
需求:在RabbitMQ的控制台完成下列操作:
新建一个用户hmall
为hmall用户创建一个virtual host
测试不同virtual host之间的数据隔离现象
- 用户管理
Name:用户名
Tags:权限等级,如administrator
Can access virtual host: 可以访问的virtual host,/是默认的virtual host
出于成本考虑,通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。为了避免互相干扰, 会利用virtual host的隔离特性,将不同项目隔离:
每个项目创建独立的运维账号,将管理权限分离。
每个项目创建不同的virtual host,将每个项目的数据隔离。
三、SpringAMQP
Spring官方提供的基于AMQP协议的用来操作RabbitMQ的工具。开发业务功能时,不会在控制台收发消息,而是应该基于编程的方式。
RabbitMQ采用了AMQP协议,因此具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。且RabbitMQ官方也提供了各种不同语言的客户端。
- SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
(1)导入Demo工程
需求如下:
利用控制台创建队列simple.queue
在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列
// 配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /hmall
username: hmall
password: 123
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1、消息发送
@Test
// 消息发送
void testSendMessage2Queue() {
String queueName = "simple.queue";
String msg = "hello,amqp!";
rabbitTemplate.convertAndSend(queueName, msg);
}
2、消息接收
@Slf4j
@Component
public class MqListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者收到了simple.queue的消息: 【" + msg + "】");
}
}
(2)WorkQueues模型
让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时,可能生产速度会远大于消费速度。时间久了会堆积。
此时就可以使用work模型,多个消费者共同处理。
- 基本思路:
1.在RabbitMQ的控制台创建一个队列,名为work.queue
2.在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
3.在consumer服务中定义两个消息监听者,都监听work.queue队列
4.消费者1每秒处理50条消息,消费者2每秒处理5条消息
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
// 消息接收
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
- 能者多劳
有些消费者效率低,任务需要给高效率的消费者。
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
(3)交换机类型
Exchange(交换机):负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
之前的例子中,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:
- 角色:
- Publisher:生产者消息不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer:消费者,与以前一样,订阅队列,没有变化
- 交换机类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
(4)Fanout交换机
广播,消息群发。
多个队列
每个队列都要绑定到Exchange上
生产者发消息,只能送到交换机
交换机把消息发送给绑定过的所有队列
订阅队列的消费者都能拿到消息
实现思路: 声明两个队列和一个交换机,将两队列与其绑定,存在两个消费者方法,分别监听队列,编写测试方法,向交换机t发送消息。
@RabbitListener(queues = "famout.queue1")
public void listenFamoutQueue1(String msg) throws InterruptedException {
System.out.println("消费者1 收到了famout.queue1的消息: 【" + msg + "】");
}
@RabbitListener(queues = "famout.queue2")
public void listenFamoutQueue2(String msg) {
System.out.println("消费者2 收到了famout.queue2的消息: 【" + msg + "】");
}
--------------------------------------------------------------------------------------------
@Test
void testSendFanout() {
String exchange = "hmall.famout";
String msg = "hello,我喜欢洛丽塔";
rabbitTemplate.convertAndSend(exchange, "", msg);
}
(5)Direct交换机
会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。比如:成功的需要发,不成功的不需要。
- 规则:
- 每一Queue都与Exchange设置一个BindingKey;
- 发布者发送消息时,指定消息的RoutingKey;
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。
1.声明一个名为hmall.direct的交换机
2.声明队列direct.queue1,绑定hmall.direct,bindingKey为blud和red
3.声明队列direct.queue2,绑定hmall.direct,bindingKey为yellow和red
4.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
5.在publisher中编写测试方法,向hmall.direct发送消息
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) throws InterruptedException {
System.out.println("消费者1 收到了direct.queue1的消息: 【" + msg + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2 收到了direct.queue2的消息: 【" + msg + "】");
}
--------------------------------------------------------------------------------------------
@Test
void testSendFanout() {
String exchange = "hmall.direct";
String msg = "hello,我喜欢洛丽塔1";
rabbitTemplate.convertAndSend(exchange, "blue", msg);
}
- 描述下Direct交换机与Fanout交换机的差异?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
(6)Topic交换机
与Direct类似,区别在于RoutingKey可以是多个单词的列表,并且以 . 分割。如:china.news
- 通配符规则:
- #:匹配一个或多个词
- *:匹配不多不少恰好1个词
china.# :跟中国相关的
japan.# :跟日本相关的
#.weather :只跟天气相关
#.news :只跟新闻相关
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) {
System.out.println("消费者1 收到了topic.queue1的消息: 【" + msg + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) {
System.out.println("消费者2 收到了topic.queue2的消息: 【" + msg + "】");
}
--------------------------------------------------------------------------------------------
@Test
void testSendTopic() {
String exchange = "hmall.topic";
String msg = "hello,我喜欢洛丽塔1";
rabbitTemplate.convertAndSend(exchange, "china.news", msg);
}
(7)声明队列和交换机
实际开发中,需要在代码中生成交换机和队列。程序启动时检查队列和交换机是否存在,如果不存在自动创建。
- SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
- 发送方只关心发送消息,声明和创建都在消费方。
- 快速入门:fanout交换机
// 创建配置类
@Configuration
public class FanoutConfiguration {
// 交换机创建
@Bean
public FanoutExchange fanoutExchange() {
// 1
// ExchangeBuilder.fanoutExchange("hmall.fanout2").build();
// 2
return new FanoutExchange("hmall.fanout2");
}
// 队列创建
@Bean
public Queue fanoutQueue3() {
// QueueBuilder.durable("fanout.queue3").build();
return new Queue("fanout.queue3");
}
// 队列创建
@Bean
public Queue fanoutQueue4() {
return new Queue("fanout.queue4");
}
// 绑定
@Bean
public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
}
// 绑定
@Bean
public Binding fanoutBinding4() {
return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
}
}
- 如果是需要RoutingKey的交换机,基于注解声明。
不需要写配置,直接在消费者类那边使用注解,没有会自动声明。
@Slf4j
@Component
public class MqListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "direct.queue1", durable = "true"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) {
System.out.println("消费者1 收到了direct.queue1的消息: 【\" + msg + \"】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "direct.queue2", durable = "true"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue2(String msg) {
System.out.println("消费者2 收到了direct.queue2的消息: 【\" + msg + \"】");
}
}
(8)消息转换器
Spring的消息发送代码接收的消息体是一个Object:在数据传输时会把发送的消息序列化为字节发送给MQ,接收时会把字节反序列化为Java对象。
默认采用的序列化方式是JDK序列化。
- JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
- 测试默认转换器
声明名为object.queue的队列,编写单元测试,向队列中直接发消息,消息类型为Map在控制台查看消息。
@SpringBootApplication
public class PublishApplication {
public static void main(String[] args) {
SpringApplication.run(PublishApplication.class, args);
}
// 使用JSON方式来做序列化和反序列化。
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}