首頁 > 資料庫 > Redis > 詳解Redis和隊列

詳解Redis和隊列

藏色散人
發布: 2020-08-26 11:51:30
轉載
2191 人瀏覽過

下面由Redis教學專欄給大家詳解Redis和佇列,希望對需要的朋友有幫助!

詳解Redis和隊列

摘要

Redis不僅可作為快取伺服器,也可用作訊息佇列。它的列表類型天生支援用作訊息隊列。如下圖所示:

由於Redis的列表是使用雙向鍊錶實現的,保存了頭尾節點,所以在列表頭尾兩邊插取元素都是非常快的。

普通佇列實作

所以可以直接使用Redis的List實作訊息佇列,只需簡單的兩個指令lpush和rpop或rpush和lpop。簡單範例如下:

存放訊息端(訊息生產者):

package org.yamikaze.redis.messsage.queue; 
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
import java.util.concurrent.TimeUnit; 
/**
 * 消息生产者
 * @author yamikaze */public class Producer extends Thread { 
    public static final String MESSAGE_KEY = "message:queue";    private Jedis jedis;    private String producerName;    private volatile int count; 
    public Producer(String name) {        this.producerName = name;
        init();
    } 
    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void putMessage(String message) {
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(producerName + ": 当前未被处理消息条数为:" + size);
        count++;
    } 
    public int getCount() {        return count;
    }
 
    @Override    public void run() {        try {            while (true) {
                putMessage(StringUtils.generate32Str());
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    } 
    public static void main(String[] args) throws InterruptedException{
        Producer producer = new Producer("myProducer");
        producer.start(); 
        for(; ;) {
            System.out.println("main : 已存储消息条数:" + producer.getCount());
            TimeUnit.SECONDS.sleep(10);
        }
    }
}
登入後複製

訊息處理端(訊息消費者):

package org.yamikaze.redis.messsage.queue; 
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
/**
 * 消息消费者
 * @author yamikaze */public class Customer extends Thread{ 
    private String customerName;    private volatile int count;    private Jedis jedis; 
    public Customer(String name) {        this.customerName = name;
        init();
    } 
    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void processMessage() {
        String message = jedis.rpop(Producer.MESSAGE_KEY);        if(message != null) {
            count++;
            handle(message);
        }
    } 
    public void handle(String message) {
        System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");
    }
 
    @Override    public void run() {        while (true) {
            processMessage();
        }
    } 
    public static void main(String[] args) {
        Customer customer = new Customer("yamikaze");
        customer.start();
    }
}
登入後複製

看似還不錯,但上述範例中訊息消費者有一個問題存在,就是需要不停的呼叫rpop方法來查看List中是否有待處理訊息。每呼叫一次都會發起一次連接,這會造成不必要的浪費。也許你會使用Thread.sleep()等方法讓消費者執行緒隔一段時間再消費,但這樣做有兩個問題:

1)、如果生產者速度大於消費者消費速度,訊息佇列長度會一直增大,時間久了會佔用大量記憶體空間。

2)、如果睡眠時間過長,這樣不能處理一些時效性的消息,睡眠時間過短,也會在連線上造成比較大的開銷。

所以可以使用brpop指令,這個指令只有在有元素時才返回,沒有則會阻塞直到超時返回null,於是消費端可以將processMessage可以改為這樣:

public void processMessage() {    /**
     * brpop支持多个列表(队列)
     * brpop指令是支持队列优先级的,比如这个例子中MESSAGE_KEY的优先级大于testKey(顺序决定)。
     * 如果两个列表中都有元素,会优先返回优先级高的列表中的元素,所以这儿优先返回MESSAGE_KEY
     * 0表示不限制等待,会一直阻塞在这儿     */
    List<String> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey");    if(messages.size() != 0) {        //由于该指令可以监听多个Key,所以返回的是一个列表        //列表由2项组成,1) 列表名,2)数据
        String keyName = messages.get(0);        //如果返回的是MESSAGE_KEY的消息
        if(Producer.MESSAGE_KEY.equals(keyName)) {
            String message = messages.get(1);
            handle(message);
        }
 
    }
    System.out.println("=======================");
}
登入後複製

然後可以運行Customer,清空控制台,可以看到程式沒有任何輸出,阻塞在了brpop這兒。然後在開啟Redis的客戶端,輸入指令client list,可以查看目前有兩個連線。

一次生產多次消費的隊列

Redis除了對訊息隊列提供支援外,還提供了一組命令用於支援發布/訂閱模式。利用Redis的pub/sub模式可以實現一次生產多次消費的隊列。

1)發布
    PUBLISH指令可用來發佈訊息,格式 PUBLISH channel message

    回傳值表示訂閱了該訊息的數量。
    2)訂閱
    SUBSCRIBE指令用於接收訊息,格式SUBSCRIBE channel

    可以看到使用SUBSCRIBE指令後進入了訂閱模式,但沒有接收到publish發送的訊息,這是因為只有在訊息發出去前訂閱才會接收到。在這個模式下其他指令,只能看到回應。回覆分為三種:
    1、如果為subscribe,第二個值表示訂閱的頻道,第三個值表示是第幾個訂閱的頻道?(理解成序號?) 
    2、如果為message(訊息),第二個值為產生該訊息的頻道,第三個值為訊息
    3、如果為unsubscribe,第二個值表示取消訂閱的頻道,第三個值表示目前客戶端的訂閱數量。

    可以使用指令UNSUBSCRIBE退訂,如果不加參數,則會退訂所有由SUBSCRIBE指令訂閱的頻道。
   
    Redis也支援基於通配符的訊息訂閱,使用指令PSUBSCRIBE (pattern subscribe),例如:

   再試試推播訊息會得到下列結果:

   可看到publish指令回傳的是2,而訂閱端這邊接收了兩次訊息。這是因為PSUBSCRIBE指令可以重複訂閱頻道。而使用PSUBSCRIBE指令訂閱的頻道也要使用指令PUNSUBSCRIBE指令退訂,指令無法退訂SUBSCRIBE訂閱的頻道,同理UNSUBSCRIBE也無法退訂PSUBSCRIBE指令訂閱的頻道。同時PUNSUBSCRIBE指令通配符不會展開。
例如:PUNSUBSCRIBE * 不會配對到 channel.*, 所以要取消訂閱channel.*就要這樣寫PUBSUBSCRIBE channel.*。

程式碼示範如下:

package org.yamikaze.redis.messsage.subscribe; 
import org.yamikaze.redis.messsage.queue.StringUtils;import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
/**
 * 消息发布方
 * @author yamikaze */public class Publisher { 
    public static final String CHANNEL_KEY = "channel:message";    private Jedis jedis; 
    public Publisher() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void publishMessage(String message) {        if(StringUtils.isBlank(message)) {            return;
        }
        jedis.publish(CHANNEL_KEY, message);
    } 
    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.publishMessage("Hello Redis!");
    }
}
登入後複製

簡單的傳送一個訊息。

訊息訂閱方:

package org.yamikaze.redis.messsage.subscribe; 
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub; 
import java.util.concurrent.TimeUnit; 
/**
 * 消息订阅方客户端
 * @author yamikaze */public class SubscribeClient { 
    private Jedis jedis;    private static final String EXIT_COMMAND = "exit"; 
    public SubscribeClient() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void subscribe(String ...channel) {        if(channel == null || channel.length <= 0) {            return;
        }        //消息处理,接收到消息时如何处理
        JedisPubSub jps = new JedisPubSub() {            /**
             * JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现
             * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage
             * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法
             * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]             */
            @Override            public void onMessage(String channel, String message) {                if(Publisher.CHANNEL_KEY.equals(channel)) {
                    System.out.println("接收到消息: channel : " + message);                    //接收到exit消息后退出
                    if(EXIT_COMMAND.equals(message)) {
                        System.exit(0);
                    }
 
                }
            } 
            /**
             * 订阅时             */
            @Override            public void onSubscribe(String channel, int subscribedChannels) {                if(Publisher.CHANNEL_KEY.equals(channel)) {
                    System.out.println("订阅了频道:" + channel);
                }
            }
        };        //可以订阅多个频道 当前线程会阻塞在这儿        jedis.subscribe(jps, channel);
    } 
    public static void main(String[] args) {
        SubscribeClient client = new SubscribeClient();
        client.subscribe(Publisher.CHANNEL_KEY);        //并没有 unsubscribe方法        //相应的也没有punsubscribe方法    }
}
登入後複製

先執行client,再執行Publisher進行訊息傳送,輸出結果:

##Redis的pub/sub也有其缺點,就是如果消費者下線,生產者的消息會遺失。

延時佇列

背景

在業務發展過程中,會出現一些需要延時處理的場景,例如:

a.订单下单之后超过30分钟用户未支付,需要取消订单
b.订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论
c.点我达订单下单后,超过一定时间订单未派出,需要超时取消订单等。。。
处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是完全没问题,但是当数据量大的时候高频的轮训数据库就会比较的耗资源,导致数据库的慢查或者查询超时。所以在处理这类需求时候,采用了延时队列来完成。

几种延时队列

延时队列就是一种带有延迟功能的消息队列。下面会介绍几种目前已有的延时队列:

1.Java中java.util.concurrent.DelayQueue

优点:JDK自身实现,使用方便,量小适用
缺点:队列消息处于jvm内存,不支持分布式运行和消息持久化

2.Rocketmq延时队列

优点:消息持久化,分布式
缺点:不支持任意时间精度,只支持特定level的延时消息

3.Rabbitmq延时队列(TTL+DLX实现)

优点:消息持久化,分布式
缺点:延时相同的消息必须扔在同一个队列

Redis实现的延时消息队列适合的项目特点:

  • Spring框架管理对象
  • 有消息需求,但不想维护mq中间件
  • 有使用redis
  • 对消息持久化并没有很苛刻的要求

Redis实现的延时消息队列思路

Redis由于其自身的Zset数据结构,本质就是Set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性score,这一属性在添加修改元素时候可以指定,每次指定后,Zset会自动重新按新的值调整顺序。可以理解为有两列字段的数据表,一列存value,一列存顺序编号。操作中key理解为zset的名字,那么对延时队列又有何用呢?

试想如果score代表的是想要执行时间的时间戳,在某个时间将它插入Zset集合中,它变会按照时间戳大小进行排序,也就是对执行时间前后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的, 注意不需要遍历整个Zset集合,以免造成性能浪费。

Zset的排列效果如下图:

java代码实现如下:

package cn.chinotan.service.delayQueueRedis;import org.apache.commons.lang3.StringUtils;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.Tuple;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;import java.util.Set;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/**
 * @program: test
 * @description: redis实现延时队列
 * @author: xingcheng
 * @create: 2018-08-19
 **/public class AppTest {    private static final String ADDR = "127.0.0.1";    private static final int PORT = 6379;    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);    private static CountDownLatch cdl = new CountDownLatch(10);    public static Jedis getJedis() {        return jedisPool.getResource();
    }    /**
     * 生产者,生成5个订单     */
    public void productionDelayMessage() {        for (int i = 0; i < 5; i++) {
            Calendar instance = Calendar.getInstance();            // 3秒后执行
            instance.add(Calendar.SECOND, 3 + i);
            AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i + 1));
            System.out.println("生产订单: " + StringUtils.join("000000000", i + 1) + " 当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            System.out.println((3 + i) + "秒后执行");
        }
    }    //消费者,取订单
    public static void consumerDelayMessage() {
        Jedis jedis = AppTest.getJedis();        while (true) {
            Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0);            if (order == null || order.isEmpty()) {
                System.out.println("当前没有等待的任务");                try {
                    TimeUnit.MICROSECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }                continue;
            }
            Tuple tuple = (Tuple) order.toArray()[0];            double score = tuple.getScore();
            Calendar instance = Calendar.getInstance();            long nowTime = instance.getTimeInMillis() / 1000;            if (nowTime >= score) {
                String element = tuple.getElement();
                Long orderId = jedis.zrem("orderId", element);                if (orderId > 0) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消费了一个任务:消费的订单OrderId为" + element);
                }
            }
        }
    }    static class DelayMessage implements Runnable{
        @Override        public void run() {            try {
                cdl.await();
                consumerDelayMessage();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }    
    public static void main(String[] args) {
        AppTest appTest = new AppTest();
        appTest.productionDelayMessage();        for (int i = 0; i < 10; i++) {            new Thread(new DelayMessage()).start();
            cdl.countDown();
        }
    }
}
登入後複製

实现效果如下:

以上是詳解Redis和隊列的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:cnblogs.com
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板