ホームページ データベース Redis Redis は遅延キューをどのように実装しますか?手法の紹介

Redis は遅延キューをどのように実装しますか?手法の紹介

Jul 08, 2020 pm 04:01 PM
redis 遅延キュー

Redis は遅延キューをどのように実装しますか?手法の紹介

遅延キューは、その名のとおり、遅延機能を備えたメッセージキューです。では、どのような状況でそのようなキューが必要になるのでしょうか?

1. 背景

まず、次のビジネス シナリオを見てみましょう:

  • 注文が完了したとき未払い 注文が返金ステータスにあるときに注文をタイムリーにクローズする方法
  • 返金ステータスにある注文が正常に返金されたかどうかを定期的に確認する方法
  • 注文が返金されない場合下流システムからステータス通知を長期間にわたって受信する方法 注文ステータスの段階的同期を実現する戦略
  • システムが上流システムに支払い成功の最終ステータスを通知すると、上流システムは通知失敗を返します。非同期通知を実行し、分割した頻度で送信する方法: 15 秒 3 分 10 分 30 分 30 分 1 時間 2 時間 6 時間 15 時間

1.1 解決策

  • ##最も簡単な方法は、メーターを定期的にスキャンすることです。たとえば、注文の支払い有効期限要件が比較的高い場合、メーターは 2 秒ごとにスキャンされ、期限切れの注文を確認し、注文を積極的にクローズします。 利点はシンプルであることです欠点はテーブルを毎分グローバルにスキャンするため、リソースが無駄になることです有効期限が切れそうなテーブルデータの注文量が多い場合、注文完了に遅れが生じます。

  • RabbitMq またはその他の MQ 変更を使用して遅延キューを実装します。利点は、オープン ソースであり、既製の安定した実装ソリューションであることです。欠点は次のとおりです。MQ はメッセージ ミドルウェアです。チームのテクノロジー スタックが本質的に MQ を持っている場合は問題ありません。そうでない場合は、キューを遅延させるために MQ のセットをデプロイするのは少し費用がかかります。

  • Redis の zset を使用する機能をリストする場合は、Redis を使用して実装できます遅延キューRedisDelayQueue

2. 設計目標

  • リアルタイム パフォーマンス: 一定期間、第 2 レベルのエラーが許可されます。
  • 高可用性: スタンドアロンをサポート、クラスターをサポートします。
  • メッセージ削除をサポートします。 : 企業はいつでも指定されたメッセージを削除します
  • メッセージの信頼性: 少なくとも 1 回は消費されることが保証されます
  • メッセージの永続性: Redis 自体の永続特性に基づいて、Redis データが失われた場合、これは遅延メッセージが失われることを意味しますが、プライマリ バックアップとクラスタの保証は提供できます。これは、メッセージを MangoDB に永続化するための後続の最適化のために考慮できます。

3. 設計計画

設計には主に次の内容が含まれます。ポイント:
  • Redis 全体をメッセージ プールとして扱い、メッセージを KV 形式で保存します
  • ZSET を優先キューとして使用し、スコアに従って優先度を維持します
  • LIST 構造を使用して先出し消費を促進します。
  • ZSET および LIST はメッセージ アドレスを格納します (メッセージ プール内の各 KEY に対応します)
  • ルーティング オブジェクトをカスタマイズし、ZSET および LIST 名を格納し、 ZSET ルートから正しいリストにメッセージを送信します。
  • タイマーを使用してルーティングを維持します。
  • TTL ルールに従ってメッセージ遅延を実装します。

3.1 設計図


これは依然としてYouzanの遅延キュー設計、最適化、コード実装に基づいています。 Youzan DesignRedis は遅延キューをどのように実装しますか?手法の紹介

##3.2 データ構造

  • ZING:DELAY_QUEUE:JOB_POOL です。すべての遅延キュー情報を格納する Hash_Table 構造。 KV 構造: K=prefix projectName field = topic jobId V=CONENT;Vクライアントから渡されたデータは、消費時に返されます
  • ZING:DELAY_QUEUE:BUCKET 遅延キューがあります シーケンスset ZSET は、タイムスタンプに従ってソートされた K=ID と必要な実行タイムスタンプを格納します。
  • ZING:DELAY_QUEUE:QUEUE LIST 構造。各トピックには LIST があり、リストには JOB# が格納されます。

Redis は遅延キューをどのように実装しますか?手法の紹介## 現在使用する必要がある画像は参考用です。基本的にプロセス全体の実行を説明できます。画像は記事の最後にある参考ブログから引用しています。

