栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 软件开发 > 后端开发 > Java

[点开即用]Spring Boot 整合rabbitMq

Java 更新时间:发布时间: 百科书网 趣学号
Spring Boot 整合 RabbitMQ 1、依赖
            
            
                org.springframework.boot
                spring-boot-starter-amqp
                ${rabbitmq.version}
            
2、配置
spring:
  rabbitmq:
    addresses: 127.0.0.1
    username: guest  # 改账户只有在本机(localhost)下可以使用,远程环境需要自己新增帐号
    password: guest
    port: 5672
    virtual-host: /
    publisher-/confirm/i-type: correlated
    publisher-returns: true
    # 将ack 自动应答改为手动应答
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          # 开启重试 只有在自动ack模式下有效
          enabled: true
          # 最大重试次数
          max-attempts: 10
          # 重试间隔时间
          initial-interval: 2000ms
2.1、配置类配置(主要配置交换机和队列之间的绑定关系)
package com.weinigb.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class RabbitMqConfiguration {
    @Bean
    public FanoutExchange fanoutExchange(){
        // 交换机名字,是否持久花,是否重启删除
        return new FanoutExchange("fanout_test_exchange",true,false);
    }
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct_exchange",true,false);
    }

    @Bean
    public Queue directQueue() {
        Map params = new HashMap<>();
        // 设置队列内消息的自动过期时间
        params.put("x-message-ttl",5000);
        // 设置队列的死信队列
        params.put("x-dead-letter-exchange", "队列名");
        // 设置死信队列的路由key
        params.put("x-dead-letter-routing-key","routekey");
        // 设置队列的最大消息数量
        params.put("x-max-length",5);
        // 队列名字 是否持久化
        return new Queue("direct_queuq", true);
    }
    @Bean
    public Queue testQueue(){
        return new Queue("test_queue", true);
    }
    @Bean
    public Binding binding(){
        // 队列名.to 交换机名字
        return BindingBuilder.bind(testQueue()).to(fanoutExchange());
    }

    @Bean
    Binding directBinding() {
        // with(路由key)
        return            BindingBuilder.bind(directQueue()).to(directExchange()).with("routeKey");
    }

}

2.2、使用注解进行关系绑定
// 在类上加注解实现关系绑定
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "队列名",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "交换机名称",type = ExchangeTypes.TOPIC),
        key = "路由key"
))
public class TopicSMSCOnsumer(){
    
}
3、调用方法 3.1、生产者
package com.weinigb;

import com.alibaba.nacos.common.utils.UuidUtils;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.nio.charset.StandardCharsets;


@SpringBootTest
public class RabbitMqConnectTest {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void sendOrder(){
        String orderId = UuidUtils.generateUuid();
        // 交换机名称
        String exchangeName = "fanout_test_exchange";
        // 路由key
        String routeKey = "";
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(exchangeName,routeKey,"Hello Spring Boot Rabbit Mq"+i);
        }
    }
}

3.2、消费者
package com.weinigb.service;

import jdk.internal.org.objectweb.asm.tree.analysis.Value;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;


@Service
//@RabbitListener(queues = {"test_queue"})
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "队列名",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "交换机名称",type = ExchangeTypes.TOPIC),
        key = "路由key"
))
public class RabbitMqTestService {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println(message);
    }
}

4、消息可靠性配置 4.1、生产者
  • 利用发送消息之后mq返回的信息来进行回调判断。
package com.weinigb.config;

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;


@Configuration
public class RabbitMqTemplateConfig implements RabbitTemplate./confirm/iCallback,RabbitTemplate.ReturnsCallback {
    @Autowired
    RabbitTemplate rabbitTemplate;

    
    @PostConstruct
    public void init(){
        //指定 /confirm/iCallback
        rabbitTemplate.set/confirm/iCallback(this);
        //指定 ReturnCallback
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    
    public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println(correlationData.getId()+"  "+ack+"  "+cause);
    }

    @Override
    
    public void returnedMessage(ReturnedMessage returned) {
        // 消息
        System.out.println(returned.getMessage());
        // 交换机
        System.out.println(returned.getExchange());
        // 路由key
        System.out.println(returned.getRoutingKey());
        // 错误码
        System.out.println(returned.getReplyCode());
        // 错误信息
        System.out.println(returned.getReplyText());
    }
}
4.2、消费者
  • 通过手动ack来进行消息的正确消费
  • 注意需要开启配置 具体配置看上面的 第二部分
package com.weinigb.service;

import com.rabbitmq.client.Channel;
import jdk.internal.org.objectweb.asm.tree.analysis.Value;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

import java.io.IOException;


@Service
@RabbitListener(queues = {"test_queue"})
//@RabbitListener(bindings = @QueueBinding(
//        value = @Queue(value = "队列名",durable = "true",autoDelete = "false"),
//        exchange = @Exchange(value = "交换机名称",type = ExchangeTypes.TOPIC),
//        key = "路由key"
//))
public class RabbitMqTestService {
    
    @RabbitHandler
    public void reviceMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println("long:->"+tag+"  消息:"+message);
        try {
            
            channel.basicAck(tag,true);
        } catch (IOException e) {
            
            channel.basicNack(tag,false,false);
            e.printStackTrace();
        }
    }
}

4.3、具体的实现逻辑
# 方案一:利用/confirm/i消息确认机制
	前面讲完了RabbitMQ自带的/confirm/i消息确认机制和Return机制,而实现消息可靠性投递的第一个方案就是利用该确认机制

实现思路如下:
	1、在生产端向消息队列发送消息前,首先将业务信息和对应的消息信息入库(如生成订单时,需要修改数据库中的订单表和订单消息表),其中订单消息表中有一个记录该消息是否发送成功的字段
	2、向消息队列发送该消息,并在发送前设置好/confirm/i消息的监听器
	3、如果收到/confirm/i消息,代表该消息已发送成功,那么就可以将订单消息表中的发送状态改为发送成功
	4、设置一定时任务去抓取订单消息表中没有没有发送成功的消息,并进行重新发送
	5、如果重新发送了几次后消息都没有发送成功,则将其状态修改为发送失败,后续进行人工补偿
	该方案的缺点是在发送消息前,需要进行两次落库操作(修改数据库中的订单表和订单消息表),因此会对性能造成一定影响
# 方案二:消息延迟投递,做二次确认,回调检查
实现思路如下:

	1、生产者消息发送前,只需要将业务信息入库(如修改订单表)
	2、向MQ发送该消息以及一条延迟消息(其中第一条消息由消费者接收,第二条消息由独立的Callback服务接收)
	3、消费者收到第一条消息后,将接收成功的消息回送给MQ(该回送消息也由Callback服务接收)
	4、Callback服务收到消费者接收成功的消息,将该消息入库
	5、一段时间后,Callback服务又收到生产者的延迟消息,它根据该延迟消息的id信息去查找数据库中有没有该条记录
	6、如果查到了,说明该消息已成功投递且被消费者成功消费
	7、如果没查到,说明该消息没有被消费者成功消费,可能是没有投递成功,这时Callback服务再去远程调用生产者告知其重新发送消息
	该方案优点是在生产者端只需要入库一次,而将消息的入库操作独立到了Callback服务中去,提升了生产者端的性能。但是该方案实现较复杂,里面还有很多的细节值得考虑

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/295132.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号