Heim > PHP-Framework > Swoole > Hauptteil

Integrationspraxis von Swoole und RabbitMQ: Aufbau eines hochverfügbaren Nachrichtenwarteschlangensystems

WBOY
Freigeben: 2023-06-14 12:56:09
Original
1377 Leute haben es durchsucht

Mit dem Aufkommen des Internetzeitalters sind Nachrichtenwarteschlangensysteme immer wichtiger geworden. Es ermöglicht asynchrone Vorgänge zwischen verschiedenen Anwendungen, reduziert die Kopplung und verbessert die Skalierbarkeit, wodurch die Leistung und das Benutzererlebnis des gesamten Systems verbessert werden. Im Nachrichtenwarteschlangensystem ist RabbitMQ eine leistungsstarke Open-Source-Nachrichtenwarteschlangensoftware, die eine Vielzahl von Nachrichtenprotokollen unterstützt und häufig in Finanztransaktionen, E-Commerce, Online-Spielen und anderen Bereichen eingesetzt wird.

In praktischen Anwendungen ist es oft notwendig, RabbitMQ mit anderen Systemen zu integrieren. In diesem Artikel wird erläutert, wie Sie mithilfe der Swoole-Erweiterung einen hochverfügbaren RabbitMQ-Cluster implementieren und einen vollständigen Beispielcode bereitstellen.

1. RabbitMQ-Integration

  1. Einführung in RabbitMQ

RabbitMQ ist eine plattformübergreifende Open-Source-Nachrichtenwarteschlangensoftware, die vollständig dem AMQP-Protokoll (Advanced Message Queuing Protocol) folgt und mehrere Nachrichtenprotokolle unterstützt. Die Kernidee von RabbitMQ besteht darin, Nachrichten in die Warteschlange zu stellen und bei Bedarf herauszunehmen, wodurch ein effizienter asynchroner Datenaustausch und eine effiziente asynchrone Datenkommunikation realisiert werden.

  1. RabbitMQ-Integration

Um RabbitMQ mit PHP-Anwendungen zu integrieren, können wir die von der PHP AMQP-Bibliothek bereitgestellte API verwenden. Die Bibliothek unterstützt das Hauptprotokoll AMQP 0-9-1 von RabbitMQ und Erweiterungen, einschließlich Veröffentlichung, Abonnieren, Warteschlange, Austausch und andere Funktionen. Hier ist ein einfacher Beispielcode:

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明队列
$channel->queue_declare('hello', false, false, false, false);

// 创建消息
$msg = new AMQPMessage('Hello World!');

// 发送消息
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'
";

// 关闭连接
$channel->close();
$connection->close();
?>
Nach dem Login kopieren

Dieser Beispielcode stellt eine Verbindung zum lokalen RabbitMQ-Server („localhost“) her, deklariert eine Warteschlange mit dem Namen „hello“ und sendet Nachrichten an diese Warteschlange.

2. Swoole-Integration

  1. Einführung in Swoole

Swoole ist ein leistungsstarkes PHP-Framework für die asynchrone Netzwerkkommunikation, das asynchrones TCP, UDP, HTTP, WebSocket und andere Kommunikationsprotokolle basierend auf EventLoop implementiert. Es zeichnet sich durch hohe Parallelität, hohe Leistung, geringen Verbrauch und einfache Entwicklung aus und wird häufig in Szenarien wie Webdiensten und Spieleservern eingesetzt.

  1. Swoole integriert RabbitMQ

Swooles asynchrone Funktionen eignen sich sehr gut für die asynchrone Kommunikation von RabbitMQ und können ein effizientes, stabiles Nachrichtenwarteschlangensystem mit geringer Latenz erreichen. Das Folgende ist ein Beispielcode für Swoole, der RabbitMQ integriert:

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明队列
$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C
";

// 接收消息
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "
";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done
";
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

// 监听消息
while (count($channel->callbacks)) {
    $channel->wait();
}

// 关闭连接
$channel->close();
$connection->close();
?>
Nach dem Login kopieren

Dieser Beispielcode stellt eine Verbindung zum lokalen RabbitMQ-Server („localhost“) her, deklariert eine persistente Warteschlange „task_queue“ und beginnt mit dem Abhören der Nachrichten der Warteschlange. Wenn eine Nachricht eintrifft, ruft Swoole die Rückruffunktion asynchron auf und kann nach der Verarbeitung der Geschäftslogik in der Rückruffunktion eine Antwort senden, um eine effiziente asynchrone Kommunikation mit geringer Latenz zu erreichen.

3. Hochverfügbare Architektur

Um ein hochverfügbares Nachrichtenwarteschlangensystem zu erreichen, müssen wir mehrere RabbitMQ-Knoten in einen Cluster integrieren, um die Skalierbarkeit und Fehlertoleranz des Systems zu verbessern.

