文章目录
一、认识 MQ1、介绍2、常见的 MQ 技术
二、认识 RabbitMQ1、介绍2、部署 RabbitMQ(安装)1)部署方式2)单机部署(CentOS 7)① 下载镜像② 安装MQ③ 部署成功后,访问 RabbitMQ 管理平台
3、RabbitMQ 的结构和概念
三、RabbitMQ 五种常见模型1、五种模型的介绍2、快速入门——“Hello World!” 基本消息队列模型案例1、基本消息队列模型“Hello World!”案例介绍2、项目整体结构和依赖3、消息发送者代码编写4、消息消费者代码编写5、消息发送者与消息消费者的执行流程
四、Spring AMQP 规范1、Spring AMQP介绍2、快速入门——使用 Spring AMQP 规范优化 “Hello World!”基本消息队列模型案例1)父工程中引入 Spring AMQP 的依赖2)消息发送者用 RabbitTemplate 消息模版对象 发送消息到队列3)消息消费者用 @RabbitListener 注解 订阅队列中的消息
3、实现 “Work Queues” 工作消息队列模型案例——SpringAMQP规范1)案例实现思路2)发送者代码编写3)消费者代码编写4)运行结果5)改进运行速度——设置预取消息上限① 找出消费速度慢的原因② 设置预取消息上限③ 优化后运行结果展示
五、实现 RabbitMQ 三种发布订阅消息模型案例——Spring AMQP规范1、FanoutExchange 广播消息模型案例1)在消费者服务中声明交换机、队列、绑定① 创建配置类 FanoutConfig② 声明交换机、队列、绑定的三种bean,通过绑定对象将队列与交换绑定在一起
2)编写消费者代码,订阅两个队列的消息① 在Listener类中创建两个消费者② 运行消费者启动类,来启动消费者端
3)编写发送者代码,将消息发送到交换机① 编写发送者测试类代码② 运行发送者测试方法
4)运行结果分析
2、DirectExchange 路由消息模型案例1)编写消费者代码,并用注解的方式声明交换机对象、队列对象、绑定对象① 在 Listener 的监听器类中,编写消费者代码② 运行消费者启动类 ConsumerApplication,来启动消费者端
2)编写发送者代码,将消息发送到交换机① 编写发送者代码② 运行发送者测试方法
3)运行结果4)路由交换机与广播交换机的区别
3、TopicExchange 话题消息模型案例1)编写消费者代码,并用注解的方式声明交换机对象、队列对象、绑定对象① 在 Listener 的监听器类中,编写消费者代码② 运行消费者启动类 ConsumerApplication,来启动消费者端
2)编写发送者代码,将消息发送到交换机① 编写发送者代码② 运行发送者测试方法
3)运行结果
六、优化 RabbitMQ 原生消息转换器——改用 Jackson 消息转换器1、原生消息转换器问题引出2、使用 Jackson 消息转换器1)在项目父工程引入 Jackson 依赖2)编写发送者代码——声明 Jackson 消息转换器3)编写发送者代码——发送对象类型消息到队列中4)运行发送者测试方法——查看队列中的消息类型5)编写消费者代码——在启动类中声明 jackson 消息转换器6)编写消费者代码——接收消息7)运行消费者启动类 ConsumerApplication——查看运行结果
一、认识 MQ
1、介绍
MQ(MessageQueue):消息队列,是用于存放消息的队列技术。也是在事件驱动架构中的 Broker。 消息:事件
事件驱动架构是异步通信的一种实现方案,具体请看该文章: 异步通信 与 同步通信 的案例和优缺点
2、常见的 MQ 技术
二、认识 RabbitMQ
1、介绍
RabbitMQ:是基于 Erlang 语言开发的开源消息通信中间件 Erlang 语言:是一个面向并发的编程语言,非常适合用于分布式系统设计。
2、部署 RabbitMQ(安装)
使用 Docker 容器部署 RabbitMQ
1)部署方式
分为单机部署和集群部署,详情参考部署文档:
链接: https://pan.baidu.com/s/1pK9s4QnnTp3-mZxIBCPQsQ?pwd=rad6 提取码: rad6
2)单机部署(CentOS 7)
① 下载镜像
方式一:在线拉取镜像
docker pull rabbitmq:3.8-management
方式二:本地上传镜像
RabbitMQ 镜像包 链接: https://pan.baidu.com/s/1k6C2teXXpm2BTtTq1tXitQ?pwd=3ekd 提取码: 3ekd
上传到虚拟机中后,使用命令加载镜像即可:
docker load -i mq.tar
② 安装MQ
执行下面的命令来运行MQ容器:
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
命令分析:
③ 部署成功后,访问 RabbitMQ 管理平台
通过 本机IP:设置的管理平台端口 即可访问 RabbitMQ 管理平台 进入时,需要输入用户名和密码
Rabbit MQ 管理平台的简单介绍和操作,参考文章:RabbitMQ 管理平台(控制中心)的介绍
3、RabbitMQ 的结构和概念
概念:
channal:操作 MQ 的工具exchange:消息路由(分发)给队列queue:缓存消息(存放消息)virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组,虚拟主机之间相互隔离
因为虚拟主机之间是相互隔离的,所以虚拟机的队列、交换机等信息与其他虚拟主机可以相同并不冲突
消息发送者:发送消息给交换机 交换机:将消息路由到消息队列 消息队列:负责暂时存放消息 消息接收者(消费者):从消息队列中拿取消息并处理
三、RabbitMQ 五种常见模型
1、五种模型的介绍
2、快速入门——“Hello World!” 基本消息队列模型案例
1、基本消息队列模型“Hello World!”案例介绍
根据 “Hello World!” 消息队列模型来搭建一个Java项目,名为mq-demo(SpringBoot 项目)
通过网盘分享的文件:mq-demo.7z 链接: https://pan.baidu.com/s/1t4mrxq7Kk5FWUKQ_8gdDcQ?pwd=wbtc 提取码: wbtc
2、项目整体结构和依赖
项目依赖及两个角色
消息接受者和消息发送者项目结构
项目整体结构
3、消息发送者代码编写
运行测试类 PublicherTest 即可发送消息
4、消息消费者代码编写
运行测试类 ConsumerTest 即可接收消息
5、消息发送者与消息消费者的执行流程
四、Spring AMQP 规范
1、Spring AMQP介绍
AMQP:应用之间消息通信的一种协议,与语言和平台无关
Spring AMQP 特征
2、快速入门——使用 Spring AMQP 规范优化 “Hello World!”基本消息队列模型案例
步骤解析: 1、父工程中引入 Spring AMQP 的依赖 2、消息发送者用 RabbitTemplate 消息模版对象 发送消息到队列 3、消息消费者用 @RabbitListener 注解 订阅队列中的消息
1)父工程中引入 Spring AMQP 的依赖
2)消息发送者用 RabbitTemplate 消息模版对象 发送消息到队列
① 在配置文件中编写RabbitMQ连接信息 ② 编写订阅消息的代码
③ 运行测试类即可发送消息
3)消息消费者用 @RabbitListener 注解 订阅队列中的消息
① 在配置文件中编写RabbitMQ连接信息 ② 编写订阅消息的代码
③ 运行启动类 ConsumerApplication 即可接收消息
3、实现 “Work Queues” 工作消息队列模型案例——SpringAMQP规范
1)案例实现思路
Work Queues 工作消息队列的案例实现基于上面的Hello World基本消息队列案例项目做改动
2)发送者代码编写
在测试类中编写发送者代码
3)消费者代码编写
在 SpringRabbitListener 类中编写消费者代码
4)运行结果
运行消费者客户端的启动类 ConsumerApplication ,消费者打印以下信息: 两个消费者共花费了近 4 秒时间来处理这 50 条消息
5)改进运行速度——设置预取消息上限
① 找出消费速度慢的原因
可以看到以上运行结果中,消费者1处理的是偶数消息,而消费者2处理的是奇数消息。这是因为:
消息队列会根据轮询机制,让消费者轮流预取消息(消费者们轮流拿消息,一个一个拿)所有消费者手头上的消息还没处理完就从消息队列中预取消息,这会导致消息堆积在消费者那里因为 消费者1 处理消息很快(20毫秒一个),所以一下就处理完了,而 消费者2 处理消息较慢(200毫秒一个),只能等待 消费者2 处理完成才算运行结束
② 设置预取消息上限
所有消费者必须将手头上的消息消费完了才能从消息队列拿(能者多劳) 在消费者端的配置文件中设置
③ 优化后运行结果展示
从开始到结束只花费了近 1 秒的时间,能者多劳,消费者1 处理速度快,所以处理消息更多。
五、实现 RabbitMQ 三种发布订阅消息模型案例——Spring AMQP规范
前面是只有队列的消息模型案例,接下来是三种包含交换机的消息模型案例 所有的消息模型案例都在同一个项目中做演示
Spring AMQP 提供的交换机类和对应关系。 .
1、FanoutExchange 广播消息模型案例
FanoutExchange (广播交换机)特性: 会将同一个消息路由给所有队列,使得所有消费者都能接收同一个消息
1)在消费者服务中声明交换机、队列、绑定
① 创建配置类 FanoutConfig
② 声明交换机、队列、绑定的三种bean,通过绑定对象将队列与交换绑定在一起
2)编写消费者代码,订阅两个队列的消息
① 在Listener类中创建两个消费者
② 运行消费者启动类,来启动消费者端
3)编写发送者代码,将消息发送到交换机
① 编写发送者测试类代码
② 运行发送者测试方法
4)运行结果分析
一个发送者发消息,可以让多个消费者接收
2、DirectExchange 路由消息模型案例
1)编写消费者代码,并用注解的方式声明交换机对象、队列对象、绑定对象
① 在 Listener 的监听器类中,编写消费者代码
之前案例中是采用了 config配置类 的方式来分别声明:交换机对象、队列对象、绑定对象,并绑定关系 这里采用的是通过注解 @RabbitListener 的方式来声明对象和绑定关系,格式如下: @RabbitListener( bindings( value , exchange , key ) )
注解参数结构如下:
@RabbitListener注解,用于监听队列中的消息,属性为: – bindings属性:值为 @QueueBinding注解@QueueBinding注解,声明一个绑定对象,把交换机和队列进行绑定,属性为: – value属性:值为 @Queue注解 – exchange属性:值为 @Exchange注解 – key属性:值为一个或多个 routingkey名称 (路由key)@Queue注解,声明一个队列对象,属性为: – name属性:队列的名称@Exchange注解,声明一个交换机对象,属性为: – name属性:交换机的名称 – type属性:交换机的类型,枚举(广播、路由、主题等……)
② 运行消费者启动类 ConsumerApplication,来启动消费者端
2)编写发送者代码,将消息发送到交换机
① 编写发送者代码
② 运行发送者测试方法
3)运行结果
4)路由交换机与广播交换机的区别
3、TopicExchange 话题消息模型案例
**TopicExchange 话题交换机:**与前两者不同的地方在于,该消息模型的 bindingkey 是以多个单词组成的,用 " . " 号进行多个单词的分割。由于有通配符的存在,交换机路由消息到队列会更加方便。
如下图,例如: 交换机要将消息发送到 bindingkey 为 japan.weather 的消息队列,此时消息会被路由到 queue2 和 queue3 的消息队列中,并会被名为 consumer2 和 consumer3 的消费者订阅到消息。 (消息队列只需要用通配符就可以达到绑定多个key的效果,简化了代码)
1)编写消费者代码,并用注解的方式声明交换机对象、队列对象、绑定对象
① 在 Listener 的监听器类中,编写消费者代码
代码与之前案例大致相同,但要注意: 交换机类型为:ExchangeTypes.TOPIC,以及key属性的写法
② 运行消费者启动类 ConsumerApplication,来启动消费者端
2)编写发送者代码,将消息发送到交换机
① 编写发送者代码
② 运行发送者测试方法
3)运行结果
六、优化 RabbitMQ 原生消息转换器——改用 Jackson 消息转换器
1、原生消息转换器问题引出
由于 RabbitMQ 只支持字节数据进行传输,所以在发送对象类型数据时spring框架会自动进行序列化的操作,将对象类型数据转换成字节类型数据,但是Spring框架的消息转换器采用的是 Java 的 ObjectOutputStream 类来实现序列化操作的,这会出现以下问题:
性能差安全性低(容易出现注入问题)数据长度过长(如下图所示)
案例如下: 1、发送对象类型的消息给队列 2、进入RabbitMQ控制中心,发现队列中的消息内容被序列化了
2、使用 Jackson 消息转换器
发送者服务实现步骤:
消费者服务实现步骤:
1)在项目父工程引入 Jackson 依赖
因为在发送者服务和消费者服务都需要引入 Jackson 依赖,那么直接在父工程引入依赖即可
2)编写发送者代码——声明 Jackson 消息转换器
注意: MessageConverter 消息转换器所使用的包,必须是SpringAMQP提供的。如下图
3)编写发送者代码——发送对象类型消息到队列中
在发送者测试类中,编写发送者代码
4)运行发送者测试方法——查看队列中的消息类型
查看队列中的消息类型是否为 json ,并且内容是否是一个 json 对象。如下图代表成功
5)编写消费者代码——在启动类中声明 jackson 消息转换器
6)编写消费者代码——接收消息
方法的参数接收消息时,类型需要与消息发送时一致
7)运行消费者启动类 ConsumerApplication——查看运行结果
成功接收到消息
小结: