|
8 | 8 | use Flowpack\Prunner\Dto\PipelinesAndJobsResponse; |
9 | 9 | use Flowpack\Prunner\ValueObject\JobId; |
10 | 10 | use Flowpack\Prunner\ValueObject\PipelineName; |
| 11 | +use Flowpack\Prunner\ValueObject\QueuePartitionName; |
11 | 12 | use GuzzleHttp\Client; |
12 | 13 | use GuzzleHttp\Exception\GuzzleException; |
13 | 14 | use Neos\Flow\Annotations as Flow; |
@@ -91,14 +92,21 @@ public function loadJobLogs(JobId $jobId, string $taskName): JobLogs |
91 | 92 | } |
92 | 93 |
|
93 | 94 | /** |
| 95 | + * @param QueuePartitionName|null $queuePartition only needed for queue_strategy "partitioned_replace" |
94 | 96 | * @throws \JsonException |
95 | 97 | */ |
96 | | - public function schedulePipeline(PipelineName $pipeline, array $variables): JobId |
| 98 | + public function schedulePipeline(PipelineName $pipeline, array $variables, ?QueuePartitionName $queuePartition = null): JobId |
97 | 99 | { |
98 | | - $response = $this->apiCall('POST', 'pipelines/schedule', json_encode([ |
| 100 | + $requestBody = [ |
99 | 101 | 'pipeline' => $pipeline->getName(), |
100 | | - 'variables' => $variables |
101 | | - ], JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT)); |
| 102 | + 'variables' => $variables, |
| 103 | + ]; |
| 104 | + |
| 105 | + if ($queuePartition !== null) { |
| 106 | + $requestBody['queuePartition'] = $queuePartition->getName(); |
| 107 | + } |
| 108 | + |
| 109 | + $response = $this->apiCall('POST', 'pipelines/schedule', json_encode($requestBody, JSON_THROW_ON_ERROR | JSON_FORCE_OBJECT)); |
102 | 110 | if ($response->getStatusCode() !== 202) { |
103 | 111 | throw new \RuntimeException('Scheduling a new pipeline run should have returned status code 202, but got: ' . $response->getStatusCode()); |
104 | 112 | } |
|
0 commit comments