ホームページ > バックエンド開発 > PHPチュートリアル > PHP フレームワーク Hyperf は、タイムアウト未払い注文と遅延キューの処理を実装します。

PHP フレームワーク Hyperf は、タイムアウト未払い注文と遅延キューの処理を実装します。

Guanhui
リリース: 2023-04-08 16:44:01
転載
3613 人が閲覧しました

PHP フレームワーク Hyperf は、タイムアウト未払い注文と遅延キューの処理を実装します。

#遅延キュー

  • ##DelayProducer.Php

  • #Amqpbuilder.Php
  • AmqpBuilder.php
  • <?php
    declare(strict_types = 1);
    namespace App\Components\Amqp;
    use Hyperf\Amqp\Builder\Builder;
    use Hyperf\Amqp\Builder\QueueBuilder;
    class AmqpBuilder extends QueueBuilder
    {
        /**
         * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments
         *
         * @return \Hyperf\Amqp\Builder\Builder
         */
        public function setArguments($arguments) : Builder
        {
            $this->arguments = array_merge($this->arguments, $arguments);
            return $this;
        }
        /**
         * 设置延时队列相关参数
         *
         * @param string $queueName
         * @param int    $xMessageTtl
         * @param string $xDeadLetterExchange
         * @param string $xDeadLetterRoutingKey
         *
         * @return $this
         */
        public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
        {
            $this->setArguments([
                &#39;x-message-ttl&#39;             => [&#39;I&#39;, $xMessageTtl * 1000], // 毫秒
                &#39;x-dead-letter-exchange&#39;    => [&#39;S&#39;, $xDeadLetterExchange],
                &#39;x-dead-letter-routing-key&#39; => [&#39;S&#39;, $xDeadLetterRoutingKey],
            ]);
            $this->setQueue($queueName);
            return $this;
        }
    }
    ログイン後にコピー
DelayProducer.php

<?php
declare(strict_types = 1);
namespace App\Components\Amqp;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
class DelayProducer extends Builder
{
    /**
     * @param ProducerMessageInterface $producerMessage
     * @param AmqpBuilder              $queueBuilder
     * @param bool                     $confirm
     * @param int                      $timeout
     *
     * @return bool
     * @throws \Throwable
     */
    public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout)
        {
            return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
        });
    }
    /**
     * @param ProducerMessageInterface $producerMessage
     * @param AmqpBuilder              $queueBuilder
     * @param bool                     $confirm
     * @param int                      $timeout
     *
     * @return bool
     * @throws \Throwable
     */
    private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        $result = false;
        $this->injectMessageProperty($producerMessage);
        $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
        $pool    = $this->getConnectionPool($producerMessage->getPoolName());
        /** @var \Hyperf\Amqp\Connection $connection */
        $connection = $pool->get();
        if ($confirm) {
            $channel = $connection->getConfirmChannel();
        } else {
            $channel = $connection->getChannel();
        }
        $channel->set_ack_handler(function () use (&$result)
        {
            $result = true;
        });
        try {
            // 处理延时队列
            $exchangeBuilder = $producerMessage->getExchangeBuilder();
            // 队列定义
            $channel->queue_declare($queueBuilder->getQueue(), $queueBuilder->isPassive(), $queueBuilder->isDurable(), $queueBuilder->isExclusive(), $queueBuilder->isAutoDelete(), $queueBuilder->isNowait(), $queueBuilder->getArguments(), $queueBuilder->getTicket());
            // 路由定义
            $channel->exchange_declare($exchangeBuilder->getExchange(), $exchangeBuilder->getType(), $exchangeBuilder->isPassive(), $exchangeBuilder->isDurable(), $exchangeBuilder->isAutoDelete(), $exchangeBuilder->isInternal(), $exchangeBuilder->isNowait(), $exchangeBuilder->getArguments(), $exchangeBuilder->getTicket());
            // 队列绑定
            $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            // 消息发送
            $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();
            throw $exception;
        }
        finally {
            $connection->release();
        }
        return $confirm ? $result : true;
    }
    /**
     * @param ProducerMessageInterface $producerMessage
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
    {
        if (class_exists(AnnotationCollector::class)) {
            /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
            $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
            if ($annotation) {
                $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey);
                $annotation->exchange && $producerMessage->setExchange($annotation->exchange);
            }
        }
    }
}
ログイン後にコピー

タイムアウト注文の処理

##Orderqueueconsumer.Php
  • ##OrderqueueProducer.Php

  • OrderqueueProducer.php

    <?php
    declare(strict_types = 1);
    namespace App\Amqp\Producer;
    use Hyperf\Amqp\Annotation\Producer;
    use Hyperf\Amqp\Builder\ExchangeBuilder;
    use Hyperf\Amqp\Message\ProducerMessage;
    /**
     * @Producer(exchange="order_exchange", routingKey="order_exchange")
     */
    class OrderQueueProducer extends ProducerMessage
    {
        public function __construct($data)
        {
            $this->payload = $data;
        }
        public function getExchangeBuilder() : ExchangeBuilder
        {
            return parent::getExchangeBuilder(); // TODO: Change the autogenerated stub
        }
    }
    ログイン後にコピー

    Orderqueueconsumer.php
  • <?php
    declare(strict_types = 1);
    namespace App\Amqp\Consumer;
    use App\Service\CityTransport\OrderService;
    use Hyperf\Amqp\Result;
    use Hyperf\Amqp\Annotation\Consumer;
    use Hyperf\Amqp\Message\ConsumerMessage;
    /**
     * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
     */
    class OrderQueueConsumer extends ConsumerMessage
    {
        public function consume($data) : string
        {
           ##业务处理
        }
        public function isEnable() : bool
        {
            return true;
        }
    }
    ログイン後にコピー
デモ

$builder = new AmqpBuilder();
        $builder->setDelayedQueue(&#39;order_exchange&#39;, 1, &#39;delay_exchange&#39;, &#39;delay_route&#39;);
        $que = ApplicationContext::getContainer()->get(DelayProducer::class);
        var_dump($que->produce(new OrderQueueProducer([&#39;order_sn&#39; => (string)mt_rand(10000, 90000)]), $builder))
ログイン後にコピー
推奨チュートリアル: 「

PHP チュートリアル

」 》

以上がPHP フレームワーク Hyperf は、タイムアウト未払い注文と遅延キューの処理を実装します。の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:learnku.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート