1. 介绍
接口限流是一种保护系统稳定性的技术,它限制了某一时间窗口内的请求数量,以防止因流量暴增而导致的系统运行缓慢或宕机。通过接口限流,可以应对热点业务带来的突发请求、调用方bug导致的突发请求以及恶意攻击请求。接口限流的主要目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统。一旦达到限制速率,则可以拒绝服务、排队或等待、降级等处理。
在面对高并发的请求时,如果不对接口进行限流,可能会对后台系统造成极大的压力。过多的请求打到数据库会对系统的稳定性造成影响。在开发高并发系统时,有三把利器用来保护系统:缓存、降级和限流。缓存的目的是提升系统访问速度和增大系统处理容量;降级是当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级,以此释放服务器资源以保证核心任务的正常运行;限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统。一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理。因此,对于公开的接口最好采取限流措施。
本文将介绍如何使用注解和Caffeine库来实现简单、高效的接口限流。
2. Caffeine介绍
Caffeine是一个基于Java8开发的提供了近乎最佳命中率的高性能的缓存库。
缓存和ConcurrentMap有点相似,但还是有所区别。最根本的区别是ConcurrentMap将会持有所有加入到缓存当中的元素,直到它们被从缓存当中手动移除。但是,Caffeine的缓存Cache 通常会被配置成自动驱逐缓存中元素,以限制其内存占用。在某些场景下,LoadingCache和AsyncLoadingCache 因为其自动加载缓存的能力将会变得非常实用。
Caffeine提供了灵活的构造器去创建一个拥有下列特性的缓存:
自动加载元素到缓存当中,异步加载的方式也可供选择
当达到最大容量的时候可以使用基于就近度和频率的算法进行基于容量的驱逐
将根据缓存中的元素上一次访问或者被修改的时间进行基于过期时间的驱逐
当向缓存中一个已经过时的元素进行访问的时候将会进行异步刷新
key将自动被弱引用所封装
value将自动被弱引用或者软引用所封装
驱逐(或移除)缓存中的元素时将会进行通知
写入传播到一个外部数据源当中
持续计算缓存的访问统计指标
为了提高集成度,扩展模块提供了JSR-107 JCache和Guava适配器。JSR-107规范了基于Java 6的API,在牺牲了功能和性能的代价下使代码更加规范。Guava的Cache是Caffeine的原型库并且Caffeine提供了适配器以供简单的迁移策略。
简单实用及说明
Cache<String, Integer> cache = Caffeine.newBuilder().maximumSize(1)// 只要在1s内都有被访问,则一直不过期;只有在最后一次访问之后的1s后自动过期// 一个元素在上一次读写操作后一段时间之后,在指定的时间后没有被再次访问将会被认定为过期项。在当被缓存的元素时被绑定在一个session上时,当session因为不活跃而使元素过期的情况下,这是理想的选择。.expireAfterAccess(Duration.ofSeconds(1))// 元素创建以后,不管在指定的1s内是否被访问,1s后都会过期// 一个元素将会在其创建或者最近一次被更新之后的一段时间后被认定为过期项。在对被缓存的元素的时效性存在要求的场景下,这是理想的选择。// .expireAfterWrite(Duration.ofSeconds(111))// 为了使过期更有效率,可以通过在你的Cache构造器中通过Scheduler接口和Caffeine.scheduler(Scheduler) 方法去指定一个调度线程代替在缓存活动中去对过期事件进行调度。使用Java 9以上版本的用户可以选择Scheduler.systemScheduler()利用系统范围内的调度线程。.scheduler(Scheduler.systemScheduler()).build() ;cache.put("a", 666) ;System.out.println(cache.getIfPresent("a")) ;
删除缓存
// 失效keycache.invalidate(key)// 批量失效keycache.invalidateAll(keys)// 失效所有的keycache.invalidateAll()
刷新缓存
Cache<String, AccessCount> cache= Caffeine.newBuilder().maximumSize(10_000).refreshAfterWrite(1, TimeUnit.MINUTES).build();
刷新和驱逐并不相同。可以通过LoadingCache.refresh(K)方法,异步为key对应的缓存元素刷新一个新的值。与驱逐不同的是,在刷新的时候如果查询缓存元素,其旧值将仍被返回,直到该元素的刷新完毕后结束后才会返回刷新后的新值。
与 expireAfterWrite相反,refreshAfterWrite 将会使在写操作之后的一段时间后允许key对应的缓存元素进行刷新,但是只有在这个key被真正查询到的时候才会正式进行刷新操作。所以打个比方,你可以在同一个缓存中同时用到 refreshAfterWrite和expireAfterWrite ,这样缓存元素的在被允许刷新的时候不会直接刷新使得过期时间被盲目重置。当一个元素在其被允许刷新但是没有被主动查询的时候,这个元素也会被视为过期。
缓存统计
Cache<String, AccessCount> cache = Caffeine.newBuilder().maximumSize(10_000).recordStats().build() ;CacheStats stats = cache.stats() ;stats.hitRate() ; // 查询缓存的命中率stats.evictionCount() ; // 被驱逐的缓存数量stats.averageLoadPenalty() ; // 新值被载入的平均耗时
3. 限流实现
自定义注解
注解定义,通过该注解配置基本信息
@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Inherited@Documentedpublic @interface PackLimiter {/**每s限制访问次数*/int count() default 5 ;/**缓存Key*/String key() default "" ;/**fallback 方法名*/String fallbackMethod() default "" ;}
自定义切面
该切面用来拦截标有@PackLimiter注解的接口
public class PackLimiterAspect {private static final Logger logger = LoggerFactory.getLogger(PackLimiterAspect.class) ;public static final Cache<String, AccessCount> LIMITRATE = Caffeine.newBuilder().maximumSize(10_000).expireAfterAccess(Duration.ofSeconds(1)).scheduler(Scheduler.systemScheduler()).build() ;private static final Map<String, Object> FALLBACK_METHOD = new ConcurrentHashMap<>() ;private static final String DEFAULT_RET_DATA = "429 - Too Many Requests" ;private static final Function<? super String, ? extends AccessCount> INIT_COUNT = AccessCount::new ;private HttpServletRequest request ;("@annotation(limiter)")private void access(PackLimiter limiter) {}("access(limiter)")public Object limiter(ProceedingJoinPoint pjp, PackLimiter limiter) throws Throwable {int count = limiter.count() ;String key = limiter.key() ;String fallbackMethod = limiter.fallbackMethod() ;MethodSignature joinPointObject = (MethodSignature) pjp.getSignature() ;Class<?> targetType = joinPointObject.getDeclaringType() ;Method method = joinPointObject.getMethod() ;if (!StringUtils.hasLength(key)) {key = getKey(targetType, method) ;}logger.info("缓存key: {}", key) ;AccessCount accessCount = LIMITRATE.get("a", INIT_COUNT) ;if (accessCount.isValid(count)) {return pjp.proceed() ;} else {if (!FALLBACK_METHOD.containsKey(key)) {if (StringUtils.hasLength(fallbackMethod)) {try {Method fallback = targetType.getDeclaredMethod(fallbackMethod) ;FALLBACK_METHOD.put(key, fallback.invoke(pjp.getTarget())) ;} catch (Exception e) {throw new RuntimeException(e) ;}} else {FALLBACK_METHOD.put(key, DEFAULT_RET_DATA) ;}}return FALLBACK_METHOD.get(key) ;}}private String getKey(Class<?> targetType, Method method) {StringBuilder builder = new StringBuilder();builder.append(targetType.getSimpleName());builder.append('#').append(method.getName()).append('(');Class<?>[] types = method.getParameterTypes() ;for (Class<?> clazz : types) {builder.append(clazz.getSimpleName()).append(",") ;}if (method.getParameterTypes().length > 0) {builder.deleteCharAt(builder.length() - 1);}return (InetUtils.getIp() + builder.append(')').toString()).replaceAll("[^a-zA-Z0-9]", "") ;}public void intt(List<String> list) {}}
接口测试
public Object index( Integer id) {return "success" ;}public Object fallbackIndex() {return "访问太快了" ;}
感谢阅读本文,希望通过本文的介绍,您能够了解如何使用注解和Caffeine实现接口限流,并认识到接口限流在保护系统稳定性方面的重要性
完毕!!!



