
背景介绍
Aliware

固定窗口
Aliware
在正式介绍之前,先简单介绍一下固定窗口的算法(也叫计数器算法)是实现流量控制比较简单的一种方式。其他常见的还有很多例如滑动时间窗口算法,漏桶算法,令牌桶算法等等。
var (counter int64 //计数intervalMs int64 = 1000 //窗口长度(1S)threshold int64 = 2 //限流阈值startTime = time.Now().UnixMilli() //窗口开始时间)func main() {for i := 0; i < 10; i++ {if tryAcquire() {fmt.Println("成功请求", time.Now().Unix())}}}func tryAcquire() bool {if time.Now().UnixMilli()-atomic.LoadInt64(&startTime) > intervalMs {atomic.StoreInt64(&startTime, time.Now().UnixMilli())atomic.StoreInt64(&counter, 0)}return atomic.AddInt64(&counter, 1) <= threshold}
滑动时间窗口
Aliware
在滑动时间窗口算法中可以解决固定窗口算法的边界问题,在滑动窗口算法中通常有两个比较重要的概念
-
统计周期:例如想限制 5S 的请求数不能超过 100 次,那么 5S 就是统计周期 -
窗口(格子)的大小:一个周期内会有多个窗口(格子)进行指标(例如请求数)的统计,长度相等的统计周期,格子的数量越多,统计的越精确
统计结构
Aliware
下面将详细介绍 Sentinel-Go 是如何使用滑动时间窗口高效的存储和统计指标数据的。
窗口结构
在滑动时间窗口中时间很重要。每个窗口(BocketWrap)的组成是由一个开始时间和一个抽象的统计结构:
type BucketWrap struct {// BucketStart represents start timestamp of this statistic bucket wrapper.BucketStart uint64// Value represents the actual data structure of the metrics (e.g. MetricBucket).Value atomic.Value}
-
pass: 表示到来的数量,即此刻通过 Sentinel-Go 规则的流量数量 -
block: 表示被拦截的流量数量 -
complete: 表示完成的流量数量,包含正常结束和异常结束的情况 -
error: 表示错误的流量数量(熔断场景使用) -
rt:单次请求的 request time -
total:暂时无用
原子时间轮
如上:整个统计周期内有多个时间窗口,在 Sentinel-Go 中统计周期是由 slice 实现的,每个元素对应一个窗口。
type AtomicBucketWrapArray struct {// The base address for real data arraybase unsafe.Pointer // 窗口数组首元素地址// The length of slice(array), it can not be modified.length int // 窗口数组的长度data []*BucketWrap //窗口数组}
初始化
// 计算开始时间func calculateStartTime(now uint64, bucketLengthInMs uint32) uint64 {return now - (now % uint64(bucketLengthInMs))}// 窗口下标位置idx := int((now / uint64(bucketLengthInMs)) % uint64(len))
for i := idx; i <= len-1; i++ {ww := &BucketWrap{BucketStart: startTime,Value: atomic.Value{},}ww.Value.Store(generator.NewEmptyBucket())ret.data[i] = wwstartTime += uint64(bucketLengthInMs)}for i := 0; i < idx; i++ {ww := &BucketWrap{BucketStart: startTime,Value: atomic.Value{},}ww.Value.Store(generator.NewEmptyBucket())ret.data[i] = wwstartTime += uint64(bucketLengthInMs)}
// calculate base address for real data arraysliHeader := (*util.SliceHeader)(unsafe.Pointer(&ret.data))ret.base = unsafe.Pointer((**BucketWrap)(unsafe.Pointer(sliHeader.Data)))
窗口获取&窗口替换
// 获取对应窗口func (aa *AtomicBucketWrapArray) get(idx int) *BucketWrap {// aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]// then convert to (*unsafe.Pointer)if offset, ok := aa.elementOffset(idx); ok {return (*BucketWrap)(atomic.LoadPointer((*unsafe.Pointer)(offset)))}return nil}// 替换对应窗口func (aa *AtomicBucketWrapArray) compareAndSet(idx int, except, update *BucketWrap) bool {// aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]// then convert to (*unsafe.Pointer)// update secondary pointerif offset, ok := aa.elementOffset(idx); ok {return atomic.CompareAndSwapPointer((*unsafe.Pointer)(offset), unsafe.Pointer(except), unsafe.Pointer(update))}return false}// 获取对应窗口的地址func (aa *AtomicBucketWrapArray) elementOffset(idx int) (unsafe.Pointer, bool) {if idx >= aa.length || idx < 0 {logging.Error(errors.New("array index out of bounds"),"array index out of bounds in AtomicBucketWrapArray.elementOffset()","idx", idx, "arrayLength", aa.length)return nil, false}basePtr := aa.basereturn unsafe.Pointer(uintptr(basePtr) + uintptr(idx)*unsafe.Sizeof(basePtr)), true}
-
在 get func 中接收根据当前时间计算出的窗口对应下标位置 -
根据下标位置在 elementOffset func 中,首先将底层的 slice 首元素地址转换成 uintptr,然后将窗口对应下标*对应的指针字节大小即可以得到对应窗口元素的地址 -
将对应窗口地址转换成时间窗口(*BucketWarp)即可
滑动窗口
滑动
// 根据当前时间获取周期内的所有窗口func (m *SlidingWindowMetric) getSatisfiedBuckets(now uint64) []*BucketWrap {start, end := m.getBucketStartRange(now)satisfiedBuckets := m.real.ValuesConditional(now, func(ws uint64) bool {return ws >= start && ws <= end})return satisfiedBuckets}// 根据当前时间获取整个周期对应的窗口的开始时间和结束时间func (m *SlidingWindowMetric) getBucketStartRange(timeMs uint64) (start, end uint64) {curBucketStartTime := calculateStartTime(timeMs, m.real.BucketLengthInMs())end = curBucketStartTimestart = end - uint64(m.intervalInMs) + uint64(m.real.BucketLengthInMs())return}// 匹配符合条件的窗口func (la *LeapArray) ValuesConditional(now uint64, predicate base.TimePredicate) []*BucketWrap {if now <= 0 {return make([]*BucketWrap, 0)}ret := make([]*BucketWrap, 0, la.array.length)for i := 0; i < la.array.length; i++ {ww := la.array.get(i)if ww == nil || la.isBucketDeprecated(now, ww) || !predicate(atomic.LoadUint64(&ww.BucketStart)) {continue}ret = append(ret, ww)}return ret}
satisfiedBuckets := m.real.ValuesConditional(now, func(ws uint64) bool {return ws >= start && ws <= end})
更新
func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {// 计算当前时间对应的窗口下标idx := la.calculateTimeIdx(now)// 计算当前时间对应的窗口的开始时间bucketStart := calculateStartTime(now, la.bucketLengthInMs)for {// 获取旧窗口old := la.array.get(idx)// 如果旧窗口==nil则初始化(正常不会执行这部分代码)if old == nil {newWrap := &BucketWrap{BucketStart: bucketStart,Value: atomic.Value{},}newWrap.Value.Store(bg.NewEmptyBucket())if la.array.compareAndSet(idx, nil, newWrap) {return newWrap, nil} else {runtime.Gosched()}// 如果本次计算的开始时间等于旧窗口的开始时间,则认为窗口没有过期,直接返回} else if bucketStart == atomic.LoadUint64(&old.BucketStart) {return old, nil// 如果本次计算的开始时间大于旧窗口的开始时间,则认为窗口过期尝试重置} else if bucketStart > atomic.LoadUint64(&old.BucketStart) {if la.updateLock.TryLock() {old = bg.ResetBucketTo(old, bucketStart)la.updateLock.Unlock()return old, nil} else {runtime.Gosched()}......}}
总结
Aliware
作者介绍:
文章参考:
《golang unsafe.Pointer 使用原则以及 uintptr 隐藏的坑》
https://louyuting.blog.csdn.net/article/details/103826830

