1818use ScriptFUSION \Porter \Connector \ConnectorOptions ;
1919use ScriptFUSION \Porter \Connector \ImportConnectorFactory ;
2020use ScriptFUSION \Porter \Provider \AsyncProvider ;
21- use ScriptFUSION \Porter \Provider \ForeignResourceException ;
2221use ScriptFUSION \Porter \Provider \ObjectNotCreatedException ;
2322use ScriptFUSION \Porter \Provider \Provider ;
2423use ScriptFUSION \Porter \Provider \ProviderFactory ;
2726use ScriptFUSION \Porter \Specification \ImportSpecification ;
2827use ScriptFUSION \Porter \Transform \AsyncTransformer ;
2928use ScriptFUSION \Porter \Transform \Transformer ;
29+ use function Amp \call ;
3030
3131/**
3232 * Imports data from a provider defined in the providers container or internal factory.
@@ -106,6 +106,10 @@ private function fetch(ImportSpecification $specification): \Iterator
106106 $ resource = $ specification ->getResource ();
107107 $ provider = $ this ->getProvider ($ specification ->getProviderName () ?? $ resource ->getProviderClassName ());
108108
109+ if (!$ provider instanceof Provider) {
110+ throw new IncompatibleProviderException ('Provider ' );
111+ }
112+
109113 if ($ resource ->getProviderClassName () !== \get_class ($ provider )) {
110114 throw new ForeignResourceException (sprintf (
111115 'Cannot fetch data from foreign resource: "%s". ' ,
@@ -143,7 +147,11 @@ public function importAsync(AsyncImportSpecification $specification): AsyncRecor
143147 $ records = new AsyncProviderRecords ($ records , $ specification ->getAsyncResource ());
144148 }
145149
146- $ records = $ this ->transformAsync ($ records , $ specification ->getTransformers (), $ specification ->getContext ());
150+ $ records = $ this ->transformRecordsAsync (
151+ $ records ,
152+ $ specification ->getTransformers (),
153+ $ specification ->getContext ()
154+ );
147155
148156 return $ this ->createAsyncPorterRecords ($ records , $ specification );
149157 }
@@ -157,7 +165,7 @@ public function importAsync(AsyncImportSpecification $specification): AsyncRecor
157165 */
158166 public function importOneAsync (AsyncImportSpecification $ specification ): Promise
159167 {
160- return \ Amp \ call (function () use ($ specification ) {
168+ return call (function () use ($ specification ) {
161169 $ results = $ this ->importAsync ($ specification );
162170
163171 yield $ results ->advance ();
@@ -178,8 +186,7 @@ private function fetchAsync(AsyncImportSpecification $specification): Iterator
178186 $ provider = $ this ->getProvider ($ specification ->getProviderName () ?? $ resource ->getProviderClassName ());
179187
180188 if (!$ provider instanceof AsyncProvider) {
181- // TODO: Specific exception type.
182- throw new \RuntimeException ('Provider does not implement AsyncProvider. ' );
189+ throw new IncompatibleProviderException ('AsyncProvider ' );
183190 }
184191
185192 if ($ resource ->getProviderClassName () !== \get_class ($ provider )) {
@@ -206,8 +213,6 @@ private function fetchAsync(AsyncImportSpecification $specification): Iterator
206213 * @param RecordCollection $records
207214 * @param Transformer[] $transformers
208215 * @param mixed $context
209- *
210- * @return RecordCollection
211216 */
212217 private function transformRecords (RecordCollection $ records , array $ transformers , $ context ): RecordCollection
213218 {
@@ -226,10 +231,8 @@ private function transformRecords(RecordCollection $records, array $transformers
226231 * @param AsyncRecordCollection $records
227232 * @param AsyncTransformer[] $transformers
228233 * @param mixed $context
229- *
230- * @return AsyncRecordCollection
231234 */
232- private function transformAsync (
235+ private function transformRecordsAsync (
233236 AsyncRecordCollection $ records ,
234237 array $ transformers ,
235238 $ context
0 commit comments