4 つの分散型電流制限アルゴリズムとコード実装

リリース: 2023-08-15 15:54:21
転載
1204 人が閲覧しました


# 質問をしながら電流制限に近づく

なぜ電流を制限する必要があるのか?

上で述べたように、トラフィックが多いことは確かに良いことですが、過負荷になってシステムがハングアップすると、誰もが苦しむことになります。

4 つの分散型電流制限アルゴリズムとコード実装不死了
したがって、さまざまな主要なプロモーション活動の前に、システムのストレス テストを実施し、システム全体のピーク QPS を評価する必要があります。一部の電流制限設定は、システムのハングを避けるために、特定のしきい値を超えると処理を拒否したり、処理を遅らせたりします。

電流制限とサーキットブレーカーの違いは何ですか?

トラフィック制限は、トラフィックが流入する前に発生し、過剰なトラフィックが制限されます。

サーキット ブレーカーは、障害に対処するメカニズムです。トラフィックが流入した後に発生します。システムに障害が発生したり、異常が発生した場合、サーキット ブレーカーは、障害がさらに拡大して重大な問題が発生するのを防ぐために、自動的に要求を遮断します。サービス雪崩。

電流制限とピーククリッピングの違いは何ですか?

ピーク クリッピングはトラフィックをスムーズに処理するもので、リクエストの処理速度をゆっくりと高めることでシステムの瞬間的な過負荷を回避します。

ピークカットはおそらく流れを蓄えてゆっくり流れるリザーバーであり、流量制限はおそらく過剰な流れを遮断するゲートです。

電流制限の一般的なプロセス

では、具体的な電流制限を実装するにはどうすればよいでしょうか?これは次の手順に要約できます:

4 つの分散型電流制限アルゴリズムとコード実装
電流制限の一般的なプロセス

  1. 統計的なリクエスト トラフィック: リクエストの数または割合を記録します。カウンタ、スラ​​イディング ウィンドウ、その他の統計手法を通じて行われます。
  2. 制限値を超えているかどうかの判定: 設定した制限条件に基づいて、現在のリクエストトラフィックが制限値を超えているかどうかを判定します。
  3. 電流制限ポリシーの実行: リクエストのトラフィックが制限を超えた場合、リクエストの拒否、処理の遅延、エラー情報の返し、等
  4. 統計情報の更新: カウンタの値の増加、スライディングウィンドウのデータの更新など、リクエストの処理結果に基づいて統計情報を更新します。 、など。
  5. 上記の手順を繰り返します: リクエスト トラフィックを継続的にカウントし、制限を超えているかどうかを判断し、現在の制限ポリシーを実装して、統計情報を更新します

特定の電流制限アルゴリズムの実装は、トークン バケット アルゴリズム、リーキー バケット アルゴリズムなどの使用など、さまざまなシナリオやニーズに応じて調整および最適化できることに注意してください。

単一マシンの電流制限と分散型電流制限

電流制限の一般的なプロセスでは、リクエスト量をカウントし、統計を更新する必要があることに気付きました。の場合、このリクエストの数量統計と更新はストレージに保持される必要があります。

これが単なるスタンドアロン環境の場合は、簡単に処理してローカルに直接保存できます。

4 つの分散型電流制限アルゴリズムとコード実装
単一マシンとクラスター

しかし、一般的に言えば、私たちのサービスはクラスターにデプロイされています。複数のマシン間で全体的な電流制限を達成するにはどうすればよいでしょうか?毛糸でしょうか?

現時点では、統計情報を Tair や Redis などの分散 K-V ストレージに入れることができます。

4 つの電流制限アルゴリズムと分散実装

次に、いくつかの一般的な電流制限アルゴリズムの実装を開始します。ここでは、分散ストレージとして Redis を使用します。言うまでもなく、Redis は最後の一般的な分散キャッシュです。 DB; Redission は Redis クライアントです。Redission は分散ロックにのみ使用されます。やや「不適格」ですが、実際には Redis クライアントとして非常に簡単に使用できます。

4 つの分散型電流制限アルゴリズムとコード実装
5 つの電流制限アルゴリズムの分散実装

始める前に、環境を簡単に準備しましょう。Redis のインストールとプロジェクトについては詳しく説明しません。創作です。

  • 依存関係の追加
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.16.2</version>
        </dependency>
ログイン後にコピー
  • シングルトン モードを使用して RedissonClient を取得します。ここでは Bean として登録されません。単一テストの実行
