ホームページ > バックエンド開発 > PHPチュートリアル > PHPとMySQLにおけるキューメッセージ確認機構とメッセージリトライ処理方法

PHPとMySQLにおけるキューメッセージ確認機構とメッセージリトライ処理方法

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
リリース: 2023-10-15 12:54:02
オリジナル
1305 人が閲覧しました

PHPとMySQLにおけるキューメッセージ確認機構とメッセージリトライ処理方法

PHP と MySQL におけるキューのメッセージ確認機構とメッセージリトライ処理方法

はじめに:
インターネットアプリケーションの発展に伴い、多くのオンラインサービスが登場のリクエストを処理する必要があり、これらのリクエストには非同期処理メソッドが必要になることがよくあります。キューは、リクエストを処理から効果的に分離し、システムのパフォーマンスと信頼性を向上させる一般的なソリューションです。この記事では、PHPとMySQLにおけるキューのメッセージ確認の仕組みとメッセージのリトライ処理方法を、具体的なコード例とともに紹介します。

1. メッセージ キューの概念と機能
メッセージ キューは、メッセージをキューに格納し、非同期に処理する一般的なアプリケーション モードです。メッセージ キューの利点は主に次の側面に反映されます。

  1. 分離: リクエストと処理を分離すると、システムのスケーラビリティと保守性が向上します。
  2. 非同期処理: 時間のかかる操作を非同期処理のキューに入れて、システムの応答速度を向上させます。
  3. 失敗した処理に対するフォールトトレラントなメカニズム: メッセージ確認およびメッセージ再試行メカニズムを通じて、メッセージ処理の信頼性を確保します。

2. メッセージ確認の仕組み
キュー システムにおいて、メッセージ確認はメッセージの処理が完了したことを確認するための仕組みです。メッセージ確認メカニズムは、メッセージの損失や処理の重複の問題を回避するのに役立ちます。

PHP でのメッセージ確認は、ACK メカニズムを使用して実現できます。具体的な実装手順は次のとおりです。

  1. プロデューサーはメッセージをキューに送信します。
  2. コンシューマはキューからメッセージを取り出して処理します。
  3. メッセージ処理が成功した場合、コンシューマはメッセージ処理が完了したことを確認するために ACK を送信し、そうでない場合は、コンシューマはメッセージ処理を拒否するために NACK を送信します。
  4. キューはメッセージの ACK または NACK の受信を確認し、キューから ACK メッセージを削除します。

以下は、RabbitMQ をメッセージキューとして使用したサンプルコードです。
Producer:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

#!/usr/bin/env php

<?php

require_once __DIR__ . '/../vendor/autoload.php';

 

use PhpAmqpLibConnectionAMQPStreamConnection;

use PhpAmqpLibMessageAMQPMessage;

 

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->queue_declare('queue_name', false, false, false, false);

 

$msg = new AMQPMessage('Hello World!');

$channel->basic_publish($msg, '', 'queue_name');

 

$channel->close();

$connection->close();

ログイン後にコピー

Consumer:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

#!/usr/bin/env php

<?php

require_once __DIR__ . '/../vendor/autoload.php';

 

use PhpAmqpLibConnectionAMQPStreamConnection;

use PhpAmqpLibMessageAMQPMessage;

 

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->queue_declare('queue_name', false, false, false, false);

 

$callback = function (AMQPMessage $msg) {

    echo 'Received message: ' . $msg->body . PHP_EOL;

    if (processMessage($msg)) {

        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); // 消息处理成功,发送ACK确认

    } else {

        $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, true); // 消息处理失败,发送NACK拒绝

    }

};

 

$channel->basic_consume('queue_name', '', false, false, false, false, $callback);

while (count($channel->callbacks)) {

    $channel->wait();

}

 

function processMessage(AMQPMessage $msg) {

    // 消息处理逻辑

    if ($msg->body == 'Hello World!') {

        return true;

    } else {

        return false;

    }

}

 

$channel->close();

$connection->close();

ログイン後にコピー

3. メッセージリトライ処理方法
実際のアプリケーションでは、ネットワーク障害やサーバーエラーなどによりメッセージ処理が失敗する場合があります。メッセージの信頼性を確保するために、処理に失敗したメッセージは再試行できます。

MySQL は、メッセージの再試行処理に適用できるトランザクションおよびロールバック メカニズムを提供します。具体的な実装手順は次のとおりです。

  1. プロデューサは、データベース内のメッセージ テーブルにメッセージを送信します。
  2. コンシューマは、データベース内のメッセージ テーブルからメッセージを取得して処理します。
  3. メッセージが正常に処理された場合はメッセージ テーブルからメッセージを削除し、そうでない場合はメッセージ テーブルの処理回数に 1 を加えて処理時間を更新します。
  4. スケジュールされたタスクを設定して、最大再試行回数以下で処理されたメッセージ テーブル内のメッセージを定期的にチェックし、コンシューマに再配信します。

次は、MySQL をメッセージ ストアとして使用するサンプル コードです。
プロデューサー:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

<?php

$dsn = 'mysql:dbname=testdb;host=127.0.0.1';

$user = 'root';

$password = '';

 

try {

    $db = new PDO($dsn, $user, $password);

    $db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

 

    $sql = 'INSERT INTO message_queue (message) VALUES (?)';

    $stmt = $db->prepare($sql);

    $message = 'Hello World!';

    $stmt->bindParam(1, $message);

    $stmt->execute();

 

} catch (PDOException $e) {

    echo 'Connection failed: ' . $e->getMessage();

}

ログイン後にコピー

コンシューマー:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

<?php

$dsn = 'mysql:dbname=testdb;host=127.0.0.1';

$user = 'root';

$password = '';

 

try {

    $db = new PDO($dsn, $user, $password);

    $db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

 

    $sql = 'SELECT * FROM message_queue';

    $stmt = $db->prepare($sql);

    $stmt->execute();

    $messages = $stmt->fetchAll();

 

    foreach ($messages as $message) {

        if (processMessage($message)) {

            $deleteSql = 'DELETE FROM message_queue WHERE id = ?';

            $deleteStmt = $db->prepare($deleteSql);

            $deleteStmt->bindParam(1, $message['id']);

            $deleteStmt->execute();

        } else {

            $retrySql = 'UPDATE message_queue SET retries = retries + 1, last_retry_time = ? WHERE id = ?';

            $retryStmt = $db->prepare($retrySql);

            $now = date('Y-m-d H:i:s');

            $retryStmt->bindParam(1, $now);

            $retryStmt->bindParam(2, $message['id']);

            $retryStmt->execute();

        }

    }

 

} catch (PDOException $e) {

    echo 'Connection failed: ' . $e->getMessage();

}

 

function processMessage($message) {

    // 消息处理逻辑

    if ($message['message'] == 'Hello World!') {

        return true;

    } else {

        return false;

    }

}

ログイン後にコピー

スケジュールされたタスク:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

<?php

$dsn = 'mysql:dbname=testdb;host=127.0.0.1';

$user = 'root';

$password = '';

 

try {

    $db = new PDO($dsn, $user, $password);

    $db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

 

    $sql = 'SELECT * FROM message_queue WHERE retries <= ?';

    $stmt = $db->prepare($sql);

    $maxRetries = 3;

    $stmt->bindParam(1, $maxRetries);

    $stmt->execute();

    $messages = $stmt->fetchAll();

 

    foreach ($messages as $message) {

        // 重新投递消息给消费者

    }

 

} catch (PDOException $e) {

    echo 'Connection failed: ' . $e->getMessage();

}

ログイン後にコピー

結論:
メッセージ確認メカニズムとメッセージ再試行処理方法を通じて、システムの信頼性と安定性を向上させることができます。一般的な分離および非同期処理ツールとして、キューは PHP および MySQL でメッセージの確認と再試行を効果的に実装でき、アプリケーションのパフォーマンスとユーザー エクスペリエンスを向上させます。

参考資料:

  1. PHP RabbitMQ 公式ドキュメント: https://github.com/php-amqplib/php-amqplib
  2. PHP MySQL 公式ドキュメント: https : //www.php.net/manual/en/book.pdo.php

以上がPHPとMySQLにおけるキューメッセージ確認機構とメッセージリトライ処理方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート