目次
延时队列和优先级队列
应用场景
ホームページ データベース Redis Redis がキューのブロック、遅延、パブリッシュ、サブスクリプションを実装する方法

Redis がキューのブロック、遅延、パブリッシュ、サブスクリプションを実装する方法

May 23, 2022 pm 12:15 PM
redis

この記事では、Redis に関する関連知識を提供します。主に、キューのブロック、遅延、パブリッシュ、サブスクリプションの実装方法に関する関連問題を紹介します。一緒に見てみましょう。皆さんのご協力を願っています。 。

Redis がキューのブロック、遅延、パブリッシュ、サブスクリプションを実装する方法

推奨学習: Redis ビデオ チュートリアル

Redis はキャッシュ サーバーとしてだけでなく、メッセージ キューとしても使用できます。そのリスト タイプは本質的にメッセージ キューとしての使用をサポートしています。以下の図に示すように、
Redis がキューのブロック、遅延、パブリッシュ、サブスクリプションを実装する方法

#Redis リストは二重リンクリストを使用して実装されているため、先頭ノードと末尾ノードが保存されるため、その両側の要素を挿入または取得します。リストの先頭と末尾は非常に高速で、時間計算量は O(1) です。

通常のキュー

Redis リスト データ型を直接使用して、lpush と rpop、または rpush と lpop という 2 つの簡単な命令だけでメッセージ キューを実装できます。

    lpush rpop: left-in および right-out キュー
  • rpush lpop: left-out および right-in キュー
以下では、通常のキューをシミュレートする redis コマンド。

lpush コマンドを使用してメッセージを生成します:

>lpush queue:single 1"1">lpush queue:single 2"2">lpush queue:single 3"3"
ログイン後にコピー
rpop コマンドを使用してメッセージを消費します:

>rpop queue:single"1">rpop queue:single"2">rpop queue:single"3"
ログイン後にコピー
以下では、Java コードを使用して共通キューを実装します。

Producer SingleProducer

package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class SingleProducer {

    public static final String SINGLE_QUEUE_NAME = "queue:single";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i Consumer SingleConsumer: <p></p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;import java.util.Objects;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class SingleConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
            if(Objects.nonNull(message)) {
                System.out.println(message);
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }}
ログイン後にコピー
上記のコードは基本的に通常のキューの生成と消費を実現していますが、上記の例ではメッセージのコンシューマが存在します。

    消費者は、redis リストに処理対象のデータ (メッセージ) があるかどうかを確認するために、常に rpop メソッドを呼び出す必要があります。接続は呼び出されるたびに開始されます。リストにデータがない可能性があるため、多数の空のポーリングが発生し、不要な無駄が発生します。 Thread.sleep() やその他のメソッドを使用して、一定期間後にコンシューマ スレッドが再び消費できるようにすることもできます。スリープ時間が長すぎると、一部の時間に敏感なメッセージが処理できなくなります。スリープ時間が短すぎる場合は、場合、接続上での比較も発生し、大きなオーバーヘッドが発生します。
  1. プロデューサーの速度がコンシューマーの消費速度よりも速い場合、メッセージ キューの長さは増加し続け、時間の経過とともに多くのメモリ領域を占有することになります。
ブロッキングキュー

コンシューマーは brpop 命令を使用して、redis リストからデータを取得できます。この命令は、要素がある場合にのみ返されます。要素がない場合は、タイムアウトになるまでブロックされ、 null を返す. したがって、コンシューマーはデータを取得するためにスリープする必要はありません。これは、ブロッキング キューを実装するのと同等です。

redis の brpop コマンドを使用して、ブロッキング キューをシミュレートします。

>brpop queue:single 30
ログイン後にコピー
コマンド ラインが brpop でブロックされており、データがなければ 30 秒後に戻ることがわかります。

Java コードは次のように実装されます。

プロデューサーは通常のキューのプロデューサーと同じです。

Consumer BlockConsumer:

package com.morris.redis.demo.queue.block;import redis.clients.jedis.Jedis;import java.util.List;/**
 * 消费者
 */public class BlockConsumer {

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        while (true) {
            // 超时时间为1s
            List<string> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME);
            if (null != messageList && !messageList.isEmpty()) {
                System.out.println(messageList);
            }
        }
    }}</string>
ログイン後にコピー
短所: 1 つの生産と複数の消費は実現できません。

パブリッシュおよびサブスクライブ モード

メッセージ キューのサポートを提供することに加えて、Redis はパブリッシュ/サブスクライブ モードをサポートする一連のコマンドも提供します。 Redis のパブリッシュ/サブスクライブ モードを使用すると、1 回生成して複数回消費するキューを実装できます。

Publish: PUBLISH 命令を使用してメッセージを公開できます。形式:

PUBLISH channel message
ログイン後にコピー
戻り値は、メッセージの購読者数を示します。

サブスクリプション: SUBSCRIBE 命令はメッセージを受信するために使用されます。形式:

SUBSCRIBE channel
ログイン後にコピー
SUBSCRIBE 命令を使用した後、サブスクリプション モードに入りますが、その前にパブリッシュによって送信されたメッセージは受信されません。これは、サブスクリプションのみが送信されるまでメッセージを受信しないためです。このモードの他のコマンドについては、応答のみが表示されます。

返信は 3 つのタイプに分類されます。

    subscribe の場合、2 番目の値は購読したチャネルを示し、3 番目の値は購読したチャネルの数を示します
  1. メッセージ (メッセージ) の場合、2 番目の値はメッセージを生成したチャネル、3 番目の値はメッセージです。
  2. unsubscribe の場合、2 番目の値は購読を中止するチャネルを表します。 3 番目の値はメッセージを生成したチャネルで、現在のクライアントのサブスクリプション番号を表します。
以下では、redis コマンドを使用してパブリッシュ/サブスクライブ モードをシミュレートします。

プロデューサー:

127.0.0.1:6379> publish queue hello(integer) 1127.0.0.1:6379> publish queue hi(integer) 1
ログイン後にコピー
コンシューマー:

127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "queue"3) (integer) 11) "message"2) "queue"3) "hello"1) "message"2) "queue"3) "hi"
ログイン後にコピー
Java コードは次のように実装されます:

プロデューサー PubsubProducer:

package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;/**
 * 生产者
 */public class PubsubProducer {

    public static final String PUBSUB_QUEUE_NAME = "queue:pubsub";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i コンシューマー PubsubConsumer : <p></p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/**
 * 消费者
 */public class PubsubConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();

        JedisPubSub jedisPubSub = new JedisPubSub() {

            @Override
            public void onMessage(String channel, String message) {
                System.out.println("receive message: " + message);
                if(message.indexOf("99") > -1) {
                    this.unsubscribe();
                }
            }

            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println("subscribe channel: " + channel);
            }

            @Override
            public void onUnsubscribe(String channel, int subscribedChannels) {
                System.out.println("unsubscribe channel " + channel);
            }
        };

        jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME);
    }}
ログイン後にコピー
コンシューマは複数のコンシューマを起動でき、各コンシューマはすべてのメッセージを受信できます。

コマンド UNSUBSCRIBE を使用して購読を解除できます。パラメーターを追加しない場合、SUBSCRIBE コマンドによって購読されたすべてのチャネルが購読解除されます。

Redis は、コマンド PSUBSCRIBE (パターン サブスクライブ) を使用した、ワイルドカードに基づくメッセージ サブスクリプションもサポートしています。例:

psubscribe channel.*
ログイン後にコピー
PSUBSCRIBE コマンドでサブスクライブされたチャネルは、サブスクライブを解除するためにコマンド PUNSUBSCRIBE コマンドも使用する必要があります。このコマンドは、SUBSCRIBE によって購読されたチャネルから購読を解除することはできません。同様に、UNSUBSCRIBE は、PSUBSCRIBE コマンドによって購読されたチャネルから購読を解除することはできません。

