在 Yii2 高级模板中基于 Yii2 队列扩展实现异步执行任务,附加事件处理器,在(每次成功执行作业后、在作业执行期间发生未捕获的异常时)
1、通过配置附加事件处理器,编辑 \environments\dev\common\config\main-local.php、\environments\prod\common\config\main-local.php,如图1
'copyAssetQueue' => [ // 复制资源文件队列
'class' => 'yii\queue\redis\Queue',
'redis' => 'redis', // Redis 连接组件或它的配置
'channel' => 'cpa:queue:copy:asset', // 队列键前缀
'ttr' => 10 * 60, // 作业处理的最长时间,单位(秒)
'on afterExec' => ['common\components\queue\CopyAssetEventHandler', 'afterExec'], // 每次成功执行作业后
'on afterError' => ['common\components\queue\CopyAssetEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时
'as log' => 'yii\queue\LogBehavior',
],
'uploadAssetQueue' => [ // 上传资源文件队列
'class' => 'yii\queue\redis\Queue',
'redis' => 'redis', // Redis 连接组件或它的配置
'channel' => 'cpa:queue:upload:asset', // 队列键前缀
'ttr' => 5 * 60, // 作业处理的最长时间,单位(秒)
'on afterExec' => ['common\components\queue\UploadAssetEventHandler', 'afterExec'], // 每次成功执行作业后
'on afterError' => ['common\components\queue\UploadAssetEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时
'as log' => 'yii\queue\LogBehavior',
],
'pubArticleQueue' => [ // 发布文章队列
'class' => 'yii\queue\redis\Queue',
'redis' => 'redis', // Redis 连接组件或它的配置
'channel' => 'cpa:queue:pub:article', // 队列键前缀
'ttr' => 5 * 60, // 作业处理的最长时间,单位(秒)
'on afterExec' => ['common\components\queue\PubArticleEventHandler', 'afterExec'], // 每次成功执行作业后
'on afterError' => ['common\components\queue\PubArticleEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时
'as log' => 'yii\queue\LogBehavior',
],
'sourceCallbackQueue' => [ // 来源回调队列
'class' => 'yii\queue\redis\Queue',
'redis' => 'redis', // Redis 连接组件或它的配置
'channel' => 'cpa:queue:source:callback', // 队列键前缀
'ttr' => 5 * 60, // 作业处理的最长时间,单位(秒)
'on afterExec' => ['common\components\queue\SourceCallbackEventHandler', 'afterExec'], // 每次成功执行作业后
'on afterError' => ['common\components\queue\SourceCallbackEventHandler', 'afterError'], // 在作业执行期间发生未捕获的异常时
'as log' => 'yii\queue\LogBehavior',
],
2、编辑 \qq\rests\qq_cw_app\IndexAction.php,一个推送队列的入口
$data = [
'channel_id' => 2, // 渠道ID
'channel_code' => 'wx', // 渠道代码,qq:企鹅号;wx:微信公众帐号
'channel_type_id' => 3, // 渠道的类型ID
'channel_type_code' => 'wx', // 渠道的类型代码,qq_cw:企鹅号的内容网站应用;qq_tp:企鹅号的第三方服务平台应用;wx:微信公众帐号应用
'source' => 'spider', // 来源,xContent:内容库;vms:视频管理系统;cms:内容管理系统;spider:自媒体
'task_id' => 2, // 任务ID
];
$assets = [
[
'type' => 'image',
'channel_article_id' => 1,
'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/images/1.png',
],
[
'type' => 'video',
'channel_article_id' => 1,
'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/videos/7月份北上广深等十大城市租金环比上涨 看东方 20180820 高清_高清.mp4',
],
];
$assetServiceCopyAssetsAsyncResult = AssetService::copyAssetsAsync($data, $assets);
print_r($assetServiceCopyAssetsAsyncResult);
exit;
3、编辑 \common\services\AssetService.php,复制来源的资源文件至渠道发布的资源目录,队列任务执行成功后,调用相应服务,否则,插入发布日志(异步)
/**
* 复制来源的资源文件至渠道发布的资源目录,队列任务执行成功后,调用相应服务,否则,插入发布日志(异步)
* @param array $data 数据
* 格式如下:
* [
* 'channel_id' => 1, // 渠道ID
* 'channel_code' => 'qq', // 渠道代码,qq:企鹅号;wx:微信公众帐号
* 'channel_type_id' => 1, // 渠道的类型ID
* 'channel_type_code' => 'qq_cw', // 渠道的类型代码,qq_cw:企鹅号的内容网站应用;qq_tp:企鹅号的第三方服务平台应用;wx:微信公众帐号应用
* 'source' => 'spider', // 来源,xContent:内容库;vms:视频管理系统;cms:内容管理系统;spider:自媒体
* 'task_id' => 1, // 任务ID
* ]
*
* @param array $assets 来源的资源文件的绝对URL
* 格式如下:
* [
* [
* 'type' => 'image', // 资源文件的类型,image:图片;video:视频
* 'channel_article_id' => 1, // 渠道的文章ID
* 'absolute_url' => 'http://localhost/spider/storage/spider/images/1.png', // 来源的资源文件的绝对URL
* ],
* [
* 'type' => 'video', // 资源文件的类型,image:图片;video:视频
* 'channel_article_id' => 1, // 渠道的文章ID
* 'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/videos/7月份北上广深等十大城市租金环比上涨 看东方 20180820 高清_高清.mp4', // 来源的资源文件的绝对URL
* ],
* ]
*
* @return array $channelPubApiAssetAbsolutePaths 渠道发布的资源文件的相对路径
* 格式如下:
* [
* [
* 'type' => 'image',
* 'channel_article_id' => 1,
* 'relative_path' => '/2018/09/20/1537439889.2333.1441541478.png',
* ],
* [
* 'type' => 'video',
* 'channel_article_id' => 1,
* 'relative_path' => '/2018/09/20/1537439889.2403.62871268.mp4',
* ],
* ]
*
* @throws Exception execution failed
*/
public static function copyAssetsAsync($data, $assets)
{
$assetData = [];
$time = time();
foreach ($assets as $key => $asset) {
$assetData[] = [
'channel_id' => $data['channel_id'],
'channel_code' => $data['channel_code'],
'channel_type_id' => $data['channel_type_id'],
'channel_type_code' => $data['channel_type_code'],
'source' => $data['source'],
'type' => $asset['type'],
'absolute_url' => $asset['absolute_url'],
'relative_path' => '',
'size' => 0,
'task_id' => $data['task_id'],
'channel_article_id' => $asset['channel_article_id'],
'status' => Asset::STATUS_ENABLED,
'is_deleted' => Asset::IS_DELETED_NO,
'created_at' => $time,
'updated_at' => $time,
'deleted_at' => Asset::DELETED_AT_DEFAULT,
];
}
// 批量创建资源
$asset = new Asset();
$assetCreateMultipleResult = $asset->createMultiple($assetData);
// 将任务发送到队列(复制资源文件队列),通过标准工作人员进行处理
Yii::$app->copyAssetQueue->push(new CopyAssetJob([
'taskId' => $data['task_id'],
]));
}
4、编辑 \common\jobs\CopyAssetJob.php,队列的任务类
<?php
/**
* Created by PhpStorm.
* User: Qiang Wang
* Date: 2018/10/22
* Time: 17:10
*/
namespace common\jobs;
use Yii;
use common\logics\Asset;
use common\services\TaskService;
use common\services\AssetService;
use yii\web\ServerErrorHttpException;
/**
* 复制来源的资源文件至渠道发布的资源目录
*
* @author Qiang Wang <shuijingwanwq@163.com>
* @since 1.0
*/
class CopyAssetJob extends Job
{
public $taskId;
/*
* @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常
*/
public function execute($queue)
{
// 基于ID查找状态为启用的单个数据模型(任务)
$taskEnabledItem = TaskService::findModelEnabledById($this->taskId);
// 基于任务ID查找状态为启用的资源列表
$assetEnabledItems = Asset::findAllEnabledByTaskId($this->taskId);
if (empty($assetEnabledItems)) {
throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35020'), ['task_id' => $this->taskId])), 35020);
}
$source = $taskEnabledItem->source;
$assets = [];
foreach ($assetEnabledItems as $assetEnabledItem) {
$assets[] = [
'type' => $assetEnabledItem->type,
'absolute_url' => $assetEnabledItem->absolute_url,
];
}
// 复制来源的资源文件至渠道发布的资源目录,返回相对路径(同步)
$assetServiceCopyAssetsSyncResult = AssetService::copyAssetsSync($source, $assets);
foreach ($assetEnabledItems as $key => $assetEnabledItem) {
$assetEnabledItem->relative_path = $assetServiceCopyAssetsSyncResult[$key]['relative_path'];
// 取得文件大小,单位(字节)
$assetEnabledItem->size = filesize(Yii::$app->params['channelPubApi']['asset'][$assetEnabledItem->type]['basePath'] . $assetServiceCopyAssetsSyncResult[$key]['relative_path']);
$assetEnabledItems[$key] = $assetEnabledItem;
}
// 批量更新资源
$assetEnabledItem->updateMultiple($assetEnabledItems);
}
}
5、编辑 \common\components\queue\CopyAssetEventHandler.php,在配置中所定义的事件处理器,当复制资源文件队列,每次成功执行作业后(afterExec),将调用相应服务进行后续处理,即推送任务至上传资源文件队列;当复制资源文件队列,在作业执行期间发生未捕获的异常时(afterError),插入发布日志,将作业推送至来源回调队列
<?php
/**
* Created by PhpStorm.
* User: Qiang Wang
* Date: 2018/10/23
* Time: 14:23
*/
namespace common\components\queue;
use Yii;
use common\logics\ChannelAppTask;
use common\logics\PubLog;
use common\services\PubLogService;
use common\services\TaskService;
use common\services\SourceCallbackService;
use yii\helpers\Json;
use yii\base\Component;
use yii\queue\ExecEvent;
use yii\web\NotFoundHttpException;
use yii\web\UnprocessableEntityHttpException;
use yii\web\ServerErrorHttpException;
use yii\db\Exception;
/**
* Class CopyAssetEventHandler
* @package common\components\queue
*
* @author Qiang Wang <shuijingwanwq@163.com>
* @since 1.0
*/
class CopyAssetEventHandler extends Component
{
/**
* @param ExecEvent $event
* @throws NotFoundHttpException 如果未找到数据模型,将抛出 404 HTTP 异常
* @throws UnprocessableEntityHttpException 如果找到数据模型,状态未启用,将抛出 422 HTTP 异常
* @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常
* @throws Exception execution failed
*/
public static function afterExec(ExecEvent $event)
{
$taskId = $event->job->taskId;
// 基于ID查找状态为启用的单个数据模型(任务)
$taskEnabledItem = TaskService::findModelEnabledById($taskId);
// 基于任务ID查找状态为启用的资源列表
$channelAppTaskEnabledItems = ChannelAppTask::findAllEnabledByTaskId($taskId);
if (empty($channelAppTaskEnabledItems)) {
throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35021'), ['task_id' => $taskId])), 35021);
}
try {
// 调用相应服务进行后续处理
$serviceClass = 'common\services\\' . str_replace(' ', '', ucwords(str_replace('_', ' ', $taskEnabledItem->channel_type_code))) . 'AssetService'; // 例:common\services\QqCwAssetService
$serviceAction = 'copyAssetExecHandler';
$serviceClass::$serviceAction($taskEnabledItem->id);
} catch (\Throwable $e) {
$pubLogData = [];
$time = time();
foreach ($channelAppTaskEnabledItems as $channelAppTaskEnabledItem) {
$pubLogData[] = [
'channel_id' => $channelAppTaskEnabledItem['channel_id'],
'channel_code' => $channelAppTaskEnabledItem['channel_code'],
'channel_type_id' => $channelAppTaskEnabledItem['channel_type_id'],
'channel_type_code' => $channelAppTaskEnabledItem['channel_type_code'],
'task_id' => $channelAppTaskEnabledItem['task_id'],
'channel_app_task_id' => $channelAppTaskEnabledItem['id'],
'channel_app_task_uuid' => $channelAppTaskEnabledItem['uuid'],
'code' => $e->getCode(),
'message' => $e->getMessage(),
'data' => Json::encode([]),
'status' => PubLog::STATUS_ERROR,
'is_deleted' => PubLog::IS_DELETED_NO,
'created_at' => $time,
'updated_at' => $time,
'deleted_at' => PubLog::DELETED_AT_DEFAULT,
];
}
// 发布任务成功后,调用相应服务失败,插入发布日志,将作业推送至来源回调队列(异步)
SourceCallbackService::errorAsync($pubLogData);
}
}
/**
* @param ExecEvent $event
* @throws NotFoundHttpException 如果未找到数据模型,将抛出 404 HTTP 异常
* @throws UnprocessableEntityHttpException 如果找到数据模型,状态未启用,将抛出 422 HTTP 异常
* @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常
* @throws Exception execution failed
*/
public static function afterError(ExecEvent $event)
{
$taskId = $event->job->taskId;
// 基于ID查找状态为启用的单个数据模型(任务)
$taskEnabledItem = TaskService::findModelEnabledById($taskId);
// 基于任务ID查找状态为启用的资源列表
$channelAppTaskEnabledItems = ChannelAppTask::findAllEnabledByTaskId($taskId);
if (empty($channelAppTaskEnabledItems)) {
throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35021'), ['task_id' => $taskId])), 35021);
}
$pubLogData = [];
$time = time();
foreach ($channelAppTaskEnabledItems as $channelAppTaskEnabledItem) {
$pubLogData[] = [
'channel_id' => $channelAppTaskEnabledItem['channel_id'],
'channel_code' => $channelAppTaskEnabledItem['channel_code'],
'channel_type_id' => $channelAppTaskEnabledItem['channel_type_id'],
'channel_type_code' => $channelAppTaskEnabledItem['channel_type_code'],
'task_id' => $channelAppTaskEnabledItem['task_id'],
'channel_app_task_id' => $channelAppTaskEnabledItem['id'],
'channel_app_task_uuid' => $channelAppTaskEnabledItem['uuid'],
'code' => $event->error->getCode(),
'message' => $event->error->getMessage(),
'data' => Json::encode([]),
'status' => PubLog::STATUS_ERROR,
'is_deleted' => PubLog::IS_DELETED_NO,
'created_at' => $time,
'updated_at' => $time,
'deleted_at' => PubLog::DELETED_AT_DEFAULT,
];
}
// 发布任务失败后,插入发布日志,将作业推送至来源回调队列(异步)
SourceCallbackService::errorAsync($pubLogData);
}
}
6、打开网址:http://api.channel-pub-api.localhost/qq/v1/qq-cw-apps?group_id=spider ,推送任务至队列
7、info 命令打印关于队列状态的信息,复制资源文件队列中1个任务状态为等待,上传资源文件队列中0个任务状态为等待
.\yii copy-asset-queue/info --color=0 .\yii upload-asset-queue/info --color=0
Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0
8、run 命令获取并执行循环中的任务(复制资源文件队列),直到队列为空,复制资源文件队列中的任务成功执行后,将下一步的上传任务推送至上传资源文件队列,复制资源文件队列中0个任务状态为等待,复制资源文件队列中1个任务状态为完成,上传资源文件队列中1个任务状态为等待,如图2
.\yii copy-asset-queue/run --verbose=1 --isolate=1 --color=0 .\yii copy-asset-queue/info .\yii upload-asset-queue/info --color=0
2018-10-27 17:23:54 [pid: 5216] - Worker is started 2018-10-27 17:23:54 [1] common\jobs\CopyAssetJob (attempt: 1, pid: 5216) - Started 2018-10-27 17:23:55 [1] common\jobs\CopyAssetJob (attempt: 1, pid: 5216) - Done (0.249 s) 2018-10-27 17:23:55 [pid: 5216] - Worker is stopped (0:00:01) Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 1 Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0
9、编辑 \common\jobs\CopyAssetJob.php,队列的任务类,特意抛出一个异常,以测试当复制资源文件队列,在作业执行期间发生未捕获的异常时(afterError),插入发布日志,将作业推送至来源回调队列
<?php
/**
* Created by PhpStorm.
* User: Qiang Wang
* Date: 2018/10/22
* Time: 17:10
*/
namespace common\jobs;
use Yii;
use common\logics\Asset;
use common\services\TaskService;
use common\services\AssetService;
use yii\web\ServerErrorHttpException;
/**
* 复制来源的资源文件至渠道发布的资源目录
*
* @author Qiang Wang <shuijingwanwq@163.com>
* @since 1.0
*/
class CopyAssetJob extends Job
{
public $taskId;
/*
* @throws ServerErrorHttpException 如果基于任务ID查找状态为启用的资源列表为空,将抛出 500 HTTP 异常
*/
public function execute($queue)
{
// 基于ID查找状态为启用的单个数据模型(任务)
$taskEnabledItem = TaskService::findModelEnabledById($this->taskId);
// 基于任务ID查找状态为启用的资源列表
$assetEnabledItems = Asset::findAllEnabledByTaskId($this->taskId);
throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35020'), ['task_id' => $this->taskId])), 35020);
if (empty($assetEnabledItems)) {
throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35020'), ['task_id' => $this->taskId])), 35020);
}
$source = $taskEnabledItem->source;
$assets = [];
foreach ($assetEnabledItems as $assetEnabledItem) {
$assets[] = [
'type' => $assetEnabledItem->type,
'absolute_url' => $assetEnabledItem->absolute_url,
];
}
// 复制来源的资源文件至渠道发布的资源目录,返回相对路径(同步)
$assetServiceCopyAssetsSyncResult = AssetService::copyAssetsSync($source, $assets);
foreach ($assetEnabledItems as $key => $assetEnabledItem) {
$assetEnabledItem->relative_path = $assetServiceCopyAssetsSyncResult[$key]['relative_path'];
// 取得文件大小,单位(字节)
$assetEnabledItem->size = filesize(Yii::$app->params['channelPubApi']['asset'][$assetEnabledItem->type]['basePath'] . $assetServiceCopyAssetsSyncResult[$key]['relative_path']);
$assetEnabledItems[$key] = $assetEnabledItem;
}
// 批量更新资源
$assetEnabledItem->updateMultiple($assetEnabledItems);
}
}
10、清空 Redis,即清空队列中的数据,打开网址:http://api.channel-pub-api.localhost/qq/v1/qq-cw-apps?group_id=spider ,推送任务至队列
11、info 命令打印关于队列状态的信息,复制资源文件队列中1个任务状态为等待,上传资源文件队列中0个任务状态为等待,来源回调队列中0个任务状态为等待
.\yii copy-asset-queue/info --color=0 .\yii upload-asset-queue/info --color=0 .\yii source-callback-queue/info --color=0
Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0
12、run 命令获取并执行循环中的任务(复制资源文件队列),直到队列为空,复制资源文件队列中的任务执行失败(因为任务中有抛出未捕获的异常)后,将下一步的上传任务推送至上传资源文件队列,复制资源文件队列中0个任务状态为等待,复制资源文件队列中1个任务状态为完成,上传资源文件队列中0个任务状态为等待,来源回调队列中1个任务状态为等待,如图3
.\yii copy-asset-queue/run --verbose=1 --isolate=1 --color=0 .\yii copy-asset-queue/info --color=0 .\yii upload-asset-queue/info --color=0 .\yii source-callback-queue/info --color=0
2018-10-27 17:37:10 [pid: 32132] - Worker is started 2018-10-27 17:37:11 [1] common\jobs\CopyAssetJob (attempt: 1, pid: 32132) - Started 2018-10-27 17:37:11 [1] common\jobs\CopyAssetJob (attempt: 1, pid: 32132) - Error (0.232 s) > yii\web\ServerErrorHttpException: Based on task ID: 2, find the list of enabled resources is empty 2018-10-27 17:37:11 [pid: 32132] - Worker is stopped (0:00:01) Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 1 Jobs - waiting: 0 - delayed: 0 - reserved: 0 - done: 0 Jobs - waiting: 1 - delayed: 0 - reserved: 0 - done: 0
13、查看系统日志表的异常信息,即 log 表中的 message,有助于后续开发人员的分析工作,如图4
[1] common\jobs\CopyAssetJob (attempt: 1, PID: 32132) is finished with error: yii\web\ServerErrorHttpException: Based on task ID: 2, find the list of enabled resources is empty in E:\wwwroot\channel-pub-api\common\jobs\CopyAssetJob.php:38
Stack trace:
#0 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2-queue\src\Queue.php(214): common\jobs\CopyAssetJob->execute(Object(yii\queue\redis\Queue))
#1 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2-queue\src\cli\Queue.php(162): yii\queue\Queue->handleMessage('1', 'O:24:"common\\jo...', '600', '1')
#2 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2-queue\src\cli\Command.php(145): yii\queue\cli\Queue->execute('1', 'O:24:"common\\jo...', '600', '1', '32132')
#3 [internal function]: yii\queue\cli\Command->actionExec('1', '600', '1', '32132')
#4 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\base\InlineAction.php(57): call_user_func_array(Array, Array)
#5 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\base\Controller.php(157): yii\base\InlineAction->runWithParams(Array)
#6 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\console\Controller.php(148): yii\base\Controller->runAction('exec', Array)
#7 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\base\Module.php(528): yii\console\Controller->runAction('exec', Array)
#8 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\console\Application.php(180): yii\base\Module->runAction('copy-asset-queu...', Array)
#9 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\console\Application.php(147): yii\console\Application->runAction('copy-asset-queu...', Array)
#10 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\base\Application.php(386): yii\console\Application->handleRequest(Object(yii\console\Request))
#11 E:\wwwroot\channel-pub-api\yii(23): yii\base\Application->run()
#12 {main}.
14、发布任务失败后,插入发布日志,将作业推送至来源回调队列(异步),查看发布日志表中的 message,即 pub_log,此为业务数据,最终会显示给用户,如图5
Based on task ID: 2, find the list of enabled resources is empty
近期评论