ホームページ バックエンド開発 PHPチュートリアル php pthreads マルチスレッドのインストールと使用

php pthreads マルチスレッドのインストールと使用

Dec 21, 2016 am 11:51 AM

Pthreads をインストールするには、基本的に PHP を再コンパイルして --enable-maintainer-zts パラメーターを追加する必要がありますが、これを使用するためのドキュメントはほとんどなく、多くのバグや予期せぬ問題が発生するため、実稼働環境では「ははは」しかできません。実際のマルチスレッドの場合は、Python、C などを使用してください。

1. インストール

ここで使用するものは php-7.0.2 です

./configure \
--prefix=/usr/local/php7 \
--with-config-file-path=/etc \
--with-config-file-scan-dir=/etc/php.d \
--enable-debug \
--enable-maintainer-zts \
--enable-pcntl \
--enable-fpm \
--enable-opcache \
--enable-embed=shared \
--enable-json=shared \
--enable-phpdbg \
--with-curl=shared \
--with-mysql=/usr/local/mysql \
--with-mysqli=/usr/local/mysql/bin/mysql_config \
--with-pdo-mysql
ログイン後にコピー

make && make install

pthreads をインストールします

pecl install pthreads

2. スレッド

<?php
#1
$thread = new class extends Thread {
public function run() {
echo "Hello World {$this->getThreadId()}\n";
}
};
$thread->start() && $thread->join();
#2
class workerThread extends Thread {
public function __construct($i){
$this->i=$i;
}
public function run(){
while(true){
echo $this->i."\n";
sleep(1);
}
}
}
for($i=0;$i<50;$i++){
$workers[$i]=new workerThread($i);
$workers[$i]->start();
}
?>
ログイン後にコピー

3. ワーカーとスタッカブル


スタッカブル オブジェクトの前後で同期、読み取り、書き込みを行うことができます。

<?php
class SQLQuery extends Stackable {
public function __construct($sql) {
$this->sql = $sql;
}
public function run() {
$dbh = $this->worker->getConnection();
$row = $dbh->query($this->sql);
while($member = $row->fetch(PDO::FETCH_ASSOC)){
print_r($member);
}
}
}
class ExampleWorker extends Worker {
public static $dbh;
public function __construct($name) {
}
public function run(){
self::$dbh = new PDO(&#39;mysql:host=10.0.0.30;dbname=testdb&#39;,&#39;root&#39;,&#39;123456&#39;);
}
public function getConnection(){
return self::$dbh;
}
}
$worker = new ExampleWorker("My Worker Thread");
$sql1 = new SQLQuery(&#39;select * from test order by id desc limit 1,5&#39;);
$worker->stack($sql1);
$sql2 = new SQLQuery(&#39;select * from test order by id desc limit 5,5&#39;);
$worker->stack($sql2);
$worker->start();
$worker->shutdown();
?>
ログイン後にコピー

4. ミューテックス ロック


どのような状況でミューテックス ロックが使用されますか?複数のスレッドを制御する必要があり、同時に動作できるスレッドが 1 つだけである場合に使用できます。ミューテックスの有無の違いを示す簡単なカウンター プログラム

<?php
$counter = 0;
$handle=fopen("/tmp/counter.txt", "w");
fwrite($handle, $counter );
fclose($handle);
class CounterThread extends Thread {
public function __construct($mutex = null){
$this->mutex = $mutex;
$this->handle = fopen("/tmp/counter.txt", "w+");
}
public function __destruct(){
fclose($this->handle);
}
public function run() {
if($this->mutex)
$locked=Mutex::lock($this->mutex);
$counter = intval(fgets($this->handle));
$counter++;
rewind($this->handle);
fputs($this->handle, $counter );
printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
if($this->mutex)
Mutex::unlock($this->mutex);
}
}
//没有互斥锁
for ($i=0;$i<50;$i++){
$threads[$i] = new CounterThread();
$threads[$i]->start();
}
//加入互斥锁
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
$threads[$i] = new CounterThread($mutex);
$threads[$i]->start();
}
Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
$threads[$i]->join();
}
Mutex::destroy($mutex);
?>
ログイン後にコピー

マルチスレッドと共有メモリ


共有メモリの例では、ロックは使用されていません。それでも動作する可能性があります。動作する可能性があります。 メモリ操作それ自体は lock

<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);
$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );
class CounterThread extends Thread {
public function __construct($shmid){
$this->shmid = $shmid;
}
public function run() {
$counter = shm_get_var( $this->shmid, 1 );
$counter++;
shm_put_var( $this->shmid, 1, $counter );
printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
}
}
for ($i=0;$i<100;$i++){
$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
$threads[$i]->start();
}
for ($i=0;$i<100;$i++){
$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>
ログイン後にコピー

の機能を持っています

5. スレッドの同期


いくつかのシナリオでは、thread->start() でプログラムの実行を開始したくないが、スレッドがコマンドを待機するようにしたい場合があります。 。 thread->wait(); のテスト関数は、スレッドが thread->start() の直後に実行されるのではなく、 thread->notify(); からのシグナルを受信した後にのみ実行されます。

6. スレッドプール

プールクラス

<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);
$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );
class CounterThread extends Thread {
public function __construct($shmid){
$this->shmid = $shmid;
}
public function run() {
$this->synchronized(function($thread){
$thread->wait();
}, $this);
$counter = shm_get_var( $this->shmid, 1 );
$counter++;
shm_put_var( $this->shmid, 1, $counter );
printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
}
}
for ($i=0;$i<100;$i++){
$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
$threads[$i]->start();
}
for ($i=0;$i<100;$i++){
$threads[$i]->synchronized(function($thread){
$thread->notify();
}, $threads[$i]);
}
for ($i=0;$i<100;$i++){
$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>
ログイン後にコピー

動的キュースレッドプール

上記の例は、スレッドプールがいっぱいになったらすぐに開始する例です。スレッド プールは空いています。新しいスレッドを作成します。

<?php
class Update extends Thread {
public $running = false;
public $row = array();
public function __construct($row) {
$this->row = $row;
$this->sql = null;
}
public function run() {
if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
}else{
$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
file_put_contents("bankno_error.log", $error, FILE_APPEND);
}
if( strlen($bankno) > 7 ){
$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);
$this->sql = $sql;
}
printf("%s\n",$this->sql);
}
}
class Pool {
public $pool = array();
public function __construct($count) {
$this->count = $count;
}
public function push($row){
if(count($this->pool) < $this->count){
$this->pool[] = new Update($row);
return true;
}else{
return false;
}
}
public function start(){
foreach ( $this->pool as $id => $worker){
$this->pool[$id]->start();
}
}
public function join(){
foreach ( $this->pool as $id => $worker){
$this->pool[$id]->join();
}
}
public function clean(){
foreach ( $this->pool as $id => $worker){
if(! $worker->isRunning()){
unset($this->pool[$id]);
}
}
}
}
try {
$dbh = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true
)
);
$sql = "select id,bankno from members order by id desc";
$row = $dbh->query($sql);
$pool = new Pool(5);
while($member = $row->fetch(PDO::FETCH_ASSOC))
{
while(true){
if($pool->push($member)){ //压入任务到池中
break;
}else{ //如果池已经满,就开始启动线程
$pool->start();
$pool->join();
$pool->clean();
}
}
}
$pool->start();
$pool->join();
$dbh = null;
} catch (Exception $e) {
echo &#39;[&#39; , date(&#39;H:i:s&#39;) , &#39;]&#39;, &#39;系统错误&#39;, $e->getMessage(), "\n";
}
?>
ログイン後にコピー

pthreads プールクラス

<?php
class Update extends Thread {
public $running = false;
public $row = array();
public function __construct($row) {
$this->row = $row;
$this->sql = null;
//print_r($this->row);
}
public function run() {
if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
}else{
$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
file_put_contents("bankno_error.log", $error, FILE_APPEND);
}
if( strlen($bankno) > 7 ){
$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);
$this->sql = $sql;
}
printf("%s\n",$this->sql);
}
}
try {
$dbh = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true
)
);
$sql = "select id,bankno from members order by id desc limit 50";
$row = $dbh->query($sql);
$pool = array();
while($member = $row->fetch(PDO::FETCH_ASSOC))
{
$id = $member[&#39;id&#39;];
while (true){
if(count($pool) < 5){
$pool[$id] = new Update($member);
$pool[$id]->start();
break;
}else{
foreach ( $pool as $name => $worker){
if(! $worker->isRunning()){
unset($pool[$name]);
}
}
}
}
}
$dbh = null;
} catch (Exception $e) {
echo &#39;【&#39; , date(&#39;H:i:s&#39;) , &#39;】&#39;, &#39;【系统错误】&#39;, $e->getMessage(), "\n";
}
?>
ログイン後にコピー

7. マルチスレッドファイルの安全な読み書き

LOCK_SH 共有ロックの取得(プログラムの読み込み)

LOCK_EX 排他ロックの取得(プログラムの書き込み) )


LOCK_UN はロックを解放します (共有か排他かに関係なく)

LOCK_NB ロック時に flock() をブロックしたくない場合

<?php
class WebWorker extends Worker {
public function __construct(SafeLog $logger) {
$this->logger = $logger;
}
protected $loger;
}
class WebWork extends Stackable {
public function isComplete() {
return $this->complete;
}
public function run() {
$this->worker
->logger
->log("%s executing in Thread #%lu",
__CLASS__, $this->worker->getThreadId());
$this->complete = true;
}
protected $complete;
}
class SafeLog extends Stackable {
protected function log($message, $args = []) {
$args = func_get_args();
if (($message = array_shift($args))) {
echo vsprintf(
"{$message}\n", $args);
}
}
}
$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);
$pool->submit($w=new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->shutdown();
$pool->collect(function($work){
return $work->isComplete();
});
var_dump($pool);
ログイン後にコピー

8. マルチスレッドとデータ接続

pthreads と pdo が使用されます同時に注意する必要があるのは、 public static $dbh; を静的に宣言し、シングルトン モード


PDO

<?php
$fp = fopen("/tmp/lock.txt", "r+");
if (flock($fp, LOCK_EX)) { // 进行排它型锁定
ftruncate($fp, 0); // truncate file
fwrite($fp, "Write something here\n");
fflush($fp); // flush output before releasing the lock
flock($fp, LOCK_UN); // 释放锁定
} else {
echo "Couldn&#39;t get the lock!";
}
fclose($fp);
$fp = fopen(&#39;/tmp/lock.txt&#39;, &#39;r+&#39;);
if(!flock($fp, LOCK_EX | LOCK_NB)) {
echo &#39;Unable to obtain lock&#39;;
exit(-1);
}
fclose($fp);
?>
ログイン後にコピー

Pool と PDO

を通じてデータベース接続にアクセスする必要があることです。スレッド プール内のデータベースをリンクします

<?php
class Work extends Stackable {
public function __construct() {
}
public function run() {
$dbh = $this->worker->getConnection();
$sql = "select id,name from members order by id desc limit ";
$row = $dbh->query($sql);
while($member = $row->fetch(PDO::FETCH_ASSOC)){
print_r($member);
}
}
}
class ExampleWorker extends Worker {
public static $dbh;
public function __construct($name) {
}
/*
* The run method should just prepare the environment for the work that is coming ...
*/
public function run(){
self::$dbh = new PDO(&#39;mysql:host=...;dbname=example&#39;,&#39;www&#39;,&#39;&#39;);
}
public function getConnection(){
return self::$dbh;
}
}
$worker = new ExampleWorker("My Worker Thread");
$work=new Work();
$worker->stack($work);
$worker->start();
$worker->shutdown();
?>
ログイン後にコピー

上記のプログラムをさらに改善するには、シングルトン モード $this->worker->getInstance(); を使用して、グローバルに 1 つのデータベース接続のみを作成し、スレッドを作成します。共有データベース接続を使用します

# cat pool.php
<?php
class ExampleWorker extends Worker {
public function __construct(Logging $logger) {
$this->logger = $logger;
}
protected $logger;
}
/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
public function __construct($number) {
$this->number = $number;
}
public function run() {
$dbhost = &#39;db.example.com&#39;; // 数据库服务器
$dbuser = &#39;example.com&#39;; // 数据库用户名
$dbpw = &#39;password&#39;; // 数据库密码
$dbname = &#39;example_real&#39;;
$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true,
PDO::ATTR_PERSISTENT => true
)
);
$sql = "select OPEN_TIME, `COMMENT` from MT_TRADES where LOGIN=&#39;".$this->number[&#39;name&#39;]."&#39; and CMD=&#39;&#39; and `COMMENT` = &#39;".$this->number[&#39;order&#39;].":DEPOSIT&#39;";
#echo $sql;
$row = $dbh->query($sql);
$mt_trades = $row->fetch(PDO::FETCH_ASSOC);
if($mt_trades){
$row = null;
$sql = "UPDATE db_example.accounts SET paystatus=&#39;成功&#39;, deposit_time=&#39;".$mt_trades[&#39;OPEN_TIME&#39;]."&#39; where `order` = &#39;".$this->number[&#39;order&#39;]."&#39;;";
$dbh->query($sql);
#printf("%s\n",$sql);
}
$dbh = null;
printf("runtime: %s, %s, %s\n", date(&#39;Y-m-d H:i:s&#39;), $this->worker->getThreadId() ,$this->number[&#39;order&#39;]);
}
}
class Logging extends Stackable {
protected static $dbh;
public function __construct() {
$dbhost = &#39;db.example.com&#39;; // 数据库服务器
$dbuser = &#39;example.com&#39;; // 数据库用户名
$dbpw = &#39;password&#39;; // 数据库密码
$dbname = &#39;example_real&#39;; // 数据库名
self::$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true
)
);
}
protected function log($message, $args = []) {
$args = func_get_args();
if (($message = array_shift($args))) {
echo vsprintf("{$message}\n", $args);
}
}
protected function getConnection(){
return self::$dbh;
}
}
$pool = new Pool(, \ExampleWorker::class, [new Logging()]);
$dbhost = &#39;db.example.com&#39;; // 数据库服务器
$dbuser = &#39;example.com&#39;; // 数据库用户名
$dbpw = &#39;password&#39;; // 数据库密码
$dbname = &#39;db_example&#39;;
$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true
)
);
$sql = "select `order`,name from accounts where deposit_time is null order by id desc";
$row = $dbh->query($sql);
while($account = $row->fetch(PDO::FETCH_ASSOC))
{
$pool->submit(new Work($account));
}
$pool->shutdown();
?> 
ログイン後にコピー

マルチスレッドでのデータベース操作の概要

一般に、pthreads はまだ開発中であり、まだいくつかの欠点があります。また、pthreads の git が常にこれを改善していることもわかります。プロジェクト


データベースの永続リンクは非常に重要です。そうしないと、各スレッドが一度開かれます。データベース接続とその後の終了により、多くのリンクタイムアウトが発生します。


php pthreads マルチスレッドのインストールと使用に関する関連知識が必要です。まずはここで紹介しますが、今後はさらに更新される予定です。php pthreads マルチスレッドのインストールと使用に関連する記事については、PHP 中国語 Web サイト

に注目してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

PHPのカール:REST APIでPHPカール拡張機能を使用する方法 PHPのカール:REST APIでPHPカール拡張機能を使用する方法 Mar 14, 2025 am 11:42 AM

PHPクライアントURL(CURL)拡張機能は、開発者にとって強力なツールであり、リモートサーバーやREST APIとのシームレスな対話を可能にします。尊敬されるマルチプロトコルファイル転送ライブラリであるLibcurlを活用することにより、PHP Curlは効率的なexecuを促進します

JSON Web Tokens(JWT)とPHP APIでのユースケースを説明してください。 JSON Web Tokens(JWT)とPHP APIでのユースケースを説明してください。 Apr 05, 2025 am 12:04 AM

JWTは、JSONに基づくオープン標準であり、主にアイデンティティ認証と情報交換のために、当事者間で情報を安全に送信するために使用されます。 1。JWTは、ヘッダー、ペイロード、署名の3つの部分で構成されています。 2。JWTの実用的な原則には、JWTの生成、JWTの検証、ペイロードの解析という3つのステップが含まれます。 3. PHPでの認証にJWTを使用する場合、JWTを生成および検証でき、ユーザーの役割と許可情報を高度な使用に含めることができます。 4.一般的なエラーには、署名検証障害、トークンの有効期限、およびペイロードが大きくなります。デバッグスキルには、デバッグツールの使用とロギングが含まれます。 5.パフォーマンスの最適化とベストプラクティスには、適切な署名アルゴリズムの使用、有効期間を合理的に設定することが含まれます。

PHPにおける後期静的結合の概念を説明します。 PHPにおける後期静的結合の概念を説明します。 Mar 21, 2025 pm 01:33 PM

記事では、PHP 5.3で導入されたPHPの後期静的結合(LSB)について説明し、より柔軟な継承を求める静的メソッドコールのランタイム解像度を可能にします。 LSBの実用的なアプリケーションと潜在的なパフォーマ

フレームワークセキュリティ機能:脆弱性から保護します。 フレームワークセキュリティ機能:脆弱性から保護します。 Mar 28, 2025 pm 05:11 PM

記事では、入力検証、認証、定期的な更新など、脆弱性から保護するためのフレームワークの重要なセキュリティ機能について説明します。

PHPのCurlライブラリを使用してJSONデータを含むPOSTリクエストを送信する方法は? PHPのCurlライブラリを使用してJSONデータを含むPOSTリクエストを送信する方法は? Apr 01, 2025 pm 03:12 PM

PHP開発でPHPのCurlライブラリを使用してJSONデータを送信すると、外部APIと対話する必要があることがよくあります。一般的な方法の1つは、Curlライブラリを使用して投稿を送信することです。

フレームワークのカスタマイズ/拡張:カスタム機能を追加する方法。 フレームワークのカスタマイズ/拡張:カスタム機能を追加する方法。 Mar 28, 2025 pm 05:12 PM

この記事では、フレームワークにカスタム機能を追加し、アーキテクチャの理解、拡張ポイントの識別、統合とデバッグのベストプラクティスに焦点を当てています。

ReactPhpの非ブロッキング機能は何ですか?ブロッキングI/O操作を処理する方法は? ReactPhpの非ブロッキング機能は何ですか?ブロッキングI/O操作を処理する方法は? Apr 01, 2025 pm 03:09 PM

ReactPhpの詳細な解釈の非ブロッキング機能の公式紹介は、多くの開発者の質問を呼び起こしました。

See all articles