3.3 タスクのライフ サイクル

    新しい JOB が追加されると、データの一部が
  1. ZING に挿入されます。 :DELAY_QUEUE:JOB_POOL とビジネス側と消費者側を記録しました。 ZING:DELAY_QUEUE:BUCKET は、実行タイムスタンプを記録するレコードも挿入します。
  2. 処理スレッドは
  3. ZING:DELAY_QUEUE:BUCKET に移動して、どの実行タイムスタンプが RunTimeMillis であるかを検索します。現在時刻より小さい場合は、これらのレコードをすべて削除します。同時に、各タスクのトピックが何であるかを解析し、これらのタスクを TOPICZING:DELAY_QUEUE:QUEUE に対応するリストにプッシュします。
  4. 各 TOPIC LIST には、LIST 内で消費されるデータをバッチ取得するためのリスニング スレッドがあり、取得されたすべてのデータは、この TOPIC の消費スレッド プールにスローされます。
  5. 消費スレッド プールの実行が始まります
  6. ZING:DELAY_QUEUE:JOB_POOLデータ構造を見つけて、それをコールバック構造に返し、コールバック メソッドを実行します。

3.4 設計ポイント

3.4.1 基本概念

  • JOB: 非同期処理を必要とするタスクは遅延キューの基本単位です
  • トピック: 同じタイプのジョブのコレクション (キュー)。コンシューマがサブスクライブするには

#3.4.2 メッセージ構造

#各 JOB には次の属性が含まれている必要があります

    jobId: ジョブの一意の識別子。指定したジョブ情報を取得および削除するために使用されます。
  • トピック: ジョブの種類。これは、特定のビジネス名として理解できます。
  • lay: ジョブを遅らせる必要がある時間。単位: 秒。 (サーバーが絶対時間に変換します)
  • body: コンシューマーが特定の業務処理を実行するためのジョブの内容。json 形式で保存されます。
  • retry: 失敗した再試行の数
  • url: 通知 URL

3.5 設計の詳細

3.5.1 方法ZING:DELAY_QUEUE:QUEUE

最も簡単な実装方法は、タイマーを使用して第 2 レベルのスキャンを実行することです。メッセージ実行の適時性を確保するには、1 秒ごとに Redis をリクエストして、キュー内に消費されるジョブがあるかどうかを判断するように設定できます。しかし、問題が発生します。キューに消費可能な JOB がない場合、頻繁なスキャンは無意味であり、リソースの無駄になります。幸いなことに、LIST には

BLPOP ブロッキング プリミティブ があります。リストの場合、データがあればすぐに返します データがなければデータが返されるまでそこでブロックされます ブロックタイムアウトを設定でき、タイムアウト後に NULL が返されます 具体的な実装方法と戦略は、

3.5.2 タイミングによるメッセージの繰り返し転送と消費を避ける

##Redis の分散ロックを使用するメッセージの転送を制御します。メッセージの繰り返し転送による問題を回避するには、
  • 分散ロックを使用してタイマーの実行頻度を確保します。

4. コア コードの実装

4.1 技術的な説明#技術スタック: SpringBoot、Redisson、Redis、分散ロック、 timer

Note

: このプロジェクトは、設計計画では複数のキューの消費を認識しておらず、1 つのキューのみを開きます。これは将来最適化される予定です。

4.2 コア エンティティ

4.2.1 ジョブへの新しいオブジェクトの追加##

/**
 * 消息结构
 *
 * @author 睁眼看世界
 * @date 2020年1月15日
 */
@Data
public class Job implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Job的唯一标识。用来检索和删除指定的Job信息
     */
    @NotBlank
    private String jobId;


    /**
     * Job类型。可以理解成具体的业务名称
     */
    @NotBlank
    private String topic;

    /**
     * Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)
     */
    private Long delay;

    /**
     * Job的内容,供消费者做具体的业务处理,以json格式存储
     */
    @NotBlank
    private String body;

    /**
     * 失败重试次数
     */
    private int retry = 0;

    /**
     * 通知URL
     */
    @NotBlank
    private String url;
}
ログイン後にコピー
#4.2.2 ジョブからのオブジェクトの削除

/**
 * 消息结构
 *
 * @author 睁眼看世界
 * @date 2020年1月15日
 */
@Data
public class JobDie implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Job的唯一标识。用来检索和删除指定的Job信息
     */
    @NotBlank
    private String jobId;


    /**
     * Job类型。可以理解成具体的业务名称
     */
    @NotBlank
    private String topic;
}
ログイン後にコピー
4.3 トランスポート スレッド

/**
 * 搬运线程
 *
 * @author 睁眼看世界
 * @date 2020年1月17日
 */
@Slf4j
@Component
public class CarryJobScheduled {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 启动定时开启搬运JOB信息
     */
    @Scheduled(cron = "*/1 * * * * *")
    public void carryJobToQueue() {
        System.out.println("carryJobToQueue --->");
        RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            RScoredSortedSet<object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
            long now = System.currentTimeMillis();
            Collection<object> jobCollection = bucketSet.valueRange(0, false, now, true);
            List<string> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
            RList<string> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
            readyQueue.addAll(jobList);
            bucketSet.removeAllAsync(jobList);
        } catch (InterruptedException e) {
            log.error("carryJobToQueue error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}</string></string></object></object>
ログイン後にコピー
4.4 コンシ​​ューマ スレッド

@Slf4j
@Component
public class ReadyQueueContext {

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private ConsumerService consumerService;

    /**
     * TOPIC消费线程
     */
    @PostConstruct
    public void startTopicConsumer() {
        TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程");
    }

    /**
     * 开启TOPIC消费线程
     * 将所有可能出现的异常全部catch住,确保While(true)能够不中断
     */
    @SuppressWarnings("InfiniteLoopStatement")
    private void runTopicThreads() {
        while (true) {
            RLock lock = null;
            try {
                lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
            } catch (Exception e) {
                log.error("runTopicThreads getLock error", e);
            }
            try {
                if (lock == null) {
                    continue;
                }
                // 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错
                boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
                if (!lockFlag) {
                    continue;
                }

                // 1. 获取ReadyQueue中待消费的数据
                RBlockingQueue<string> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
                String topicId = queue.poll(60, TimeUnit.SECONDS);
                if (StringUtils.isEmpty(topicId)) {
                    continue;
                }

                // 2. 获取job元信息内容
                RMap<string> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
                Job job = jobPoolMap.get(topicId);

                // 3. 消费
                FutureTask<boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId());
                if (taskResult.get()) {
                    // 3.1 消费成功,删除JobPool和DelayBucket的job信息
                    jobPoolMap.remove(topicId);
                } else {
                    int retrySum = job.getRetry() + 1;
                    // 3.2 消费失败,则根据策略重新加入Bucket

                    // 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB
                    if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
                        jobPoolMap.remove(topicId);
                        continue;
                    }
                    job.setRetry(retrySum);
                    long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
                    log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
                    RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
                    delayBucket.add(nextTime, topicId);
                    // 3.3 更新元信息失败次数
                    jobPoolMap.put(topicId, job);
                }
            } catch (Exception e) {
                log.error("runTopicThreads error", e);
            } finally {
                if (lock != null) {
                    try {
                        lock.unlock();
                    } catch (Exception e) {
                        log.error("runTopicThreads unlock error", e);
                    }
                }
            }
        }
    }
}</object></boolean></string></string>
ログイン後にコピー
4.5 JOBの追加と削除

/**
 * 提供给外部服务的操作接口
 *
 * @author why
 * @date 2020年1月15日
 */
@Slf4j
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {

    @Autowired
    private RedissonClient redissonClient;


    /**
     * 添加job元信息
     *
     * @param job 元信息
     */
    @Override
    public void addJob(Job job) {

        RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());

            // 1. 将job添加到 JobPool中
            RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            if (jobPool.get(topicId) != null) {
                throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
            }

            jobPool.put(topicId, job);

            // 2. 将job添加到 DelayBucket中
            RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.add(job.getDelay(), topicId);
        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }


    /**
     * 删除job信息
     *
     * @param job 元信息
     */
    @Override
    public void deleteJob(JobDie jobDie) {

        RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());

            RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            jobPool.remove(topicId);

            RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.remove(topicId);
        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}</object></string></object></string>
ログイン後にコピー
5. コンテンツの追加最適化する必要があります

現在、メッセージを保存する Queue キューは 1 つだけです。消費する必要があるメッセージが大量に蓄積すると、メッセージ通知の適時性が影響を受けます。改善方法は、複数のキューを開き、メッセージ ルーティングを実行し、スループットを提供するために複数のコンシューマ スレッドを開いて消費することです。

メッセージは永続化されないため、危険です。メッセージは MangoDB に永続化されます。
  1. 6. ソースコード

詳細なソースコードは以下のアドレスから入手してください

RedisDelayQueue の実装

