栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 前沿技术 > 大数据 > 大数据系统

rocketmq 常用api(一)

大数据系统 更新时间:发布时间: 百科书网 趣学号
消息常用API
DefaultMQProducer producer = new DefaultMQProducer("xxoogp");
//设置nameserver地址
producer.setNamesrvAddr("192.168.150.113:9876");
producer.start();

//topic 消息将要发送到的地址 body消息中的具体数据
Message message = new Message("topic","xxoo第一条".getBytes());
//同步消息发送 发了过后要等待收到消息的信号SendResult 阻塞状态 提供比较强的到达率 不会丢消息
SendResult sendResult = producer.send(message);

DefaultMqProducer()最基本信息的就发送了。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xxoocs");
consumer.setNamesrvAddr("192.168.150.113:9876");
//每个consumer关注一个topic
//topic 关注的消息的地址
//过滤器 * 表示不过滤
consumer.subscribe("topic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt messageExt : list) {
            String msg = new String(messageExt.getBody());
        }
        //默认情况下,这条消息只会被 一个consumer消费到 点对点 消费完后
        // message 状态修改
        // ack 回应 比如三次握手也有ack
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

生产者和消费者就这样注册到了nameserver 然后进行消息发送和消费

发送方式 批量消息发送

因为之前发送的消息是同步消息 也就意味的阻塞 比较慢 所以有了批量发送。

可以多条消息打包一起发送,减少网络传输次数提高效率。

send(Collection c) 可以接收一个集合 实现批量发送

  • 批量消息要求必要具有同一topic、相同消息配置
  • 不支持延时消息
  • 官方建议一个批量消息最好不超过1MB大小
  • 如果不确定是否超过限制,可以手动计算大小分批发送

异步消息
//异步可靠消息 不会阻塞等待broker确认,采用事件监听接收broker返回的确认
producer.send(message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        //发送成功
    }

    @Override
    public void onException(Throwable e) {
        //如果发生异常 case异常,尝试重投
        //或者调整业务逻辑
    }
});
单向消息

还有一种单向消息 不需要回应,效率最高。此方式发送消息的过程耗时非常短,一般在微秒级别。但是是很有可能丢消息的。

producer.sendoneway(message);
消费模式

默认的集群模式 就只会消费一次,广播模式就会消费多次

consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式
consumer.setMessageModel(MessageModel.CLUSTERING);//默认集群模式 一组consumer
集群消息

集群消息是指集群化部署消费者

当使用集群消费模式时,MQ认为任意一条消息只需要被集群内的任意一个消费者处理即可。

特点

  • 每条消息只需要被处理一次,broker只会把消息发送给消费集群中的一个消费者
  • 在消息重投时,不能保证路由到同一台机器上
  • 消费状态由broker维护

广播消息

当使用广播消费模式时,MQ会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。

特点

  • 消费进度由consumer维护
  • 保证每个消费者消费一次消息
  • 消费失败的消息不会重投

消息过滤

tag selector在一个group中的消费者,都不能随便变,要保持统一

TAG

TAG 是用来过滤消息,消息分组

在producer中使用tag

Message message = new Message("topic","TAG-A","KEY-xxoo","xxoo第一条".getBytes());

在Consumer中订阅Tag:

consumer.subscribe("topic","TAG-A||TAG-B"); 来匹配过滤的消息

SQL表达式

消费者将收到包含TAGA或TAGB的消息,但限制是一条消息只能由一个标签,而者对于复杂的情况可能无效,在这种情况下,可以使用SQL表达式筛选出消息

MessageSelector messageSelector = MessageSelector.bySql("age >= 18 and age <= 60");
consumer.subscribe("topic",messageSelector);

配置

在broker.conf中添加配置

enablePropertyFilter=true

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/601808.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号