때로는 주문 후 30분 이내에 결제가 이루어지지 않으면 주문을 취소하고, 결제가 완료되지 않으면 문자 메시지를 보내는 등 비즈니스 운영이 지연되는 경우가 있습니다. 주문 후 15분이 지나도 결제가 되지 않습니다. 기다리세요. 그렇다면 그러한 요구를 어떻게 실현할 수 있을까요?
관련 학습 권장 사항: 초보부터 숙련까지 PHP 프로그래밍
주문이 생성된 후 15분 후에 사용자에게 결제 없이 이메일을 보내는 시나리오를 사용하여 학습했습니다
//创建订单的逻辑/** * 随机创建订单 */$order = [ 'order_number' => mt_rand(100,10000).date("YmdHis"), 'user_id' => mt_rand(1, 100), 'order_amount' => mt_rand(100, 1000),]; /**@var $manager Illuminate\Database\Capsule\Manager **/ $conn = $manager;$insertResult = $conn::table("order") ->insert($order);print_r($insertResult);
지연된 처리 논리
while(true) { // 未支付订单列表 $orderList = $conn::table("order") ->where("created_time", '<=', date("Y-m-d H:i:s", strtotime("-15 minutes"))) ->where('sended_need_pay_notify', '=', 2) ->where('status', '=', 1) ->select(['user_id', 'id']) ->orderBy("id", 'asc') ->get(); $orderList = json_decode(json_encode($orderList), true); foreach ($orderList as $orderInfo) { sendEmail($orderInfo['user_id']); $conn::table('order') ->where('id', '=', $orderInfo['id']) ->update(['sended_need_pay_notify' => 1]); logs("update-success-orderId-". $orderInfo['id']."-userId-".$orderInfo['user_id']); } sleep(10);}
실행 처리 스크립트
gaoz@nobodyMBP delay_mq_demo % php first_while_handler.php send email to 73 success ... 2020-06-24 11:37:36:update-success-orderId-3-userId-73
이 방법은 구현이 간단하지만 동시에 대규모 일괄 주문이 우아하지 않습니다. 문제도 겪게 됩니다.
// 创建订单的逻辑try { /** * 随机创建订单 */ $order = [ 'order_number' => mt_rand(100,10000).date("YmdHis"), 'user_id' => mt_rand(1, 100), 'order_amount' => mt_rand(100, 1000), ]; /**@var $manager Illuminate\Database\Capsule\Manager **/ $conn = $manager; $insertId = $conn::table("order") ->insertGetId($order); $body = json_encode(['order_id' => $insertId, 'created_time' => date("Y-m-d H:i:s")]); $publishMessage = new TopicMessage( $body ); // 设置消息KEY $publishMessage->setMessageKey("MessageKey"); // 定时消息, 定时时间为3分钟后 $publishMessage->setStartDeliverTime(time() * 1000 + 3 * 60 * 1000); $result = $this->producer->publishMessage($publishMessage); print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result - >getMessageBodyMD5() . "\n"; } catch (\Exception $e) { print_r($e->getMessage() . "\n"); }
소비 로직도 소비자에서 처리됩니다
foreach ($messages as $message) { $receiptHandles[] = $message->getReceiptHandle(); $messageBody = $message->getMessageBody(); $orderInfo = json_decode($messageBody, true); if (!empty($orderInfo['order_id'])) { $orderId = $orderInfo['order_id']; /**@var $manager Illuminate\Database\Capsule\Manager * */ $conn = $manager; $orderInfo = $conn::table("order") ->select(['id', 'user_id']) ->where('id', '=', $orderId) ->where('status', '=', 1) ->first(); if (!empty($orderInfo)) { $orderInfo = json_decode(json_encode($orderInfo), true); sendEmail($orderInfo['user_id']); $conn::table('order') ->where('id', '=', $orderInfo['id']) ->update(['sended_need_pay_notify' => 1]); logs("update-success-orderId-" . $orderInfo['id'] . "-userId-" . $orderInfo['user_id']); } } }
메시지 생성 시작
gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_producer.php Send mq message success. msgId is:76CF2135696C3D4EAC698A9FA1E1879D, bodyMD5 is:63448B50AA7B8AF47B07AA7CE807E3D3 gaoz@nobodyMBP delay_mq_demo %
소비자를 시작하고 천천히 기다리세요
gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_consumer.php No message, contine long polling!RequestId:5EF752583441411C74869BA9 No message, contine long polling!RequestId:5EF7525B3441411C74869FE2 No message, contine long polling!RequestId:5EF7525E3441411C7486A42C No message, contine long polling!RequestId:5EF752613441411C7486A7D9 consume finish, messages:send email to 95 success ...2020-06-27 12:08:05:update-success-orderId-8-userId-95 Array( [0] => 76CF2135696C3D4EAC698A9FA1E1879D-MCAxNTkzMjY2NzkxNDM5IDMwMDAwMCAzIDAgYmpzaGFyZTUtMDggNSAw) ack
이 방법은 기존 서비스에서 사용할 수 있습니다. 개발을 줄입니다. time
// 生产者$exchange = 'order15min_notify_exchange'; $queue = 'order15minx_notify_queue';$dlxExchange = "dlx_order15min_exchange"; $dlxQueue = "dlx_order15min_queue"; $connection = new AMQPStreamConnection(getenv('RABBIT_HOST'), getenv('RABBIT_PORT'), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST")); $channel = $connection->channel();$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); $channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);// 设置队列的过期时间// 正常队列$table = new \PhpAmqpLib\Wire\AMQPTable();// 消息有效期$table->set('x-message-ttl', 3*60*1000);$table->set("x-dead-letter-exchange", $dlxExchange);$channel->queue_declare($queue, false, true, false, false, false, $table);$channel->queue_bind($queue, $exchange);// 死信队列$channel->queue_declare($dlxQueue, false, true, false, false, false);$channel->queue_bind($dlxQueue, $dlxExchange);/** * 随机创建订单 */$order = [ 'order_number' => mt_rand(100,10000).date("YmdHis"), 'user_id' => mt_rand(1, 100), 'order_amount' => mt_rand(100, 1000),];/**@var $manager Illuminate\Database\Capsule\Manager **/$conn = $manager;$insertId = $conn::table("order") ->insertGetId($order);$messageBody = json_encode(['order_id' => $insertId, 'created_time' => date("Y-m-d H:i:s")]); $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); $channel->basic_publish($message, $exchange);
Consumer
$dlxExchange = "dlx_order15min_exchange";$dlxQueue = "dlx_order15min_queue"; $connection = new AMQPStreamConnection(getenv('RABBIT_HOST'), getenv('RABBIT_PORT'), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST")); $channel = $connection->channel(); $channel->queue_declare($dlxQueue, false, true, false, false);$channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false); $channel->queue_bind($dlxQueue, $dlxExchange);/** * @param \PhpAmqpLib\Message\AMQPMessage $message */function process_message($message){ echo "\n--------\n"; echo $message->body; echo "\n--------\n"; $orderInfo = json_decode($message->body, true); if (!empty($orderInfo['order_id'])) { $orderId = $orderInfo['order_id']; /**@var $conn Illuminate\Database\Capsule\Manager * */ $conn = getdb(); $orderInfo = $conn::table("order") ->select(['id', 'user_id']) ->where('id', '=', $orderId) ->where('status', '=', 1) ->first(); if (!empty($orderInfo)) { $orderInfo = json_decode(json_encode($orderInfo), true); sendEmail($orderInfo['user_id']); $conn::table('order') ->where('id', '=', $orderInfo['id']) ->update(['sended_need_pay_notify' => 1]); logs("update-success-orderId-" . $orderInfo['id'] . "-userId-" . $orderInfo['user_id']); } } $message->delivery_info['channel']->basic_ack( $message->delivery_info['delivery_tag']);}$channel->basic_consume($dlxQueue, $consumerTag, false, false, false, false, 'process_message');
소비자를 시작하고
gaoz@nobodyMBP delay_mq_demo % php rabbit_mq_handler_consumer.php -------- {"order_id":7,"created_time":"2020-06-27 11:50:08"} -------- send email to 2 success ... 2020-06-27 11:56:55:update-success-orderId-7-userId-2
소비자와 생산자를 각각 시작하면 여기에서 메시지의 흐름을 볼 수 있습니다
메시지는 먼저 일반 큐에 들어가고 만료된 후에는 배달 못한 편지 큐에 들어가 소비됩니다.
코드 샘플을 볼 수 있습니다: github.com/nobody05/delay_mq_demo
위 내용은 PHP는 단순히 지연 작업을 구현합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!