栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ spring boot整合MabbitMQ(完整演练)

大数据系统 更新时间:发布时间: 百科书网 趣学号

简单案例

其案例之前要确定你安装了RabbitMQ 并且启动了RiabbitMQ 

启动lunix 并打开 

 

一般我们会设置 Rabbit是自动启动的,但害怕失效.所以再重启下:

systemctl start rabbitmq-server
我们再做几个简单案例,深度了解rabbitMQ

 新建springboot 或maven工程

pox.xml样要

pom.xml的设置



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.3.2.RELEASE
         
    
    cn.tedu
    rabbitmq-springboot
    0.0.1-SNAPSHOT
    rabbitmq-springboot
    Demo project for Spring Boot
    
        1.8
    
    
        
            org.springframework.boot
            spring-boot-starter
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    


新建消费方consumer 提供,:producer测试方Main1

consumer(简单模式)111111111111--------------------------------------------------------------------------------------------------------------------------------------------------
package cn.tedu.rabbitmqspringboot.m1;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    @RabbitListener(queues = "helloworld")
    public void  receive(String msg){
        System.out.println("收到: "+msg);
    }


}
producer(简单模式)
package cn.tedu.rabbitmqspringboot.m1;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    //发送消息的封装工具
    //RabbitAutoConfiguration 中创建了AmqpTemplate实例
    @Autowired
    private AmqpTemplate t;

    public void send(){
        //向 helloworld 队列发送消息
        //队列的参数在启动类中设置
        t.convertAndSend("helloworld","Hello world!" );

    }
}
Main1(简单模式)
package cn.tedu.rabbitmqspringboot.m1;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class Main1 {

    @Autowired
    private  Producer p;

    public static void main(String[] args) {
        SpringApplication.run(Main1.class,args);
    }
    //设置队列参数
    @Bean
    public Queue helloworldQueue(){
        //非持久,非独占,不自动删除
        return new Queue("helloworld",false,false,false);
    }
    
    @PostConstruct
    public void test(){

        // 在新线程中执行自己的运算,不阻塞 spring 主线程执行
        new Thread(()->{
            try {
                Thread.sleep(3000L); //等消费者先启动(等helloworld队列创建出来)

            } catch (InterruptedException e) {

            }
            p.send();
        }).start();
    }

}
task模式222222222222222222222222222222222222222222222222222222222222222------------------------------------------------------------------------------

Consumer(工作模式)
package cn.tedu.rabbitmqspringboot.m2;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class Consumer {
    @RabbitListener(queues = "task_queue")
    public void  receive1(String msg){
        System.out.println("消费1收到: "+msg);
    }
    @RabbitListener(queues = "task_queue")
    public void  receive2(String msg){
        System.out.println("消费2收到: "+msg);
    }
}
Mains (工作模式)
package cn.tedu.rabbitmqspringboot.m2;



import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;
import java.util.Scanner;

@SpringBootApplication
public class Main2 {

    @Autowired
    private Producer p;

    public static void main(String[] args) {
        SpringApplication.run(Main2.class,args);
    }
    //设置队列参数
    @Bean
    public Queue taskQueue(){
        //持久,非独占,不自动删除
        //return new Queue("task_queue",true,false,false);
        return new Queue("task_queue");//只给参数就是上面的默认值
    }
    
    @PostConstruct
    public void test(){

        // 在新线程中执行自己的运算,不阻塞 spring 主线程执行
        new Thread(() -> {
            while (true){
                System.out.print("输入消息:");
                String s = new Scanner(System.in).nextLine();
                p.send(s);
            }
        }).start();
    }

}


Producer
package cn.tedu.rabbitmqspringboot.m2;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    //发送消息的封装工具
    //RabbitAutoConfiguration 中创建了AmqpTemplate实例
    @Autowired
    private AmqpTemplate t;

    public void send(String s){
        //向 helloworld 队列发送消息
        //队列的参数在启动类中设置
        t.convertAndSend("task_queue",s);
        

    }
}


路由模式333333333333333333333333----------------------------------------------------------------------------------------------------------------------------------------------------------------- Consumer
package cn.tedu.rabbitmqspringboot.m3;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//每个@RabbitListener都会注册成为一个消费者
@Component
public class Consumer {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,   //队列,随机命名,false,true,true
            // declare = "false" 不创建交换机,而是使用已存在的交换机
            exchange = @Exchange(name = "logs", declare = "false") //交换机
    ))
    public void receive1(String msg) {
        System.out.println("消费者1收到:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "logs", declare = "false") //交换机
    ))
    public void receive2(String msg) {
        System.out.println("消费者2收到:"+msg);
    }
}

Main3
package cn.tedu.rabbitmqspringboot.m3;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;
import java.util.Scanner;
@SpringBootApplication
public class Main3 {

    public static void main(String[] args) {
        SpringApplication.run(Main3.class, args);
    }

    @Bean
    public FanoutExchange logs() {
        // 非持久,不自动删除
        return new FanoutExchange("logs", false, false);
    }

    @Autowired
    private Producer p;
    
    @PostConstruct
    public void test() {
        // 在新线程中执行自己的运算,不阻塞 spring 主线程执行
        new Thread(() -> {
            while (true){
                System.out.print("输入消息:");
                String s = new Scanner(System.in).nextLine();
                p.send(s);
            }
        }).start();
    }

}
Producer
package cn.tedu.rabbitmqspringboot.m3;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    //发送消息的封装工具
    //RabbitAutoConfiguration 中创建了AmqpTemplate实例
    @Autowired
    private AmqpTemplate t;

    public void send(String s){
        //向 helloworld 队列发送消息
        //队列的参数在启动类中设置
        t.convertAndSend("logs","",s);//对fanout交换机,第二个参数无效
        

    }
}




发布和订阅模式4444444444--------------------------------------------------------------------------------------------------------------------------------------------------------------- Consumer
package cn.tedu.rabbitmqspringboot.m4;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//每个@RabbitListener都会注册成为一个消费者
@Component
public class Consumer {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,   //队列,随机命名,false,true,true
            // declare = "false" 不创建交换机,而是使用已存在的交换机
            exchange = @Exchange(name = "direct_logs", declare = "false"),//交换机
            key = {"error"}
    ))
    public void receive1(String msg) {
        System.out.println("消费者1收到:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "direct_logs", declare = "false"), //交换机
            key = {"info","error","warning"}
    ))
    public void receive2(String msg) {
        System.out.println("消费者2收到:"+msg);
    }
}

Main4
package cn.tedu.rabbitmqspringboot.m4;


import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;
import java.util.Scanner;

@SpringBootApplication
public class Main4 {

    public static void main(String[] args) {
        SpringApplication.run(Main4.class, args);
    }

    @Bean
    public DirectExchange logs() {
        // 非持久,不自动删除
        return new DirectExchange("direct_logs", false, false);
    }

    @Autowired
    private Producer p;
    
    @PostConstruct
    public void test() {
        // 在新线程中执行自己的运算,不阻塞 spring 主线程执行
        new Thread(() -> {
            while (true){
                System.out.print("输入消息:");
                String s = new Scanner(System.in).nextLine();
                System.out.print("输入路由键消息:");
                String k = new Scanner(System.in).nextLine();
                p.send(s,k);
            }
        }).start();
    }

}
Porducer
package cn.tedu.rabbitmqspringboot.m4;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    //发送消息的封装工具
    //RabbitAutoConfiguration 中创建了AmqpTemplate实例
    @Autowired
    private AmqpTemplate t;

    public void send(String s,String k){
        //向 helloworld 队列发送消息
        //队列的参数在启动类中设置
        t.convertAndSend("direct_logs",k,s);//对fanout交换机,第二个参数无效
        

    }
}



主题模式555555555-------------------------------------------------------------------------------------------------------------------------------------------------------------- Consumer
package cn.tedu.rabbitmqspringboot.m5;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//每个@RabbitListener都会注册成为一个消费者
@Component
public class Consumer {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,   //队列,随机命名,false,true,true
            // declare = "false" 不创建交换机,而是使用已存在的交换机
            exchange = @Exchange(name = "topic_logs", declare = "false"),//交换机
            key = {"*.orange.*"}
    ))
    public void receive1(String msg) {
        System.out.println("消费者1收到:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "topic_logs", declare = "false"), //交换机
            key = {"*.*.rabbit","lazy.#","*.warning.*"}
    ))
    public void receive2(String msg) {
        System.out.println("消费者2收到:"+msg);
    }
}

Main5
package cn.tedu.rabbitmqspringboot.m5;


import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;
import java.util.Scanner;

@SpringBootApplication
public class Main5 {

    public static void main(String[] args) {
        SpringApplication.run(Main5.class, args);
    }

    @Bean
    public TopicExchange logs() {
        // 非持久,不自动删除
        return new TopicExchange("topic_logs", false, false);
    }

    @Autowired
    private Producer p;
    
    @PostConstruct
    public void test() {
        // 在新线程中执行自己的运算,不阻塞 spring 主线程执行
        new Thread(() -> {
            while (true){
                System.out.print("输入消息:");
                String s = new Scanner(System.in).nextLine();
                System.out.print("输入路由键消息:");
                String k = new Scanner(System.in).nextLine();
                p.send(s,k);
            }
        }).start();
    }

}
Porducer
package cn.tedu.rabbitmqspringboot.m5;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    //发送消息的封装工具
    //RabbitAutoConfiguration 中创建了AmqpTemplate实例
    @Autowired
    private AmqpTemplate t;

    public void send(String s,String k){
        //向 helloworld 队列发送消息
        //队列的参数在启动类中设置
        t.convertAndSend("topic_logs",k,s);//对fanout交换机,第二个参数无效
        

    }
}



转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/601114.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号