
2. application.properties配置org.springframework.boot spring-boot-starter-amqp
# RabbitMQ配置 spring.rabbitmq.host=192.168.56.10 spring.rabbitmq.port=5672 # 虚拟主机配置**********这些是消息抵达配置**** spring.rabbitmq.virtual-host=/ # 开启发送端消息抵达Broker确认 spring.rabbitmq.publisher-confirms=true # 开启发送端消息抵达Queue确认 spring.rabbitmq.publisher-returns=true # 只要消息抵达Queue,就会异步发送优先回调returnfirm spring.rabbitmq.template.mandatory=true # 手动ack消息,不使用默认的消费端确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual3.确认抵达
保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认 机制
publisher confirmCallback 确认模式
publisher returnCallback 未投递到 queue 退回模式
consumer ack机制
spring.rabbitmq.publisher-confirms=true
在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启
confirmcallback 。
CorrelationData:用来表示当前消息唯一性。
消息只要被 broker 接收到就会执行 /confirm/iCallback,如果是 cluster 模式,需要所有
broker 接收到才会调用 /confirm/iCallback。
被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递 到目标 queue 里。所以需要用到接下来的 returnCallback 。
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
package com.xunqi.gulimall.order.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class MyRabbitConfig {
private RabbitTemplate rabbitTemplate;
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
// @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate() {
//设置确认回调
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("/confirm/i...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
}