利用Zend Job Queue进行任务调度
核心要点
大多数Web应用程序都遵循同步通信模型。但是,非交互式和长时间运行的任务(例如报告生成)更适合异步执行。一种将任务卸载到稍后时间甚至在不同服务器上运行的方法是使用Zend Server 5中提供的Job Queue模块(但社区版中不包含)。Job Queue允许基于时间、优先级甚至依赖关系进行作业调度。作业可以被延迟或定期执行,并且——最重要的是——可以并行运行!最重要的是,Zend Server本身提供了一个管理GUI来跟踪作业的执行,包括其状态、执行时间和输出。Job Queue模块的主要优势在于其并行执行任务的能力。与cron作业不同,Job Queue允许:
Job Queue可用于异步任务的一些示例包括:
Job Queue 使用方法
Job Queue的API可通过ZendJobQueue类使用。要执行大多数任务,您将通过实例化ZendJobQueue对象并使用createHttpJob()方法创建作业来连接到Job Queue服务器。
<?php $queue = new ZendJobQueue(); $queue->createHttpJob("http://example.com/jobs/somejob.php");
传递路径到createHttpJob()而不是完整的URL将创建一个作业,其主机名为$_SERVER["HTTP_HOST"]的值。注意$_SERVER["HTTP_HOST"]不可用的情况,例如从cron脚本调度作业时。
<?php $queue = new ZendJobQueue(); $queue->createHttpJob("http://example.com/jobs/somejob.php");
作业参数可以作为查询字符串的一部分传递,也可以作为createHttpJob()的第二个参数以数组形式传递。如果参数作为第二个参数传递,则该数组必须与JSON兼容。要在作业代码中访问参数,可以使用getCurrentJobParams()静态方法。
<?php // 这两个调用是等效的 $queue->createHttpJob("/jobs/somejob.php"); $queue->createHttpJob("http://" . $_SERVER["HTTP_HOST"] . "/jobs/somejob.php");
其他作业选项可通过createHttpJob()的第三个参数使用。它是一个关联数组,包含以下键:
例如,创建延迟作业或重复作业如下所示:
<?php $params = ZendJobQueue::getCurrentJobParams();
失败(和成功)可以如下方式处理:
<?php $params = array("p1" => 10, "p2" => "somevalue"); // 一小时后处理 $options = array("schedule_time" => date("Y-m-d H:i:s", strtotime("+1 hour"))); $queue->createHttpJob("http://example.com/jobs/somejob.php", $params, $options); // 每隔一天凌晨1:05处理 $options = array("schedule" => "5 1 */2 * *"); $queue->createHttpJob("http://example.com/jobs/somejob.php", $params, $options);
扩展示例
假设您的Web应用程序必须根据用户的请求生成和发送一组报告。通常,由于PHP不支持多处理并且使用同步通信模型,用户必须等待所有请求的报告逐个生成并发送电子邮件。在这种情况下使用Job Queue不仅允许用户执行应用程序的其他操作(因为工作将异步完成),而且应用程序可以同时处理多个报告(因为作业可以并行执行)——因此大多数报告(如果不是全部)将在大约相同的时间完成。
<?php try { doSomething(); ZendJobQueue::setCurrentJobStatus(ZendJobQueue::OK); } catch (Exception $e) { ZendJobQueue::setCurrentJobStatus(ZendJobQueue::STATUS_LOGICALLY_FAILED, $e->getMessage()); }
scheduleReport()函数返回与每个已调度报告关联的作业标识符列表。在此函数中,ZendJobQueue类的isJobQueueDaemonRunning()方法验证相应的服务是否正在运行以及是否可以调度作业。根据报告的优先级,作业可以立即调度运行或两分钟后运行(如果同时请求许多报告,则为了减少服务器的负载)。调度作业后,其ID将保存到所有成功创建的作业列表中。了解作业ID对于能够监控作业甚至取消作业非常重要。以下是对scheduleReport()函数的调用的样子:
<?php function scheduleReport($reportList, $recipient) { // 已调度作业列表 $jobList = array(); $queue = new ZendJobQueue(); // 检查Job Queue是否正在运行 if ($queue->isJobQueueDaemonRunning() && count($reportList) > 0) { foreach ($reportList as $report) { $params = array("type" => $report["type"], "start" => $report["start"], "length" => $report["length"], "recipient" => $recipient); $options = array("priority" => $report["priority"]); // 除非优先级为紧急,否则在两分钟内执行作业 if ($report["priority"] != ZendJobQueue::PRIORITY_URGENT) { $options["schedule_time"] = date("Y-m-d H:i:s", strtotime("+2 minutes")); } $jobID = $queue->createHttpJob("http://example.com/jobs/report.php", $params, $options); // 将作业ID添加到已成功调度作业的列表中 if ($jobID !== false) { $jobList[] = $jobID; } } } return $jobList; }
如前所述,也可以取消已调度的作业。但是,一旦作业正在进行中,它将完成。因此,如果请求的优先级不是紧急的,则用户有两分钟的时间来取消已调度报告的交付。
<?php // 设置每日销售报告和每月财务报告的请求 $reportList = array( array("type" => "sales", "start" => "2011-12-09 00:00:00", "length" => 1, "priority" => ZendJobQueue::PRIORITY_URGENT), array("type" => "finance", "start" => "2011-11-01 00:00:00", "length" => 30, "priority" => ZendJobQueue::PRIORITY_NORMAL)); // 调度报告 $jobList = scheduleReport($reportList, "user@example.com"); // 验证报告是否已调度 if (empty($jobList)) { // 显示错误消息 }
cancelReport()函数只是从尚未开始运行的已调度报告队列中删除作业。然后,作业脚本如下所示:
<?php function cancelReport($jobID) { $queue = new ZendJobQueue(); return $queue->removeJob($jobID); } if ($jobID !== false && cancelReport($jobID)) { // 作业已成功从队列中删除 }
runReport()函数最终根据提供的参数准备和发送报告。完成后,作业状态将设置为成功(如果发生错误,则为逻辑失败)。
替代方案
当然,Job Queue也有替代方案。cron、pcntl_fork甚至通过PHP/Java Bridge的基于Java的解决方案可能值得研究,这取决于您的需求。还存在更有趣的工具,例如Gearman、node.js和RabbitMQ。
总结
虽然Zend Server的Job Queue并不是处理PHP中队列和并行处理的唯一方法,但它是一个极其简单的解决方案,由“The PHP Company”支持,并且非常易于使用。随着Zend的PHPCloud越来越成功,Job Queue的采用应该会更加广泛。如果您想查看本文中示例代码的完整内容,您可以在GitHub上找到它。图片来自Varina和Jay Patel/Shutterstock
关于Zend Queue的常见问题解答(FAQ)
Zend Queue是Zend Framework的一个组件,它为各种排队系统提供了一个简单的API。它允许开发人员异步创建、管理和处理数据或任务队列。这意味着任务可以在后台执行,从而提高Web应用程序的性能和用户体验。它支持多个后端,例如Array、SQLite等。
Zend Queue通过允许异步处理任务来提高Web应用程序的性能。这意味着任务可以在后台执行,而不会阻塞主执行线程。这可以显著提高Web应用程序的响应速度,因为用户不必等待任务完成才能继续与应用程序交互。
要在Zend Queue中创建一个新队列,您可以使用createQueue方法。此方法需要两个参数:队列的名称和超时时间。超时参数是可选的,默认为null。这是一个示例:
<?php $queue = new ZendJobQueue(); $queue->createHttpJob("http://example.com/jobs/somejob.php");
要在Zend Queue中向队列添加消息,您可以使用send方法。此方法需要一个参数:要添加到队列的消息。这是一个示例:
<?php // 这两个调用是等效的 $queue->createHttpJob("/jobs/somejob.php"); $queue->createHttpJob("http://" . $_SERVER["HTTP_HOST"] . "/jobs/somejob.php");
要在Zend Queue中处理来自队列的消息,您可以使用receive方法。此方法从队列中检索一组消息进行处理。这是一个示例:
<?php $params = ZendJobQueue::getCurrentJobParams();
要在Zend Queue中删除队列,您可以使用deleteQueue方法。此方法需要一个参数:要删除的队列的名称。这是一个示例:
<?php $params = array("p1" => 10, "p2" => "somevalue"); // 一小时后处理 $options = array("schedule_time" => date("Y-m-d H:i:s", strtotime("+1 hour"))); $queue->createHttpJob("http://example.com/jobs/somejob.php", $params, $options); // 每隔一天凌晨1:05处理 $options = array("schedule" => "5 1 */2 * *"); $queue->createHttpJob("http://example.com/jobs/somejob.php", $params, $options);
要在Zend Queue中检查队列是否存在,您可以使用isExists方法。此方法需要一个参数:要检查的队列的名称。这是一个示例:
<?php try { doSomething(); ZendJobQueue::setCurrentJobStatus(ZendJobQueue::OK); } catch (Exception $e) { ZendJobQueue::setCurrentJobStatus(ZendJobQueue::STATUS_LOGICALLY_FAILED, $e->getMessage()); }
要在Zend Queue中计算队列中的消息数量,您可以使用count方法。此方法返回队列中的消息数量。这是一个示例:
<?php $queue = new ZendJobQueue(); $queue->createHttpJob("http://example.com/jobs/somejob.php");
要在Zend Queue中清除队列中的所有消息,您可以使用purge方法。此方法将删除队列中的所有消息。这是一个示例:
<?php // 这两个调用是等效的 $queue->createHttpJob("/jobs/somejob.php"); $queue->createHttpJob("http://" . $_SERVER["HTTP_HOST"] . "/jobs/somejob.php");
要在Zend Queue中设置队列的超时时间,您可以使用setTimeout方法。此方法需要两个参数:队列的名称和超时时间(以秒为单位)。这是一个示例:
<?php $params = ZendJobQueue::getCurrentJobParams();
请注意,以上代码示例基于Zend_Queue,而不是文中提到的Zend Job Queue。 Zend Job Queue的API可能略有不同,需要参考Zend Server的官方文档。
以上是与Zend作业队列安排的详细内容。更多信息请关注PHP中文网其他相关文章!