Redis is a high-performance in-memory database that is often used in distributed systems. Besides serving as a distributed cache or a simple in-memory database, it also has some special application scenarios. This article uses Golang to write the corresponding middleware.
In a stand-alone system, we can use sync.Mutex to protect critical resources. Distributed systems have the same need: when multiple hosts compete for the same resource, a corresponding "distributed lock" is required.
In Redis we can achieve this with the SETNX command. If the key does not exist, the value is set and the lock is acquired; if the key already exists, the command fails and the lock is not acquired.
Releasing the lock can be done with DEL.
The main logic is as follows:
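import (
	"context"
	"time"

	"github.com/go-redis/redis/v8" // assumed import path; the original does not show it
)

type RedisLock struct {
	client     *redis.Client
	key        string
	expiration time.Duration // expiration time, guards against crashes or abnormal exits
}

func NewLock(client *redis.Client, key string, expiration time.Duration) *RedisLock {
	return &RedisLock{
		client:     client,
		key:        key,
		expiration: expiration,
	}
}

// Lock tries to acquire the lock; on success the caller's id is stored in Redis.
func (l *RedisLock) Lock(id string) (bool, error) {
	return l.client.SetNX(context.TODO(), l.key, id, l.expiration).Result()
}

// unLockScript deletes the key only if it still holds the caller's id.
// The id is passed as ARGV[1]; KEYS should contain key names only.
const unLockScript = `
if (redis.call("get", KEYS[1]) == ARGV[1]) then
	redis.call("del", KEYS[1])
	return true
end
return false
`

// UnLock uses a Lua script so that compare-and-delete is atomic;
// only the lock added by the current caller can be released.
func (l *RedisLock) UnLock(id string) error {
	_, err := l.client.Eval(context.TODO(), unLockScript, []string{l.key}, id).Result()
	if err != nil && err != redis.Nil {
		return err
	}
	return nil
}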
To prevent deadlock when the system crashes or a request terminates abnormally, the lock needs an expiration time, which should be set to twice the maximum estimated run time.
A Lua script is used to guarantee atomicity when unlocking, and a caller can only release a lock that it added itself. This avoids confusion caused by timeouts. For example: process A acquires the lock at time t1, but because it runs slowly, the lock expires at time t2. Process B then acquires the lock at t3. If process A finishes and simply deletes the key, it releases process B's lock.
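The A/B scenario above can be replayed without a Redis server. The following is only a sketch: fakeStore, setNX, unlockIfOwner and simulate are hypothetical names that imitate one Redis key with a TTL, and the 10-second TTL is an arbitrary assumption. It shows that an owner-checked unlock leaves B's lock in place when A finishes late.

```go
package main

import (
	"fmt"
	"time"
)

// fakeStore is a hypothetical in-memory stand-in for a single Redis key with a TTL.
type fakeStore struct {
	owner    string
	expireAt time.Time
}

// setNX imitates SETNX with an expiration: it succeeds only if the key is absent or expired.
func (s *fakeStore) setNX(id string, ttl time.Duration, now time.Time) bool {
	if s.owner != "" && now.Before(s.expireAt) {
		return false
	}
	s.owner, s.expireAt = id, now.Add(ttl)
	return true
}

// unlockIfOwner imitates the compare-and-delete script: delete only if the stored id matches.
func (s *fakeStore) unlockIfOwner(id string, now time.Time) bool {
	if s.owner == "" || !now.Before(s.expireAt) || s.owner != id {
		return false
	}
	s.owner = ""
	return true
}

// simulate replays the scenario and reports whether A's late unlock removed
// anything, and who holds the lock afterwards.
func simulate() (aUnlocked bool, holder string) {
	s := &fakeStore{}
	t1 := time.Unix(0, 0)
	s.setNX("A", 10*time.Second, t1) // A acquires the lock at t1
	t3 := t1.Add(11 * time.Second)   // A's lock expired at t2 = t1 + 10s
	s.setNX("B", 10*time.Second, t3) // B acquires the lock at t3
	aUnlocked = s.unlockIfOwner("A", t3.Add(time.Second)) // A finishes late
	return aUnlocked, s.owner
}

func main() {
	aUnlocked, holder := simulate()
	fmt.Printf("A unlocked: %v, holder: %q\n", aUnlocked, holder)
	// prints: A unlocked: false, holder: "B"
}
```

With a plain DEL instead of the owner check, the last step would have removed B's entry, which is exactly the confusion described above.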
func main() {
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "123456",
		DB:       0, // use default DB
	})
	lock := NewLock(client, "counter", 30*time.Second)
	counter := 0
	worker := func(i int) {
		id := fmt.Sprintf("worker%d", i)
		for {
			ok, err := lock.Lock(id)
			log.Printf("worker %d attempt to obtain lock, ok: %v, err: %v", i, ok, err)
			if !ok {
				time.Sleep(100 * time.Millisecond)
				continue
			}
			counter++
			log.Printf("worker %d, add counter %d", i, counter)
			// Release explicitly instead of deferring inside the loop.
			if err := lock.UnLock(id); err != nil {
				log.Printf("worker %d unlock err: %v", i, err)
			}
			break
		}
	}
	wg := sync.WaitGroup{}
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		id := i
		go func() {
			defer wg.Done()
			worker(id)
		}()
	}
	wg.Wait()
}
The running result shows that the effect is similar to sync.Mutex:
2022/07/22 09:58:09 worker 5 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:09 worker 5, add counter 1
2022/07/22 09:58:09 worker 4 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 4 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 4, add counter 2
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 1, add counter 3
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 2, add counter 4
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 3, add counter 5
Note that in a distributed Redis cluster, if a failure occurs (for example, the master node goes down), the reliability of the distributed lock may be reduced. When stronger guarantees are required, the lock can instead be built on strongly consistent components such as etcd or ZooKeeper.
Suppose we want to develop a crawler service that crawls millions of web pages. How do we determine whether a given page has already been crawled? Besides using a database or a HashMap, we can use a Bloom filter. Compared with the other approaches, a Bloom filter takes up very little space and offers very fast insertion and query.
A Bloom filter determines whether an element is in a set, and is backed by a BitSet.
When inserting data, the value is hashed multiple times and the corresponding BitSet positions are set to 1.
When querying, the value is hashed in the same way and the corresponding bits are checked; if every bit is 1, the element is considered present.
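The insert and query steps above can be sketched in memory before involving Redis. This is a minimal illustration, not the implementation used later in the article: memBloom is a hypothetical type, and deriving the k positions from two FNV hashes (double hashing) is an assumption chosen to keep the example inside the standard library.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// memBloom is a hypothetical in-memory Bloom filter with m bits and k hash positions.
type memBloom struct {
	bits []bool
	k    int
}

func newMemBloom(m, k int) *memBloom {
	return &memBloom{bits: make([]bool, m), k: k}
}

// locations derives k bit positions from two 64-bit FNV hashes (double hashing).
func (b *memBloom) locations(data []byte) []int {
	h1 := fnv.New64a()
	h1.Write(data)
	a := h1.Sum64()
	h2 := fnv.New64()
	h2.Write(data)
	c := h2.Sum64() | 1 // force an odd step so it is never zero
	locs := make([]int, b.k)
	for i := range locs {
		locs[i] = int((a + uint64(i)*c) % uint64(len(b.bits)))
	}
	return locs
}

// Add sets every bit position derived from data.
func (b *memBloom) Add(data []byte) {
	for _, l := range b.locations(data) {
		b.bits[l] = true
	}
}

// Exists reports true only if all derived positions are set.
func (b *memBloom) Exists(data []byte) bool {
	for _, l := range b.locations(data) {
		if !b.bits[l] {
			return false
		}
	}
	return true
}

func main() {
	bf := newMemBloom(1<<16, 14)
	fmt.Println(bf.Exists([]byte("test1"))) // false: nothing has been added yet
	bf.Add([]byte("test1"))
	fmt.Println(bf.Exists([]byte("test1"))) // true: an added element is always found
}
```

An element that was added is always reported as present; an element that was never added is usually, but not always, reported as absent.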
A Bloom filter has a certain false positive rate, so it is not suitable for scenarios that require exact queries. It also does not support deleting elements. It is typically used for URL deduplication, spam filtering, and preventing cache penetration.
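To get a feel for the false positive rate, the standard approximation p ≈ (1 - e^(-kn/m))^k can be evaluated for m bits, k hash functions and n inserted elements. The sketch below assumes independent, uniformly distributed hash functions; falsePositiveRate is a hypothetical helper, and the sample values of n are arbitrary.

```go
package main

import (
	"fmt"
	"math"
)

// falsePositiveRate returns the approximate false positive probability of a
// Bloom filter with m bits and k hash functions after n insertions,
// assuming independent and uniform hashes: (1 - e^(-k*n/m))^k.
func falsePositiveRate(m, k, n float64) float64 {
	return math.Pow(1-math.Exp(-k*n/m), k)
}

func main() {
	const m, k = 65536, 14
	for _, n := range []float64{1000, 3000, 10000} {
		fmt.Printf("n=%.0f  p≈%.3g\n", n, falsePositiveRate(m, k, n))
	}
}
```

For a fixed m and n, the rate is minimised near k ≈ (m/n)·ln 2, so a fixed k suits only a certain range of element counts.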
In Redis, we can use the built-in bitmap commands (SETBIT and GETBIT) as the BitSet, and rely on the atomicity of Lua scripts so that the several bit operations of one insert or query are not interleaved with other clients' operations.
const (
	// Insert data: call setbit for each position.
	setScript = `
for _, offset in ipairs(ARGV) do
	redis.call("setbit", KEYS[1], offset, 1)
end
`
	// Query data: return true if all bits are 1.
	getScript = `
for _, offset in ipairs(ARGV) do
	if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then
		return false
	end
end
return true
`
)

type BloomFilter struct {
	client *redis.Client
	key    string // key stored in Redis
	bits   uint   // size of the BitSet
	maps   uint   // number of hash rounds
}

func NewBloomFilter(client *redis.Client, key string, bits, maps uint) *BloomFilter {
	client.Del(context.TODO(), key)
	if maps == 0 {
		maps = 14
	}
	return &BloomFilter{
		key:    key,
		client: client,
		bits:   bits,
		maps:   maps,
	}
}

// getLocations hashes the data several times and returns the list of positions.
func (f *BloomFilter) getLocations(data []byte) []uint {
	locations := make([]uint, f.maps)
	// Copy into a fresh buffer so that appending the round byte never
	// writes into the caller's backing array.
	buf := make([]byte, len(data)+1)
	copy(buf, data)
	for i := 0; i < int(f.maps); i++ {
		buf[len(data)] = byte(i)
		val := murmur3.Sum64(buf)
		locations[i] = uint(val % uint64(f.bits))
	}
	return locations
}

func (f *BloomFilter) Add(data []byte) error {
	args := getArgs(f.getLocations(data))
	_, err := f.client.Eval(context.TODO(), setScript, []string{f.key}, args).Result()
	if err != nil && err != redis.Nil {
		return err
	}
	return nil
}

func (f *BloomFilter) Exists(data []byte) (bool, error) {
	args := getArgs(f.getLocations(data))
	resp, err := f.client.Eval(context.TODO(), getScript, []string{f.key}, args).Result()
	if err != nil {
		// A Lua false is returned as a nil reply, which surfaces as redis.Nil.
		if err == redis.Nil {
			return false, nil
		}
		return false, err
	}
	exists, ok := resp.(int64)
	if !ok {
		return false, nil
	}
	return exists == 1, nil
}

func getArgs(locations []uint) []string {
	args := make([]string, 0, len(locations))
	for _, l := range locations {
		args = append(args, strconv.FormatUint(uint64(l), 10))
	}
	return args
}
func main() {
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "123456",
		DB:       0, // use default DB
	})
	// 1<<16 bits; note that 2^16 in Go is XOR (it evaluates to 18), not a power.
	bf := NewBloomFilter(client, "bf-test", 1<<16, 14)
	exists, err := bf.Exists([]byte("test1"))
	log.Printf("exist %t, err %v", exists, err)
	if err := bf.Add([]byte("test1")); err != nil {
		log.Printf("add err: %v", err)
	}
	exists, err = bf.Exists([]byte("test1"))
	log.Printf("exist %t, err %v", exists, err)
	exists, err = bf.Exists([]byte("test2"))
	log.Printf("exist %t, err %v", exists, err)
	// output
	// 2022/07/22 10:05:58 exist false, err <nil>
	// 2022/07/22 10:05:58 exist true, err <nil>
	// 2022/07/22 10:05:58 exist false, err <nil>
}
The golang.org/x/time/rate package provides a token-bucket rate limiter. To implement rate limiting in a distributed environment, the same idea can be implemented with a Redis Lua script.
The main principle of the token bucket is as follows:
Assume the token bucket has a capacity of burst, and tokens are added to it at a rate of qps per second.
Initially the bucket is full. Tokens that would overflow the bucket are discarded. When a request asks for tokens, it is allowed if the bucket holds enough tokens and rejected otherwise.
When burst == qps, traffic is limited strictly to qps; when burst > qps, a certain amount of burst traffic is allowed.
The implementation here mainly follows the official rate package, with the core logic rewritten in Lua.
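--- Related key
--- limit_key: a hash holding the current token count and the timestamps
local limit_key = KEYS[1]

--- Input arguments
--[[
qps: requests per second; burst: token bucket capacity; now: current timestamp;
cost: number of tokens requested; max_wait: maximum wait time
--]]
local qps = tonumber(ARGV[1])
local burst = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local cost = tonumber(ARGV[4])
local max_wait = tonumber(ARGV[5])

--- Number of tokens stored in Redis
local tokens = redis.call("hget", limit_key, "token")
if not tokens then
	tokens = burst
end

--- Time of the last modification
local last_time = redis.call("hget", limit_key, "last_time")
if not last_time then
	last_time = 0
end

--- Latest event (wait) time
local last_event = redis.call("hget", limit_key, "last_event")
if not last_event then
	last_event = 0
end

--- Use the difference between now and the last modification time, together with qps,
--- to compute the number of tokens available now
local delta = math.max(0, now - last_time)
local new_tokens = math.min(burst, delta * qps + tokens)
new_tokens = new_tokens - cost --- latest token count after deducting the requested tokens

--- If the latest token count is below 0, compute how long the caller has to wait
local wait_period = 0
if new_tokens < 0 and qps > 0 then
	wait_period = wait_period - new_tokens / qps
end
wait_period = math.ceil(wait_period)

local time_act = now + wait_period --- timestamp at which the wait is satisfied

--- A request is allowed in two cases:
--- the requested tokens do not exceed burst and the wait does not exceed max_wait,
--- so the request can be satisfied by refilling tokens;
--- or qps is 0 and the latest token count is not below 0
local ok = (cost <= burst and wait_period <= max_wait and qps > 0) or (qps == 0 and new_tokens >= 0)

--- Store the new values in the same hash fields that were read above
if ok then
	redis.call("hset", limit_key, "token", new_tokens)
	redis.call("hset", limit_key, "last_time", now)
	redis.call("hset", limit_key, "last_event", time_act)
end

--- Return a list: {allowed (1 or 0), wait time}
--- A Lua false inside a table becomes nil and would truncate the reply, so return a number
return {ok and 1 or 0, wait_period}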
In Golang, the related interfaces Allow, AllowN, Wait and so on are all implemented by calling reserveN.
// reserveN calls the Lua script.
func (lim *RedisLimiter) reserveN(now time.Time, n int, maxFutureReserveSecond int) (*Reservation, error) {
	// ...
	res, err := lim.rdb.Eval(context.TODO(), reserveNScript, []string{lim.limitKey}, lim.qps, lim.burst, now.Unix(), n, maxFutureReserveSecond).Result()
	if err != nil && err != redis.Nil {
		return nil, err
	}
	//...
	return &Reservation{
		ok:        allow == 1,
		lim:       lim,
		tokens:    n,
		timeToAct: now.Add(time.Duration(wait) * time.Second),
	}, nil
}
func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "123456",
		DB:       0, // use default DB
	})
	r, err := NewRedisLimiter(rdb, 1, 2, "testrate")
	if err != nil {
		log.Fatal(err)
	}
	r.Reset()
	for i := 0; i < 5; i++ {
		err := r.Wait(context.TODO())
		log.Printf("worker %d allowed: %v", i, err)
	}
}

// output
// 2022/07/22 12:50:31 worker 0 allowed: <nil>
// 2022/07/22 12:50:31 worker 1 allowed: <nil>
// 2022/07/22 12:50:32 worker 2 allowed: <nil>
// 2022/07/22 12:50:33 worker 3 allowed: <nil>
// 2022/07/22 12:50:34 worker 4 allowed: <nil>
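The first two requests fall within burst and are granted immediately; the following requests are granted at the qps rate as new tokens are generated.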
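The timing in the output can be reproduced by replaying the script's arithmetic in plain Go. This is a sketch under assumptions: state, reserve and replay are hypothetical names, the start timestamp 100 is arbitrary, and each simulated Wait call is assumed to block for exactly the returned wait before the next request is issued.

```go
package main

import (
	"fmt"
	"math"
)

// state mirrors the values the Lua script keeps in Redis.
type state struct {
	tokens   float64
	lastTime int64
}

// reserve repeats the script's token arithmetic for one request.
func reserve(s *state, qps, burst float64, now int64, cost float64, maxWait int64) (bool, int64) {
	delta := math.Max(0, float64(now-s.lastTime))
	newTokens := math.Min(burst, delta*qps+s.tokens) - cost
	var w float64
	if newTokens < 0 && qps > 0 {
		w = -newTokens / qps
	}
	wait := int64(math.Ceil(w))
	ok := (cost <= burst && wait <= maxWait && qps > 0) || (qps == 0 && newTokens >= 0)
	if ok {
		s.tokens, s.lastTime = newTokens, now
	}
	return ok, wait
}

// replay issues n sequential one-token requests and returns, for each,
// the number of seconds after the start at which it was granted (-1 if rejected).
func replay(qps, burst float64, n int, maxWait int64) []int64 {
	s := &state{tokens: burst} // the bucket starts full
	const start = int64(100)
	now := start
	offsets := make([]int64, 0, n)
	for i := 0; i < n; i++ {
		ok, wait := reserve(s, qps, burst, now, 1, maxWait)
		if !ok {
			offsets = append(offsets, -1)
			continue
		}
		now += wait // the caller sleeps until the reservation is due
		offsets = append(offsets, now-start)
	}
	return offsets
}

func main() {
	fmt.Println(replay(1, 2, 5, 10)) // [0 0 1 2 3]
}
```

The offsets 0, 0, 1, 2, 3 correspond to the timestamps :31, :31, :32, :33, :34 in the log above.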
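Redis can also be used for global counting, deduplication, publish/subscribe and other scenarios. See also the modules officially provided by Redis; loading these modules provides features such as filtering and rate limiting.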