
1、导入依赖
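<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>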
2、配置信息
①application.properties
spring.rabbitmq.host=10.15.70.40
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
(其余配置项未在此列举)
②application.yml
spring:
  rabbitmq:
    port: 5672
    host: 10.15.70.40
    username: guest
    password: guest
    # 这个配置保证生产者把消息推送到交换机后,不管成功与否,都会触发回调
    publisher-confirm-type: correlated
    # 保证交换机无法把消息路由到队列时,消息会被退回给生产者
    publisher-returns: true
    virtual-host: /
    # 这个配置要求消费者在消费消息后手动确认(ack)
    listener:
      simple:
        acknowledge-mode: manual
    template:
      mandatory: true
3、编写配置类(即图中的1部分)
①把队列名、交换机名字和路由key名字写好
②创建交换机
③创建队列
④把队列和交换机通过路由key进行绑定。
@Configuration
public class ConfirmConfig {
//交换机
public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i_exchange";
//队列
public static final String /confirm/i_QUEQUE_NAME = "/confirm/i_queue";
//路由Key
public static final String /confirm/i_ROUTING_KEY = "key1";
//备份交换机
public static final String BACKUP_EXCHANGE_NAME= "back_exchange";
//备份队列
public static final String BACKUP_QUEQUE_NAME = "/confirm/i_queue";
//报警队列
public static final String WARNING_QUEEUE_NAME= "warning_queue";
//声明交换机
@Bean("/confirm/iExchange")
public DirectExchange confirmExchange(){
return ExchangeBuilder.directExchange(/confirm/i_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
}
@Bean("/confirm/iQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(/confirm/i_QUEQUE_NAME).build();
}
@Bean
public Binding queueBindingExchange(@Qualifier("/confirm/iQueue") Queue /confirm/iQueue,@Qualifier("/confirm/iExchange") DirectExchange /confirm/iExchange){
return BindingBuilder.bind(/confirm/iQueue).to(/confirm/iExchange).with(/confirm/i_ROUTING_KEY);
}
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
@Bean("backupQueue")
public Queue backupQueue(){
return QueueBuilder.durable(BACKUP_QUEQUE_NAME).build();
}
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEEUE_NAME).build();
}
@Bean
public Binding backupQueueBindingbackupExchange(@Qualifier("backupQueue") Queue backupQueue,@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
@Bean
public Binding warningQueueBindingbackupExchange(@Qualifier("warningQueue") Queue warningQueue,@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
}
4、编写生产者(图中2部分)
@Slf4j
@RestController
@RequestMapping("//confirm/i")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
//开始发消息 测试确认
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(/confirm/iConfig./confirm/i_EXCHANGE_NAME,
/confirm/iConfig./confirm/i_ROUTING_KEY,message,correlationData);
log.info("发送消息内容:{}",message);
}
}
5、消费者(即图中3部分)
@Slf4j
@Component
public class Consumer {
//接收消息
@RabbitListener(queues = /confirm/iConfig./confirm/i_QUEQUE_NAME)
public void receiveConfirmMessage(Message message){
String msg = new String(message.getBody());
log.info("接收到的队列/confirm/i.queue消息:{}",msg);
}
}