Skip to content

Commit 3cd63a2

Browse files
authored
Merge pull request #39 from netlogix/feat/job-status-service
2 parents ba09ab1 + 9a976d3 commit 3cd63a2

10 files changed

Lines changed: 126 additions & 57 deletions

Classes/Command/SchedulerCommandController.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,11 @@ public function injectConnection(Connection $connection): void
5454
* Reset stale jobs that have not changed for too long.
5555
*
5656
* @param string $groupName Free jobs in this group only
57-
* @param int $minutes Count jobs as stale if their last activity was more than these many minutes ago
58-
* @throws Exception
57+
* @param ?int $minutes @deprecated Use staleJobTimeout configuration setting instead.
5958
*/
6059
public function resetStaleJobsCommand(
6160
string $groupName,
62-
int $minutes = 10
61+
?int $minutes = 10
6362
): void {
6463
$freed = $this->scheduler->resetStaleJobs($groupName, $minutes);
6564
if ($freed) {

Classes/Domain/AbstractScheduler.php

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ abstract class AbstractScheduler implements Scheduler
3737
*/
3838
protected TimeBaseForDueDateCalculation $timeBaseForDueDateCalculation;
3939

40+
#[Flow\InjectConfiguration(path: 'staleJobTimeout')]
41+
protected int $staleJobTimeoutSecs;
42+
4043
protected const CLAIM_QUERY = "";
4144
protected const SELECT_QUERY = "";
4245
protected const RELEASE_QUERY = "";
@@ -280,16 +283,19 @@ public function activity(ScheduledJob $job): void
280283
* Reset stale jobs that have not changed for too long.
281284
*
282285
* @param string $groupName Free jobs in this group only
283-
* @param int $minutes Count jobs as stale if their last activity was more than these many minutes ago
286+
* @param int|null $minutes Only free jobs that are stale for at least this many minutes. @deprecated Use staleJobTimeout configuration setting instead.
284287
* @throws Exception
285288
* @return int Number of freed jobs
286289
*/
287-
public function resetStaleJobs(string $groupName, int $minutes): int {
290+
public function resetStaleJobs(
291+
string $groupName,
292+
?int $minutes = null
293+
): int {
288294
return $this->dbal->executeQuery(
289295
sql: static::RESET_STALE_JOBS_QUERY,
290296
params: [
291297
'groupName' => $groupName,
292-
'minutes' => max($minutes, 1),
298+
'seconds' => max($minutes === null ? $this->staleJobTimeoutSecs : $minutes * 60, 1),
293299
],
294300
types: [
295301
'groupName' => Types::STRING,
@@ -347,4 +353,8 @@ protected function validateGroupName(string $groupName): void
347353
public function getConnection(): Connection {
348354
return $this->dbal;
349355
}
356+
357+
public function getStaleJobTimeoutSeconds(): int {
358+
return $this->staleJobTimeoutSecs;
359+
}
350360
}

Classes/Domain/MySQLScheduler.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ class MySQLScheduler extends AbstractScheduler {
118118
WHERE running = 1
119119
AND claimed NOT LIKE 'failed(%)'
120120
AND groupname = :groupName
121-
AND activity < NOW() - INTERVAL :minutes MINUTE
121+
AND activity < NOW() - INTERVAL :seconds SECOND
122122
MySQL;
123123

124124
}

Classes/Domain/PostgreSQLScheduler.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class PostgreSQLScheduler extends AbstractScheduler {
8383
WHERE running = 1
8484
AND claimed NOT LIKE 'failed(%)'
8585
AND groupname = :groupName
86-
AND activity < NOW() - make_interval(mins => :minutes)
86+
AND activity < NOW() - make_interval(secs => :seconds)
8787
PostgreSQL;
8888

8989
}

Classes/Domain/Scheduler.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ public function fail(ScheduledJob $job, string $reason): void;
2323

2424
public function activity(ScheduledJob $job): void;
2525

26-
public function resetStaleJobs(string $groupName, int $minutes): int;
26+
public function resetStaleJobs(string $groupName, ?int $minutes = null): int;
2727

2828
public function getConnection(): Connection;
29+
30+
public function getStaleJobTimeoutSeconds(): int;
2931
}

Classes/Service/Connection.php

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Doctrine\DBAL\Connection as DBALConnection;
99
use Doctrine\DBAL\Exception\ConnectionLost;
1010
use Doctrine\DBAL\Exception\RetryableException;
11+
use Doctrine\DBAL\TransactionIsolationLevel;
1112
use Doctrine\ORM\EntityManagerInterface;
1213

1314
/**
@@ -49,12 +50,18 @@ public function fetchOne(string $query, array $params = [], array $types = [])
4950
});
5051
}
5152

52-
// requires dbal autocommit to be enabled
5353
public function fetchOneReadUncommited(string $query, array $params = [], array $types = [])
5454
{
5555
return $this->withAutoReconnectAndRetry(function () use ($query, $params, $types) {
56-
$this->dbal->executeQuery("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");
57-
return $this->dbal->fetchOne($query, $params, $types);
56+
$previous = $this->dbal->getTransactionIsolation();
57+
try {
58+
$this->dbal->setTransactionIsolation(TransactionIsolationLevel::READ_UNCOMMITTED);
59+
return $this->dbal->transactional(function () use ($query, $params, $types) {
60+
return $this->dbal->fetchOne($query, $params, $types);
61+
});
62+
} finally {
63+
$this->dbal->setTransactionIsolation($previous);
64+
}
5865
});
5966
}
6067

Classes/Service/JobStatusService.php

Lines changed: 18 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,18 @@
1010
#[Flow\Scope("singleton")]
1111
abstract class JobStatusService {
1212

13+
protected const string TOTAL_COUNT_QUERY = "";
14+
protected const string RUNNING_COUNT_QUERY = "";
15+
protected const string PENDING_COUNT_QUERY = "";
16+
protected const string STALE_COUNT_QUERY = "";
17+
protected const string FAILED_COUNT_QUERY = "";
18+
1319
#[Flow\Inject]
1420
protected Scheduler $scheduler;
1521

1622
public function getTotalJobCount(string $groupName): int {
17-
$tableName = ScheduledJob::TABLE_NAME;
18-
$query = <<<MySQL
19-
SELECT COUNT(*) FROM {$tableName}
20-
WHERE groupname = :groupName
21-
MySQL;
2223
return $this->fetchOne(
23-
$query,
24+
static::TOTAL_COUNT_QUERY,
2425
[
2526
'groupName' => $groupName
2627
],
@@ -31,36 +32,22 @@ public function getTotalJobCount(string $groupName): int {
3132
}
3233

3334
public function getRunningJobCount(string $groupName): int {
34-
$tableName = ScheduledJob::TABLE_NAME;
35-
$query = <<<MySQL
36-
SELECT COUNT(*) FROM {$tableName}
37-
WHERE running = 1
38-
AND claimed NOT LIKE 'failed(%)'
39-
AND groupname = :groupName
40-
AND activity > NOW() - INTERVAL 2 SECOND
41-
MySQL;
4235
return $this->fetchOne(
43-
$query,
36+
static::RUNNING_COUNT_QUERY,
4437
[
45-
'groupName' => $groupName
38+
'groupName' => $groupName,
39+
'seconds' => $this->scheduler->getStaleJobTimeoutSeconds()
4640
],
4741
[
48-
'groupName' => Types::STRING
42+
'groupName' => Types::STRING,
43+
'seconds' => Types::INTEGER
4944
]
5045
);
5146
}
5247

5348
public function getPendingJobCount(string $groupName): int {
54-
$tableName = ScheduledJob::TABLE_NAME;
55-
$query = <<<MySQL
56-
SELECT COUNT(*) FROM {$tableName}
57-
WHERE ((running = 0
58-
AND claimed = '')
59-
OR running = 2)
60-
AND groupname = :groupName
61-
MySQL;
6249
return $this->fetchOne(
63-
$query,
50+
static::PENDING_COUNT_QUERY,
6451
[
6552
'groupName' => $groupName
6653
],
@@ -70,37 +57,23 @@ public function getPendingJobCount(string $groupName): int {
7057
);
7158
}
7259

73-
public function getStaleJobCount(string $groupName, int $minutes): int {
74-
$tableName = ScheduledJob::TABLE_NAME;
75-
$query = <<<MySQL
76-
SELECT COUNT(*) FROM {$tableName}
77-
WHERE running = 1
78-
AND claimed NOT LIKE 'failed(%)'
79-
AND groupname = :groupName
80-
AND activity < NOW() - INTERVAL :minutes MINUTE
81-
MySQL;
60+
public function getStaleJobCount(string $groupName): int {
8261
return $this->fetchOne(
83-
$query,
62+
static::STALE_COUNT_QUERY,
8463
[
8564
"groupName" => $groupName,
86-
"minutes" => $minutes
65+
"seconds" => $this->scheduler->getStaleJobTimeoutSeconds()
8766
],
8867
[
8968
"groupName" => Types::STRING,
90-
"minutes" => Types::INTEGER
69+
"seconds" => Types::INTEGER
9170
]
9271
);
9372
}
9473

9574
public function getFailedJobCount(string $groupName): int {
96-
$tableName = ScheduledJob::TABLE_NAME;
97-
$query = <<<MySQL
98-
SELECT COUNT(*) FROM {$tableName}
99-
WHERE claimed LIKE 'failed(%)'
100-
AND groupname = :groupName
101-
MySQL;
10275
return $this->fetchOne(
103-
$query,
76+
static::FAILED_COUNT_QUERY,
10477
[
10578
'groupName' => $groupName
10679
],

Classes/Service/MySQLJobStatusService.php

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,41 @@
44

55
class MySQLJobStatusService extends JobStatusService {
66

7+
protected const string TOTAL_COUNT_QUERY = <<<MySQL
8+
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
9+
WHERE groupname = :groupName
10+
MySQL;
11+
12+
protected const string RUNNING_COUNT_QUERY = <<<MySQL
13+
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
14+
WHERE running = 1
15+
AND claimed NOT LIKE 'failed(%)'
16+
AND groupname = :groupName
17+
AND activity > NOW() - INTERVAL :seconds SECOND
18+
MySQL;
19+
20+
protected const string PENDING_COUNT_QUERY = <<<MySQL
21+
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
22+
WHERE ((running = 0
23+
AND claimed = '')
24+
OR running = 2)
25+
AND groupname = :groupName
26+
MySQL;
27+
28+
protected const string STALE_COUNT_QUERY = <<<MySQL
29+
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
30+
WHERE running = 1
31+
AND claimed NOT LIKE 'failed(%)'
32+
AND groupname = :groupName
33+
AND activity <= NOW() - INTERVAL :seconds SECOND
34+
MySQL;
35+
36+
protected const string FAILED_COUNT_QUERY = <<<MySQL
37+
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
38+
WHERE claimed LIKE 'failed(%)'
39+
AND groupname = :groupName
40+
MySQL;
41+
742
protected function fetchOne(string $query, array $params = [], array $types = []) {
843
return $this->scheduler->getConnection()->fetchOneReadUncommited($query, $params, $types);
944
}

Classes/Service/PostgreSQLJobStatusService.php

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,39 @@
44

55
class PostgreSQLJobStatusService extends JobStatusService {
66

7+
protected const string TOTAL_COUNT_QUERY = <<<PostgreSQL
8+
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
9+
WHERE groupname = :groupName
10+
PostgreSQL;
11+
12+
protected const string RUNNING_COUNT_QUERY = <<<PostgreSQL
13+
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
14+
WHERE running = 1
15+
AND claimed NOT LIKE 'failed(%)'
16+
AND groupname = :groupName
17+
AND activity > NOW() - make_interval(secs => :seconds)
18+
PostgreSQL;
19+
20+
protected const string PENDING_COUNT_QUERY = <<<PostgreSQL
21+
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
22+
WHERE ((running = 0
23+
AND claimed = '')
24+
OR running = 2)
25+
AND groupname = :groupName
26+
PostgreSQL;
27+
28+
protected const string STALE_COUNT_QUERY = <<<PostgreSQL
29+
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
30+
WHERE running = 1
31+
AND claimed NOT LIKE 'failed(%)'
32+
AND groupname = :groupName
33+
AND activity <= NOW() - make_interval(secs => :seconds)
34+
PostgreSQL;
35+
36+
protected const string FAILED_COUNT_QUERY = <<<PostgreSQL
37+
SELECT COUNT(*) FROM netlogix_jobqueue_scheduled_job
38+
WHERE claimed LIKE 'failed(%)'
39+
AND groupname = :groupName
40+
PostgreSQL;
41+
742
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
Netlogix:
2+
JobQueue:
3+
Scheduled:
4+
5+
# Time (in seconds) of inactivity (no updates to the jobs activity timestamp column) after which a job
6+
# is considered stale and needs to be reset (see SchedulerCommandController::resetStaleJobsCommand)
7+
# in order to be picked up again.
8+
staleJobTimeout: 60

0 commit comments

Comments
 (0)