1. Configure the additional event handlers on the queue components by editing \environments\dev\common\config\main-local.php and \environments\prod\common\config\main-local.php, as shown in Figure 1

'copyAssetQueue' => [ // Copy-asset queue
'class' => 'yii\queue\redis\Queue',
'redis' => 'redis', // Redis connection component or its configuration
'channel' => 'cpa:queue:copy:asset', // Queue key prefix
'ttr' => 10 * 60, // Maximum time a job may take to be handled, in seconds
'on afterExec' => ['common\components\queue\CopyAssetEventHandler', 'afterExec'], // After each successfully executed job
'on afterError' => ['common\components\queue\CopyAssetEventHandler', 'afterError'], // When an uncaught exception occurs during job execution
'as log' => 'yii\queue\LogBehavior',
],
'uploadAssetQueue' => [ // Upload-asset queue
'class' => 'yii\queue\redis\Queue',
'redis' => 'redis', // Redis connection component or its configuration
'channel' => 'cpa:queue:upload:asset', // Queue key prefix
'ttr' => 5 * 60, // Maximum time a job may take to be handled, in seconds
'on afterExec' => ['common\components\queue\UploadAssetEventHandler', 'afterExec'], // After each successfully executed job
'on afterError' => ['common\components\queue\UploadAssetEventHandler', 'afterError'], // When an uncaught exception occurs during job execution
'as log' => 'yii\queue\LogBehavior',
],
'pubArticleQueue' => [ // Publish-article queue
'class' => 'yii\queue\redis\Queue',
'redis' => 'redis', // Redis connection component or its configuration
'channel' => 'cpa:queue:pub:article', // Queue key prefix
'ttr' => 5 * 60, // Maximum time a job may take to be handled, in seconds
'on afterExec' => ['common\components\queue\PubArticleEventHandler', 'afterExec'], // After each successfully executed job
'on afterError' => ['common\components\queue\PubArticleEventHandler', 'afterError'], // When an uncaught exception occurs during job execution
'as log' => 'yii\queue\LogBehavior',
],
'sourceCallbackQueue' => [ // Source callback queue
'class' => 'yii\queue\redis\Queue',
'redis' => 'redis', // Redis connection component or its configuration
'channel' => 'cpa:queue:source:callback', // Queue key prefix
'ttr' => 5 * 60, // Maximum time a job may take to be handled, in seconds
'on afterExec' => ['common\components\queue\SourceCallbackEventHandler', 'afterExec'], // After each successfully executed job
'on afterError' => ['common\components\queue\SourceCallbackEventHandler', 'afterError'], // When an uncaught exception occurs during job execution
'as log' => 'yii\queue\LogBehavior',
],
2. Edit \QQ\RESTS\QQ_CW_APP\IndexAction.php, the entry point that pushes a job to the queue
// Demo data shared by every asset of the task.
$data = [
'channel_id' => 2, // Channel ID
'channel_code' => 'wx', // Channel code; qq: Penguin (QQ) account; wx: WeChat official account
'channel_type_id' => 3, // Channel type ID
'channel_type_code' => 'wx', // Channel type code; qq_cw: Penguin content-website app; qq_tp: Penguin third-party service platform app; wx: WeChat official account app
'source' => 'spider', // Source; xContent: content library; vms: video management system; cms: content management system; spider: self-media
'task_id' => 2, // Task ID
];
// Source asset files to copy: one image and one video, both for channel article 1.
$assets = [
[
'type' => 'image',
'channel_article_id' => 1,
'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/images/1.png',
],
[
'type' => 'video',
'channel_article_id' => 1,
'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/videos/7月份北上广深等十大城市租金环比上涨 看东方 20180820 高清_高清.mp4',
],
];
// Push the copy job to the queue, dump whatever the call returned, and stop the request (debug code).
$assetServiceCopyAssetsAsyncResult = AssetService::copyAssetsAsync($data, $assets);
print_r($assetServiceCopyAssetsAsyncResult);
exit;
3. Edit \common\services\AssetService.php: copy the source asset files to the channel's publishing asset directory; after the queue job executes successfully the corresponding service is called, otherwise a publish log is inserted (asynchronous)
/**
 * Registers the source asset files of a task and queues their copy into the
 * channel's publishing asset directory (asynchronous).
 *
 * One asset row is built per entry of $assets, with an empty relative path and
 * a size of 0, and all rows are created in a single batch. A CopyAssetJob that
 * carries only the task ID is then pushed to the copyAssetQueue component.
 *
 * @param array $data Data shared by every asset, in this format:
 * [
 *     'channel_id' => 1, // Channel ID
 *     'channel_code' => 'qq', // Channel code; qq: Penguin (QQ) account; wx: WeChat official account
 *     'channel_type_id' => 1, // Channel type ID
 *     'channel_type_code' => 'qq_cw', // Channel type code; qq_cw: Penguin content-website app; qq_tp: Penguin third-party service platform app; wx: WeChat official account app
 *     'source' => 'spider', // Source; xContent: content library; vms: video management system; cms: content management system; spider: self-media
 *     'task_id' => 1, // Task ID
 * ]
 *
 * @param array $assets Source asset files, in this format:
 * [
 *     [
 *         'type' => 'image', // Asset type; image or video
 *         'channel_article_id' => 1, // Channel article ID
 *         'absolute_url' => 'http://localhost/spider/storage/spider/images/1.png', // Absolute URL of the source asset file
 *     ],
 *     [
 *         'type' => 'video', // Asset type; image or video
 *         'channel_article_id' => 1, // Channel article ID
 *         'absolute_url' => 'http://localhost/channel-pub-api/storage/spider/videos/7月份北上广深等十大城市租金环比上涨 看东方 20180820 高清_高清.mp4', // Absolute URL of the source asset file
 *     ],
 * ]
 *
 * @return void Nothing is returned: this method only creates the rows and
 * queues the job, so no relative path exists yet when it finishes.
 *
 * @throws Exception execution failed
 */
public static function copyAssetsAsync($data, $assets)
{
    // A single timestamp so every row of the batch shares created_at/updated_at.
    $time = time();

    $assetData = [];
    foreach ($assets as $asset) {
        $assetData[] = [
            'channel_id' => $data['channel_id'],
            'channel_code' => $data['channel_code'],
            'channel_type_id' => $data['channel_type_id'],
            'channel_type_code' => $data['channel_type_code'],
            'source' => $data['source'],
            'type' => $asset['type'],
            'absolute_url' => $asset['absolute_url'],
            // Placeholders: not known until the file has actually been copied.
            'relative_path' => '',
            'size' => 0,
            'task_id' => $data['task_id'],
            'channel_article_id' => $asset['channel_article_id'],
            'status' => Asset::STATUS_ENABLED,
            'is_deleted' => Asset::IS_DELETED_NO,
            'created_at' => $time,
            'updated_at' => $time,
            'deleted_at' => Asset::DELETED_AT_DEFAULT,
        ];
    }

    // Create the asset rows in one batch.
    $assetModel = new Asset();
    $assetModel->createMultiple($assetData);

    // Send the job to the copy-asset queue; a worker processes it later.
    Yii::$app->copyAssetQueue->push(new CopyAssetJob([
        'taskId' => $data['task_id'],
    ]));
}
4. Edit \common\jobs\CopyAssetJob.php, the job class for the queue
<?php
/**
* Created by PhpStorm.
* User: Qiang Wang
* Date: 2018/10/22
* Time: 17:10
*/
namespace common\jobs;
use Yii;
use common\logics\Asset;
use common\services\TaskService;
use common\services\AssetService;
use yii\web\ServerErrorHttpException;
/**
 * Queue job: copies a task's source asset files into the channel's publishing
 * asset directory and records where each copy ended up.
 *
 * @author Qiang Wang <shuijingwanwq@163.com>
 * @since 1.0
 */
class CopyAssetJob extends Job
{
    /** @var mixed ID of the task whose assets are copied */
    public $taskId;

    /**
     * Copies every enabled asset of the task, stores each copy's relative path
     * and file size (in bytes) on the asset, then batch-updates the assets.
     *
     * @param mixed $queue the queue executing this job (not used in this method)
     * @throws ServerErrorHttpException if no enabled asset is found for the task ID
     */
    public function execute($queue)
    {
        // Enabled task model, looked up by ID.
        $task = TaskService::findModelEnabledById($this->taskId);

        // Enabled assets, looked up by task ID.
        $assetModels = Asset::findAllEnabledByTaskId($this->taskId);
        if (empty($assetModels)) {
            // Same three translation passes as before, one per statement.
            $template = Yii::t('common/error', '35020');
            $message = Yii::t('common/error', $template, ['task_id' => $this->taskId]);
            throw new ServerErrorHttpException(Yii::t('common/error', $message), 35020);
        }

        $source = $task->source;

        $copyRequests = [];
        foreach ($assetModels as $assetModel) {
            $copyRequests[] = [
                'type' => $assetModel->type,
                'absolute_url' => $assetModel->absolute_url,
            ];
        }

        // Synchronous copy; the result carries a relative path per asset.
        // NOTE(review): results are read back at the keys of $assetModels, so
        // they are assumed to be aligned with that list - confirm in copyAssetsSync.
        $copied = AssetService::copyAssetsSync($source, $copyRequests);

        $batchModel = null;
        foreach ($assetModels as $index => $assetModel) {
            $relativePath = $copied[$index]['relative_path'];
            $assetModel->relative_path = $relativePath;

            // File size in bytes, read from the copy under the type's base path.
            $basePath = Yii::$app->params['channelPubApi']['asset'][$assetModel->type]['basePath'];
            $assetModel->size = filesize($basePath . $relativePath);

            // The last asset visited is the instance the batch update is called on.
            $batchModel = $assetModel;
        }

        // Batch-update the assets.
        $batchModel->updateMultiple($assetModels);
    }
}
5. Edit \common\components\queue\CopyAssetEventHandler.php, the event handler defined in the configuration. For the copy-asset queue: after each successfully executed job (afterExec), the corresponding service is called for follow-up processing, i.e. a job is pushed to the upload-asset queue; when an uncaught exception occurs during job execution (afterError), a publish log is inserted and a job is pushed to the source callback queue
<?php
/**
* Created by PhpStorm.
* User: Qiang Wang
* Date: 2018/10/23
* Time: 14:23
*/
namespace common\components\queue;
use Yii;
use common\logics\ChannelAppTask;
use common\logics\PubLog;
use common\services\PubLogService;
use common\services\TaskService;
use common\services\SourceCallbackService;
use yii\helpers\Json;
use yii\base\Component;
use yii\queue\ExecEvent;
use yii\web\NotFoundHttpException;
use yii\web\UnprocessableEntityHttpException;
use yii\web\ServerErrorHttpException;
use yii\db\Exception;
/**
 * Event handlers attached to the copy-asset queue ('on afterExec' / 'on afterError').
 *
 * @package common\components\queue
 *
 * @author Qiang Wang <shuijingwanwq@163.com>
 * @since 1.0
 */
class CopyAssetEventHandler extends Component
{
    /**
     * Runs after a copy-asset job succeeded: calls copyAssetExecHandler() on the
     * asset service that matches the task's channel type code. If that call
     * throws, one error publish-log row per enabled channel app task is built
     * and handed to SourceCallbackService::errorAsync().
     *
     * @param ExecEvent $event
     * @throws NotFoundHttpException if the data model is not found (HTTP 404)
     * @throws UnprocessableEntityHttpException if the data model is found but not enabled (HTTP 422)
     * @throws ServerErrorHttpException if no enabled channel app task exists for the task ID (HTTP 500)
     * @throws Exception execution failed
     */
    public static function afterExec(ExecEvent $event)
    {
        $taskId = $event->job->taskId;
        // Enabled task model, looked up by ID.
        $taskEnabledItem = TaskService::findModelEnabledById($taskId);
        $channelAppTaskEnabledItems = self::findEnabledChannelAppTasks($taskId);

        try {
            // Resolve the service from the channel type code, e.g. qq_cw => common\services\QqCwAssetService
            $serviceClass = 'common\services\\' . str_replace(' ', '', ucwords(str_replace('_', ' ', $taskEnabledItem->channel_type_code))) . 'AssetService';
            $serviceAction = 'copyAssetExecHandler';
            $serviceClass::$serviceAction($taskEnabledItem->id);
        } catch (\Throwable $e) {
            // The job itself succeeded but the follow-up service call failed:
            // insert publish logs and push a job to the source callback queue (asynchronous).
            SourceCallbackService::errorAsync(self::buildErrorPubLogData($channelAppTaskEnabledItems, $e));
        }
    }

    /**
     * Runs when an uncaught exception occurred during a copy-asset job: one
     * error publish-log row per enabled channel app task is built from the
     * event's error and handed to SourceCallbackService::errorAsync().
     *
     * @param ExecEvent $event
     * @throws NotFoundHttpException if the data model is not found (HTTP 404)
     * @throws UnprocessableEntityHttpException if the data model is found but not enabled (HTTP 422)
     * @throws ServerErrorHttpException if no enabled channel app task exists for the task ID (HTTP 500)
     * @throws Exception execution failed
     */
    public static function afterError(ExecEvent $event)
    {
        $taskId = $event->job->taskId;
        // The result is not used; the lookup is kept because it is documented
        // above to throw when the task is missing or not enabled.
        TaskService::findModelEnabledById($taskId);
        $channelAppTaskEnabledItems = self::findEnabledChannelAppTasks($taskId);

        // The job failed: insert publish logs and push a job to the source callback queue (asynchronous).
        SourceCallbackService::errorAsync(self::buildErrorPubLogData($channelAppTaskEnabledItems, $event->error));
    }

    /**
     * Returns the enabled channel app tasks of a task, or fails if there are none.
     *
     * @param mixed $taskId task ID
     * @return mixed the non-empty result of ChannelAppTask::findAllEnabledByTaskId()
     * @throws ServerErrorHttpException if the lookup result is empty (error code 35021)
     */
    private static function findEnabledChannelAppTasks($taskId)
    {
        $channelAppTaskEnabledItems = ChannelAppTask::findAllEnabledByTaskId($taskId);
        if (empty($channelAppTaskEnabledItems)) {
            throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35021'), ['task_id' => $taskId])), 35021);
        }

        return $channelAppTaskEnabledItems;
    }

    /**
     * Builds one error publish-log row per channel app task from an error.
     *
     * @param mixed $channelAppTaskEnabledItems enabled channel app tasks, each readable with array access
     * @param mixed $error the error whose code and message are recorded; must provide getCode() and getMessage()
     * @return array publish-log rows in the shape passed to SourceCallbackService::errorAsync()
     */
    private static function buildErrorPubLogData($channelAppTaskEnabledItems, $error)
    {
        $pubLogData = [];
        // A single timestamp so every row shares created_at/updated_at.
        $time = time();
        foreach ($channelAppTaskEnabledItems as $channelAppTaskEnabledItem) {
            $pubLogData[] = [
                'channel_id' => $channelAppTaskEnabledItem['channel_id'],
                'channel_code' => $channelAppTaskEnabledItem['channel_code'],
                'channel_type_id' => $channelAppTaskEnabledItem['channel_type_id'],
                'channel_type_code' => $channelAppTaskEnabledItem['channel_type_code'],
                'task_id' => $channelAppTaskEnabledItem['task_id'],
                'channel_app_task_id' => $channelAppTaskEnabledItem['id'],
                'channel_app_task_uuid' => $channelAppTaskEnabledItem['uuid'],
                'code' => $error->getCode(),
                'message' => $error->getMessage(),
                'data' => Json::encode([]),
                'status' => PubLog::STATUS_ERROR,
                'is_deleted' => PubLog::IS_DELETED_NO,
                'created_at' => $time,
                'updated_at' => $time,
                'deleted_at' => PubLog::DELETED_AT_DEFAULT,
            ];
        }

        return $pubLogData;
    }
}
6. Open the URL: http://api.channel-pub-api.localhost/qq/v1/qq-cw-apps?group_id=spider , push the task to the queue
7. The info command prints information about the queue status: the copy-asset queue has 1 waiting job, and the upload-asset queue has 0 waiting jobs
.\yii copy-asset-queue/info --color=0
.\yii upload-asset-queue/info --color=0
Jobs
- waiting: 1
- delayed: 0
- reserved: 0
- done: 0
Jobs
- waiting: 0
- delayed: 0
- reserved: 0
- done: 0
8. The run command fetches and executes the jobs in the copy-asset queue in a loop until the queue is empty. After the job in the copy-asset queue executes successfully, the follow-up upload job is pushed to the upload-asset queue: the copy-asset queue now has 0 waiting jobs and 1 done job, and the upload-asset queue has 1 waiting job, as shown in Figure 2

.\yii copy-asset-queue/run --verbose=1 --isolate=1 --color=0
.\yii copy-asset-queue/info
.\yii upload-asset-queue/info --color=0
2018-10-27 17:23:54 [pid: 5216] - Worker is started
2018-10-27 17:23:54 [1] common\jobs\CopyAssetJob (attempt: 1, pid: 5216) - Started
2018-10-27 17:23:55 [1] common\jobs\CopyAssetJob (attempt: 1, pid: 5216) - Done (0.249 s)
2018-10-27 17:23:55 [pid: 5216] - Worker is stopped (0:00:01)
Jobs
- waiting: 0
- delayed: 0
- reserved: 0
- done: 1
Jobs
- waiting: 1
- delayed: 0
- reserved: 0
- done: 0
9. Edit \common\jobs\CopyAssetJob.php, the job class for the queue, and deliberately throw an exception in order to test that, when an uncaught exception occurs during job execution in the copy-asset queue (afterError), a publish log is inserted and a job is pushed to the source callback queue
<?php
/**
* Created by PhpStorm.
* User: Qiang Wang
* Date: 2018/10/22
* Time: 17:10
*/
namespace common\jobs;
use Yii;
use common\logics\Asset;
use common\services\TaskService;
use common\services\AssetService;
use yii\web\ServerErrorHttpException;
/**
 * Queue job that copies the source asset files to the channel's publishing asset directory.
 *
 * This is the test variant used to exercise the queue's error handling: execute()
 * throws unconditionally right after its two lookups.
 *
 * @author Qiang Wang <shuijingwanwq@163.com>
 * @since 1.0
 */
class CopyAssetJob extends Job
{
// ID of the task whose assets are copied
public $taskId;
/**
 * @throws ServerErrorHttpException thrown unconditionally (code 35020) once both lookups have returned
 */
public function execute($queue)
{
// Find the single enabled data model (task) by ID
$taskEnabledItem = TaskService::findModelEnabledById($this->taskId);
// Find the list of enabled assets by task ID
$assetEnabledItems = Asset::findAllEnabledByTaskId($this->taskId);
// Deliberate, test-only throw: it is unconditional, so every statement below it in this method is unreachable.
throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35020'), ['task_id' => $this->taskId])), 35020);
if (empty($assetEnabledItems)) {
throw new ServerErrorHttpException(Yii::t('common/error', Yii::t('common/error', Yii::t('common/error', '35020'), ['task_id' => $this->taskId])), 35020);
}
$source = $taskEnabledItem->source;
$assets = [];
foreach ($assetEnabledItems as $assetEnabledItem) {
$assets[] = [
'type' => $assetEnabledItem->type,
'absolute_url' => $assetEnabledItem->absolute_url,
];
}
// Copy the source asset files to the channel's publishing asset directory and get the relative paths back (synchronous)
$assetServiceCopyAssetsSyncResult = AssetService::copyAssetsSync($source, $assets);
foreach ($assetEnabledItems as $key => $assetEnabledItem) {
$assetEnabledItem->relative_path = $assetServiceCopyAssetsSyncResult[$key]['relative_path'];
// Get the file size, in bytes
$assetEnabledItem->size = filesize(Yii::$app->params['channelPubApi']['asset'][$assetEnabledItem->type]['basePath'] . $assetServiceCopyAssetsSyncResult[$key]['relative_path']);
$assetEnabledItems[$key] = $assetEnabledItem;
}
// Batch-update the assets
$assetEnabledItem->updateMultiple($assetEnabledItems);
}
}
10. Empty Redis, that is, clear the data in the queue, open the URL: http://api.channel-pub-api.localhost/qq/v1/qq-cw-apps?group_id=spider , push the task to the queue
11. The info command prints information about the queue status: the copy-asset queue has 1 waiting job, the upload-asset queue has 0 waiting jobs, and the source callback queue has 0 waiting jobs
.\yii copy-asset-queue/info --color=0
.\yii upload-asset-queue/info --color=0
.\yii source-callback-queue/info --color=0
Jobs
- waiting: 1
- delayed: 0
- reserved: 0
- done: 0
Jobs
- waiting: 0
- delayed: 0
- reserved: 0
- done: 0
Jobs
- waiting: 0
- delayed: 0
- reserved: 0
- done: 0
12. The run command fetches and executes the jobs in the copy-asset queue in a loop until the queue is empty. The job in the copy-asset queue fails (because an uncaught exception is thrown inside it), so no upload job is pushed to the upload-asset queue; instead, a job is pushed to the source callback queue: the copy-asset queue has 0 waiting jobs and 1 done job, the upload-asset queue has 0 waiting jobs, and the source callback queue has 1 waiting job, as shown in Figure 3

.\yii copy-asset-queue/run --verbose=1 --isolate=1 --color=0
.\yii copy-asset-queue/info --color=0
.\yii upload-asset-queue/info --color=0
.\yii source-callback-queue/info --color=0
2018-10-27 17:37:10 [pid: 32132] - Worker is started
2018-10-27 17:37:11 [1] common\jobs\CopyAssetJob (attempt: 1, pid: 32132) - Started
2018-10-27 17:37:11 [1] common\jobs\CopyAssetJob (attempt: 1, pid: 32132) - Error (0.232 s)
> yii\web\ServerErrorHttpException: Based on task ID: 2, find the list of enabled resources is empty
2018-10-27 17:37:11 [pid: 32132] - Worker is stopped (0:00:01)
Jobs
- waiting: 0
- delayed: 0
- reserved: 0
- done: 1
Jobs
- waiting: 0
- delayed: 0
- reserved: 0
- done: 0
Jobs
- waiting: 1
- delayed: 0
- reserved: 0
- done: 0
13. View the exception information in the system log table, i.e. the message column of the log table, which helps developers with later analysis, as shown in Figure 4

[1] common\jobs\CopyAssetJob (attempt: 1, PID: 32132) is finished with error: yii\web\ServerErrorHttpException: Based on task ID: 2, find the list of enabled resources is empty in E:\wwwroot\channel-pub-api\common\jobs\CopyAssetJob.php:38
Stack trace:
#0 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2-queue\src\Queue.php(214): common\jobs\CopyAssetJob->execute(Object(yii\queue\redis\Queue))
#1 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2-queue\src\cli\Queue.php(162): yii\queue\Queue->handleMessage('1', 'O:24:"common\\jo...', '600', '1')
#2 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2-queue\src\cli\Command.php(145): yii\queue\cli\Queue->execute('1', 'O:24:"common\\jo...', '600', '1', '32132')
#3 [internal function]: yii\queue\cli\Command->actionExec('1', '600', '1', '32132')
#4 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\base\InlineAction.php(57): call_user_func_array(Array, Array)
#5 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\base\Controller.php(157): yii\base\InlineAction->runWithParams(Array)
#6 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\console\Controller.php(148): yii\base\Controller->runAction('exec', Array)
#7 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\base\Module.php(528): yii\console\Controller->runAction('exec', Array)
#8 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\console\Application.php(180): yii\base\Module->runAction('copy-asset-queu...', Array)
#9 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\console\Application.php(147): yii\console\Application->runAction('copy-asset-queu...', Array)
#10 E:\wwwroot\channel-pub-api\vendor\yiisoft\yii2\base\Application.php(386): yii\console\Application->handleRequest(Object(yii\console\Request))
#11 E:\wwwroot\channel-pub-api\yii(23): yii\base\Application->run()
#12 {main}.
14. After the publishing task fails, a publish log is inserted and a job is pushed to the source callback queue (asynchronous). Check the message in the publish log table, i.e. pub_log; this is business data and is ultimately displayed to the user, as shown in Figure 5

Based on task ID: 2, find the list of enabled resources is empty
Leave a Reply