Skip to content

Commit

Permalink
parallel jobs and daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
yurikuzn committed Nov 5, 2018
1 parent 0c2959d commit 5350cf8
Show file tree
Hide file tree
Showing 10 changed files with 408 additions and 40 deletions.
44 changes: 44 additions & 0 deletions application/Espo/Core/Application.php
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,50 @@ public function runCron()
$cronManager->run();
}

public function runDaemon()
{
$maxProcessNumber = $this->getConfig()->get('daemonMaxProcessNumber');
$interval = $this->getConfig()->get('daemonInterval');
$timeout = $this->getConfig()->get('daemonProcessTimeout');

if (!$maxProcessNumber || !$interval) {
$GLOBALS['log']->error("Daemon config params are not set.");
return;
}

$processList = [];
while (true) {
$toSkip = false;
$runningCount = 0;
foreach ($processList as $i => $process) {
if ($process->isRunning()) {
$runningCount++;
} else if ($process->isRunning()) {
unset($processList[$i]);
}
}
$processList = array_values($processList);
if (count($runningCount) >= $maxProcessNumber) {
$toSkip = true;
}
if (!$toSkip) {
$process = new \Symfony\Component\Process\Process(['php', 'cron.php']);
$process->setTimeout($timeout);
$process->run();
}
sleep($interval);
}
}

public function runJob($id)
{
$auth = $this->createAuth();
$auth->useNoAuth();

$cronManager = new \Espo\Core\CronManager($this->container);
$cronManager->runJobById($id);
}

public function runRebuild()
{
$dataManager = $this->getContainer()->get('dataManager');
Expand Down
117 changes: 93 additions & 24 deletions application/Espo/Core/CronManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
************************************************************************/

namespace Espo\Core;

use \PDO;
use Espo\Core\Utils\Json;

use Espo\Core\Exceptions\NotFound;
use Espo\Core\Exceptions\Error;

class CronManager
{
Expand All @@ -48,8 +51,12 @@ class CronManager

private $cronScheduledJobUtil;

private $useProcessPool = false;

const PENDING = 'Pending';

const READY = 'Ready';

const RUNNING = 'Running';

const SUCCESS = 'Success';
Expand All @@ -70,6 +77,14 @@ public function __construct(\Espo\Core\Container $container)
$this->scheduledJobUtil = $this->container->get('scheduledJob');
$this->cronJobUtil = new \Espo\Core\Utils\Cron\Job($this->config, $this->entityManager);
$this->cronScheduledJobUtil = new \Espo\Core\Utils\Cron\ScheduledJob($this->config, $this->entityManager);

if ($this->getConfig()->get('jobRunInParallel')) {
if (\Spatie\Async\Pool::isSupported()) {
$this->useProcessPool = true;
} else {
$GLOBALS['log']->warning("CronManager: useProcessPool requires pcntl and posix extensions.");
}
}
}

protected function getContainer()
Expand Down Expand Up @@ -146,6 +161,16 @@ protected function checkLastRunTime()
return false;
}

protected function useProcessPool()
{
return $this->useProcessPool;
}

public function setUseProcessPool($useProcessPool)
{
$this->useProcessPool = $useProcessPool;
}

/**
* Run Cron
*
Expand All @@ -166,6 +191,13 @@ public function run()
$this->getCronJobUtil()->removePendingJobDuplicates();
$pendingJobList = $this->getCronJobUtil()->getPendingJobList();

if ($this->useProcessPool()) {
$pool = \Spatie\Async\Pool::create()
->autoload(getcwd() . '/vendor/autoload.php')
->concurrency($this->getConfig()->get('jobPoolConcurrency'))
->timeout($this->getConfig()->get('jobPeriodForActiveProcess'));
}

foreach ($pendingJobList as $job) {
$skip = false;
$this->getEntityManager()->getPdo()->query('LOCK TABLES `job` WRITE');
Expand All @@ -183,39 +215,76 @@ public function run()
$this->getEntityManager()->getPdo()->query('UNLOCK TABLES');
continue;
}
if ($this->useProcessPool()) {
$job->set('status', self::READY);
} else {
$job->set('status', self::RUNNING);
$job->set('pid', \Espo\Core\Utils\System::getPid());
}

$job->set('status', self::RUNNING);
$job->set('pid', $this->getCronJobUtil()->getPid());
$this->getEntityManager()->saveEntity($job);
$this->getEntityManager()->getPdo()->query('UNLOCK TABLES');

$isSuccess = true;
$skipLog = false;

try {
if ($job->get('scheduledJobId')) {
$this->runScheduledJob($job);
} else {
$this->runService($job);
}
} catch (\Exception $e) {
$isSuccess = false;
if ($e->getCode() === -1) {
$job->set('attempts', 0);
$skipLog = true;
} else {
$GLOBALS['log']->error('CronManager: Failed job running, job ['.$job->id.']. Error Details: '.$e->getMessage());
}
if ($this->useProcessPool()) {
$task = new \Espo\Core\Utils\Cron\JobTask($job->id);
$pool->add($task);
} else {
$this->runJob($job);
}
}

$status = $isSuccess ? self::SUCCESS : self::FAILED;
if ($this->useProcessPool()) {
$pool->wait();
}
}

$job->set('status', $status);
$this->getEntityManager()->saveEntity($job);
public function runJobById($id)
{
if (empty($id)) throw new Error();

$job = $this->getEntityManager()->getEntity('Job', $id);

if (!$job) throw new Error("Job {$id} not found.");

if ($job->get('status') !== self::READY) {
throw new Error("Can't run job {$id} with no status Ready.");
}

$job->set('status', self::RUNNING);
$job->set('pid', \Espo\Core\Utils\System::getPid());
$this->getEntityManager()->saveEntity($job);

if ($job->get('scheduledJobId') && !$skipLog) {
$this->getCronScheduledJobUtil()->addLogRecord($job->get('scheduledJobId'), $status, null, $job->get('targetId'), $job->get('targetType'));
$this->runJob($job);
}

public function runJob($job)
{
$isSuccess = true;
$skipLog = false;

try {
if ($job->get('scheduledJobId')) {
$this->runScheduledJob($job);
} else {
$this->runService($job);
}
} catch (\Exception $e) {
$isSuccess = false;
if ($e->getCode() === -1) {
$job->set('attempts', 0);
$skipLog = true;
} else {
$GLOBALS['log']->error('CronManager: Failed job running, job ['.$job->id.']. Error Details: '.$e->getMessage());
}
}

$status = $isSuccess ? self::SUCCESS : self::FAILED;

$job->set('status', $status);
$this->getEntityManager()->saveEntity($job);

if ($job->get('scheduledJobId') && !$skipLog) {
$this->getCronScheduledJobUtil()->addLogRecord($job->get('scheduledJobId'), $status, null, $job->get('targetId'), $job->get('targetType'));
}
}

Expand Down
13 changes: 4 additions & 9 deletions application/Espo/Core/Utils/Cron/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public function isScheduledJobRunning($scheduledJobId, $targetId = null, $target
{
$where = [
'scheduledJobId' => $scheduledJobId,
'status' => CronManager::RUNNING
'status' => [CronManager::RUNNING, CronManager::READY]
];
if ($targetId && $targetType) {
$where['targetId'] = $targetId;
Expand All @@ -127,7 +127,7 @@ public function getRunningScheduledJobIdList()
$query = "
SELECT scheduled_job_id FROM job
WHERE
`status` = 'Running' AND
(`status` = 'Running' OR `status` = 'Ready') AND
scheduled_job_id IS NOT NULL AND
target_id IS NULL AND
deleted = 0
Expand Down Expand Up @@ -205,7 +205,7 @@ protected function markFailedJobsByPeriod($period)
$select = "
SELECT id, scheduled_job_id, execute_time, target_id, target_type, pid FROM `job`
WHERE
`status` = '" . CronManager::RUNNING ."' AND execute_time < '".date('Y-m-d H:i:s', $time)."'
(`status` = '" . CronManager::RUNNING ."' OR `status` = '" . CronManager::READY ."') AND execute_time < '".date('Y-m-d H:i:s', $time)."'
";
$sth = $pdo->prepare($select);
$sth->execute();
Expand Down Expand Up @@ -357,9 +357,4 @@ public function updateFailedJobAttempts()
}
}
}

public function getPid()
{
return System::getPid();
}
}
}
50 changes: 50 additions & 0 deletions application/Espo/Core/Utils/Cron/JobTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php
/************************************************************************
* This file is part of EspoCRM.
*
* EspoCRM - Open Source CRM application.
* Copyright (C) 2014-2018 Yuri Kuznetsov, Taras Machyshyn, Oleksiy Avramenko
* Website: http://www.espocrm.com
*
* EspoCRM is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* EspoCRM is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with EspoCRM. If not, see http://www.gnu.org/licenses/.
*
* The interactive user interfaces in modified source and object code versions
* of this program must display Appropriate Legal Notices, as required under
* Section 5 of the GNU General Public License version 3.
*
* In accordance with Section 7(b) of the GNU General Public License version 3,
* these Appropriate Legal Notices must retain the display of the "EspoCRM" word.
************************************************************************/

namespace Espo\Core\Utils\Cron;

class JobTask extends \Spatie\Async\Task
{
private $jobId;

public function __construct($jobId)
{
$this->jobId = $jobId;
}

public function configure()
{
}

public function run()
{
$app = new \Espo\Core\Application();
$app->runJob($this->jobId);
}
}
1 change: 0 additions & 1 deletion application/Espo/Core/defaults/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,3 @@
'noteEditThresholdPeriod' => '7 days',
'isInstalled' => false
);

8 changes: 8 additions & 0 deletions application/Espo/Core/defaults/systemConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@
'jobPeriod' => 7800, /** Max execution time (in seconds) allocated for a sinle job. If exceeded then set to Failed.*/
'jobPeriodForActiveProcess' => 36000, /** Max execution time (in seconds) allocated for a sinle job with active process. If exceeded then set to Failed.*/
'jobRerunAttemptNumber' => 1, /** Number of attempts to re-run failed jobs. */
'jobRunInParallel' => false, /** Jobs will be executed in parallel processes. */
'jobPoolConcurrency' => 8, /** Max number of processes run simultaneously. */
'cronMinInterval' => 4, /** Min interval (in seconds) between two cron runs. */
'daemonMaxProcessNumber' => 5,
'daemonInterval' => 10, /** Interval between cron process runs in seconds. */
'daemonProcessTimeout' => 36000,
'crud' => array(
'get' => 'read',
'post' => 'create',
Expand Down Expand Up @@ -113,7 +118,10 @@
'jobMaxPortion',
'jobPeriod',
'jobRerunAttemptNumber',
'jobUseThreads',
'cronMinInterval',
'daemonInterval',
'daemonMaxThreadNumber',
'authenticationMethod',
'adminPanelIframeUrl',
'ldapHost',
Expand Down
2 changes: 1 addition & 1 deletion application/Espo/Resources/metadata/entityDefs/Job.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
},
"status": {
"type": "enum",
"options": ["Pending", "Running", "Success", "Failed"],
"options": ["Pending", "Ready", "Running", "Success", "Failed"],
"default": "Pending"
},
"executeTime": {
Expand Down
6 changes: 4 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"homepage": "https://github.com/espocrm/espocrm.git",
"license": "GPL-3.0-only",
"require": {
"php": ">=5.6.0",
"php": ">=7.1.0",
"ext-pdo_mysql": "*",
"ext-openssl": "*",
"ext-json": "*",
Expand All @@ -29,7 +29,9 @@
"php-mime-mail-parser/php-mime-mail-parser": "3.*",
"zbateson/mail-mime-parser": "0.4.*",
"phpoffice/phpexcel": "^1.8",
"phpoffice/phpspreadsheet": "^1.1"
"phpoffice/phpspreadsheet": "^1.1",
"spatie/async": "0.0.4",
"symfony/process": "4.1.7"
},
"autoload": {
"psr-0": {
Expand Down
Loading

0 comments on commit 5350cf8

Please sign in to comment.