PHP framework Hyperf implements processing of timeout unpaid orders and delay queues

Guanhui
Release: 2023-04-08 16:44:01
forward
3559 people have browsed it

PHP framework Hyperf implements processing of timeout unpaid orders and delay queues

Delay Queue

  • ##Delayproducer.Php

  • Amqpbuilder.Php

AmqpBuilder.php

<?php
declare(strict_types = 1);
namespace App\Components\Amqp;
use Hyperf\Amqp\Builder\Builder;
use Hyperf\Amqp\Builder\QueueBuilder;
class AmqpBuilder extends QueueBuilder
{
    /**
     * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments
     *
     * @return \Hyperf\Amqp\Builder\Builder
     */
    public function setArguments($arguments) : Builder
    {
        $this->arguments = array_merge($this->arguments, $arguments);
        return $this;
    }
    /**
     * 设置延时队列相关参数
     *
     * @param string $queueName
     * @param int    $xMessageTtl
     * @param string $xDeadLetterExchange
     * @param string $xDeadLetterRoutingKey
     *
     * @return $this
     */
    public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
    {
        $this->setArguments([
            &#39;x-message-ttl&#39;             => [&#39;I&#39;, $xMessageTtl * 1000], // 毫秒
            &#39;x-dead-letter-exchange&#39;    => [&#39;S&#39;, $xDeadLetterExchange],
            &#39;x-dead-letter-routing-key&#39; => [&#39;S&#39;, $xDeadLetterRoutingKey],
        ]);
        $this->setQueue($queueName);
        return $this;
    }
}
Copy after login

DelayProducer.php

<?php
declare(strict_types = 1);
namespace App\Components\Amqp;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
class DelayProducer extends Builder
{
    /**
     * @param ProducerMessageInterface $producerMessage
     * @param AmqpBuilder              $queueBuilder
     * @param bool                     $confirm
     * @param int                      $timeout
     *
     * @return bool
     * @throws \Throwable
     */
    public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout)
        {
            return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
        });
    }
    /**
     * @param ProducerMessageInterface $producerMessage
     * @param AmqpBuilder              $queueBuilder
     * @param bool                     $confirm
     * @param int                      $timeout
     *
     * @return bool
     * @throws \Throwable
     */
    private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        $result = false;
        $this->injectMessageProperty($producerMessage);
        $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
        $pool    = $this->getConnectionPool($producerMessage->getPoolName());
        /** @var \Hyperf\Amqp\Connection $connection */
        $connection = $pool->get();
        if ($confirm) {
            $channel = $connection->getConfirmChannel();
        } else {
            $channel = $connection->getChannel();
        }
        $channel->set_ack_handler(function () use (&$result)
        {
            $result = true;
        });
        try {
            // 处理延时队列
            $exchangeBuilder = $producerMessage->getExchangeBuilder();
            // 队列定义
            $channel->queue_declare($queueBuilder->getQueue(), $queueBuilder->isPassive(), $queueBuilder->isDurable(), $queueBuilder->isExclusive(), $queueBuilder->isAutoDelete(), $queueBuilder->isNowait(), $queueBuilder->getArguments(), $queueBuilder->getTicket());
            // 路由定义
            $channel->exchange_declare($exchangeBuilder->getExchange(), $exchangeBuilder->getType(), $exchangeBuilder->isPassive(), $exchangeBuilder->isDurable(), $exchangeBuilder->isAutoDelete(), $exchangeBuilder->isInternal(), $exchangeBuilder->isNowait(), $exchangeBuilder->getArguments(), $exchangeBuilder->getTicket());
            // 队列绑定
            $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            // 消息发送
            $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();
            throw $exception;
        }
        finally {
            $connection->release();
        }
        return $confirm ? $result : true;
    }
    /**
     * @param ProducerMessageInterface $producerMessage
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
    {
        if (class_exists(AnnotationCollector::class)) {
            /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
            $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
            if ($annotation) {
                $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey);
                $annotation->exchange && $producerMessage->setExchange($annotation->exchange);
            }
        }
    }
}
Copy after login

Processing timeout orders

  • ##Orderqueueconsumer.Php

  • ##Orderqueueproducer.Php
  • Orderqueueproducer.php
  • <?php
    declare(strict_types = 1);
    namespace App\Amqp\Producer;
    use Hyperf\Amqp\Annotation\Producer;
    use Hyperf\Amqp\Builder\ExchangeBuilder;
    use Hyperf\Amqp\Message\ProducerMessage;
    /**
     * @Producer(exchange="order_exchange", routingKey="order_exchange")
     */
    class OrderQueueProducer extends ProducerMessage
    {
        public function __construct($data)
        {
            $this->payload = $data;
        }
        public function getExchangeBuilder() : ExchangeBuilder
        {
            return parent::getExchangeBuilder(); // TODO: Change the autogenerated stub
        }
    }
    Copy after login
Orderqueueconsumer.php

<?php
declare(strict_types = 1);
namespace App\Amqp\Consumer;
use App\Service\CityTransport\OrderService;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
/**
 * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
 */
class OrderQueueConsumer extends ConsumerMessage
{
    public function consume($data) : string
    {
       ##业务处理
    }
    public function isEnable() : bool
    {
        return true;
    }
}
Copy after login

Demo

$builder = new AmqpBuilder();
        $builder->setDelayedQueue(&#39;order_exchange&#39;, 1, &#39;delay_exchange&#39;, &#39;delay_route&#39;);
        $que = ApplicationContext::getContainer()->get(DelayProducer::class);
        var_dump($que->produce(new OrderQueueProducer([&#39;order_sn&#39; => (string)mt_rand(10000, 90000)]), $builder))
Copy after login

Recommended tutorial: "

PHP Tutorial

The above is the detailed content of PHP framework Hyperf implements processing of timeout unpaid orders and delay queues. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:learnku.com
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template