この記事では、Redis の高度な使用法 (メッセージ キュー) を説明し、Redis の遅延キューについて紹介します。
メッセージ キュー ミドルウェアというと、アプリケーションに非同期メッセージング機能を実装するための RabbitMQ、RocketMQ、Kafka を誰もが思い浮かべます。これらは、私たちが理解できる以上の機能を備えた特殊なメッセージ キュー ミドルウェアです。
これらのメッセージ ミドルウェアの使用は、RabbitMQ など複雑です。メッセージを送信する前に、Exchange と Queue を作成し、いくつかのルールに従って Exchange と Queue をバインドする必要があります。ルーティング キーと制御ヘッダー メッセージを定式化する必要もあります。これはプロデューサーのみが対象ですが、コンシューマーもメッセージを消費する前に上記の一連の面倒な手順を再度実行する必要があります。
したがって、100% の信頼性を必要とせず、単純なメッセージ キュー要件を実装したい人は、Redis を使用してメッセージ キュー ミドルウェアの面倒な手順から解放できます。
Redis のメッセージ キューは専門的なメッセージ キューではないため、メッセージ キューに多くの高度な機能はなく、ACK 保証もありません。メッセージの信頼性を究極的に追求する場合は、プロフェッショナルな MQ ミドルウェアをご利用ください。 [関連する推奨事項: Redis ビデオ チュートリアル ]
最も単純な非同期メッセージ キューから始まり、Redis リストのデータ構造は一般的に次のようになります。 used 非同期メッセージ キューとして、エンキューには lrpush/lpush が使用され、デキューには rpop/lpop が使用されます。
// 生产\ public void delay(T msg) {\ TaskItem task = new TaskItem();\ task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid\ task.msg = msg;\ String s = JSON.toJSONString(task); // fastjson 序列化\ jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试\ }\ // 消费\ public void loop() {\ while (!Thread.interrupted()) {\ // zrangeByScore参数中0, System.currentTimeMills()代表从redis中去score范围在0到系统当前时间的数据, 0,1表示从0开始取1个 拓展传入的score为-inf, +inf 分别表示zset中的最大值和最小值,当你不知道zset中的score最值时就可以使用inf作为参数变量\ Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);\ if (values.isEmpty()) {\ try {\ Thread.sleep(500); // 歇会继续\ }\ catch (InterruptedException e) {\ break;\ }\ continue;\ }\ String s = values.iterator().next(); //消费队列\ if (jedis.zrem(queueKey, s) > 0) { // 抢到了,要考虑到多线程下锁争抢的情况,只有rem成功代表成功的消费了一条消息。\ TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化\ this.handleMsg(task.msg);\ }\ }\ }
プログラミング入門をご覧ください。 !
以上がRedis でメッセージ キューを使用する方法の簡単な分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。