栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 前沿技术 > 云计算 > Docker/k8s

Go 分布式令牌桶限流 + 兜底保障

Docker/k8s 更新时间:发布时间: 百科书网 趣学号

 

本文转载自微信公众号「微服务实践」,作者欧阳安。转载本文请联系微服务实践公众号。

上篇文章提到固定时间窗口限流无法处理突然请求洪峰情况,本文讲述的令牌桶线路算法则可以比较好的处理此场景。

工作原理

单位时间按照一定速率匀速的生产 token 放入桶内,直到达到桶容量上限。

处理请求,每次尝试获取一个或多个令牌,如果拿到则处理请求,失败则拒绝请求。



优缺点

优点

可以有效处理瞬间的突发流量,桶内存量 token 即可作为流量缓冲区平滑处理突发流量。

缺点

实现较为复杂。

代码实现
  1. core/limit/tokenlimit.go 

分布式环境下考虑使用 redis 作为桶和令牌的存储容器,采用 lua 脚本实现整个算法流程。

redis lua 脚本

  1. -- 每秒生成token数量即token生成速度 local rate = tonumber(ARGV[1]) 
  2. -- 桶容量 local capacity = tonumber(ARGV[2]) 
  3. -- 当前时间戳 local now = tonumber(ARGV[3]) 
  4. -- 当前请求token数量 local requested = tonumber(ARGV[4]) 
  5. -- 需要多少秒才能填满桶 local fill_time = capacity/rate 
  6. -- 向下取整,ttl为填满时间的2倍 local ttl = math.floor(fill_time*2) 
  7. -- 当前时间桶容量 local last_tokens = tonumber(redis.call("get", KEYS[1])) 
  8. -- 如果当前桶容量为0,说明是第一次进入,则默认容量为桶的最大容量 if last_tokens == nil then 
  9. last_tokens = capacity end 
  10. -- 上一次刷新的时间 local last_refreshed = tonumber(redis.call("get", KEYS[2])) 
  11. -- 第一次进入则设置刷新时间为0 if last_refreshed == nil then 
  12. last_refreshed = 0 end 
  13. -- 距离上次请求的时间跨度 local delta = math.max(0, now-last_refreshed) 
  14. -- 距离上次请求的时间跨度,总共能生产token的数量,如果超多最大容量则丢弃多余的token local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) 
  15. -- 本次请求token数量是否足够 local allowed = filled_tokens >= requested 
  16. -- 桶剩余数量 local new_tokens = filled_tokens 
  17. -- 允许本次token申请,计算剩余数量 if allowed then 
  18. new_tokens = filled_tokens - requested end 
  19. -- 设置剩余token数量 redis.call("setex", KEYS[1], ttl, new_tokens) 
  20. -- 设置刷新时间 redis.call("setex", KEYS[2], ttl, now) 
  21.  return allowed 

令牌桶限流器定义

  1. type TokenLimiter struct {     // 每秒生产速率 
  2.     rate int     // 桶容量 
  3.     burst int     // 存储容器 
  4.     store *redis.Redis     // redis key 
  5.     tokenKey       string     // 桶刷新时间key 
  6.     timestampKey   string     // lock 
  7.     rescueLock     sync.Mutex     // redis健康标识 
  8.     redisAlive     uint32     // redis故障时采用进程内 令牌桶限流器 
  9.     rescueLimiter  *xrate.Limiter     // redis监控探测任务标识 
  10.     monitorStarted bool } 
  11.  func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter { 
  12.     tokenKey := fmt.Sprintf(tokenFormat, key)     timestampKey := fmt.Sprintf(timestampFormat, key) 
  13.      return &TokenLimiter{ 
  14.         rate:          rate,         burst:         burst, 
  15.         store:         store,         tokenKey:      tokenKey, 
  16.         timestampKey:  timestampKey,         redisAlive:    1, 
  17.         rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),     } 

获取令牌



  1. func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {     // 判断redis是否健康 
  2.     // redis故障时采用进程内限流器     // 兜底保障 
  3.     if atomic.LoadUint32(&lim.redisAlive) == 0 {         return lim.rescueLimiter.AllowN(now, n) 
  4.     }     // 执行脚本获取令牌 
  5.     resp, err := lim.store.eval(         script, 
  6.         []string{             lim.tokenKey, 
  7.             lim.timestampKey,         }, 
  8.         []string{             strconv.Itoa(lim.rate), 
  9.             strconv.Itoa(lim.burst),             strconv.FormatInt(now.Unix(), 10), 
  10.             strconv.Itoa(n),         }) 
  11.     // redis allowed == false     // Lua boolean false -> r Nil bulk reply 
  12.     // 特殊处理key不存在的情况     if err == redis.Nil { 
  13.         return false     } else if err != nil { 
  14.         logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)         // 执行异常,开启redis健康探测任务 
  15.         // 同时采用进程内限流器作为兜底         lim.startMonitor() 
  16.         return lim.rescueLimiter.AllowN(now, n)     } 
  17.      code, ok := resp.(int64) 
  18.     if !ok {         logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp) 
  19.         lim.startMonitor()         return lim.rescueLimiter.AllowN(now, n) 
  20.     }  
  21.     // redis allowed == true     // Lua boolean true -> r integer reply with value of 1 
  22.     return code == 1 } 

redis 故障时兜底策略

兜底策略的设计考虑得非常细节,当 redis 不可用的时候,启动单机版的 ratelimit 做备用限流,确保基本的限流可用,服务不会被冲垮。

  1. // 开启redis健康探测 func (lim *TokenLimiter) startMonitor() { 
  2.     lim.rescueLock.Lock()     defer lim.rescueLock.Unlock() 
  3.     // 防止重复开启     if lim.monitorStarted { 
  4.         return     } 
  5.      // 设置任务和健康标识 
  6.     lim.monitorStarted = true     atomic.StoreUint32(&lim.redisAlive, 0) 
  7.     // 健康探测     go lim.waitForRedis() 
  8. }  
  9. // redis健康探测定时任务 func (lim *TokenLimiter) waitForRedis() { 
  10.     ticker := time.NewTicker(pingInterval)     // 健康探测成功时回调此函数 
  11.     defer func() {         ticker.Stop() 
  12.         lim.rescueLock.Lock()         lim.monitorStarted = false 
  13.         lim.rescueLock.Unlock()     }() 
  14.      for range ticker.C { 
  15.         // ping属于redis内置健康探测命令         if lim.store.Ping() { 
  16.             // 健康探测成功,设置健康标识             atomic.StoreUint32(&lim.redisAlive, 1) 
  17.             return         } 
  18.     } } 
项目地址

https://github.com/zeromicro/go-zero

欢迎使用 go-zero 并 star 支持我们!

 

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/796705.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号