> 백엔드 개발 > PHP 튜토리얼 > PHP pthreads 멀티스레딩 설치 및 사용

PHP pthreads 멀티스레딩 설치 및 사용

高洛峰
풀어 주다: 2023-03-03 19:42:01
원래의
1431명이 탐색했습니다.

Pthreads를 설치하려면 기본적으로 PHP를 다시 컴파일하고 --enable-maintainer-zts 매개변수를 추가해야 하지만 이를 사용하기 위한 문서가 거의 없으며 버그가 많고 예상치 못한 문제가 있으므로 프로덕션 환경은 무시할 수 밖에 없습니다. 실제 멀티스레딩을 위해서는 여전히 Python, C 등을 사용해야 합니다.

1. 설치

여기서 사용되는 것은 다음과 같습니다. php-7.0.2

./configure \
--prefix=/usr/local/php7 \
--with-config-file-path=/etc \
--with-config-file-scan-dir=/etc/php.d \
--enable-debug \
--enable-maintainer-zts \
--enable-pcntl \
--enable-fpm \
--enable-opcache \
--enable-embed=shared \
--enable-json=shared \
--enable-phpdbg \
--with-curl=shared \
--with-mysql=/usr/local/mysql \
--with-mysqli=/usr/local/mysql/bin/mysql_config \
--with-pdo-mysql
로그인 후 복사

make && make install

pthread 설치

pecl install pthreads

2. Thread

<?php
#1
$thread = new class extends Thread {
public function run() {
echo "Hello World {$this->getThreadId()}\n";
}
};
$thread->start() && $thread->join();
#2
class workerThread extends Thread {
public function __construct($i){
$this->i=$i;
}
public function run(){
while(true){
echo $this->i."\n";
sleep(1);
}
}
}
for($i=0;$i<50;$i++){
$workers[$i]=new workerThread($i);
$workers[$i]->start();
}
?>
로그인 후 복사

3. Worker 및 Stackable

Stackable은 Worker 스레드에 의해 실행되는 작업입니다. . 실행 전, 실행 후, 실행 중에 Stackable 객체를 동기화하고 읽고 쓸 수 있습니다. 뮤텍스 잠금이 사용됩니까? 여러 스레드를 제어해야 하고 동시에 하나의 스레드만 작동할 수 있는 경우에 사용할 수 있습니다. 뮤텍스 유무에 따른 차이를 설명하는 간단한 카운터 프로그램

<?php
class SQLQuery extends Stackable {
public function __construct($sql) {
$this->sql = $sql;
}
public function run() {
$dbh = $this->worker->getConnection();
$row = $dbh->query($this->sql);
while($member = $row->fetch(PDO::FETCH_ASSOC)){
print_r($member);
}
}
}
class ExampleWorker extends Worker {
public static $dbh;
public function __construct($name) {
}
public function run(){
self::$dbh = new PDO(&#39;mysql:host=10.0.0.30;dbname=testdb&#39;,&#39;root&#39;,&#39;123456&#39;);
}
public function getConnection(){
return self::$dbh;
}
}
$worker = new ExampleWorker("My Worker Thread");
$sql1 = new SQLQuery(&#39;select * from test order by id desc limit 1,5&#39;);
$worker->stack($sql1);
$sql2 = new SQLQuery(&#39;select * from test order by id desc limit 5,5&#39;);
$worker->stack($sql2);
$worker->start();
$worker->shutdown();
?>
로그인 후 복사

멀티 스레딩 및 공유 메모리

In 공유 메모리의 예에서는 잠금을 사용하지 않고 여전히 정상적으로 작동할 수 있습니다. 어쩌면 작업 메모리 작업 자체에 잠금 기능이 있을 수도 있습니다

<?php
$counter = 0;
$handle=fopen("/tmp/counter.txt", "w");
fwrite($handle, $counter );
fclose($handle);
class CounterThread extends Thread {
public function __construct($mutex = null){
$this->mutex = $mutex;
$this->handle = fopen("/tmp/counter.txt", "w+");
}
public function __destruct(){
fclose($this->handle);
}
public function run() {
if($this->mutex)
$locked=Mutex::lock($this->mutex);
$counter = intval(fgets($this->handle));
$counter++;
rewind($this->handle);
fputs($this->handle, $counter );
printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
if($this->mutex)
Mutex::unlock($this->mutex);
}
}
//没有互斥锁
for ($i=0;$i<50;$i++){
$threads[$i] = new CounterThread();
$threads[$i]->start();
}
//加入互斥锁
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
$threads[$i] = new CounterThread($mutex);
$threads[$i]->start();
}
Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
$threads[$i]->join();
}
Mutex::destroy($mutex);
?>
로그인 후 복사

5 . 스레드 동기화

일부 시나리오에서는 thread->start()가 프로그램 실행을 시작하는 것을 원하지 않지만 스레드가 명령을 기다리기를 원합니다. thread->wait();의 테스트 기능은 thread->start() 직후에 스레드가 실행되지 않는다는 것입니다.

<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);
$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );
class CounterThread extends Thread {
public function __construct($shmid){
$this->shmid = $shmid;
}
public function run() {
$counter = shm_get_var( $this->shmid, 1 );
$counter++;
shm_put_var( $this->shmid, 1, $counter );
printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
}
}
for ($i=0;$i<100;$i++){
$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
$threads[$i]->start();
}
for ($i=0;$i<100;$i++){
$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>
로그인 후 복사

6. Thread Pool

Pool 클래스

<?php
$tmp = tempnam(__FILE__, &#39;PHP&#39;);
$key = ftok($tmp, &#39;a&#39;);
$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );
class CounterThread extends Thread {
public function __construct($shmid){
$this->shmid = $shmid;
}
public function run() {
$this->synchronized(function($thread){
$thread->wait();
}, $this);
$counter = shm_get_var( $this->shmid, 1 );
$counter++;
shm_put_var( $this->shmid, 1, $counter );
printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
}
}
for ($i=0;$i<100;$i++){
$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
$threads[$i]->start();
}
for ($i=0;$i<100;$i++){
$threads[$i]->synchronized(function($thread){
$thread->notify();
}, $threads[$i]);
}
for ($i=0;$i<100;$i++){
$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>
로그인 후 복사

Dynamic Queue Thread Pool

위의 예는 스레드 풀이 가득 찼을 때 start를 실행하는 것입니다. 다음은 스레드에 여유 공간이 생기는 즉시 새로운 스레드를 생성하는 것입니다. 수영장.

<?php
class Update extends Thread {
public $running = false;
public $row = array();
public function __construct($row) {
$this->row = $row;
$this->sql = null;
}
public function run() {
if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
}else{
$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
file_put_contents("bankno_error.log", $error, FILE_APPEND);
}
if( strlen($bankno) > 7 ){
$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);
$this->sql = $sql;
}
printf("%s\n",$this->sql);
}
}
class Pool {
public $pool = array();
public function __construct($count) {
$this->count = $count;
}
public function push($row){
if(count($this->pool) < $this->count){
$this->pool[] = new Update($row);
return true;
}else{
return false;
}
}
public function start(){
foreach ( $this->pool as $id => $worker){
$this->pool[$id]->start();
}
}
public function join(){
foreach ( $this->pool as $id => $worker){
$this->pool[$id]->join();
}
}
public function clean(){
foreach ( $this->pool as $id => $worker){
if(! $worker->isRunning()){
unset($this->pool[$id]);
}
}
}
}
try {
$dbh = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true
)
);
$sql = "select id,bankno from members order by id desc";
$row = $dbh->query($sql);
$pool = new Pool(5);
while($member = $row->fetch(PDO::FETCH_ASSOC))
{
while(true){
if($pool->push($member)){ //压入任务到池中
break;
}else{ //如果池已经满,就开始启动线程
$pool->start();
$pool->join();
$pool->clean();
}
}
}
$pool->start();
$pool->join();
$dbh = null;
} catch (Exception $e) {
echo &#39;[&#39; , date(&#39;H:i:s&#39;) , &#39;]&#39;, &#39;系统错误&#39;, $e->getMessage(), "\n";
}
?>
로그인 후 복사

pthreads 풀 클래스

<?php
class Update extends Thread {
public $running = false;
public $row = array();
public function __construct($row) {
$this->row = $row;
$this->sql = null;
//print_r($this->row);
}
public function run() {
if(strlen($this->row[&#39;bankno&#39;]) > 100 ){
$bankno = safenet_decrypt($this->row[&#39;bankno&#39;]);
}else{
$error = sprintf("%s, %s\r\n",$this->row[&#39;id&#39;], $this->row[&#39;bankno&#39;]);
file_put_contents("bankno_error.log", $error, FILE_APPEND);
}
if( strlen($bankno) > 7 ){
$sql = sprintf("update members set bankno = &#39;%s&#39; where id = &#39;%s&#39;;", $bankno, $this->row[&#39;id&#39;]);
$this->sql = $sql;
}
printf("%s\n",$this->sql);
}
}
try {
$dbh = new PDO("mysql:host=" . str_replace(&#39;:&#39;, &#39;;port=&#39;, $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF8\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true
)
);
$sql = "select id,bankno from members order by id desc limit 50";
$row = $dbh->query($sql);
$pool = array();
while($member = $row->fetch(PDO::FETCH_ASSOC))
{
$id = $member[&#39;id&#39;];
while (true){
if(count($pool) < 5){
$pool[$id] = new Update($member);
$pool[$id]->start();
break;
}else{
foreach ( $pool as $name => $worker){
if(! $worker->isRunning()){
unset($pool[$name]);
}
}
}
}
}
$dbh = null;
} catch (Exception $e) {
echo &#39;【&#39; , date(&#39;H:i:s&#39;) , &#39;】&#39;, &#39;【系统错误】&#39;, $e->getMessage(), "\n";
}
?>
로그인 후 복사

7. 멀티스레드 files Safe Read and Write

LOCK_SH 공유 잠금 획득(읽기 프로그램)

<?php
class WebWorker extends Worker {
public function __construct(SafeLog $logger) {
$this->logger = $logger;
}
protected $loger;
}
class WebWork extends Stackable {
public function isComplete() {
return $this->complete;
}
public function run() {
$this->worker
->logger
->log("%s executing in Thread #%lu",
__CLASS__, $this->worker->getThreadId());
$this->complete = true;
}
protected $complete;
}
class SafeLog extends Stackable {
protected function log($message, $args = []) {
$args = func_get_args();
if (($message = array_shift($args))) {
echo vsprintf(
"{$message}\n", $args);
}
}
}
$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);
$pool->submit($w=new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->shutdown();
$pool->collect(function($work){
return $work->isComplete();
});
var_dump($pool);
로그인 후 복사
LOCK_EX 배타적 잠금 획득(쓰기 프로그램)

LOCK_UN 잠금 해제(공유 또는 배타적 여부) )

LOCK_NB 잠금 시 Flock()이 차단되는 것을 원하지 않는 경우


8. 멀티스레딩 및 데이터 연결

pthread와 pdo를 동시에 사용하는 경우 public static $dbh;를 정적으로 선언하고 싱글톤 모드를 통해 데이터베이스 연결에 액세스해야 합니다.

<?php
$fp = fopen("/tmp/lock.txt", "r+");
if (flock($fp, LOCK_EX)) { // 进行排它型锁定
ftruncate($fp, 0); // truncate file
fwrite($fp, "Write something here\n");
fflush($fp); // flush output before releasing the lock
flock($fp, LOCK_UN); // 释放锁定
} else {
echo "Couldn&#39;t get the lock!";
}
fclose($fp);
$fp = fopen(&#39;/tmp/lock.txt&#39;, &#39;r+&#39;);
if(!flock($fp, LOCK_EX | LOCK_NB)) {
echo &#39;Unable to obtain lock&#39;;
exit(-1);
}
fclose($fp);
?>
로그인 후 복사

Worker 및 PDO<. 🎜>


Pool 및 PDO


스레드 풀에서 데이터베이스 연결

<?php
class Work extends Stackable {
public function __construct() {
}
public function run() {
$dbh = $this->worker->getConnection();
$sql = "select id,name from members order by id desc limit ";
$row = $dbh->query($sql);
while($member = $row->fetch(PDO::FETCH_ASSOC)){
print_r($member);
}
}
}
class ExampleWorker extends Worker {
public static $dbh;
public function __construct($name) {
}
/*
* The run method should just prepare the environment for the work that is coming ...
*/
public function run(){
self::$dbh = new PDO(&#39;mysql:host=...;dbname=example&#39;,&#39;www&#39;,&#39;&#39;);
}
public function getConnection(){
return self::$dbh;
}
}
$worker = new ExampleWorker("My Worker Thread");
$work=new Work();
$worker->stack($work);
$worker->start();
$worker->shutdown();
?>
로그인 후 복사

위 프로그램을 더욱 개선하기 위해 싱글톤 모드 $this->worker->getInstance()를 사용하여 전역적으로 데이터베이스 연결만 설정합니다. 스레드는 공유 데이터베이스 연결을 사용합니다


# cat pool.php
<?php
class ExampleWorker extends Worker {
public function __construct(Logging $logger) {
$this->logger = $logger;
}
protected $logger;
}
/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
public function __construct($number) {
$this->number = $number;
}
public function run() {
$dbhost = &#39;db.example.com&#39;; // 数据库服务器
$dbuser = &#39;example.com&#39;; // 数据库用户名
$dbpw = &#39;password&#39;; // 数据库密码
$dbname = &#39;example_real&#39;;
$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true,
PDO::ATTR_PERSISTENT => true
)
);
$sql = "select OPEN_TIME, `COMMENT` from MT_TRADES where LOGIN=&#39;".$this->number[&#39;name&#39;]."&#39; and CMD=&#39;&#39; and `COMMENT` = &#39;".$this->number[&#39;order&#39;].":DEPOSIT&#39;";
#echo $sql;
$row = $dbh->query($sql);
$mt_trades = $row->fetch(PDO::FETCH_ASSOC);
if($mt_trades){
$row = null;
$sql = "UPDATE db_example.accounts SET paystatus=&#39;成功&#39;, deposit_time=&#39;".$mt_trades[&#39;OPEN_TIME&#39;]."&#39; where `order` = &#39;".$this->number[&#39;order&#39;]."&#39;;";
$dbh->query($sql);
#printf("%s\n",$sql);
}
$dbh = null;
printf("runtime: %s, %s, %s\n", date(&#39;Y-m-d H:i:s&#39;), $this->worker->getThreadId() ,$this->number[&#39;order&#39;]);
}
}
class Logging extends Stackable {
protected static $dbh;
public function __construct() {
$dbhost = &#39;db.example.com&#39;; // 数据库服务器
$dbuser = &#39;example.com&#39;; // 数据库用户名
$dbpw = &#39;password&#39;; // 数据库密码
$dbname = &#39;example_real&#39;; // 数据库名
self::$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true
)
);
}
protected function log($message, $args = []) {
$args = func_get_args();
if (($message = array_shift($args))) {
echo vsprintf("{$message}\n", $args);
}
}
protected function getConnection(){
return self::$dbh;
}
}
$pool = new Pool(, \ExampleWorker::class, [new Logging()]);
$dbhost = &#39;db.example.com&#39;; // 数据库服务器
$dbuser = &#39;example.com&#39;; // 数据库用户名
$dbpw = &#39;password&#39;; // 数据库密码
$dbname = &#39;db_example&#39;;
$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true
)
);
$sql = "select `order`,name from accounts where deposit_time is null order by id desc";
$row = $dbh->query($sql);
while($account = $row->fetch(PDO::FETCH_ASSOC))
{
$pool->submit(new Work($account));
}
$pool->shutdown();
?> 
로그인 후 복사

멀티 스레드에서 운영되는 데이터베이스 요약

일반적으로 pthreads는 여전히 개발 중이며 여전히 몇 가지 단점이 있습니다. pthreads의 git이 이 프로젝트를 지속적으로 개선하고 있음을 알 수 있습니다

<?php
class ExampleWorker extends Worker {
#public function __construct(Logging $logger) {
# $this->logger = $logger;
#}
#protected $logger;
protected static $dbh;
public function __construct() {
}
public function run(){
$dbhost = &#39;db.example.com&#39;; // 数据库服务器
$dbuser = &#39;example.com&#39;; // 数据库用户名
$dbpw = &#39;password&#39;; // 数据库密码
$dbname = &#39;example&#39;; // 数据库名
self::$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true,
PDO::ATTR_PERSISTENT => true
)
);
}
protected function getInstance(){
return self::$dbh;
}
}
/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
public function __construct($data) {
$this->data = $data;
#print_r($data);
}
public function run() {
#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );
try {
$dbh = $this->worker->getInstance();
#print_r($dbh);
$id = $this->data[&#39;id&#39;];
$mobile = safenet_decrypt($this->data[&#39;mobile&#39;]);
#printf("%d, %s \n", $id, $mobile);
if(strlen($mobile) > ){
$mobile = substr($mobile, -);
}
if($mobile == &#39;null&#39;){
# $sql = "UPDATE members_digest SET mobile = &#39;".$mobile."&#39; where id = &#39;".$id."&#39;";
# printf("%s\n",$sql);
# $dbh->query($sql);
$mobile = &#39;&#39;;
$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
}else{
$sql = "UPDATE members_digest SET mobile = md(:mobile) where id = :id";
}
$sth = $dbh->prepare($sql);
$sth->bindValue(&#39;:mobile&#39;, $mobile);
$sth->bindValue(&#39;:id&#39;, $id);
$sth->execute();
#echo $sth->debugDumpParams();
}
catch(PDOException $e) {
$error = sprintf("%s,%s\n", $mobile, $id );
file_put_contents("mobile_error.log", $error, FILE_APPEND);
}
#$dbh = null;
printf("runtime: %s, %s, %s, %s\n", date(&#39;Y-m-d H:i:s&#39;), $this->worker->getThreadId() ,$mobile, $id);
#printf("runtime: %s, %s\n", date(&#39;Y-m-d H:i:s&#39;), $this->number);
}
}
$pool = new Pool(, \ExampleWorker::class, []);
#foreach (range(, ) as $number) {
# $pool->submit(new Work($number));
#}
$dbhost = &#39;db.example.com&#39;; // 数据库服务器
$dbuser = &#39;example.com&#39;; // 数据库用户名
$dbpw = &#39;password&#39;; // 数据库密码
$dbname = &#39;example&#39;;
$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(
PDO::MYSQL_ATTR_INIT_COMMAND => &#39;SET NAMES \&#39;UTF\&#39;&#39;,
PDO::MYSQL_ATTR_COMPRESS => true
)
);
#print_r($dbh);
#$sql = "select id, mobile from members where id < :id";
#$sth = $dbh->prepare($sql);
#$sth->bindValue(&#39;:id&#39;,);
#$sth->execute();
#$result = $sth->fetchAll();
#print_r($result);
#
#$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
#$sth = $dbh->prepare($sql);
#$sth->bindValue(&#39;:mobile&#39;, &#39;aa&#39;);
#$sth->bindValue(&#39;:id&#39;,&#39;&#39;);
#echo $sth->execute();
#echo $sth->queryString;
#echo $sth->debugDumpParams();
$sql = "select id, mobile from members order by id asc"; // limit ";
$row = $dbh->query($sql);
while($members = $row->fetch(PDO::FETCH_ASSOC))
{
#$order = $account[&#39;order&#39;];
#printf("%s\n",$order);
//print_r($members);
$pool->submit(new Work($members));
#unset($account[&#39;order&#39;]);
}
$pool->shutdown();
?>
로그인 후 복사
지속적인 데이터베이스 연결이 매우 중요합니다. 그렇지 않으면 각 스레드가 데이터베이스 연결을 열고 닫습니다. , 이로 인해 많은 링크 시간 초과가 발생합니다

<.>


여기서는 PHP pthreads 멀티스레딩 설치 및 사용에 대한 관련 지식을 소개하겠습니다. 앞으로도 계속 업데이트할 예정이며, PHP pthreads 멀티스레딩 설치 및 사용과 관련된 더 많은 기사를 보려면 PHP 중국어 웹사이트


를 참고하세요.
관련 라벨:
원천:php.cn
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
최신 이슈
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