public class RedissonConfig {

    private static final String REDIS_ADDRESS = "redis://127.0.0.1:6379";

    private static volatile  RedissonClient redissonClient;

   public static RedissonClient getInstance(){
        if (redissonClient==null){
            synchronized (RedissonConfig.class){
                if (redissonClient==null){
                    Config config = new Config();
                    config.useSingleServer().setAddress(REDIS_ADDRESS);
                    redissonClient = Redisson.create(config);
                    return redissonClient;
                }
            }
        }
        return redissonClient;
    }
}
ログイン後にコピー

固定ウィンドウ電流制限アルゴリズム

アルゴリズム原理

固定ウィンドウ アルゴリズム、多くの参考資料は次のとおりです。カウンタ アルゴリズムとも呼ばれます。もちろん、私はカウンタ アルゴリズムが固定ウィンドウ アルゴリズムの特殊なケースであることを個人的に理解しています。もちろん、それについてはあまり心配しません。

固定ウィンドウ アルゴリズムは、比較的単純な電流制限アルゴリズムであり、時間を固定時間ウィンドウに分割し、各ウィンドウで許可されるリクエストの数に制限を設定します。リクエストの数が時間枠内で上限を超えると、電流制限がトリガーされます。

4 つの分散型電流制限アルゴリズムとコード実装
ここに画像の説明を挿入

アルゴリズムの実装

Redisson に基づく固定ウィンドウの実装は非常に簡単です。各ウィンドウ期間内で、incrementAndGet オペレーションを通じてリクエストの数をカウントできます。ウィンドウ期間が終了すると、Redis のキー有効期限機能を利用してカウントを自動的にリセットできます。

  • コードの実装を見てみましょう:
public class FixedWindowRateLimiter {
    public static final String KEY = "fixedWindowRateLimiter:";
    /**
     * 请求限制数量
     */
    private Long limit;
    /**
     * 窗口大小(单位:S)
     */
    private Long windowSize;

    public FixedWindowRateLimiter(Long limit, Long windowSize) {
        this.limit = limit;
        this.windowSize = windowSize;
    }

    /**
     * 固定窗口限流
     */
    public boolean triggerLimit(String path) {
        RedissonClient redissonClient = RedissonConfig.getInstance();
        //加分布式锁,防止并发情况下窗口初始化时间不一致问题
        RLock rLock = redissonClient.getLock(KEY + "LOCK:" + path);
        try {
            rLock.lock(100, TimeUnit.MILLISECONDS);
            String redisKey = KEY + path;
            RAtomicLong counter = redissonClient.getAtomicLong(redisKey);
            //计数
            long count = counter.incrementAndGet();
            //如果为1的话,就说明窗口刚初始化
            if (count == 1) {
                //直接设置过期时间,作为窗口
                counter.expire(windowSize, TimeUnit.SECONDS);
            }
            //触发限流
            if (count > limit) {
                //触发限流的不记在请求数量中
                counter.decrementAndGet();
                return true;
            }
            return false;
        } finally {
            rLock.unlock();
        }
    }

}
ログイン後にコピー

ここでは、ウィンドウの初期化を並行して解決するために追加の分散ロックが使用されています。状況や質問。

  • もう一度テストしてみましょう
