共计 19591 个字符,预计需要花费 49 分钟才能阅读完成。
一、RabbitMQ 相关概念
RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ 就好比邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型。
1 生产者和消费者
Producer:生产者,投递消息的一方。
生产者创建消息,然后发布到RabbitMQ中。消息一般可以包含2个部分:消息体和标签(Label)。消息体也可以称之为payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来描述这条消息,比如一个交换器的名称和一个路由键。生产者把消息交由RabbitMQ,RabbitMQ之后会根据标签把消息发送给感兴趣的消费者(Consumer)。
Consumer:消费者, 接受消息的一方。
消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。
Broker:消息中间件的服务节点。
对于RabbitMQ来说,一个RabbitMQ Broker可以简单地看做一个RabbitMQ服务节点,或者RabbitMQ服务实例。大多数情况下也可以将一个RabbitMQ Broker看做一台RabbitMQ服务器。
2 队列
Queue:队列,是 RabbitMQ 的内部对象,用于存储消息。
RabbitMQ 中消息都只能存储在队列中,这一点和Kafka这种消息中间件相反。Kafka 将消息存储在topic(主题)这个逻辑层面,而相对应的队列逻辑知识 topic 实际存储文件中的位移标识。RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
RabbitMQ 不支持队列层面的广播消息,如果需要广播消费,需要在其上进行二次开发,处理逻辑会变得异常复杂,同时也不建议这么做。
3 交换器、路由键、绑定
Exchange:交换器。
生产者将消息发送到Exchange(交换器,通常也可以用大写的"X"来表示),有交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。这里可以将 RabbitMQ 中的交换器看做一个简单的实体。
交换器有四种类型,不同的类型有着不同的路由决策:
1)fanout
它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
2)direct
direct类型的交换器路由规则也很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。
3)topic
前面讲到direct类型的交换器路由规则是完全匹配BindingKey和RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic类型的交换器在匹配规则上进行了扩展,它与direct类型的交换器类似,也是将消息路由到BindingKey和RoutingKey想匹配的队列中,但这里的匹配规则有些不同,它约定如下:
- RoutingKey为一个点号“.”分割的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如“com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”;
- BindingKey和RoutingKey一样也是点号“.”分隔的字符串;
- BindingKey中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多规则单词(可以是零个)。
4)headers
headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
RoutingKey:路由键。
生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
在交换器类型和绑定键(BindingKey)固定的情况下,生产者可以在发送消息给交换器时,通过指定RoutingKey来决定消息流向哪里。
Binding: 绑定。
RabbitMQ中通过绑定将交换器与队列联合起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了
生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和RoutingKey相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的BindingKey。BindingKey并不是在所有的情况下都生效,它依赖于交换器类型,比如fanout类型的交换器就会无视BindingKey,而是将消息路由到所有绑定该交换器的队列中。
二、RabbitMQ 运转流程
了解了以上的RabbitMQ架构模型及相关术语,再来回顾整个消息队列过程。
在最初状态下,生产者发送消息的时候
(1)生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
(2)生产者声明一个交换器,并设置相关属性,比如交换器类型、是否持久化等
(3)生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
(4)生产者通过路由键将交换器和队列绑定起来
(5)生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
(6)相应的交换器根据接收到的路由键查找相匹配的队列。
(7)如果找到,则将从生产者发送过来的消息存入相应的队列。
(8)如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
(9)关闭信道
(10)关闭连接
消费者接收消息的过程:
(1)消费者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
(2)消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作。
(3)等待RabbitMQ Broker回应并投递相应队列中队列的消息,消费者接收消息。
(4)消费者确认(ack)接收到的消息。
(5)RabbitMQ从队列中删除相应已经被确认的消息。
(6)关闭信道
(7)关闭连接
又引入了两个新的概念:Connection
和Channel
。我们知道无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。
完全可以直接使用Connection就能完成信道的工作,为什么还要引入信道呢?试想这样一个场景,一个应用程序中有很多个线程需要从RabbitMQ中消费消息,或者生产消息,那么必然要建立很多个Connection,也就是许多个TCP连接。然后对于操作系统而言,建立和销毁TCP连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。RabbitMQ采用类似NIO(Non-blocking I/O)的做法,选择TCP连接复用,不仅可以减少性能开销,同时也便于管理。
每个线程把持一个信道,所以信道复用了Connection的TCP连接。同时RabbitMQ可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP连接资源。但是当信道本身的流量很大时,这时候多个信道复用一个Connection就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个Connection,将这些信道均摊到这些Connection中,至于这些相关的调优策略需要根据业务自身的实际情况进行调节。
NIO,也称为非阻塞I/O,包含三大核心部分:Channel(信道)、Buffer(缓冲区)和Selector(选择器)。NIO基于Channel和Buffer进行操作,数据总是从信道读取数据到缓冲区中,或者从缓存区写入到信道中。Selector用于监听多个信道的事件(比如连接打开,数据到达等)。因此,单线程可以监听多个数据的信道。
三、RabbitMQ 安装
1 安装依赖
yum install -y make gcc gcc-c++ m4 openssl openssl-devel
yum install -y ncurses-devel unixODBC unixODBC-devel java java-devel socat
2 安装 Erlang
Erlang RPM包下载地址:https://packagecloud.io/rabbitmq/erlang
下载成功后,到下载的文件资源目录执行以下命令:
yum localinstall erlang-22.3.4.10-1.el7.x86_64.rpm
# 安装成功后,可以以下运行命令来查看erl版本
erl -version
3 安装 RabbitMQ
RabbitMQ RPM包下载地址:https://github.com/rabbitmq/rabbitmq-server/releases
下载完成后,你需要运行下面的命令来将key导入:
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
安装:
yum localinstall rabbitmq-server-3.8.8-1.el7.noarch.rpm
# 命令启动 rabbitmq 服务器
systemctl start rabbitmq-server
# 添加 web 管理插件
rabbitmq-plugins enable rabbitmq_management
通过IP:端口(http://172.16.93.128:15672)的形式,就可以访问RabbitMQ的Web管理界面了。
4 初始操作
添加新用户,用户名为"root",密码为"root" :
rabbitmqctl add_user root root
为root用户设置所有权限:
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
设置用户为管理员角色:
rabbitmqctl set_user_tags root administratorxxxxxxxxxx 设置用户为管理员角色rabbitmqctl set_user_tags root administratorsh
默认情况下,访问RabbitMQ服务的用户名和密码都是"guest",这个账户有限制,默认只能通过本地网络(如localhost)访问,远程网络访问受限,使用默认的用户 guest / guest (此也为管理员用户)登陆,会发现无法登陆,报错:User can only log in via localhost。那是因为默认是限制了guest用户只能在本机登陆,也就是只能登陆localhost:15672。所以在实现生产和消费消息之前,需要另外添加一个用户,并设置相应的访问权限
四、RabbitMQ 常用命令
1 服务启动与停止
# 启动
systemctl start rabbitmq-server
# 停止
systemctl stop rabbitmq-server
# 查看状态
systemctl status rabbitmq-server
2 插件管理
# 插件列表
rabbitmq-plugins list
# 启动插件(XXX为插件名)
rabbitmq-plugins enable XXX
# 停用插件(XXX为插件名)
rabbitmq-plugins disable XXX
3 用户管理
# 添加用户
rabbitmqctl add_user username password
# 删除用户
rabbitmqctl delete_user username
# 修改密码
rabbitmqctl change_password username newpassword
# 设置用户角色
rabbitmqctl set_user_tags username tag
# 列出用户
rabbitmqctl list_users
4 权限管理
# 列出所有用户权限
rabbitmqctl list_permissions
# 查看指定用户权限
rabbitmqctl list_user_permissions username
# 清除用户权限
rabbitmqctl clear_permissions [-p vhostpath] username
# 设置用户权限
rabbitmqctl set_permissions [-p vhostpath] username conf write read
conf: 一个正则匹配哪些资源能被该用户访问
write:一个正则匹配哪些资源能被该用户写入
read: 一个正则匹配哪些资源能被该用户读取
五、RabbitMQ 基本使用
准备工作,实现获取连接的工具类:
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("localhost");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
}
1 简单模式
直接发送到队列,只有一个消费者,最简单的模式。
public class Send {
// 创建队列,发送消息
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = ConnectionUtil.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明创建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 消息内容
String message = "Hello World!";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息:"+message);
//关闭连接和通道
channel.close();
connection.close();
}
}
public class Recv{
// 消费者消费消息
public static void main(String[] args) throws Exception {
// 获取连接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明通道
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 定义消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
//这个方法会阻塞住,直到获取到消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("接收到消息:"+message);
}
}
}
2 work模式
和简单模式类似,也是直接发送到队列,不同的是有多个消费者,消息将轮流发送给消费者进行消费。
public class Send{
//消息生产者
public static void main(String[] args) throws Exception {
//获取连接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "";
for(int i = 0; i<100; i++){
message = "" + i;
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息:"+message);
Thread.sleep(i);
}
channel.close();
connection.close();
}
}
// 消费者1
public class Recv1 {
//消费者
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//同一时刻服务器只发送一条消息给消费端
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,false,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("recive1:"+message);
Thread.sleep(100);
//消息消费完给服务器返回确认状态,表示该消息已被消费
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
// 消费者2
public class Recv2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("recive1:"+message);
Thread.sleep(10);
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
消息消费的两种模式:
1、 自动模式
消费者从消息队列获取消息后,服务端就认为该消息已经成功消费。(可能会导致处理异常就移除消息的情况)
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("recive1:"+message);
Thread.sleep(10);
//无需反馈
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
2、 手动模式
消费者从消息队列获取消息后,服务端并没有标记为成功消费
消费者成功消费后需要将状态返回到服务端
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME,false,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("recive1:"+message);
Thread.sleep(100);
//消息消费完给服务器返回确认状态,表示该消息已被消费
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
3 订阅模式
一个生产者发送的消息会被多个消费者获取。
生产者:可以将消息发送到队列或者是交换机。
消费者:只能从队列中获取消息。
如果消息发送到没有队列绑定的交换机上,那么消息将丢失。
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。 fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。 fanout 类型转发消息是最快的 。
消息发送到交换机,因为不处理路由键,所以所有与该交换机绑定的队列都将收到消息。
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_fanout";
// 生产者,发送消息到交换机
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明交换机 fanout:交换机类型 主要有fanout,direct,topics三种
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println(message);
channel.close();
connection.close();
}
}
// 消费者1
public class Recv1 {
public static final String EXCHANGE_NAME = "test_exchange_fanout";
public final static String QUEUE_NAME = "test_queue_exchange_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机上
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(message);
}
}
}
// 消费者2
public class Recv2 {
public static final String EXCHANGE_NAME = "test_exchange_fanout";
public final static String QUEUE_NAME = "test_queue_exchange_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机上
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(message);
}
}
}
4 路由模式
1、 发送消息到交换机时,要指定路由key
2、 消费者将队列绑定到交换机时,需要指定路由key
一种完全匹配,只有匹配到的消费者才能消费消息
消息中的路由键( routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“ dog”,则只转发 routing key 标记为“ dog”的消息,不会转发“ dog.puppy”,也不会转发“ dog.guard”等等。它是完全匹配、单播的模式。
消息发送到交换机,所有 binding key 与 routing key 一致的队列将收到消息,从而让消费者消费。说白了是生产者有目的的发送消息。
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_direct";
//生产者
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机 fanout:交换机类型 主要有fanout,direct,topics三种
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME,"dog",null,message.getBytes());
System.out.println(message);
channel.close();
connection.close();
}
}
// 消费者1
public class Recv1 {
public static final String EXCHANGE_NAME = "test_exchange_direct";
public final static String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机上,并制定路由键为"dog"
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"dog");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(message);
}
}
}
// 消费者2
public class Recv2 {
public static final String EXCHANGE_NAME = "test_exchange_direct";
public final static String QUEUE_NAME = "test_queue_direct_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机上,并制定路由键为"cat"
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"cat");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(message);
}
}
}
5 通配符模式
topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配一个单词。如下图所示:
消息发送到交换机,类似路由模式,只是这里变成了可以通配符匹配,更方便进行分配。
// 生产者
public class Send {
public static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机 topic:交换机类型
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME,"dog.1",null,message.getBytes());
System.out.println(message);
channel.close();
connection.close();
}
}
// 消费者1
public class Recv1 {
public static final String EXCHANGE_NAME = "test_exchange_topic";
public final static String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机上,并制定路由键为"dog"
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"dog");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(message);
}
}
}
// 消费者2
public class Recv2 {
public static final String EXCHANGE_NAME = "test_exchange_topic";
public final static String QUEUE_NAME = "test_queue_topic_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机上,并制定路由键匹配规则为"#.1"
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"#.1");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(message);
}
}
}
// 消费者3
public class Recv3 {
public static final String EXCHANGE_NAME = "test_exchange_topic";
public final static String QUEUE_NAME = "test_queue_topic_3";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机上,并制定路由键匹配规则为"cat.#"
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"cat.#");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(message);
}
}
}
结果:消费者1和消费者2可以收到消息,消费者3不能收到消息。
六、RabbitMQ 整合 SpringBoot
1 新建 SpringBoot 项目
pop.xml文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.rabbit</groupId>
<artifactId>springboot</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties 文件:
## 端口号
server.port=8080
## rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
2 新建 RabbitMQConfig
@Configuration
public class RabbitMQConfig {
// 测试队列名称
private String testQueueName = "test_queue";
// 测试交换机名称
private String testExchangeName = "test_exchange";
// RoutingKey
private String testRoutingKey = "test_routing_key";
/** 创建队列 */
@Bean
public Queue testQueue() {
return new Queue(testQueueName);
}
/** 创建交换机 */
@Bean
public TopicExchange testExchange() {
return new TopicExchange(testExchangeName);
}
/** 通过routingKey把队列与交换机绑定起来 */
@Bean
public Binding testBinding() {
return BindingBuilder.bind(testQueue()).to(testExchange()).with(testRoutingKey);
}
}
3 创建生产者
@Component
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String queueName) {
public void send(String queueName) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("email", "1532246395@qq.com");
jsonObject.put("timestamp", System.currentTimeMillis());
String jsonString = jsonObject.toJSONString();
rabbitTemplate.convertAndSend(queueName, jsonString);
}
}
4 创建消费者
@Component
@RabbitListener(queues = "OneByOne")
public class Consumer {
@RabbitListener(queues = "test_queue")
public void consumeMessage(Message message) throws Exception{
String msg = new String(message.getBody(), "UTF-8");
JSONObject jsonObject = JSONObject.parseObject(msg);
System.out.println("消费消息:" + jsonObject);
}
}
5 编写controller测试
@RestController
public class ProducerController {
@Autowired
private Producer producer;
@RequestMapping("/sendMsg")
public String sendFanout() {
producer.send("test_queue");
return "success";
}
}
浏览器访问 http://localhost:8080/sendMsg,结果:
消费消息:{"email":"1532246395@qq.com","timestamp":1629766485289}
提醒:本文发布于609天前,文中所关联的信息可能已发生改变,请知悉!