同時に、PUNSUBSCRIBE 命令のワイルドカードは展開されません。例:

PUNSUBSCRIBE \*channel.\* と一致しないため、channel.\* の購読を解除するには、PUBSUBSCRIBE チャネルを記述する必要があります。 \*

Redis の pub/sub には欠点もあります。つまり、コンシューマーがオフラインになると、プロデューサーのメッセージが失われます。

延时队列和优先级队列

Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。

如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。

如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。

下面使用redis的zset来模拟延时队列。

生产者:

127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3(integer) 0
ログイン後にコピー

消费者:

127.0.0.1:6379> zrange queue:delay 0 0 withscores1) "order1"2) "1"127.0.0.1:6379> zrem queue:delay order1(integer) 1
ログイン後にコピー

Java代码如下:

生产者DelayProducer:

package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import java.util.Date;import java.util.Random;/**
 * 生产者
 */public class DelayProducer {

    public static final String DELAY_QUEUE_NAME = "queue:delay";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        long now = new Date().getTime();
        Random random = new Random();
        for (int i = 0; i <p>消费者:</p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import redis.clients.jedis.Tuple;import java.util.Date;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;/**
 * 消费者
 */public class DelayConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            long now = new Date().getTime();
            Set<tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
            if(tupleSet.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(500);
            } else {
                for (Tuple tuple : tupleSet) {
                    Double score = tuple.getScore();
                    long time = score.longValue();
                    if(time <h2 id="应用场景">应用场景</h2>
<ul>
<li>延时队列可用于订单超时失效的场景</li>
<li>二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。</li>
</ul>
<p>推荐学习:<a href="https://www.php.cn/course/list/54.html" target="_blank">Redis视频教程</a></p></tuple>
ログイン後にコピー

以上がRedis がキューのブロック、遅延、パブリッシュ、サブスクリプションを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

Windows 11 10.0.22000.100 のインストール時の 0x80242008 エラーの解決策 Windows 11 10.0.22000.100 のインストール時の 0x80242008 エラーの解決策 May 08, 2024 pm 03:50 PM

1. [スタート]メニューを起動し、[cmd]と入力し、[コマンドプロンプト]を右クリックし、[管理者として実行]を選択します。 2. 次のコマンドを順番に入力します (注意してコピーして貼り付けてください): SCconfigwuauservstart=auto、Enter キーを押す SCconfigbitsstart=auto、Enter キーを押す SCconfigcryptsvcstart=auto、Enter キーを押す SCconfigtrustedinstallerstart=auto、Enter キーを押す SCconfigwuauservtype=share、Enter キーを押す netstopwuauserv 、enter netstopcryptS を押す

Golang API のキャッシュ戦略と最適化 Golang API のキャッシュ戦略と最適化 May 07, 2024 pm 02:12 PM

GolangAPI のキャッシュ戦略により、パフォーマンスが向上し、サーバーの負荷が軽減されます。一般的に使用される戦略は、LRU、LFU、FIFO、TTL です。最適化手法には、適切なキャッシュ ストレージの選択、階層型キャッシュ、無効化管理、監視とチューニングが含まれます。実際には、データベースからユーザー情報を取得する API を最適化するために LRU キャッシュが使用されます。それ以外の場合は、データベースからデータを取得した後にキャッシュを更新できます。

PHP 開発におけるキャッシュ メカニズムとアプリケーションの実践 PHP 開発におけるキャッシュ メカニズムとアプリケーションの実践 May 09, 2024 pm 01:30 PM

PHP 開発では、キャッシュ メカニズムにより、頻繁にアクセスされるデータがメモリまたはディスクに一時的に保存され、データベース アクセスの数が削減され、パフォーマンスが向上します。キャッシュの種類には主にメモリ、ファイル、データベース キャッシュが含まれます。キャッシュは、組み込み関数またはサードパーティのライブラリ (cache_get() や Memcache など) を使用して PHP に実装できます。一般的な実用的なアプリケーションには、データベース クエリ結果をキャッシュしてクエリ パフォーマンスを最適化したり、ページ出力をキャッシュしてレンダリングを高速化したりすることが含まれます。キャッシュ メカニズムにより、Web サイトの応答速度が効果的に向上し、ユーザー エクスペリエンスが向上し、サーバーの負荷が軽減されます。

Win11 英語 21996 を簡体字中国語 22000 にアップグレードする方法_Win11 英語 21996 を簡体字中国語 22000 にアップグレードする方法 Win11 英語 21996 を簡体字中国語 22000 にアップグレードする方法_Win11 英語 21996 を簡体字中国語 22000 にアップグレードする方法 May 08, 2024 pm 05:10 PM

まず、システム言語を簡体字中国語表示に設定して再起動する必要があります。もちろん、以前に表示言語を簡体字中国語に変更したことがある場合は、この手順をスキップできます。次に、レジストリ regedit.exe の操作を開始し、左側のナビゲーション バーまたは上部のアドレス バーで HKEY_LOCAL_MACHINESYSTEMCurrentControlSetControlNlsLanguage に直接移動し、InstallLanguage キーの値と Default キーの値を 0804 に変更します (英語に変更する場合)。まずシステムの表示言語を en-us に設定し、システムを再起動してから、すべてを 0409 に変更します) この時点でシステムを再起動する必要があります。

PHP 配列のページネーションで Redis キャッシュを使用するにはどうすればよいですか? PHP 配列のページネーションで Redis キャッシュを使用するにはどうすればよいですか? May 01, 2024 am 10:48 AM

Redis キャッシュを使用すると、PHP 配列ページングのパフォーマンスを大幅に最適化できます。これは、次の手順で実現できます。 Redis クライアントをインストールします。 Redisサーバーに接続します。キャッシュ データを作成し、データの各ページをキー「page:{page_number}」を持つ Redis ハッシュに保存します。キャッシュからデータを取得し、大規模な配列での高コストの操作を回避します。

Win11でダウンロードしたアップデートファイルの探し方_Win11でダウンロードしたアップデートファイルの場所を共有する Win11でダウンロードしたアップデートファイルの探し方_Win11でダウンロードしたアップデートファイルの場所を共有する May 08, 2024 am 10:34 AM

1. まず、デスクトップ上の[このPC]アイコンをダブルクリックして開きます。 2. 次に、マウスの左ボタンをダブルクリックして [C ドライブ] に入ります。システム ファイルは通常、自動的に C ドライブに保存されます。 3. 次に、C ドライブで [windows] フォルダーを見つけ、ダブルクリックしてに入ります。 4. [windows]フォルダーに入ったら、[SoftwareDistribution]フォルダーを見つけます。 5. 入力後、win11 のダウンロード ファイルとアップデート ファイルがすべて含まれている [ダウンロード] フォルダーを見つけます。 6. これらのファイルを削除したい場合は、このフォルダー内で直接削除してください。

PHP Redis キャッシュ アプリケーションとベスト プラクティス PHP Redis キャッシュ アプリケーションとベスト プラクティス May 04, 2024 am 08:33 AM

Redis は、高性能のキー/値キャッシュです。 PHPRedis 拡張機能は、Redis サーバーと対話するための API を提供します。 Redis に接続し、データを保存および取得するには、次の手順を使用します。 接続: Redis クラスを使用してサーバーに接続します。ストレージ: set メソッドを使用してキーと値のペアを設定します。取得: get メソッドを使用してキーの値を取得します。

Docker環境にPECLを使用して拡張機能をインストールするときにエラーが発生するのはなぜですか?それを解決する方法は? Docker環境にPECLを使用して拡張機能をインストールするときにエラーが発生するのはなぜですか?それを解決する方法は? Apr 01, 2025 pm 03:06 PM

エラーの原因とソリューションPECLを使用してDocker環境に拡張機能をインストールする場合、Docker環境を使用するときに、いくつかの頭痛に遭遇します...

See all articles