栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ整合SpringBoor以及RabbitMQ的延迟队列

Java 更新时间:发布时间: 百科书网 趣学号

延迟队列的概念

前面介绍了死信队列,针对于ttl进入死信队列的情况,假如我们把前面的消费者一关闭,然后对所有的消息都进行设置过期时间,这样是不是就形成了一个延迟队列了?

使用场景:比如订单超时关闭,假如我们使用定时任务,假如数据量很大的情况下肯定会存在问题,因为需要检查是否达到定时时间,这么大的数据量,肯定会很占用时间,所以这个时候使用延迟队列就比较合适了,

创建SpringBoot项目

创建SpringBoot工程,添加依赖



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.6.8
         
    
    com.dongmu.rabbitmq
    springboot-rabbitmq
    0.0.1-SNAPSHOT
    springboot-rabbitmq
    springboot-rabbitmq
    
        1.8
    
    
        
            org.springframework.boot
            spring-boot-starter
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
        org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
            com.alibaba
            fastjson
            1.2.47
        
        
            org.projectlombok
            lombok
        
        
        
            io.springfox
            springfox-swagger2
            2.9.2
        
        
            io.springfox
            springfox-swagger-ui
            2.9.2
        
        
        
            org.springframework.amqp
            spring-rabbit-test
            test
        


    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    



在配置文件中添加,里面写自己的东西。

spring.rabbitmq.host=**.***.***.***
spring.rabbitmq.port=5672
spring.rabbitmq.username=****
spring.rabbitmq.password=***

下面我们实现下面这个场景

编写配置类

package com.dongmu.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration

public class RabbitMqTTLConfig {


    //普通交换机名称
    public static final String NORMAL_EXHANGE = "X";
    //死信交换机名称
    public static final String DEAD_EXHANGE = "Y";
    //普通队列名称
    public static final String NORMAL_QUEUEA = "QA";
    public static final String NORMAL_QUEUEB = "QB";
    //死信队列名称
    public static final String DEAD_QUEUED = "QD";
    
//    设置两个交换机

    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(NORMAL_EXHANGE);
    }

    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(DEAD_EXHANGE);
    }

    
    //分别创建三个队列
    @Bean("queueA")
    public Queue queueA(){
        Map stringObjectMap = new HashMap<>(3);
        //设置死信交换机
        stringObjectMap.put("x-dead-letter-exchange",DEAD_EXHANGE);
        //设置死信队列
        stringObjectMap.put("x-dead-letter-routing-key","YD");
        //设置TTL
        stringObjectMap.put("x-message-ttl",10000);

        return QueueBuilder.durable(NORMAL_QUEUEA).withArguments(stringObjectMap).build();
    }

    @Bean("queueB")
    public Queue queueB(){
        Map stringObjectMap = new HashMap<>(3);
        //设置死信交换机
        stringObjectMap.put("x-dead-letter-exchange",DEAD_EXHANGE);
        //设置死信队列
        stringObjectMap.put("x-dead-letter-routing-key","YD");
        //设置TTL
        stringObjectMap.put("x-message-ttl",40000);

        return QueueBuilder.durable(NORMAL_QUEUEB).withArguments(stringObjectMap).build();
    }

    @Bean("queueD")
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_QUEUED).build();
    }

    
    //把队列和交换机进行绑定,并且设置routingkey
    @Bean
    public Binding queueA_X(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    @Bean
    public Binding queueB_X(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    @Bean
    public Binding queueD_Y(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

写发送消息的Controller

package com.dongmu.rabbitmq.controller;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RestController
@Slf4j
@RequestMapping("/ttl")
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendmsg/{message}")
    public void sendMsg (@PathVariable("message") String message){
        log.info("当前时间:{},发送一条消息给两个ttl队列:{}",new Date().toString(),message);
        rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10秒:"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40秒:"+message);

    }
}

写处理延时消息的消费者

package com.dongmu.rabbitmq.controller;


import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Date;

@Slf4j
@Component
public class DeadQueueConsumerController {

    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel){
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("当前时间:{},发送一条消息给两个ttl队列:{}结束。",new Date().toString(),msg);
    }

}

结果


可以发现延时队列实现成功。

延迟队列优化

我们上面的队列都是每个队列设置一个消息的超时时间,所以我们每设置一个延时任务都需要重新设置一个队列,这样肯定是不合理的。如何进行优化呢?

我们可以设置一个队列不设置ttl

添加队列

public static final String NORMAL_QUEUEC = "QC";

