1.RabbitMQ简介

1.1 什么是RabbitMQ

1.2 RabbitMQ的特点

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点如下:

1.3 架构图与主要概念

1.3.1 架构图

clipboard (1).png

1.3.2 主要概念

2. RabbitMQ的安装

2.1 Windows下RabbitMQ的安装

(1) 下载并安装Erlang

下载 otp_win64_20.2.exe , 并以管理员的身份运行安装 ;

(2) 下载并安装rabbitMQ Server

下载地址 : http://www.rabbitmq.com/install-windows-manual.html

下载完成为 : rabbitmq-server-3.7.4.exe

注意 : 安装路径不能存在中文与空格 , 安装完整后Windows服务中就已经是存在rabbitMQ了 , 而且是启动状态

(3) 安装管理界面

进入 rabbitMQ 安装目录的 sbin 目录,输入以下命令

rabbitmq-plugins enable rabbitmq_management 

clipboard.png

(4) 重启rabbitmq 服务

clipboard.png

(5) 测试

打开浏览器地址栏输入http://127.0.0.1:15672(默认端口为15671) ,即可看到管理界面的登陆页如下 , 用户名和密码均为guest , 进入主界面

Snipaste_2019-05-08_21-11-44.png

clipboard.png

2.2 docker下安装rabbitMQ

(1) 镜像下载 :

docker pull rabbitmq:management 

(2) 创建容器 :

rabbitmq 需要有映射以下端口: 5671 5672 4369 15671 15672 25672 , 启动命令中以多个 -p 映射端口号 , 其中各个端口的作用如下 :

docker run -di --name=rabbitmq_demo -p 5671:5617 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:management 

(3) 测试 :

在浏览器中访问如下url(ip地址根据实际情况更换 , 这里我安装在个人虚拟上) :

http://192.168.66.133:15672

image.png

3. RabbitMQ的Exchange模式详述

以下RabbitMQ代码实例均使用Spring对Rabbit进行整合 , 执行相关操作

3.1 直接模式(Direct)

3.1.1 使用场景

我们需要将消息发给唯一一个节点(队列)时使用这种模式,这是最简单的一种形式。

3.1.2 直接模式的流程图

image.png

3.1.3 直接模式实例

(1) 创建队列

创建一个叫test_queue01的队列

image.png

Durability:是否做持久化 Durable(持久) transient(临时)

Auto delete : 是否自动删除

(2) 环境准备

创建maven工程(module) , 在pom.xml中引入AMQP起步依赖以及其他 :

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wk</groupId>
    <artifactId>rabbitmq_demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
        <relativePath/>
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!--RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!--Spring Boot 整合 Junit-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

编写spring boot的application.yml配置文件 :

server:
  port: 9201
spring:
  rabbitmq:
    host: 192.168.66.133
    username: guest # 默认值,可以不写
    password: guest # 默认值,可以不写

编写SpringBoot启动类 :

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitmqApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqApplication.class, args);
    }
}

(3) 代码实现消息生产者与消费者

编写消息生产者测试类 , 发送消息到上面已创建好的queue :

关键对象 : RabbitMessagingTemplate

image.png

编写消息消费者测试类 :

关键注解 : @RabbitListener, @Component, @RabbitHandler

代码编写完毕后 , 运行工程 , 消费者自动生效

image.png

工程启动完毕 , 消费者自动接收指定queue的消息并处理 , 可以看到控制台中显示如下内容:

image.png

(4) 多个消费者同时监听一个queue测试

这里使用IDEA进行测试时会遇到一个问题, 就是IDEA默认是不允许启动两个相同的application的, 需要进行相关设置 , 如下 :

image.png

image.png

注意 : 上述设置修改完毕后 , 若要再次启动同一个应用 , 需要在application.yml中 重新指定不同的端口号 , 默认是8080 , 上面已经启动了一个端口号为9201的程序 , 这里修改端口号为9202并再次启动 ;

server:
  port: 9202 # <----修改端口号为9202
spring:
  rabbitmq:
    host: 192.168.66.133
    username: guest # 默认值,可以不写,密码同
    password: guest

再次启动时发现控制台运行着两个同样的程序 , 端口号不相同

image.png

连续4次运行生产者程序 , 两个消息消费者接收消息的情况如下 , 表明多个消费者同时监听一个queue的时候 , 消息的分发情况是遵循轮询算法的

image.png

3.2 分列模式(Fanout)

3.2.1 使用场景

当我们需要将消息一次发送给多个队列时 , 需要使用该模式

3.2.2 分列模式流程图

image.png

3.2.3 分列模式实例

(1) 新建两个queue

image.png

(2) 新建一个exchange

image.png

(3) 将 test_queue02 和 test_queue03 绑定到 test_fanout_exchange 上

image.png

绑定完成如下 :

1557322434836.png

(4) 代码实现消息生产者

image.png

(5) 代码实现消息消费者

消费者1 监听 test_queue02

@RabbitListener(queues = "test_queue02")
@Component
public class TestFanoutConsumer1 {
    @RabbitHandler
    public void handle(String msg){
        System.out.println("consumer1接收消息为:" + msg);
    }
}

消费者2 监听 test_queue03

@RabbitListener(queues = "test_queue03")
@Component
public class TestFanoutConsumer2 {
    @RabbitHandler
    public void handle(String msg){
        System.out.println("consumer2接收消息为:" + msg);
    }
}

执行生产者代码 , 消息发送到exchange 中 , 然后启动消费者 , 消费者接收并处理消息内容 , 打印到控制台上 , 如下

image.png

3.3 主题模式(Topic)

3.3.1 使用场景

需要通过exchange将消息转发到关心该消息routingkey 的queue 上时 , 可以使用主题模式

3.3.2 主题模式详解

image.png

实质是通过exchange 实现 类似activeMQ的 主题/订阅模式 , 指定exchange 将消息发送到其绑定的 ,bindkey与生产者发送消息到exchange时指定的routingkey相匹配的 queue中 , 说白一点 , 就是消息的routingkeybindkey进行模糊匹配 .

这里需要了解上图中的一些概念 :

image.png

image.png

主题模式的特点 :

3.3.3 主题模式实例

(1) 新建exchange

image.png

(2) 新建queue

image.png

(3) 绑定exchange与queue , 并指定routingkey

image.png

image.png

(4) 代码实现消息消费者

消费者1 :

@RabbitListener(queues = "test_topic_queue01")
@Component
public class TestTopicConsumer01 {
    @RabbitHandler
    public void handle(String msg){
        System.out.println("TopicConsumer01 - routingkey:aaa.#");
        System.out.println("接收消息为:" + msg);
        System.out.println("------------------------------------");
    }
}

消费者2 :

@RabbitListener(queues = "test_topic_queue02")
@Component
public class TestTopicConsumer02 {
    @RabbitHandler
    public void handle(String msg){
        System.out.println("TopicConsumer02 - routingkey:aaa.*");
        System.out.println("接收消息为:" + msg);
        System.out.println("------------------------------------");
    }
}

消费者3 :

@RabbitListener(queues = "test_topic_queue03")
@Component
public class TestTopicConsumer03 {
    @RabbitHandler
    public void handle(String msg){
        System.out.println("TopicConsumer03 - routingkey:bbb.aaa");
        System.out.println("接收消息为:" + msg);
        System.out.println("------------------------------------");
    }
}

启动程序监听queue ;

(5) 代码实现消息生产者

/**
 * topic模式
 */
@Test
public void testTopicSend() {
    rabbitTemplate.convertAndSend("test_topic_exchange","aaa.bbb","test topic 1111");
    //rabbitTemplate.convertAndSend("test_topic_exchange","aaa.bbb.ccc","test topic 2222");
    //rabbitTemplate.convertAndSend("test_topic_exchange","bbb.aaa","test topic 3333");
}

依次执行6 7 8行代码 , 消费者接收消息打印到控制台的情况如下 :

image.png

image.png

image.png

这里需要注意两点 :

  1. 多个消费者的操作是线程不安全的 , 上述第一个图出现了以下打印情况
TopicConsumer02 - routingkey:aaa.*
接收消息为:test topic 1111
TopicConsumer01 - routingkey:aaa.#
接收消息为:test topic 1111
------------------------------------
------------------------------------
  1. 执行消费者代码时 , 如果同时取消注释 6 7 行代码 , 按道理应该是出现以下打印情况 :
TopicConsumer01 - routingkey:aaa.#
接收消息为:test topic 1111
------------------------------------
TopicConsumer01 - routingkey:aaa.#
接收消息为:test topic 2222
------------------------------------
TopicConsumer02 - routingkey:aaa.*
接收消息为:test topic 1111
------------------------------------

但是实际打印结果是 :

TopicConsumer01 - routingkey:aaa.#
接收消息为:test topic 1111
------------------------------------
TopicConsumer02 - routingkey:aaa.*
接收消息为:test topic 1111
------------------------------------

这个问题尚待思考 ;


参考链接:

https://blog.csdn.net/m0_37383637/article/details/79264767

https://blog.csdn.net/u013952133/article/details/79435783

https://blog.csdn.net/mingongge/article/details/81132632