目次
まえがき
ストレージ キー:
コマンドを実行します
タグ
モードの違い
公開操作
公開プロセス
公開パラメーター
立即执行
延迟发布
执行过程
在其他项目中推任务
php版本
go版本
ホームページ PHPフレームワーク ThinkPHP think-queueの分析(redis周りの分析)

think-queueの分析(redis周りの分析)

Jul 26, 2021 pm 04:00 PM
php redis thinkphp5

まえがき

分析する前に、必ずメッセージ キューの実装を理解してください

tp5 メッセージ キューはデータベース Redis と Topthink の TP 公式実装に基づいています
この章は分析用に Redis に基づいています

ストレージ キー:

#key Type Description
queues:queueName list 実行するタスク
think:queue:restart string 再起動キューのタイムスタンプ
queues:queueName:layed zSet 遅延タスク
##queues :queueName:reserved zSet 実行に失敗しました。再実行を待機しています

コマンドを実行します

work listen との違いについては以下で説明します
#コマンド#説明#php think queue:workphp think queue:listenphp think queue:restart php think queue:subscribe ##動作タグ
Listening queue
#リッスンキュー
キューを再起動
まだありません。予約されている可能性があります。公式には他のアイデアもありますが、まだ実装されていません

タグ

##説明worker_daemon_startデーモン開始タスクの実行開始前タスク実行の遅延#queue_failedコマンドパラメータ使用可能なモード
worker_memory_exceeded メモリ超過
worker_queue_restart デーモンの再起動
#worker_before_process
worker_before_sleep
##タスク実行失敗
パラメータ デフォルト値
説明

キュー名前実行するタスクnullworkデーモンプロセスとしてタスクを実行します0仕事、聞く失敗後の再実行時間nullwork失敗後の再実行時間128M作業、リスニング最大メモリの制限3仕事、聞く## Tryswork,listenタスク失敗後の最大試行回数

モードの違い

1: 異なる実行原理
work: シングルプロセス処理モード;
デーモンパラメータなしワークプロセスは、次のメッセージを処理した後、現在のプロセスを直接終了します。新しいメッセージがない場合、一定期間スリープしてから終了します。
デーモン パラメータを使用すると、ワーク プロセスは、メモリがパラメータ設定を超えるまでループでキュー内のメッセージを処理してから、プロセスを終了します。 。新しいメッセージがない場合、各ループで一定期間スリープします。

listen: 親プロセスと子プロセスの処理モード。
は親プロセスに単一の実行モードを作成します。作業子プロセスは、作業子プロセスを通じてキュー内の次のメッセージを処理します。作業子プロセスが終了すると、
が配置されている親プロセスは、子プロセスの終了シグナルをリッスンして、新しいメッセージを作成します。 1. Work 子プロセスが一度に実行される;

2: 終了タイミングが異なります
work: 上記を参照
listen: 親プロセスは、次の 2 つの場合を除き、通常の状況で常に実行されます。
01: 作成された work 子プロセスの実行時間が listen コマンド ラインの --timeout パラメータ設定を超えます。この時点で、work 子プロセスは強制的に終了され、listen が実行されている親プロセスは見つかった場合も、ProcessTimeoutException 例外がスローされて終了します。;

開発者は、この例外をキャッチして親プロセスの実行を続行することを選択できます;
02: 親プロセスに何らかの理由でメモリ リークがある場合、親プロセス自体が占有するメモリがコマンド ラインを超えた場合 --memory パラメーターが設定されている場合、親プロセスと子プロセスの両方が終了します。通常の状況では、リッスン プロセス自体が占有するメモリは安定しています。

3: パフォーマンスの違い
work: スクリプト内でループし、コマンド実行の初期段階でフレームワーク スクリプトが読み込まれます;

listen: 処理後に新しいプロセスを開始しますタスク ワーク プロセスはこの時点でフレームワーク スクリプトをリロードします;

したがって、ワーク モードのパフォーマンスはリッスン モードよりも高くなります。
注: コードが更新された場合、作業モードで php think queue:restart コマンドを手動で実行して、変更を有効にするためにキューを再起動する必要があります。リッスン モードでは、他の操作を行わずに自動的に有効になります。オペレーション。

4: タイムアウト制御機能
work: 本質的には、プロセス自体の実行時間を制御することも、実行中のタスクの実行時間を制限することもできません。
listen: 作業の子を制限できますプロセスのタイムアウト時間;

タイムアウト パラメータを使用して、作業サブプロセスの実行が許可される最大時間を制限できます。この制限時間を超えて終了しなかったサブプロセスは、強制終了;
期限切れと時間の違い

期限切れは構成ファイルで設定され、タスクの有効期限を指します。この時間はグローバルであり、すべての作業プロセスに影響します。
タイムアウトはコマンド ライン パラメータに設定され、作業サブプロセスのタイムアウト時間を参照します。今回は、現在実行されている listen コマンドに対してのみ有効です。タイムアウトの対象は作業サブプロセスです。

5 : さまざまな使用シナリオ

作業に適用できるシナリオは次のとおりです:
01: 多数のタスク
02: 高いパフォーマンス要件
03: 短いタスク実行時間
04: タスクの実行時間が短い無限ループ、sleep()、exit()、die()、およびコンシューマ クラスでバグを引き起こしやすいその他のロジック

listen 該当するシナリオは次のとおりです:

01: タスクの数が小さい
02: タスクの実行時間が長い
03: タスクの実行時間を厳密に制限する必要がある

公開操作

解析を行っているためRedis に基づいているため、src/queue/connector/redis.php
01: 最初に src/Queue.php を呼び出します。__callStatic
のマジック メソッドを分析するだけです。 02: buildConnector は __callStatic メソッドで呼び出されます
03: 構成ファイルは最初に buildConnector で読み込まれます。ない場合は同期的に実行されます
04: 構成ファイルに基づいて接続を作成しますそして、構成を渡します

redis.php クラスのコンストラクターでの操作:
01: redis 拡張機能がインストールされているかどうかを確認します
02: 構成をマージします
03: 検出Redis 拡張機能か pRedis かどうか
04: 接続オブジェクトの作成

公開プロセス

公開パラメーター

null 作業、リッスン
##daemon
遅延
force
メモリ
睡眠 ##タスクがないときの待ち時間
0
##$dataemptyタスクデータpush,later$queuedefaultタスク名push,later #$遅延##

立即执行

    push($job, $data, $queue)
    Queue::push(Test::class, ['id' => 1], 'test');
ログイン後にコピー

一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName
数组结构:

[
    'job' => $job, // 要执行任务的类
    'data' => $data, // 任务数据
    'id'=>'xxxxx' //任务id
]
ログイン後にコピー

写入 redis并且返回队列id
至于中间的那顿骚操作太长了就没写

延迟发布

    later($delay, $job, $data, $queue)
    Queue::later(100, Test::class, ['id' => 1], 'test');
ログイン後にコピー

跟上面的差不多
一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed score为当前的时间戳+$delay

执行过程

执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
最后讲一下标签的使用

    //守护进程开启
    'worker_daemon_start' => [
        \app\index\behavior\WorkerDaemonStart::class
    ],
    //内存超出
    'worker_memory_exceeded' => [
        \app\index\behavior\WorkerMemoryExceeded::class
    ],
    //重启守护进程
    'worker_queue_restart' => [
        \app\index\behavior\WorkerQueueRestart::class
    ],
    //任务开始执行之前
    'worker_before_process' => [
        \app\index\behavior\WorkerBeforeProcess::class
    ],
    //任务延迟执行
    'worker_before_sleep' => [
        \app\index\behavior\WorkerBeforeSleep::class
    ],
    //任务执行失败
    'queue_failed' => [
        \app\index\behavior\QueueFailed::class
    ]
ログイン後にコピー

