sync.singleflight 包提供了一种抑制重复函数调用的机制。

缓存击穿

缓存 在各种场景中被大量使用,在 Cache Miss(缓存未命中)的情况下,就会出现下图的情况:

所有的请求被同时打到下游存储上,将会影响下游存储的服务质量,因此需要严格限制访问下游存储的并发量。

singleflight

// Do():  相同的 key,fn 同时只会执行一次,返回执行的结果给fn执行期间,所有使用该 key 的调用
// v: fn 返回的数据
// err: fn 返回的err
// shared: 表示返回数据是调用 fn 得到的还是其他相同 key 调用返回的
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
// DoChan(): 类似Do方(),以 chan 返回结果
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
// Forget(): 失效 key,后续对此 key 的调用将执行 fn,而不是等待前面的调用完成
func (g *Group) Forget(key string)

通常使用方法如下:

package main

import (
	"context"
	"fmt"
	"golang.org/x/sync/singleflight"
	"sync/atomic"
	"time"
)

type Result string

func find(ctx context.Context, query string) (Result, error) {
	return Result(fmt.Sprintf("result for %q", query)), nil
}

func main() {
	var g singleflight.Group
	const n = 5
	waited := int32(n)
	done := make(chan struct{})
	key := "https://weibo.com/1227368500/H3GIgngon"
	for i := 0; i < n; i++ {
		go func(j int) {
			v, _, shared := g.Do(key, func() (interface{}, error) {
				ret, err := find(context.Background(), key)
				return ret, err
			})
			if atomic.AddInt32(&waited, -1) == 0 {
				close(done)
			}
			fmt.Printf("index: %d, val: %v, shared: %v\n", j, v, shared)
		}(i)
	}

	select {
	case <-done:
	case <-time.After(time.Second):
		fmt.Println("Do hangs")
	}
}

如果函数执行一切正常,则所有请求都能顺利获得正确的数据。相反,如果函数执行遇到问题呢?由于 singleflight 是以阻塞读的方式来控制向下游请求的并发量,在第一个下游请求没有返回之前,所有请求都将被阻塞。

问题

  • 阻塞读:缺少超时控制,难以快速失败
  • 单并发:控制了并发量,但牺牲了成功率

阻塞读

作为 Do() 的替代函数,singleflight 提供了 DoChan()。两者实现上完全一样,不同的是,DoChan() 通过 channel 返回结果。因此可以使用 select 语句实现超时控制

ch := g.DoChan(key, func() (interface{}, error) {
    ret, err := find(context.Background(), key)
    return ret, err
})
// Create our timeout
timeout := time.After(500 * time.Millisecond)

var ret singleflight.Result
select {
case <-timeout: // Timeout elapsed
        fmt.Println("Timeout")
    return
case ret = <-ch: // Received result from channel
    fmt.Printf("index: %d, val: %v, shared: %v\n", j, ret.Val, ret.Shared)
}

单并发

在一些对可用性要求极高的场景下,往往需要一定的请求饱和度来保证业务的最终成功率。一次请求还是多次请求,对于下游服务而言并没有太大区别,此时使用 singleflight 只是为了降低请求的数量级,那么使用 Forget() 提高下游请求的并发:

v, _, shared := g.Do(key, func() (interface{}, error) {
    go func() {
        time.Sleep(10 * time.Millisecond)
        fmt.Printf("Deleting key: %v\n", key)
        g.Forget(key)
    }()
    ret, err := find(context.Background(), key)
    return ret, err
})

总结

当然,如果单次的失败无法容忍,在高并发的场景下更好的处理方案是:

  • 放弃使用同步请求,牺牲数据更新的实时性
  • “缓存” 存储准实时的数据 + “异步更新” 数据到缓存