Maison > base de données > Redis > Quelle est la méthode d'implémentation de la file d'attente différée dans Redis ?

Quelle est la méthode d'implémentation de la file d'attente différée dans Redis ?

WBOY
Libérer: 2023-05-30 11:29:25
avant
2430 Les gens l'ont consulté

1. Avant-propos

1.1. Qu'est-ce qu'une file d'attente retardée

La plus grande différence entre une file d'attente retardée et une file d'attente ordinaire se reflète dans son attribut de retard. de la file d'attente ordinaire sont premiers entrés, premiers sortis et traités dans l'ordre dans lequel ils sont ajoutés à la file d'attente, tandis que les éléments de la file d'attente différée se voient attribuer un délai lorsqu'ils sont ajoutés à la file d'attente, indiquant qu'ils espèrent être traités après l'heure spécifiée. La structure d'une file d'attente à retard ressemble plus à une structure de tas ordonnée pondérée dans le temps qu'à une file d'attente traditionnelle.

1.2. Scénarios d'application

Dans certains scénarios métier, nous rencontrons souvent des fonctions qui doivent être exécutées à un nœud temporel spécifique ou après une période de temps. Par exemple, les scénarios suivants :

Créer une nouvelle commande Si le paiement n'est pas effectué dans le délai imparti, le plat à emporter ou le taxi doit être automatiquement annulé dix minutes avant l'heure d'arrivée prévue. le coureur ou le chauffeur sera rappelé que le délai sera dépassé. Si l'utilisateur ne confirme pas la réception dans le délai imparti, la livraison sera automatiquement confirmée. Pour les rendez-vous programmés, il vous sera rappelé de rejoindre la réunion le plus tôt possible dix minutes avant. le début de la réunion. Le rapport hebdomadaire quotidien vous rappellera de soumettre le plus tôt possible une demi-heure avant la date limite

1.3 , Pourquoi utiliser la file d'attente différée

Pour certains projets avec un petit volume de données et de faibles exigences en matière d'actualité des données, le moyen le plus simple et le plus efficace consiste à écrire une tâche planifiée pour analyser la base de données afin d'atteindre les objectifs commerciaux. Lorsque la quantité de données atteint des millions ou des dizaines de millions, il est facile d'être touché si la base de données est analysée régulièrement. Je pense que tout le monde sait que lorsque les données atteignent ce niveau, il sera très inefficace d'analyser régulièrement la table. Même dans les situations où l'intervalle de temps est relativement court, l'analyse suivante démarrera avant la fin de l'analyse. À l’heure actuelle, il peut être très efficace d’utiliser une file d’attente différée.

Plusieurs façons d'implémenter des files d'attente retardées

  • Tâches planifiées Quartz

  • DelayQueue file d'attente différée

  • Redis trié ensemble Redis

  • Rappel de surveillance des clés expirées

    #🎜🎜 #
  • RabbitMQ Dead Letter Queue

  • RabbitMQ implémente une file d'attente différée basée sur un plug-in

  • # 🎜🎜 #algorithme de roue de temps de roue
  • 2 Ensemble trié Redis

Dans Redis, zet est un ensemble ordonné, et son ensemble ordonné peut être utilisé, ajouter la tâche à zset, utiliser le délai d'expiration de la tâche comme score, utiliser la fonction de classement par défaut de zset, obtenir l'élément avec la plus petite valeur de score (c'est-à-dire la tâche expirée la plus récemment) et déterminer le système L'heure et l'arrivée de la tâche.La taille de la période.Si l'heure d'expiration est atteinte, l'entreprise sera exécutée, la tâche d'expiration sera supprimée et l'élément suivant continuera à être jugé. dormira pendant un certain temps (par exemple 1 seconde). Si la collection est vide, elle dormira également pendant un certain temps.

Quelle est la méthode dimplémentation de la file dattente différée dans Redis ?Ajoutez des éléments à la file d'attente delayqueue via la commande zadd et définissez la valeur du score pour indiquer le délai d'expiration de l'élément, ajoutez trois ordre1, ordre2 ; , et order3 à la delayqueue , expirant respectivement après 10 secondes, 20 secondes et 30 secondes.

zadd delayqueue 3 order3

Le consommateur interroge la file d'attente delayqueue, trie les éléments et compare le temps minimum avec l'heure actuelle. Si elle est inférieure à l'heure actuelle, elle. signifie que la clé a expiré et supprimée.

/**
 * 消费消息
 */
public void pollOrderQueue() {
    while (true) {
        Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);
        String value = ((Tuple) set.toArray()[0]).getElement();
        int score = (int) ((Tuple) set.toArray()[0]).getScore();
        Calendar cal = Calendar.getInstance();
        int nowSecond = (int) (cal.getTimeInMillis() / 1000);
        if (nowSecond >= score) {
            jedis.zrem(DELAY_QUEUE, value);
            System.out.println(sdf.format(new Date()) + " removed key:" + value);
        }
        if (jedis.zcard(DELAY_QUEUE) <= 0) {
            System.out.println(sdf.format(new Date()) + " zset empty ");
            return;
        }
        Thread.sleep(1000);
    }
}
Copier après la connexion

