文章目录
- Pre
- 概述
- 简单计数器
-
- 原理
- 实现
- 测试
- 优缺点
- 滑动窗口算法
-
- 原理
- 实现
- 测试
- 优缺点
- 漏桶算法
-
- 原理
- 实现
- 测试
- 优缺点
- 令牌桶算法
-
- 原理
- 实现
- 测试
- 优缺点
- 小结
Pre
深入理解分布式技术 - 限流
并发编程-25 高并发处理手段之消息队列思路 + 应用拆分思路 + 应用限流思路
SpringBoot - 优雅的实现【流控】
Spring Boot - 利用Resilience4j-RateLimiter进行流量控制和服务降级
MQ - 19 安全_限流方案的设计
每日一博 - 闲聊“突发流量”的应对之道
概述
限流算法是一种在分布式系统中广泛使用的技术,用于控制对系统资源的访问速率,以保护系统免受恶意攻击或突发流量导致的过载。
在实际的业务场景中,接口限流策略的应用非常广泛,以下是一些典型的场景:
- API 网关限流:在微服务架构中,API 网关通常是系统对外的唯一入口,需要限制单个用户或IP在一定时间内的请求次数,以保护后端服务不受恶意请求或突发流量的冲击。
- 分布式系统中的服务限流:在分布式系统中,各个服务之间可能会有调用关系,通过限流可以控制服务间的调用频率,避免服务间因为调用过于频繁而造成的服务过载。
- 微服务间的接口限流:微服务架构中,服务间通过接口进行通信,对接口进行限流可以保证服务之间的通信不会因为过量的请求而变得不可用。
- 营销活动限流:在开展营销活动时,为了防止活动页面被恶意刷票或访问量过大而崩溃,需要对接口进行限流,确保活动能够在可控的范围内进行。
- 用户高频操作限流:对于用户频繁操作的接口,如登录、发帖、评论等,需要限制用户在短时间内完成的操作次数,防止用户恶意操作或频繁操作导致系统资源耗尽。
- 秒杀活动限流:在秒杀活动中,为了防止用户在短时间内提交过多的秒杀请求,导致系统无法处理,需要对参与秒杀的接口进行限流。
- 后端服务保护限流:对于一些敏感操作或计算密集型的后端服务,通过限流可以避免因请求过多而使得服务响应变慢或崩溃。
- 防止分布式拒绝服务攻击:在遭受分布式拒绝服务(DDoS)攻击时,限流策略能够有效地减轻攻击带来的压力,确保关键业务的稳定性。
- 用户体验优化:对于一些需要高响应速度的服务,通过限流可以避免过多的请求积压,确保用户能够获得良好的服务体验。
简单计数器
原理
简单计数器是一种最基础的限流算法,它的实现原理相对直观。
简单计数器的工作原理如下:
- 时间窗口设定:首先设定一个固定的时间窗口,比如1分钟。
- 计数器初始化:在每个时间窗口开始时,将计数器重置为0。
- 请求到达:每当一个请求到达时,计数器加1。
- 判断与拒绝:如果在时间窗口内计数器的值达到了设定的阈值,比如1000,则后续的请求会被拒绝,直到当前时间窗口结束,计数器被重置。
实现
package com.artisan.counter; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface CounterRateLimit { /** * 请求数量 * * @return */ int maxRequest(); /** * 时间窗口, 单位秒 * * @return */ int timeWindow(); }
package com.artisan.counter; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @Aspect @Component public class CounterRateLimitAspect { // 存储每个方法对应的请求次数 private Map<String, AtomicInteger> REQUEST_COUNT = new ConcurrentHashMap<>(); // 存储每个方法的时间戳 private Map<String, Long> REQUEST_TIMESTAMP = new ConcurrentHashMap<>(); /** * * @param joinPoint * @return * @throws Throwable */ @Around("@annotation(com.artisan.counter.CounterRateLimit)") public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable { // 获取注解信息 MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); CounterRateLimit annotation = method.getAnnotation(CounterRateLimit.class); // 获取注解的参数 int maxRequest = annotation.maxRequest(); long timeWindowInMillis = TimeUnit.SECONDS.toMillis(annotation.timeWindow()); // 获取方法名 String methodName = method.toString(); // 初始化计数器和时间戳 AtomicInteger count = REQUEST_COUNT.computeIfAbsent(methodName, x -> new AtomicInteger(0)); long startTime = REQUEST_TIMESTAMP.computeIfAbsent(methodName, x -> System.currentTimeMillis()); // 获取当前时间 long currentTimeMillis = System.currentTimeMillis(); // 判断: 如果当前时间超出时间窗口,则重置 if (currentTimeMillis - startTime > timeWindowInMillis) { count.set(0); REQUEST_TIMESTAMP.put(methodName, currentTimeMillis); } // 原子的增加计数器并检查其值 if (count.incrementAndGet() > maxRequest) { // 如果超出最大请求次数,递减计数器,并报错 count.decrementAndGet(); throw new RuntimeException("Too many requests, please try again later."); } // 方法原执行 return joinPoint.proceed(); } }
package com.artisan.counter; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @RestController public class CounterRateLimitController { /** * 一秒一次 * * @return */ @GetMapping("/counter") @CounterRateLimit(maxRequest = 2, timeWindow = 2) public ResponseEntity counter() { System.out.println("Request Coming Coming...."); return ResponseEntity.ok("Artisan OK"); } }
测试
启动项目, 访问接口 http://localhost:8080/counter
优缺点
简单计数器算法的优点是实现简单,但缺点也很明显:
-
边界问题:由于计数器是在时间窗口结束时重置的,如果系统的请求量非常大,可能会出现时间窗口临界点的问题,即在窗口即将结束时请求量激增,而在窗口开始时请求量较少,导致系统资源不能被有效利用。
-
突增流量处理能力不足:无法有效处理突增的流量,因为它的限制是固定的,不能根据系统的实时负载进行调整。
为了解决简单计数器的这些问题,可以采用更为复杂的限流算法,如滑动窗口计数器、漏桶算法、令牌桶算法等。这些算法能够更加平滑和有效地控制请求速率,提高系统的稳定性和可靠性。
在示例中,有一个使用了 @CounterRateLimit 注解的 counter 方法。根据注解的参数,这个方法在2秒钟的时间窗口内只能被调用2次。 如果在 2 秒内有更多的调用,那么这些额外的调用将被限流,并返回错误信息
假设1min一个时间段,每个时间段内最多100个请求。有一种极端情况,当10:00:58这个时刻100个请求一起过来,到达阈值;当10:01:02这个时刻100个请求又一起过来,到达阈值。这种情况就会导致在短短的4s内已经处理完了200个请求,而其他所有的时间都在限流中。
滑动窗口算法
原理
滑动窗口算法是实现限流的一种常用方法,它通过维护一个时间窗口来控制单位时间内请求的数量,从而保护系统免受突发流量或恶意攻击的影响。其核心原理是统计时间窗口内的请求次数,并根据预设的阈值来决定是否允许新的请求通过。
从图上可以看到时间创建是一种滑动的方式前进, 滑动窗口限流策略能够显著减少临界问题的影响,但并不能完全消除它。滑动窗口通过跟踪和限制在一个连续的时间窗口内的请求来工作。与简单的计数器方法不同,它不是在窗口结束时突然重置计数器,而是根据时间的推移逐渐地移除窗口中的旧请求,添加新的请求。
举个例子:假设时间窗口为10s,请求限制为3,第一次请求在10:00:00发起,第二次在10:00:05发起,第三次10:00:11发起,那么计数器策略的下一个窗口开始时间是10:00:11,而滑动窗口是10:00:05。所以这也是滑动窗口为什么可以减少临界问题的影响,但并不能完全消除它的原因。
实现
package com.artisan.slidingwindow; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface SlidingWindowRateLimit { /** * 请求数量 * * @return */ int maxRequest(); /** * 时间窗口, 单位秒 * * @return */ int timeWindow(); }
package com.artisan.slidingwindow; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @Aspect @Component public class SlidingWindowRateLimitAspect { /** * 使用 ConcurrentHashMap 保存每个方法的请求时间戳队列 */ private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Long>> REQUEST_TIMES_MAP = new ConcurrentHashMap<>(); @Around("@annotation(com.artisan.slidingwindow.SlidingWindowRateLimit)") public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); SlidingWindowRateLimit rateLimit = method.getAnnotation(SlidingWindowRateLimit.class); // 允许的最大请求数 int requests = rateLimit.maxRequest(); // 滑动窗口的大小(秒) int timeWindow = rateLimit.timeWindow(); // 获取方法名称字符串 String methodName = method.toString(); // 如果不存在当前方法的请求时间戳队列,则初始化一个新的队列 ConcurrentLinkedQueue<Long> requestTimes = REQUEST_TIMES_MAP.computeIfAbsent(methodName, k -> new ConcurrentLinkedQueue<>()); // 当前时间 long currentTime = System.currentTimeMillis(); // 计算时间窗口的开始时间戳 long thresholdTime = currentTime - TimeUnit.SECONDS.toMillis(timeWindow); // 这一段代码是滑动窗口限流算法中的关键部分,其功能是移除当前滑动窗口之前的请求时间戳。这样做是为了确保窗口内只保留最近时间段内的请求记录。 // requestTimes.isEmpty() 是检查队列是否为空的条件。如果队列为空,则意味着没有任何请求记录,不需要进行移除操作。 // requestTimes.peek() < thresholdTime 是检查队列头部的时间戳是否早于滑动窗口的开始时间。如果是,说明这个时间戳已经不在当前的时间窗口内,应当被移除。 while (!requestTimes.isEmpty() && requestTimes.peek() < thresholdTime) { // 移除队列头部的过期时间戳 requestTimes.poll(); } // 检查当前时间窗口内的请求次数是否超过限制 if (requestTimes.size() < requests) { // 未超过限制,记录当前请求时间 requestTimes.add(currentTime); return joinPoint.proceed(); } else { // 超过限制,抛出限流异常 throw new RuntimeException("Too many requests, please try again later."); } } }
package com.artisan.slidingwindow; import com.artisan.leakybucket.LeakyBucketRateLimit; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @RestController public class SlidingWindowController { @GetMapping("/slidingWindow") @SlidingWindowRateLimit(maxRequest = 2, timeWindow = 2) public ResponseEntity slidingWindow() { return ResponseEntity.ok("artisan slidingWindow "); } }
测试
优缺点
滑动窗口算法的优点是它能够比较平滑地控制流量,允许一定程度的突发流量,同时又能够限制平均流量。相比于固定窗口算法,滑动窗口算法能够更精确地控制单位时间内的请求量,因为它考虑了时间窗口内请求的分布情况,而不仅仅是在窗口的开始和结束时刻的请求量。
滑动窗口算法的变种有很多,如基于令牌桶和漏桶的算法,这些算法在滑动窗口的基础上增加了更为复杂的令牌生成和消耗机制,以实现更精细的流量控制。
漏桶算法
原理
在Leaky Bucket算法中,容器有一个固定的容量,类似于漏桶的容量。数据以固定的速率进入容器,如果容器满了,则多余的数据会溢出。容器中的数据会以恒定的速率从底部流出,类似于漏桶中的水滴。如果容器中的数据不足以满足流出速率,则会等待直到有足够的数据可供流出。这样就实现了对数据流的平滑控制。
实现
package com.artisan.leakybucket; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) public @interface LeakyBucketRateLimit { /** * 桶的容量 * * @return */ int capacity(); /** * 漏斗的速率,单位通常是秒 * * @return */ int leakRate(); }
package com.artisan.leakybucket; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.lang.reflect.Method; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @Aspect @Component public class LeakyBucketRateLimitAspect { @Around("@annotation(com.artisan.leakybucket.LeakyBucketRateLimit)") public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); LeakyBucketRateLimit leakyBucketRateLimit = method.getAnnotation(LeakyBucketRateLimit.class); int capacity = leakyBucketRateLimit.capacity(); int leakRate = leakyBucketRateLimit.leakRate(); // 方法签名作为唯一标识 String methodKey = method.toString(); LeakyBucketLimiter limiter = LeakyBucketLimiter.createLimiter(methodKey, capacity, leakRate); if (!limiter.tryAcquire()) { // 超过限制,抛出限流异常 throw new RuntimeException("Too many requests, please try again later."); } return joinPoint.proceed(); } }
package com.artisan.leakybucket; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class LeakyBucketLimiter { /** * 桶的容量 */ private final int capacity; /** * 漏桶的漏出速率,单位时间内漏出水的数量 */ private final int leakRate; /** * 当前桶中的水量 */ private volatile int water = 0; /** * 上次漏水的时间 */ private volatile long lastLeakTime = System.currentTimeMillis(); /** * 漏桶容器 */ private static final ConcurrentHashMap<String, LeakyBucketLimiter> LIMITER_MAP = new ConcurrentHashMap<>(); /** * 静态工厂方法,确保相同的方法使用相同的漏桶实例 * * @param methodKey 方法名 * @param capacity * @param leakRate * @return */ public static LeakyBucketLimiter createLimiter(String methodKey, int capacity, int leakRate) { return LIMITER_MAP.computeIfAbsent(methodKey, k -> new LeakyBucketLimiter(capacity, leakRate)); } private LeakyBucketLimiter(int capacity, int leakRate) { this.capacity = capacity; this.leakRate = leakRate; } /** * 尝试获取许可(try to acquire a permit),如果获取成功返回true,否则返回false * * @return */ public boolean tryAcquire() { long currentTime = System.currentTimeMillis(); synchronized (this) { // 计算上次漏水到当前时间的时间间隔 long leakDuration = currentTime - lastLeakTime; // 如果时间间隔大于等于1秒,表示漏桶已经漏出一定数量的水 if (leakDuration >= TimeUnit.SECONDS.toMillis(1)) { // 计算漏出的水量 long leakQuantity = leakDuration / TimeUnit.SECONDS.toMillis(1) * leakRate; // 漏桶漏出水后,更新桶中的水量,但不能低于0 water = (int) Math.max(0, water - leakQuantity); lastLeakTime = currentTime; } // 判断桶中的水量是否小于容量,如果是则可以继续添加水(相当于获取到令牌) if (water < capacity) { water++; return true; } } // 如果桶满,则获取令牌失败 return false; } }
package com.artisan.leakybucket; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @RestController public class LeakBucketController { /** * * * @return */ @GetMapping("/leakyBucket") @LeakyBucketRateLimit(capacity = 10, leakRate = 2) public ResponseEntity leakyBucket() { return ResponseEntity.ok("leakyBucket test ok!"); } }
测试
优缺点
漏桶算法和令牌桶算法最明显的区别是令牌桶算法允许流量一定程度的突发。因为默认的令牌桶算法,取走token是不需要耗费时间的,也就是说,假设桶内有100个token时,那么可以瞬间允许100个请求通过。
令牌桶算法由于实现简单,且允许某些流量的突发,对用户友好,所以被业界采用地较多。当然我们需要具体情况具体分析,只有最合适的算法,没有最优的算法。
令牌桶算法
使用Guava自带的RateLimiter实现
<!-- guava --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>32.1.1-jre</version> </dependency>
原理
令牌桶算法通过一个形象的比喻来描述:想象有一个桶,桶里装有一定数量的令牌。系统会以固定的速率向桶中添加令牌,而每个数据包在发送前都需要从桶中获取一个令牌。如果桶中有足够的令牌,数据包就可以立即发送;如果桶中没有令牌,那么数据包就需要等待,直到桶中有足够的令牌为止。
关键参数:
- 令牌生成速率:令牌被添加到桶中的速率,通常以每秒多少个令牌来表示。
- 桶容量:桶中可以存放的最大令牌数,如果桶已满,新添加的令牌会被丢弃或忽略。
- 初始令牌数:桶在开始时的令牌数量。
算法流程:
- 令牌添加:以固定的速率向桶中添加令牌,通常这个速率对应了网络接口的带宽限制。
- 请求处理:
- 当一个数据包到达时,系统会检查桶中是否有足够的令牌。
- 如果有足够的令牌,就从桶中移除相应数量的令牌,并且允许数据包通过。
- 如果桶中没有足够的令牌,数据包将被标记为等待或者被丢弃。
- 桶满处理:如果桶已满,新添加的令牌可能会被丢弃或计数超出桶容量的令牌数量,以允许临时突发流量的处理。
实现
package com.artisan.tokenbucket; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) public @interface TokenBucketRateLimit { /** * 产生令牌的速率(xx 个/秒) * * @return */ double permitsPerSecond(); }
package com.artisan.tokenbucket; import com.google.common.util.concurrent.RateLimiter; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.concurrent.ConcurrentHashMap; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @Aspect @Component public class TokenBucketRateLimitAspect { // 使用ConcurrentHashMap来存储每个方法的限流器 private final ConcurrentHashMap<String, RateLimiter> limiters = new ConcurrentHashMap<>(); // 环绕通知,用于在方法执行前后添加限流逻辑 @Around("@annotation(com.artisan.tokenbucket.TokenBucketRateLimit)") public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable { // 获取方法签名,用于获取方法信息 MethodSignature signature = (MethodSignature) joinPoint.getSignature(); // 根据方法签名获取方法对象 Method method = signature.getMethod(); // 从方法对象中获取限流注解 TokenBucketRateLimit rateLimit = method.getAnnotation(TokenBucketRateLimit.class); // 获取注解中定义的每秒令牌数 double permitsPerSecond = rateLimit.permitsPerSecond(); // 获取方法名,作为限流器的唯一标识 String methodName = method.toString(); // 如果限流器缓存中没有该方法的限流器,则创建一个新的 RateLimiter rateLimiter = limiters.computeIfAbsent(methodName, k -> RateLimiter.create(permitsPerSecond)); // 尝试获取令牌,如果可以获取,则继续执行方法 if (rateLimiter.tryAcquire()) { return joinPoint.proceed(); } else { // 如果无法获取令牌,则抛出异常,告知用户请求过于频繁 throw new RuntimeException("Too many requests, please try again later."); } } }
package com.artisan.tokenbucket; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ @RestController public class TokenBucketController { @GetMapping("/tokenBucket") @TokenBucketRateLimit(permitsPerSecond = 0.5) public ResponseEntity tokenBucket() { return ResponseEntity.ok("artisan token bucket"); } }
测试
优缺点
- 平滑流量:通过控制令牌生成的速率,可以平滑网络流量,避免突发流量导致的网络拥塞。
- 灵活性:可以应对一定的突发流量,因为桶可以暂时存储超过平均流量的令牌。
- 易于实现:算法相对简单,易于在硬件或软件中实现。
小结
在实施接口限流策略时,应根据具体的业务场景和系统需求,选择合适的限流算法和实现方式,同时注意限流策略对用户体验的影响,做到既能保护系统稳定运行,又不会对合法用户造成过多的困扰。