Heim > php教程 > php手册 > PHP解决批处理问题

PHP解决批处理问题

WBOY
Freigeben: 2016-06-06 19:57:43
Original
1072 Leute haben es durchsucht

个人原创网址:http://www.phpthinking.com/archives/478 如何创建长时间运行的作业 如果 Web 应用程序中的一个特性需要超过 1 秒或 2 秒才能完成,那么应该怎么办?需要某种离线处理解决方案。学习几种对 PHP 应用程序中长时间运行的作业进行离线服务的方法

个人原创网址:http://www.phpthinking.com/archives/478

如何创建长时间运行的作业

如果 Web 应用程序中的一个特性需要超过 1 秒或 2 秒才能完成,那么应该怎么办?需要某种离线处理解决方案。学习几种对 PHP 应用程序中长时间运行的作业进行离线服务的方法。

大型的连锁店有一个大问题。每天,在每家商店会发生数千次交易。公司执行官希望对这些数据进行挖掘。哪些产品卖得好?哪些不好?有机产品在哪里卖得好?冰淇淋的销售情况怎么样?

为了捕捉这些数据,组织必须将所有事务性数据装载进一个数据模型,以便更适合生成公司所需的报告类型。但是,这很花费时间,而且随着连锁规模的增长,处理一天的数据可能要花费一天以上的时间。因此,这是个大问题。

现在,您的 Web 应用程序可能不需要处理这么多数据,但是任何站点的处理时间都有可能超过客户愿意等待的时间。一般来说,客户愿意等待的时间是 200 毫秒,如果超过这个时间,客户就会觉得过程 “缓慢”。这个数字基于桌面应用程序,而 Web 使我们更有耐心了。但无论如何,不应该让客户等待的时间超过几秒。所以,要采用一些策略来处理 PHP 中的批处理作业。

分散的方式与 cron

在 UNIX? 机器上,执行批处理的核心程序是 cron 守护进程。这个守护进程读取一个配置文件,这个文件会告诉它要运行哪些命令行以及运行的频率。然后,这个守护进程就按照配置执行它们。在遇到错误时,它甚至能够向指定的电子邮件地址发送错误输出,从而帮助对问题进行调试。

我知道一些工程师强烈主张使用线程技术。“线程!线程才是进行后台处理的真正方法。cron 守护进程太过时了。”

我不这么认为。

这两种方法我都用过,我认为 cron 具备 “Keep It Simple, Stupid(KISS,简单就是美)” 原则的优点。它使后台处理保持简单。不需要编写一直运行的多线程的作业处理应用程序(因此不会有内存泄漏),而是由 cron 启动一个简单的批处理脚本。这个脚本判断是否有作业要处理,执行作业,然后退出。不需要担心内存泄漏。也不需要担心线程停止或陷入无限循环。

那么,cron 是如何工作的?这依赖于您所处的系统环境。我只讨论老式简单的 cron 的 UNIX 命令行版本,您可以向系统管理员咨询如何在自己的 Web 应用程序中实现它。

下面是一个简单的 cron 配置,它在每天晚上 11 点运行一个 PHP 脚本:

0 23 * * * jack /usr/bin/php /users/home/jack/myscript.php
Nach dem Login kopieren

前 5 个字段定义应该启动脚本的时间。然后是应该用来运行这个脚本的用户名。其余的命令是要执行的命令行。时间字段分别是分、小时、月中的日、月和周中的日。下面是几个示例。

命令:

15 * * * * jack /usr/bin/php /users/home/jack/myscript.php
Nach dem Login kopieren

在每个小时的第 15 分钟运行脚本。

命令:

15,45 * * * * jack /usr/bin/php /users/home/jack/myscript.php
Nach dem Login kopieren

在每个小时的第 15 和第 45 分钟运行脚本。

命令:

*/1 3-23 * * * jack /usr/bin/php /users/home/jack/myscript.php
Nach dem Login kopieren

在早上 3 点到晚上 11 点之间的每分钟运行脚本。

命令

30 23 * * 6 jack /usr/bin/php /users/home/jack/myscript.php
Nach dem Login kopieren

在每星期六的晚上 11:30 运行脚本(星期六由 6 指定)。

可以看到,组合的数量是无限的。可以根据需要控制运行脚本的时间。还可以指定多个要运行的脚本,这样的话,一些脚本可以每分钟都运行,而其他脚本(比如备份脚本)可以每天只运行一次。

为了指定将报告的错误发送到哪个电子邮件地址,可以使用 MAILTO 指令,如下所示:

MAILTO=jherr@pobox.com
Nach dem Login kopieren

注意:对于 Microsoft? Windows? 用户,有一个等效的 Scheduled Tasks 系统可以用来定期启动命令行进程(比如 PHP 脚本)。

批处理体系结构的基础知识

批处理是相当简单的。在大多数情况下,采用两个工作流之一。第一个工作流用于进行报告;脚本每天运行一次,它生成报告并将报告发送给一组用户。第二个工作流是在响应某种请求时创建的批作业。例如,我登录进 Web 应用程序中,并要求它向系统中注册的所有用户发送一个消息,将一个新的特性告诉他们。这个操作必须进行批处理,因为系统中有 10,000 个用户。PHP 要花费一段时间才能完成这样的任务,所以它必须由浏览器之外的一个作业来执行。

在第二个工作流中,Web 应用程序只需将信息放在某个位置,让批处理应用程序共享它。这些信息指定作业的性质(例如,“Send this e-mail to all the people on the system”。)批处理程序运行这个作业,然后删除作业。另一种方法是,处理程序将作业标为已完成。无论用哪种方法,作业都应该识别为已完成,这样就不会再次运行它。

本文的其余部分演示在 Web 应用程序前端和批处理后端之间共享数据的各种方法。

邮件队列

第一种方法是使用专用的邮件队列系统。在这种模型中,数据库中的一个表包含应该发送给各个用户的电子邮件消息。Web 界面使用mailouts 类将电子邮件添加到队列中。电子邮件处理程序使用 mailouts 类检索未处理的电子邮件,然后再次使用它从队列中删除未处理的电子邮件。

这个模型首先需要 MySQL 模式。

清单 1. mailout.sql

DROP TABLE IF EXISTS mailouts;
CREATE TABLE mailouts (
  id MEDIUMINT NOT NULL AUTO_INCREMENT,
  from_address TEXT NOT NULL,
  to_address TEXT NOT NULL,
  subject TEXT NOT NULL,
  content TEXT NOT NULL,
  PRIMARY KEY ( id )
);
Nach dem Login kopieren

这个模式非常简单。每行中有一个 from 和一个 to 地址,以及电子邮件的主题和内容。

对数据库中的 mailouts 表进行处理的是 PHP mailouts 类。

清单 2. mailouts.php

<?php require_once('DB.php');

class Mailouts
{
  public static function get_db()
  {
    $dsn = 'mysql://root:@localhost/mailout';
    $db =& DB::Connect( $dsn, array() );
    if (PEAR::isError($db)) { die($db->getMessage()); }
    return $db;
  }
  public static function delete( $id )
  {
    $db = Mailouts::get_db();
    $sth = $db->prepare( 'DELETE FROM mailouts WHERE id=?' );
    $db->execute( $sth, $id );
    return true;
  }
  public static function add( $from, $to, $subject, $content )
  {
    $db = Mailouts::get_db();
    $sth = $db->prepare( 'INSERT INTO mailouts VALUES (null,?,?,?,?)' );
    $db->execute( $sth, array( $from, $to, $subject, $content ) );
    return true;
  }
  public static function get_all()
  {
    $db = Mailouts::get_db();
    $res = $db->query( "SELECT * FROM mailouts" );
    $rows = array();
    while( $res->fetchInto( $row ) ) { $rows []= $row; }
    return $rows;
  }
}
?>
Nach dem Login kopieren

这个脚本包含 Pear::DB 数据库访问类。然后定义 mailouts 类,其中包含三个主要的静态函数:adddelete 和 get_alladd() 方法向队列中添加一个电子邮件,这个方法由前端使用。get_all() 方法从表中返回所有数据。delete() 方法删除一个电子邮件。

您可能会问,我为什么不只在脚本末尾调用 delete_all() 方法。不这么做有两个原因:如果在发送每个消息之后删除它,那么即使脚本在出现问题之后重新运行,消息也不可能发送两次;在批作业的启动和完成之间可能会添加新的消息。

下一步是编写一个简单的测试脚本,这个脚本将一个条目添加到队列中。

清单 3. mailout_test_add.php

<?php require 'mailout.php';

Mailouts::add( 'donotreply@mydomain.com',
  'molly@nocompany.com.org',
  'Test Subject',
  'This is a test of the batch mail sendout' );
?>
Nach dem Login kopieren

在这个示例中,我添加一个 mailout,这个消息要发送给某公司的 Molly,其中包括主题 “Test Subject” 和电子邮件主体。可以在命令行上运行这个脚本:php mailout_test_add.php

为了发送电子邮件,需要另一个脚本,这个脚本作为作业处理程序。

清单 4. mailout_send.php

<?php require_once 'mailout.php';

function process( $from, $to, $subject, $email ) {
  mail( $to, $subject, $email, "From: $from" );
}

$messages = Mailouts::get_all();
foreach( $messages as $msg ) {
  process( $msg[1], $msg[2], $msg[3], $msg[4] );
  Mailouts::delete( $msg[0] );
}
?>
Nach dem Login kopieren

这个脚本使用 get_all() 方法检索所有电子邮件消息,然后使用 PHP 的 mail() 方法逐一发送消息。在每次成功发送电子邮件之后,调用delete() 方法从队列中删除对应的记录。

使用 cron 守护进程定期运行这个脚本。运行这个脚本的频率取决于您的应用程序的需要。

注意:PHP Extension and Application Repository(PEAR)存储库包含一个出色的 邮件队列系统 实现,可以免费下载。

更通用的方法

专门用来发送电子邮件的解决方案是很不错,但是是否有更通用的方法?我们需要能够发送电子邮件、生成报告或者执行其他耗费时间的处理,而不必在浏览器中等待处理完成。

为此,可以利用一个事实:PHP 是一种解释型语言。可以将 PHP 代码存储在数据库中的队列中,以后再执行它。这需要两个表,见清单 5。

清单 5. generic.sql

DROP TABLE IF EXISTS processing_items;
CREATE TABLE processing_items (
  id MEDIUMINT NOT NULL AUTO_INCREMENT,
  function TEXT NOT NULL,
  PRIMARY KEY ( id )
);

DROP TABLE IF EXISTS processing_args;
CREATE TABLE processing_args (
  id MEDIUMINT NOT NULL AUTO_INCREMENT,
  item_id MEDIUMINT NOT NULL,
  key_name TEXT NOT NULL,
  value TEXT NOT NULL,
  PRIMARY KEY ( id )
);
Nach dem Login kopieren

第一个表 processing_items 包含作业处理程序调用的函数。第二个表 processing_args 包含要发送给函数的参数,采用的形式是由键/值对组成的 hash 表。

与 mailouts 表一样,这两个表也由 PHP 类包装,这个类称为 ProcessingItems

清单 6. generic.php

<?php require_once('DB.php');

class ProcessingItems
{
  public static function get_db() { ... }
  public static function delete( $id )
  {
    $db = ProcessingItems::get_db();
    $sth = $db->prepare( 'DELETE FROM processing_args WHERE item_id=?' );
    $db->execute( $sth, $id );
    $sth = $db->prepare( 'DELETE FROM processing_items WHERE id=?' );
    $db->execute( $sth, $id );
    return true;
  }
  public static function add( $function, $args )
  {
    $db = ProcessingItems::get_db();

    $sth = $db->prepare( 'INSERT INTO processing_items VALUES (null,?)' );
    $db->execute( $sth, array( $function ) );

    $res = $db->query( "SELECT last_insert_id()" );
    $id = null;
    while( $res->fetchInto( $row ) ) { $id = $row[0]; }

    foreach( $args as $key => $value )
    {
        $sth = $db->prepare( 'INSERT INTO processing_args
  VALUES (null,?,?,?)' );
        $db->execute( $sth, array( $id, $key, $value ) );
    }

    return true;
  }
  public static function get_all()
  {
    $db = ProcessingItems::get_db();

    $res = $db->query( "SELECT * FROM processing_items" );
    $rows = array();
    while( $res->fetchInto( $row ) )
    {
        $item = array();
        $item['id'] = $row[0];
        $item['function'] = $row[1];
        $item['args'] = array();

        $ares = $db->query( "SELECT key_name, value FROM
   processing_args WHERE item_id=?", $item['id'] );
        while( $ares->fetchInto( $arow ) )
            $item['args'][ $arow[0] ] = $arow[1];

        $rows []= $item;
    }
    return $rows;
  }
}
?>
Nach dem Login kopieren

这个类包含三个重要的方法:add()get_all() 和 delete()。与 mailouts 系统一样,前端使用 add(),处理引擎使用 get_all() 和delete()

清单 7 所示的测试脚本将一个条目添加到处理队列中。

清单 7. generic_test_add.php

<?php require_once 'generic.php';
ProcessingItems::add( 'printvalue', array( 'value' => 'foo' ) );
?>
Nach dem Login kopieren

在这个示例中,添加了一个对 printvalue 函数的调用,并将 value 参数设置为 foo。我使用 PHP 命令行解释器运行这个脚本,并将这个方法调用放进队列中。然后使用以下处理脚本运行这个方法。

清单 8. generic_process.php

<?php require_once 'generic.php';

function printvalue( $args ) {
  echo 'Printing: '.$args['value']."\n";
}

foreach( ProcessingItems::get_all() as $item ) {
  call_user_func_array( $item['function'],
    array( $item['args'] ) );
  ProcessingItems::delete( $item['id'] );
}
?>
Nach dem Login kopieren

这个脚本非常简单。它获得 get_all() 返回的处理条目,然后使用 call_user_func_array(一个 PHP 内部函数)用给定的参数动态地调用这个方法。在这个示例中,调用本地的 printvalue 函数。

为了演示这种功能,我们看看在命令行上发生了什么:

% php generic_test_add.php 
% php generic_process.php 
Printing: foo
%
Nach dem Login kopieren

输出并不多,但是您能够看出要点。通过这种机制,可以将任何 PHP 函数的处理推迟。

现在,如果您不喜欢将 PHP 函数名和参数放进数据库中,那么另一种方法是在 PHP 代码中建立数据库中的 “处理作业类型” 名称和实际 PHP 处理函数之间的映射。按照这种方式,如果以后决定修改 PHP 后端,那么只要 “处理作业类型” 字符串匹配,系统就仍然可以工作。

放弃数据库

最后,我演示另一种稍有不同的解决方案,它使用一个目录中的文件来存储批作业,而不是使用数据库。在这里提供这个思路并不是建议您 “采用这种方式,而不使用数据库”,这只是一种可供选择的方式,是否采用它由您决定。

显然,这个解决方案中没有模式,因为我们不使用数据库。所以先编写一个类,它包含与前面示例中相似的 add()get_all() 和 delete()方法。

清单 9. batch_by_file.php

<?php define( 'BATCH_DIRECTORY', 'batch_items/' );
class BatchFiles
{
  public static function delete( $id )
  {
    unlink( $id );
    return true;
  }
  public static function add( $function, $args )
  {
    $path = '';
    while( true )
    {
        $path = BATCH_DIRECTORY.time();
        if ( file_exists( $path ) == false )
            break;
    }

    $fh = fopen( $path, "w" );
    fprintf( $fh, $function."\n" );
    foreach( $args as $k => $v )
    {
        fprintf( $fh, $k.":".$v."\n" );
    }
    fclose( $fh );

    return true;
  }
  public static function get_all()
  {
    $rows = array();
    if (is_dir(BATCH_DIRECTORY)) {
        if ($dh = opendir(BATCH_DIRECTORY)) {
            while (($file = readdir($dh)) !== false) {
                $path = BATCH_DIRECTORY.$file;
                if ( is_dir( $path ) == false )
                {
                    $item = array();
                    $item['id'] = $path;
                    $fh = fopen( $path, 'r' );
                    if ( $fh )
                    {
                        $item['function'] = trim(fgets( $fh ));
                        $item['args'] = array();
                        while( ( $line = fgets( $fh ) ) != null )
                        {
                            $args = split( ':', trim($line) );
                            $item['args'][$args[0]] = $args[1];
                        }
                        $rows []= $item;
                        fclose( $fh );
                    }
                }
            }
            closedir($dh);
        }
    }
    return $rows;
  }
}
?>
Nach dem Login kopieren

BatchFiles 类有三个主要方法:add()get_all() 和 delete()。这个类不访问数据库,而是读写 batch_items 目录中的文件。

使用以下测试代码添加新的批处理条目。

清单 10. batch_by_file_test_add.php

<?php require_once 'batch_by_file.php';

BatchFiles::add( "printvalue", array( 'value' => 'foo' ) );
?>
Nach dem Login kopieren

有一点需要注意:除了类名(BatchFiles)之外,实际上没有任何迹象能够说明作业是如何存储的。所以,以后很容易将它改为数据库风格的存储方式,而不需要修改接口。

最后是处理程序的代码。

清单 11. batch_by_file_processor.php

<?php require_once 'batch_by_file.php';

function printvalue( $args ) {
  echo 'Printing: '.$args['value']."\n";
}

foreach( BatchFiles::get_all() as $item ) {
  call_user_func_array( $item['function'], array( $item['args'] ) );
  BatchFiles::delete( $item['id'] );
}
?>
Nach dem Login kopieren

这段代码几乎与数据库版本完全相同,只是修改了文件名和类名。

结语

正如前面提到的,服务器对线程提供了许多支持,可以进行后台批处理。在某些情况下,使用辅助线程处理小作业肯定比较容易。但是,也可以使用传统工具(cron、MySQL、标准的面向对象的 PHP 和 Pear::DB)在 PHP 应用程序中创建批作业,这很容易实现、部署和维护


Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Empfehlungen
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage