Cet article vous amènera à comprendre la limitation actuelle dans Redis et à présenter la stratégie simple de limitation de courant et la limitation de courant en entonnoir. J'espère que cela sera utile à tout le monde !
Lorsque la capacité de traitement du système est limitée, comment organiser des demandes imprévues pour faire pression sur le système. Tout d’abord, examinons quelques stratégies simples de limitation du courant pour empêcher les attaques par force brute. Par exemple, si vous souhaitez accéder à une IP, vous ne pouvez y accéder que 10 fois toutes les 5 secondes, et si elle dépasse la limite, elle sera bloquée. [Recommandations associées : Tutoriel vidéo Redis]
Comme indiqué ci-dessus, une fenêtre glissante est généralement utilisée pour compter le nombre de visites dans un intervalle.
Utilisez zset
pour enregistrer le nombre de visites IP
. Chaque IP
est enregistré via la clé
et le score<.> Enregistrez l'horodatage actuel, <code>value
n'est implémenté qu'avec l'horodatage ou l'UUIDzset
记录 IP
访问次数,每个 IP
通过 key
保存下来,score
保存当前时间戳,value
唯一用时间戳或者UUID来实现
public class RedisLimiterTest { private Jedis jedis; public RedisLimiterTest(Jedis jedis) { this.jedis = jedis; } /** * @param ipAddress Ip地址 * @param period 特定的时间内,单位秒 * @param maxCount 最大允许的次数 * @return */ public boolean isIpLimit(String ipAddress, int period, int maxCount) { String key = String.format("ip:%s", ipAddress); // 毫秒时间戳 long currentTimeMillis = System.currentTimeMillis(); Pipeline pipe = jedis.pipelined(); // redis事务,保证原子性 pipe.multi(); // 存放数据,value 和 score 都使用毫秒时间戳 pipe.zadd(key, currentTimeMillis, "" + UUID.randomUUID()); // 移除窗口区间所有的元素 pipe.zremrangeByScore(key, 0, currentTimeMillis - period * 1000); // 获取时间窗口内的行为数量 Response<Long> count = pipe.zcard(key); // 设置 zset 过期时间,避免冷用户持续占用内存,这里宽限1s pipe.expire(key, period + 1); // 提交事务 pipe.exec(); pipe.close(); // 比较数量是否超标 return count.get() > maxCount; } public static void main(String[] args) { Jedis jedis = new Jedis("localhost", 6379); RedisLimiterTest limiter = new RedisLimiterTest(jedis); for (int i = 1; i <= 20; i++) { // 验证IP 10秒钟之内只能访问5次 boolean isLimit = limiter.isIpLimit("222.73.55.22", 10, 5); System.out.println("访问第" + i + "次, 结果:" + (isLimit ? "限制访问" : "允许访问")); } } }
执行结果
访问第1次, 结果:允许访问 访问第2次, 结果:允许访问 访问第3次, 结果:允许访问 访问第4次, 结果:允许访问 访问第5次, 结果:允许访问 访问第6次, 结果:限制访问 访问第7次, 结果:限制访问 ... ...
缺点:要记录时间窗口所有的行为记录,量很大,比如,限定60s内不能超过100万次这种场景,不太适合这样限流,因为会消耗大量的储存空间。
public class FunnelLimiterTest { static class Funnel { int capacity; // 漏斗容量 float leakingRate; // 漏嘴流水速率 int leftQuota; // 漏斗剩余空间 long leakingTs; // 上一次漏水时间 public Funnel(int capacity, float leakingRate) { this.capacity = capacity; this.leakingRate = leakingRate; this.leftQuota = capacity; this.leakingTs = System.currentTimeMillis(); } void makeSpace() { long nowTs = System.currentTimeMillis(); long deltaTs = nowTs - leakingTs; // 距离上一次漏水过去了多久 int deltaQuota = (int) (deltaTs * leakingRate); // 腾出的空间 = 时间*漏水速率 if (deltaQuota < 0) { // 间隔时间太长,整数数字过大溢出 this.leftQuota = capacity; this.leakingTs = nowTs; return; } if (deltaQuota < 1) { // 腾出空间太小 就等下次,最小单位是1 return; } this.leftQuota += deltaQuota; // 漏斗剩余空间 = 漏斗剩余空间 + 腾出的空间 this.leakingTs = nowTs; if (this.leftQuota > this.capacity) { // 剩余空间不得高于容量 this.leftQuota = this.capacity; } } boolean watering(int quota) { makeSpace(); if (this.leftQuota >= quota) { // 判断剩余空间是否足够 this.leftQuota -= quota; return true; } return false; } } // 所有的漏斗 private Map<String, Funnel> funnels = new HashMap<>(); /** * @param capacity 漏斗容量 * @param leakingRate 漏嘴流水速率 quota/s */ public boolean isIpLimit(String ipAddress, int capacity, float leakingRate) { String key = String.format("ip:%s", ipAddress); Funnel funnel = funnels.get(key); if (funnel == null) { funnel = new Funnel(capacity, leakingRate); funnels.put(key, funnel); } return !funnel.watering(1); // 需要1个quota } public static void main(String[] args) throws Exception{ FunnelLimiterTest limiter = new FunnelLimiterTest(); for (int i = 1; i <= 50; i++) { // 每1s执行一次 Thread.sleep(1000); // 漏斗容量是2 ,漏嘴流水速率是0.5每秒, boolean isLimit = limiter.isIpLimit("222.73.55.22", 2, (float)0.5/1000); System.out.println("访问第" + i + "次, 结果:" + (isLimit ? "限制访问" : "允许访问")); } } }
执行结果
访问第1次, 结果:允许访问 # 第1次,容量剩余2,执行后1 访问第2次, 结果:允许访问 # 第2次,容量剩余1,执行后0 访问第3次, 结果:允许访问 # 第3次,由于过了2s, 漏斗流水剩余1个空间,所以容量剩余1,执行后0 访问第4次, 结果:限制访问 # 第4次,过了1s, 剩余空间小于1, 容量剩余0 访问第5次, 结果:允许访问 # 第5次,由于过了2s, 漏斗流水剩余1个空间,所以容量剩余1,执行后0 访问第6次, 结果:限制访问 # 以此类推... 访问第7次, 结果:允许访问 访问第8次, 结果:限制访问 访问第9次, 结果:允许访问 访问第10次, 结果:限制访问
Funnel
对象的几个字段,我们发现可以将 Funnel
对象的内容按字段存储到一个 hash
结构中,灌水的时候将 hash
结构的字段取出来进行逻辑运算后,再将新值回填到 hash
结构中就完成了一次行为频度的检测。hash
结构中取值,然后在内存里运算,再回填到 hash
结构,这三个过程无法原子化,意味着需要进行适当的加锁控制。而一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。Redis-Cell
救星来了!Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell
。该模块也使用了漏斗算法,并提供了原子的限流指令。
该模块只有1条指令cl.throttle
,它的参数和返回值都略显复杂,接下来让我们来看看这个指令具体该如何使用。
> cl.throttle key:xxx 15 30 60 1
15
: 15 capacity 这是漏斗容量30 60
: 30 operations / 60 seconds 这是漏水速率1
: need 1 quota (可选参数,默认值也是1)> cl.throttle laoqian:reply 15 30 60 1) (integer) 0 # 0 表示允许,1表示拒绝 2) (integer) 15 # 漏斗容量capacity 3) (integer) 14 # 漏斗剩余空间left_quota 4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒) 5) (integer) 2 # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)
在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle
指令考虑的非常周到,连重试时间都帮你算好了,直接取返回结果数组的第四个值进行 sleep
Entonnoir
object Plusieurs champs, nous avons constaté que le contenu de l'objet Funnel
peut être stocké dans une structure hash
par champ. Lors du remplissage, la structure hash
. Une fois les champs supprimés pour les opérations logiques, les nouvelles valeurs sont renseignées dans la structure hash
pour compléter une détection de fréquence de comportement. hash
, puis exploitez-la dans la mémoire, puis remplissez-la dans la structure hash
. Ces trois processus ne peuvent pas être atomiques, ce qui signifie un verrouillage approprié. un contrôle est nécessaire. Une fois le verrou verrouillé, cela signifie qu'il y aura un échec de verrouillage. Si le verrou échoue, vous devez choisir de réessayer ou d'abandonner. Redis-Cell
Le sauveur est là ! redis-cell
. Ce module utilise également l'algorithme de l'entonnoir et fournit des instructions de limitation du courant atomique.
Ce module n'a qu'une seule instruction cl.throttle
, et ses paramètres et valeurs de retour sont légèrement compliqués. Voyons ensuite comment utiliser cette instruction. 🎜rrreee15
: 15 capacité C'est la capacité de l'entonnoir30 60
: 30 opérations / 60 secondes C'est l'eau taux de fuite1
: besoin de 1 quota (paramètre facultatif, la valeur par défaut est également 1)cl.throttle
est très réfléchie et calcule même le temps de nouvelle tentative pour vous. Prenez simplement la quatrième valeur du tableau de résultats renvoyé et effectuez sleep
. Si vous ne souhaitez pas bloquer le thread, vous pouvez également réessayer avec une tâche planifiée asynchrone. 🎜🎜Pour plus de connaissances sur la programmation, veuillez visiter : 🎜Vidéos de programmation🎜 ! ! 🎜Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!