PHP 框架 Hyperf 實作處理逾時未支付訂單和延時佇列
延時佇列
#Delayproducer.Php
Amqpbuilder.Php
#AmqpBuilder.php
<?php declare(strict_types = 1); namespace App\Components\Amqp; use Hyperf\Amqp\Builder\Builder; use Hyperf\Amqp\Builder\QueueBuilder; class AmqpBuilder extends QueueBuilder { /** * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments * * @return \Hyperf\Amqp\Builder\Builder */ public function setArguments($arguments) : Builder { $this->arguments = array_merge($this->arguments, $arguments); return $this; } /** * 设置延时队列相关参数 * * @param string $queueName * @param int $xMessageTtl * @param string $xDeadLetterExchange * @param string $xDeadLetterRoutingKey * * @return $this */ public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self { $this->setArguments([ 'x-message-ttl' => ['I', $xMessageTtl * 1000], // 毫秒 'x-dead-letter-exchange' => ['S', $xDeadLetterExchange], 'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey], ]); $this->setQueue($queueName); return $this; } }
DelayProducer.php
<?php declare(strict_types = 1); namespace App\Components\Amqp; use Hyperf\Amqp\Annotation\Producer; use Hyperf\Amqp\Builder; use Hyperf\Amqp\Message\ProducerMessageInterface; use Hyperf\Di\Annotation\AnnotationCollector; use PhpAmqpLib\Message\AMQPMessage; use Throwable; class DelayProducer extends Builder { /** * @param ProducerMessageInterface $producerMessage * @param AmqpBuilder $queueBuilder * @param bool $confirm * @param int $timeout * * @return bool * @throws \Throwable */ public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool { return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout) { return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout); }); } /** * @param ProducerMessageInterface $producerMessage * @param AmqpBuilder $queueBuilder * @param bool $confirm * @param int $timeout * * @return bool * @throws \Throwable */ private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool { $result = false; $this->injectMessageProperty($producerMessage); $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties()); $pool = $this->getConnectionPool($producerMessage->getPoolName()); /** @var \Hyperf\Amqp\Connection $connection */ $connection = $pool->get(); if ($confirm) { $channel = $connection->getConfirmChannel(); } else { $channel = $connection->getChannel(); } $channel->set_ack_handler(function () use (&$result) { $result = true; }); try { // 处理延时队列 $exchangeBuilder = $producerMessage->getExchangeBuilder(); // 队列定义 $channel->queue_declare($queueBuilder->getQueue(), $queueBuilder->isPassive(), $queueBuilder->isDurable(), $queueBuilder->isExclusive(), $queueBuilder->isAutoDelete(), $queueBuilder->isNowait(), $queueBuilder->getArguments(), $queueBuilder->getTicket()); // 路由定义 $channel->exchange_declare($exchangeBuilder->getExchange(), $exchangeBuilder->getType(), $exchangeBuilder->isPassive(), $exchangeBuilder->isDurable(), $exchangeBuilder->isAutoDelete(), $exchangeBuilder->isInternal(), $exchangeBuilder->isNowait(), $exchangeBuilder->getArguments(), $exchangeBuilder->getTicket()); // 队列绑定 $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey()); // 消息发送 $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey()); $channel->wait_for_pending_acks_returns($timeout); } catch (Throwable $exception) { // Reconnect the connection before release. $connection->reconnect(); throw $exception; } finally { $connection->release(); } return $confirm ? $result : true; } /** * @param ProducerMessageInterface $producerMessage */ private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void { if (class_exists(AnnotationCollector::class)) { /** @var \Hyperf\Amqp\Annotation\Producer $annotation */ $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class); if ($annotation) { $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey); $annotation->exchange && $producerMessage->setExchange($annotation->exchange); } } } }
處理逾時訂單
Orderqueueconsumer.Php
##Orderqueueproducer.Php
<?php declare(strict_types = 1); namespace App\Amqp\Producer; use Hyperf\Amqp\Annotation\Producer; use Hyperf\Amqp\Builder\ExchangeBuilder; use Hyperf\Amqp\Message\ProducerMessage; /** * @Producer(exchange="order_exchange", routingKey="order_exchange") */ class OrderQueueProducer extends ProducerMessage { public function __construct($data) { $this->payload = $data; } public function getExchangeBuilder() : ExchangeBuilder { return parent::getExchangeBuilder(); // TODO: Change the autogenerated stub } }
<?php declare(strict_types = 1); namespace App\Amqp\Consumer; use App\Service\CityTransport\OrderService; use Hyperf\Amqp\Result; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Message\ConsumerMessage; /** * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1) */ class OrderQueueConsumer extends ConsumerMessage { public function consume($data) : string { ##业务处理 } public function isEnable() : bool { return true; } }
$builder = new AmqpBuilder(); $builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route'); $que = ApplicationContext::getContainer()->get(DelayProducer::class); var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder))
PHP教程》
以上是PHP 框架 Hyperf 實作處理逾時未支付訂單和延時佇列的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

PHP 8.4 帶來了多項新功能、安全性改進和效能改進,同時棄用和刪除了大量功能。 本指南介紹如何在 Ubuntu、Debian 或其衍生版本上安裝 PHP 8.4 或升級到 PHP 8.4

Visual Studio Code,也稱為 VS Code,是一個免費的原始碼編輯器 - 或整合開發環境 (IDE) - 可用於所有主要作業系統。 VS Code 擁有大量針對多種程式語言的擴展,可以輕鬆編寫

JWT是一種基於JSON的開放標準,用於在各方之間安全地傳輸信息,主要用於身份驗證和信息交換。 1.JWT由Header、Payload和Signature三部分組成。 2.JWT的工作原理包括生成JWT、驗證JWT和解析Payload三個步驟。 3.在PHP中使用JWT進行身份驗證時,可以生成和驗證JWT,並在高級用法中包含用戶角色和權限信息。 4.常見錯誤包括簽名驗證失敗、令牌過期和Payload過大,調試技巧包括使用調試工具和日誌記錄。 5.性能優化和最佳實踐包括使用合適的簽名算法、合理設置有效期、

字符串是由字符組成的序列,包括字母、數字和符號。本教程將學習如何使用不同的方法在PHP中計算給定字符串中元音的數量。英語中的元音是a、e、i、o、u,它們可以是大寫或小寫。 什麼是元音? 元音是代表特定語音的字母字符。英語中共有五個元音,包括大寫和小寫: a, e, i, o, u 示例 1 輸入:字符串 = "Tutorialspoint" 輸出:6 解釋 字符串 "Tutorialspoint" 中的元音是 u、o、i、a、o、i。總共有 6 個元

本教程演示瞭如何使用PHP有效地處理XML文檔。 XML(可擴展的標記語言)是一種用於人類可讀性和機器解析的多功能文本標記語言。它通常用於數據存儲

靜態綁定(static::)在PHP中實現晚期靜態綁定(LSB),允許在靜態上下文中引用調用類而非定義類。 1)解析過程在運行時進行,2)在繼承關係中向上查找調用類,3)可能帶來性能開銷。

PHP的魔法方法有哪些? PHP的魔法方法包括:1.\_\_construct,用於初始化對象;2.\_\_destruct,用於清理資源;3.\_\_call,處理不存在的方法調用;4.\_\_get,實現動態屬性訪問;5.\_\_set,實現動態屬性設置。這些方法在特定情況下自動調用,提升代碼的靈活性和效率。
