Maison > base de données > Redis > Un article pour parler de la stratégie de limitation actuelle dans Redis

Un article pour parler de la stratégie de limitation actuelle dans Redis

青灯夜游
Libérer: 2021-12-30 10:16:43
avant
1963 Les gens l'ont consulté

Cet article vous amènera à comprendre la limitation actuelle dans Redis et à présenter la stratégie simple de limitation de courant et la limitation de courant en entonnoir. J'espère que cela sera utile à tout le monde !

Un article pour parler de la stratégie de limitation actuelle dans Redis

1. Limitation de courant simple

Principes de base

Lorsque la capacité de traitement du système est limitée, comment organiser des demandes imprévues pour faire pression sur le système. Tout d’abord, examinons quelques stratégies simples de limitation du courant pour empêcher les attaques par force brute. Par exemple, si vous souhaitez accéder à une IP, vous ne pouvez y accéder que 10 fois toutes les 5 secondes, et si elle dépasse la limite, elle sera bloquée. [Recommandations associées : Tutoriel vidéo Redis]

Un article pour parler de la stratégie de limitation actuelle dans Redis

Comme indiqué ci-dessus, une fenêtre glissante est généralement utilisée pour compter le nombre de visites dans un intervalle. Utilisez zset pour enregistrer le nombre de visites IP. Chaque IP est enregistré via la clé et le score<.> Enregistrez l'horodatage actuel, <code>value n'est implémenté qu'avec l'horodatage ou l'UUIDzset 记录 IP 访问次数,每个 IP 通过 key 保存下来,score 保存当前时间戳,value 唯一用时间戳或者UUID来实现

代码实现

public class RedisLimiterTest {
    private Jedis jedis;

    public RedisLimiterTest(Jedis jedis) {
        this.jedis = jedis;
    }

    /**
     * @param ipAddress Ip地址
     * @param period    特定的时间内,单位秒
     * @param maxCount  最大允许的次数
     * @return
     */
    public boolean isIpLimit(String ipAddress, int period, int maxCount) {
        String key = String.format("ip:%s", ipAddress);
        // 毫秒时间戳
        long currentTimeMillis = System.currentTimeMillis();
        Pipeline pipe = jedis.pipelined();
        // redis事务,保证原子性
        pipe.multi();
        // 存放数据,value 和 score 都使用毫秒时间戳
        pipe.zadd(key, currentTimeMillis, "" + UUID.randomUUID());
        // 移除窗口区间所有的元素
        pipe.zremrangeByScore(key, 0, currentTimeMillis - period * 1000);
        // 获取时间窗口内的行为数量
        Response<Long> count = pipe.zcard(key);
        // 设置 zset 过期时间,避免冷用户持续占用内存,这里宽限1s
        pipe.expire(key, period + 1);
        // 提交事务
        pipe.exec();
        pipe.close();
        // 比较数量是否超标
        return count.get() > maxCount;
    }

    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);
        RedisLimiterTest limiter = new RedisLimiterTest(jedis);
        for (int i = 1; i <= 20; i++) {
            // 验证IP  10秒钟之内只能访问5次
            boolean isLimit = limiter.isIpLimit("222.73.55.22", 10, 5);
            System.out.println("访问第" + i + "次, 结果:" + (isLimit ? "限制访问" : "允许访问"));
        }
    }
}
Copier après la connexion

执行结果

访问第1次, 结果:允许访问
访问第2次, 结果:允许访问
访问第3次, 结果:允许访问
访问第4次, 结果:允许访问
访问第5次, 结果:允许访问
访问第6次, 结果:限制访问
访问第7次, 结果:限制访问
... ...
Copier après la connexion

缺点:要记录时间窗口所有的行为记录,量很大,比如,限定60s内不能超过100万次这种场景,不太适合这样限流,因为会消耗大量的储存空间。

二、漏斗限流

基本原理

  • 漏斗的容量是限定的,如果满了,就装不进去了。
  • 如果将漏嘴放开,水就会往下流,流走一部分之后,就又可以继续往里面灌水。
  • 如果漏嘴流水的速率大于灌水的速率,那么漏斗永远都装不满。
  • 如果漏嘴流水速率小于灌水的速率,那么一旦漏斗满了,灌水就需要暂停并等待漏斗腾空。

示例代码

public class FunnelLimiterTest {

    static class Funnel {
        int capacity; // 漏斗容量
        float leakingRate; // 漏嘴流水速率
        int leftQuota; // 漏斗剩余空间
        long leakingTs; // 上一次漏水时间

        public Funnel(int capacity, float leakingRate) {
            this.capacity = capacity;
            this.leakingRate = leakingRate;
            this.leftQuota = capacity;
            this.leakingTs = System.currentTimeMillis();
        }

        void makeSpace() {
            long nowTs = System.currentTimeMillis();
            long deltaTs = nowTs - leakingTs; // 距离上一次漏水过去了多久
            int deltaQuota = (int) (deltaTs * leakingRate); // 腾出的空间 = 时间*漏水速率
            if (deltaQuota < 0) { // 间隔时间太长,整数数字过大溢出
                this.leftQuota = capacity;
                this.leakingTs = nowTs;
                return;
            }
            if (deltaQuota < 1) { // 腾出空间太小 就等下次,最小单位是1
                return;
            }
            this.leftQuota += deltaQuota; // 漏斗剩余空间 = 漏斗剩余空间 + 腾出的空间
            this.leakingTs = nowTs;
            if (this.leftQuota > this.capacity) { // 剩余空间不得高于容量
                this.leftQuota = this.capacity;
            }
        }

        boolean watering(int quota) {
            makeSpace();
            if (this.leftQuota >= quota) { // 判断剩余空间是否足够
                this.leftQuota -= quota;
                return true;
            }
            return false;
        }
    }

    // 所有的漏斗
    private Map<String, Funnel> funnels = new HashMap<>();

    /**
     * @param capacity    漏斗容量
     * @param leakingRate 漏嘴流水速率 quota/s
     */
    public boolean isIpLimit(String ipAddress, int capacity, float leakingRate) {
        String key = String.format("ip:%s", ipAddress);
        Funnel funnel = funnels.get(key);
        if (funnel == null) {
            funnel = new Funnel(capacity, leakingRate);
            funnels.put(key, funnel);
        }
        return !funnel.watering(1); // 需要1个quota
    }

    public static void main(String[] args) throws Exception{
        FunnelLimiterTest limiter = new FunnelLimiterTest();
        for (int i = 1; i <= 50; i++) {
            // 每1s执行一次
            Thread.sleep(1000);
            // 漏斗容量是2 ,漏嘴流水速率是0.5每秒,
            boolean isLimit = limiter.isIpLimit("222.73.55.22", 2, (float)0.5/1000);
            System.out.println("访问第" + i + "次, 结果:" + (isLimit ? "限制访问" : "允许访问"));
        }
    }
}
Copier après la connexion

执行结果

访问第1次, 结果:允许访问    # 第1次,容量剩余2,执行后1
访问第2次, 结果:允许访问    # 第2次,容量剩余1,执行后0
访问第3次, 结果:允许访问    # 第3次,由于过了2s, 漏斗流水剩余1个空间,所以容量剩余1,执行后0
访问第4次, 结果:限制访问    # 第4次,过了1s, 剩余空间小于1, 容量剩余0
访问第5次, 结果:允许访问    # 第5次,由于过了2s, 漏斗流水剩余1个空间,所以容量剩余1,执行后0
访问第6次, 结果:限制访问    # 以此类推...
访问第7次, 结果:允许访问
访问第8次, 结果:限制访问
访问第9次, 结果:允许访问
访问第10次, 结果:限制访问
Copier après la connexion
  • 我们观察 Funnel 对象的几个字段,我们发现可以将 Funnel 对象的内容按字段存储到一个 hash 结构中,灌水的时候将 hash 结构的字段取出来进行逻辑运算后,再将新值回填到 hash 结构中就完成了一次行为频度的检测。
  • 但是有个问题,我们无法保证整个过程的原子性。从 hash 结构中取值,然后在内存里运算,再回填到 hash 结构,这三个过程无法原子化,意味着需要进行适当的加锁控制。而一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。
  • 如果重试的话,就会导致性能下降。如果放弃的话,就会影响用户体验。同时,代码的复杂度也跟着升高很多。这真是个艰难的选择,我们该如何解决这个问题呢?Redis-Cell 救星来了!

Redis-Cell

Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell。该模块也使用了漏斗算法,并提供了原子的限流指令。 该模块只有1条指令cl.throttle,它的参数和返回值都略显复杂,接下来让我们来看看这个指令具体该如何使用。

> cl.throttle key:xxx 15 30 60 1
Copier après la connexion
  • 15 : 15 capacity 这是漏斗容量
  • 30 60 : 30 operations / 60 seconds 这是漏水速率
  • 1 : need 1 quota (可选参数,默认值也是1)
> cl.throttle laoqian:reply 15 30 60
1) (integer) 0   # 0 表示允许,1表示拒绝
2) (integer) 15  # 漏斗容量capacity
3) (integer) 14  # 漏斗剩余空间left_quota
4) (integer) -1  # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
5) (integer) 2   # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)
Copier après la connexion

在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的非常周到,连重试时间都帮你算好了,直接取返回结果数组的第四个值进行 sleep

Implémentation du code

rrreee

Résultats d'exécution🎜🎜rrreee🎜Inconvénients : Il est nécessaire d'enregistrer tous les enregistrements de comportement dans la fenêtre temporelle, qui est très grande. Par exemple, le scénario consistant à limiter le nombre de fois à pas plus d'un million de fois en 60 secondes n'est pas adapté à un tel cas. limitation de courant, car cela consommera beaucoup d’espace de stockage. 🎜

2. Limite de courant de l'entonnoir 🎜

🎜Principe de base🎜🎜
  • La capacité du L'entonnoir est limité, s'il est plein, il ne rentrera pas.
  • Si vous relâchez le bec, l'eau coulera vers le bas. Une fois qu'une partie s'écoule, vous pourrez continuer à le remplir d'eau.
  • Si le débit d'eau du bec est supérieur au taux de remplissage de l'eau, alors l'entonnoir ne sera jamais plein.
  • Si le débit de l'entonnoir est inférieur au taux de remplissage, alors une fois l'entonnoir plein, le remplissage doit être mis en pause et attendre que l'entonnoir se vide.

🎜Exemple de code🎜🎜rrreee🎜🎜Résultats d'exécution🎜🎜rrreee
  • Nous observons l'Entonnoir object Plusieurs champs, nous avons constaté que le contenu de l'objet Funnel peut être stocké dans une structure hash par champ. Lors du remplissage, la structure hash. Une fois les champs supprimés pour les opérations logiques, les nouvelles valeurs sont renseignées dans la structure hash pour compléter une détection de fréquence de comportement.
  • Mais il y a un problème, nous ne pouvons pas garantir l'atomicité de l'ensemble du processus. Obtenez la valeur de la structure hash, puis exploitez-la dans la mémoire, puis remplissez-la dans la structure hash. Ces trois processus ne peuvent pas être atomiques, ce qui signifie un verrouillage approprié. un contrôle est nécessaire. Une fois le verrou verrouillé, cela signifie qu'il y aura un échec de verrouillage. Si le verrou échoue, vous devez choisir de réessayer ou d'abandonner.
  • Si vous réessayez, les performances diminueront. Si vous abandonnez, cela affectera l’expérience utilisateur. Dans le même temps, la complexité du code a également beaucoup augmenté. C'est un choix vraiment difficile, comment résoudre ce problème ? Redis-Cell Le sauveur est là !

🎜Redis-Cell🎜🎜🎜Redis 4.0 fournit un module Redis actuellement limité appelé redis-cell. Ce module utilise également l'algorithme de l'entonnoir et fournit des instructions de limitation du courant atomique. Ce module n'a qu'une seule instruction cl.throttle, et ses paramètres et valeurs de retour sont légèrement compliqués. Voyons ensuite comment utiliser cette instruction. 🎜rrreee
  • 15 : 15 capacité C'est la capacité de l'entonnoir
  • 30 60 : 30 opérations / 60 secondes C'est l'eau taux de fuite
  • 1 : besoin de 1 quota (paramètre facultatif, la valeur par défaut est également 1)
rrreee🎜S'il est rejeté lors de l'exécution de la limite actuelle commande, vous devez l'abandonner ou réessayer. L'instruction cl.throttle est très réfléchie et calcule même le temps de nouvelle tentative pour vous. Prenez simplement la quatrième valeur du tableau de résultats renvoyé et effectuez sleep. Si vous ne souhaitez pas bloquer le thread, vous pouvez également réessayer avec une tâche planifiée asynchrone. 🎜🎜Pour plus de connaissances sur la programmation, veuillez visiter : 🎜Vidéos de programmation🎜 ! ! 🎜

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:juejin.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal