The idea behind database scheduled polling is simple: start a scheduled task that scans the order table at a fixed interval and cancels any orders it finds to have timed out.
Advantages: Simple to implement.
Disadvantages: The polling interval is hard to choose, the job occupies server resources, and the repeated scans affect database performance.
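As a rough illustration of the polling approach, the sketch below uses a ScheduledExecutorService and an in-memory map standing in for the order table. The Order class, the status strings, and cancelTimedOutOrders are hypothetical names chosen for this example; a real job would query and update the database instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class OrderPollingSketch {

    /** Hypothetical order row: status is "UNPAID", "PAID" or "CANCELLED". */
    static final class Order {
        final String orderNo;
        final long createdAtMillis;
        volatile String status = "UNPAID";

        Order(String orderNo, long createdAtMillis) {
            this.orderNo = orderNo;
            this.createdAtMillis = createdAtMillis;
        }
    }

    // In-memory stand-in for the order table (an assumption of this sketch).
    static final Map<String, Order> ORDER_TABLE = new ConcurrentHashMap<>();

    /** One polling pass: cancel every unpaid order at least timeoutMillis old. */
    static List<String> cancelTimedOutOrders(long nowMillis, long timeoutMillis) {
        List<String> cancelled = new ArrayList<>();
        for (Order order : ORDER_TABLE.values()) {
            boolean timedOut = nowMillis - order.createdAtMillis >= timeoutMillis;
            if ("UNPAID".equals(order.status) && timedOut) {
                order.status = "CANCELLED";
                cancelled.add(order.orderNo);
            }
        }
        return cancelled;
    }

    public static void main(String[] args) throws InterruptedException {
        long now = System.currentTimeMillis();
        ORDER_TABLE.put("220101001", new Order("220101001", now - 2_000)); // already old
        ORDER_TABLE.put("220101002", new Order("220101002", now));         // just created

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Scan every 500 ms; orders unpaid for 1 s or more are cancelled.
        scheduler.scheduleWithFixedDelay(
                () -> System.out.println("cancelled: "
                        + cancelTimedOutOrders(System.currentTimeMillis(), 1_000)),
                0, 500, TimeUnit.MILLISECONDS);

        Thread.sleep(1_600);
        scheduler.shutdownNow();
    }
}
```

The 500 ms interval here is arbitrary, which is exactly the tuning problem noted above: a short interval loads the database, a long one delays cancellation.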
The passive (lazy) cancellation approach works at query time: when order information is queried, first check whether the order has timed out, and cancel it if it has.
Advantages: Simple to implement.
Disadvantages: An order is only cancelled when someone queries it, so business outside the query path (such as statistics and inventory) is affected, and the extra check reduces query efficiency.
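A minimal sketch of the query-time check might look like the following. The 30-minute window, the Order class, and getOrder are assumptions made for illustration, and the map again stands in for the order table.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LazyCancelSketch {

    // Assumed payment window of 30 minutes.
    static final long TIMEOUT_MILLIS = 30 * 60 * 1000L;

    /** Hypothetical order record. */
    static final class Order {
        final String orderNo;
        final long createdAtMillis;
        String status = "UNPAID";

        Order(String orderNo, long createdAtMillis) {
            this.orderNo = orderNo;
            this.createdAtMillis = createdAtMillis;
        }
    }

    // In-memory stand-in for the order table.
    static final Map<String, Order> ORDER_TABLE = new ConcurrentHashMap<>();

    /** Query path: cancel the order first if it has timed out, then return it. */
    static Order getOrder(String orderNo, long nowMillis) {
        Order order = ORDER_TABLE.get(orderNo);
        if (order == null) {
            return null;
        }
        synchronized (order) {
            if ("UNPAID".equals(order.status)
                    && nowMillis - order.createdAtMillis >= TIMEOUT_MILLIS) {
                order.status = "CANCELLED";
            }
        }
        return order;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        ORDER_TABLE.put("220101001", new Order("220101001", now - TIMEOUT_MILLIS - 1));
        ORDER_TABLE.put("220101002", new Order("220101002", now));
        System.out.println(getOrder("220101001", now).status);
        System.out.println(getOrder("220101002", now).status);
    }
}
```

An order that nobody queries stays UNPAID indefinitely in this scheme, which is why inventory or statistics code that reads the table directly can see stale state.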
The JDK's DelayQueue is an unbounded blocking queue from which an element can be taken only after its delay has expired.
A simple demo follows. In production, dedicated threads would be responsible for enqueuing and consuming messages.
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author 向振华
 * @date 2022/08/16 15:55
 */
public class OrderDelayed implements Delayed {

    /**
     * Absolute expiration time, in epoch milliseconds
     */
    private final long time;

    /**
     * Order number
     */
    public String orderNo;

    public OrderDelayed(String orderNo, long time, TimeUnit unit) {
        this.orderNo = orderNo;
        this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Convert the remaining milliseconds into the unit the caller asked for
        return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        // Long.compare returns 0 for equal times, as the compareTo contract requires
        return Long.compare(this.time, ((OrderDelayed) o).time);
    }
}
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author 向振华
 * @date 2022/08/16 16:02
 */
public class Test {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<OrderDelayed> delayQueue = new DelayQueue<>();
        delayQueue.put(new OrderDelayed("220101001", 8, TimeUnit.SECONDS));
        delayQueue.put(new OrderDelayed("220101002", 4, TimeUnit.SECONDS));

        System.out.println("Order delay queue started");
        while (true) {
            // take() blocks until the head element has expired, avoiding a busy loop on poll()
            OrderDelayed task = delayQueue.take();
            // Order cancellation business logic
            System.out.println("Order ---> " + task.orderNo + " has expired and will be cancelled");
        }
    }
}
Advantages: High efficiency and low task-trigger latency.
Disadvantages: Tasks live only in JVM memory, so they are hard to recover after a crash or restart; scaling out to a cluster is awkward; and a large backlog of pending orders consumes memory.
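The demo mentions that production code would use a dedicated thread for consumption. One way that could look is sketched below; ExpiringOrder, startConsumer, and runDemo are hypothetical names, and the CANCELLED list is only a placeholder for real cancellation logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueConsumerSketch {

    /** Minimal delayed element, in the spirit of OrderDelayed. */
    static final class ExpiringOrder implements Delayed {
        final String orderNo;
        final long expireAtMillis;

        ExpiringOrder(String orderNo, long delay, TimeUnit unit) {
            this.orderNo = orderNo;
            this.expireAtMillis = System.currentTimeMillis() + unit.toMillis(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    static final DelayQueue<ExpiringOrder> QUEUE = new DelayQueue<>();
    static final List<String> CANCELLED = new CopyOnWriteArrayList<>();

    /** Starts a daemon thread that blocks on take() and handles each expired order. */
    static Thread startConsumer() {
        Thread consumer = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    ExpiringOrder order = QUEUE.take(); // blocks until the head has expired
                    CANCELLED.add(order.orderNo);       // placeholder for cancellation logic
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();     // exit quietly on shutdown
            }
        }, "order-timeout-consumer");
        consumer.setDaemon(true);
        consumer.start();
        return consumer;
    }

    /** Enqueues two orders, waits for both to expire, and returns the cancellation order. */
    static List<String> runDemo() {
        CANCELLED.clear();
        Thread consumer = startConsumer();
        QUEUE.put(new ExpiringOrder("220101001", 300, TimeUnit.MILLISECONDS));
        QUEUE.put(new ExpiringOrder("220101002", 100, TimeUnit.MILLISECONDS));
        try {
            Thread.sleep(800);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        consumer.interrupt();
        return new ArrayList<>(CANCELLED);
    }

    public static void main(String[] args) {
        System.out.println("cancelled in order: " + runDemo());
    }
}
```

Because the queue orders elements by remaining delay, the order enqueued second (100 ms) is handled before the one enqueued first (300 ms).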
The time wheel algorithm resembles a clock: a pointer advances in one direction at a fixed frequency, and the tasks stored in the slot it reaches are executed. Netty's HashedWheelTimer can be used to implement this approach.
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.78.Final</version>
</dependency>
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

/**
 * @author 向振华
 * @date 2022/08/16 16:02
 */
public class Test {

    public static void main(String[] args) {
        // Initialize the time wheel
        Timer timer = new HashedWheelTimer();

        // Timed task
        TimerTask task1 = new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                // Order cancellation business logic
                System.out.println("Order 1 has expired and will be cancelled");
            }
        };
        // Register the task with a 5-second delay, i.e. the order expires after 5 seconds
        timer.newTimeout(task1, 5, TimeUnit.SECONDS);

        // Timed task
        TimerTask task2 = new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                // Order cancellation business logic
                System.out.println("Order 2 has expired and will be cancelled");
            }
        };
        // Register the task with a 3-second delay, i.e. the order expires after 3 seconds
        timer.newTimeout(task2, 3, TimeUnit.SECONDS);
    }
}
Advantages: High efficiency and low task-trigger latency.
Disadvantages: Tasks live only in JVM memory, so they are hard to recover after a crash or restart; scaling out to a cluster is awkward; and a large backlog of pending orders consumes memory.
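To make the clock analogy concrete, here is a deliberately simplified time wheel. It is not how HashedWheelTimer is implemented internally; SimpleTimeWheel and its methods are hypothetical, and the pointer is advanced by calling tick() manually rather than from a timer thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SimpleTimeWheel {

    private static final class Entry {
        final Runnable task;
        long remainingRounds; // full revolutions of the wheel still to wait

        Entry(Runnable task, long remainingRounds) {
            this.task = task;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Entry>> slots = new ArrayList<>();
    private int cursor = 0; // slot the pointer currently rests on

    SimpleTimeWheel(int slotCount) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new ArrayList<>());
        }
    }

    /** Schedules a task to fire after delayTicks ticks (at least one). */
    void schedule(Runnable task, long delayTicks) {
        long ticks = Math.max(1, delayTicks);
        int slot = (int) ((cursor + ticks) % slots.size());
        long rounds = (ticks - 1) / slots.size();
        slots.get(slot).add(new Entry(task, rounds));
    }

    /** Advances the pointer by one slot and runs the tasks that are due there. */
    void tick() {
        cursor = (cursor + 1) % slots.size();
        Iterator<Entry> it = slots.get(cursor).iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.remainingRounds == 0) {
                it.remove();
                entry.task.run();
            } else {
                entry.remainingRounds--; // not due yet: wait one more revolution
            }
        }
    }

    public static void main(String[] args) {
        SimpleTimeWheel wheel = new SimpleTimeWheel(8);
        wheel.schedule(() -> System.out.println("order 1 expired, cancelling"), 5);
        wheel.schedule(() -> System.out.println("order 2 expired, cancelling"), 3);
        wheel.schedule(() -> System.out.println("order 3 expired, cancelling"), 12);
        for (int t = 1; t <= 12; t++) {
            System.out.println("tick " + t);
            wheel.tick(); // a real timer would call this from a thread at a fixed rate
        }
    }
}
```

Scheduling and each tick touch only one slot, which is where the efficiency comes from; the cost is that trigger precision is limited to the tick duration.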
Redis's key expiration callback events can also be used to achieve the effect of a delay queue.
Enable expired-key event notifications in redis.conf:
notify-keyspace-events Ex
Listener configuration:
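import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Configuration
public class RedisListenerConfig {

    // Container that subscribes to Redis channels on behalf of registered listeners
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}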
Redis expiration callback listener:
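import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // The expired key; it can be set to the order number
        String expiredKey = message.toString();
        // Order cancellation business logic
        System.out.println("Order ---> " + expiredKey + " has expired and will be cancelled");
    }
}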
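Advantages: Pending timeouts are stored in Redis rather than in application memory, so they survive an application restart, and cluster expansion is easy.
Disadvantages: Redis must be maintained as an additional component. Expiration events are delivered through Pub/Sub without a delivery guarantee, so events emitted while the listener is disconnected are lost, and an event fires when Redis actually deletes the key, which can be later than the nominal TTL.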
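The listener is notified about every key that expires, not only order keys, so a key naming convention helps it pick out the relevant ones. The sketch below assumes a hypothetical prefix order:timeout: (for example, a key written at order creation with `SET order:timeout:220101001 1 EX 1800`); the class and method names are illustrative only.

```java
import java.util.Optional;

class OrderTimeoutKeys {

    // Hypothetical key prefix; any prefix unique to order timeouts would do.
    static final String PREFIX = "order:timeout:";

    /** Key to write with a TTL when the order is created. */
    static String keyFor(String orderNo) {
        return PREFIX + orderNo;
    }

    /** Extracts the order number from an expired key, or empty for unrelated keys. */
    static Optional<String> orderNoFrom(String expiredKey) {
        if (expiredKey == null
                || !expiredKey.startsWith(PREFIX)
                || expiredKey.length() == PREFIX.length()) {
            return Optional.empty();
        }
        return Optional.of(expiredKey.substring(PREFIX.length()));
    }

    public static void main(String[] args) {
        System.out.println(keyFor("220101001"));
        System.out.println(orderNoFrom("order:timeout:220101001"));
        System.out.println(orderNoFrom("session:abc"));
    }
}
```

Inside onMessage, the listener could call orderNoFrom(message.toString()) and ignore the event when the result is empty.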
Redis's sorted set (ZSet) data structure can also achieve the effect of a delay queue, mainly through its score attribute. Redis orders the members of a sorted set by score, from smallest to largest. Add elements to the queue delayqueue with the zadd command, and set the score to the element's expiration time.
The consumer polls delayqueue, takes the element with the smallest score, and compares that time with the current time. If it is less than or equal to the current time, the key has expired and is removed.
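// Assumes jedis, delayqueue (the ZSet key) and sdf (a SimpleDateFormat) are fields of the enclosing class
public void pollOrderQueue() throws InterruptedException {
    while (true) {
        // Fetch the member with the smallest score, i.e. the earliest expiration time
        Iterator<Tuple> head = jedis.zrangeWithScores(delayqueue, 0, 0).iterator();
        if (!head.hasNext()) {
            System.out.println(sdf.format(new Date()) + " zset empty ");
            return;
        }
        Tuple first = head.next();
        String value = first.getElement();
        long score = (long) first.getScore();
        long nowSecond = System.currentTimeMillis() / 1000;
        if (nowSecond >= score) {
            // zrem returns the number of members removed; only the consumer
            // whose removal succeeds should treat the element as its own
            if (jedis.zrem(delayqueue, value) > 0) {
                System.out.println(sdf.format(new Date()) + " removed key:" + value);
            }
        }
        Thread.sleep(1000);
    }
}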
Advantages: Data is not easily lost and cluster expansion is easy.
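Disadvantages: When several consumers poll concurrently, the same key may be read and consumed more than once unless each consumer checks that its own removal succeeded.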
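The repeated-consumption problem comes from reading and removing being two separate steps. The sketch below imitates the sorted set in memory with a ConcurrentSkipListSet to show the "claim by removal" idea: an element belongs to whichever consumer's remove call succeeds. All names here are hypothetical; with Redis, the analogous check is whether ZREM reports that it removed a member.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;

class ZsetDelayQueueSketch {

    /** One sorted-set entry: member plus score (expiration time in epoch seconds). */
    static final class Entry {
        final String member;
        final long score;

        Entry(String member, long score) {
            this.member = member;
            this.score = score;
        }
    }

    // Ordered by score, then by member, similar to a Redis sorted set.
    private final ConcurrentSkipListSet<Entry> zset = new ConcurrentSkipListSet<>(
            Comparator.comparingLong((Entry e) -> e.score).thenComparing(e -> e.member));

    void add(String orderNo, long expireAtSeconds) {
        zset.add(new Entry(orderNo, expireAtSeconds));
    }

    /** Returns the orders this caller claimed; an entry is claimed only if remove() succeeds. */
    List<String> pollExpired(long nowSeconds) {
        List<String> claimed = new ArrayList<>();
        for (Entry head : zset) {        // iterates in ascending score order
            if (head.score > nowSeconds) {
                break;                   // everything after this is not due yet
            }
            if (zset.remove(head)) {     // false if another consumer got there first
                claimed.add(head.member);
            }
        }
        return claimed;
    }

    public static void main(String[] args) {
        ZsetDelayQueueSketch queue = new ZsetDelayQueueSketch();
        long now = System.currentTimeMillis() / 1000;
        queue.add("220101001", now - 5);  // already expired
        queue.add("220101002", now + 60); // expires in a minute
        System.out.println("claimed: " + queue.pollExpired(now));
    }
}
```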
Use task-scheduling middleware such as xxl-job, SchedulerX, or Elastic-Job. Set a cron schedule; when the scheduled time of the order's expiration is reached, the task is triggered and runs the order cancellation business logic.
For example, with xxl-job, an expiration task is submitted to the xxl-job server when the order is created. The executor method is as follows:
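import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

/**
 * @author 向振华
 * @date 2022/08/16 15:55
 */
@Component
public class JobHandler {

    // Note: the expected handler signature depends on the xxl-job version. In 2.3.0 and
    // later, handlers are usually declared without parameters and read the job parameter
    // with XxlJobHelper.getJobParam().
    @XxlJob("payExpireJobHandler")
    public void payExpireJobHandler(String executorParam) {
        // Order cancellation business logic
        System.out.println("Order ---> " + executorParam + " has expired and will be cancelled");
    }
}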
Advantages: Timely triggering and support for distributed deployment.
Disadvantages: Complex to implement and costly to maintain.
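Submitting a per-order task means turning the order's expiration time into a schedule. The sketch below builds a cron expression that matches one specific instant, assuming the scheduler accepts Quartz-style expressions with an optional seventh year field; that assumption should be checked against the middleware version in use. OneShotCron and its methods are hypothetical helpers, not part of any scheduler's API.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class OneShotCron {

    // Field order: second minute hour day-of-month month day-of-week year.
    // Day-of-week is left unspecified ('?') because day-of-month is fixed.
    private static final DateTimeFormatter QUARTZ_ONE_SHOT =
            DateTimeFormatter.ofPattern("s m H d M '?' yyyy");

    /** Cron expression that matches exactly the given date and time. */
    static String cronFor(LocalDateTime fireAt) {
        return fireAt.format(QUARTZ_ONE_SHOT);
    }

    /** Cron expression for the moment an order created at createdAt times out. */
    static String cronForOrder(LocalDateTime createdAt, Duration paymentWindow) {
        return cronFor(createdAt.plus(paymentWindow));
    }

    public static void main(String[] args) {
        LocalDateTime createdAt = LocalDateTime.of(2022, 8, 16, 15, 55, 0);
        // Assumed 30-minute payment window
        System.out.println(cronForOrder(createdAt, Duration.ofMinutes(30)));
    }
}
```

The resulting string would be passed to the scheduler together with the order number as the job parameter when the order is created.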
With the delayed messages of RocketMQ, RabbitMQ, or Kafka, a message is not delivered immediately after it is sent to the message queue server. Instead, it is held for a period determined by attributes on the message and then delivered to consumers.
Sample code for sending a delayed message with RocketMQ:
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;

import java.util.Date;
import java.util.Properties;

public class Test {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // AccessKey ID for Alibaba Cloud authentication, created in the Alibaba Cloud RAM console.
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // AccessKey Secret for Alibaba Cloud authentication, created in the Alibaba Cloud RAM console.
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // TCP endpoint; see the endpoint area of the instance details page in the Message Queue for RocketMQ console.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");

        Producer producer = ONSFactory.createProducer(properties);
        // start() must be called before sending messages; calling it once is enough.
        producer.start();

        Message msg = new Message(
                // The topic you created in the Message Queue for RocketMQ console.
                "Topic",
                // Message tag, comparable to a Gmail label; it re-categorizes messages so that
                // consumers can specify filter conditions applied on the RocketMQ server.
                "tag",
                // Message body: any binary data. RocketMQ does not touch it, so producer and
                // consumer must agree on serialization and deserialization.
                "Hello MQ".getBytes());
        // Business key of the message; make it globally unique where possible so that the
        // message can be looked up in the console and resent if it is not received.
        // Note: sending and receiving work normally even if it is not set.
        msg.setKey("ORDERID_100");
        try {
            // Delayed message: delivered at the specified time (after the current time).
            // The maximum delay is 40 days; the unit is milliseconds (ms).
            // This example delivers the message after 3 seconds.
            long delayTime = System.currentTimeMillis() + 3000;

            // Set the time at which the message should be delivered.
            msg.setStartDeliverTime(delayTime);

            SendResult sendResult = producer.send(msg);
            // Synchronous send: if no exception is thrown, the send succeeded.
            if (sendResult != null) {
                System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
            }
        } catch (Exception e) {
            // The send failed and needs retry handling: resend the message, or persist it for compensation.
            System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
            e.printStackTrace();
        }

        // Destroy the Producer object before the application exits.
        // Note: not destroying it causes no problems either.
        producer.shutdown();
    }
}
Subscribing to RocketMQ delayed messages works the same way as subscribing to ordinary messages.
Advantages: Efficient, easy to scale, and supports distributed deployment.
Disadvantages: Complex implementation and high maintenance costs.
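On the consuming side, the order may already have been paid by the time the delayed message arrives, and a message can be delivered more than once. A consumer therefore usually needs an idempotent status check before cancelling. The sketch below shows only that check, with an in-memory map standing in for order storage; OrderTimeoutHandler and onTimeoutMessage are hypothetical names, and wiring this into an actual message listener is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OrderTimeoutHandler {

    enum Status { UNPAID, PAID, CANCELLED }

    // In-memory stand-in for order storage (an assumption of this sketch).
    static final Map<String, Status> ORDERS = new ConcurrentHashMap<>();

    /**
     * Handles one timeout message. Returns true only if this call moved the
     * order from UNPAID to CANCELLED; paid, already cancelled, or unknown
     * orders are left untouched.
     */
    static boolean onTimeoutMessage(String orderNo) {
        // Atomic compare-and-set: succeeds only while the status is still UNPAID
        return ORDERS.replace(orderNo, Status.UNPAID, Status.CANCELLED);
    }

    public static void main(String[] args) {
        ORDERS.put("ORDERID_100", Status.UNPAID);
        ORDERS.put("ORDERID_101", Status.PAID);
        System.out.println(onTimeoutMessage("ORDERID_100")); // first delivery
        System.out.println(onTimeoutMessage("ORDERID_100")); // duplicate delivery
        System.out.println(onTimeoutMessage("ORDERID_101")); // paid in the meantime
    }
}
```

With a database, the same effect is commonly achieved with a conditional update that only matches rows still in the unpaid state.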