
DefaultMQProducer producer = new DefaultMQProducer("xxoogp");
//设置nameserver地址
producer.setNamesrvAddr("192.168.150.113:9876");
producer.start();
//topic 消息将要发送到的地址 body消息中的具体数据
Message message = new Message("topic","xxoo第一条".getBytes());
//同步消息发送 发了过后要等待收到消息的信号SendResult 阻塞状态 提供比较强的到达率 不会丢消息
SendResult sendResult = producer.send(message);
DefaultMqProducer()最基本信息的就发送了。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xxoocs");
consumer.setNamesrvAddr("192.168.150.113:9876");
//每个consumer关注一个topic
//topic 关注的消息的地址
//过滤器 * 表示不过滤
consumer.subscribe("topic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
String msg = new String(messageExt.getBody());
}
//默认情况下,这条消息只会被 一个consumer消费到 点对点 消费完后
// message 状态修改
// ack 回应 比如三次握手也有ack
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
生产者和消费者就这样注册到了nameserver 然后进行消息发送和消费
发送方式 批量消息发送因为之前发送的消息是同步消息 也就意味的阻塞 比较慢 所以有了批量发送。
可以多条消息打包一起发送,减少网络传输次数提高效率。
send(Collection c) 可以接收一个集合 实现批量发送
//异步可靠消息 不会阻塞等待broker确认,采用事件监听接收broker返回的确认
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
//发送成功
}
@Override
public void onException(Throwable e) {
//如果发生异常 case异常,尝试重投
//或者调整业务逻辑
}
});
单向消息
还有一种单向消息 不需要回应,效率最高。此方式发送消息的过程耗时非常短,一般在微秒级别。但是是很有可能丢消息的。
producer.sendoneway(message);消费模式
默认的集群模式 就只会消费一次,广播模式就会消费多次
consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式 consumer.setMessageModel(MessageModel.CLUSTERING);//默认集群模式 一组consumer集群消息
集群消息是指集群化部署消费者
当使用集群消费模式时,MQ认为任意一条消息只需要被集群内的任意一个消费者处理即可。
特点
当使用广播消费模式时,MQ会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。
特点
tag selector在一个group中的消费者,都不能随便变,要保持统一
TAGTAG 是用来过滤消息,消息分组
在producer中使用tag
Message message = new Message("topic","TAG-A","KEY-xxoo","xxoo第一条".getBytes());
在Consumer中订阅Tag:
consumer.subscribe("topic","TAG-A||TAG-B"); 来匹配过滤的消息
SQL表达式
消费者将收到包含TAGA或TAGB的消息,但限制是一条消息只能由一个标签,而者对于复杂的情况可能无效,在这种情况下,可以使用SQL表达式筛选出消息
MessageSelector messageSelector = MessageSelector.bySql("age >= 18 and age <= 60");
consumer.subscribe("topic",messageSelector);
配置
在broker.conf中添加配置
enablePropertyFilter=true