zing-delay-queue(https://gitee.com/whyCodeData/zing-project/tree/master/zing-delay-queue)
  • RedissonStarter redisson-spring-boot-starter(https://gitee.com/whyCodeData/zing-project/tree/master/zing-starter/redisson-spring-boot-starter)
  • プロジェクト申請 zing-pay(https://gitee.com/whyCodeData/zing-pay)
  • ##7. 参考文献

##https://tech.youzan.com/queuing_delay/https://blog.csdn.net/u010634066/article/details/98864764

    詳細redis に関する知識については、
  • redis 入門チュートリアル
  • 列を参照してください。

以上がRedis は遅延キューをどのように実装しますか?手法の紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

Video Face Swap

Video Face Swap

完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

Redisクラスターモードの構築方法 Redisクラスターモードの構築方法 Apr 10, 2025 pm 10:15 PM

Redisクラスターモードは、シャードを介してRedisインスタンスを複数のサーバーに展開し、スケーラビリティと可用性を向上させます。構造の手順は次のとおりです。異なるポートで奇妙なRedisインスタンスを作成します。 3つのセンチネルインスタンスを作成し、Redisインスタンスを監視し、フェールオーバーを監視します。 Sentinel構成ファイルを構成し、Redisインスタンス情報とフェールオーバー設定の監視を追加します。 Redisインスタンス構成ファイルを構成し、クラスターモードを有効にし、クラスター情報ファイルパスを指定します。各Redisインスタンスの情報を含むnodes.confファイルを作成します。クラスターを起動し、CREATEコマンドを実行してクラスターを作成し、レプリカの数を指定します。クラスターにログインしてクラスター情報コマンドを実行して、クラスターステータスを確認します。作る

Redisデータをクリアする方法 Redisデータをクリアする方法 Apr 10, 2025 pm 10:06 PM

Redisデータをクリアする方法:Flushallコマンドを使用して、すべての重要な値をクリアします。 FlushDBコマンドを使用して、現在選択されているデータベースのキー値をクリアします。 [選択]を使用してデータベースを切り替え、FlushDBを使用して複数のデータベースをクリアします。 DELコマンドを使用して、特定のキーを削除します。 Redis-CLIツールを使用してデータをクリアします。

Redisキューの読み方 Redisキューの読み方 Apr 10, 2025 pm 10:12 PM

Redisのキューを読むには、キュー名を取得し、LPOPコマンドを使用して要素を読み、空のキューを処理する必要があります。特定の手順は次のとおりです。キュー名を取得します:「キュー:キュー」などの「キュー:」のプレフィックスで名前を付けます。 LPOPコマンドを使用します。キューのヘッドから要素を排出し、LPOP Queue:My-Queueなどの値を返します。空のキューの処理:キューが空の場合、LPOPはnilを返し、要素を読む前にキューが存在するかどうかを確認できます。

Redisコマンドの使用方法 Redisコマンドの使用方法 Apr 10, 2025 pm 08:45 PM

Redis指令を使用するには、次の手順が必要です。Redisクライアントを開きます。コマンド(動詞キー値)を入力します。必要なパラメーターを提供します(指示ごとに異なります)。 Enterを押してコマンドを実行します。 Redisは、操作の結果を示す応答を返します(通常はOKまたは-ERR)。

Redisロックの使用方法 Redisロックの使用方法 Apr 10, 2025 pm 08:39 PM

Redisを使用して操作をロックするには、setnxコマンドを介してロックを取得し、有効期限を設定するために有効期限コマンドを使用する必要があります。特定の手順は次のとおりです。(1)SETNXコマンドを使用して、キー価値ペアを設定しようとします。 (2)expireコマンドを使用して、ロックの有効期限を設定します。 (3)Delコマンドを使用して、ロックが不要になったときにロックを削除します。

Redisのソースコードを読み取る方法 Redisのソースコードを読み取る方法 Apr 10, 2025 pm 08:27 PM

Redisソースコードを理解する最良の方法は、段階的に進むことです。Redisの基本に精通してください。開始点として特定のモジュールまたは機能を選択します。モジュールまたは機能のエントリポイントから始めて、行ごとにコードを表示します。関数コールチェーンを介してコードを表示します。 Redisが使用する基礎となるデータ構造に精通してください。 Redisが使用するアルゴリズムを特定します。

Centos RedisでLUAスクリプト実行時間を構成する方法 Centos RedisでLUAスクリプト実行時間を構成する方法 Apr 14, 2025 pm 02:12 PM

Centosシステムでは、Redis構成ファイルを変更するか、Redisコマンドを使用して悪意のあるスクリプトがあまりにも多くのリソースを消費しないようにすることにより、LUAスクリプトの実行時間を制限できます。方法1:Redis構成ファイルを変更し、Redis構成ファイルを見つけます:Redis構成ファイルは通常/etc/redis/redis.confにあります。構成ファイルの編集:テキストエディター(VIやNANOなど)を使用して構成ファイルを開きます:sudovi/etc/redis/redis.conf luaスクリプト実行時間制限を設定します。

Redisコマンドラインの使用方法 Redisコマンドラインの使用方法 Apr 10, 2025 pm 10:18 PM

Redisコマンドラインツール(Redis-Cli)を使用して、次の手順を使用してRedisを管理および操作します。サーバーに接続し、アドレスとポートを指定します。コマンド名とパラメーターを使用して、コマンドをサーバーに送信します。ヘルプコマンドを使用して、特定のコマンドのヘルプ情報を表示します。 QUITコマンドを使用して、コマンドラインツールを終了します。

See all articles