think-queueの分析(redis周りの分析)

public function run(Output $output)
    {
        $output->write('<info>任务执行失败</info>', true);
    }
ログイン後にコピー

控制台执行 php think queue:work --queue test --daemon
会在控制台一次输出

守护进程开启
任务延迟执行
ログイン後にコピー

失败的处理 如果有任务执行失败或者执行次数达到最大值
会触发 queue_failed

app\index\behavior@run方法里面写失败的逻辑 比如邮件通知 写入日志等

最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架

在其他项目中推任务

php版本

<?php class Index
{
    private $redis = null;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }

        return $payload;
    }

    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
        for ($i = 0; $i later(10, 'app\index\jobs\Test', ['id' => 1], 'test');
ログイン後にコピー

go版本

package main

import (
    "encoding/json"
    "github.com/garyburd/redigo/redis"
    "math/rand"
    "time"
)

type Payload struct {
    Id       string      `json:"id"`
    Job      string      `json:"job"`
    Data     interface{} `json:"data"`
    Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
    RedisClient = &redis.Pool{
        MaxIdle:     20,
        MaxActive:   500,
        IdleTimeout: time.Second * 100,
        Dial: func() (conn redis.Conn, e error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379")

            if err != nil {
                return nil, err
            }

            _, _ = c.Do("SELECT", 10)

            return c, nil
        },
    }

}

func main() {

    var data = make(map[string]interface{})
    data["id"] = "1"

    later(10, "app\\index\\jobs\\Test", data, "test")
}

func push(job string, data interface{}, queue string) {
    payload := createPayload(job, data)
    queueName := "queues:" + queue

    _, _ = RedisClient.Get().Do("rPush", queueName, payload)
}

func later(delay int, job string, data interface{}, queue string) {

    m, _ := time.ParseDuration("+1s")
    currentTime := time.Now()
    op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
    createPayload(job, data)
    payload := createPayload(job, data)
    queueName := "queues:" + queue + ":delayed"

    _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
}

// 创建指定格式的数据
func createPayload(job string, data interface{}) (payload string) {
    payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}

    jsonStr, _ := json.Marshal(payload1)

    return string(jsonStr)
}

// 创建随机字符串
func random(n int) string {

    var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

    b := make([]rune, n)
    for i := range b {
        b[i] = str[rand.Intn(len(str))]
    }
    return string(b)
}
ログイン後にコピー

更多thinkphp技术知识,请访问thinkphp教程栏目!

パラメーター名 デフォルト値 説明 使用できるメソッド
$job None タスクを実行するクラス push,later
null 遅延時間 後で

以上がthink-queueの分析(redis周りの分析)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

Video Face Swap

Video Face Swap

完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

PHPとPython:2つの一般的なプログラミング言語を比較します PHPとPython:2つの一般的なプログラミング言語を比較します Apr 14, 2025 am 12:13 AM

PHPとPythonにはそれぞれ独自の利点があり、プロジェクトの要件に従って選択します。 1.PHPは、特にWebサイトの迅速な開発とメンテナンスに適しています。 2。Pythonは、データサイエンス、機械学習、人工知能に適しており、簡潔な構文を備えており、初心者に適しています。

PHP:Web開発の重要な言語 PHP:Web開発の重要な言語 Apr 13, 2025 am 12:08 AM

PHPは、サーバー側で広く使用されているスクリプト言語で、特にWeb開発に適しています。 1.PHPは、HTMLを埋め込み、HTTP要求と応答を処理し、さまざまなデータベースをサポートできます。 2.PHPは、ダイナミックWebコンテンツ、プロセスフォームデータ、アクセスデータベースなどを生成するために使用され、強力なコミュニティサポートとオープンソースリソースを備えています。 3。PHPは解釈された言語であり、実行プロセスには語彙分析、文法分析、編集、実行が含まれます。 4.PHPは、ユーザー登録システムなどの高度なアプリケーションについてMySQLと組み合わせることができます。 5。PHPをデバッグするときは、error_reporting()やvar_dump()などの関数を使用できます。 6. PHPコードを最適化して、キャッシュメカニズムを使用し、データベースクエリを最適化し、組み込み関数を使用します。 7

PHPの現在のステータス:Web開発動向を見てください PHPの現在のステータス:Web開発動向を見てください Apr 13, 2025 am 12:20 AM

PHPは、現代のWeb開発、特にコンテンツ管理とeコマースプラットフォームで依然として重要です。 1)PHPには、LaravelやSymfonyなどの豊富なエコシステムと強力なフレームワークサポートがあります。 2)パフォーマンスの最適化は、Opcacheとnginxを通じて達成できます。 3)PHP8.0は、パフォーマンスを改善するためにJITコンパイラを導入します。 4)クラウドネイティブアプリケーションは、DockerおよびKubernetesを介して展開され、柔軟性とスケーラビリティを向上させます。

PHPの永続的な関連性:それはまだ生きていますか? PHPの永続的な関連性:それはまだ生きていますか? Apr 14, 2025 am 12:12 AM

PHPは依然として動的であり、現代のプログラミングの分野で重要な位置を占めています。 1)PHPのシンプルさと強力なコミュニティサポートにより、Web開発で広く使用されています。 2)その柔軟性と安定性により、Webフォーム、データベース操作、ファイル処理の処理において顕著になります。 3)PHPは、初心者や経験豊富な開発者に適した、常に進化し、最適化しています。

PHP対その他の言語:比較 PHP対その他の言語:比較 Apr 13, 2025 am 12:19 AM

PHPは、特に迅速な開発や動的なコンテンツの処理に適していますが、データサイエンスとエンタープライズレベルのアプリケーションには良くありません。 Pythonと比較して、PHPはWeb開発においてより多くの利点がありますが、データサイエンスの分野ではPythonほど良くありません。 Javaと比較して、PHPはエンタープライズレベルのアプリケーションでより悪化しますが、Web開発により柔軟性があります。 JavaScriptと比較して、PHPはバックエンド開発により簡潔ですが、フロントエンド開発のJavaScriptほど良くありません。

PHPの目的:動的なWebサイトの構築 PHPの目的:動的なWebサイトの構築 Apr 15, 2025 am 12:18 AM

PHPは動的なWebサイトを構築するために使用され、そのコア関数には次のものが含まれます。1。データベースに接続することにより、動的コンテンツを生成し、リアルタイムでWebページを生成します。 2。ユーザーのインタラクションを処理し、提出をフォームし、入力を確認し、操作に応答します。 3.セッションとユーザー認証を管理して、パーソナライズされたエクスペリエンスを提供します。 4.パフォーマンスを最適化し、ベストプラクティスに従って、ウェブサイトの効率とセキュリティを改善します。

PHP対Python:コア機能と機能 PHP対Python:コア機能と機能 Apr 13, 2025 am 12:16 AM

PHPとPythonにはそれぞれ独自の利点があり、さまざまなシナリオに適しています。 1.PHPはWeb開発に適しており、組み込みのWebサーバーとRich Functionライブラリを提供します。 2。Pythonは、簡潔な構文と強力な標準ライブラリを備えたデータサイエンスと機械学習に適しています。選択するときは、プロジェクトの要件に基づいて決定する必要があります。

アクション中のPHP:実際の例とアプリケーション アクション中のPHP:実際の例とアプリケーション Apr 14, 2025 am 12:19 AM

PHPは、電子商取引、コンテンツ管理システム、API開発で広く使用されています。 1)eコマース:ショッピングカート機能と支払い処理に使用。 2)コンテンツ管理システム:動的コンテンツの生成とユーザー管理に使用されます。 3)API開発:RESTFUL API開発とAPIセキュリティに使用されます。パフォーマンスの最適化とベストプラクティスを通じて、PHPアプリケーションの効率と保守性が向上します。

See all articles