栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 前沿技术 > 云计算 > 云平台

阿里或AWS Redis Key 失效事件实现延时任务

云平台 更新时间:发布时间: 百科书网 趣学号

  

一、业务场景
  1. 某个设备需要保持激活状态,当失效的时候需要预警。

  1. 订单付款后需要在24小时之后触发短信提醒等等。

二、常见方案
  • 手动无线循环;

  • ScheduledExecutorService;

  • DelayQueue;

  • Redis zset 数据判断的方式;

  • Redis 键空间通知的方式;

  • Netty 提供的 HashedWheelTimer 工具类;

  • RabbitMQ 死信队列;

  • RabbitMQ 延迟消息插件 rabbitmq-delayed-message-exchange;

  • Spring Scheduled;

  • Quartz。

实现方式这里也例举一些,感兴趣可以自己扩展。本文主要讲通过redis实现。因为项目本身有使用redis的场景。

三、具体实现 Redis zset 数据判断的方式
  1. 借助 zset 数据类型,把延迟任务存储在此数据集合中,然后在开启一个无线循环查询当前时间的所有任务进行消费,实现代码如下(需要借助 Jedis 框架):

  2. import redis.clients.jedis.Jedis;
    import utils.JedisUtils;
    import java.time.Instant;
    import java.util.Set;
    public class RedisDelayQueueZset {
        // zset key
        private static final String KEY = "ghDelayQueue";
        
        public static void main(String[] args) throws InterruptedException {
            Jedis jedis = JedisUtils.getJedis();
            // 延迟 30s 执行(30s 后的时间)
            long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
            jedis.zadd(KEY, delayTime, "order_1");
            // 继续添加测试数据
            jedis.zadd(KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
            jedis.zadd(KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
            jedis.zadd(KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
            jedis.zadd(KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
            // 开启延迟队列
            doDelayQueue(jedis);
        }
        
        public static void doDelayQueue(Jedis jedis) throws InterruptedException {
            while (true) {
                // 当前时间
                Instant nowInstant = Instant.now();
                long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒时间
                long nowSecond = nowInstant.getEpochSecond();
                // 查询当前时间的所有任务
                Set data = jedis.zrangeByScore(KEY, lastSecond, nowSecond);
                for (String item : data) {
                    // 消费任务
                    System.out.println("消费:" + item);
                }
                // 删除已经执行的任务
                jedis.zremrangeByScore(KEY, lastSecond, nowSecond);
                Thread.sleep(1000); // 每秒轮询一次
            }
        }
    }

  3. 缺点:当数据量非常多的时候,比如超出redis读写瓶颈。又或者数据很少的时候。依旧要有一个循环在每秒操作。都显得不是很优雅于是有了下面的方式

Redis 键空间通知的方式
  1. 通过 notify-keyspace-events Ex 的命令手动开启,开启键空间通知后,我们就可以拿到每个键值过期的事件,利用这个机制实现延时任务的功能。

  2. 拿业务场景1来分析:

当设备上行或者心跳触发。那么我们更新key的失效时间。如果长时间掉线。那么监听到事件触发预警

     3.设置参数 notify-keyspace-events Ex

阿里云redis 设置完立马生效不需要重启

 

aws redis

默认的参数组只有common可以修改,所以如果用的不是common需要新建一个参数组覆盖原来的。这里我选择新建test 修改参数组也是立马生效不需要重启

      4.代码实现:

redis失效监听器注册

@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
    RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
    redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
    return redisMessageListenerContainer;
}



@Bean
public RedisEventMessageListener redisEventMessageListener(RedisMessageListenerContainer redisMessageListenerContainer) {
    return new RedisEventMessageListener(redisMessageListenerContainer);
}

继承事件监听

package cn.fuzhi.cloud.v2.manager.redis;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import java.time.LocalDateTime;

public class RedisEventMessageListener extends KeyExpirationEventMessageListener {
 
    
    public RedisEventMessageListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
 
    @Override
    protected void doHandleMessage(Message message) {
        // 这个就是过期的key ,过期后,也就是事件触发后对应的value是拿不到的。
        // 这里实现业务逻辑,如果是服务器集群的话需要使用分布式锁进行抢占执行。
        String key = message.toString();
        System.out.println("key = " + key);
        System.out.println("end = " + LocalDateTime.now());
    }
}
四、redis-cli测试notify-keyspace-events是否生效

先开一个客户端连接

redis-cli -h xxxx -p 6379 -a password

连接成功输入

PSUBSCRIBE __keyevent@*__:expired

 

出现这个就可以了先不要关闭窗口

另外开一个窗口

 

setex ghDelayQueue 3 3

 

出现上述输出就表示配置成功

注意一点就是这种方式:事件触发后对应的value是拿不到的,只能获取到key的内容。所以要对key的命名给出规范

最后有啥提出一起交流

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/1072723.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号