Skip to content

Commit 0a8cdca

Browse files
committed
Added ExponentialAsyncDelayRecoverableExceptionHandler.
Changed default exception handler for AsyncImportSpecification. Added base Specification class for both sync and async specifications.
1 parent 26c1cb2 commit 0a8cdca

15 files changed

Lines changed: 447 additions & 288 deletions

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"php": "^7.1",
1313
"scriptfusion/static-class": "^1",
1414
"scriptfusion/retry": "^2.1",
15-
"scriptfusion/retry-exception-handlers": "^1.1",
15+
"scriptfusion/retry-exception-handlers": "^1.2",
1616
"amphp/amp": "^2",
1717
"psr/container": "^1",
1818
"psr/cache": "^1"

src/Connector/ImportConnectorFactory.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
namespace ScriptFUSION\Porter\Connector;
55

6-
use ScriptFUSION\Porter\Specification\ImportSpecification;
6+
use ScriptFUSION\Porter\Specification\Specification;
77
use ScriptFUSION\StaticClass;
88

99
final class ImportConnectorFactory
@@ -12,11 +12,11 @@ final class ImportConnectorFactory
1212

1313
/**
1414
* @param Connector|AsyncConnector $connector
15-
* @param ImportSpecification $specification
15+
* @param Specification $specification
1616
*
1717
* @return ImportConnector
1818
*/
19-
public static function create($connector, ImportSpecification $specification): ImportConnector
19+
public static function create($connector, Specification $specification): ImportConnector
2020
{
2121
return new ImportConnector(
2222
$connector,
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace ScriptFUSION\Porter\Connector\Recoverable;
5+
6+
use Amp\Promise;
7+
use ScriptFUSION\Retry\ExceptionHandler\AsyncExponentialBackoffExceptionHandler;
8+
9+
/**
10+
* Delays async execution for an exponentially increasing series of delays.
11+
*/
12+
class ExponentialAsyncDelayRecoverableExceptionHandler implements RecoverableExceptionHandler
13+
{
14+
private $initialDelay;
15+
16+
/**
17+
* @var AsyncExponentialBackoffExceptionHandler
18+
*/
19+
private $handler;
20+
21+
/**
22+
* Initializes this instance with the specified initial delay. The initial delay will be used when the first
23+
* exception is handled; subsequent exceptions will cause longer delays.
24+
*
25+
* @param int $initialDelay Initial delay in milliseconds.
26+
*/
27+
public function __construct(int $initialDelay = AsyncExponentialBackoffExceptionHandler::DEFAULT_COEFFICIENT)
28+
{
29+
$this->initialDelay = $initialDelay;
30+
}
31+
32+
public function initialize(): void
33+
{
34+
$this->handler = new AsyncExponentialBackoffExceptionHandler($this->initialDelay);
35+
}
36+
37+
public function __invoke(RecoverableException $exception): ?Promise
38+
{
39+
return ($this->handler)();
40+
}
41+
}

src/Connector/Recoverable/ExponentialSleepRecoverableExceptionHandler.php

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,22 @@
77
use ScriptFUSION\Retry\ExceptionHandler\ExponentialBackoffExceptionHandler;
88

99
/**
10-
* Sleeps for an exponentially increasing series of delays specified in microseconds.
10+
* Sleeps for an exponentially increasing series of delays.
1111
*/
1212
class ExponentialSleepRecoverableExceptionHandler implements RecoverableExceptionHandler
1313
{
1414
private $initialDelay;
1515

16+
/**
17+
* @var ExponentialBackoffExceptionHandler
18+
*/
1619
private $handler;
1720

1821
/**
1922
* Initializes this instance with the specified initial delay. The initial delay will be used when the first
2023
* exception is handled; subsequent exceptions will cause longer delays.
2124
*
22-
* @param int $initialDelay Initial delay.
25+
* @param int $initialDelay Initial delay in microseconds.
2326
*/
2427
public function __construct(int $initialDelay = ExponentialBackoffExceptionHandler::DEFAULT_COEFFICIENT)
2528
{
@@ -33,6 +36,8 @@ public function initialize(): void
3336

3437
public function __invoke(RecoverableException $exception): ?Promise
3538
{
36-
return ($this->handler)($exception);
39+
($this->handler)();
40+
41+
return null;
3742
}
3843
}

src/Porter.php

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ public function __construct(ContainerInterface $providers)
5959
* @param ImportSpecification $specification Import specification.
6060
*
6161
* @return PorterRecords|CountablePorterRecords
62-
*
63-
* @throws ImportException Provider failed to return an iterator.
6462
*/
6563
public function import(ImportSpecification $specification): PorterRecords
6664
{
@@ -128,6 +126,13 @@ private function fetch(ImportSpecification $specification): \Iterator
128126
return $resource->fetch(ImportConnectorFactory::create($connector, $specification));
129127
}
130128

129+
/**
130+
* Imports data asynchronously according to the design of the specified asynchronous import specification.
131+
*
132+
* @param AsyncImportSpecification $specification Asynchronous import specification.
133+
*
134+
* @return AsyncPorterRecords|CountableAsyncPorterRecords
135+
*/
131136
public function importAsync(AsyncImportSpecification $specification): AsyncRecordCollection
132137
{
133138
$specification = clone $specification;
@@ -143,6 +148,13 @@ public function importAsync(AsyncImportSpecification $specification): AsyncRecor
143148
return $this->createAsyncPorterRecords($records, $specification);
144149
}
145150

151+
/**
152+
* Imports one record according to the design of the specified asynchronous import specification.
153+
*
154+
* @param AsyncImportSpecification $specification Asynchronous import specification.
155+
*
156+
* @return Promise Promise that resolves to a record.
157+
*/
146158
public function importOneAsync(AsyncImportSpecification $specification): Promise
147159
{
148160
return \Amp\call(function () use ($specification) {
@@ -210,17 +222,19 @@ private function transformRecords(RecordCollection $records, array $transformers
210222
return $records;
211223
}
212224

225+
/**
226+
* @param AsyncRecordCollection $records
227+
* @param AsyncTransformer[] $transformers
228+
* @param mixed $context
229+
*
230+
* @return AsyncRecordCollection
231+
*/
213232
private function transformAsync(
214233
AsyncRecordCollection $records,
215234
array $transformers,
216235
$context
217236
): AsyncRecordCollection {
218237
foreach ($transformers as $transformer) {
219-
if (!$transformer instanceof AsyncTransformer) {
220-
// TODO: Proper exception or separate async stack.
221-
throw new \RuntimeException('Cannot use sync transformer.');
222-
}
223-
224238
if ($transformer instanceof PorterAware) {
225239
$transformer->setPorter($this);
226240
}

src/Provider/Resource/PseudoBisyncResource.php

Lines changed: 0 additions & 35 deletions
This file was deleted.

src/Specification/AsyncImportSpecification.php

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,51 @@
33

44
namespace ScriptFUSION\Porter\Specification;
55

6+
use ScriptFUSION\Porter\Connector\Recoverable\ExponentialAsyncDelayRecoverableExceptionHandler;
7+
use ScriptFUSION\Porter\Connector\Recoverable\RecoverableExceptionHandler;
68
use ScriptFUSION\Porter\Provider\Resource\AsyncResource;
7-
use ScriptFUSION\Porter\Provider\Resource\ProviderResource;
8-
use ScriptFUSION\Porter\Provider\Resource\PseudoBisyncResource;
9+
use ScriptFUSION\Porter\Transform\AnysyncTransformer;
10+
use ScriptFUSION\Porter\Transform\AsyncTransformer;
911

1012
/**
11-
* TODO: Use trait instead of extending ImportSpecification. Async specs are NOT a type of sync spec.
13+
* Specifies which resource to import asynchronously and how the data should be transformed.
1214
*/
13-
class AsyncImportSpecification extends ImportSpecification
15+
class AsyncImportSpecification extends Specification
1416
{
1517
private $asyncResource;
1618

1719
public function __construct(AsyncResource $resource)
1820
{
19-
if (!$resource instanceof ProviderResource) {
20-
$resource = new PseudoBisyncResource($resource);
21-
}
22-
23-
parent::__construct($resource);
24-
2521
$this->asyncResource = $resource;
22+
23+
parent::__construct();
2624
}
2725

2826
public function __clone()
2927
{
30-
parent::__clone();
31-
3228
$this->asyncResource = clone $this->asyncResource;
29+
30+
parent::__clone();
3331
}
3432

3533
final public function getAsyncResource(): AsyncResource
3634
{
3735
return $this->asyncResource;
3836
}
37+
38+
final public function addTransformer(AnysyncTransformer $transformer): Specification
39+
{
40+
if (!$transformer instanceof AsyncTransformer) {
41+
throw new IncompatibleTransformerException(
42+
'Transformer does not implement interface: ' . AsyncTransformer::class . '.'
43+
);
44+
}
45+
46+
return parent::addTransformer($transformer);
47+
}
48+
49+
protected static function createDefaultRecoverableExceptionHandler(): RecoverableExceptionHandler
50+
{
51+
return new ExponentialAsyncDelayRecoverableExceptionHandler;
52+
}
3953
}

0 commit comments

Comments
 (0)