Méthode d'interrogation programmée par base de données, l'idée de mise en œuvre est relativement simple. Démarrez une tâche planifiée, analysez la table de commande à une certaine heure et annulez la commande de délai d'attente après l'avoir interrogée.
Avantages : Simple à mettre en œuvre.
Inconvénients : L'intervalle d'interrogation est difficile à déterminer, occupant les ressources du serveur et affectant les performances de la base de données.
Lorsque vous interrogez des informations sur la commande, déterminez d'abord si la commande a expiré, et si elle expire, annulez-la d'abord.
Avantages : Simple à mettre en œuvre.
Inconvénients : cela affecte les activités autres que les requêtes (telles que les statistiques, l'inventaire) et affecte l'efficacité des requêtes.
JDK Delay Queue DelayQueue est une file d'attente de blocage illimitée, qui ne peut en obtenir des éléments qu'à l'expiration du délai.
La démonstration simple du code d'implémentation est la suivante. Dans le processus de production réel, il y aura un fil dédié responsable de la mise en file d'attente et de la consommation des messages.
import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * @author 向振华 * @date 2022/08/16 15:55 */ public class OrderDelayed implements Delayed { /** * 延迟时间 */ private final Long time; /** * 订单编号 */ public String orderNo; public OrderDelayed(String orderNo, long time, TimeUnit unit) { this.orderNo = orderNo; this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0); } @Override public long getDelay(TimeUnit unit) { return time - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { OrderDelayed orderDelayed = (OrderDelayed) o; long diff = this.time - orderDelayed.time; if (diff <= 0) { return -1; } else { return 1; } } }
import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; /** * @author 向振华 * @date 2022/08/16 16:02 */ public class Test { public static void main(String[] args) { DelayQueue<OrderDelayed> delayQueue = new DelayQueue<>(); delayQueue.put(new OrderDelayed("220101001", 8, TimeUnit.SECONDS)); delayQueue.put(new OrderDelayed("220101002", 4, TimeUnit.SECONDS)); System.out.println("订单延迟队列开始执行"); while (true) { // 取队列头部元素是否过期 OrderDelayed task = delayQueue.poll(); if (task != null) { // 取消订单业务逻辑 System.out.println("订单 ---> " + task.orderNo + " 已过期准备取消"); } } } }
Avantages : haute efficacité, faible délai de déclenchement des tâches.
Inconvénients : difficulté de récupération des exceptions, expansion du cluster gênante, utilisation de la mémoire.
L'algorithme de la roue du temps est similaire à une horloge, il tournera dans une certaine direction avec une fréquence fixe. Vous pouvez utiliser HashedWheelTimer de Netty pour implémenter la méthode de la roue temporelle.
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.78.Final</version> </dependency>
import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; import java.util.concurrent.TimeUnit; /** * @author 向振华 * @date 2022/08/16 16:02 */ public class Test { public static void main(String[] args) { // 初始化时间轮 Timer timer = new HashedWheelTimer(); // 定时任务 TimerTask task1 = new TimerTask() { public void run(Timeout timeout) throws Exception { // 取消订单业务逻辑 System.out.println("订单1已过期准备取消"); } }; // 注册此定时任务(延迟时间为5秒,也就是说5秒后订单过期) timer.newTimeout(task1, 5, TimeUnit.SECONDS); // 定时任务 TimerTask task2 = new TimerTask() { public void run(Timeout timeout) throws Exception { // 取消订单业务逻辑 System.out.println("订单2已过期准备取消"); } }; // 注册此定时任务(延迟时间为3秒,也就是说3秒后订单过期) timer.newTimeout(task2, 3, TimeUnit.SECONDS); } }
Avantages : haute efficacité, délai de déclenchement des tâches réduit.
Inconvénients : difficulté de récupération des exceptions, expansion du cluster gênante, utilisation de la mémoire.
L'événement de rappel d'expiration de clé de Redis peut également avoir pour effet de retarder la file d'attente.
Ajoutez une configuration à redis.conf :
notify-keyspace-events Ex
Configuration d'écoute
@Configuration public class RedisListenerConfig { @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); return container; } }
Méthode d'écoute de rappel d'expiration Redis
@Component public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } @Override public void onMessage(Message message, byte[] pattern) { // 过期key,可以设置成订单号 String expiredKey = message.toString(); // 取消订单业务逻辑 System.out.println("订单 ---> " + expiredKey + " 已过期准备取消"); } }
Avantages : Les données ne sont pas facilement perdues et l'expansion du cluster est facile.
Inconvénients : une maintenance supplémentaire de Redis est requise.
La structure de données Zset de Redis peut également obtenir l'effet de file d'attente retardée, principalement en utilisant son attribut score Redis utilise score pour trier les membres de l'ensemble de petit à grand. Ajoutez des éléments à la file d'attente delayqueue via la commande zadd et définissez la valeur du score pour indiquer le délai d'expiration de l'élément.
Le consommateur interroge la file d'attente, trie les éléments et compare le temps minimum avec l'heure actuelle. Si elle est inférieure à l'heure actuelle, cela signifie que la clé a expiré et a été supprimée.
public void pollOrderQueue() { while (true) { Set<Tuple> set = jedis.zrangeWithScores(delayqueue, 0, 0); String value = ((Tuple) set.toArray()[0]).getElement(); int score = (int) ((Tuple) set.toArray()[0]).getScore(); int nowSecond = System.currentTimeMillis() / 1000); if (nowSecond >= score) { jedis.zrem(delayqueue, value); System.out.println(sdf.format(new Date()) + " removed key:" + value); } if (jedis.zcard(delayqueue) <= 0) { System.out.println(sdf.format(new Date()) + " zset empty "); return; } Thread.sleep(1000); } }
Avantages : les données ne se perdent pas facilement et l'expansion du cluster est facile.
Inconvénient : La même clé peut être utilisée à plusieurs reprises.
Utilisez le middleware de planification des tâches xxl-job, ScheduleX, Elastic-Job, etc. pour l'implémenter. Définissez une heure de planification cron Lorsque l'heure de planification de l'expiration de la commande est atteinte, l'exécution de la tâche est déclenchée pour être annulée. la logique métier de la commande.
Par exemple, en utilisant l'implémentation xxl-job, lorsqu'une commande est créée, une tâche expirée est soumise au serveur xxl-job. Voici la méthode de l'exécuteur :
import com.xxl.job.core.handler.annotation.XxlJob; import org.springframework.stereotype.Component; /** * @author 向振华 * @date 2022/08/16 15:55 */ @Component public class JobHandler { @XxlJob("payExpireJobHandler") public void payExpireJobHandler(String executorParam) { // 取消订单业务逻辑 System.out.println("订单 ---> " + executorParam + " 已过期准备取消"); } }
Avantages : grande rapidité et prise en charge de la distribution.
Inconvénients : Mise en œuvre complexe et coûts de maintenance élevés.
Utilise les messages retardés de RocketMQ, RabbitMQ et Kafka. Une fois le message envoyé au serveur de file d'attente de messages, il ne sera pas remis immédiatement, mais sera remis au consommateur après un délai fixe. sur les attributs du message.
L'exemple de code permettant à RocketMQ d'envoyer des messages retardés est le suivant :
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.SendResult; import java.util.Properties; public class Test { public static void main(String[] args) { Properties properties = new Properties(); // AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。 properties.put(PropertyKeyConst.AccessKey, "XXX"); // AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。 properties.put(PropertyKeyConst.SecretKey, "XXX"); // 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。 properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); Producer producer = ONSFactory.createProducer(properties); // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。 producer.start(); Message msg = new Message( // 您在消息队列RocketMQ版控制台创建的Topic。 "Topic", // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版服务器过滤。 "tag", // Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。 "Hello MQ".getBytes()); // 设置代表消息的业务关键属性,请尽可能全局唯一。 // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。 // 注意:不设置也不会影响消息正常收发。 msg.setKey("ORDERID_100"); try { // 延时消息,在指定延迟时间(当前时间之后)进行投递。最大可设置延迟40天投递,单位毫秒(ms)。 // 以下示例表示消息在3秒后投递。 long delayTime = System.currentTimeMillis() + 3000; // 设置消息需要被投递的时间。 msg.setStartDeliverTime(delayTime); SendResult sendResult = producer.send(msg); // 同步发送消息,只要不抛异常就是成功。 if (sendResult != null) { System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); } } catch (Exception e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); } // 在应用退出前,销毁Producer对象。 // 注意:如果不销毁也没有问题。 producer.shutdown(); } }
L'abonnement aux messages différés de RocketMQ est le même que l'abonnement aux messages ordinaires.
Avantages : efficace, facile à étendre, prend en charge la distribution.
Inconvénients : Mise en œuvre complexe et coûts de maintenance élevés.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!