RabbitMQ入门
本文最后更新于49 天前,其中的信息可能已经过时,如有错误请发送邮件到big_fw@foxmail.com

一、初识MQ

1、同步和异步通讯

  • 微服务间通讯的两种方式:
    • 同步通讯:就像打电话,需要实时响应。
    • 异步通讯:就像发邮件,不需要马上回复。
(1)同步调用
  • 基于OpenFeign的调用都属于是同步调用,以余额支付功能为例:
    • 支付服务先调用用户服务完成余额扣减,然后支付服务要更新支付流水单的状态,然后支付服务调用交易服务,更新业务订单状态为已支付。
  • 缺点:
    • 拓展性差。业务规模扩大,出现新的通知需求,这时不可能一个一个服务通知。
    • 性能下降。采用同步调用,每次只能等待返回结果才继续下一个。
    • 级联失败。当服务服务出现故障时,整个事务都会回滚,交易失败。
  • 优点:
    • 时效性较强,可以立即得到结果
(2)异步调用

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:服务调用者,发送消息。
  • 消息Broker:中介,暂存,转发,代理。
  • 消息接收者:收消息。

异步调用中,消息会发给消息Broker,然后接收者去消息Broker中获取。

  • 好处:
    • 耦合度更低
    • 性能更好
    • 业务拓展性强
    • 故障隔离,避免级联失败
  • 缺点:
    • 完全依赖于Broker的可靠性、安全性和性能
    • 架构复杂,后期维护和调试麻烦
(3)技术选型

MQ:消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。

  • 比较常见的MQ实现:
    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 几种常见MQ的对比:
RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

追求可用性:Kafka、 RocketMQ 、RabbitMQ

注:国内消息队列使用最多的还是RabbitMQ,再加上其各方面都比较均衡,稳定性也好。

二、RabbitMQ

(1)安装

官网上找个erlang+RabbitMQ直接安装。

访问 http://192.168.150.101:15672即可看到管理控制台。

  • RabbitMQ中的一些角色:
    • publisher:生产者
    • consumer:消费者
    • exchange:交换机,负责消息路由
    • queue:队列,存储消息
    • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

(2)收发消息

首次使用流程:消息发送到交换机,交换机转发到队列,消费者从队列中获取。

(3)数据隔离

需求:在RabbitMQ的控制台完成下列操作:
    新建一个用户hmall
    为hmall用户创建一个virtual host
    测试不同virtual host之间的数据隔离现象
  • 用户管理
Name:用户名
Tags:权限等级,如administrator
Can access virtual host: 可以访问的virtual host,/是默认的virtual host

出于成本考虑,通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。为了避免互相干扰, 会利用virtual host的隔离特性,将不同项目隔离:

每个项目创建独立的运维账号,将管理权限分离。

每个项目创建不同的virtual host,将每个项目的数据隔离。

三、SpringAMQP

Spring官方提供的基于AMQP协议的用来操作RabbitMQ的工具。开发业务功能时,不会在控制台收发消息,而是应该基于编程的方式。

RabbitMQ采用了AMQP协议,因此具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。且RabbitMQ官方也提供了各种不同语言的客户端。

  • SpringAMQP提供了三个功能:
    • 自动声明队列、交换机及其绑定关系
    • 基于注解的监听器模式,异步接收消息
    • 封装了RabbitTemplate工具,用于发送消息

(1)导入Demo工程

需求如下:
    利用控制台创建队列simple.queue
    在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
    在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列
// 配置
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /hmall
    username: hmall
    password: 123
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1、消息发送

@Test
// 消息发送
void testSendMessage2Queue() {
    String queueName = "simple.queue";
    String msg = "hello,amqp!";
    rabbitTemplate.convertAndSend(queueName, msg);
}

2、消息接收

@Slf4j
@Component
public class MqListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者收到了simple.queue的消息: 【" + msg + "】");
    }
}

(2)WorkQueues模型

让多个消费者绑定到一个队列,共同消费队列中的消息。

当消息处理比较耗时的时,可能生产速度会远大于消费速度。时间久了会堆积。

此时就可以使用work模型,多个消费者共同处理。

  • 基本思路:
1.在RabbitMQ的控制台创建一个队列,名为work.queue
2.在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
3.在consumer服务中定义两个消息监听者,都监听work.queue队列
4.消费者1每秒处理50条消息,消费者2每秒处理5条消息
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}
​
// 消息接收
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}
​
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}
  • 能者多劳

有些消费者效率低,任务需要给高效率的消费者。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

(3)交换机类型

Exchange(交换机):负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

之前的例子中,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:

  • 角色:
    • Publisher:生产者消息不再发送消息到队列中,而是发给交换机
    • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
    • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
    • Consumer:消费者,与以前一样,订阅队列,没有变化
  • 交换机类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
    • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
    • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
    • Headers:头匹配,基于MQ的消息头匹配,用的较少。

(4)Fanout交换机

广播,消息群发。

多个队列
每个队列都要绑定到Exchange上
生产者发消息,只能送到交换机
交换机把消息发送给绑定过的所有队列
订阅队列的消费者都能拿到消息

实现思路: 声明两个队列和一个交换机,将两队列与其绑定,存在两个消费者方法,分别监听队列,编写测试方法,向交换机t发送消息。

    @RabbitListener(queues = "famout.queue1")
    public void listenFamoutQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1 收到了famout.queue1的消息: 【" + msg + "】");
    }
​
    @RabbitListener(queues = "famout.queue2")
    public void listenFamoutQueue2(String msg) {
        System.out.println("消费者2 收到了famout.queue2的消息: 【" + msg + "】");
    }
--------------------------------------------------------------------------------------------
    @Test
    void testSendFanout() {
        String exchange = "hmall.famout";
        String msg = "hello,我喜欢洛丽塔";
        rabbitTemplate.convertAndSend(exchange, "", msg);
    }

(5)Direct交换机

会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。比如:成功的需要发,不成功的不需要。

  • 规则:
    • 每一Queue都与Exchange设置一个BindingKey;
    • 发布者发送消息时,指定消息的RoutingKey;
    • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。
1.声明一个名为hmall.direct的交换机
2.声明队列direct.queue1,绑定hmall.direct,bindingKey为blud和red
3.声明队列direct.queue2,绑定hmall.direct,bindingKey为yellow和red
4.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2 
5.在publisher中编写测试方法,向hmall.direct发送消息 
    @RabbitListener(queues = "direct.queue1")
    public void listenDirectQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1 收到了direct.queue1的消息: 【" + msg + "】");
    }
​
    @RabbitListener(queues = "direct.queue2")
    public void listenDirectQueue2(String msg) {
        System.out.println("消费者2 收到了direct.queue2的消息: 【" + msg + "】");
    }
    
--------------------------------------------------------------------------------------------
    @Test
    void testSendFanout() {
        String exchange = "hmall.direct";
        String msg = "hello,我喜欢洛丽塔1";
        rabbitTemplate.convertAndSend(exchange, "blue", msg);
    }
  • 描述下Direct交换机与Fanout交换机的差异?
    • Fanout交换机将消息路由给每一个与之绑定的队列
    • Direct交换机根据RoutingKey判断路由给哪个队列
    • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

(6)Topic交换机

与Direct类似,区别在于RoutingKey可以是多个单词的列表,并且以 . 分割。如:china.news

  • 通配符规则:
    • #:匹配一个或多个词
    • *:匹配不多不少恰好1个词
china.# :跟中国相关的 
japan.# :跟日本相关的
#.weather :只跟天气相关
#.news :只跟新闻相关

    @RabbitListener(queues = "topic.queue1")
    public void listenTopicQueue1(String msg) {
        System.out.println("消费者1 收到了topic.queue1的消息: 【" + msg + "】");
    }
​
    @RabbitListener(queues = "topic.queue2")
    public void listenTopicQueue2(String msg) {
        System.out.println("消费者2 收到了topic.queue2的消息: 【" + msg + "】");
    }
--------------------------------------------------------------------------------------------
    @Test
    void testSendTopic() {
        String exchange = "hmall.topic";
        String msg = "hello,我喜欢洛丽塔1";
        rabbitTemplate.convertAndSend(exchange, "china.news", msg);
    }

(7)声明队列和交换机

实际开发中,需要在代码中生成交换机和队列。程序启动时检查队列和交换机是否存在,如果不存在自动创建。

  • SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
    • Queue:用于声明队列,可以用工厂类QueueBuilder构建
    • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
    • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
  • 发送方只关心发送消息,声明和创建都在消费方。
  • 快速入门:fanout交换机
// 创建配置类
@Configuration
public class FanoutConfiguration {
​
    // 交换机创建
    @Bean
    public FanoutExchange fanoutExchange() {
        // 1
        // ExchangeBuilder.fanoutExchange("hmall.fanout2").build();
        // 2
        return new FanoutExchange("hmall.fanout2");
    }
​
    // 队列创建
    @Bean
    public Queue fanoutQueue3() {
        // QueueBuilder.durable("fanout.queue3").build();
        return new Queue("fanout.queue3");
    }
​
    // 队列创建
    @Bean
    public Queue fanoutQueue4() {
        return new Queue("fanout.queue4");
    }
​
    // 绑定
    @Bean
    public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
    }
​
    // 绑定
    @Bean
    public Binding fanoutBinding4() {
        return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
    }
}
  • 如果是需要RoutingKey的交换机,基于注解声明。

不需要写配置,直接在消费者类那边使用注解,没有会自动声明。

@Slf4j
@Component
public class MqListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue1", durable = "true"),
            exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg) {
        System.out.println("消费者1 收到了direct.queue1的消息: 【\" + msg + \"】");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue2", durable = "true"),
            exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue2(String msg) {
        System.out.println("消费者2 收到了direct.queue2的消息: 【\" + msg + \"】");
    }
}

(8)消息转换器

Spring的消息发送代码接收的消息体是一个Object:在数据传输时会把发送的消息序列化为字节发送给MQ,接收时会把字节反序列化为Java对象。

默认采用的序列化方式是JDK序列化。

  • JDK序列化存在下列问题:
    • 数据体积过大
    • 有安全漏洞
    • 可读性差
  • 测试默认转换器

声明名为object.queue的队列,编写单元测试,向队列中直接发消息,消息类型为Map在控制台查看消息。

@SpringBootApplication
public class PublishApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublishApplication.class, args);
    }
// 使用JSON方式来做序列化和反序列化。
    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

文末附加内容
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
下一篇