class FixedWindowRateLimiterTest {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(20, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));

    @Test
    @DisplayName("1min限制10次请求固定窗口测试")
    void triggerLimit() throws InterruptedException {
        FixedWindowRateLimiter fixedWindowRateLimiter = new FixedWindowRateLimiter(10L,60L);
        //模拟不同窗口内的调用
        for (int i = 0; i < 3; i++) {
            CountDownLatch countDownLatch = new CountDownLatch(20);
            //20个线程并发调用
            for (int j = 0; j < 20; j++) {
                threadPoolExecutor.execute(() -> {
                    boolean isLimit = fixedWindowRateLimiter.triggerLimit("/test");
                    System.out.println(isLimit);
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            //休眠1min
            TimeUnit.MINUTES.sleep(1);
        }
    }
}
ログイン後にコピー

もちろん、インターフェイスを作成し、Jmeter などのストレス テスト ツールを使用してテストすることもできます。

固定ウィンドウ アルゴリズムの利点は、実装が簡単でスペースをあまりとらないことですが、ウィンドウの切り替えが瞬時に完了するため、リクエストの処理がスムーズではなく、エラーが発生する可能性があるという重大な問題があります。ウィンドウ切り替えの瞬間 交通量の激しい変動。

たとえば、この例では、00:02 に大量のリクエストが突然入ってきたが、この時点でカウントをリセットした場合、突然のトラフィックを制限することはできません。

4 つの分散型電流制限アルゴリズムとコード実装
臨界値問題

スライディング ウィンドウ アルゴリズム

固定ウィンドウのバースト トラフィック問題を軽減するために、スライディング ウィンドウアルゴリズムを使用できます。コンピュータ ネットワークにおける TCP フロー制御は、スライディング ウィンドウ アルゴリズムを使用します。

アルゴリズム原理

スライディング ウィンドウ電流制限アルゴリズムの原理は、大きな時間ウィンドウを複数の小さな時間ウィンドウに分割し、各小さなウィンドウには独立したカウントがあります。

リクエストが到着したら、リクエストの数がウィンドウ全体の制限を超えているかどうかを判断します。ウィンドウの移動は、毎回 小さな単位ウィンドウを前方にスライドさせます。

たとえば、次のスライディング ウィンドウは、1 分の大きな時間ウィンドウを 5 つの小さなウィンドウに分割し、各小さなウィンドウの時間は 12 秒です。

各セルには独自の独立したカウンターがあり、12 秒ごとに 1 セルずつ進みます。

リクエストが 00:01 に来た場合、この時点のウィンドウ数は 3 12 9 15 = 39 であり、これも電流制限の役割を果たす可能性があります。

4 つの分散型電流制限アルゴリズムとコード実装スライディング ウィンドウ アルゴリズムの概略図
これが、スライディング ウィンドウが重大な問題を解決できる理由です。スライディング グリッドが増えるほど、全体のスライディング # が大きくなります。 # #Smooth

になるほど、電流制限効果がより正確になります。 アルゴリズムの実装

それでは、ここでスライディング ウィンドウ電流制限アルゴリズムをどのように実装するのでしょうか?これは非常に簡単で、Redis の順序セット (zset) 構造を直接使用できます。

タイムスタンプをスコアとメンバーとして使用します。リクエストが来ると、現在のタイムスタンプが順序付きセットに追加されます。次に、ウィンドウ外のリクエストについては、ウィンドウ サイズに基づいて開始タイムスタンプを計算し、ウィンドウ外のリクエストを削除できます。このように、オーダーされたセットのサイズは、ウィンドウ内のリクエストの数になります。

4 つの分散型電流制限アルゴリズムとコード実装
zset实现滑动窗口
  • 代码实现
public class SlidingWindowRateLimiter {
    public static final String KEY = "slidingWindowRateLimiter:";

    /**
     * 请求次数限制
     */
    private Long limit;
    /**
     * 窗口大小(单位:S)
     */
    private Long windowSize;

    public SlidingWindowRateLimiter(Long limit, Long windowSize) {
        this.limit = limit;
        this.windowSize = windowSize;
    }


    public boolean triggerLimit(String path) {
        RedissonClient redissonClient = RedissonConfig.getInstance();
        //窗口计数
        RScoredSortedSet<Long> counter = redissonClient.getScoredSortedSet(KEY + path);
        //使用分布式锁,避免并发设置初始值的时候,导致窗口计数被覆盖
        RLock rLock = redissonClient.getLock(KEY + "LOCK:" + path);
        try {
            rLock.lock(200, TimeUnit.MILLISECONDS);
            // 当前时间戳
            long currentTimestamp = System.currentTimeMillis();
            // 窗口起始时间戳
            long windowStartTimestamp = currentTimestamp - windowSize * 1000;
            // 移除窗口外的时间戳,左闭右开
            counter.removeRangeByScore(0, true, windowStartTimestamp, false);
            // 将当前时间戳作为score,也作为member,
            // TODO:高并发情况下可能没法保证唯一,可以加一个唯一标识
            counter.add(currentTimestamp, currentTimestamp);
            //使用zset的元素个数,作为请求计数
            long count = counter.size();
            // 判断时间戳数量是否超过限流阈值
            if (count > limit) {
                System.out.println("[triggerLimit] path:" + path + " count:" + count + " over limit:" + limit);
                return true;
            }
            return false;
        } finally {
            rLock.unlock();
        }
    }

}
ログイン後にコピー

这里还有一个小的可以完善的点,zset在member相同的情况下,是会覆盖的,也就是说高并发情况下,时间戳可能会重复,那么就有可能统计的请求偏少,这里可以用时间戳+随机数来缓解,也可以生成唯一序列来解决,比如UUID、雪花算法等等。

  • 还是来测试一下
class SlidingWindowRateLimiterTest {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(30, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));

    @Test
    @DisplayName("滑动窗口限流")
    void triggerLimit() throws InterruptedException {
        SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(10L, 1L);
        //模拟在不同时间片内的请求
        for (int i = 0; i < 8; i++) {
            CountDownLatch countDownLatch = new CountDownLatch(20);
            for (int j = 0; j < 20; j++) {
                threadPoolExecutor.execute(() -> {
                    boolean isLimit = slidingWindowRateLimiter.triggerLimit("/test");
                    System.out.println(isLimit);
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            //休眠10s
            TimeUnit.SECONDS.sleep(10L);
        }
    }
}
ログイン後にコピー

用Redis实现了滑动窗口限流,解决了固定窗口限流的边界问题,当然这里也带来了新的问题,因为我们存储了窗口期的所有请求,所以高并发的情况下,可能会比较占内存。

漏桶算法

我们可以看到,计数器类的限流,体现的是一个“戛然而止”,超过限制,立马决绝,但是有时候,我们可能只是希望请求平滑一些,追求的是“波澜不惊”,这时候就可以考虑使用其它的限流算法。

算法原理

漏桶算法(Leaky Bucket),名副其实,就是请求就像水一样以任意速度注入漏桶,而桶会按照固定的速率将水漏掉。

4 つの分散型電流制限アルゴリズムとコード実装
漏桶算法

当进水速率大于出水速率的时候,漏桶会变满,此时新进入的请求将会被丢弃。

漏桶算法的两大作用是网络流量整形(Traffic Shaping)和速度限制(Rate Limiting)。

算法实现

我们接着看看具体应该怎么实现。

在滑动窗口限流算法里我们用到了RScoredSortedSet,非常好用对不对,这里也可以用这个结构,直接使用ZREMRANGEBYSCORE命令来删除旧的请求。

进水就不用多说了,请求进来,判断桶有没有满,满了就拒绝,没满就往桶里丢请求。

那么出水怎么办呢?得保证稳定速率出水,可以用一个定时任务,来定时去删除旧的请求。

  • 代码实现
public class LeakyBucketRateLimiter {
    private RedissonClient redissonClient = RedissonConfig.getInstance();
    private static final String KEY_PREFIX = "LeakyBucket:";

    /**
     * 桶的大小
     */
    private Long bucketSize;
    /**
     * 漏水速率,单位:个/秒
     */
    private Long leakRate;


    public LeakyBucketRateLimiter(Long bucketSize, Long leakRate) {
        this.bucketSize = bucketSize;
        this.leakRate = leakRate;
        //这里启动一个定时任务,每s执行一次
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(this::leakWater, 0, 1, TimeUnit.SECONDS);
    }

    /**
     * 漏水
     */
    public void leakWater() {
        RSet<String> pathSet=redissonClient.getSet(KEY_PREFIX+":pathSet");
        //遍历所有path,删除旧请求
        for(String path:pathSet){
            String redisKey = KEY_PREFIX + path;
            RScoredSortedSet<Long> bucket = redissonClient.getScoredSortedSet(KEY_PREFIX + path);
            // 获取当前时间
            long now = System.currentTimeMillis();
            // 删除旧的请求
            bucket.removeRangeByScore(0, true,now - 1000 * leakRate,true);
        }
    }

    /**
     * 限流
     */
    public boolean triggerLimit(String path) {
        //加锁,防止并发初始化问题
        RLock rLock = redissonClient.getLock(KEY_PREFIX + "LOCK:" + path);
        try {
            rLock.lock(100,TimeUnit.MILLISECONDS);
            String redisKey = KEY_PREFIX + path;
            RScoredSortedSet<Long> bucket = redissonClient.getScoredSortedSet(redisKey);
            //这里用一个set,来存储所有path
            RSet<String> pathSet=redissonClient.getSet(KEY_PREFIX+":pathSet");
            pathSet.add(path);
            // 获取当前时间
            long now = System.currentTimeMillis();
            // 检查桶是否已满
            if (bucket.size() < bucketSize) {
                // 桶未满,添加一个元素到桶中
                bucket.add(now,now);
                return false;
            }
            // 桶已满,触发限流
            System.out.println("[triggerLimit] path:"+path+" bucket size:"+bucket.size());
            return true;
        }finally {
            rLock.unlock();
        }
    }
    
}
ログイン後にコピー

在代码实现里,我们用了RSet来存储path,这样一来,一个定时任务,就可以搞定所有path对应的桶的出水,而不用每个桶都创建一个一个定时任务。

这里我直接用ScheduledExecutorService启动了一个定时任务,1s跑一次,当然集群环境下,每台机器都跑一个定时任务,对性能是极大的浪费,而且不好管理,我们可以用分布式定时任务,比如xxl-job去执行leakWater

  • 最后还是大家熟悉的测试
class LeakyBucketRateLimiterTest {

    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(30, 50, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));

    @Test
    @DisplayName("漏桶算法")
    void triggerLimit() throws InterruptedException {
        LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(10L, 1L);
        for (int i = 0; i < 8; i++) {
            CountDownLatch countDownLatch = new CountDownLatch(20);
            for (int j = 0; j < 20; j++) {
                threadPoolExecutor.execute(() -> {
                    boolean isLimit = leakyBucketRateLimiter.triggerLimit("/test");
                    System.out.println(isLimit);
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            //休眠10s
            TimeUnit.SECONDS.sleep(10L);
        }
    }
}
ログイン後にコピー

漏桶算法能够有效防止网络拥塞,实现也比较简单。

但是,因为漏桶的出水速率是固定的,假如突然来了大量的请求,那么只能丢弃超量的请求,即使下游能处理更大的流量,没法充分利用系统资源

令牌桶算法

令牌桶算法来了!

算法原理

令牌桶算法是对漏桶算法的一种改进。

它的主要思想是:系统以一种固定的速率向桶中添加令牌,每个请求在发送前都需要从桶中取出一个令牌,只有取到令牌的请求才被通过。因此,令牌桶算法允许请求以任意速率发送,只要桶中有足够的令牌。

4 つの分散型電流制限アルゴリズムとコード実装
令牌桶算法

算法实现

我们继续看怎么实现,首先是要发放令牌,要固定速率,那我们又得开个线程,定时往桶里投令牌,然后……

——然后Redission提供了令牌桶算法的实现,舒不舒服?

4 つの分散型電流制限アルゴリズムとコード実装
拿来吧你

拿来就用!

  • 代码实现
public class TokenBucketRateLimiter {

    public static final String KEY = "TokenBucketRateLimiter:";

    /**
     * 阈值
     */
    private Long limit;
    /**
     * 添加令牌的速率,单位:个/秒
     */
    private Long tokenRate;

    public TokenBucketRateLimiter(Long limit, Long tokenRate) {
        this.limit = limit;
        this.tokenRate = tokenRate;
    }

    /**
     * 限流算法
     */
    public boolean triggerLimit(String path){
        RedissonClient redissonClient=RedissonConfig.getInstance();
        RRateLimiter rateLimiter = redissonClient.getRateLimiter(KEY+path);
        // 初始化,设置速率模式,速率,间隔,间隔单位
        rateLimiter.trySetRate(RateType.OVERALL, limit, tokenRate, RateIntervalUnit.SECONDS);
        // 获取令牌
        return rateLimiter.tryAcquire();
    }
}
ログイン後にコピー

Redisson实现的,还是比较稳的,这里就不测试了。

关于Redission是怎么实现这个限速器的,大家可以看一下参考[3],还是Redisson家的老传统——Lua脚本,设计相当巧妙。

总结

在这篇文章里,我们对(三)种限流算法进行了分布式实现,采用了非常好用的Redission客户端,当然我们也有不完善的地方:

  • 并发处理采用了分布式锁,高并发情况下,对性能有一定损耗,逻辑最好还是直接采用Lua脚本实现,来提高性能
  • 可以提供更加优雅的调用方式,比如利用aop实现注解式调用,代码设计也可以更加优雅,继承体系可以完善一下
  • 没有实现限流的拒绝策略,比如抛异常、缓存、丢进MQ打散……限流是一种方法,最终的目的还是尽可能保证系统平稳

如果后面有机会,希望可以继续完善这个简单的Demo,达到工程级的应用。

除此之外,市面上也有很多好用的开源限流工具:

  • Guava RateLimiter ,基于令牌桶算法限流,当然是单机的;
  • Sentinel ,基于滑动窗口限流,支持单机,也支持集群
  • 网关限流,很多网关自带限流方法,比如Spring Cloud GatewayNginx

以上が4 つの分散型電流制限アルゴリズムとコード実装の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:Java后端技术全栈
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート