小鸭子的学习笔记duck

Duck Blog

唐如飞

( ^∀^)/欢迎\( ^∀^)

79 文章数
14 评论数

RabbitMQ消息队列

tangrufei
2022-08-28 / 0 评论 / 216 阅读 / 0 点赞

1、课程介绍2、RabbitMQ认识2.1、RabbitMQ 是什么2.2、使用场景2.3、市面上有哪些消息队列?2.4、为什么使用RabbitMQ 呢?2.5、AMQP协议3、RabbitMQ 安装3.1、安装前置依赖Erlang3.2、下载安装RabbitMQ3.3、启动RabbitMQ 3.4、安装管理插件3.5、登录RabbitMQ3.6、注意事项3.7、系统用户是中文账户问题解决4、RabbitMQ 工作原理(重点掌握)4.1、组成部分说明4.2、消息发布接收流程4.3、扩展5、RabbitMQ 常见模型5.1、HelloWorld基本消费模型(重点)5.1.1、搭建环境5.1.2、生产者编写5.1.3、消费者编写5.1.4、抽取获取连接工具类5.1.5、消息确认机制(ACK)5.2、Work queues消息模型5.2.1、生产者5.2.2、消费者15.2.3、消费者25.2.4、能者多劳5.3、交换机分类5.4、fanout-广播模型5.4.1、生产者5.4.2、消费者15.4.3、消费者25.4.4、测试5.5、direct - 定向模型5.5.1、生产者5.5.2、消费者15.5.3、消费者25.5.4、测试5.6、topic - 通配符模型5.6.1、生产者5.6.2、消费者15.6.3、消费者25.6.4、测试6、持久化-解决数据安全6.1、交换机持久化6.2、队列持久化代码6.3、消息持久化7、SpringBoot集成RabbitMQ8、晚自习

1、课程介绍

  1. RabbitMQ认识(了解)

  2. RabbitMQ安装(了解)

  3. RabbitMQ常用模型(重点掌握)

  4. SpringBoot集成RabbitMQ(重点掌握,项目中做集成)

2、RabbitMQ认识

2.1、RabbitMQ 是什么

  • MQ全称为Message Queue,即消息队列,它也是一个队列,遵循FIFO(First-In First-Out,先入先出)原则

  • RabbitMQ是由erlang(二郎)语言开发,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列

  • 它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛

  • 官方地址:http://www.rabbitmq.com

我们先来看如下场景:


优化一下:


最后的实现方式:


2.2、使用场景

开发中消息队列通常有如下应用场景:

1、提高系统响应速度

任务异步处理。 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。 提高了应用程序的响应时间

2、提高系统稳定性(请求不丢失)

系统挂了没关系,操作内容放到消息队列

3、服务调用异步化

服务没有直接的调用关系,而是通过队列进行服务通信

4、服务解耦

应用程序解耦合 MQ 相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合

5、排序保证FIFO

遵循队列先进先出的特点

6、削峰填谷



现实生活中的使用场景:

  • 12306买票

  • 手机银行转账

  • 淘宝/京东购物(秒杀场景)

2.3、市面上有哪些消息队列?

ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis


2.4、为什么使用RabbitMQ 呢?

1、使用简单,功能强大

2、基于AMQP协议

3、社区活跃,文档完善

4、高并发性能好,这主要得益于Erlang语言

5、Spring Boot默认已集成RabbitMQ

2.5、AMQP协议

  • AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式, 为的就是解决MQ市场上协议不统一的问题

  • RabbitMQ就是遵循AMQP标准协议开发的MQ服务。

  • 官网:http://www.amqp.org

扩展知识:

JMS(Java Message Service)

JMS是Java消息服务,是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的 jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。

它和AMQP有什么不同,JMS是java语言专属的消息服务标准,它是在API层定义标准,并且只能用于Java应用;而AMQP是在协议层定义的标准,是跨语言的。

3、RabbitMQ 安装

由于RabbitMQ是由Erlang语言开发的,所以我们需要先安装Erlang,就好像学习Java需要先安装JDK一样。

安装RabbitMQ需要安装Erlang/OTP,并保持版本匹配,如下图:


3.1、安装前置依赖Erlang

  • 下载Erlang,地址:http://erlang.org/download/otp_win64_20.3.exe

  • 或者去老师提供的软件包中找到:otp_win64_20.2.exe,以管理员方式运行此文件,安装即可。

  • erlang安装完成需要配置erlang环境变量:ERLANG_HOME=D:\Program Files\erl9.3,在path中添加:%ERLANG_HOME%

3.2、下载安装RabbitMQ

3.3、启动RabbitMQ

  • 安装成功会自动创建RabbitMQ服务并且启动

3.4、安装管理插件

  • 安装RabbitMQ 的管理插件,方便在浏览器端管理RabbitMQ

  • 进入到RabbitMQ 的sbin目录,使用cmd执行命令:

rabbitmq-plugins.bat enable rabbitmq_management
  • 安装成功后重启RabbitMQ,使用如下命令:

rabbitmq-service.bat start

3.5、登录RabbitMQ

进入浏览器,输入:http://localhost:15672

初始账号和密码:guest/guest


登录成功页面:


3.6、注意事项

  • 如果安装路径有中文,或者系统登录用户是中文账户,启动RabbitMQ会闪退,需要修改MQ的配置。解决方案:https://blog.csdn.net/leoma2012/article/details/97636859

  • 安装erlang和RabbitMQ以管理员身份运行。

  • 当卸载重新安装时会出现RabbitMQ服务注册失败,此时需要进入注册表清理erlang 搜索RabbitMQ、ErlSrv,将对应的项全部删除。

3.7、系统用户是中文账户问题解决

首先以管理员身份运行cmd命令


这里不要点击“以管理员身份运行”,而是点击打开文件位置,如下:


这个时候不要右键以管理员身份运行,而是右键,选择打开文件位置,如下:


这个时候可以右键,以管理员身份运行了,如下:


此时可以看到,以管理员身份运行的cmd窗口中路径没有中文了。

接下来用cd加上你RabbitMQ下的sbin目录路径,进入sbin目录,如下:


然后执行下面命令:

rabbitmq-service.bat remove

然后再执行:

set RABBITMQ_BASE=D:\opensource\mq

前面是设置RabbitMQ相关配置文件存放路径的含义,后面你指定本地某个文件夹下即可,此千万不要包含中文,以后RabbitMQ再读取配置文件时,就会从这里加载了,不会去你带有中文的系统路径下读取了

然后再执行:

rabbitmq-service.bat install

然后再执行:

rabbitmq-plugins enable rabbitmq_management

最后执行:

rabbitmq-service.bat start

就启动OK了,就可以访问RabbitMQ管理页面了。

4、RabbitMQ 工作原理(重点掌握)



4.1、组成部分说明

  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。

  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行转发和过滤,不存储消息

  • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费者

  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ

  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息

4.2、消息发布接收流程

1.发送消息

1、生产者和Broker建立TCP连接

2、生产者和Broker建立通道

3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发

4、Exchange将消息转发到指定的Queue(队列)

2.接收消息

1、消费者和Broker建立TCP连接

2、消费者和Broker建立通道

3、消费者监听指定的Queue(队列)

4、当有消息到达Queue时Broker默认将消息推送给消费者

5、消费者接收到消息

4.3、扩展

RabbitMQ为什么使用信道channel呢?

试想这样一个场景,一个应用有多个线程需要从RabbitMQ 中消费或生产消息,那么必然会建立很多个connection,也就是多个TCP连接,对操作系统而言,建立和销毁TCP连接是很昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现,所以RabbitMQ 采用类似NIO的做法,采用TCP连接复用,不仅可以减少性能开销,同时也便于管理。

5、RabbitMQ 常见模型

  • 官网常见模型介绍:https://www.rabbitmq.com/getstarted.html

  • 官网提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。

    但是其实3、4、5这三种都属于订阅模型,只不过路由的方式不同而已。



5.1、HelloWorld基本消费模型(重点)


5.1.1、搭建环境

创建maven工程rabbitmq-demo,加入RabbitMQ java client的依赖:

<dependencies>
   <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
       <version>5.4.1</version>
   </dependency>
</dependencies>

5.1.2、生产者编写

package cn.itsource.mq._01_helloword;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* @description: 消息生产者
*/
public class ProducerTest {

//队列名称
public static final String QUEUE_HELLOWORD = "helloworld";

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、创建队列
* 4、准备消息
* 5、发送消息
*/
public static void main(String[] args) throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("localhost");
//1、创建连接,通过工厂获取连接
//写在try里面,含义是:到时候会自动关闭连接
try(Connection connection = factory.newConnection()){
//2、创建信道
Channel channel = connection.createChannel();
//3、创建队列
/**
* 声明队列,如果RabbitMQ中没有此队列将自动创建
* param1:队列名称
* param2:是否持久化
* param3:队列是否独占此连接
* param4:队列不再使用时是否自动删除此队列
* param5:队列参数
*/
channel.queueDeclare(QUEUE_HELLOWORD, true, false, false, null);

//4、准备消息
String message = "这是一个Hello World消息";
//5、发送消息
channel.basicPublish("", QUEUE_HELLOWORD, null, message.getBytes());
System.out.println("消息发送完毕");
}
}
}

运行代码,然后进入RabbitMQ管理页面查看:


消息发送改成功,因为消息是存在队列里的,所以我们看下队列,发现队列里有一个消息,目前还未被消费。那么我们下面就开始写消费者代码,来消费消息。

5.1.3、消费者编写

package cn.itsource.mq._01_helloword;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
* @description: 消费者
*/
public class ConsumerTest {

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、监听队列
* 4、处理消息
*/
public static void main(String[] args) throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("localhost");
//1、创建连接,通过工厂获取连接
Connection connection = factory.newConnection();
//2、创建信道
Channel channel = connection.createChannel();
//3、监听队列(接收消息)
channel.basicConsume(ProducerTest.QUEUE_HELLOWORD, true, new DefaultConsumer(channel){
//4、处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是消费者Consumer1,收到消息:" + new String(body));
//消费者标签
System.out.println("consumerTag=" + consumerTag);
//交换机
System.out.println("getExchange=" + envelope.getExchange());
//路由key
System.out.println("getRoutingKey=" + envelope.getRoutingKey());
//消息ID
System.out.println("getDeliveryTag=" + envelope.getDeliveryTag());
}
});
}
}

注意:

这里消费者先不要加:

connection.close();

否则打印不出消息,因为这里是回调处理,在拿到消息后,还没来得及消费,你的连接就关闭了,所以这里就没有打印了。这里不用关闭连接,一直开启监听就可以了,只要队列有消息,我这里都会打印

然后我们
执行程序,控制台会打印结果:


再看RabbitMQ管理页面,队列的消息就没有了,因为已经被消费掉了:


5.1.4、抽取获取连接工具类

我们发现上面生产者和消费者有一些相同代码,我们可以抽取一个获取连接的工具类,如下:

package cn.itsource.mq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* @description: 获取RabbitMQ连接
*/
public class ConnectionUtil {

/**
* 建立与RabbitMQ的连接
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("localhost");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
// 通过工厂获取连接
Connection connection = factory.newConnection();
return connection;
}
}

这个时候,上面的生产者代码和消费者代码就可以改成下面这样了,使用工具类去获取连接:

生产者:

package cn.itsource.mq._01_helloword;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
* @description: 消息生产者
*/
public class ProducerTest {

//队列名称
public static final String QUEUE_HELLOWORD = "helloworld";

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、创建队列
* 4、准备消息
* 5、发送消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
//写在try里面,含义是:到时候会自动关闭连接
try(Connection connection = ConnectionUtil.getConnection()){
//2、创建信道
Channel channel = connection.createChannel();
//3、创建队列
/**
* 声明队列,如果RabbitMQ中没有此队列将自动创建
* param1:队列名称
* param2:是否持久化
* param3:队列是否独占此连接
* param4:队列不再使用时是否自动删除此队列
* param5:队列参数
*/
channel.queueDeclare(QUEUE_HELLOWORD, true, false, false, null);

//4、准备消息
String message = "这是一个Hello World消息";
//5、发送消息
channel.basicPublish("", QUEUE_HELLOWORD, null, message.getBytes());
System.out.println("消息发送完毕");
}
}
}

消费者:

package cn.itsource.mq._02_workqueues;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
* @description: 消费者
*/
public class ConsumerTest {

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、监听队列
* 4、处理消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
Connection connection = ConnectionUtil.getConnection();
//2、创建信道
Channel channel = connection.createChannel();
//3、监听队列(接收消息)
//这里的第二个参数autoAck,先给true,给false的话需要手动ack,后面会讲到,这里先自动确认
channel.basicConsume(ProducerTest.QUEUE_WORKQUEUES, true, new DefaultConsumer(channel){
//4、处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是消费者Consumer1,收到消息:" + new String(body));
//消费者标签
System.out.println("consumerTag=" + consumerTag);
//交换机
System.out.println("getExchange=" + envelope.getExchange());
//路由key
System.out.println("getRoutingKey=" + envelope.getRoutingKey());
//消息ID
System.out.println("getDeliveryTag=" + envelope.getDeliveryTag());
}
});
}
}

5.1.5、消息确认机制(ACK)

  • 通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。那么问题来了:RabbitMQ怎么知道消息被接收了呢?

  • 如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了

因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

  1. 自动ACK:消息一旦被接收,消费者自动发送ACK

  2. 手动ACK:消息接收后,不会自动发送ACK,需要手动调用

大家觉得哪种更好呢?

这需要看消息的重要性:

  • 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便

  • 如果消息非常重要,不允许丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了

我们之前的测试都是自动ACK的,这样是有消息丢失风险的,所以我们要手动ACK,现在需要改动我们消费者的代码:

package cn.itsource.mq._01_helloword;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
* @description: 消费者
*/
public class ConsumerTest {

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、监听队列
* 4、处理消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
Connection connection = ConnectionUtil.getConnection();
//2、创建信道
Channel channel = connection.createChannel();
//3、监听队列(接收消息)
channel.basicConsume(ProducerTest.QUEUE_HELLOWORD, false, new DefaultConsumer(channel){
//4、处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是消费者Consumer1,收到消息:" + new String(body));
//消费者标签
System.out.println("consumerTag=" + consumerTag);
//交换机
System.out.println("getExchange=" + envelope.getExchange());
//路由key
System.out.println("getRoutingKey=" + envelope.getRoutingKey());
//消息ID
System.out.println("getDeliveryTag=" + envelope.getDeliveryTag());
//手动ack,也就是手动签收
/**
* 参数1:理解为消息的ID,你要签收具体哪个消息
* 参数2:是否批量签收,false:一次签收一个消息;true:将一次性ack所有小于deliveryTag的消息。
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

重要的代码只有下面这一行:

channel.basicAck(envelope.getDeliveryTag(), false);

注意:

一般这个手动签收都放在所有业务代码之后,这样做的原因是,如果处理业务中发生异常,就不会执行这个手动签收了,那么就不会通知MQ删除消息了,这样就不会丢失消息了。

5.2、Work queues消息模型


这个消息模型和上面讲的HelloWorld模型很类似

5.2.1、生产者

package cn.itsource.mq._02_workqueues;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
* @description: 消息生产者
*/
public class ProducerTest {

//队列名称
public static final String QUEUE_WORKQUEUES = "workqueues";

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、创建队列
* 4、准备消息
* 5、发送消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
//写在try里面,含义是:到时候会自动关闭连接
try(Connection connection = ConnectionUtil.getConnection()){
//2、创建信道
Channel channel = connection.createChannel();
//3、创建队列
/**
* 声明队列,如果RabbitMQ中没有此队列将自动创建
* param1:队列名称
* param2:是否持久化
* param3:队列是否独占此连接
* param4:队列不再使用时是否自动删除此队列
* param5:队列参数
*/
channel.queueDeclare(QUEUE_WORKQUEUES, true, false, false, null);

//发送消息
for (int i = 0; i < 20; i++) {
channel.basicPublish("", QUEUE_WORKQUEUES, null, ("这是一个Work Queues消息" + i).getBytes());
}
System.out.println("消息发送完毕");
}
}
}

5.2.2、消费者1

package cn.itsource.mq._02_workqueues;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
* @description: 消费者
*/
public class Consumer1 {

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、监听队列
* 4、处理消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
Connection connection = ConnectionUtil.getConnection();
//2、创建信道
Channel channel = connection.createChannel();

//3、监听队列(接收消息)
channel.basicConsume(ProducerTest.QUEUE_WORKQUEUES, true, new DefaultConsumer(channel){
//4、处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是消费者Consumer1,收到消息:" + new String(body));
//消费者标签
System.out.println("consumerTag=" + consumerTag);
//交换机
System.out.println("getExchange=" + envelope.getExchange());
//路由key
System.out.println("getRoutingKey=" + envelope.getRoutingKey());
//消息ID
System.out.println("getDeliveryTag=" + envelope.getDeliveryTag());
}
});
}
}

5.2.3、消费者2

package cn.itsource.mq._02_workqueues;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
* @description: 消费者
*/
public class Consumer2 {

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、监听队列
* 4、处理消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
Connection connection = ConnectionUtil.getConnection();
//2、创建信道
Channel channel = connection.createChannel();

//3、监听队列(接收消息)
//这里的autoAck先给true,给false的话需要手动ack,后面会讲到,这里先自动确认
channel.basicConsume(ProducerTest.QUEUE_WORKQUEUES, true, new DefaultConsumer(channel){
//4、处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是消费者Consumer2,收到消息:" + new String(body));
//消费者标签
System.out.println("consumerTag=" + consumerTag);
//交换机
System.out.println("getExchange=" + envelope.getExchange());
//路由key
System.out.println("getRoutingKey=" + envelope.getRoutingKey());
//消息ID
System.out.println("getDeliveryTag=" + envelope.getDeliveryTag());
}
});
}
}

5.2.4、能者多劳

经过测试,我们发现,多个消费者消费消息的模式是轮寻模式

仔细想想,这样真的合理吗?

最完美的方案是能者多劳,就是消费越快的人,消费的越多。

那怎么实现呢?

我们可以使用basicQos方法和prefetchCount = 1设置,简单理解就是一个消费者同时只能处理一个消息。 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是仍然忙碌的下一个工作人员。

简单理解:就是消费者手上同时只能有一个消息

消费者1代码:

package cn.itsource.mq._02_workqueues;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
* @description: 消费者
*/
public class Consumer1 {

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、监听队列
* 4、处理消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
Connection connection = ConnectionUtil.getConnection();
//2、创建信道
Channel channel = connection.createChannel();

//设置消息并发处理数量
channel.basicQos(1);

//3、监听队列(接收消息)
channel.basicConsume(ProducerTest.QUEUE_WORKQUEUES, false, new DefaultConsumer(channel){
//4、处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//这里睡眠很久,模拟实际业务场景,处理消息很慢
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是消费者Consumer1,收到消息:" + new String(body));
//消费者标签
System.out.println("consumerTag=" + consumerTag);
//交换机
System.out.println("getExchange=" + envelope.getExchange());
//路由key
System.out.println("getRoutingKey=" + envelope.getRoutingKey());
//消息ID
System.out.println("getDeliveryTag=" + envelope.getDeliveryTag());
//手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

消费者2代码

package cn.itsource.mq._02_workqueues;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
* @description: 消费者
*/
public class Consumer2 {

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、监听队列
* 4、处理消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
Connection connection = ConnectionUtil.getConnection();
//2、创建信道
Channel channel = connection.createChannel();

//设置消息并发处理数量
channel.basicQos(1);

//3、监听队列(接收消息)
//这里的autoAck先给true,给false的话需要手动ack,后面会讲到,这里先自动确认
channel.basicConsume(ProducerTest.QUEUE_WORKQUEUES, false, new DefaultConsumer(channel){
//4、处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是消费者Consumer2,收到消息:" + new String(body));
//消费者标签
System.out.println("consumerTag=" + consumerTag);
//交换机
System.out.println("getExchange=" + envelope.getExchange());
//路由key
System.out.println("getRoutingKey=" + envelope.getRoutingKey());
//消息ID
System.out.println("getDeliveryTag=" + envelope.getDeliveryTag());
//手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

生产者代码改成for循环发消息:


此时,先启动两个消费者,然后启动生产者,会发现消费者2里面会打印消费了后面19个消息,第一个消息由消费者1正在消费,只是他消费的比较慢,所以其他的消息都交给消费者2进行消费了,达到了我们所说的能者多劳的效果。


5.3、交换机分类

在之前的模式中,我们创建了一个工作队列。 工作队列背后的假设是:每个任务只被传递给一个工作人员。 在这一部分,我们将做一些完全不同的事情 - 我们将会传递一个信息给多个消费者。 这种模式被称为“发布/订阅”。


X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型

具体常见的分类:

  • Fanout:广播,将消息发给所有绑定到交换机的队列(所有)

  • Direct:定向,把消息发给符合指定routing key 的队列(一堆或一个)

  • Topic:通配符,把消息发给符合routing pattern(路由模式)的队列(一堆或者一个)

5.4、fanout-广播模型


5.4.1、生产者

改动:队列不再由生产者创建,而是由消费者创建,因为由消费者决定将队列绑定到哪个交换机。

package cn.itsource.mq._03_fanout;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
* @description: 消息生产者
*/
public class ProducerTest {

//交换机名称
public static final String EXCHANGE_FANOUT = "exchange_fanout";

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、创建交换机
* 4、发送消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
try(Connection connection = ConnectionUtil.getConnection()){
//2、创建信道
Channel channel = connection.createChannel();
//3、创建交换机
channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT);

//4、发送消息
for (int i = 1; i <= 5; i++) {
channel.basicPublish(EXCHANGE_FANOUT, "", null, ("这是一个fanout消息" + i).getBytes());
}
System.out.println("消息发送完毕");
}
}
}

5.4.2、消费者1

package cn.itsource.mq._03_fanout;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
* @description: 消费者
*/
public class Consumer1 {

public static final String QUEUE_FANOUT_1 = "queue_fanout_1";

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、创建队列
* 4、绑定队列到交换机
* 5、监听队列
* 6、处理消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
Connection connection = ConnectionUtil.getConnection();
//2、创建信道
Channel channel = connection.createChannel();

//设置消息并发处理数量
channel.basicQos(1);

//3、创建队列
channel.queueDeclare(QUEUE_FANOUT_1, true, false, false, null);

//4、绑定队列到交换机
channel.queueBind(QUEUE_FANOUT_1, ProducerTest.EXCHANGE_FANOUT, "");//fanout模型不用写routingKey,因为是群发

//5、监听队列
//这里的autoAck先给true,给false的话需要手动ack,后面会讲到,这里先自动确认
channel.basicConsume(QUEUE_FANOUT_1, false, new DefaultConsumer(channel){
//6、处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是消费者Consumer1,收到消息:" + new String(body));
//手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

5.4.3、消费者2

package cn.itsource.mq._03_fanout;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
* @description: 消费者
*/
public class Consumer2 {

public static final String QUEUE_FANOUT_2 = "queue_fanout_2";

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、创建队列
* 4、绑定队列到交换机
* 5、监听队列
* 6、处理消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
Connection connection = ConnectionUtil.getConnection();
//2、创建信道
Channel channel = connection.createChannel();

//设置消息并发处理数量
channel.basicQos(1);

//3、创建队列
channel.queueDeclare(QUEUE_FANOUT_2, true, false, false, null);

//4、绑定队列到交换机
channel.queueBind(QUEUE_FANOUT_2, ProducerTest.EXCHANGE_FANOUT, "");//fanout模型不用写routingKey,因为是群发

//5、监听队列
//这里的autoAck先给true,给false的话需要手动ack,后面会讲到,这里先自动确认
channel.basicConsume(QUEUE_FANOUT_2, false, new DefaultConsumer(channel){
//6、处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是消费者Consumer1,收到消息:" + new String(body));
//手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

5.4.4、测试

先启动生产者,观看RabbitMQ管理页面,已经有消息进去了:


然后启动两个消费者,将队列绑定到交换机,查看RabbitMQ管理页面,发现两个队列都已经产生了:


然后再运行一次生产者发布消息,此时两个消费者的控制台都会打印下面信息:


此时只要生产者发布消息,两个消费者都可以同时打印出消息

5.5、direct - 定向模型

学好了上面的fanout,这个direct 和上面的fanout就下面两个区别:

  • 交换机类型需要变

  • 需要指定routingKey


5.5.1、生产者

package cn.itsource.mq._04_direct;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
* @description: 消息生产者
*/
public class ProducerTest {

//队列名称
public static final String EXCHANGE_DIRECT = "exchange_direct";

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、创建交换机
* 4、发送消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
try(Connection connection = ConnectionUtil.getConnection()){
//2、创建信道
Channel channel = connection.createChannel();
//3、创建交换机
channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT);

//4、发送消息
for (int i = 1; i <= 5; i++) {
//这里的第二个参数就是指定routingKey
channel.basicPublish(EXCHANGE_DIRECT, "error", null, ("这是一个direct消息" + i).getBytes());
}
System.out.println("消息发送完毕");
}
}
}

5.5.2、消费者1

package cn.itsource.mq._04_direct;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @description: 消费者
*/
public class Consumer1 {

public static final String QUEUE_DIRECT_1 = "queue_direct_1";

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、创建队列
* 4、绑定队列到交换机
* 5、监听队列
* 6、处理消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
Connection connection = ConnectionUtil.getConnection();
//2、创建信道
Channel channel = connection.createChannel();

//设置消息并发处理数量
channel.basicQos(1);

//3、创建队列
channel.queueDeclare(QUEUE_DIRECT_1, true, false, false, null);

//4、绑定队列到交换机
channel.queueBind(QUEUE_DIRECT_1, ProducerTest.EXCHANGE_DIRECT, "error");
channel.queueBind(QUEUE_DIRECT_1, ProducerTest.EXCHANGE_DIRECT, "info");

//5、监听队列
channel.basicConsume(QUEUE_DIRECT_1, false, new DefaultConsumer(channel){
//6、处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是消费者Consumer1,收到消息:" + new String(body));
//手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

5.5.3、消费者2

package cn.itsource.mq._04_direct;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @description: 消费者
*/
public class Consumer2 {

public static final String QUEUE_DIRECT_2 = "queue_direct_2";

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、创建队列
* 4、绑定队列到交换机
* 5、监听队列
* 6、处理消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
Connection connection = ConnectionUtil.getConnection();
//2、创建信道
Channel channel = connection.createChannel();

//设置消息并发处理数量
channel.basicQos(1);

//3、创建队列
channel.queueDeclare(QUEUE_DIRECT_2, true, false, false, null);

//4、绑定队列到交换机
channel.queueBind(QUEUE_DIRECT_2, ProducerTest.EXCHANGE_DIRECT, "error");
channel.queueBind(QUEUE_DIRECT_2, ProducerTest.EXCHANGE_DIRECT, "debug");

//5、监听队列
channel.basicConsume(QUEUE_DIRECT_2, false, new DefaultConsumer(channel){
//6、处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是消费者Consumer2,收到消息:" + new String(body));
//手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

5.5.4、测试

第一次

将生产者的routingKey写error,因为两个消费者的队列都和error这个routingKey绑定了,所以都接收并打印出生产者的消息

第二次

将生产者的routingKey写info,因为只有消费者1的队列和info这个routingKey绑定了,所以只有消费者1接收并打印出了生产者的消息,而消费者2没有和info这个routingKey绑定,所以是打印不出消息

5.6、topic - 通配符模型


lazy.orange.rabbit:都可以收到消息

lazy.hello.orange.rabbit:只有队列2收到消息

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符。

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: goods.insert

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

audit.#:能够匹配audit.irs.corporate 或者 audit.irs

audit.*:只能匹配audit.irs

5.6.1、生产者

package cn.itsource.mq._05_topic;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
* @description: 消息生产者
*/
public class ProducerTest {

//队列名称
public static final String EXCHANGE_TOPIC = "EXCHANGE_TOPIC";

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、创建交换机
* 4、发送消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
try(Connection connection = ConnectionUtil.getConnection()){
//2、创建信道
Channel channel = connection.createChannel();
//3、创建交换机
channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC);

//4、发送消息
for (int i = 1; i <= 5; i++) {
channel.basicPublish(EXCHANGE_TOPIC, "pethome.userservice.error", null, ("这是一个topic消息" + i).getBytes());
}
System.out.println("消息发送完毕");
}
}
}

5.6.2、消费者1

package cn.itsource.mq._05_topic;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
* @description: 消费者
*/
public class Consumer1 {

public static final String QUEUE_TOPIC_1 = "QUEUE_TOPIC_1";

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、创建队列
* 4、绑定队列到交换机
* 5、监听队列
* 6、处理消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
Connection connection = ConnectionUtil.getConnection();
//2、创建信道
Channel channel = connection.createChannel();

//设置消息并发处理数量
channel.basicQos(1);

//3、创建队列
channel.queueDeclare(QUEUE_TOPIC_1, true, false, false, null);

//4、绑定队列到交换机
channel.queueBind(QUEUE_TOPIC_1, ProducerTest.EXCHANGE_TOPIC, "*.*.info");

//5、监听队列
channel.basicConsume(QUEUE_TOPIC_1, false, new DefaultConsumer(channel){
//6、处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是消费者Consumer1,收到消息:" + new String(body));
//手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

5.6.3、消费者2

package cn.itsource.mq._05_topic;

import cn.itsource.mq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
* @description: 消费者
*/
public class Consumer2 {

public static final String QUEUE_TOPIC_2 = "QUEUE_TOPIC_2";

/**
* 步骤:
* 1、创建连接
* 2、创建信道
* 3、创建队列
* 4、绑定队列到交换机
* 5、监听队列
* 6、处理消息
*/
public static void main(String[] args) throws Exception {
//1、创建连接
Connection connection = ConnectionUtil.getConnection();
//2、创建信道
Channel channel = connection.createChannel();

//设置消息并发处理数量
channel.basicQos(1);

//3、创建队列
channel.queueDeclare(QUEUE_TOPIC_2, true, false, false, null);

//4、绑定队列到交换机
channel.queueBind(QUEUE_TOPIC_2, ProducerTest.EXCHANGE_TOPIC, "pethome.*.error");
channel.queueBind(QUEUE_TOPIC_2, ProducerTest.EXCHANGE_TOPIC, "pethome.*.info");

//5、监听队列
channel.basicConsume(QUEUE_TOPIC_2, false, new DefaultConsumer(channel){
//6、处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是消费者Consumer2,收到消息:" + new String(body));
//手动签收
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

5.6.4、测试

通过测试,我们可以发现,符合通配符规则的消费者是可以打印出接收到的消息,否则是打印不出消息的。

6、持久化-解决数据安全

含义:将RabbitMQ中的数据从内存中保存到磁盘中

6.1、交换机持久化

持久化之前代码:

//3、创建交换机
channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC);

持久化之后代码:

//3、创建交换机
channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC, true);

相当于调用exchangeDeclare方法时,入参最后多加了一个true,表示开启持久化

6.2、队列持久化代码

//3、创建队列
channel.queueDeclare(QUEUE_TOPIC_1, true, false, false, null);

之前我们写的代码中,队列已经开启了队列持久化,第二个参数设置为 true,就表明开启队列的持久化

6.3、消息持久化

消息持久化之前代码:

channel.basicPublish(EXCHANGE_TOPIC, "pethome.userservice.error", null, ("这是一个topic消息" + i).getBytes());

消息持久化之后代码:

channel.basicPublish(EXCHANGE_TOPIC, "pethome.userservice.error", MessageProperties.PERSISTENT_TEXT_PLAIN, ("这是一个topic消息" + i).getBytes());

将第三个参数由null改为MessageProperties.PERSISTENT_TEXT_PLAIN即可开启持久化了

7、SpringBoot集成RabbitMQ

项目三中我们再具体介绍

8、晚自习

  1. RabbitMQ的安装,正常使用

  2. 白天代码全部要敲一遍

  3. 自己总结笔记

#



文章不错,扫码支持一下吧~
上一篇 下一篇
评论
来首音乐
光阴似箭
今日已经过去小时
这周已经过去
本月已经过去
今年已经过去个月