Nous constatons que les résultats d'exécution sont comme prévu :

2020-05-07 13:24:09 ajout terminé.
2020- 05- 07 13:24:19 clé supprimée: commande 1

2020-05-07 13:24:29 clé supprimée: commande2
2020-05-07 13:24:39 clé supprimée: commande3#🎜 🎜# 2020-05-07 13:24:39 zset vide


3 Rappel de surveillance de la clé d'expiration Redis

L'événement de rappel d'expiration de la clé Redis peut également être réalisé. delay L'effet de la file d'attente, en termes simples, nous activons l'événement de surveillance si la clé expire. Une fois la clé expirée, un événement de rappel sera déclenché.

Pour activer notify-keyspace-events Ex, vous devez modifier le fichier redis.conf. notify-keyspace-events Ex

Redis configuration d'écoute, injecter Bean RedisMessageListenerContainer.

Deuxièmement, configurez l'écouteur redis. Enfin, écrivez la méthode de rappel de surveillance de l'expiration de la clé redis

@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
         return container;
    }
}
Copier après la connexion

Lors de l'écriture de la méthode de surveillance de rappel d'expiration Redis, vous devez hériter de KeyExpirationEventMessageListener. est quelque peu similaire à la surveillance des messages MQ .

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
    super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
      String expiredKey = message.toString();
      System.out.println("监听到key:" + expiredKey + "已过期");
    }
}
Copier après la connexion

Le code est maintenant écrit. C'est très simple. Ensuite, testez l'effet. Ajoutez une clé au client redis-cli et donnez-lui un délai d'expiration de 3 secondes.

set xiaofu 123 ex 3

La clé expirée a été surveillée avec succès sur la console.

La clé expirée détectée est : xiaofu

4, tâche planifiée Quartz

Quartz est un framework de planification de tâches très classique, en Redis, RabbitMQ Avant il était largement utilisé, la fonction d'annulation des commandes sans paiement après expiration du délai était mise en œuvre par des tâches planifiées.

