Eureka在实现中有一个限流器来保证Eureka Server的稳定。
常见的限流算法有漏桶算法以及令牌桶算法。这个可以参考http://www.cnblogs.com/LBSer/p/4083131.html。
Google Guava
中提供了限流工具类RateLimiter
。一开始我以为Eureka中的限流是由Guava
中的RateLimiter
实现的,后来发现并不是,Eureka自己实现了限流类com.netflix.discovery.util.RateLimiter
。
RateLimiter
构造方法
RateLimiter目前支持分钟级和秒级两种速率限制。构造方法如下:
1 | public class RateLimiter { |
averageRateUnit
参数,速率单位。构造方法里将averageRateUnit
转换成rateToMsConversion
。
acquire
acquire
方法用于获取令牌,并返回是否获取成功
1 | public boolean acquire(int burstSize, long averageRate) { |
burstSize
:令牌桶上限,即最大能被消耗掉burstSize
数量的令牌averageRate
:令牌填充平均速率
refillToken
refillToken
方法填充已消耗的令牌。填充令牌的操作并不是一个后台任务每毫秒执行填充。为什么不适合这样呢?一方面,实际项目里每个接口都会有相应的RateLimiter,导致太多执行频率极高的后台任务;另一方面,获取令牌时才计算,多次令牌填充可以合并成一次,减少冗余和无效的计算。
代码如下:
1 | /** |
- 根据距离最后一次填充令牌的时间来计算可填充的最大令牌数量
newTokens
- 计算新的填充令牌的时间
newRefillTime
。为什么不能用currentTimeMillis
呢?例如,averageRate = 500 && averageRateUnit = SECONDS
时,每2毫秒才填充一个令牌,如果设置currentTimeMillis
,会导致不足以填充一个令牌的时长被吞了。 - 通过CAS设置最后填充令牌的时间。并保证只有一个线程进入填充令牌的逻辑
循环直到令牌填充完成
- 通过
consumedTokens.get()
获取消耗令牌的数量 - 通过与
burstSize
的比较来调整消耗令牌的数量。因为burstSize
可能被调小,例如,系统接入分布式配置中心,可以远程调整该数值。如果此时burstSize
更小,以它作为已消耗的令牌数量 - 通过
adjustedLevel - newTokens
来计算新的被消耗掉的令牌的数量。即此时补充进来newTokens
数量的令牌,因此消耗令牌的数量减少。 - 通过
Math.max(0, adjustedLevel - newTokens)
的计算保证新的被消耗掉的令牌newLevel
的数量不小于0。即此时令牌桶里的令牌是满的。 - 通过CAS设置消耗令牌数量。避免覆盖设置正在消费令牌的线程。
- 通过
consumeToken
consumeToken
方法用于获取(消费)令牌。代码如下:
1 | private boolean consumeToken(int burstSize) { |
- 循环导致获取令牌成功或者失败
- 获取当前消耗掉的令牌的数量
currentLevel
,如果currentLevel >= burstSize
说明当前所有的令牌都被消耗掉了,不能再获取令牌,所以返回false
- 否则,通过CAS将当前消耗掉的令牌的数量增1,获取令牌成功。
总结
RateLimiter
的设计和我们的直觉不太一样:
- 首先它并不是有一个单独的线程来填充令牌,而是将填充令牌的操作放在获取令牌的方法中。
- 其次它并不是以令牌数为中心来控制令牌是否获取成功,而是以消耗掉的令牌数为中心。因此填充令牌是减少消耗令牌数,获取令牌是增加消耗令牌数。
RateLimiter在Eureka中的应用
RateLimitingFilter
com.netflix.eureka.RateLimitingFilter
是Eureka Server的限流过滤器。其中使用RateLimiting
来保证Eureka Server的稳定性。
doFilter
方法如下:
1 | public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { |
getTarget
首先调用getTarget
方法,获取Target。使用正则表达式^.*/apps(/[^/]*)?$
来匹配请求的url,根据不同的url返回不同的Target类型。Target类型有以下几种:FullFetch
, DeltaFetch
, Application
, Other
如果Target的类型为Other
则不做限流
isRateLimited
然后调用isRateLimited
方法,判断是否被限流。代码如下:
1 | private boolean isRateLimited(HttpServletRequest request, Target target) { |
首先调用isPrivileged
方法,判断是否为特权应用,对特权应用不开启限流逻辑。代码如下:
1 | private boolean isPrivileged(HttpServletRequest request) { |
然后调用isOverloaded
方法,判断是否过载。代码如下:
1 | /** |
调用registryFetchRateLimiter.acquire
获取令牌,如果target的类型是FullFetch
则调用registryFullFetchRateLimiter.acquire
来获取令牌,效果就是如果是FullFetch请求则限制在每秒钟100次,普通请求则限制在每秒钟500次。
InstanceInfoReplicator
InstanceInfoReplicator
是Eureka Client的服务实例复制器。在Eureka(三)——client注册过程中有详解。
服务实例状态发生变化时,会调用onDemandUpdate
方法向Eureka Server发起注册,同步服务实例信息。onDemandUpdate
方法中使用RateLimiter避免状态频繁发生变化而向Eureka Server频繁同步。代码如下:
1 | private final RateLimiter rateLimiter; |
onDemandUpdate
方法调用RateLimiter.acquire
方法获取令牌:
- 若获取成功,向Eureka Server发起注册,同步服务实例信息
- 若获取失败,不向Eureka Server发起注册,同步服务实例信息。这样会不会有问题?答案是不会。因为
InstanceInfoReplicator
会固定周期检查本地服务实例,如果发生了改变就会向Eureka Server同步实例信息。
http://www.iocoder.cn/Eureka/rate-limiter/
http://www.itmuch.com/spring-cloud-sum/spring-cloud-ratelimit/