Home > Backend Development > PHP Tutorial > PHP simply implements delay operation

PHP simply implements delay operation

coldplay.xixi
Release: 2023-04-09 06:40:01
forward
5709 people have browsed it

PHP simply implements delay operation


## Scenario

Sometimes in business You will encounter delayed operations, such as canceling the order if payment is not made half an hour after placing the order, sending a text message reminder if payment is not made fifteen minutes after placing the order, etc. So how to realize such a demand?

Related learning recommendations:

PHP programming from entry to proficiency

Implementation method

    The first simple way is to use a background process to check the order in an endless loop, and perform different operations according to the order time
  • The second is to use the scheduled message of the message queue, and send the scheduled message after the order is placed. , different timing queues handle different logic
  • The third method can be done using some existing functions provided by the framework

Implementation code

We use the scenario of sending an email to the user after the order was not paid for 15 minutes after the order was created.

Preparation work:

    Simple order form: order
  1. Various needed composer packages
  2. rabbitMq local service
  3. Open Alibaba Cloud RocketMq service

The first one

    The code logic is very simple, just go into an infinite loop
  • To start this script process, you can configure it with supervisor
  • Part of the code
  • //创建订单的逻辑/**
     * 随机创建订单
     */$order = [
        'order_number' => mt_rand(100,10000).date("YmdHis"),
        'user_id' => mt_rand(1, 100),
        'order_amount' => mt_rand(100, 1000),];
        /**@var $manager Illuminate\Database\Capsule\Manager **/
        $conn = $manager;$insertResult = $conn::table("order")
        ->insert($order);print_r($insertResult);
    Copy after login
Delay processing logic

while(true) {
    // 未支付订单列表
    $orderList = $conn::table("order")
        ->where("created_time",  &#39;<=&#39;, date("Y-m-d H:i:s", strtotime("-15 minutes")))
        ->where(&#39;sended_need_pay_notify&#39;, &#39;=&#39;, 2)
        ->where(&#39;status&#39;, &#39;=&#39;, 1)
        ->select([&#39;user_id&#39;, &#39;id&#39;])
        ->orderBy("id", &#39;asc&#39;)
        ->get();
    $orderList = json_decode(json_encode($orderList), true);
    foreach ($orderList as $orderInfo) {
        sendEmail($orderInfo[&#39;user_id&#39;]);
        $conn::table(&#39;order&#39;)
            ->where(&#39;id&#39;, &#39;=&#39;, $orderInfo[&#39;id&#39;])
            ->update([&#39;sended_need_pay_notify&#39; => 1]);
        logs("update-success-orderId-". $orderInfo[&#39;id&#39;]."-userId-".$orderInfo[&#39;user_id&#39;]);
    }

    sleep(10);}
Copy after login

Execution processing script

gaoz@nobodyMBP delay_mq_demo % php first_while_handler.php
send email to 73 success ...
2020-06-24 11:37:36:update-success-orderId-3-userId-73
Copy after login

This method is simple to implement, but not elegant, and it can order large quantities at the same time There will also be problems encountered.

The second type

    For example, using Alibaba Cloud's MQ service, the current versions of rocketMq and rabbitMq support delayed messages, but rabbit's Delayed message charges are too high
  • Here we first use rocketMq’s delayed message to implement
  • Need to activate Alibaba Cloud services
  • // 创建订单的逻辑try
            {
    
                /**
                 * 随机创建订单
                 */
                $order = [
                    &#39;order_number&#39; => mt_rand(100,10000).date("YmdHis"),
                    &#39;user_id&#39; => mt_rand(1, 100),
                    &#39;order_amount&#39; => mt_rand(100, 1000),
                ];
    
                /**@var $manager Illuminate\Database\Capsule\Manager **/
                $conn = $manager;
    
                $insertId = $conn::table("order")
                    ->insertGetId($order);
    
                $body = json_encode([&#39;order_id&#39; => $insertId, &#39;created_time&#39; => date("Y-m-d H:i:s")]);
                $publishMessage = new TopicMessage(
                    $body            );
                // 设置消息KEY
                $publishMessage->setMessageKey("MessageKey");
    
                // 定时消息, 定时时间为3分钟后
                $publishMessage->setStartDeliverTime(time() * 1000 + 3 * 60 * 1000);
    
                $result = $this->producer->publishMessage($publishMessage);
    
                print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result
                -
                >getMessageBodyMD5() . "\n";
            } catch (\Exception $e) {
                print_r($e->getMessage() . "\n");
            }
    Copy after login
The consumption logic is also consuming Processing

foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();

                $messageBody = $message->getMessageBody();

                $orderInfo = json_decode($messageBody, true);
                if (!empty($orderInfo[&#39;order_id&#39;])) {
                    $orderId = $orderInfo[&#39;order_id&#39;];

                    /**@var $manager Illuminate\Database\Capsule\Manager * */
                    $conn = $manager;
                    $orderInfo = $conn::table("order")
                        ->select([&#39;id&#39;, &#39;user_id&#39;])
                        ->where(&#39;id&#39;, &#39;=&#39;, $orderId)
                        ->where(&#39;status&#39;, &#39;=&#39;, 1)
                        ->first();
                    if (!empty($orderInfo)) {
                        $orderInfo = json_decode(json_encode($orderInfo), true);
                        sendEmail($orderInfo[&#39;user_id&#39;]);
                        $conn::table(&#39;order&#39;)
                            ->where(&#39;id&#39;, &#39;=&#39;, $orderInfo[&#39;id&#39;])
                            ->update([&#39;sended_need_pay_notify&#39; => 1]);
                        logs("update-success-orderId-" . $orderInfo[&#39;id&#39;] . 
                        "-userId-" . $orderInfo[&#39;user_id&#39;]);
                    }
                }
            }
Copy after login

Start producing a message

gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_producer.php 
Send mq message success. msgId is:76CF2135696C3D4EAC698A9FA1E1879D, bodyMD5 
is:63448B50AA7B8AF47B07AA7CE807E3D3
gaoz@nobodyMBP delay_mq_demo %
Copy after login

Start the consumer and wait slowly

gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_consumer.php 
No message, contine long polling!RequestId:5EF752583441411C74869BA9
No message, contine long polling!RequestId:5EF7525B3441411C74869FE2
No message, contine long polling!RequestId:5EF7525E3441411C7486A42C
No message, contine long polling!RequestId:5EF752613441411C7486A7D9
consume finish, messages:send email to 95 success ...2020-06-27 12:08:05:update-success-orderId-8-userId-95
 Array(
    [0] => 76CF2135696C3D4EAC698A9FA1E1879D-MCAxNTkzMjY2NzkxNDM5IDMwMDAwMCAzIDAgYmpzaGFyZTUtMDggNSAw)
    ack
Copy after login

This method can be used by existing services and reduce development time

The third type Use rabbitMq to implement

    After consulting the documentation, I did not find rabbitMq’s native function of supporting delay queue, but it can be achieved through the ttl of the message Dead letter queue implementation
  • The private message queue is a queue used to store messages that have not been consumed or failed to consume.
  • When the message is not consumed within the validity period of the set message, it will be forwarded to the dead letter queue.
  • Achieve delay function by setting the validity period of the message
  • // 生产者$exchange = &#39;order15min_notify_exchange&#39;;
    $queue = &#39;order15minx_notify_queue&#39;;$dlxExchange = "dlx_order15min_exchange";
    $dlxQueue = "dlx_order15min_queue";
    $connection = new AMQPStreamConnection(getenv(&#39;RABBIT_HOST&#39;), getenv(&#39;RABBIT_PORT&#39;), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));
    $channel = $connection->channel();$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
    $channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);// 设置队列的过期时间// 正常队列$table = new \PhpAmqpLib\Wire\AMQPTable();// 消息有效期$table->set(&#39;x-message-ttl&#39;, 3*60*1000);$table->set("x-dead-letter-exchange", $dlxExchange);$channel->queue_declare($queue, false, true, false, false, false, $table);$channel->queue_bind($queue, $exchange);// 死信队列$channel->queue_declare($dlxQueue, false, true, false, false, false);$channel->queue_bind($dlxQueue, $dlxExchange);/**
     * 随机创建订单
     */$order = [
        &#39;order_number&#39; => mt_rand(100,10000).date("YmdHis"),
        &#39;user_id&#39; => mt_rand(1, 100),
        &#39;order_amount&#39; => mt_rand(100, 1000),];/**@var $manager Illuminate\Database\Capsule\Manager **/$conn = $manager;$insertId = $conn::table("order")
        ->insertGetId($order);$messageBody = json_encode([&#39;order_id&#39; => $insertId, &#39;created_time&#39; => date("Y-m-d H:i:s")]);
        $message = new AMQPMessage($messageBody, array(&#39;content_type&#39; => &#39;text/plain&#39;, &#39;delivery_mode&#39; => AMQPMessage::DELIVERY_MODE_PERSISTENT));
        $channel->basic_publish($message, $exchange);
    Copy after login
Consumer

$dlxExchange = "dlx_order15min_exchange";$dlxQueue = "dlx_order15min_queue";
$connection = new AMQPStreamConnection(getenv(&#39;RABBIT_HOST&#39;), getenv(&#39;RABBIT_PORT&#39;), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));
$channel = $connection->channel();
$channel->queue_declare($dlxQueue, false, true, false, false);$channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_bind($dlxQueue, $dlxExchange);/**
 * @param \PhpAmqpLib\Message\AMQPMessage $message
 */function process_message($message){
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    $orderInfo = json_decode($message->body, true);
    if (!empty($orderInfo[&#39;order_id&#39;])) {
        $orderId = $orderInfo[&#39;order_id&#39;];

        /**@var $conn Illuminate\Database\Capsule\Manager * */
        $conn = getdb();
        $orderInfo = $conn::table("order")
            ->select([&#39;id&#39;, &#39;user_id&#39;])
            ->where(&#39;id&#39;, &#39;=&#39;, $orderId)
            ->where(&#39;status&#39;, &#39;=&#39;, 1)
            ->first();
        if (!empty($orderInfo)) {
            $orderInfo = json_decode(json_encode($orderInfo), true);
            sendEmail($orderInfo[&#39;user_id&#39;]);
            $conn::table(&#39;order&#39;)
                ->where(&#39;id&#39;, &#39;=&#39;, $orderInfo[&#39;id&#39;])
                ->update([&#39;sended_need_pay_notify&#39; => 1]);
            logs("update-success-orderId-" . $orderInfo[&#39;id&#39;] . "-userId-" . $orderInfo[&#39;user_id&#39;]);
        }

    }
    $message->delivery_info[&#39;channel&#39;]->basic_ack(
        $message->delivery_info[&#39;delivery_tag&#39;]);}$channel->basic_consume($dlxQueue, $consumerTag, false, false, false, false, &#39;process_message&#39;);
Copy after login

Start consumer

gaoz@nobodyMBP delay_mq_demo % php rabbit_mq_handler_consumer.php
--------
{"order_id":7,"created_time":"2020-06-27 11:50:08"}
--------
send email to 2 success ...
2020-06-27 11:56:55:update-success-orderId-7-userId-2
Copy after login
Start consumer and production respectively That's it. You can see the flow of messages here

The message first enters the normal queue, and then enters the dead letter after expiration The queue is consumed

The fourth method

    Use laravel’s own Queue to implement
  • The detailed code is not organized here, and will be updated later Come out
  • You can view the official document queue "Laravel 5.7 Chinese Documentation"
Code example: github.com/nobody05/delay_mq_demo

The above is the detailed content of PHP simply implements delay operation. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
php
source:learnku.com
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Latest Issues
php data acquisition?
From 1970-01-01 08:00:00
0
0
0
PHP extension intl
From 1970-01-01 08:00:00
0
0
0
How to learn php well
From 1970-01-01 08:00:00
0
0
0
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template