@Bean("queueC")
    public Queue queueC(){
        Map stringObjectMap = new HashMap<>();
        //设置死信交换机
        stringObjectMap.put("x-dead-letter-exchange",DEAD_EXHANGE);
        //设置死信队列
        stringObjectMap.put("x-dead-letter-routing-key","YD");

        return QueueBuilder.durable(NORMAL_QUEUEC).withArguments(stringObjectMap).build();
    }

将队列和交换机进行绑定

@Bean
    public Binding queueC_X(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

编写Controller层在交互的时候设置过期时间,这样就就可以做到同一个队列实现不同的延迟任务了

@GetMapping("/sendExpirationMsg/{message}/{ttltime}")
    public void sendExpirationMsg (@PathVariable("message") String message,@PathVariable("ttltime") String ttltime){
        log.info("当前时间:{},发送一条延迟消息给普通ttl队列C:{},时长是"+ttltime+"毫秒。",new Date().toString(),message);
        rabbitTemplate.convertAndSend("X","XC","消息来自ttlC时间为"+ttltime+"毫秒:"+message,mesg->{
            mesg.getMessageProperties().setExpiration(ttltime);
            return mesg;
        });

    }

访问测试

可以发现访问测试成功。

但是还是存在问题的,假如我很快地发送两条消息一个20秒,一个两秒,效果如下

可以发现这个延迟任务时间到了,但是队列前面还有元素,它会等待队列前面的元素也出队了之后才会进入死信队列。

所以这种方式虽然只需要一个队列,但是这个方式就无法保证过期的消息立马执行。也就是只能检测第一条消息是否到期。怎么解决呢?

采用延迟队列的方式

在官网上下载插件 ,rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。

  • 安装方式

把插件拷贝在我们rabbitmq的安装目录中的一个文件夹下

[root@iZwz9hv1phm24s3jicy8x1Z rabbitmq]# ll
total 34844
-rw-r--r-- 1 root root 18850824 May 15 22:48 erlang-21.3-1.el7.x86_64.rpm
-rw-r--r-- 1 root root  1254680 Jun  1 22:02 openssl-libs-1.0.2k-19.el7.x86_64.rpm
-rw-r--r-- 1 root root    43377 May 15 22:48 rabbitmq_delayed_message_exchange-3.8.0.ez
-rw-r--r-- 1 root root 15520399 May 15 22:48 rabbitmq-server-3.8.8-1.el7.noarch.rpm
[root@iZwz9hv1phm24s3jicy8x1Z rabbitmq]# cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
[root@iZwz9hv1phm24s3jicy8x1Z rabbitmq]# cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
[root@iZwz9hv1phm24s3jicy8x1Z plugins]# /sbin/service rabbitmq-server restart
Redirecting to /bin/systemctl restart rabbitmq-server.service

重新启动之后我们就可以在服务器中的可视化界面里面看到

这个延迟消息的实现是由交换机来实现为不是通过队列来实现的了。
交换机来负责延迟消息,延迟时间到了之后才会把消息传递给队列。

  • 具体实现


编写配置类

package com.dongmu.rabbitmq.config;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayQueueConfig {
    //延迟队列名称
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    //延迟交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    //延迟routingkey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }
    //自定义交换机 我们在这里定义的是一个延迟交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map args = new HashMap<>();
        //自定义交换机的类型
        args.put("x-delayed-type", "direct");
        
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
                args);
    }
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange
                                               delayedExchange) {
        return
                BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

编写Controller代码

 @GetMapping("/sendDelayMsg/{message}/{ttltime}")
    public void sendDelayMsg (@PathVariable("message") String message,@PathVariable("ttltime") int ttltime){
        log.info("当前时间:{},发送一条延迟消息给延迟交换机:{},时长是"+ttltime+"毫秒。",new Date().toString(),message);
        rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE_NAME,DelayQueueConfig.DELAYED_ROUTING_KEY,
                "消息来自ttlC时间为"+ttltime+"毫秒:"+message, mesg->{
            mesg.getMessageProperties().setDelay(ttltime);
            return mesg;
        });
    }

编写消费者代码

@RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayQueue(Message message, Channel channel){
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("当前时间:{},收到延迟交换机发送给队列的消息:{}结束。",new Date().toString(),msg);
    }

然后我们继续同时发送两条消息可以发现上面的问题已经解决了

所以总的来说

  • 第一种方式创建队列很麻烦
  • 第二种存在延迟时间到了无法处理的问题
  • 第三种基于插件的实现方式
转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/956998.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号