Skip to content

Commit 0844c4a

Browse files
authored
Merge pull request #4 from michalsn/feat/redis
feat: redis
2 parents b27a50e + ba832bc commit 0844c4a

17 files changed

Lines changed: 1152 additions & 10 deletions

.github/workflows/phpunit.yml

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,19 @@ on:
2121
jobs:
2222
main:
2323
name: PHP ${{ matrix.php-versions }} Unit Tests
24-
runs-on: ubuntu-latest
24+
runs-on: ubuntu-22.04
25+
26+
services:
27+
redis:
28+
image: redis
29+
ports:
30+
- 6379:6379
31+
options: >-
32+
--health-cmd "redis-cli ping"
33+
--health-interval=10s
34+
--health-timeout=5s
35+
--health-retries=3
36+
2537
if: "!contains(github.event.head_commit.message, '[ci skip]')"
2638
strategy:
2739
matrix:
@@ -36,7 +48,7 @@ jobs:
3648
with:
3749
php-version: ${{ matrix.php-versions }}
3850
tools: composer, phive, phpunit
39-
extensions: intl, json, mbstring, gd, xdebug, xml, sqlite3
51+
extensions: intl, json, mbstring, gd, xdebug, xml, sqlite3, redis
4052
coverage: xdebug
4153
env:
4254
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
@@ -79,6 +91,7 @@ jobs:
7991
coveralls:
8092
needs: [main]
8193
name: Coveralls Finished
94+
if: github.repository_owner == 'michalsn'
8295
runs-on: ubuntu-latest
8396
steps:
8497
- name: Upload Coveralls results

composer.json

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"description": "Queues for CodeIgniter 4 framework",
44
"license": "MIT",
55
"type": "library",
6-
"keywords": ["codeigniter", "codeigniter4", "queue"],
6+
"keywords": ["codeigniter", "codeigniter4", "queue", "database", "redis", "predis"],
77
"authors": [
88
{
99
"name": "michalsn",
@@ -18,7 +18,7 @@
1818
"require-dev": {
1919
"codeigniter4/devkit": "^1.0",
2020
"codeigniter4/framework": "^4.4",
21-
"rector/rector": "0.18.6"
21+
"predis/predis": "^2.0"
2222
},
2323
"minimum-stability": "dev",
2424
"prefer-stable": true,
@@ -32,6 +32,10 @@
3232
"Tests\\": "tests"
3333
}
3434
},
35+
"suggest": {
36+
"ext-redis": "If you want to use RedisHandler",
37+
"predis/predis": "If you want to use PredisHandler"
38+
},
3539
"config": {
3640
"allow-plugins": {
3741
"phpstan/extension-installer": true

docs/configuration.md

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ Available options:
1515
- [$defaultHandler](#defaultHandler)
1616
- [$handlers](#handlers)
1717
- [$database](#database)
18+
- [$redis](#redis)
19+
- [$predis](#predis)
1820
- [$keepDoneJobs](#keepDoneJobs)
1921
- [$keepFailedJobs](#keepFailedJobs)
2022
- [$queueDefaultPriority](#queueDefaultPriority)
@@ -27,7 +29,7 @@ The default handler used by the library. Default value: `database`.
2729

2830
### $handlers
2931

30-
An array of available handlers. By now only `database` handler is implemented.
32+
An array of available handlers. By now only `database`, `redis` and `predis` handlers are implemented.
3133

3234
### $database
3335

@@ -36,6 +38,29 @@ The configuration settings for `database` handler.
3638
* `dbGroup` - The database group to use. Default value: `default`.
3739
* `getShared` - Weather to use shared instance. Default value: `true`.
3840

41+
### $redis
42+
43+
The configuration settings for `redis` handler. You need to have a [ext-redis](https://github.com/phpredis/phpredis) installed to use it.
44+
45+
* `host` - The host name or unix socket. Default value: `127.0.0.1`.
46+
* `password` - The password. Default value: `null`.
47+
* `port` - The port number. Default value: `6379`.
48+
* `timeout` - The timeout for connection. Default value: `0`.
49+
* `database` - The database number. Default value: `0`.
50+
* `prefix` - The default key prefix. Default value: `''` (not set).
51+
52+
### $predis
53+
54+
The configuration settings for `predis` handler. You need to have [Predis](https://github.com/predis/predis) installed to use it.
55+
56+
* `scheme` - The scheme to use: `tcp`, `tls` or `unix`. Default value: `tcp`.
57+
* `host` - The host name. Default value: `127.0.0.1`.
58+
* `password` - The password. Default value: `null`.
59+
* `port` - The port number (when `tcp`). Default value: `6379`.
60+
* `timeout` - The timeout for connection. Default value: `5`.
61+
* `database` - The database number. Default value: `0`.
62+
* `prefix` - The default key prefix. Default value: `''` (not set).
63+
3964
### $keepDoneJobs
4065

4166
If the job is done, should we keep it in the table? Default value: `false`.

phpstan.neon.dist

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,25 @@ parameters:
2020
message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Handlers\\BaseHandler::push\(\).#'
2121
paths:
2222
- src/Handlers/BaseHandler.php
23+
-
24+
message: '#Call to deprecated function random_string\(\):#'
25+
paths:
26+
- src/Handlers/RedisHandler.php
27+
- src/Handlers/PredisHandler.php
28+
-
29+
message: '#Cannot access property \$timestamp on array\|bool\|float\|int\|object\|string.#'
30+
paths:
31+
- tests/_support/Database/Seeds/TestRedisQueueSeeder.php
2332
-
2433
message: '#Access to an undefined property CodeIgniter\\I18n\\Time::\$timestamp.#'
2534
paths:
2635
- src/Handlers/BaseHandler.php
2736
- src/Handlers/DatabaseHandler.php
37+
- src/Handlers/RedisHandler.php
38+
- src/Handlers/PredisHandler.php
2839
- src/Models/QueueJobModel.php
40+
- tests/RedisHandlerTest.php
41+
- tests/PredisHandlerTest.php
2942
-
3043
message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Models\\QueueJobFailedModel::affectedRows\(\).#'
3144
paths:

src/Config/Queue.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
use CodeIgniter\Config\BaseConfig;
66
use Michalsn\CodeIgniterQueue\Exceptions\QueueException;
77
use Michalsn\CodeIgniterQueue\Handlers\DatabaseHandler;
8+
use Michalsn\CodeIgniterQueue\Handlers\PredisHandler;
9+
use Michalsn\CodeIgniterQueue\Handlers\RedisHandler;
810

911
class Queue extends BaseConfig
1012
{
@@ -18,6 +20,8 @@ class Queue extends BaseConfig
1820
*/
1921
public array $handlers = [
2022
'database' => DatabaseHandler::class,
23+
'redis' => RedisHandler::class,
24+
'predis' => PredisHandler::class,
2125
];
2226

2327
/**
@@ -28,6 +32,31 @@ class Queue extends BaseConfig
2832
'getShared' => true,
2933
];
3034

35+
/**
36+
* Redis handler config.
37+
*/
38+
public array $redis = [
39+
'host' => '127.0.0.1',
40+
'password' => null,
41+
'port' => 6379,
42+
'timeout' => 0,
43+
'database' => 0,
44+
'prefix' => '',
45+
];
46+
47+
/**
48+
* Predis handler config.
49+
*/
50+
public array $predis = [
51+
'scheme' => 'tcp',
52+
'host' => '127.0.0.1',
53+
'password' => null,
54+
'port' => 6379,
55+
'timeout' => 5,
56+
'database' => 0,
57+
'prefix' => '',
58+
];
59+
3160
/**
3261
* Whether to keep the DONE jobs in the queue.
3362
*/

src/Exceptions/QueueException.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,16 @@ public static function forIncorrectHandler(): static
1111
return new self(lang('Queue.incorrectHandler'));
1212
}
1313

14+
public static function forIncorrectQueueFormat(): static
15+
{
16+
return new self(lang('Queue.incorrectQueueFormat'));
17+
}
18+
19+
public static function forTooLongQueueName(): static
20+
{
21+
return new self(lang('Queue.tooLongQueueName'));
22+
}
23+
1424
public static function forIncorrectJobHandler(): static
1525
{
1626
return new self(lang('Queue.incorrectJobHandler'));

src/Handlers/BaseHandler.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ protected function logFailed(QueueJob $queueJob, Throwable $err): bool
136136
*/
137137
protected function validateJobAndPriority(string $queue, string $job): void
138138
{
139+
// Validate queue
140+
$this->validateQueue($queue);
141+
139142
// Validate jobHandler.
140143
if (! in_array($job, array_keys($this->config->jobHandlers), true)) {
141144
throw QueueException::forIncorrectJobHandler();
@@ -150,4 +153,18 @@ protected function validateJobAndPriority(string $queue, string $job): void
150153
throw QueueException::forIncorrectQueuePriority($this->priority, $queue);
151154
}
152155
}
156+
157+
/**
158+
* Validate queue name.
159+
*/
160+
protected function validateQueue(string $queue): void
161+
{
162+
if (! preg_match('/^[a-z0-9_-]+$/', $queue)) {
163+
throw QueueException::forIncorrectQueueFormat();
164+
}
165+
166+
if (strlen($queue) > 64) {
167+
throw QueueException::forTooLongQueueName();
168+
}
169+
}
153170
}

src/Handlers/PredisHandler.php

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
<?php
2+
3+
namespace Michalsn\CodeIgniterQueue\Handlers;
4+
5+
use CodeIgniter\Exceptions\CriticalError;
6+
use CodeIgniter\I18n\Time;
7+
use Exception;
8+
use Michalsn\CodeIgniterQueue\Config\Queue as QueueConfig;
9+
use Michalsn\CodeIgniterQueue\Entities\QueueJob;
10+
use Michalsn\CodeIgniterQueue\Enums\Status;
11+
use Michalsn\CodeIgniterQueue\Interfaces\QueueInterface;
12+
use Michalsn\CodeIgniterQueue\Payload;
13+
use Predis\Client;
14+
use Throwable;
15+
16+
class PredisHandler extends BaseHandler implements QueueInterface
17+
{
18+
private readonly Client $predis;
19+
20+
public function __construct(protected QueueConfig $config)
21+
{
22+
try {
23+
$this->predis = new Client($config->predis, ['prefix' => $config->predis['prefix']]);
24+
$this->predis->time();
25+
} catch (Exception $e) {
26+
throw new CriticalError('Queue: Predis connection refused (' . $e->getMessage() . ').');
27+
}
28+
}
29+
30+
/**
31+
* Add job to the queue.
32+
*/
33+
public function push(string $queue, string $job, array $data): bool
34+
{
35+
$this->validateJobAndPriority($queue, $job);
36+
37+
helper('text');
38+
39+
$queueJob = new QueueJob([
40+
'id' => random_string('numeric', 16),
41+
'queue' => $queue,
42+
'payload' => new Payload($job, $data),
43+
'priority' => $this->priority,
44+
'status' => Status::PENDING->value,
45+
'attempts' => 0,
46+
'available_at' => Time::now()->timestamp,
47+
]);
48+
49+
$result = $this->predis->zadd("queues:{$queue}:{$this->priority}", [json_encode($queueJob) => Time::now()->timestamp]);
50+
51+
$this->priority = null;
52+
53+
return $result > 0;
54+
}
55+
56+
/**
57+
* Get job from the queue.
58+
*/
59+
public function pop(string $queue, array $priorities): ?QueueJob
60+
{
61+
$tasks = [];
62+
$now = Time::now()->timestamp;
63+
64+
foreach ($priorities as $priority) {
65+
if ($tasks = $this->predis->zrangebyscore("queues:{$queue}:{$priority}", '-inf', $now, ['LIMIT' => [0, 1]])) {
66+
if ($this->predis->zrem("queues:{$queue}:{$priority}", ...$tasks)) {
67+
break;
68+
}
69+
$tasks = [];
70+
}
71+
}
72+
73+
if (empty($tasks[0])) {
74+
return null;
75+
}
76+
77+
$queueJob = new QueueJob(json_decode((string) $tasks[0], true));
78+
79+
// Set the actual status as in DB.
80+
$queueJob->status = Status::RESERVED->value;
81+
$queueJob->syncOriginal();
82+
83+
$this->predis->hset("queues:{$queue}::reserved", $queueJob->id, json_encode($queueJob));
84+
85+
return $queueJob;
86+
}
87+
88+
/**
89+
* Schedule job for later
90+
*/
91+
public function later(QueueJob $queueJob, int $seconds): bool
92+
{
93+
$queueJob->status = Status::PENDING->value;
94+
$queueJob->available_at = Time::now()->addSeconds($seconds)->timestamp;
95+
96+
if ($result = $this->predis->zadd("queues:{$queueJob->queue}:{$queueJob->priority}", [json_encode($queueJob) => $queueJob->available_at->timestamp])) {
97+
$this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id);
98+
}
99+
100+
return $result > 0;
101+
}
102+
103+
/**
104+
* Move job to failed table or move and delete.
105+
*/
106+
public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool
107+
{
108+
if ($keepJob) {
109+
$this->logFailed($queueJob, $err);
110+
}
111+
112+
return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id);
113+
}
114+
115+
/**
116+
* Change job status to DONE or delete it.
117+
*/
118+
public function done(QueueJob $queueJob, bool $keepJob): bool
119+
{
120+
if ($keepJob) {
121+
$queueJob->status = Status::DONE->value;
122+
$this->predis->lpush("queues:{$queueJob->queue}::done", [json_encode($queueJob)]);
123+
}
124+
125+
return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id);
126+
}
127+
128+
/**
129+
* Delete queue jobs
130+
*/
131+
public function clear(?string $queue = null): bool
132+
{
133+
if ($queue !== null) {
134+
if ($keys = $this->predis->keys("queues:{$queue}:*")) {
135+
return $this->predis->del($keys) > 0;
136+
}
137+
138+
return true;
139+
}
140+
141+
if ($keys = $this->predis->keys('queues:*')) {
142+
return $this->predis->del($keys) > 0;
143+
}
144+
145+
return true;
146+
}
147+
}

0 commit comments

Comments
 (0)