ホームページ データベース Redis Redis は遅延キューをどのように実装しますか?手法の紹介

Redis は遅延キューをどのように実装しますか?手法の紹介

Jul 08, 2020 pm 04:01 PM
redis 遅延キュー

Redis は遅延キューをどのように実装しますか?手法の紹介

遅延キューは、その名のとおり、遅延機能を備えたメッセージキューです。では、どのような状況でそのようなキューが必要になるのでしょうか?

1. 背景

まず、次のビジネス シナリオを見てみましょう:

  • 注文が完了したとき未払い 注文が返金ステータスにあるときに注文をタイムリーにクローズする方法
  • 返金ステータスにある注文が正常に返金されたかどうかを定期的に確認する方法
  • 注文が返金されない場合下流システムからステータス通知を長期間にわたって受信する方法 注文ステータスの段階的同期を実現する戦略
  • システムが上流システムに支払い成功の最終ステータスを通知すると、上流システムは通知失敗を返します。非同期通知を実行し、分割した頻度で送信する方法: 15 秒 3 分 10 分 30 分 30 分 1 時間 2 時間 6 時間 15 時間

1.1 解決策

  • ##最も簡単な方法は、メーターを定期的にスキャンすることです。たとえば、注文の支払い有効期限要件が比較的高い場合、メーターは 2 秒ごとにスキャンされ、期限切れの注文を確認し、注文を積極的にクローズします。 利点はシンプルであることです欠点はテーブルを毎分グローバルにスキャンするため、リソースが無駄になることです有効期限が切れそうなテーブルデータの注文量が多い場合、注文完了に遅れが生じます。

  • RabbitMq またはその他の MQ 変更を使用して遅延キューを実装します。利点は、オープン ソースであり、既製の安定した実装ソリューションであることです。欠点は次のとおりです。MQ はメッセージ ミドルウェアです。チームのテクノロジー スタックが本質的に MQ を持っている場合は問題ありません。そうでない場合は、キューを遅延させるために MQ のセットをデプロイするのは少し費用がかかります。

  • Redis の zset を使用する機能をリストする場合は、Redis を使用して実装できます遅延キューRedisDelayQueue

2. 設計目標

  • リアルタイム パフォーマンス: 一定期間、第 2 レベルのエラーが許可されます。
  • 高可用性: スタンドアロンをサポート、クラスターをサポートします。
  • メッセージ削除をサポートします。 : 企業はいつでも指定されたメッセージを削除します
  • メッセージの信頼性: 少なくとも 1 回は消費されることが保証されます
  • メッセージの永続性: Redis 自体の永続特性に基づいて、Redis データが失われた場合、これは遅延メッセージが失われることを意味しますが、プライマリ バックアップとクラスタの保証は提供できます。これは、メッセージを MangoDB に永続化するための後続の最適化のために考慮できます。

3. 設計計画

設計には主に次の内容が含まれます。ポイント:
  • Redis 全体をメッセージ プールとして扱い、メッセージを KV 形式で保存します
  • ZSET を優先キューとして使用し、スコアに従って優先度を維持します
  • LIST 構造を使用して先出し消費を促進します。
  • ZSET および LIST はメッセージ アドレスを格納します (メッセージ プール内の各 KEY に対応します)
  • ルーティング オブジェクトをカスタマイズし、ZSET および LIST 名を格納し、 ZSET ルートから正しいリストにメッセージを送信します。
  • タイマーを使用してルーティングを維持します。
  • TTL ルールに従ってメッセージ遅延を実装します。

3.1 設計図


これは依然としてYouzanの遅延キュー設計、最適化、コード実装に基づいています。 Youzan DesignRedis は遅延キューをどのように実装しますか?手法の紹介

##3.2 データ構造

  • ZING:DELAY_QUEUE:JOB_POOL です。すべての遅延キュー情報を格納する Hash_Table 構造。 KV 構造: K=prefix projectName field = topic jobId V=CONENT;Vクライアントから渡されたデータは、消費時に返されます
  • ZING:DELAY_QUEUE:BUCKET 遅延キューがあります シーケンスset ZSET は、タイムスタンプに従ってソートされた K=ID と必要な実行タイムスタンプを格納します。
  • ZING:DELAY_QUEUE:QUEUE LIST 構造。各トピックには LIST があり、リストには JOB# が格納されます。

Redis は遅延キューをどのように実装しますか?手法の紹介## 現在使用する必要がある画像は参考用です。基本的にプロセス全体の実行を説明できます。画像は記事の最後にある参考ブログから引用しています。

3.3 タスクのライフ サイクル

    新しい JOB が追加されると、データの一部が
  1. ZING に挿入されます。 :DELAY_QUEUE:JOB_POOL とビジネス側と消費者側を記録しました。 ZING:DELAY_QUEUE:BUCKET は、実行タイムスタンプを記録するレコードも挿入します。
  2. 処理スレッドは
  3. ZING:DELAY_QUEUE:BUCKET に移動して、どの実行タイムスタンプが RunTimeMillis であるかを検索します。現在時刻より小さい場合は、これらのレコードをすべて削除します。同時に、各タスクのトピックが何であるかを解析し、これらのタスクを TOPICZING:DELAY_QUEUE:QUEUE に対応するリストにプッシュします。
  4. 各 TOPIC LIST には、LIST 内で消費されるデータをバッチ取得するためのリスニング スレッドがあり、取得されたすべてのデータは、この TOPIC の消費スレッド プールにスローされます。
  5. 消費スレッド プールの実行が始まります
  6. ZING:DELAY_QUEUE:JOB_POOLデータ構造を見つけて、それをコールバック構造に返し、コールバック メソッドを実行します。

3.4 設計ポイント

3.4.1 基本概念

  • JOB: 非同期処理を必要とするタスクは遅延キューの基本単位です
  • トピック: 同じタイプのジョブのコレクション (キュー)。コンシューマがサブスクライブするには

#3.4.2 メッセージ構造

#各 JOB には次の属性が含まれている必要があります

    jobId: ジョブの一意の識別子。指定したジョブ情報を取得および削除するために使用されます。
  • トピック: ジョブの種類。これは、特定のビジネス名として理解できます。
  • lay: ジョブを遅らせる必要がある時間。単位: 秒。 (サーバーが絶対時間に変換します)
  • body: コンシューマーが特定の業務処理を実行するためのジョブの内容。json 形式で保存されます。
  • retry: 失敗した再試行の数
  • url: 通知 URL

3.5 設計の詳細

3.5.1 方法ZING:DELAY_QUEUE:QUEUE

最も簡単な実装方法は、タイマーを使用して第 2 レベルのスキャンを実行することです。メッセージ実行の適時性を確保するには、1 秒ごとに Redis をリクエストして、キュー内に消費されるジョブがあるかどうかを判断するように設定できます。しかし、問題が発生します。キューに消費可能な JOB がない場合、頻繁なスキャンは無意味であり、リソースの無駄になります。幸いなことに、LIST には

BLPOP ブロッキング プリミティブ があります。リストの場合、データがあればすぐに返します データがなければデータが返されるまでそこでブロックされます ブロックタイムアウトを設定でき、タイムアウト後に NULL が返されます 具体的な実装方法と戦略は、

3.5.2 タイミングによるメッセージの繰り返し転送と消費を避ける

##Redis の分散ロックを使用するメッセージの転送を制御します。メッセージの繰り返し転送による問題を回避するには、
  • 分散ロックを使用してタイマーの実行頻度を確保します。

4. コア コードの実装

4.1 技術的な説明#技術スタック: SpringBoot、Redisson、Redis、分散ロック、 timer

Note

: このプロジェクトは、設計計画では複数のキューの消費を認識しておらず、1 つのキューのみを開きます。これは将来最適化される予定です。

4.2 コア エンティティ

4.2.1 ジョブへの新しいオブジェクトの追加##

/**
 * 消息结构
 *
 * @author 睁眼看世界
 * @date 2020年1月15日
 */
@Data
public class Job implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Job的唯一标识。用来检索和删除指定的Job信息
     */
    @NotBlank
    private String jobId;


    /**
     * Job类型。可以理解成具体的业务名称
     */
    @NotBlank
    private String topic;

    /**
     * Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)
     */
    private Long delay;

    /**
     * Job的内容,供消费者做具体的业务处理,以json格式存储
     */
    @NotBlank
    private String body;

    /**
     * 失败重试次数
     */
    private int retry = 0;

    /**
     * 通知URL
     */
    @NotBlank
    private String url;
}
ログイン後にコピー
#4.2.2 ジョブからのオブジェクトの削除

/**
 * 消息结构
 *
 * @author 睁眼看世界
 * @date 2020年1月15日
 */
@Data
public class JobDie implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Job的唯一标识。用来检索和删除指定的Job信息
     */
    @NotBlank
    private String jobId;


    /**
     * Job类型。可以理解成具体的业务名称
     */
    @NotBlank
    private String topic;
}
ログイン後にコピー
4.3 トランスポート スレッド

/**
 * 搬运线程
 *
 * @author 睁眼看世界
 * @date 2020年1月17日
 */
@Slf4j
@Component
public class CarryJobScheduled {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 启动定时开启搬运JOB信息
     */
    @Scheduled(cron = "*/1 * * * * *")
    public void carryJobToQueue() {
        System.out.println("carryJobToQueue --->");
        RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            RScoredSortedSet<object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
            long now = System.currentTimeMillis();
            Collection<object> jobCollection = bucketSet.valueRange(0, false, now, true);
            List<string> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
            RList<string> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
            readyQueue.addAll(jobList);
            bucketSet.removeAllAsync(jobList);
        } catch (InterruptedException e) {
            log.error("carryJobToQueue error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}</string></string></object></object>
ログイン後にコピー
4.4 コンシ​​ューマ スレッド

@Slf4j
@Component
public class ReadyQueueContext {

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private ConsumerService consumerService;

    /**
     * TOPIC消费线程
     */
    @PostConstruct
    public void startTopicConsumer() {
        TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程");
    }

    /**
     * 开启TOPIC消费线程
     * 将所有可能出现的异常全部catch住,确保While(true)能够不中断
     */
    @SuppressWarnings("InfiniteLoopStatement")
    private void runTopicThreads() {
        while (true) {
            RLock lock = null;
            try {
                lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
            } catch (Exception e) {
                log.error("runTopicThreads getLock error", e);
            }
            try {
                if (lock == null) {
                    continue;
                }
                // 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错
                boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
                if (!lockFlag) {
                    continue;
                }

                // 1. 获取ReadyQueue中待消费的数据
                RBlockingQueue<string> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
                String topicId = queue.poll(60, TimeUnit.SECONDS);
                if (StringUtils.isEmpty(topicId)) {
                    continue;
                }

                // 2. 获取job元信息内容
                RMap<string> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
                Job job = jobPoolMap.get(topicId);

                // 3. 消费
                FutureTask<boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId());
                if (taskResult.get()) {
                    // 3.1 消费成功,删除JobPool和DelayBucket的job信息
                    jobPoolMap.remove(topicId);
                } else {
                    int retrySum = job.getRetry() + 1;
                    // 3.2 消费失败,则根据策略重新加入Bucket

                    // 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB
                    if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
                        jobPoolMap.remove(topicId);
                        continue;
                    }
                    job.setRetry(retrySum);
                    long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
                    log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
                    RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
                    delayBucket.add(nextTime, topicId);
                    // 3.3 更新元信息失败次数
                    jobPoolMap.put(topicId, job);
                }
            } catch (Exception e) {
                log.error("runTopicThreads error", e);
            } finally {
                if (lock != null) {
                    try {
                        lock.unlock();
                    } catch (Exception e) {
                        log.error("runTopicThreads unlock error", e);
                    }
                }
            }
        }
    }
}</object></boolean></string></string>
ログイン後にコピー
4.5 JOBの追加と削除

/**
 * 提供给外部服务的操作接口
 *
 * @author why
 * @date 2020年1月15日
 */
@Slf4j
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {

    @Autowired
    private RedissonClient redissonClient;


    /**
     * 添加job元信息
     *
     * @param job 元信息
     */
    @Override
    public void addJob(Job job) {

        RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());

            // 1. 将job添加到 JobPool中
            RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            if (jobPool.get(topicId) != null) {
                throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
            }

            jobPool.put(topicId, job);

            // 2. 将job添加到 DelayBucket中
            RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.add(job.getDelay(), topicId);
        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }


    /**
     * 删除job信息
     *
     * @param job 元信息
     */
    @Override
    public void deleteJob(JobDie jobDie) {

        RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());

            RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            jobPool.remove(topicId);

            RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.remove(topicId);
        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}</object></string></object></string>
ログイン後にコピー
5. コンテンツの追加最適化する必要があります

現在、メッセージを保存する Queue キューは 1 つだけです。消費する必要があるメッセージが大量に蓄積すると、メッセージ通知の適時性が影響を受けます。改善方法は、複数のキューを開き、メッセージ ルーティングを実行し、スループットを提供するために複数のコンシューマ スレッドを開いて消費することです。

メッセージは永続化されないため、危険です。メッセージは MangoDB に永続化されます。
  1. 6. ソースコード

詳細なソースコードは以下のアドレスから入手してください

RedisDelayQueue の実装

zing-delay-queue(https://gitee.com/whyCodeData/zing-project/tree/master/zing-delay-queue)
  • RedissonStarter redisson-spring-boot-starter(https://gitee.com/whyCodeData/zing-project/tree/master/zing-starter/redisson-spring-boot-starter)
  • プロジェクト申請 zing-pay(https://gitee.com/whyCodeData/zing-pay)
  • ##7. 参考文献

##https://tech.youzan.com/queuing_delay/https://blog.csdn.net/u010634066/article/details/98864764

    詳細redis に関する知識については、
  • redis 入門チュートリアル
  • 列を参照してください。

以上がRedis は遅延キューをどのように実装しますか?手法の紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

Windows 11 10.0.22000.100 のインストール時の 0x80242008 エラーの解決策 Windows 11 10.0.22000.100 のインストール時の 0x80242008 エラーの解決策 May 08, 2024 pm 03:50 PM

1. [スタート]メニューを起動し、[cmd]と入力し、[コマンドプロンプト]を右クリックし、[管理者として実行]を選択します。 2. 次のコマンドを順番に入力します (注意してコピーして貼り付けてください): SCconfigwuauservstart=auto、Enter キーを押す SCconfigbitsstart=auto、Enter キーを押す SCconfigcryptsvcstart=auto、Enter キーを押す SCconfigtrustedinstallerstart=auto、Enter キーを押す SCconfigwuauservtype=share、Enter キーを押す netstopwuauserv 、enter netstopcryptS を押す

PHP機能のボトルネックを分析し、実行効率を向上 PHP機能のボトルネックを分析し、実行効率を向上 Apr 23, 2024 pm 03:42 PM

PHP 関数のボトルネックはパフォーマンスの低下につながります。これは、ボトルネック関数を特定し、パフォーマンス分析ツールを使用するという手順で解決できます。結果をキャッシュして再計算を減らします。タスクを並列処理して実行効率を向上させます。文字列の連結を最適化し、代わりに組み込み関数を使用します。カスタム関数の代わりに組み込み関数を使用します。

Golang API のキャッシュ戦略と最適化 Golang API のキャッシュ戦略と最適化 May 07, 2024 pm 02:12 PM

GolangAPI のキャッシュ戦略により、パフォーマンスが向上し、サーバーの負荷が軽減されます。一般的に使用される戦略は、LRU、LFU、FIFO、TTL です。最適化手法には、適切なキャッシュ ストレージの選択、階層型キャッシュ、無効化管理、監視とチューニングが含まれます。実際には、データベースからユーザー情報を取得する API を最適化するために LRU キャッシュが使用されます。それ以外の場合は、データベースからデータを取得した後にキャッシュを更新できます。

erlang と golang ではどちらのパフォーマンスが優れていますか? erlang と golang ではどちらのパフォーマンスが優れていますか? Apr 21, 2024 am 03:24 AM

Erlang と Go にはパフォーマンスの違いがあります。 Erlang は同時実行性に優れていますが、Go はより高いスループットとより高速なネットワーク パフォーマンスを備えています。 Erlang は高い同時実行性を必要とするシステムに適しており、Go は高スループットと低遅延を必要とするシステムに適しています。

PHP 開発におけるキャッシュ メカニズムとアプリケーションの実践 PHP 開発におけるキャッシュ メカニズムとアプリケーションの実践 May 09, 2024 pm 01:30 PM

PHP 開発では、キャッシュ メカニズムにより、頻繁にアクセスされるデータがメモリまたはディスクに一時的に保存され、データベース アクセスの数が削減され、パフォーマンスが向上します。キャッシュの種類には主にメモリ、ファイル、データベース キャッシュが含まれます。キャッシュは、組み込み関数またはサードパーティのライブラリ (cache_get() や Memcache など) を使用して PHP に実装できます。一般的な実用的なアプリケーションには、データベース クエリ結果をキャッシュしてクエリ パフォーマンスを最適化したり、ページ出力をキャッシュしてレンダリングを高速化したりすることが含まれます。キャッシュ メカニズムにより、Web サイトの応答速度が効果的に向上し、ユーザー エクスペリエンスが向上し、サーバーの負荷が軽減されます。

PHP 配列のページネーションで Redis キャッシュを使用するにはどうすればよいですか? PHP 配列のページネーションで Redis キャッシュを使用するにはどうすればよいですか? May 01, 2024 am 10:48 AM

Redis キャッシュを使用すると、PHP 配列ページングのパフォーマンスを大幅に最適化できます。これは、次の手順で実現できます。 Redis クライアントをインストールします。 Redisサーバーに接続します。キャッシュ データを作成し、データの各ページをキー「page:{page_number}」を持つ Redis ハッシュに保存します。キャッシュからデータを取得し、大規模な配列での高コストの操作を回避します。

Win11 英語 21996 を簡体字中国語 22000 にアップグレードする方法_Win11 英語 21996 を簡体字中国語 22000 にアップグレードする方法 Win11 英語 21996 を簡体字中国語 22000 にアップグレードする方法_Win11 英語 21996 を簡体字中国語 22000 にアップグレードする方法 May 08, 2024 pm 05:10 PM

まず、システム言語を簡体字中国語表示に設定して再起動する必要があります。もちろん、以前に表示言語を簡体字中国語に変更したことがある場合は、この手順をスキップできます。次に、レジストリ regedit.exe の操作を開始し、左側のナビゲーション バーまたは上部のアドレス バーで HKEY_LOCAL_MACHINESYSTEMCurrentControlSetControlNlsLanguage に直接移動し、InstallLanguage キーの値と Default キーの値を 0804 に変更します (英語に変更する場合)。まずシステムの表示言語を en-us に設定し、システムを再起動してから、すべてを 0409 に変更します) この時点でシステムを再起動する必要があります。

navicat は redis に接続できますか? navicat は redis に接続できますか? Apr 23, 2024 pm 05:12 PM

はい、Navicat は Redis に接続できます。これにより、ユーザーはキーの管理、値の表示、コマンドの実行、アクティビティの監視、問題の診断が可能になります。 Redis に接続するには、Navicat で「Redis」接続タイプを選択し、サーバーの詳細を入力します。

See all articles