Skip to content

Commit 6d7bf4c

Browse files
authored
Merge pull request #33 from Flowpack/bugfix/remove-message-cache-on-failure
BUGFIX: Remove message cache file on failure
2 parents 2aa7c92 + cd1bcd3 commit 6d7bf4c

1 file changed

Lines changed: 6 additions & 1 deletion

File tree

Classes/Job/JobManager.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ public function queue(string $queueName, JobInterface $job, array $options = [])
8787
*/
8888
public function waitAndExecute(string $queueName, $timeout = null): ?Message
8989
{
90+
91+
$messageCacheIdentifier = null;
9092
$queue = $this->queueManager->getQueue($queueName);
9193
$message = $queue->waitAndReserve($timeout);
9294
if ($message === null) {
@@ -102,7 +104,6 @@ public function waitAndExecute(string $queueName, $timeout = null): ?Message
102104
$messageCacheIdentifier = sha1(serialize($message));
103105
$this->messageCache->set($messageCacheIdentifier, $message);
104106
Scripts::executeCommand('flowpack.jobqueue.common:job:execute', $this->flowSettings, false, [$queue->getName(), $messageCacheIdentifier]);
105-
$this->messageCache->remove($messageCacheIdentifier);
106107
} else {
107108
$this->executeJobForMessage($queue, $message);
108109
}
@@ -118,6 +119,10 @@ public function waitAndExecute(string $queueName, $timeout = null): ?Message
118119
$this->emitMessageFailed($queue, $message, $exception);
119120
throw new JobQueueException(sprintf('Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - ABORTING', $message->getIdentifier(), $queue->getName(), $message->getNumberOfReleases() + 1, $maximumNumberOfReleases + 1), 1334056584, $exception);
120121
}
122+
} finally {
123+
if ($messageCacheIdentifier !== null) {
124+
$this->messageCache->remove($messageCacheIdentifier);
125+
}
121126
}
122127

123128
$queue->finish($message->getIdentifier());

0 commit comments

Comments
 (0)