2024-08-14 15:20:58
参考链接:
- gin-cache
- beego-cache
- bigcache
- 分布式多级缓存SDK设计的思考
- 实战干货–分布式多级缓存设计方案
- 常见缓存更新策略
- jetcache-go多级缓存
- 统一缓存帝国 - 实战 Spring Cache
- Golang 高性能无 GC 的缓存库 bigcache 是怎么实现的?
- 请收藏!golang本地缓存选型对比及原理总结
- Go 语言分布式缓存 Groupcache – 用法,源码深度解读
- spring boot cache 多级缓存
- 鹅厂微创新Golang缓存组件TCache介绍
- 使用go设置多缓存驱动
- kratos中使用rockscache介绍
- 分布式多级缓存系统设计与实战
- jetcache
- 使用Go设计多缓存驱动
- 多缓存架构设计
- gocache
开源方案调研
gin-cache
gin cache 中使用统一的 cache store 接口来进行本地和redis的缓存设置。关键核心代码如下:
https://github.com/gin-contrib/cache/master/blob/persistence/cache_store.go
type CacheStore interface {
// Get retrieves an item from the cache. Returns the item or nil, and a bool indicating
// whether the key was found.
Get(key string, value interface{}) error
// Set sets an item to the cache, replacing any existing item.
Set(key string, value interface{}, expire time.Duration) error
// Add adds an item to the cache only if an item doesn't already exist for the given
// key, or if the existing item has expired. Returns an error otherwise.
Add(key string, value interface{}, expire time.Duration) error
// Replace sets a new value for the cache key only if it already exists. Returns an
// error if it does not.
Replace(key string, data interface{}, expire time.Duration) error
// Delete removes an item from the cache. Does nothing if the key is not in the cache.
Delete(key string) error
// Increment increments a real number, and returns error if the value is not real
Increment(key string, data uint64) (uint64, error)
// Decrement decrements a real number, and returns error if the value is not real
Decrement(key string, data uint64) (uint64, error)
// Flush deletes all items from the cache.
Flush() error
}
gin-cache 通过统一定义缓存接口的方式实现缓存抽象,但是并没有进行统一的缓存管理,更没有做多级缓存管理,没有解决缓存穿透、雪崩、击穿等问题。
jetcache-go
jetcache 在多级缓存上进行了进一步的缓存封装,并允许通过自定义函数的方式进行,调用下级函数进行实际的值填充。
cache 核心接口代码
// https://github.com/mgtv-tech/jetcache-go/blob/7570bd5d950b4e016cf241b45e3536c50e49cd56/cache.go
type (
// Cache interface is used to define the cache implementation.
Cache interface {
// Set sets the cache with ItemOption
Set(ctx context.Context, key string, opts ...ItemOption) error
// Once gets the opts.value for the given key from the cache or
// executes, caches, and returns the results of the given opts.do,
// making sure that only one execution is in-flight for a given key
// at a time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
Once(ctx context.Context, key string, opts ...ItemOption) error
// Delete deletes cached val with key.
Delete(ctx context.Context, key string) error
// DeleteFromLocalCache deletes local cached val with key.
DeleteFromLocalCache(key string)
// Exists reports whether val for the given key exists.
Exists(ctx context.Context, key string) bool
// Get gets the val for the given key and fills into val.
Get(ctx context.Context, key string, val any) error
// GetSkippingLocal gets the val for the given key skipping local cache.
GetSkippingLocal(ctx context.Context, key string, val any) error
// TaskSize returns Refresh task size.
TaskSize() int
// CacheType returns cache type
CacheType() string
// Close closes the cache. This should be called when cache refreshing is
// enabled and no longer needed, or when it may lead to resource leaks.
Close()
}
jetCache struct {
sync.Mutex // 分布式锁
Options // 选择函数
group singleflight.Group // 避免同时执行重复的函数
safeRand *util.SafeRand // 线程安全的随机数生成器
refreshTaskMap sync.Map // 更新任务map
eventCh chan *Event // 事件管道
stopChan chan struct{} // 停止管道
}
)
其中 Options
使用了操作链模式,进行了依赖反转,可以通过函数链,进行快速的参数设置,值得学习, 定义如下:
type (
// Options are used to store cache options.
Options struct {
name string // Cache name, used for log identification and metric reporting
remote remote.Remote // Remote is distributed cache, such as Redis.
local local.Local // Local is memory cache, such as FreeCache.
codec string // Value encoding and decoding method. Default is "msgpack.Name". You can also customize it.
errNotFound error // Error to return for cache miss. Used to prevent cache penetration.
remoteExpiry time.Duration // Remote cache ttl, Default is 1 hour.
notFoundExpiry time.Duration // Duration for placeholder cache when there is a cache miss. Default is 1 minute.
offset time.Duration // Expiration time jitter factor for cache misses.
refreshDuration time.Duration // Interval for asynchronous cache refresh. Default is 0 (refresh is disabled).
stopRefreshAfterLastAccess time.Duration // Duration for cache to stop refreshing after no access. Default is refreshDuration + 1 second.
refreshConcurrency int // Maximum number of concurrent cache refreshes. Default is 4.
statsDisabled bool // Flag to disable cache statistics.
statsHandler stats.Handler // Metrics statsHandler collector.
sourceID string // Unique identifier for cache instance.
syncLocal bool // Enable events for syncing local cache (only for "Both" cache type).
eventChBufSize int // Buffer size for event channel (default: 100).
eventHandler func(event *Event) // Function to handle local cache invalidation events.
}
// Option defines the method to customize an Options.
Option func(o *Options) // Option 修改函数
)
func WithName(name string) Option {
return func(o *Options) {
o.name = name
}
}
func WithRemote(remote remote.Remote) Option {
return func(o *Options) {
o.remote = remote
}
}
func WithLocal(local local.Local) Option {
return func(o *Options) {
o.local = local
}
}
func newOptions(opts ...Option) Options {
var o Options
for _, opt := range opts {
opt(&o)
}
}
// 使用代码示例如下:
// o := newOptions(
// WithOffset(30 * time.Second),
// WithRefreshConcurrency(16),
// WithCodec(json.Name),
// )
下面让我们以值插入和查询为例进行核心的代码分析:
- 写入示例, 对应的写入示例如下:
func Example_advancedUsage() {
// 创建redis ring
ring := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"localhost": ":6379",
},
})
// 创建cache
mycache := cache.New(
cache.WithName("any"), // 指定名称
cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)), // 添加远程redis缓存
cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)), // 添加本地缓存
cache.WithErrNotFound(errRecordNotFound), // 指定无数据异常
cache.WithRefreshDuration(time.Minute), // 指定刷新时间
)
ctx := context.TODO()
key := "mykey:1"
obj := new(object)
// 查询对应值
if err := mycache.Once(
ctx,
key,
cache.Value(obj), // 设置值类型
cache.TTL(time.Hour), // 设置TTL时间
cache.Refresh(true), // 是否进行刷新
cache.Do(
func(ctx context.Context) (any, error) {
return mockDBGetObject(1)
}), // 值处理函数--当出现缓存not found 时调用此函数
); err != nil {
panic(err)
}
// 输出最终对象
fmt.Println(obj)
// Output: &{mystring 42}
// 关闭缓存块
mycache.Close()
}
- once 函数实现:使用once 函数进行值设置和读取。核心关键代码如下:
// https://github.com/mgtv-tech/jetcache-go/blob/7570bd5d950b4e016cf241b45e3536c50e49cd56/cache.go
func (c *jetCache) Once(ctx context.Context, key string, opts ...ItemOption) error {
// 创建新的item
item := newItemOptions(ctx, key, opts...)
// 更新刷线任务
c.addOrUpdateRefreshTask(item)
// 设置item
b, cached, err := c.getSetItemBytesOnce(item)
if err != nil {
return err
}
if bytes.Compare(b, notFoundPlaceholder) == 0 {
return c.errNotFound
}
if item.value == nil || len(b) == 0 {
return nil
}
if err := c.Unmarshal(b, item.value); err != nil {
// 解析异常的,需要先进行删除
// 再次尝试获取
if cached {
_ = c.Delete(ctx, item.key)
return c.Once(ctx, key, opts...)
}
return err
}
return nil
}
这里是整体的获取逻辑,关键在于获取失败的会自动进行重试。值查询的核心在于getSetItemBytesOnce
进行核心的值查询
-
getSetItemBytesOnce
兼顾了值查询与写入,核心的处理逻辑如下:
func (c *jetCache) getSetItemBytesOnce(item *item) (b []byte, cached bool, err error) {
if !item.skipLocal && c.local != nil {
// 先统local 进行值查询
b, ok := c.local.Get(item.key)
// 更新对应命中率
if ok {
c.statsHandler.IncrHit()
c.statsHandler.IncrLocalHit()
if bytes.Compare(b, notFoundPlaceholder) == 0 {
return nil, true, c.errNotFound
}
return b, true, nil
}
}
// 使用group.Do 保证一次只有一个再执行
v, err, _ := c.group.Do(
item.key, // key 用于锁住函数
// 核心的执行函数
func() (any, error) {
// 查询对应值--
b, err := c.getBytes(item.Context(), item.key, item.skipLocal)
if err == nil {
cached = true
return b, nil
} else if errors.Is(err, c.errNotFound) {
cached = true
return nil, c.errNotFound
}
// 缓存中不存在该值--缓存穿透了--执行对应是饥饿中加密手机
b, ok, err := c.set(item)
if ok {
c.send(EventTypeSetByOnce, item.key)
return b, nil
}
return nil, err
})
if err != nil {
return nil, false, err
}
return v.([]byte), cached, nil
}
这里核心还是去调用了set 函数,进行值的更新和设置,getValue 会主动调用Do
钩子,通过用户注入函数,进行值的获取
func (c *jetCache) set(item *item) ([]byte, bool, error) {
// 先查询对象是否存在
val, err := item.getValue()
// 设置函数非空,新增参数
if item.do != nil {
c.statsHandler.IncrQuery()
}
if c.IsNotFound(err) {
if e := c.setNotFound(item.Context(), item.key, item.skipLocal); e != nil {
logger.Error("setNotFound(%s) error(%v)", item.key, err)
}
return notFoundPlaceholder, true, nil
} else if err != nil {
c.statsHandler.IncrQueryFail(err)
return nil, false, err
}
b, err := c.Marshal(val)
if err != nil {
return nil, false, err
}
// 设置本地缓存
if c.local != nil && !item.skipLocal {
c.local.Set(item.key, b)
}
if c.remote == nil {
if c.local == nil {
return b, true, ErrRemoteLocalBothNil
}
return b, true, nil
}
// 查询剩余存在时间
ttl := item.getTtl(c.remoteExpiry)
if ttl == 0 {
return b, true, nil
}
// 设置远端缓存
if item.setXX {
_, err := c.remote.SetXX(item.Context(), item.key, b, ttl)
return b, true, err
}
if item.setNX {
_, err := c.remote.SetNX(item.Context(), item.key, b, ttl)
return b, true, err
}
return b, true, c.remote.SetEX(item.Context(), item.key, b, ttl)
}
func (item *item) getValue() (any, error) {
if item.do != nil {
// 执行数据处理--使用用户注入钩子进行处理
return item.do(item.Context())
}
if item.value != nil {
return item.value, nil
}
return nil, nil
}
spring cloud cache
gocache
本地缓存方案
本地缓存直接使用内存存储即可,添加使用redis订阅发布,增加分布式同步功能。
核心业务处理
缓存查询
- 分级查询
- 查询到后更新前几级的值
- 最终查询不到,直接异常
缓存更新
- 更新最后一层缓存
- 删除二级缓存
- 删除一级缓存
缓存添加
- 添加二级缓存
缓存删除
存在问题
缓存击穿
热点数据过期,导致无法从缓存中访问数据,直接从数据库中访问。导致数据库被请求冲垮。
解决方案
- 互斥锁:保证同一时间只有一个业务线程更新缓存,未能获取互斥锁的请求。等待锁释放后重新读取缓存。要么返回空值或者默认值
- 不设置过期时间:由后台异步更新缓存,在热点数据要过期前,提前通知后台线程更新缓存以及重新设置过期时间。
缓存雪崩
缓存数据同一时间大量过期,导致数据大量击中mysql导致异常。
解决方案
-
均匀设置过期时间 给缓存数据设置过期时间,可以在对数据设置过期时间时,添加随机数,保证数据不会在同一时间过期
-
互斥锁
发现访问数据不在Redis里,就加个互斥锁,保证同一时间内只有一个请求来构建缓存(从数据库读取数据,再将数据更新到redis中)。构建完成后再进行锁释放。未能获取互斥锁的请求,等锁释放后重新读取缓存。
- 后台更新缓存
- 设置缓存“永久有效”,并将更新缓存的工作交给后台线程定时更新
- 通过消息队列发送缓存失效数据,保证缓存失效后,重新进行更新。即:缓存预热
缓存穿透
用户访问的数据,既不在缓存中,也不再数据库中。导致请求在访问缓存时,发现缓存缺失。访问数据库中也没有构建缓存数据。大量请求时导致缓存数据库压力增加。
一级缓存(本地缓存)一致性问题
使用一级缓存后,在分布式场景下,存在redis数据已经更新,但是另外一个实例的一级缓存并未更新的情况。导致本地缓存与redis数据不一致。
解决方法
使用redis发布订阅模式,实现多级缓存key的删除。
一级缓存(本地缓存)数据污染问题
缓存污染问题指的是留存在缓存中的数据,实际不会再被访问了,但是又占据了缓存空间。
热key问题
部分数据访问量较高,缓存过期之后,会存在瞬间的穿透。
解决方案
- 设置查询延期,针对不同层的参数,可以指定key的默认存在时间。每次查询后,对重新增强当前key的过期时间
- 查询频率延期:在查询延期的基础上,设置查询频率延期的策略,越是高热点数据,默认缓存延期时间越长
解决方法
设置本地缓存超时时间,对超时的缓存进行清理。
使用定时清理的方式进行缓存清除
综合解决方案
- 缓存雪崩:涉及到key过期问题,应由使用方自行解决。
- key过期时间梯度设置:将key的过期时间进行梯度设置,保证前一层的过期时间小于后一层。进行层层拦截
- 随机过期时间:使用参数,决定是否加入随机过期时间,防止缓存雪崩。
- 缓存穿透:
- 多层缓存中,key 设置下层对应的miss 次数和limit_qps, 当miss 次数超过limit下一层的limit 时不再进行,下层访问:而是直接返回empty
- 层访问qos limit: 每层设置下一层的访问与当前访问qps, qps limit。超过此限制时,直接返回异常
- 最大QPS: mysql: 1800, redis(ba): 100,000(3-5万)。单机(nginx):120w~150w,
-
缓存击穿:互斥锁使用,保证同一时间只有一个会进行更新,加层限制后避免
- 本地缓存一致性:
- 使用redis发布订阅,删除本地缓存。
- 给本地缓存设置一个较短的生存时间,尽量降低污染影响
- 本地缓存选型:
综合考虑,本地缓存选型为freecache 与ristretto 缓存
- 层级锁:
- 更新(删除/更新)下级缓存时,需要先拿到层级锁后再进行操作。避免本地线程争抢问题