Skip to content

Commit 42e0266

Browse files
committed
- code cleanup and refactor
- moved to php 7.1 - libs update
1 parent 1fe41a2 commit 42e0266

8 files changed

Lines changed: 100 additions & 162 deletions

File tree

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@ I serialise callable function and sent to child process by exec. To get callback
2626
### Example ?
2727
Sure take a look - https://github.com/krowinski/async/blob/master/example/example.php
2828

29+
### User cases (mostly some code on website that user don't need to wait for)
30+
- send callback
31+
- publish to queue amqp
32+
- send external analytic data
33+
- remove files
34+
- process payments
35+
2936
### Supports M$ Windows?
3037
NO.
3138

bin/console

100644100755
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1+
#!/usr/bin/env php
12
<?php
2-
3+
declare(strict_types=1);
34

45
namespace bin;
56

67
array_map(
7-
function ($autoloadFile) {
8+
static function ($autoloadFile) {
89
if (is_file($autoloadFile)) {
10+
/** @noinspection PhpIncludeInspection */
911
include $autoloadFile;
1012
}
1113
},

composer.json

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@
22
"name": "krowinski/async",
33
"description": "Run php closure asynchronously",
44
"keywords": [
5-
"php", "async"
5+
"php",
6+
"async"
67
],
78
"type": "library",
89
"require": {
9-
"php": ">=5.6",
10+
"php": ">=7.1",
1011
"ext-pcntl": "*",
11-
"symfony/process": "^2.7|^3.3|^4.0",
12-
"symfony/console": "^2.7|^3.3|^4.0",
13-
"jeremeamia/superclosure": "^2.3"
12+
"symfony/process": "^4.0|^5.0",
13+
"symfony/console": "^4.0|^5.0",
14+
"jeremeamia/superclosure": "^2.4"
1415
},
1516
"license": "MIT",
1617
"authors": [

example/example.php

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,56 @@
11
<?php
2-
2+
/** @noinspection PhpComposerExtensionStubsInspection */
3+
declare(strict_types=1);
34

45
namespace bin;
56

6-
77
include __DIR__ . '/../vendor/autoload.php';
88

99
use Async\AsyncCall;
10+
use Exception;
11+
use RuntimeException;
1012

1113
$s = microtime(true);
1214

1315
AsyncCall::run(
14-
function () {
16+
static function () {
1517
sleep(2);
1618
// there is no callback so echo will not be showed and parent process will not w8
1719
echo 'sleep 2s' . PHP_EOL;
1820
}
1921
);
2022

2123
AsyncCall::run(
22-
function () {
24+
static function () {
2325
sleep(4);
2426
// echo will be catched in child process and returned as second parameter $stdOut
2527
echo 'sleep 4s' . PHP_EOL;
2628
},
27-
function ($results, $stdOut) {
29+
static function ($results, $stdOut) {
2830
echo $stdOut;
2931
}
3032
);
3133

3234
AsyncCall::run(
33-
function () {
34-
throw new \RuntimeException('bar');
35+
static function () {
36+
throw new RuntimeException('bar');
3537
}
3638
);
3739

3840
AsyncCall::run(
39-
function () {
40-
throw new \RuntimeException('foo');
41+
static function () {
42+
throw new RuntimeException('foo');
4143
},
42-
function () {
44+
static function () {
4345
},
44-
function (\Exception $error) {
46+
static function (Exception $error) {
4547
// we will get error
4648
assert($error->getMessage() === 'foo');
4749
}
4850
);
4951

5052
AsyncCall::run(
51-
function () {
53+
static function () {
5254
// if this is in parent, child will not see this
5355
function getPage($url)
5456
{
@@ -64,7 +66,7 @@ function getPage($url)
6466
// this will be returned to callback as first parameter
6567
return getPage('example.com');
6668
},
67-
function ($results) {
69+
static function ($results) {
6870
echo $results;
6971
}
7072
);
@@ -77,10 +79,10 @@ function ($results) {
7779
while ($i--) {
7880
// this will start 2 process and then wait them to finish before starting second one
7981
AsyncCall::run(
80-
function () {
82+
static function () {
8183
sleep(1);
8284
},
83-
function () {
85+
static function () {
8486
}
8587
);
8688
}
@@ -89,5 +91,6 @@ function () {
8991

9092

9193
echo PHP_EOL;
94+
echo 'Script ended: ';
9295
echo microtime(true) - $s;
9396
echo PHP_EOL;

src/Async/AsyncCall.php

Lines changed: 29 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,14 @@
11
<?php
2-
2+
declare(strict_types=1);
33

44
namespace Async;
55

66
use SuperClosure\Serializer;
77
use Symfony\Component\Console\Exception\InvalidArgumentException;
88

9-
/**
10-
* Class AsyncCall
11-
* @package Async
12-
*/
139
class AsyncCall
1410
{
15-
const CONSOLE_EXECUTE = 'php ' . __DIR__ . '/../../bin/console app:run-child-process ';
11+
private const CONSOLE_LOCATION = __DIR__ . '/../../bin/console';
1612

1713
/**
1814
* @var bool
@@ -30,87 +26,76 @@ class AsyncCall
3026
* @var int
3127
*/
3228
private static $processesLimit = 0;
29+
private static $processAmount = 0;
3330

34-
/**
35-
* @param $processesLimit
36-
* @throws \Symfony\Component\Console\Exception\InvalidArgumentException
37-
*/
38-
public static function setProcessLimit($processesLimit)
31+
public static function setProcessLimit(int $processesLimit): void
3932
{
4033
if ($processesLimit < 0) {
41-
throw new InvalidArgumentException('Processes limit Must be possitive itiger');
34+
throw new InvalidArgumentException('Processes limit Must be positive integer');
4235
}
43-
self::$processesLimit = (int)$processesLimit;
36+
self::$processesLimit = $processesLimit;
4437
}
4538

46-
/**
47-
* @param callable $job
48-
* @param callable $callback
49-
* @param callable $onError
50-
* @param float $timeout
51-
* @param float $idleTimeout
52-
* @throws \Symfony\Component\Process\Exception\InvalidArgumentException
53-
* @throws \Symfony\Component\Process\Exception\RuntimeException
54-
* @throws \Symfony\Component\Process\Exception\LogicException
55-
* @throws \RuntimeException
56-
*/
5739
public static function run(
5840
callable $job,
5941
callable $callback = null,
6042
callable $onError = null,
61-
$timeout = null,
62-
$idleTimeout = null
63-
) {
43+
float $timeout = null,
44+
float $idleTimeout = null
45+
): void {
6446
self::registerShutdownFunction();
6547

6648
if (!self::$serializer) {
6749
self::$serializer = new Serializer();
6850
}
6951

7052
// we got process limit so wait for them to finish
71-
if (0 !== self::$processesLimit && self::$processesLimit >= count(self::$processList)) {
53+
if (0 !== self::$processesLimit && self::$processesLimit >= self::$processAmount) {
7254
self::waitForProcessesToFinish(self::$processesLimit);
7355
}
7456

75-
$process = new AsyncProcess(self::CONSOLE_EXECUTE . base64_encode(self::$serializer->serialize($job)));
57+
$process = new AsyncProcess(
58+
[
59+
self::CONSOLE_LOCATION,
60+
AsyncChildCommand::COMMAND_NAME,
61+
base64_encode(self::$serializer->serialize($job))
62+
]
63+
);
7664
$process->setTimeout($timeout);
7765
$process->setIdleTimeout($idleTimeout);
7866
$process->startJob($callback, $onError);
7967

80-
//echo $process->getCommandLine() . PHP_EOL;
8168
self::$processList[] = $process;
69+
self::$processAmount++;
8270
}
8371

84-
private static function registerShutdownFunction()
72+
private static function registerShutdownFunction(): void
8573
{
8674
if (!self::$shutdownFunctionRegistered) {
8775
register_shutdown_function(
88-
function () {
76+
static function () {
8977
self::waitForProcessesToFinish();
9078
}
9179
);
9280
self::$shutdownFunctionRegistered = true;
9381
}
9482
}
9583

96-
/**
97-
* @param int $maxProcessToWait
98-
*/
99-
private static function waitForProcessesToFinish($maxProcessToWait = 0)
84+
private static function waitForProcessesToFinish(int $maxProcessToWait = 0): void
10085
{
101-
while (true) {
102-
$processAmount = count(self::$processList);
103-
104-
if (0 === $processAmount) {
105-
break;
106-
}
107-
if ($maxProcessToWait > $processAmount) {
86+
for (; ;) {
87+
if (0 === self::$processAmount || $maxProcessToWait > self::$processAmount) {
10888
break;
10989
}
11090

11191
foreach (self::$processList as $i => $process) {
112-
if ($process->getStatus() === AsyncProcess::STATUS_TERMINATED || (!$process->hasCallbackSet() && !$process->hasOnErrorSet())) {
92+
if (
93+
$process->getStatus() === AsyncProcess::STATUS_TERMINATED ||
94+
(!$process->hasCallbackSet() && !$process->hasOnErrorSet())
95+
) {
11396
unset(self::$processList[$i]);
97+
self::$processAmount--;
98+
11499
continue;
115100
}
116101
}

src/Async/AsyncChildCommand.php

Lines changed: 15 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,29 @@
11
<?php
2-
2+
declare(strict_types=1);
33

44
namespace Async;
55

6-
6+
use Exception;
77
use SuperClosure\Serializer;
88
use Symfony\Component\Console\Command\Command;
99
use Symfony\Component\Console\Input\InputArgument;
1010
use Symfony\Component\Console\Input\InputInterface;
1111
use Symfony\Component\Console\Output\OutputInterface;
1212

13-
/**
14-
* Class AsyncChildCommand
15-
* @package Async
16-
*/
1713
class AsyncChildCommand extends Command
1814
{
19-
const PARAM_NAME_JOB = 'job';
20-
21-
/**
22-
* @var Serializer
23-
*/
15+
public const COMMAND_NAME = 'app:run-child-process';
16+
private const PARAM_NAME_JOB = 'job';
2417
private $serializer;
2518

26-
/**
27-
* AsyncChildCommand constructor.
28-
* @param null $name
29-
* @throws \Symfony\Component\Console\Exception\LogicException
30-
*/
31-
public function __construct($name = null)
19+
public function __construct(?string $name = null)
3220
{
3321
parent::__construct($name);
3422

3523
$this->serializer = new Serializer();
3624
}
3725

38-
/**
39-
* @throws \Symfony\Component\Console\Exception\InvalidArgumentException
40-
*/
41-
protected function configure()
42-
{
43-
$this
44-
->setName('app:run-child-process')
45-
->setDescription('Runs a child process.')
46-
->addArgument(self::PARAM_NAME_JOB, InputArgument::REQUIRED, 'Serialized callback job param.');
47-
}
48-
49-
/**
50-
* @param InputInterface $input
51-
* @param OutputInterface $output
52-
* @return int
53-
* @throws \Symfony\Component\Console\Exception\InvalidArgumentException
54-
* @throws \SuperClosure\Exception\ClosureUnserializationException
55-
*/
56-
public function execute(InputInterface $input, OutputInterface $output)
26+
public function execute(InputInterface $input, OutputInterface $output): int
5727
{
5828
try {
5929
$job = $this->serializer->unserialize(base64_decode($input->getArgument(self::PARAM_NAME_JOB)));
@@ -62,7 +32,7 @@ public function execute(InputInterface $input, OutputInterface $output)
6232
$jobResults = $job();
6333
$ob = ob_get_clean();
6434
$error = null;
65-
} catch (\Exception $exception) {
35+
} catch (Exception $exception) {
6636
$jobResults = null;
6737
$ob = null;
6838
$error = $exception;
@@ -72,4 +42,12 @@ public function execute(InputInterface $input, OutputInterface $output)
7242

7343
return 0;
7444
}
45+
46+
protected function configure(): void
47+
{
48+
$this
49+
->setName(self::COMMAND_NAME)
50+
->setDescription('Runs a child process.')
51+
->addArgument(self::PARAM_NAME_JOB, InputArgument::REQUIRED, 'Serialized callback job param.');
52+
}
7553
}

0 commit comments

Comments
 (0)