Redis は遅延キューをどのように実装しますか?手法の紹介
遅延キューは、その名のとおり、遅延機能を備えたメッセージキューです。では、どのような状況でそのようなキューが必要になるのでしょうか?
1. 背景
まず、次のビジネス シナリオを見てみましょう:
- 注文が完了したとき未払い 注文が返金ステータスにあるときに注文をタイムリーにクローズする方法
- 返金ステータスにある注文が正常に返金されたかどうかを定期的に確認する方法
- 注文が返金されない場合下流システムからステータス通知を長期間にわたって受信する方法 注文ステータスの段階的同期を実現する戦略
- システムが上流システムに支払い成功の最終ステータスを通知すると、上流システムは通知失敗を返します。非同期通知を実行し、分割した頻度で送信する方法: 15 秒 3 分 10 分 30 分 30 分 1 時間 2 時間 6 時間 15 時間
1.1 解決策
##最も簡単な方法は、メーターを定期的にスキャンすることです。たとえば、注文の支払い有効期限要件が比較的高い場合、メーターは 2 秒ごとにスキャンされ、期限切れの注文を確認し、注文を積極的にクローズします。 利点はシンプルであることです、欠点はテーブルを毎分グローバルにスキャンするため、リソースが無駄になることです有効期限が切れそうなテーブルデータの注文量が多い場合、注文完了に遅れが生じます。
RabbitMq またはその他の MQ 変更を使用して遅延キューを実装します。利点は、オープン ソースであり、既製の安定した実装ソリューションであることです。欠点は次のとおりです。MQ はメッセージ ミドルウェアです。チームのテクノロジー スタックが本質的に MQ を持っている場合は問題ありません。そうでない場合は、キューを遅延させるために MQ のセットをデプロイするのは少し費用がかかります。
Redis の zset を使用する機能をリストする場合は、Redis を使用して実装できます遅延キューRedisDelayQueue
2. 設計目標
- リアルタイム パフォーマンス: 一定期間、第 2 レベルのエラーが許可されます。
- 高可用性: スタンドアロンをサポート、クラスターをサポートします。
- メッセージ削除をサポートします。 : 企業はいつでも指定されたメッセージを削除します
- メッセージの信頼性: 少なくとも 1 回は消費されることが保証されます
- メッセージの永続性: Redis 自体の永続特性に基づいて、Redis データが失われた場合、これは遅延メッセージが失われることを意味しますが、プライマリ バックアップとクラスタの保証は提供できます。これは、メッセージを MangoDB に永続化するための後続の最適化のために考慮できます。
3. 設計計画
設計には主に次の内容が含まれます。ポイント:- Redis 全体をメッセージ プールとして扱い、メッセージを KV 形式で保存します
- ZSET を優先キューとして使用し、スコアに従って優先度を維持します
- LIST 構造を使用して先出し消費を促進します。
- ZSET および LIST はメッセージ アドレスを格納します (メッセージ プール内の各 KEY に対応します)
- ルーティング オブジェクトをカスタマイズし、ZSET および LIST 名を格納し、 ZSET ルートから正しいリストにメッセージを送信します。
- タイマーを使用してルーティングを維持します。
- TTL ルールに従ってメッセージ遅延を実装します。
3.1 設計図
これは依然としてYouzanの遅延キュー設計、最適化、コード実装に基づいています。 Youzan Design
##3.2 データ構造
-
ZING:DELAY_QUEUE:JOB_POOL
です。すべての遅延キュー情報を格納する Hash_Table 構造。 KV 構造: K=prefix projectName field = topic jobId V=CONENT;Vクライアントから渡されたデータは、消費時に返されます -
ZING:DELAY_QUEUE:BUCKET
遅延キューがあります シーケンスset ZSET は、タイムスタンプに従ってソートされた K=ID と必要な実行タイムスタンプを格納します。 -
ZING:DELAY_QUEUE:QUEUE
LIST 構造。各トピックには LIST があり、リストには JOB# が格納されます。
## 現在使用する必要がある画像は参考用です。基本的にプロセス全体の実行を説明できます。画像は記事の最後にある参考ブログから引用しています。
3.3 タスクのライフ サイクル
- 新しい JOB が追加されると、データの一部が
- ZING に挿入されます。 :DELAY_QUEUE:JOB_POOL
とビジネス側と消費者側を記録しました。
ZING:DELAY_QUEUE:BUCKETは、実行タイムスタンプを記録するレコードも挿入します。
処理スレッドは - ZING:DELAY_QUEUE:BUCKET
に移動して、どの実行タイムスタンプが RunTimeMillis であるかを検索します。現在時刻より小さい場合は、これらのレコードをすべて削除します。同時に、各タスクのトピックが何であるかを解析し、これらのタスクを TOPIC
ZING:DELAY_QUEUE:QUEUEに対応するリストにプッシュします。
各 TOPIC LIST には、LIST 内で消費されるデータをバッチ取得するためのリスニング スレッドがあり、取得されたすべてのデータは、この TOPIC の消費スレッド プールにスローされます。 - 消費スレッド プールの実行が始まります
- ZING:DELAY_QUEUE:JOB_POOL
データ構造を見つけて、それをコールバック構造に返し、コールバック メソッドを実行します。
3.4 設計ポイント
3.4.1 基本概念
- JOB: 非同期処理を必要とするタスクは遅延キューの基本単位です
- トピック: 同じタイプのジョブのコレクション (キュー)。コンシューマがサブスクライブするには
#3.4.2 メッセージ構造
#各 JOB には次の属性が含まれている必要があります- jobId: ジョブの一意の識別子。指定したジョブ情報を取得および削除するために使用されます。
- トピック: ジョブの種類。これは、特定のビジネス名として理解できます。
- lay: ジョブを遅らせる必要がある時間。単位: 秒。 (サーバーが絶対時間に変換します)
- body: コンシューマーが特定の業務処理を実行するためのジョブの内容。json 形式で保存されます。
- retry: 失敗した再試行の数
- url: 通知 URL
3.5 設計の詳細
3.5.1 方法ZING:DELAY_QUEUE:QUEUE
BLPOP ブロッキング プリミティブ があります。リストの場合、データがあればすぐに返します データがなければデータが返されるまでそこでブロックされます ブロックタイムアウトを設定でき、タイムアウト後に NULL が返されます 具体的な実装方法と戦略は、
3.5.2 タイミングによるメッセージの繰り返し転送と消費を避ける
##Redis の分散ロックを使用するメッセージの転送を制御します。メッセージの繰り返し転送による問題を回避するには、- 分散ロックを使用してタイマーの実行頻度を確保します。
4. コア コードの実装
4.1 技術的な説明#技術スタック: SpringBoot、Redisson、Redis、分散ロック、 timer
Note
: このプロジェクトは、設計計画では複数のキューの消費を認識しておらず、1 つのキューのみを開きます。これは将来最適化される予定です。
4.2 コア エンティティ
4.2.1 ジョブへの新しいオブジェクトの追加##/**
* 消息结构
*
* @author 睁眼看世界
* @date 2020年1月15日
*/
@Data
public class Job implements Serializable {
private static final long serialVersionUID = 1L;
/**
* Job的唯一标识。用来检索和删除指定的Job信息
*/
@NotBlank
private String jobId;
/**
* Job类型。可以理解成具体的业务名称
*/
@NotBlank
private String topic;
/**
* Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)
*/
private Long delay;
/**
* Job的内容,供消费者做具体的业务处理,以json格式存储
*/
@NotBlank
private String body;
/**
* 失败重试次数
*/
private int retry = 0;
/**
* 通知URL
*/
@NotBlank
private String url;
}
/**
* 消息结构
*
* @author 睁眼看世界
* @date 2020年1月15日
*/
@Data
public class JobDie implements Serializable {
private static final long serialVersionUID = 1L;
/**
* Job的唯一标识。用来检索和删除指定的Job信息
*/
@NotBlank
private String jobId;
/**
* Job类型。可以理解成具体的业务名称
*/
@NotBlank
private String topic;
}
/**
* 搬运线程
*
* @author 睁眼看世界
* @date 2020年1月17日
*/
@Slf4j
@Component
public class CarryJobScheduled {
@Autowired
private RedissonClient redissonClient;
/**
* 启动定时开启搬运JOB信息
*/
@Scheduled(cron = "*/1 * * * * *")
public void carryJobToQueue() {
System.out.println("carryJobToQueue --->");
RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
try {
boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
}
RScoredSortedSet<object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
long now = System.currentTimeMillis();
Collection<object> jobCollection = bucketSet.valueRange(0, false, now, true);
List<string> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
RList<string> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
readyQueue.addAll(jobList);
bucketSet.removeAllAsync(jobList);
} catch (InterruptedException e) {
log.error("carryJobToQueue error", e);
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}</string></string></object></object>
@Slf4j
@Component
public class ReadyQueueContext {
@Autowired
private RedissonClient redissonClient;
@Autowired
private ConsumerService consumerService;
/**
* TOPIC消费线程
*/
@PostConstruct
public void startTopicConsumer() {
TaskManager.doTask(this::runTopicThreads, "开启TOPIC消费线程");
}
/**
* 开启TOPIC消费线程
* 将所有可能出现的异常全部catch住,确保While(true)能够不中断
*/
@SuppressWarnings("InfiniteLoopStatement")
private void runTopicThreads() {
while (true) {
RLock lock = null;
try {
lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
} catch (Exception e) {
log.error("runTopicThreads getLock error", e);
}
try {
if (lock == null) {
continue;
}
// 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错
boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
continue;
}
// 1. 获取ReadyQueue中待消费的数据
RBlockingQueue<string> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
String topicId = queue.poll(60, TimeUnit.SECONDS);
if (StringUtils.isEmpty(topicId)) {
continue;
}
// 2. 获取job元信息内容
RMap<string> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
Job job = jobPoolMap.get(topicId);
// 3. 消费
FutureTask<boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消费JobId-->" + job.getJobId());
if (taskResult.get()) {
// 3.1 消费成功,删除JobPool和DelayBucket的job信息
jobPoolMap.remove(topicId);
} else {
int retrySum = job.getRetry() + 1;
// 3.2 消费失败,则根据策略重新加入Bucket
// 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB
if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
jobPoolMap.remove(topicId);
continue;
}
job.setRetry(retrySum);
long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(nextTime, topicId);
// 3.3 更新元信息失败次数
jobPoolMap.put(topicId, job);
}
} catch (Exception e) {
log.error("runTopicThreads error", e);
} finally {
if (lock != null) {
try {
lock.unlock();
} catch (Exception e) {
log.error("runTopicThreads unlock error", e);
}
}
}
}
}
}</object></boolean></string></string>
/**
* 提供给外部服务的操作接口
*
* @author why
* @date 2020年1月15日
*/
@Slf4j
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
@Autowired
private RedissonClient redissonClient;
/**
* 添加job元信息
*
* @param job 元信息
*/
@Override
public void addJob(Job job) {
RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
try {
boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
}
String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());
// 1. 将job添加到 JobPool中
RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
if (jobPool.get(topicId) != null) {
throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
}
jobPool.put(topicId, job);
// 2. 将job添加到 DelayBucket中
RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(job.getDelay(), topicId);
} catch (InterruptedException e) {
log.error("addJob error", e);
} finally {
if (lock != null) {
lock.unlock();
}
}
}
/**
* 删除job信息
*
* @param job 元信息
*/
@Override
public void deleteJob(JobDie jobDie) {
RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
try {
boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
}
String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());
RMap<string> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
jobPool.remove(topicId);
RScoredSortedSet<object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.remove(topicId);
} catch (InterruptedException e) {
log.error("addJob error", e);
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}</object></string></object></string>
現在、メッセージを保存する Queue キューは 1 つだけです。消費する必要があるメッセージが大量に蓄積すると、メッセージ通知の適時性が影響を受けます。改善方法は、複数のキューを開き、メッセージ ルーティングを実行し、スループットを提供するために複数のコンシューマ スレッドを開いて消費することです。
メッセージは永続化されないため、危険です。メッセージは MangoDB に永続化されます。- 6. ソースコード
詳細なソースコードは以下のアドレスから入手してください
RedisDelayQueue の実装
zing-delay-queue(https://gitee.com/whyCodeData/zing-project/tree/master/zing-delay-queue)-
RedissonStarter
redisson-spring-boot-starter(https://gitee.com/whyCodeData/zing-project/tree/master/zing-starter/redisson-spring-boot-starter) -
プロジェクト申請
zing-pay(https://gitee.com/whyCodeData/zing-pay) -
##7. 参考文献
##https://tech.youzan.com/queuing_delay/https://blog.csdn.net/u010634066/article/details/98864764
- 詳細redis に関する知識については、
- redis 入門チュートリアル 列を参照してください。
以上がRedis は遅延キューをどのように実装しますか?手法の紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック











1. [スタート]メニューを起動し、[cmd]と入力し、[コマンドプロンプト]を右クリックし、[管理者として実行]を選択します。 2. 次のコマンドを順番に入力します (注意してコピーして貼り付けてください): SCconfigwuauservstart=auto、Enter キーを押す SCconfigbitsstart=auto、Enter キーを押す SCconfigcryptsvcstart=auto、Enter キーを押す SCconfigtrustedinstallerstart=auto、Enter キーを押す SCconfigwuauservtype=share、Enter キーを押す netstopwuauserv 、enter netstopcryptS を押す

PHP 関数のボトルネックはパフォーマンスの低下につながります。これは、ボトルネック関数を特定し、パフォーマンス分析ツールを使用するという手順で解決できます。結果をキャッシュして再計算を減らします。タスクを並列処理して実行効率を向上させます。文字列の連結を最適化し、代わりに組み込み関数を使用します。カスタム関数の代わりに組み込み関数を使用します。

GolangAPI のキャッシュ戦略により、パフォーマンスが向上し、サーバーの負荷が軽減されます。一般的に使用される戦略は、LRU、LFU、FIFO、TTL です。最適化手法には、適切なキャッシュ ストレージの選択、階層型キャッシュ、無効化管理、監視とチューニングが含まれます。実際には、データベースからユーザー情報を取得する API を最適化するために LRU キャッシュが使用されます。それ以外の場合は、データベースからデータを取得した後にキャッシュを更新できます。

Erlang と Go にはパフォーマンスの違いがあります。 Erlang は同時実行性に優れていますが、Go はより高いスループットとより高速なネットワーク パフォーマンスを備えています。 Erlang は高い同時実行性を必要とするシステムに適しており、Go は高スループットと低遅延を必要とするシステムに適しています。

PHP 開発では、キャッシュ メカニズムにより、頻繁にアクセスされるデータがメモリまたはディスクに一時的に保存され、データベース アクセスの数が削減され、パフォーマンスが向上します。キャッシュの種類には主にメモリ、ファイル、データベース キャッシュが含まれます。キャッシュは、組み込み関数またはサードパーティのライブラリ (cache_get() や Memcache など) を使用して PHP に実装できます。一般的な実用的なアプリケーションには、データベース クエリ結果をキャッシュしてクエリ パフォーマンスを最適化したり、ページ出力をキャッシュしてレンダリングを高速化したりすることが含まれます。キャッシュ メカニズムにより、Web サイトの応答速度が効果的に向上し、ユーザー エクスペリエンスが向上し、サーバーの負荷が軽減されます。

Redis キャッシュを使用すると、PHP 配列ページングのパフォーマンスを大幅に最適化できます。これは、次の手順で実現できます。 Redis クライアントをインストールします。 Redisサーバーに接続します。キャッシュ データを作成し、データの各ページをキー「page:{page_number}」を持つ Redis ハッシュに保存します。キャッシュからデータを取得し、大規模な配列での高コストの操作を回避します。

まず、システム言語を簡体字中国語表示に設定して再起動する必要があります。もちろん、以前に表示言語を簡体字中国語に変更したことがある場合は、この手順をスキップできます。次に、レジストリ regedit.exe の操作を開始し、左側のナビゲーション バーまたは上部のアドレス バーで HKEY_LOCAL_MACHINESYSTEMCurrentControlSetControlNlsLanguage に直接移動し、InstallLanguage キーの値と Default キーの値を 0804 に変更します (英語に変更する場合)。まずシステムの表示言語を en-us に設定し、システムを再起動してから、すべてを 0409 に変更します) この時点でシステムを再起動する必要があります。

はい、Navicat は Redis に接続できます。これにより、ユーザーはキーの管理、値の表示、コマンドの実行、アクティビティの監視、問題の診断が可能になります。 Redis に接続するには、Navicat で「Redis」接続タイプを選択し、サーバーの詳細を入力します。
