栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 前沿技术 > 大数据 > 大数据系统

springboot中使用rabbitmq的几种模式

大数据系统 更新时间:发布时间: 百科书网 趣学号

文章目录
      • 简单模式
      • 工作模式
      • 发布和订阅模式
      • 路由模式
      • 主题模式

简单模式

设置启动类

//消息的简单模式
@SpringBootApplication
public class Main1 {
    public static void main(String[] args) {
        SpringApplication.run(Main1.class,args);
    }
    @Bean
    public Queue helloworld(){
        return new Queue("helloworld",false,false,false);
    }
    @Autowired
    private Producer p;

    
    @PostConstruct
    public void test(){
        //在新的线程中执行自己的运算,不阻塞spring主线程执行
        new Thread(()->{
            try {
                Thread.sleep(3000L);//等待helloworld队列创建
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            p.send();
        }).start();
    }
}

生产者

@Component
public class Producer {
    //发送消息的封装工具
    @Autowired
    private AmqpTemplate t;

    public void send(){
        //队列的参数在启动类中设置
        t.convertAndSend("helloworld","helloworld");
    }

}

消费者

//@RabbitListener(queues = "helloworld")//也可以注解到方法上,就不需要RabbitHandler
@Component
public class Consumer {
    @RabbitListener(queues = "helloworld")
    public void receive(String msg){
        System.out.println("收到"+msg);
    }

}
工作模式

启动类

//消息的工作模式
@SpringBootApplication
public class Main2 {
    public static void main(String[] args) {
        SpringApplication.run(Main2.class,args);
    }

    @Bean
    public Queue TaskQueue(){
        return new Queue("task_queue",true,false,false);
    }
    @Autowired
    private Producer p;

    
    @PostConstruct
    public void test(){
        //在新的线程中执行自己的运算,不阻塞spring主线程执行
        new Thread(() -> {
            while (true){
            System.out.println("输入消息:");
            String s = new Scanner(System.in).nextLine();
            p.send(s);}
        }).start();

    }
}

生产者

@Component
public class Producer {
    //发送消息的封装工具
    @Autowired
    private AmqpTemplate t;

    public void send(String s){
        //队列的参数在启动类中设置
        t.convertAndSend("task_queue",s);
        
    }
}

消费者

@Component
public class Consumer {
    @RabbitListener(queues = "task_queue")
    public void receive1(String msg){
        System.out.println("消费者1收到"+msg);
    }
    @RabbitListener(queues = "task_queue")
    public void receive2(String msg){
        System.out.println("消费者2收到"+msg);
    }

}

发布和订阅模式

启动类中配置

  @Bean
    public FanoutExchange logs(){
        return new FanoutExchange("logs",false,false);
    }

生产者

@Component
public class Producer {
    //发送消息的封装工具
    @Autowired
    private AmqpTemplate t;

    public void send(String s){
        //队列的参数在启动类中设置
        t.convertAndSend("logs","",s);//第二个参数(路由键)可以不写,没用
    }

}

消费者

@Component
public class Consumer {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//队列,默认,随机命名,false,true,true
            //declare="false"表示不创建,使用已存在的交换机
            exchange = @Exchange(value = "logs",declare = "false")//交换机
    ))
    public void receive1(String msg){
        System.out.println("消费者1收到"+msg);
    }
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//队列,默认,随机命名,false,true,true
            exchange = @Exchange(value = "logs",declare = "false")//交换机
    ))
    public void receive2(String msg){
        System.out.println("消费者2收到"+msg);
    }
}

路由模式

启动类

@Bean
    public DirectExchange logs(){
        return new DirectExchange("direct_logs",false,false);
    }

生产者

@Component
public class Producer {
    //发送消息的封装工具
    @Autowired
    private AmqpTemplate t;

    public void send(String k, String s){
        //队列的参数在启动类中设置
        t.convertAndSend("direct_logs",k,s);
    }
}

消费者
多加个key键

@Component
public class Consumer {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//队列,默认,随机命名,false,true,true
            //declare="false"表示不创建,使用已存在的交换机
            exchange = @Exchange(value = "direct_logs",declare = "false"),//交换机
            key = {"error"}
    ))
    public void receive1(String msg){
        System.out.println("消费者1收到"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//队列,默认,随机命名,false,true,true
            exchange = @Exchange(value = "direct_logs",declare = "false"),//交换机
            key = {"info","error","warning"}
    ))
    public void receive2(String msg){
        System.out.println("消费者2收到"+msg);
    }
}

主题模式

启动类修改

@Bean
    public TopicExchange logs(){
        return new TopicExchange("topic_logs",false,false);
    }

生产者修改

public void send(String k, String s){
        //队列的参数在启动类中设置
        t.convertAndSend("topic_logs",k,s);
    }

消费者修改

@Component
public class Consumer {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//队列,默认,随机命名,false,true,true
            //declare="false"表示不创建,使用已存在的交换机
            exchange = @Exchange(value = "topic_logs",declare = "false"),//交换机
            key = {"*.orange.*"}
    ))
    public void receive1(String msg){
        System.out.println("消费者1收到"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,//队列,默认,随机命名,false,true,true
            exchange = @Exchange(value = "topic_logs",declare = "false"),//交换机
            key = {"*.*.rabbit","lazy.#"}
    ))
    public void receive2(String msg){
        System.out.println("消费者2收到"+msg);
    }
}
转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/601023.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号