Importer les dépendances Quartz

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
>在启动类中使用@EnableScheduling注解开启定时任务功能。
```java
@SpringBootApplication
@EnableScheduling
public class DelayQueueApplication {
    public static void main(String[] args) {
        SpringApplication.run(DelayQueueApplication.class, args);
    }
}
Copier après la connexion

Écrire des tâches planifiées

@Slf4j
@Component
public class QuartzDemo {
    /**
     * 每隔五秒开启一次任务
     */
    @Scheduled(cron = "0/5 * * * * ? ")
    public void process(){
        log.info("--------------定时任务测试--------------");
    }
}
Copier après la connexion

5. implémente l'API de delay queue, qui se trouve sous le package Java.util.concurrent DelayQueue.

DelayQueue是一个BlockingQueue(无界阻塞)队列,它本质就是封装了一个PriorityQueue(优先队列),PriorityQueue内部使用完全二叉堆(不知道的自行了解哈)来实现队列元素排序,我们在向DelayQueue队列中添加元素时,会给元素一个Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay时间才允许从队列中取出。队列中可以放基本数据类型或自定义实体类,在存放基本数据类型时,优先队列中元素默认升序排列,自定义实体类就需要我们根据类属性值比较计算了。 先简单实现一下看看效果,添加三个order入队DelayQueue,分别设置订单在当前时间的5秒、10秒、15秒后取消。

要实现DelayQueue延时队列,队中元素要implements Delayed 接口,这哥接口里只有一个getDelay方法,用于设置延期时间。在Order类中,compareTo方法的作用是对队列中的元素进行排列。

public class Order implements Delayed {
/**
 * 延迟时间
 */
@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private long time;
String name;
public Order(String name, long time, TimeUnit unit) {
    this.name = name;
    this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
    return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
    Order Order = (Order) o;
    long diff = this.time - Order.time;
    if (diff <= 0) {
        return -1;
    } else {
        return 1;
    }
}
}
Copier après la connexion

DelayQueue的put方法是线程安全的,因为put方法内部使用了ReentrantLock锁进行线程同步。DelayQueue还提供了两种出队的方法poll()和take() , poll()为非阻塞获取,没有到期的元素直接返回null;take()阻塞方式获取,没有到期的元素线程将会等待。

public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
    Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
    Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
    Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
    DelayQueue<Order> delayQueue = new DelayQueue<>();
    delayQueue.put(Order1);
    delayQueue.put(Order2);
    delayQueue.put(Order3);
    System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
    while (delayQueue.size() != 0) {
        /**
         * 取队列头部元素是否过期
         */
        Order task = delayQueue.poll();
        if (task != null) {
            System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name,  
            LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        }
        Thread.sleep(1000);
    }
}
}
Copier après la connexion

上边只是简单的实现入队与出队的操作,实际开发中会有专门的线程,负责消息的入队与消费。

执行后看到结果如下,Order1、Order2、Order3 分别在 5秒、10秒、15秒后被执行,至此就用DelayQueue实现了延时队列。

订单延迟队列开始时间:2020-05-06 14:59:09

订单:{Order1}被取消, 取消时间:{2020-05-06 14:59:14}

订单:{Order2}被取消, 取消时间:{2020-05-06 14:59:19}

订单:{Order3}被取消, 取消时间:{2020-05-06 14:59:24}

6、RabbitMQ 延时队列

利用 RabbitMQ 做延时队列是比较常见的一种方式,而实际上RabbitMQ 自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTL和 DXL这两个属性间接实现的。

先来认识一下 TTL和 DXL两个概念:

Time To Live(TTL) :

TTL 顾名思义:指的是消息的存活时间,RabbitMQ可以通过x-message-tt参数来设置指定Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。

RabbitMQ 可以从两种维度设置消息过期时间,分别是队列和消息本身

设置队列过期时间,那么队列中所有消息都具有相同的过期时间。可以在队列中为每条消息单独设置过期时间,即使每个消息的TTL不同也可以实现。若队列和队列中消息的TTL同时被设置,则TTL的值以两者中较小的那个为准。如果队列中的消息存储时间超过了预设的TTL过期时间,那么它就会变成Dead Letter(死信)。

Dead Letter Exchanges(DLX)

DLX即死信交换机,绑定在死信交换机上的即死信队列。RabbitMQ的 Queue(队列)可以配置两个参数x-dead-letter-exchange 和 x-dead-letter-routing-key(可选),一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费。

x-dead-letter-exchange:队列中出现Dead Letter后将Dead Letter重新路由转发到指定 exchange(交换机)。

x-dead-letter-routing-key:指定routing-key发送,一般为要指定转发的队列。

队列出现Dead Letter的情况有:

消息或者队列的TTL过期

队列达到最大长度

消息被消费端拒绝(basic.reject or basic.nack)

下边结合一张图看看如何实现超30分钟未支付关单功能,我们将订单消息A0001发送到延迟队列order.delay.queue,并设置x-message-tt消息存活时间为30分钟,当到达30分钟后订单消息A0001成为了Dead Letter(死信),延迟队列检测到有死信,通过配置x-dead-letter-exchange,将死信重新转发到能正常消费的关单队列,直接监听关单队列处理关单逻辑即可。

Quelle est la méthode dimplémentation de la file dattente différée dans Redis ?

发送消息时指定消息延迟的时间

public void send(String delayTimes) {
    amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延迟数据", message -> {
      // 设置延迟毫秒值
      message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
      return message;
    });
  }
}
Copier après la connexion

设置延迟队列出现死信后的转发规则

/**
   * 延时队列
   */
  @Bean(name = "order.delay.queue")
  public Queue getMessageQueue() {
    return QueueBuilder
        .durable(RabbitConstant.DEAD_LETTER_QUEUE)
        // 配置到期后转发的交换
        .withArgument("x-dead-letter-exchange", "order.close.exchange")
        // 配置到期后转发的路由键
        .withArgument("x-dead-letter-routing-key", "order.close.queue")
        .build();
  }
Copier après la connexion

7、时间轮

前面几种实现延迟队列的方法相对简单,比较易于理解。相比之下,时间轮算法稍微有点抽象。kafka、netty都有基于时间轮算法实现延时队列,下边主要实践Netty的延时队列讲一下时间轮是什么原理。

先来看一张时间轮的原理图,解读一下时间轮的几个基本概念

Quelle est la méthode dimplémentation de la file dattente différée dans Redis ?

wheel :时间轮,图中的圆盘可以看作是钟表的刻度。举个例子,如果一圈round的长度为24秒,共分成8个刻度,那么每个刻度代表3秒。那么时间精度就是 3秒。时间长度 / 刻度数值越大,精度越大。

当添加一个定时、延时任务A,假如会延迟25秒后才会执行,可时间轮一圈round 的长度才24秒,那么此时会根据时间轮长度和刻度得到一个圈数 round和对应的指针位置 index,也是就任务A会绕一圈指向0格子上,此时时间轮会记录该任务的round和 index信息。指针处于0格,当 round=0 且 index=0 时不会执行任务A,因为 round=0 不符合条件。

所以每一个格子代表的是一些时间,比如1秒和25秒 都会指向0格子上,而任务则放在每个格子对应的链表中,这点和HashMap的数据有些类似。

Netty构建延时队列主要用HashedWheelTimer,HashedWheelTimer底层数据结构依然是使用DelayedQueue,只是采用时间轮的算法来实现。

下面我们用Netty 简单实现延时队列,HashedWheelTimer构造函数比较多,解释一下各参数的含义。

ThreadFactory :表示用于生成工作线程,一般采用线程池;

tickDuration和unit:每格的时间间隔,默认100ms;

ticksPerWheel:一圈下来有几格,默认512,而如果传入数值的不是2的N次方,则会调整为大于等于该参数的一个2的N次方数值,有利于优化hash值的计算。

public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, true);
  }
Copier après la connexion

TimerTask:一个定时任务的实现接口,其中run方法包装了定时任务的逻辑。

Timeout:一个定时任务提交到Timer之后返回的句柄,通过这个句柄外部可以取消这个定时任务,并对定时任务的状态进行一些基本的判断。 Timer:是HashedWheelTimer实现的父接口,仅定义了如何提交定时任务和如何停止整个定时机制。

public class NettyDelayQueue {
  public static void main(String[] args) {
    final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);
    //定时任务
    TimerTask task1 = new TimerTask() {
      public void run(Timeout timeout) throws Exception {
        System.out.println("order1 5s 后执行 ");
        timer.newTimeout(this, 5, TimeUnit.SECONDS);//结束时候再次注册
      }
    };
    timer.newTimeout(task1, 5, TimeUnit.SECONDS);
    TimerTask task2 = new TimerTask() {
      public void run(Timeout timeout) throws Exception {
        System.out.println("order2 10s 后执行");
        timer.newTimeout(this, 10, TimeUnit.SECONDS);//结束时候再注册
      }
    };
    timer.newTimeout(task2, 10, TimeUnit.SECONDS);
    //延迟任务
    timer.newTimeout(new TimerTask() {
      public void run(Timeout timeout) throws Exception {
        System.out.println("order3 15s 后执行一次");
      }
    }, 15, TimeUnit.SECONDS);
  }
}
Copier après la connexion

从执行的结果看,order3、order3延时任务只执行了一次,而order2、order1为定时任务,按照不同的周期重复执行。

order1 5s 后执行
order2 10s 后执行
order3 15s 后执行一次
order1 5s 后执行
order2 10s 后执行

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:yisu.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal