
消息队列MQ
MQ全称为Message Queue,消息队列是应用程序之间的通信方法。
为什么使用MQ?
在项目中,可将一些无需及时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
开发中消息队列通常有如下应用场景:
任务异步处理
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方法进行异步处理,提高了应用程序的相应时间。
应用程序解耦合
MQ相当于一个中介,生产方式通过MQ与消费方交互,它将应用程序进行解耦合。
AMQP和JMS
MQ是消息通信的模型,实现MQ的大致有两种主流方式:AMQP、JMS。
AMQP
AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议),这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
JMS
JMS即Java消息服务(JavaMwssage Service)应用程序接口,是一个Java平台关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
AMQP与JMS区别
消息队列产品
市场上常见的消息队列有如下:
RabbitMQ
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ,不作介绍)。
官方对应模式介绍:https://www.rabbitmq.com/getstarted.html
安装说明
略
用户以及Virtual Hosts配置
用户角色
RabbitMQ在安装好后,可以访问http://localhost:15672 ;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后,如下操作:
角色说明:
超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
Virtual Hosts配置
像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。
创建Virtual Hosts
设置Virtual Hosts权限
入门案例中rabbitMQ的工作模式为:简单模式。
创建项目模块:ebuy-rabbitmq
引入相关依赖
org.springframework.cloud spring-cloud-stream-binder-rabbit
抽取创建connecttion的工具类
package cn.ebuy.rabbitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConnection() throws Exception
{
// 创建连接工厂
ConnectionFactory connectionFactory=new ConnectionFactory();
// 主机地址:默认为localhost
connectionFactory.setHost("127.0.0.1");
// 连接端口:默认为5672
connectionFactory.setPort(5672);
// 虚拟主机名称:默认为 /
connectionFactory.setVirtualHost("/crm");
// 连接用户名:默认为guest
connectionFactory.setUsername("qhj");
// 连接密码:默认为guest
connectionFactory.setPassword("101521");
// 创建连接
Connection connection=connectionFactory.newConnection();
return connection;
}
}
编写生产者
package cn.ebuy.rabbitmq.simple;
import cn.ebuy.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
// 队列名称
static final String QUEUE_NAME="simple_queue";
public static void main(String[] args) {
try {
// 获得连接(从工具类中创建)
Connection connection= ConnectionUtil.getConnection();
// 创建频道
Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
// 定义要发送的消息
String message="你好,我是001";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 发送消息后做输出
System.out.println("消息已发送:"+message);
// 关闭资源
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
编写消费者
package cn.ebuy.rabbitmq.simple;
import cn.ebuy.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
// 队列名称
static final String QUEUE_NAME="simple_queue";
public static void main(String[] args) {
try {
// 获得连接
Connection connection= ConnectionUtil.getConnection();
//创建频道
Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
// 交换机
System.out.println("交换机为:" + envelope.getExchange());
// 消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
// 收到的消息
System.out.println("接收到的消息为:" + new String(body, "utf-8"));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
//不关闭资源,应该一直监听消息
} catch (Exception e) {
e.printStackTrace();
}
}
}
测试
依次启动consumer、producer:
小结
在上图的模型中,有以下概念:
Producer:生产者,也就是要发送消息的程序;
Consumer:消费者:消息的接受者,会一直等待消息到来。queue:消息队列,图中绿色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
注意:
在rabbitMQ中消息者是一定要到某个消息队列中取获取消息的。
简单模式
简单模式就是入门案例中所描述的实现方式,简单的实现消息发送与接收。
Work queues工作队列模式
消费者之间是竞争关系。
Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
代码实现:Work Queues与入门程序的简单模式的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。
生产者
package cn.ebuy.rabbitmq.work;
import cn.ebuy.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
// 队列名称
static final String QUEUE_NAME="work_queue";
public static void main(String[] args) {
try {
// 获取连接
Connection connection= ConnectionUtil.getConnection();
// 创建频道
Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for(int i=1;i<=30;i++) {
// 发送消息
String message = "你好,我是work:"+i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息已发送:" + message);
}
// 关闭资源
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者1
package cn.ebuy.rabbitmq.work;
import cn.ebuy.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
// 队列名称
static final String QUEUE_NAME="work_queue";
public static void main(String[] args) {
try {
// 获得连接
Connection connection= ConnectionUtil.getConnection();
// 创建频道
Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//一次只能接收并处理一个消息
//channel.basicQos(1);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
// 交换机
System.out.println("交换机为:" + envelope.getExchange());
// 消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
// 收到的消息
System.out.println("消费者1 接收到的消息为:" + new String(body, "utf-8"));
// 为了测试消费者的竞争关系,加上休眠
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//确认消息
// channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者2
package cn.ebuy.rabbitmq.work;
import cn.ebuy.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
// 队列名称
static final String QUEUE_NAME="work_queue";
public static void main(String[] args) {
try {
// 获取连接
Connection connection= ConnectionUtil.getConnection();
// 创建频道
Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//一次只能接收并处理一个消息
//channel.basicQos(1);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
// 交换机
System.out.println("交换机为:" + envelope.getExchange());
// 消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
// 收到的消息
System.out.println("消费者2 接收到的消息为:" + new String(body, "utf-8"));
// 为了测试消费者的竞争关系,加上休眠
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
测试
依次启动两个消费者,然后再启动生产者:
小结
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
Publish/Subscribe发布与订阅模式
面向交换机,解决消费者之间的竞争关系。
在订阅模型中,多了一个exchange角色,而且过程略有变化:
Producer:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给Exchange(交换机);
Consumer:消费者,消息的接受者,会一直等待消息到来;Queue:消息队列,接收消息、缓存消息。
Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
模式说明:
- 每个消费者监听自己的队列。
- 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收 到消息
生产者
package cn.ebuy.rabbitmq.publishSubscribe;
import cn.ebuy.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
// 交换机名称
static final String FANOUT_EXCHAGE = "fanout_exchange";
// 队列1名称
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
// 队列2名称
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection= ConnectionUtil.getConnection();
// 创建频道
Channel channel=connection.createChannel();
//声明交换机
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 声明(创建)队列
channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
// 将队列绑定到交换机中(这里一般不用绑定)
// 交换机只是一个用于传递消息的平台,当生产者发送消息时,只需将队列加入到交换机中即可,绑定规则由消费者来决定
//channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHAGE,"");
//channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHAGE,"");
for(int i=1;i<=30;i++) {
// 定义要发送的消息
String message = "你好,我是Publish/Subscribe:"+i;
channel.basicPublish(FANOUT_EXCHAGE,"", null, message.getBytes());
// 发送完消息后输出
System.out.println("消息已发送:" + message);
}
// 关闭资源
channel.close();
connection.close();
}
}
消费者1
package cn.ebuy.rabbitmq.publishSubscribe;
import cn.ebuy.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
// 交换机名称
static final String FANOUT_EXCHAGE = "fanout_exchange";
// 队列1名称
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
// 队列2名称
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) {
try {
// 获取连接
Connection connection= ConnectionUtil.getConnection();
// 创建频道
Channel channel=connection.createChannel();
//声明交换机
channel.exchangeDeclare(FANOUT_EXCHAGE,BuiltinExchangeType.FANOUT);
// 声明(创建)队列
channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
// 将队列绑定到交换机中
channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE,"");
// channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE,"");
// 创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
// 交换机
System.out.println("交换机为:" + envelope.getExchange());
// 消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
// 收到的消息
System.out.println("消费者1 接收到的消息为:" + new String(body, "utf-8"));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 监听消息
channel.basicConsume(FANOUT_QUEUE_1, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者2
package cn.ebuy.rabbitmq.publishSubscribe;
import cn.ebuy.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
// 交换机名称
static final String FANOUT_EXCHAGE = "fanout_exchange";
// 队列1名称
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
// 队列2名称
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) {
try {
// 获取连接
Connection connection= ConnectionUtil.getConnection();
// 创建频道
Channel channel=connection.createChannel();
//声明交换机
channel.exchangeDeclare(FANOUT_EXCHAGE,BuiltinExchangeType.FANOUT);
// 声明(创建)队列
channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
// 将队列绑定到交换机中
channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE,"");
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
// 交换机
System.out.println("交换机为:" + envelope.getExchange());
// 消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
// 收到的消息
System.out.println("消費者2 接收到的消息为:" + new String(body, "utf-8"));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 监听消息
channel.basicConsume(FANOUT_QUEUE_2, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
测试
依次启动两个消费者,再启动生产者:
小结
交换机需要 队列进行绑定,绑定之后,一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别
- 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
- 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
- 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
Routing路由模式
实现了消费者方可以接收指定消息。
路由模式特点:队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey。Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
图解:
Producer:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
Exchange(交换机):接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
Consumer1:消费者,其所在队列指定了需要routing key 为 error 的消息
Consumer2:消费者,其所在队列指定了需要routing key 为 info、warning 的消息
生产者
package cn.ebuy.rabbitmq.routing;
import cn.ebuy.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
// 交换机名称
static final String DIRECT_EXCHAGE = "direct_exchange";
// 队列名称(插入)
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
// 队列名称(更新)
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) throws Exception {
// 获取连接
Connection conn= ConnectionUtil.getConnection();
// 创建频道
Channel channel=conn.createChannel();
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
// 声明(创建)队列
channel.queueDeclare(DIRECT_QUEUE_INSERT,true,false,false,null);
channel.queueDeclare(DIRECT_QUEUE_UPDATE,true,false,false,null);
// 发送消息
// 发送消息1
String message = "新增了商品。路由模式;routing key 为 insert ";
channel.basicPublish(DIRECT_EXCHAGE,"INSERT",null,message.getBytes());
// 发送消息2
message = "修改了商品。路由模式;routing key 为 update" ;
channel.basicPublish(DIRECT_EXCHAGE,"UPDATE",null,message.getBytes());
// 关闭资源
channel.close();
conn.close();
}
}
消费者1
package cn.ebuy.rabbitmq.routing;
import cn.ebuy.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
// 交换机名称
static final String DIRECT_EXCHAGE = "direct_exchange";
// 队列名称(插入)
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
// 队列名称(更新)
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) {
try {
// 获取连接
Connection connection= ConnectionUtil.getConnection();
//创建频道
Channel channel=connection.createChannel();
//声明交换机
channel.exchangeDeclare(DIRECT_EXCHAGE,BuiltinExchangeType.DIRECT);
// 创建(声明)队列
channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
// 将队列绑定到交换机中(指定routing key)
channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE,"INSERT");
// 创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
// 交换机
System.out.println("交换机为:" + envelope.getExchange());
// 消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
// 收到的消息
System.out.println("消費者1 接收到的消息为:" + new String(body, "utf-8"));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 监听消息
channel.basicConsume(DIRECT_QUEUE_INSERT, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者2
package cn.ebuy.rabbitmq.routing;
import cn.ebuy.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
// 交换机名称
static final String DIRECT_EXCHAGE = "direct_exchange";
// 队列名称(插入)
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
// 队列名称(更新)
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) {
try {
// 获取连接
Connection connection= ConnectionUtil.getConnection();
//创建频道
Channel channel=connection.createChannel();
//声明交换机
channel.exchangeDeclare(DIRECT_EXCHAGE,BuiltinExchangeType.DIRECT);
// 创建(声明)队列
channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
// 键队列绑定到交换机中(并指定routing key)
channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE,"UPDATE");
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
// 交换机
System.out.println("交换机为:" + envelope.getExchange());
// 消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
// 收到的消息
System.out.println("消費者2 接收到的消息为:" + new String(body, "utf-8"));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 监听消息
channel.basicConsume(DIRECT_QUEUE_UPDATE, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
测试
依次启动两个消费者,再启动生产者:
小结
Routing模式要求队列在 交换机时要指定routing key,消息会你发到和routing key的队列。
Topics通配符模式
实现了消费者方一次可以接收多个指定消息。
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
" # ":匹配一个或多个词;
" * ":匹配不多不少恰好1个词
例如:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert
图解:
红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到;
黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配。
生产者
使用topic类型的exchange,发送消息的routing key有三种:item.insert、item.update、item.delete。
package cn.ebuy.rabbitmq.topic;
import cn.ebuy.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
// 交换机名称
static final String TOPIC_EXCHAGE = "topic_exchange";
// 队列名称
static final String TOPIC_QUEUE_1 = "topic_queue_1";
// 队列名称
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws Exception {
// 获取连接
Connection conn= ConnectionUtil.getConnection();
// 创建频道
Channel channel=conn.createChannel();
// 声明交换机
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
// 发送消息
// 发送消息1
String message = "新增了商品。Topic模式;routing key 为 item.insert " ;
channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 发送消息2
message = "商品修改。通配符模式 ;routing key 为 item.update ";
channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 发送消息2
message = "商品删除。通配符模式 ;routing key 为 item.delete ";
channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
conn.close();
}
}
消费者1
package cn.ebuy.rabbitmq.topic;
import cn.ebuy.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
// 交换机名称
static final String TOPIC_EXCHAGE = "topic_exchange";
// 队列名称
static final String TOPIC_QUEUE_1 = "topic_queue_1";
// 队列名称
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) {
try {
// 获取连接
Connection connection= ConnectionUtil.getConnection();
// 创建频道
Channel channel=connection.createChannel();
// 声明交换机
channel.exchangeDeclare(TOPIC_EXCHAGE,BuiltinExchangeType.TOPIC);
// 创建(声明)队列
`
channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);
// 将队列绑定到交换机中(指定要接收的routing key)
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE,
"item.update");
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE,
"item.delete");
// 创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
// 交换机
System.out.println("交换机为:" + envelope.getExchange());
// 消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
// 收到的消息
System.out.println("消費者1 接收到的消息为:" + new String(body, "utf-8"));
}
};
// 监听消息
channel.basicConsume(TOPIC_QUEUE_1, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者2
package cn.ebuy.rabbitmq.topic;
import cn.ebuy.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
// 交换机名称
static final String TOPIC_EXCHAGE = "topic_exchange";
// 队列名称
static final String TOPIC_QUEUE_1 = "topic_queue_1";
// 队列名称
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) {
try {
// 获取连接
Connection connection= ConnectionUtil.getConnection();
// 创建频道
Channel channel=connection.createChannel();
//声明交换机
channel.exchangeDeclare(TOPIC_EXCHAGE,BuiltinExchangeType.TOPIC);
// 创建(声明)队列
channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);
//队列绑定交换机(指定要接收的routing key)
channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE,
"item.#");
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
// 交换机
System.out.println("交换机为:" + envelope.getExchange());
// 消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
// 收到的消息
System.out.println("消費者2 接收到的消息为:" + new String(body, "utf-8"));
}
};
// 监听消息
channel.basicConsume(TOPIC_QUEUE_2, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
测试
依次启动两个消费者,然后再启动生产者发送消息。在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。
小结
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式的功能;只是Topic在配置routing key的时候可以使用通配符,显得更加灵活。
RabbitMQ工作模式:
简介
在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ,尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。
一般在开发过程中:
生产者工程:
消费者工程:
搭建生产者工程
创建工程
创建工程:ebuy-rabbitmq
添加依赖
org.springframework.cloud spring-cloud-stream-binder-rabbit org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-amqp
添加启动类
package cn.ebuy.rabbitmq;
import cn.ebuy.rabbitmq.provider.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
@SpringBootApplication
public class RabbitProviderApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitProviderApplication.class,args);
}
}
配置RabbitMQ配置文件
server:
port: 9091
spring:
rabbitmq:
username: qhj
password: *****
virtual-host: /crm
host: 127.0.0.1
port: 5672
绑定交换机和队列
package cn.ebuy.rabbitmq.provider;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// 交换机名称
public static final String EBUY_EXCHANGE = "ebuy_exchange";
// 产品队列名称
public static final String PRODUCT_QUEUE = "product_queue1";
// 订单队列名称
public static final String ORDER_QUEUE = "order_queue1";
@Bean("EbuyExchange")
public Exchange ebuyExchange()
{
return ExchangeBuilder.topicExchange(EBUY_EXCHANGE).durable(true).build();
}
@Bean("ProductQueue")
public Queue ProductQueue()
{
return QueueBuilder.durable(PRODUCT_QUEUE).build();
}
@Bean("OrderQueue")
public Queue OrderQueue()
{
return QueueBuilder.durable(ORDER_QUEUE).build();
}
@Bean
public Binding productQueueExchange(@Qualifier("ProductQueue") Queue queue,@Qualifier("EbuyExchange") Exchange
exchange)
{
return BindingBuilder.bind(queue).to(exchange).with("product.#").noargs();
}
@Bean
public Binding orderQueueExchange(@Qualifier("OrderQueue") Queue queue,@Qualifier("EbuyExchange") Exchange
exchange)
{
return BindingBuilder.bind(queue).to(exchange).with("order.*").noargs();
}
}
编写Controller(用于测试)
package cn.ebuy.rabbitmq.provider;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class rabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendProductMQ")
public String sendProductMQ()
{
rabbitTemplate.convertAndSend(RabbitMQConfig.EBUY_EXCHANGE,"product.insert", "商品新增,routing key 为product.insert");
return "success";
}
}
搭建消费者工程
创建工程
创建工程:ebuy-rabbitmqConsumer
添加依赖
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-amqp
添加启动类
package cn.ebuy.rabbitmqConsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class,args);
}
}
配置RabbitMQ配置文件
server:
port: 9090
spring:
rabbitmq:
username: qhj
password: *****
virtual-host: /crm
host: 127.0.0.1
port: 5672
消息监听处理类
package cn.ebuy.rabbitmqConsumer.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerListener {
@RabbitListener(queues = "product_queue1")
public void myListener(String message){
System.out.println("消费者接收到的产品消息为:" + message);
}
}
测试
在生产者工程中创建测试类,发送消息:
package cn.ebuy.rabbitmq;
import cn.ebuy.rabbitmq.provider.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EBUY_EXCHANGE,"item.delete","商品删除,routingkey为item.delete");
}
}
注意:要先运行测试类(交换机和嘟列才能先被声明和绑定),然后启动消费者;在消费者工程中查看控制台是否接收到对应消息。