
简单案例
其案例之前要确定你安装了RabbitMQ 并且启动了RiabbitMQ
启动lunix 并打开
一般我们会设置 Rabbit是自动启动的,但害怕失效.所以再重启下:
systemctl start rabbitmq-server我们再做几个简单案例,深度了解rabbitMQ
新建springboot 或maven工程
pox.xml样要
pom.xml的设置4.0.0 org.springframework.boot spring-boot-starter-parent2.3.2.RELEASE cn.tedu rabbitmq-springboot0.0.1-SNAPSHOT rabbitmq-springboot Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starterorg.springframework.boot spring-boot-starter-testtest org.springframework.boot spring-boot-starter-amqporg.springframework.boot spring-boot-maven-plugin
新建消费方consumer 提供,:producer测试方Main1
consumer(简单模式)111111111111--------------------------------------------------------------------------------------------------------------------------------------------------package cn.tedu.rabbitmqspringboot.m1;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
@RabbitListener(queues = "helloworld")
public void receive(String msg){
System.out.println("收到: "+msg);
}
}
producer(简单模式)
package cn.tedu.rabbitmqspringboot.m1;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producer {
//发送消息的封装工具
//RabbitAutoConfiguration 中创建了AmqpTemplate实例
@Autowired
private AmqpTemplate t;
public void send(){
//向 helloworld 队列发送消息
//队列的参数在启动类中设置
t.convertAndSend("helloworld","Hello world!" );
}
}
Main1(简单模式)
package cn.tedu.rabbitmqspringboot.m1;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import javax.annotation.PostConstruct;
@SpringBootApplication
public class Main1 {
@Autowired
private Producer p;
public static void main(String[] args) {
SpringApplication.run(Main1.class,args);
}
//设置队列参数
@Bean
public Queue helloworldQueue(){
//非持久,非独占,不自动删除
return new Queue("helloworld",false,false,false);
}
@PostConstruct
public void test(){
// 在新线程中执行自己的运算,不阻塞 spring 主线程执行
new Thread(()->{
try {
Thread.sleep(3000L); //等消费者先启动(等helloworld队列创建出来)
} catch (InterruptedException e) {
}
p.send();
}).start();
}
}
task模式222222222222222222222222222222222222222222222222222222222222222------------------------------------------------------------------------------
Consumer(工作模式)
package cn.tedu.rabbitmqspringboot.m2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
@RabbitListener(queues = "task_queue")
public void receive1(String msg){
System.out.println("消费1收到: "+msg);
}
@RabbitListener(queues = "task_queue")
public void receive2(String msg){
System.out.println("消费2收到: "+msg);
}
}
Mains (工作模式)
package cn.tedu.rabbitmqspringboot.m2;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import javax.annotation.PostConstruct;
import java.util.Scanner;
@SpringBootApplication
public class Main2 {
@Autowired
private Producer p;
public static void main(String[] args) {
SpringApplication.run(Main2.class,args);
}
//设置队列参数
@Bean
public Queue taskQueue(){
//持久,非独占,不自动删除
//return new Queue("task_queue",true,false,false);
return new Queue("task_queue");//只给参数就是上面的默认值
}
@PostConstruct
public void test(){
// 在新线程中执行自己的运算,不阻塞 spring 主线程执行
new Thread(() -> {
while (true){
System.out.print("输入消息:");
String s = new Scanner(System.in).nextLine();
p.send(s);
}
}).start();
}
}
Producer
package cn.tedu.rabbitmqspringboot.m2;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producer {
//发送消息的封装工具
//RabbitAutoConfiguration 中创建了AmqpTemplate实例
@Autowired
private AmqpTemplate t;
public void send(String s){
//向 helloworld 队列发送消息
//队列的参数在启动类中设置
t.convertAndSend("task_queue",s);
}
}
路由模式333333333333333333333333-----------------------------------------------------------------------------------------------------------------------------------------------------------------
Consumer
package cn.tedu.rabbitmqspringboot.m3;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//每个@RabbitListener都会注册成为一个消费者
@Component
public class Consumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue, //队列,随机命名,false,true,true
// declare = "false" 不创建交换机,而是使用已存在的交换机
exchange = @Exchange(name = "logs", declare = "false") //交换机
))
public void receive1(String msg) {
System.out.println("消费者1收到:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "logs", declare = "false") //交换机
))
public void receive2(String msg) {
System.out.println("消费者2收到:"+msg);
}
}
Main3
package cn.tedu.rabbitmqspringboot.m3;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import javax.annotation.PostConstruct;
import java.util.Scanner;
@SpringBootApplication
public class Main3 {
public static void main(String[] args) {
SpringApplication.run(Main3.class, args);
}
@Bean
public FanoutExchange logs() {
// 非持久,不自动删除
return new FanoutExchange("logs", false, false);
}
@Autowired
private Producer p;
@PostConstruct
public void test() {
// 在新线程中执行自己的运算,不阻塞 spring 主线程执行
new Thread(() -> {
while (true){
System.out.print("输入消息:");
String s = new Scanner(System.in).nextLine();
p.send(s);
}
}).start();
}
}
Producer
package cn.tedu.rabbitmqspringboot.m3;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producer {
//发送消息的封装工具
//RabbitAutoConfiguration 中创建了AmqpTemplate实例
@Autowired
private AmqpTemplate t;
public void send(String s){
//向 helloworld 队列发送消息
//队列的参数在启动类中设置
t.convertAndSend("logs","",s);//对fanout交换机,第二个参数无效
}
}
发布和订阅模式4444444444---------------------------------------------------------------------------------------------------------------------------------------------------------------
Consumer
package cn.tedu.rabbitmqspringboot.m4;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//每个@RabbitListener都会注册成为一个消费者
@Component
public class Consumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue, //队列,随机命名,false,true,true
// declare = "false" 不创建交换机,而是使用已存在的交换机
exchange = @Exchange(name = "direct_logs", declare = "false"),//交换机
key = {"error"}
))
public void receive1(String msg) {
System.out.println("消费者1收到:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "direct_logs", declare = "false"), //交换机
key = {"info","error","warning"}
))
public void receive2(String msg) {
System.out.println("消费者2收到:"+msg);
}
}
Main4
package cn.tedu.rabbitmqspringboot.m4;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import javax.annotation.PostConstruct;
import java.util.Scanner;
@SpringBootApplication
public class Main4 {
public static void main(String[] args) {
SpringApplication.run(Main4.class, args);
}
@Bean
public DirectExchange logs() {
// 非持久,不自动删除
return new DirectExchange("direct_logs", false, false);
}
@Autowired
private Producer p;
@PostConstruct
public void test() {
// 在新线程中执行自己的运算,不阻塞 spring 主线程执行
new Thread(() -> {
while (true){
System.out.print("输入消息:");
String s = new Scanner(System.in).nextLine();
System.out.print("输入路由键消息:");
String k = new Scanner(System.in).nextLine();
p.send(s,k);
}
}).start();
}
}
Porducer
package cn.tedu.rabbitmqspringboot.m4;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producer {
//发送消息的封装工具
//RabbitAutoConfiguration 中创建了AmqpTemplate实例
@Autowired
private AmqpTemplate t;
public void send(String s,String k){
//向 helloworld 队列发送消息
//队列的参数在启动类中设置
t.convertAndSend("direct_logs",k,s);//对fanout交换机,第二个参数无效
}
}
主题模式555555555--------------------------------------------------------------------------------------------------------------------------------------------------------------
Consumer
package cn.tedu.rabbitmqspringboot.m5;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//每个@RabbitListener都会注册成为一个消费者
@Component
public class Consumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue, //队列,随机命名,false,true,true
// declare = "false" 不创建交换机,而是使用已存在的交换机
exchange = @Exchange(name = "topic_logs", declare = "false"),//交换机
key = {"*.orange.*"}
))
public void receive1(String msg) {
System.out.println("消费者1收到:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "topic_logs", declare = "false"), //交换机
key = {"*.*.rabbit","lazy.#","*.warning.*"}
))
public void receive2(String msg) {
System.out.println("消费者2收到:"+msg);
}
}
Main5
package cn.tedu.rabbitmqspringboot.m5;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import javax.annotation.PostConstruct;
import java.util.Scanner;
@SpringBootApplication
public class Main5 {
public static void main(String[] args) {
SpringApplication.run(Main5.class, args);
}
@Bean
public TopicExchange logs() {
// 非持久,不自动删除
return new TopicExchange("topic_logs", false, false);
}
@Autowired
private Producer p;
@PostConstruct
public void test() {
// 在新线程中执行自己的运算,不阻塞 spring 主线程执行
new Thread(() -> {
while (true){
System.out.print("输入消息:");
String s = new Scanner(System.in).nextLine();
System.out.print("输入路由键消息:");
String k = new Scanner(System.in).nextLine();
p.send(s,k);
}
}).start();
}
}
Porducer
package cn.tedu.rabbitmqspringboot.m5;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producer {
//发送消息的封装工具
//RabbitAutoConfiguration 中创建了AmqpTemplate实例
@Autowired
private AmqpTemplate t;
public void send(String s,String k){
//向 helloworld 队列发送消息
//队列的参数在启动类中设置
t.convertAndSend("topic_logs",k,s);//对fanout交换机,第二个参数无效
}
}