Zu den häufig verwendeten RabbitMQ-Clusterkonfigurationen gehören der Aktiv-Standby-Modus und der Spiegelungsmodus. Im Aktiv-Standby-Modus dient ein Knoten als aktiver Knoten und die anderen Knoten als Backup-Knoten. Wenn der Primärknoten ausfällt, übernimmt der Backup-Knoten automatisch seine Aufgaben. Im Spiegelmodus wird eine Warteschlange auf mehreren Knoten auf der Festplatte repliziert und synchron gehalten. Jeder dieser Knoten kann Nachrichten verarbeiten, die von Produzenten- und Konsumentenanfragen gesendet werden.

Unter Berücksichtigung von Stabilität, Skalierbarkeit, Wartbarkeit und anderen Faktoren haben wir den Spiegelmodus als unsere Hochverfügbarkeitsarchitektur gewählt. Das Folgende ist ein Beispielcode zum Hinzufügen einer Spiegelwarteschlange in der Konfigurationsdatei:

$channel->queue_declare('task_queue', false, true, false, false, false, array(
    'x-ha-policy' => array('S', 'all'),
    'x-dead-letter-exchange' => array('S', 'dead_exchange'),
));
Nach dem Login kopieren

Dieser Beispielcode erstellt eine persistente Warteschlange mit dem Namen „task_queue“ und setzt den Parameter „x-ha-policy“ auf „all“, was angibt, dass diese Warteschlange Alle Spiegelwarteschlangen sind „hochverfügbar“. Gleichzeitig wird auch der Parameter „x-dead-letter-exchange“ auf „dead_exchange“ gesetzt, was bedeutet, dass die Nachricht nach der Ablehnung an diesen Switch gesendet wird. Dieser Schalter kann eine oder mehrere Warteschlangen für die Wiederverwendung von Nachrichten oder für Statistiken haben.

4. Vollständiger Beispielcode

Das Folgende ist ein vollständiger Beispielcode für ein Nachrichtenwarteschlangensystem, der das asynchrone Kommunikationsframework von Swoole verwendet, um den RabbitMQ-Spiegelwarteschlangenmodus zu integrieren und ein hochverfügbares Nachrichtenwarteschlangensystem zu implementieren. Sie können die Konfiguration oder den Code ändern, um Ihr eigenes Nachrichtenwarteschlangensystem entsprechend den tatsächlichen Anforderungen zu implementieren.

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$exchangeName = 'test.exchange';
$queueName = 'test.queue';
$deadExchangeName = 'dead.exchange';

// 建立连接
$connection = new AMQPStreamConnection(
    'localhost', 5672, 'guest', 'guest', '/', false, 'AMQPLAIN', null, 'en_US', 3.0, 3.0, null, true
);
$channel = $connection->channel();

// 声明交换机
$channel->exchange_declare($exchangeName, 'direct', false, true, false);

// 声明死信交换机
$channel->exchange_declare($deadExchangeName, 'fanout', false, true, false);

// 声明队列
$channel->queue_declare($queueName, false, true, false, false, false, array(
    'x-ha-policy' => array('S', 'all'),
    'x-dead-letter-exchange' => array('S', $deadExchangeName),
));

// 绑定队列到交换机中
$channel->queue_bind($queueName, $exchangeName);

echo " [*] Waiting for messages. To exit press CTRL+C
";

// 接收消息
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "
";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done
";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

// 监听消息
while (count($channel->callbacks)) {
    $channel->wait();
}

// 关闭连接
$channel->close();
$connection->close();
?>
Nach dem Login kopieren

Im obigen Code wird die Verbindung zu RabbitMQ zunächst über die AMQPStreamConnection-Klasse hergestellt. Anschließend wurde ein Schalter mit dem Namen „test.exchange“ und eine Warteschlange mit dem Namen „test.queue“ erstellt und „x-ha-policy“ auf „all“ gesetzt, was angibt, dass es sich bei dieser Warteschlange um eine Spiegelwarteschlange handelt und alle Knoten darauf zugreifen können. Gleichzeitig wird „x-dead-letter-exchange“ auch auf „dead.exchange“ gesetzt, was bedeutet, dass die Nachricht nach der Ablehnung an den Schalter „dead.exchange“ gesendet wird.

Verwenden Sie abschließend in der Rückruffunktion die Methode basic_ack (), um den Erfolg des Verbrauchs zu ermitteln und die von der Nachricht belegten Ressourcen freizugeben.

Das Obige ist der relevante Inhalt zur Integrationspraxis von Swoole und RabbitMQ. Durch die Verwendung der Swoole-Erweiterung können wir problemlos asynchrone Kommunikation implementieren und mehrere RabbitMQ-Knoten in ein hochverfügbares Nachrichtenwarteschlangensystem integrieren, um die Leistung und Stabilität des Systems zu verbessern.

Das obige ist der detaillierte Inhalt vonIntegrationspraxis von Swoole und RabbitMQ: Aufbau eines hochverfügbaren Nachrichtenwarteschlangensystems. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage