이 기사는 PHP에 대한 관련 지식을 제공합니다. 주로 메시지 큐 RabbitMQ에 대한 소개와 몇 가지 실제적인 세부 사항을 소개합니다. 메시지 큐는 애플리케이션 간의 통신 방법에 대해 함께 살펴보겠습니다. 모두에게 도움이 되었습니다.
추천 학습: "PHP 비디오 자습서"
설명
MQ(Message Queue)는 메시지 대기열입니다. 어느 애플리케이션 간 통신 방식으로, 메시지를 보낸 후 즉시 반환할 수 있으며, 메시지 시스템은 메시지의 안정적인 전달을 보장합니다. "메시지 큐"는 전송 중에 메시지를 저장하는 컨테이너입니다. 전형적인 것은 생산자, 소비자 모델입니다. 생산자는 계속해서 메시지 대기열에 메시지를 생성하고 소비자는 계속해서 대기열에서 메시지를 얻습니다. 메시지의 생성과 소비는 비동기식이며 메시지 전송 및 수신에만 관심이 있기 때문에 비즈니스 로직이 침해되지 않아 생산자와 소비자의 분리가 달성됩니다.
메시지 미들웨어를 사용하는 이유는 무엇인가요?
메시지 큐는 애플리케이션 분리, 비동기 메시지, 트래픽 피크 감소 등과 같은 문제를 해결하고 높은 동시성, 고가용성, 확장성 및 최종 일관성 아키텍처를 달성하는 분산 시스템의 중요한 구성 요소입니다.
비동기 처리
사용자 등록 정보를 입력한 후 이메일 전송 및 문자 메시지 등록을 해야 합니다
1. 사용자 등록 정보가 데이터베이스에 기록된 후에는 등록에 성공하더라도 정보가 반환됩니다
2. 이메일 전송 및 문자 메시지 등록이 실행됩니다. 메시지 대기열을 통해 비동기식으로 이루어지며 사용자는 이 두 작업을 기다릴 필요가 없습니다
애플리케이션 분리
사용자가 주문한 후 주문 시스템은 재고 시스템에 이를 알려야 합니다. 전통적인 접근 방식은 주문 시스템이 재고 시스템의 인터페이스를 호출하여 재고를 늘리거나 줄이는 것입니다.
1. 사용자가 생산을 주문하고 성공 프롬프트를 반환합니다.
2. 대기열 소비 재고 시스템이 재고를 늘리거나 줄입니다.
트래픽 피크 감소
트래픽 피크 감소는 메시지 대기열에서도 일반적으로 사용되는 시나리오이며 일반적으로 플래시 판매 또는 그룹 잡기 활동에서 널리 사용됩니다.
1. 사용자 그룹이 대기열 입력을 요청하면 대기열 수를 제어합니다. .일정 개수를 초과하면 플래시 세일이 종료됩니다
2. 선입선출 방식에 따라 큐가 하나씩 소모됩니다
Rabbitmq 기능
Reliability(신뢰성) RabbitMQ 지속성, 전송 확인, 릴리스 확인과 같은 일부 메커니즘을 사용하여 안정성을 보장합니다.
유연한 라우팅 메시지는 대기열에 들어가기 전에 Exchange를 통해 라우팅됩니다. 일반적인 라우팅 기능을 위해 RabbitMQ는 이미 일부 기본 제공 Exchange 구현을 제공합니다. 보다 복잡한 라우팅 기능을 위해 여러 Exchange를 함께 바인딩하거나 플러그인 메커니즘을 통해 자체 Exchange를 구현할 수 있습니다.
메시지 클러스터링 여러 RabbitMQ 서버가 클러스터를 형성하여 논리적 브로커를 형성할 수 있습니다.
고가용성 대기열 대기열은 클러스터의 컴퓨터에 미러링될 수 있으므로 일부 노드에 오류가 발생하더라도 대기열을 계속 사용할 수 있습니다.
다중 프로토콜 RabbitMQ는 STOMP, MQTT 등과 같은 여러 메시지 대기열 프로토콜을 지원합니다.
많은 클라이언트 RabbitMQ는 PHP Java, .NET, Ruby 등과 같이 일반적으로 사용되는 거의 모든 언어를 지원합니다.
관리 UI RabbitMQ는 사용자가 메시지 브로커의 다양한 측면을 모니터링하고 관리할 수 있는 사용하기 쉬운 사용자 인터페이스를 제공합니다.
Tracing 메시지가 비정상적인 경우 RabbitMQ는 사용자가 무슨 일이 일어났는지 알 수 있도록 메시지 추적 메커니즘을 제공합니다.
플러그인 시스템 RabbitMQ는 다양한 측면에서 확장할 수 있는 다양한 플러그인을 제공하며, 자신만의 플러그인을 작성할 수도 있습니다.
RabbitMQ 작동 방식
Broker: 메시지를 수신하고 배포하는 애플리케이션인 RabbitMQ 서버는 메시지 브로커입니다.
가상 호스트: mysql 데이터베이스와 마찬가지로 여러 사용자가 동일한 RabbitMQ 서버에서 제공하는 서비스를 사용할 때 여러 가상 호스트를 나눌 수 있으며 각 사용자는 자신의 가상 호스트에 교환/큐 등을 생성합니다.
연결: 게시자/소비자와 브로커 간의 TCP 연결입니다.
Channel: RabbitMQ에 접속할 때마다 Connection이 설정되면 TCP Connection 설정에 따른 오버헤드가 커지고 메시지 양이 많아지면 효율성이 낮아집니다. 채널은 연결 내부에 설정된 논리적 연결입니다. 경량 연결인 채널은 운영 체제에서 TCP 연결을 설정하는 데 드는 비용을 크게 줄여줍니다.
Exchange: 메시지는 배포 규칙에 따라 쿼리 테이블의 라우팅 키와 일치하여 메시지를 대기열에 배포합니다. 일반적으로 사용되는 유형은 직접(지점 간), 주제(게시-구독) 및 팬아웃(멀티캐스트)입니다.
Queue: 메시지는 소비자가 선택할 수 있도록 최종적으로 여기로 전송됩니다. 메시지는 동시에 여러 대기열에 복사될 수 있습니다.
RabbitMQ 공식 주소: http://www.rabbitmq.com
Rabbitmq를 설치하려면 먼저 erlang을 설치해야 합니다
1단계: erlang 설치
Rabbitmq를 설치하려면 erlang을 설치해야 합니다. 첫째, centos7은 erlang 24 버전 설치를 지원하지 않습니다.
다운로드:
# 系统 centos 7# 下载erlang包,手动下载后上传至服务器,我在使用wget下载后无法安装,这里没明白 # 安装 yum install erlang-23.3.4.4-1.el7.x86_64.rpm # 验证安装是否成功 erl
2단계: Rabbitmq 설치
# 系统 centos 7# 下载rabbitmq包,手动下载后上传至服务器,我在使用wget下载后无法安装,这里没明白 # 安装 yum install rabbitmq-server-3.8.19-1.el7.noarch.rpm # 启动 systemctl start rabbitmq-server # 关闭 systemctl stop rabbitmq-server # 查看默认端口服务是否启动 netstat -tunlp
4369: epmd(Erlang 포트 매퍼 데몬), erlang 서비스 포트
5672: 클라이언트 통신 포트
15672: HTTP API 클라이언트, 관리 UI(관리 플러그인이 활성화된 경우에만)가 반드시 시작되지는 않습니다.
25672: 노드 간 통신에 사용됩니다(Erlang 배포 서버 포트)
rabbitmq 관리 명령
시작 15672: HTTP API 클라이언트 , 관리 UI (관리 플러그인이 활성화된 경우에만)
# 启动rabbitmq_management插件 rabbitmq-plugins enable rabbitmq_management # 查看所有插件 rabbitmq-plugins list
UI 인터페이스에 대한 테스트 접근: (이때, localhost가 아닌 주소는 로그인할 수 없습니다)
http://192.168.10.105:15672/
rabbitmq 관리 인터페이스 구성
# 新增一个用户 rabbitmqctl add_user 【用户名Username】 【密码Password】 rabbitmqctl add_user root root # 删除一个用户 rabbitmqctl delete_user Username # 修改用户的密码 rabbitmqctl change_password Username Newpassword # 查看当前用户列表 rabbitmqctl list_users # 设置用户角色的命令为: rabbitmqctl set_user_tags User Tag rabbitmqctl set_user_tags root administrator # User为用户名, Tag为角色名(对应于上面的administrator,monitoring,policymaker,management,或其他自定义名称)。
가상호스트 명령줄 생성 및 PHP 확장 설치
mysql 데이터베이스와 마찬가지로, 동일한 RabbitMQ 서버에서 제공하는 서비스를 여러 사용자가 사용하는 경우 여러 가상호스트가 분할될 수 있으며 각 사용자는 교환/대기열 등을 생성합니다.
1) 다양한 사용자의 가상 호스트 보기
가상 호스트 생성 및 권한 할당
# 新增vhost rabbitmqctl add_vhost vhostname rabbitmqctl add_vhost order # 查看vhost列表 rabbitmqctl list_vhosts #为vhost添加用户 rabbitmqctl set_permissions -p vhostname username ".*" ".*" ".*"rabbitmqctl set_permissions -p order root ".*" ".*" ".*" ".*" ".*" ".*"后边三个.*分别代表:配置权限、写权限、读权限
2) php용 Rabbitmq 확장 설치 설치
https://github.com/php-amqplib/php- amqplib 확장 설치
Alibaba Cloud 이미지 수정
composer config -g repo.packagist composer https://mirrors.aliyun.com/composer/
다운로드 시작 – 가끔 2.8 이하 버전으로 다운로드될 경우 버전을 지정해야 합니다
. 다운로드에 실패하면 작곡가와 php.ini를 업그레이드하세요. 국내 이미지를 확장하고 전환하는 소켓
# 升级composer composer self-update #php.ini 打开 sockets 扩展 #下载指定版本 composer require php-amqplib/php-amqplib=^3.0
단순 모드 생산자 메시지 메시지 대기열로 푸시
문서:
https://www.rabbitmq.com/tutorials/tutorial-one-php.html
단순 생산자 그리고 messager
생산자 코드
http://localhost/rabbitmq/simple/pro.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //生产者 //Connection: publisher/consumer和broker之间的TCP连接 //Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。 //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:goods $queue_name = 'goods'; $channel->queue_declare($queue_name, false, true, false, false); //生产数据 $data = 'this is messge'; //创建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 $channel->basic_publish($msg, $exchange = '', $queue_name); //关闭连接 $channel->close(); $connection->close();
运行生产者脚本:
http://localhost/rabbitmq/simple/pro.php
点击goods队列可以进入到消息详情
http://localhost/rabbitmq/simple/con.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:goods $queue_name = 'goods'; $channel->queue_declare($queue_name, false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; }; //开启消费 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
rabbitmq Work Queues
一个生产者对应多个消费者,消费特别慢时增加几个消费分发
生产者,和上文生产者不变
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //生产者 //Connection: publisher/consumer和broker之间的TCP连接 //Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。 //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:task_queue $queue_name = 'task_queue'; $channel->queue_declare($queue_name, false, true, false, false); for ($i = 0; $i AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 $channel->basic_publish($msg, $exchange = '', $queue_name); } //关闭连接 $channel->close(); $connection->close();
消费者worker1
D:\phpstudy_pro\WWW\rabbitmq\worker\worker1.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:task_queue $queue_name = 'task_queue'; $channel->queue_declare($queue_name, false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; }; //开启消费 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
消费者worker2,代码和worker1一样,同时运行开启后会一起消费
D:\phpstudy_pro\WWW\rabbitmq\worker\worker2.php
消费者消费消息ack确认
用以确认不会丢失消息
消费消息
basic_consume($queue = ‘’, $consumer_tag = ‘’, $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $callback = null, $ticket = null, $arguments = array())
no_ack=false,设置为手动应答
开启后需要进行消息的消费确认后才会进行移除,否者该消息会一直存在消息队列中
消费端代码
D:\phpstudy_pro\WWW\rabbitmq\worker\worker1.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:task_queue $queue_name = 'task_queue'; $channel->queue_declare($queue_name, false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //确认消息已被消费,从生产队列中移除 $msg->ack(); }; //设置消费成功后才能继续进行下一个消费 $channel->basic_qos(null, 1, null); //开启消费no_ack=false,设置为手动应答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
发布/订阅模式
是要是公用一个交换机的消费端都能收到同样的消息,类似广播的功能
文档:rabbitmq Publish/Subscribe
https://www.rabbitmq.com/tutorials/tutorial-three-php.html
rabbitmq Exchange类型
交换器、路由键、绑定 Exchange:交换器。发送消息的AMQP实体。交换器拿到一个消息之后将它路由给一个或几个队列。它使用哪种路由算法是由交换机类型和被称作绑定(Binding)的规则所决定的。RabbitMQ有四种类型。 RoutingKey:路由键。生产者将消息发送给交换器。一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终失效。 Binding:绑定。绑定(Binding)是交换机(Exchange)将消息(Message)路由给队列(Queue)所需遵循的规则。 # 四种模式 Direct 定向 消息与一个特定的路由键完全匹配 Topic 通配符 路由键和某模式进行匹配 Fanout 广播 发送到该类型交换机的消息都会被广播到与该交换机绑定的所有队列 Headers 不处理路由键,而是根据发送的消息内容中的headers属性进行匹配
exchange_declare($exchange, $type, $passive = false, $durable = false, $auto_delete = true, $internal = false, $nowait = false, $arguments = array(), $ticket = null) 。试探性申请一个交换器,若该交换器不存在,则创建;若存在,则跳过。
生产者代码
D:\phpstudy_pro\WWW\rabbitmq\ps\pro.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'exch'; $channel->exchange_declare($exc_name, 'fanout', false, false, false); //声明数据 $data = 'this is fanout message'; //创建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 $channel->basic_publish($msg, $exc_name); //关闭连接 $channel->close(); $connection->close();
fanout模式消费者消费消息
是要是公用一个交换机的消费端都能收到同样的消息,类似广播的功能
当消费端运行时才会显示该队列
消费端:
D:\phpstudy_pro\WWW\rabbitmq\ps\worker1.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'exch'; $channel->exchange_declare($exc_name, 'fanout', false, false, false); //获取系统生成的消息队列名称 list($queue_name, ,) = $channel->queue_declare('', false, false, true, false); //将队列名与交换器名进行绑定 $channel->queue_bind($queue_name,$exc_name); $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //确认消息已被消费,从生产队列中移除 $msg->ack(); }; //设置消费成功后才能继续进行下一个消费 $channel->basic_qos(null, 1, null); //开启消费no_ack=false,设置为手动应答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
文档:
https://www.rabbitmq.com/tutorials/tutorial-four-php.html
用来指定不同的交换机和指定routing_key,在消费端进行消费
生产者代码:
D:\phpstudy_pro\WWW\rabbitmq\routing\pro.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'direct_log'; //指定routing_key $routing_key = 'info'; //指定交换机类型为direct $channel->exchange_declare($exc_name, 'direct', false, false, false); //声明数据 $data = 'this is ' . $routing_key . ' message'; //创建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 //指定使用的routing_key $channel->basic_publish($msg, $exc_name, $routing_key); //关闭连接 $channel->close(); $connection->close();
消费者代码
D:\phpstudy_pro\WWW\rabbitmq\routing\info.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'direct_log'; //指定routing_key $routing_key = 'info'; $channel->exchange_declare($exc_name, 'direct', false, false, false); //获取系统生成的消息队列名称 list($queue_name, ,) = $channel->queue_declare('', false, false, true, false); //将队列名与交换器名进行绑定,并指定routing_key $channel->queue_bind($queue_name,$exc_name,$routing_key); $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //确认消息已被消费,从生产队列中移除 $msg->ack(); }; //设置消费成功后才能继续进行下一个消费 $channel->basic_qos(null, 1, null); //开启消费no_ack=false,设置为手动应答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
通配符的匹配模式
如消费端中routing_key = ‘user.*’;
生产者:
指定routing_key= ‘user.top’
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'topic_log'; //指定routing_key $routing_key = 'user.top'; //指定交换机类型为direct $channel->exchange_declare($exc_name, 'topic', false, false, false); //声明数据 $data = 'this is ' . $routing_key . ' message'; //创建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 //指定使用的routing_key $channel->basic_publish($msg, $exc_name, $routing_key); //关闭连接 $channel->close(); $connection->close();
消费者
消费端中routing_key = ‘user.*’;
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'direct_log'; //指定routing_key $routing_key = 'user.*'; $channel->exchange_declare($exc_name, 'topic', false, false, false); //获取系统生成的消息队列名称 list($queue_name, ,) = $channel->queue_declare('', false, false, true, false); //将队列名与交换器名进行绑定,并指定routing_key $channel->queue_bind($queue_name,$exc_name,$routing_key); $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //确认消息已被消费,从生产队列中移除 $msg->ack(); }; //设置消费成功后才能继续进行下一个消费 $channel->basic_qos(null, 1, null); //开启消费no_ack=false,设置为手动应答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
推荐学习:《PHP视频教程》
위 내용은 메시지 큐 RabbitMQ 시작하기 및 PHP 예제에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!