Skip to content

Commit 8437b5a

Browse files
committed
Added AsyncTransformer.
Added AsyncTransformer support to FilterTransformer. Added Porter async FilterTransformer integration test.
1 parent e15e980 commit 8437b5a

13 files changed

Lines changed: 123 additions & 40 deletions

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
"scriptfusion/static-class": "^1",
1414
"scriptfusion/retry": "^1.1",
1515
"scriptfusion/retry-exception-handlers": "^1.1",
16-
"amphp/amp": "^2.0",
16+
"amphp/amp": "^2",
1717
"psr/container": "^1",
1818
"psr/cache": "^1"
1919
},

src/Collection/CountableRecordsTrait.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
<?php
2+
declare(strict_types=1);
3+
24
namespace ScriptFUSION\Porter\Collection;
35

46
trait CountableRecordsTrait
57
{
68
/** @var int */
79
private $count;
810

9-
public function count()
11+
public function count(): int
1012
{
1113
return $this->count;
1214
}
1315

14-
private function setCount($count)
16+
private function setCount(int $count): void
1517
{
1618
$this->count = $count | 0;
1719
}

src/Porter.php

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
<?php
2+
declare(strict_types=1);
3+
24
namespace ScriptFUSION\Porter;
35

6+
use Amp\Iterator;
47
use Amp\Producer;
58
use Amp\Promise;
69
use Psr\Container\ContainerInterface;
@@ -22,6 +25,7 @@
2225
use ScriptFUSION\Porter\Provider\Resource\ProviderResource;
2326
use ScriptFUSION\Porter\Specification\AsyncImportSpecification;
2427
use ScriptFUSION\Porter\Specification\ImportSpecification;
28+
use ScriptFUSION\Porter\Transform\AsyncTransformer;
2529
use ScriptFUSION\Porter\Transform\Transformer;
2630

2731
/**
@@ -58,7 +62,7 @@ public function __construct(ContainerInterface $providers)
5862
*
5963
* @throws ImportException Provider failed to return an iterator.
6064
*/
61-
public function import(ImportSpecification $specification)
65+
public function import(ImportSpecification $specification): PorterRecords
6266
{
6367
$specification = clone $specification;
6468

@@ -86,7 +90,7 @@ public function import(ImportSpecification $specification)
8690
*
8791
* @throws ImportException More than one record was imported.
8892
*/
89-
public function importOne(ImportSpecification $specification)
93+
public function importOne(ImportSpecification $specification): ?array
9094
{
9195
$results = $this->import($specification);
9296

@@ -103,7 +107,7 @@ public function importOne(ImportSpecification $specification)
103107
return $one;
104108
}
105109

106-
private function fetch(ProviderResource $resource, $providerName, ConnectionContext $context)
110+
private function fetch(ProviderResource $resource, $providerName, ConnectionContext $context): \Iterator
107111
{
108112
$provider = $this->getProvider($providerName ?: $resource->getProviderClassName());
109113

@@ -127,7 +131,7 @@ private function fetch(ProviderResource $resource, $providerName, ConnectionCont
127131
return $resource->fetch(new ImportConnector($connector, $context));
128132
}
129133

130-
public function importAsync(AsyncImportSpecification $specification): Producer
134+
public function importAsync(AsyncImportSpecification $specification): Iterator
131135
{
132136
$specification = clone $specification;
133137

@@ -137,15 +141,7 @@ public function importAsync(AsyncImportSpecification $specification): Producer
137141
ConnectionContextFactory::create($specification)
138142
);
139143

140-
// if (!$records instanceof ProviderRecords) {
141-
// $records = $this->createProviderRecords($records, $specification->getResource());
142-
// }
143-
144-
// $records = $this->transformRecords($records, $specification->getTransformers(), $specification->getContext());
145-
146-
// return $this->createPorterRecords($records, $specification);
147-
148-
return $records;
144+
return $this->transformAsync($records, $specification->getTransformers(), $specification->getContext());
149145
}
150146

151147
public function importOneAsync(AsyncImportSpecification $specification): Promise
@@ -157,15 +153,15 @@ public function importOneAsync(AsyncImportSpecification $specification): Promise
157153

158154
$one = $results->getCurrent();
159155

160-
if ((yield $results->advance()) !== false) {
156+
if (yield $results->advance()) {
161157
throw new ImportException('Cannot import one: more than one record imported.');
162158
}
163159

164160
return $one;
165161
});
166162
}
167163

168-
private function fetchAsync(AsyncResource $resource, $providerName, ConnectionContext $context): Producer
164+
private function fetchAsync(AsyncResource $resource, $providerName, ConnectionContext $context): Iterator
169165
{
170166
$provider = $this->getProvider($providerName ?: $resource->getProviderClassName());
171167

@@ -201,7 +197,7 @@ private function fetchAsync(AsyncResource $resource, $providerName, ConnectionCo
201197
*
202198
* @return RecordCollection
203199
*/
204-
private function transformRecords(RecordCollection $records, array $transformers, $context)
200+
private function transformRecords(RecordCollection $records, array $transformers, $context): RecordCollection
205201
{
206202
foreach ($transformers as $transformer) {
207203
if ($transformer instanceof PorterAware) {
@@ -214,19 +210,50 @@ private function transformRecords(RecordCollection $records, array $transformers
214210
return $records;
215211
}
216212

217-
private function createProviderRecords(\Iterator $records, ProviderResource $resource)
213+
private function transformAsync(Iterator $records, array $transformers, $context): Producer
214+
{
215+
return new Producer(static function (\Closure $emit) use ($records, $transformers, $context) {
216+
while (yield $records->advance()) {
217+
$record = $records->getCurrent();
218+
219+
foreach ($transformers as $transformer) {
220+
if (!$transformer instanceof AsyncTransformer) {
221+
// TODO: Proper exception or separate async stack.
222+
throw new \RuntimeException('Cannot use sync transformer.');
223+
}
224+
225+
$record = yield $transformer->transformAsync($record, $context);
226+
227+
if ($record === null) {
228+
// Do not process more transformers.
229+
break;
230+
}
231+
}
232+
233+
if ($record !== null) {
234+
if (!\is_array($record)) {
235+
throw new \RuntimeException('Unexpected type: record must be array or null.');
236+
}
237+
238+
yield $emit($record);
239+
}
240+
}
241+
});
242+
}
243+
244+
private function createProviderRecords(\Iterator $records, ProviderResource $resource): ProviderRecords
218245
{
219246
if ($records instanceof \Countable) {
220-
return new CountableProviderRecords($records, count($records), $resource);
247+
return new CountableProviderRecords($records, \count($records), $resource);
221248
}
222249

223250
return new ProviderRecords($records, $resource);
224251
}
225252

226-
private function createPorterRecords(RecordCollection $records, ImportSpecification $specification)
253+
private function createPorterRecords(RecordCollection $records, ImportSpecification $specification): PorterRecords
227254
{
228255
if ($records instanceof \Countable) {
229-
return new CountablePorterRecords($records, count($records), $specification);
256+
return new CountablePorterRecords($records, \count($records), $specification);
230257
}
231258

232259
return new PorterRecords($records, $specification);

src/Provider/Resource/AsyncResource.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
namespace ScriptFUSION\Porter\Provider\Resource;
55

6-
use Amp\Producer;
6+
use Amp\Iterator;
77
use ScriptFUSION\Porter\Connector\ImportConnector;
88

99
interface AsyncResource
@@ -20,7 +20,7 @@ public function getProviderClassName(): string;
2020
*
2121
* @param ImportConnector $connector Connector.
2222
*
23-
* @return Producer Enumerable data series.
23+
* @return Iterator Enumerable data series.
2424
*/
25-
public function fetchAsync(ImportConnector $connector): Producer;
25+
public function fetchAsync(ImportConnector $connector): Iterator;
2626
}

src/Provider/Resource/PseudoBisyncResource.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
namespace ScriptFUSION\Porter\Provider\Resource;
55

6-
use Amp\Producer;
6+
use Amp\Iterator;
77
use ScriptFUSION\Porter\Connector\ImportConnector;
88

99
/**
@@ -28,7 +28,7 @@ public function fetch(ImportConnector $connector): \Iterator
2828
throw new \LogicException('Not implemented. Did you call fetch() instead of fetchAsync()?');
2929
}
3030

31-
public function fetchAsync(ImportConnector $connector): Producer
31+
public function fetchAsync(ImportConnector $connector): Iterator
3232
{
3333
return $this->resource->fetchAsync($connector);
3434
}

src/Specification/AsyncImportSpecification.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
use ScriptFUSION\Porter\Provider\Resource\ProviderResource;
88
use ScriptFUSION\Porter\Provider\Resource\PseudoBisyncResource;
99

10+
/**
11+
* TODO: Use trait instead of extending ImportSpecification. Async specs are NOT a type of sync spec.
12+
*/
1013
class AsyncImportSpecification extends ImportSpecification
1114
{
1215
private $asyncResource;

src/Transform/AsyncTransformer.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace ScriptFUSION\Porter\Transform;
5+
6+
use Amp\Promise;
7+
8+
interface AsyncTransformer
9+
{
10+
public function transformAsync(array $record, $context): Promise;
11+
}

src/Transform/FilterTransformer.php

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
<?php
22
namespace ScriptFUSION\Porter\Transform;
33

4+
use Amp\Promise;
5+
use Amp\Success;
46
use ScriptFUSION\Porter\Collection\FilteredRecords;
57
use ScriptFUSION\Porter\Collection\RecordCollection;
68

7-
class FilterTransformer implements Transformer
9+
class FilterTransformer implements Transformer, AsyncTransformer
810
{
911
/**
1012
* @var callable
@@ -21,7 +23,7 @@ public function __construct(callable $filter)
2123

2224
public function transform(RecordCollection $records, $context)
2325
{
24-
$filter = function ($predicate) use ($records, $context) {
26+
$filter = static function ($predicate) use ($records, $context) {
2527
foreach ($records as $record) {
2628
if ($predicate($record, $context)) {
2729
yield $record;
@@ -31,4 +33,9 @@ public function transform(RecordCollection $records, $context)
3133

3234
return new FilteredRecords($filter($this->filter), $records, $filter);
3335
}
36+
37+
public function transformAsync(array $record, $context): Promise
38+
{
39+
return new Success(($this->filter)($record, $context) ? $record : null);
40+
}
3441
}

test/Integration/Porter/AsyncPorterTest.php

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
namespace ScriptFUSIONTest\Integration\Porter;
55

66
use Amp\Loop;
7+
use Amp\Producer;
78
use ScriptFUSION\Porter\Porter;
89
use ScriptFUSION\Porter\Specification\AsyncImportSpecification;
10+
use ScriptFUSION\Porter\Transform\FilterTransformer;
911

1012
/**
1113
* @see Porter
@@ -23,8 +25,8 @@ public function testImportAsync(): void
2325
{
2426
Loop::run(function () {
2527
$records = $this->porter->importAsync($this->specification);
26-
yield $records->advance();
2728

29+
self::assertTrue(yield $records->advance());
2830
self::assertSame(['foo'], $records->getCurrent());
2931
});
3032
}
@@ -35,4 +37,31 @@ public function testImportOneAsync(): void
3537
self::assertSame(['foo'], yield $this->porter->importOneAsync($this->specification));
3638
});
3739
}
40+
41+
public function testFilterAsync(): void
42+
{
43+
$this->resource->shouldReceive('fetchAsync')->andReturn(
44+
new Producer(static function (\Closure $emit): \Generator {
45+
foreach (range(1, 10) as $integer) {
46+
yield $emit([$integer]);
47+
}
48+
})
49+
);
50+
51+
$this->specification->addTransformer(
52+
new FilterTransformer(static function (array $record): int {
53+
return $record[0] % 2;
54+
})
55+
);
56+
57+
Loop::run(function () {
58+
$records = $this->porter->importAsync($this->specification);
59+
60+
while (yield $records->advance()) {
61+
$filtered[] = $records->getCurrent()[0];
62+
}
63+
64+
self::assertSame([1, 3, 5, 7, 9], $filtered);
65+
});
66+
}
3867
}

test/Integration/Porter/PorterTest.php

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@
77
use Mockery\MockInterface;
88
use PHPUnit\Framework\TestCase;
99
use Psr\Container\ContainerInterface;
10+
use ScriptFUSION\Porter\Connector\AsyncConnector;
1011
use ScriptFUSION\Porter\Connector\ConnectionContext;
1112
use ScriptFUSION\Porter\Connector\Connector;
1213
use ScriptFUSION\Porter\Porter;
14+
use ScriptFUSION\Porter\Provider\AsyncProvider;
1315
use ScriptFUSION\Porter\Provider\Provider;
16+
use ScriptFUSION\Porter\Provider\Resource\AsyncResource;
1417
use ScriptFUSION\Porter\Provider\Resource\ProviderResource;
18+
use ScriptFUSION\Porter\Specification\AsyncImportSpecification;
1519
use ScriptFUSION\Porter\Specification\ImportSpecification;
1620
use ScriptFUSIONTest\MockFactory;
1721

@@ -25,22 +29,22 @@ abstract class PorterTest extends TestCase
2529
protected $porter;
2630

2731
/**
28-
* @var Provider|MockInterface
32+
* @var Provider|AsyncProvider|MockInterface
2933
*/
3034
protected $provider;
3135

3236
/**
33-
* @var ProviderResource|MockInterface
37+
* @var ProviderResource|AsyncResource|MockInterface
3438
*/
3539
protected $resource;
3640

3741
/**
38-
* @var Connector|MockInterface
42+
* @var Connector|AsyncConnector|MockInterface
3943
*/
4044
protected $connector;
4145

4246
/**
43-
* @var ImportSpecification
47+
* @var ImportSpecification|AsyncImportSpecification
4448
*/
4549
protected $specification;
4650

0 commit comments

Comments
 (0)