Artikel ini membawakan anda pengetahuan yang berkaitan tentang PHP terutamanya memperkenalkan baris gilir mesej RabbitMQ dan beberapa butiran praktikal lihat, harap ia membantu semua orang.
Pembelajaran yang disyorkan: "Tutorial Video PHP"
Penjelasan
MQ (Message Queue) ialah baris gilir mesej, yang merupakan kaedah komunikasi antara aplikasi boleh dikembalikan serta-merta selepas dihantar , sistem mesej memastikan penghantaran mesej yang boleh dipercayai. "Baris gilir mesej" ialah bekas yang menyimpan mesej semasa penghantarannya. Ia adalah tipikal: pengeluar, model pengguna. Pengeluar terus menghasilkan mesej ke dalam baris gilir mesej, dan pengguna terus mendapatkan mesej daripada baris gilir. Oleh kerana pengeluaran dan penggunaan mesej adalah tidak segerak, dan hanya mengambil berat tentang penghantaran dan penerimaan mesej, tiada pencerobohan logik perniagaan, sekali gus mencapai penyahgandingan pengeluar dan pengguna.
Mengapa menggunakan perisian tengah mesej?
Baris gilir mesej ialah komponen penting dalam sistem teragih, menyelesaikan masalah seperti penyahgandingan aplikasi, pemesejan tak segerak, pencukuran puncak trafik, dll., dan mencapai keselarasan tinggi, ketersediaan tinggi, kebolehskalaan dan seni bina ketekalan akhirnya
Pemprosesan tak segerak
Selepas pengguna mendaftar maklumat, mereka perlu menghantar e-mel dan mesej SMS pendaftaran
1. Selepas maklumat pendaftaran pengguna ditulis ke dalam pangkalan data, walaupun pendaftaran berjaya, maklumat tentang pendaftaran yang berjaya dikembalikan
2. Hantar e-mel dan pendaftaran mesej SMS Pelaksanaan asynchronous melalui baris gilir mesej, pengguna tidak perlu menunggu dua operasi ini
Penyahgandingan aplikasi
Selepas pengguna membuat pesanan, sistem pesanan perlu memberitahu sistem inventori. Pendekatan tradisional ialah sistem pesanan memanggil antara muka sistem inventori untuk menambah atau mengurangkan inventori
1. Pengguna membuat pesanan untuk beratur untuk pengeluaran, dan gesaan kejayaan dikembalikan
2. Baris gilir sistem inventori penggunaan digunakan untuk menambah atau mengurangkan inventori
Pencukur puncak trafik
Pencukuran puncak trafik juga merupakan senario biasa dalam baris gilir mesej secara amnya digunakan secara meluas dalam jualan kilat atau aktiviti merebut kumpulan
1. Apabila sekumpulan pengguna Permintaan datang dan memasuki baris gilir, dan bilangan baris gilir dikawal Jika bilangannya melebihi nombor tertentu, jualan kilat akan tamat
2. Kemudian barisan akan dimakan satu persatu mengikut first-in, first-out
Ciri-ciri Rabbitmq
Kebolehpercayaan RabbitMQ menggunakan beberapa mekanisme untuk memastikan kebolehpercayaan, seperti kegigihan, pengesahan penghantaran dan pengesahan pelepasan.
Penghalaan Fleksibel Mesej dihalakan melalui Exchange sebelum mereka memasuki baris gilir. Untuk fungsi penghalaan biasa, RabbitMQ sudah menyediakan beberapa pelaksanaan Exchange terbina dalam. Untuk fungsi penghalaan yang lebih kompleks, berbilang Exchange boleh disatukan dan Exchange anda sendiri juga boleh dilaksanakan melalui mekanisme pemalam.
Pengkelompokan mesej (Pengkelompokan) Berbilang pelayan RabbitMQ boleh membentuk kelompok untuk membentuk Broker yang logik.
Baris Gilir Sangat Tersedia Baris gilir boleh dicerminkan pada mesin dalam kelompok, supaya baris gilir masih tersedia jika beberapa nod gagal.
Berbilang protokol RabbitMQ menyokong berbilang protokol baris gilir mesej, seperti STOMP, MQTT, dsb.
Pelanggan berbilang bahasa (Ramai Pelanggan) RabbitMQ menyokong hampir semua bahasa yang biasa digunakan, seperti PHP Java, .NET, Ruby, dsb.
UI Pengurusan (UI Pengurusan) RabbitMQ menyediakan antara muka pengguna yang mudah digunakan yang membolehkan pengguna memantau dan mengurus banyak aspek broker mesej.
Menjejak Jika mesej tidak normal, RabbitMQ menyediakan mekanisme penjejakan mesej supaya pengguna dapat mengetahui apa yang berlaku.
Sistem Plugin RabbitMQ menyediakan banyak pemalam untuk dikembangkan dalam banyak aspek, dan anda juga boleh menulis pemalam anda sendiri.
Cara RabbitMQ berfungsi
Broker: Aplikasi yang menerima dan mengedarkan mesej RabbitMQ Server ialah Message Broker.
Hos maya: Sama seperti pangkalan data mysql, apabila berbilang pengguna berbeza menggunakan perkhidmatan yang disediakan oleh pelayan RabbitMQ yang sama, berbilang vhost boleh dibahagikan dan setiap pengguna boleh menggunakan perkhidmatan yang disediakan oleh pelayan RabbitMQ yang sama membuat pertukaran/baris gilir, dsb.
Sambungan: Sambungan TCP antara penerbit/pengguna dan broker.
Saluran: Jika Sambungan diwujudkan setiap kali RabbitMQ diakses, overhed untuk mewujudkan Sambungan TCP akan menjadi besar dan kecekapan akan menjadi rendah apabila volum mesej besar. Saluran ialah sambungan logik yang diwujudkan di dalam sambungan Sebagai Sambungan yang ringan, Saluran sangat mengurangkan kos untuk mewujudkan sambungan TCP oleh sistem pengendalian.
Pertukaran: Mesej sampai ke perhentian pertama broker Menurut peraturan pengedaran, ia sepadan dengan kunci penghalaan dalam jadual pertanyaan dan mengedarkan mesej ke baris gilir. Jenis yang biasa digunakan ialah: langsung (point-to-point), topik (publish-subscribe) dan fanout (multicast).
Barisan: Mesej akhirnya dihantar ke sini untuk diambil oleh pengguna. Mesej boleh disalin ke berbilang baris gilir pada masa yang sama.
Alamat rasmi RabbitMQ: http://www.rabbitmq.com
Untuk memasang rabbitmq, anda perlu memasang erlang dahulu
Langkah pertama: Pemasangan Erlang
Untuk memasang rabbitmq, anda perlu memasang erlang dahulu Centos7 tidak menyokong pemasangan versi erlang 24
Muat turun:
<.>
# 系统 centos 7# 下载erlang包,手动下载后上传至服务器,我在使用wget下载后无法安装,这里没明白 # 安装 yum install erlang-23.3.4.4-1.el7.x86_64.rpm # 验证安装是否成功 erl
# 系统 centos 7# 下载rabbitmq包,手动下载后上传至服务器,我在使用wget下载后无法安装,这里没明白 # 安装 yum install rabbitmq-server-3.8.19-1.el7.noarch.rpm # 启动 systemctl start rabbitmq-server # 关闭 systemctl stop rabbitmq-server # 查看默认端口服务是否启动 netstat -tunlp
mesej php Gunakan pelbagai mod baris gilir rabbitmq
5672: port komunikasi port klien
15672: Pelanggan API HTTP, UI pengurusan (hanya jika pemalam pengurusan didayakan) tidak semestinya bermula
25672: digunakan untuk komunikasi antara nod (Erlang port pelayan pengedaran)
arahan pengurusan rabbitmq Mula 15672: Pelanggan API HTTP, UI pengurusan (hanya jika pemalam pengurusan didayakan)
# 启动rabbitmq_management插件 rabbitmq-plugins enable rabbitmq_management # 查看所有插件 rabbitmq-plugins list
# 新增一个用户 rabbitmqctl add_user 【用户名Username】 【密码Password】 rabbitmqctl add_user root root # 删除一个用户 rabbitmqctl delete_user Username # 修改用户的密码 rabbitmqctl change_password Username Newpassword # 查看当前用户列表 rabbitmqctl list_users # 设置用户角色的命令为: rabbitmqctl set_user_tags User Tag rabbitmqctl set_user_tags root administrator # User为用户名, Tag为角色名(对应于上面的administrator,monitoring,policymaker,management,或其他自定义名称)。
Sama seperti pangkalan data mysql, apabila berbilang pengguna berbeza menggunakan perkhidmatan yang disediakan oleh pelayan RabbitMQ yang sama, berbilang vhost boleh dibahagikan, dan setiap satu pengguna boleh Buat pertukaran/baris vhost anda sendiri, dsb.
1) Lihat vhosts pengguna berbeza
Buat vhosts dan tetapkan kebenaran
# 新增vhost rabbitmqctl add_vhost vhostname rabbitmqctl add_vhost order # 查看vhost列表 rabbitmqctl list_vhosts #为vhost添加用户 rabbitmqctl set_permissions -p vhostname username ".*" ".*" ".*"rabbitmqctl set_permissions -p order root ".*" ".*" ".*" ".*" ".*" ".*"后边三个.*分别代表:配置权限、写权限、读权限
2) Pasang pemasangan sambungan rabbitmq untuk php
https://github.com/php-amqplib/php-amqplib pemasangan sambungan
Ubah suai imej Alibaba Cloud
composer config -g repo.packagist composer https://mirrors.aliyun.com/composer/
.
# 升级composer composer self-update #php.ini 打开 sockets 扩展 #下载指定版本 composer require php-amqplib/php-amqplib=^3.0
Dokumentasi: https://www.rabbitmq.com/tutorials /tutorial-one-php.html
Pengeluar dan mesej ringkas
Kod pengeluar http://localhost/rabbitmq/simple/pro.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //生产者 //Connection: publisher/consumer和broker之间的TCP连接 //Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。 //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:goods $queue_name = 'goods'; $channel->queue_declare($queue_name, false, true, false, false); //生产数据 $data = 'this is messge'; //创建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 $channel->basic_publish($msg, $exchange = '', $queue_name); //关闭连接 $channel->close(); $connection->close();
运行生产者脚本:
http://localhost/rabbitmq/simple/pro.php
点击goods队列可以进入到消息详情
http://localhost/rabbitmq/simple/con.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:goods $queue_name = 'goods'; $channel->queue_declare($queue_name, false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; }; //开启消费 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
rabbitmq Work Queues
一个生产者对应多个消费者,消费特别慢时增加几个消费分发
生产者,和上文生产者不变
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //生产者 //Connection: publisher/consumer和broker之间的TCP连接 //Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。 //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:task_queue $queue_name = 'task_queue'; $channel->queue_declare($queue_name, false, true, false, false); for ($i = 0; $i AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 $channel->basic_publish($msg, $exchange = '', $queue_name); } //关闭连接 $channel->close(); $connection->close();
消费者worker1
D:\phpstudy_pro\WWW\rabbitmq\worker\worker1.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:task_queue $queue_name = 'task_queue'; $channel->queue_declare($queue_name, false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; }; //开启消费 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
消费者worker2,代码和worker1一样,同时运行开启后会一起消费
D:\phpstudy_pro\WWW\rabbitmq\worker\worker2.php
消费者消费消息ack确认
用以确认不会丢失消息
消费消息
basic_consume($queue = ‘’, $consumer_tag = ‘’, $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $callback = null, $ticket = null, $arguments = array())
no_ack=false,设置为手动应答
开启后需要进行消息的消费确认后才会进行移除,否者该消息会一直存在消息队列中
消费端代码
D:\phpstudy_pro\WWW\rabbitmq\worker\worker1.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明队列名为:task_queue $queue_name = 'task_queue'; $channel->queue_declare($queue_name, false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //确认消息已被消费,从生产队列中移除 $msg->ack(); }; //设置消费成功后才能继续进行下一个消费 $channel->basic_qos(null, 1, null); //开启消费no_ack=false,设置为手动应答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
发布/订阅模式
是要是公用一个交换机的消费端都能收到同样的消息,类似广播的功能
文档:rabbitmq Publish/Subscribe
https://www.rabbitmq.com/tutorials/tutorial-three-php.html
rabbitmq Exchange类型
交换器、路由键、绑定 Exchange:交换器。发送消息的AMQP实体。交换器拿到一个消息之后将它路由给一个或几个队列。它使用哪种路由算法是由交换机类型和被称作绑定(Binding)的规则所决定的。RabbitMQ有四种类型。 RoutingKey:路由键。生产者将消息发送给交换器。一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终失效。 Binding:绑定。绑定(Binding)是交换机(Exchange)将消息(Message)路由给队列(Queue)所需遵循的规则。 # 四种模式 Direct 定向 消息与一个特定的路由键完全匹配 Topic 通配符 路由键和某模式进行匹配 Fanout 广播 发送到该类型交换机的消息都会被广播到与该交换机绑定的所有队列 Headers 不处理路由键,而是根据发送的消息内容中的headers属性进行匹配
exchange_declare($exchange, $type, $passive = false, $durable = false, $auto_delete = true, $internal = false, $nowait = false, $arguments = array(), $ticket = null) 。试探性申请一个交换器,若该交换器不存在,则创建;若存在,则跳过。
生产者代码
D:\phpstudy_pro\WWW\rabbitmq\ps\pro.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'exch'; $channel->exchange_declare($exc_name, 'fanout', false, false, false); //声明数据 $data = 'this is fanout message'; //创建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 $channel->basic_publish($msg, $exc_name); //关闭连接 $channel->close(); $connection->close();
fanout模式消费者消费消息
是要是公用一个交换机的消费端都能收到同样的消息,类似广播的功能
当消费端运行时才会显示该队列
消费端:
D:\phpstudy_pro\WWW\rabbitmq\ps\worker1.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'exch'; $channel->exchange_declare($exc_name, 'fanout', false, false, false); //获取系统生成的消息队列名称 list($queue_name, ,) = $channel->queue_declare('', false, false, true, false); //将队列名与交换器名进行绑定 $channel->queue_bind($queue_name,$exc_name); $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //确认消息已被消费,从生产队列中移除 $msg->ack(); }; //设置消费成功后才能继续进行下一个消费 $channel->basic_qos(null, 1, null); //开启消费no_ack=false,设置为手动应答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
文档:
https://www.rabbitmq.com/tutorials/tutorial-four-php.html
用来指定不同的交换机和指定routing_key,在消费端进行消费
生产者代码:
D:\phpstudy_pro\WWW\rabbitmq\routing\pro.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'direct_log'; //指定routing_key $routing_key = 'info'; //指定交换机类型为direct $channel->exchange_declare($exc_name, 'direct', false, false, false); //声明数据 $data = 'this is ' . $routing_key . ' message'; //创建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 //指定使用的routing_key $channel->basic_publish($msg, $exc_name, $routing_key); //关闭连接 $channel->close(); $connection->close();
消费者代码
D:\phpstudy_pro\WWW\rabbitmq\routing\info.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'direct_log'; //指定routing_key $routing_key = 'info'; $channel->exchange_declare($exc_name, 'direct', false, false, false); //获取系统生成的消息队列名称 list($queue_name, ,) = $channel->queue_declare('', false, false, true, false); //将队列名与交换器名进行绑定,并指定routing_key $channel->queue_bind($queue_name,$exc_name,$routing_key); $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //确认消息已被消费,从生产队列中移除 $msg->ack(); }; //设置消费成功后才能继续进行下一个消费 $channel->basic_qos(null, 1, null); //开启消费no_ack=false,设置为手动应答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
通配符的匹配模式
如消费端中routing_key = ‘user.*’;
生产者:
指定routing_key= ‘user.top’
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'topic_log'; //指定routing_key $routing_key = 'user.top'; //指定交换机类型为direct $channel->exchange_declare($exc_name, 'topic', false, false, false); //声明数据 $data = 'this is ' . $routing_key . ' message'; //创建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //发布消息 //指定使用的routing_key $channel->basic_publish($msg, $exc_name, $routing_key); //关闭连接 $channel->close(); $connection->close();
消费者
消费端中routing_key = ‘user.*’;
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //声明交换器 $exc_name = 'direct_log'; //指定routing_key $routing_key = 'user.*'; $channel->exchange_declare($exc_name, 'topic', false, false, false); //获取系统生成的消息队列名称 list($queue_name, ,) = $channel->queue_declare('', false, false, true, false); //将队列名与交换器名进行绑定,并指定routing_key $channel->queue_bind($queue_name,$exc_name,$routing_key); $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //确认消息已被消费,从生产队列中移除 $msg->ack(); }; //设置消费成功后才能继续进行下一个消费 $channel->basic_qos(null, 1, null); //开启消费no_ack=false,设置为手动应答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不断的循环进行消费 while ($channel->is_open()) { $channel->wait(); } //关闭连接 $channel->close(); $connection->close();
推荐学习:《PHP视频教程》
Atas ialah kandungan terperinci Bermula dengan baris gilir mesej RabbitMQ dan penjelasan terperinci tentang contoh PHP. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!