Maison > base de données > Redis > Comment utiliser Redis dans les applications distribuées Golang

Comment utiliser Redis dans les applications distribuées Golang

王林
Libérer: 2023-05-26 22:07:36
avant
885 Les gens l'ont consulté

    Text

    Redis est une base de données en mémoire hautes performances qui est souvent utilisée dans les systèmes distribués. En plus d'être un cache distribué ou une simple base de données en mémoire, il existe des scénarios d'application spéciaux. est écrit en collaboration avec Golang Le middleware correspondant.

    Verrouillage distribué

    Dans un système à machine unique, nous pouvons utiliser sync.Mutex pour protéger les ressources critiques. Il existe également une telle demande dans les systèmes distribués lorsque plusieurs hôtes s'emparent de la même ressource, des ressources supplémentaires. Le « serrure distribué » correspondant. sync.Mutex来保护临界资源,在分布式系统中同样有这样的需求,当多个主机抢占同一个资源,需要加对应的“分布式锁”。

    在Redis中我们可以通过setnx命令来实现

    • 如果key不存在可以设置对应的值,设置成功则加锁成功,key不存在返回失败

    • 释放锁可以通过del实现。

    主要逻辑如下:

    type RedisLock struct {
    	client     *redis.Client
    	key        string
    	expiration time.Duration // 过期时间,防止宕机或者异常
    }
    func NewLock(client *redis.Client, key string, expiration time.Duration) *RedisLock {
    	return &RedisLock{
    		client:     client,
    		key:        key,
    		expiration: expiration,
    	}
    }
    // 加锁将成功会将调用者id保存到redis中
    func (l *RedisLock) Lock(id string) (bool, error) {
    	return l.client.SetNX(context.TODO(), l.key, id, l.expiration).Result()
    }
    const unLockScript = `
    if (redis.call("get", KEYS[1]) == KEYS[2]) then
    	redis.call("del", KEYS[1])
    	return true
    end
    return false
    `
    // 解锁通过lua脚本来保证原子性,只能解锁当前调用者加的锁
    func (l *RedisLock) UnLock(id string) error {
    	_, err := l.client.Eval(context.TODO(), unLockScript, []string{l.key, id}).Result()
    	if err != nil && err != redis.Nil {
    		return err
    	}
    	return nil
    }
    Copier après la connexion

    为了防止系统宕机或异常请求导致的死锁,需要添加一个额外的超时时间,该超时时间应设为最大估计运行时间的两倍。

    解锁时通过lua脚本来保证原子性,调用者只会解自己加的锁。避免由于超时造成的混乱,例如:进程A在时间t1获取了锁,但由于执行缓慢,在时间t2锁超时失效,进程B在t3获取了锁,这是如果进程A执行完去解锁会取消进程B的锁。

    运行测试

    func main() {
        client := redis.NewClient(&redis.Options{
    		Addr:     "localhost:6379",
    		Password: "123456",
    		DB:       0, // use default DB
    	})
    	lock := NewLock(client, "counter", 30*time.Second)
        counter := 0
    	worker := func(i int) {
    		for {
    			id := fmt.Sprintf("worker%d", i)
    			ok, err := lock.Lock(id)
    			log.Printf("worker %d attempt to obtain lock, ok: %v, err: %v", i, ok, err)
    			if !ok {
    				time.Sleep(100 * time.Millisecond)
    				continue
    			}
    			defer lock.UnLock(id)
    			counter++
    			log.Printf("worker %d, add counter %d", i, counter)
    			break
    		}
    	}
    	wg := sync.WaitGroup{}
    	for i := 1; i <= 5; i++ {
    		wg.Add(1)
    		id := i
    		go func() {
    			defer wg.Done()
    			worker(id)
    		}()
    	}
    	wg.Wait()
    }
    Copier après la connexion

    运行结果,可以看到与sync.Mutex使用效果类似

    2022/07/22 09:58:09 worker 5 attempt to obtain lock, ok: true, err:
    2022/07/22 09:58:09 worker 5, add counter 1
    2022/07/22 09:58:09 worker 4 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:09 worker 1 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:09 worker 2 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:09 worker 3 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:10 worker 4 attempt to obtain lock, ok: true, err:
    2022/07/22 09:58:10 worker 4, add counter 2
    2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: true, err:
    2022/07/22 09:58:10 worker 1, add counter 3
    2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: true, err:
    2022/07/22 09:58:10 worker 2, add counter 4
    2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: true, err:
    2022/07/22 09:58:10 worker 3, add counter 5

    特别注意的是,在分布式Redis集群中,如果发生异常时(主节点宕机),可能会降低分布式锁的可用性,可以通过强一致性的组件etcd、ZooKeeper等实现。

    分布式过滤器

    假设要开发一个爬虫服务,爬取百万级的网页,怎么判断某一个网页是否爬取过,除了借助数据库和HashMap,我们可以借助布隆过滤器来做。相对于其他方法,布隆过滤器占用空间非常少,且插入和查询时间非常快。

    布隆过滤器用来判断某个元素是否在集合中,利用BitSet

    • 插入数据时将值进行多次Hash,将BitSet对应位置1

    • 查询时同样进行多次Hash对比所有位上是否为1,如是则存在。

    布隆过滤器有一定的误判率,不适合精确查询的场景。另外也不支持删除元素。通常适用于URL去重、垃圾邮件过滤、防止缓存击穿等场景中。

    在Redis中,我们可以使用自带的BitSet实现,同样也借助lua脚本的原子性来避免多次查询数据不一致。

    const (
    	// 插入数据,调用setbit设置对应位
    	setScript = `
    for _, offset in ipairs(ARGV) do
    	redis.call("setbit", KEYS[1], offset, 1)
    end
    `
    	// 查询数据,如果所有位都为1返回true
    	getScript = `
    for _, offset in ipairs(ARGV) do
    	if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then
    		return false
    	end
    end
    return true
    `
    )
    type BloomFilter struct {
    	client *redis.Client
    	key    string // 存在redis中的key
    	bits   uint // BitSet的大小
    	maps   uint // Hash的次数
    }
    func NewBloomFilter(client *redis.Client, key string, bits, maps uint) *BloomFilter {
    	client.Del(context.TODO(), key)
    	if maps == 0 {
    		maps = 14
    	}
    	return &BloomFilter{
    		key:    key,
    		client: client,
    		bits:   bits,
    		maps:   maps,
    	}
    }
    // 进行多次Hash, 得到位置列表
    func (f *BloomFilter) getLocations(data []byte) []uint {
    	locations := make([]uint, f.maps)
    	for i := 0; i < int(f.maps); i++ {
    		val := murmur3.Sum64(append(data, byte(i)))
    		locations[i] = uint(val) % f.bits
    	}
    	return locations
    }
    func (f *BloomFilter) Add(data []byte) error {
    	args := getArgs(f.getLocations(data))
    	_, err := f.client.Eval(context.TODO(), setScript, []string{f.key}, args).Result()
    	if err != nil && err != redis.Nil {
    		return err
    	}
    	return nil
    }
    func (f *BloomFilter) Exists(data []byte) (bool, error) {
    	args := getArgs(f.getLocations(data))
    	resp, err := f.client.Eval(context.TODO(), getScript, []string{f.key}, args).Result()
    	if err != nil {
    		if err == redis.Nil {
    			return false, nil
    		}
    		return false, err
    	}
    	exists, ok := resp.(int64)
    	if !ok {
    		return false, nil
    	}
    	return exists == 1, nil
    }
    func getArgs(locations []uint) []string {
    	args := make([]string, 0)
    	for _, l := range locations {
    		args = append(args, strconv.FormatUint(uint64(l), 10))
    	}
    	return args
    }
    Copier après la connexion

    运行测试

    func main() {
    	bf := NewBloomFilter(client,"bf-test", 2^16, 14)
    	exists, err := bf.Exists([]byte("test1"))
    	log.Printf("exist %t, err %v", exists, err)
    	if err := bf.Add([]byte("test1")); err != nil {
    		log.Printf("add err: %v", err)
    	}
    	exists, err = bf.Exists([]byte("test1"))
    	log.Printf("exist %t, err %v", exists, err)
    	exists, err = bf.Exists([]byte("test2"))
    	log.Printf("exist %t, err %v", exists, err)
    // output
    // 2022/07/22 10:05:58 exist false, err <nil>
    // 2022/07/22 10:05:58 exist true, err <nil>
    // 2022/07/22 10:05:58 exist false, err <nil>
    }
    Copier après la connexion

    分布式限流器

    golang.org/x/time/rate

    Dans Redis, nous pouvons y parvenir grâce à la commande setnx

    • Si la clé n'existe pas, vous pouvez définir la valeur correspondante. Si le réglage est réussi, le verrouillage sera réussi. la clé n'existe pas, elle renverra un échec

    • Les verrous de libération peuvent être implémentés via del.

    • La logique principale est la suivante :

      --- 相关Key
      --- limit rate key值,对应value为当前令牌数
      local limit_key = KEYS[1]
      --- 输入参数
      --[[
      qps: 每秒请求数;
      burst: 令牌桶容量;
      now: 当前Timestamp;
      cost: 请求令牌数;
      max_wait: 最大等待时间
      --]]
      local qps = tonumber(ARGV[1])
      local burst = tonumber(ARGV[2])
      local now = ARGV[3]
      local cost = tonumber(ARGV[4])
      local max_wait = tonumber(ARGV[5])
      --- 获取redis中的令牌数
      local tokens = redis.call("hget", limit_key, "token")
      if not tokens then
      	tokens = burst
      end
      --- 上次修改时间
      local last_time = redis.call("hget", limit_key, "last_time")
      if not last_time then
      	last_time = 0
      end
      --- 最新等待时间
      local last_event = redis.call("hget", limit_key, "last_event")
      if not last_event then
      	last_event = 0
      end
      --- 通过当前时间与上次修改时间的差值,qps计算出当前时间得令牌数
      local delta = math.max(0, now-last_time)
      local new_tokens = math.min(burst, delta * qps + tokens)
      new_tokens = new_tokens - cost --- 最新令牌数,减少请求令牌
      --- 如果最新令牌数小于0,计算需要等待的时间
      local wait_period = 0
      if new_tokens < 0 and qps > 0 then
      	wait_period = wait_period - new_tokens / qps
      end
      wait_period = math.ceil(wait_period)
      local time_act = now + wait_period --- 满足等待间隔的时间戳
      --- 允许请求有两种情况
      --- 当请求令牌数小于burst, 等待时间不超过最大等待时间,可以通过补充令牌满足请求
      --- qps为0时,只要最新令牌数不小于0即可
      local ok = (cost <= burst and wait_period <= max_wait and qps > 0) or (qps == 0 and new_tokens >= 0)
      --- 设置对应值
      if ok then
      	redis.call("set", limit_key, new_tokens)
      	redis.call("set", last_time_key, now)
      	redis.call("set", last_event_key, time_act)
      end
      --- 返回列表,{是否允许, 等待时间}
      return {ok, wait_period}
      Copier après la connexion
      Copier après la connexion

      Afin d'éviter un blocage causé par un temps d'arrêt du système ou des requêtes anormales, un délai d'attente supplémentaire doit être ajouté, qui doit être défini sur deux fois la durée d'exécution maximale estimée. 🎜🎜Utilisez le script Lua pour garantir l'atomicité lors du déverrouillage. L'appelant ne déverrouillera que le verrou ajouté par lui-même. Évitez toute confusion causée par le délai d'attente. Par exemple : Le processus A a acquis le verrou au moment t1, mais en raison de la lenteur de l'exécution, le processus a expiré au moment t2. Si le processus A termine son exécution et déverrouille le processus, le processus sera annulé. 🎜

      Exécutez le test

      // 调用lua脚本
      func (lim *RedisLimiter) reserveN(now time.Time, n int, maxFutureReserveSecond int) (*Reservation, error) {
      	// ...
      	res, err := lim.rdb.Eval(context.TODO(), reserveNScript, []string{lim.limitKey}, lim.qps, lim.burst, now.Unix(), n, maxFutureReserveSecond).Result()
      	if err != nil && err != redis.Nil {
      		return nil, err
      	}
      	//...
      	return &Reservation{
      		ok:        allow == 1,
      		lim:       lim,
      		tokens:    n,
      		timeToAct: now.Add(time.Duration(wait) * time.Second),
      	}, nil
      }
      Copier après la connexion
      Copier après la connexion
      🎜Le résultat en cours d'exécution, vous pouvez voir que l'effet est similaire à l'utilisation de sync.Mutex🎜
      🎜2022/07/22 09 :58:09 le travailleur 5 tente d'obtenir le verrou, ok : vrai, erreur :
      2022/07/22 09:58:09 le travailleur 5, ajoute le compteur 1
      2022/07/22 09:58:09 le travailleur 4 tente d'obtenir le verrou, ok : faux, erreur :
      2022/07/22 09:58:09 le travailleur 1 tente d'obtenir le verrou, ok : faux, erreur : < ;nil>
      2022/07/22 09:58:09 travailleur 2 tentative d'obtention du verrou, ok : faux, erreur :
      2022/07/22 09:58:09 travailleur 3 tentative d'obtention du verrou, ok : faux, erreur :
      2022/07/22 09:58:10 travailleur 3 tentative d'obtention du verrou, ok : faux, erreur :
      2022/07/22 09 : 58:10 le travailleur 1 tente d'obtenir le verrou, ok : faux, erreur :
      2022/07/22 09:58:10 le travailleur 2 tente d'obtenir le verrou, ok : faux, erreur : 2022/07/22 09:58:10 travailleur 4 tentative d'obtention du verrou, ok : vrai, erreur :
      2022/07/22 09 : 58:10 travailleur 4, ajouter le compteur 2
      2022/07/22 09:58:10 travailleur 1 tentative d'obtention du verrou, ok : vrai, erreur :
      2022/07/22 09 :58:10 travailleur 1, ajouter le compteur 3
      2022/07/22 09:58:10 travailleur 3 tentative d'obtention du verrou, ok : faux, erreur :
      2022/07/22 09:58:10 le travailleur 2 tente d'obtenir le verrou, ok : faux, erreur :
      2022/07/22 09:58:10 le travailleur 2 tente d'obtenir le verrou, ok : vrai, erreur : < ;nil>
      2022/07/22 09:58:10 travailleur 2, ajouter le compteur 4
      2022/07/22 09:58:10 travailleur 3 tentative d'obtention du verrou, ok : faux, erreur :
      2022/07/22 09:58:10 travailleur 3 tentative d'obtention du verrou, ok : vrai, erreur :
      2022/07/22 09:58:10 travailleur 3, ajoutez le compteur 5🎜< /blockquote>🎜Une attention particulière est que dans un cluster Redis distribué, si une exception se produit (le nœud maître est en panne), la disponibilité des verrous distribués peut être réduite, ce qui peut être obtenu grâce à des composants de cohérence forts comme etcd et ZooKeeper. 🎜🎜Filtre distribué🎜🎜Supposons que nous souhaitions développer un service d'exploration pour explorer des millions de pages Web. Comment déterminer si une certaine page Web a été explorée En plus d'utiliser des bases de données et HashMap, nous pouvons utiliser des filtres Bloom pour le faire ? Par rapport aux autres méthodes, les filtres Bloom prennent très peu de place et ont des temps d'insertion et de requête très rapides. 🎜🎜Le filtre Bloom est utilisé pour déterminer si un élément est dans l'ensemble. Lors de l'insertion de données à l'aide de BitSet🎜🎜🎜🎜, la valeur est hachée plusieurs fois et la position correspondante de BitSet est 1🎜🎜🎜🎜Lors de l'interrogation, le la valeur est hachée plusieurs fois pour comparer tout. Si le bit est 1, si c'est le cas, il existe. 🎜🎜🎜🎜 Le filtre Bloom a un certain taux d'erreur d'évaluation et ne convient pas aux scénarios de requêtes précis. De plus, la suppression d'éléments n'est pas prise en charge. Il est généralement utilisé dans des scénarios tels que la déduplication d'URL, le filtrage du spam et la prévention des pannes de cache. 🎜🎜Dans Redis, nous pouvons utiliser l'implémentation BitSet intégrée, ainsi que l'atomicité des scripts Lua pour éviter plusieurs incohérences de données de requête. 🎜
      func main() {
      	rdb := redis.NewClient(&redis.Options{
      		Addr:     "localhost:6379",
      		Password: "123456",
      		DB:       0, // use default DB
      	})
      	r, err := NewRedisLimiter(rdb, 1, 2, "testrate")
      	if err != nil {
      		log.Fatal(err)
      	}
      	r.Reset()
      	for i := 0; i < 5; i++ {
      		err := r.Wait(context.TODO())
      		log.Printf("worker %d allowed: %v", i, err)
      	}
      }
      // output
      // 2022/07/22 12:50:31 worker 0 allowed: <nil>
      // 2022/07/22 12:50:31 worker 1 allowed: <nil>
      // 2022/07/22 12:50:32 worker 2 allowed: <nil>
      // 2022/07/22 12:50:33 worker 3 allowed: <nil>
      // 2022/07/22 12:50:34 worker 4 allowed: <nil>
      Copier après la connexion
      Copier après la connexion

      Exécuter le test

      rrreee🎜Le limiteur de débit distribué🎜🎜 fournit un limiteur de débit basé sur un compartiment de jetons dans le package golang.org/x/time/rate. Si vous souhaitez implémentez la limitation de courant dans un environnement distribué, vous pouvez l'implémenter sur la base de scripts Redis Lua. 🎜🎜Le principe principal du seau à jetons est le suivant : 🎜🎜🎜🎜 Supposons que la capacité d'un seau à jetons explose et que des jetons y sont placés à raison de qps chaque seconde🎜🎜🎜🎜Les jetons sont initialement remplis avec des jetons, et si le jeton déborde, il sera rempli directement. Rejeté, lors de la demande d'un jeton, s'il y a suffisamment de jetons dans le seau, c'est autorisé, sinon il est rejeté🎜🎜🎜🎜Quand burst==qps, le flux la limite est strictement conforme à qps ; lorsque burst>qps, une certaine rafale de trafic peut être autorisée🎜

    这里主要参考了官方rate包的实现,将核心逻辑改为Lua实现。

    --- 相关Key
    --- limit rate key值,对应value为当前令牌数
    local limit_key = KEYS[1]
    --- 输入参数
    --[[
    qps: 每秒请求数;
    burst: 令牌桶容量;
    now: 当前Timestamp;
    cost: 请求令牌数;
    max_wait: 最大等待时间
    --]]
    local qps = tonumber(ARGV[1])
    local burst = tonumber(ARGV[2])
    local now = ARGV[3]
    local cost = tonumber(ARGV[4])
    local max_wait = tonumber(ARGV[5])
    --- 获取redis中的令牌数
    local tokens = redis.call("hget", limit_key, "token")
    if not tokens then
    	tokens = burst
    end
    --- 上次修改时间
    local last_time = redis.call("hget", limit_key, "last_time")
    if not last_time then
    	last_time = 0
    end
    --- 最新等待时间
    local last_event = redis.call("hget", limit_key, "last_event")
    if not last_event then
    	last_event = 0
    end
    --- 通过当前时间与上次修改时间的差值,qps计算出当前时间得令牌数
    local delta = math.max(0, now-last_time)
    local new_tokens = math.min(burst, delta * qps + tokens)
    new_tokens = new_tokens - cost --- 最新令牌数,减少请求令牌
    --- 如果最新令牌数小于0,计算需要等待的时间
    local wait_period = 0
    if new_tokens < 0 and qps > 0 then
    	wait_period = wait_period - new_tokens / qps
    end
    wait_period = math.ceil(wait_period)
    local time_act = now + wait_period --- 满足等待间隔的时间戳
    --- 允许请求有两种情况
    --- 当请求令牌数小于burst, 等待时间不超过最大等待时间,可以通过补充令牌满足请求
    --- qps为0时,只要最新令牌数不小于0即可
    local ok = (cost <= burst and wait_period <= max_wait and qps > 0) or (qps == 0 and new_tokens >= 0)
    --- 设置对应值
    if ok then
    	redis.call("set", limit_key, new_tokens)
    	redis.call("set", last_time_key, now)
    	redis.call("set", last_event_key, time_act)
    end
    --- 返回列表,{是否允许, 等待时间}
    return {ok, wait_period}
    Copier après la connexion
    Copier après la connexion

    在Golang中的相关接口Allow、AllowN、Wait等都是通过调用reserveN实现

    // 调用lua脚本
    func (lim *RedisLimiter) reserveN(now time.Time, n int, maxFutureReserveSecond int) (*Reservation, error) {
    	// ...
    	res, err := lim.rdb.Eval(context.TODO(), reserveNScript, []string{lim.limitKey}, lim.qps, lim.burst, now.Unix(), n, maxFutureReserveSecond).Result()
    	if err != nil && err != redis.Nil {
    		return nil, err
    	}
    	//...
    	return &Reservation{
    		ok:        allow == 1,
    		lim:       lim,
    		tokens:    n,
    		timeToAct: now.Add(time.Duration(wait) * time.Second),
    	}, nil
    }
    Copier après la connexion
    Copier après la connexion

    运行测试

    func main() {
    	rdb := redis.NewClient(&redis.Options{
    		Addr:     "localhost:6379",
    		Password: "123456",
    		DB:       0, // use default DB
    	})
    	r, err := NewRedisLimiter(rdb, 1, 2, "testrate")
    	if err != nil {
    		log.Fatal(err)
    	}
    	r.Reset()
    	for i := 0; i < 5; i++ {
    		err := r.Wait(context.TODO())
    		log.Printf("worker %d allowed: %v", i, err)
    	}
    }
    // output
    // 2022/07/22 12:50:31 worker 0 allowed: <nil>
    // 2022/07/22 12:50:31 worker 1 allowed: <nil>
    // 2022/07/22 12:50:32 worker 2 allowed: <nil>
    // 2022/07/22 12:50:33 worker 3 allowed: <nil>
    // 2022/07/22 12:50:34 worker 4 allowed: <nil>
    Copier après la connexion
    Copier après la connexion

    前两个请求在burst内,直接可以获得,后面的请求按照qps的速率生成。

    其他

    Redis还可用于全局计数、去重以及发布订阅等不同情境。参考Redis官方提供的模块,可以通过加载这些模块实现过滤、限流等特性。

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Étiquettes associées:
    source:yisu.com
    Déclaration de ce site Web
    Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
    Tutoriels populaires
    Plus>
    Derniers téléchargements
    Plus>
    effets Web
    Code source du site Web
    Matériel du site Web
    Modèle frontal