この記事では、Redis に関する関連知識を提供します。主に、キューのブロック、遅延、パブリッシュ、サブスクリプションの実装方法に関する関連問題を紹介します。一緒に見てみましょう。皆さんのご協力を願っています。 。
推奨学習: Redis ビデオ チュートリアル
Redis はキャッシュ サーバーとしてだけでなく、メッセージ キューとしても使用できます。そのリスト タイプは本質的にメッセージ キューとしての使用をサポートしています。以下の図に示すように、
lpush コマンドを使用してメッセージを生成します:
>lpush queue:single 1"1">lpush queue:single 2"2">lpush queue:single 3"3"
>rpop queue:single"1">rpop queue:single"2">rpop queue:single"3"
package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;/** * 生产者 */public class SingleProducer { public static final String SINGLE_QUEUE_NAME = "queue:single"; public static void main(String[] args) { Jedis jedis = new Jedis(); for (int i = 0; i Consumer SingleConsumer: <p></p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;import java.util.Objects;import java.util.concurrent.TimeUnit;/** * 消费者 */public class SingleConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis(); while (true) { String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME); if(Objects.nonNull(message)) { System.out.println(message); } else { TimeUnit.MILLISECONDS.sleep(500); } } }}
>brpop queue:single 30
package com.morris.redis.demo.queue.block;import redis.clients.jedis.Jedis;import java.util.List;/** * 消费者 */public class BlockConsumer { public static void main(String[] args) { Jedis jedis = new Jedis(); while (true) { // 超时时间为1s List<string> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME); if (null != messageList && !messageList.isEmpty()) { System.out.println(messageList); } } }}</string>
PUBLISH channel message
SUBSCRIBE channel
127.0.0.1:6379> publish queue hello(integer) 1127.0.0.1:6379> publish queue hi(integer) 1
127.0.0.1:6379> subscribe queue Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "queue"3) (integer) 11) "message"2) "queue"3) "hello"1) "message"2) "queue"3) "hi"
package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;/** * 生产者 */public class PubsubProducer { public static final String PUBSUB_QUEUE_NAME = "queue:pubsub"; public static void main(String[] args) { Jedis jedis = new Jedis(); for (int i = 0; i コンシューマー PubsubConsumer : <p></p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;/** * 消费者 */public class PubsubConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis(); JedisPubSub jedisPubSub = new JedisPubSub() { @Override public void onMessage(String channel, String message) { System.out.println("receive message: " + message); if(message.indexOf("99") > -1) { this.unsubscribe(); } } @Override public void onSubscribe(String channel, int subscribedChannels) { System.out.println("subscribe channel: " + channel); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println("unsubscribe channel " + channel); } }; jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME); }}
psubscribe channel.*
PUNSUBSCRIBE \* は
channel.\* と一致しないため、
channel.\* の購読を解除するには、
PUBSUBSCRIBE チャネルを記述する必要があります。 \*。
Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。
如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。
如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。
下面使用redis的zset来模拟延时队列。
生产者:
127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3(integer) 0
消费者:
127.0.0.1:6379> zrange queue:delay 0 0 withscores1) "order1"2) "1"127.0.0.1:6379> zrem queue:delay order1(integer) 1
Java代码如下:
生产者DelayProducer:
package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import java.util.Date;import java.util.Random;/** * 生产者 */public class DelayProducer { public static final String DELAY_QUEUE_NAME = "queue:delay"; public static void main(String[] args) { Jedis jedis = new Jedis(); long now = new Date().getTime(); Random random = new Random(); for (int i = 0; i <p>消费者:</p><pre class="brush:php;toolbar:false">package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import redis.clients.jedis.Tuple;import java.util.Date;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;/** * 消费者 */public class DelayConsumer { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis(); while (true) { long now = new Date().getTime(); Set<tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0); if(tupleSet.isEmpty()) { TimeUnit.MILLISECONDS.sleep(500); } else { for (Tuple tuple : tupleSet) { Double score = tuple.getScore(); long time = score.longValue(); if(time <h2>应用场景</h2> <ul> <li>延时队列可用于订单超时失效的场景</li> <li>二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。</li> </ul> <p>推荐学习:<a href="https://www.php.cn/course/list/54.html" target="_blank">Redis视频教程</a></p></tuple>
以上がRedis がキューのブロック、遅延、パブリッシュ、サブスクリプションを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。