diff --git a/engine/queue/score-queue-api/src/main/java/io/cloudslang/engine/queue/services/recovery/WorkerRecoveryService.java b/engine/queue/score-queue-api/src/main/java/io/cloudslang/engine/queue/services/recovery/WorkerRecoveryService.java index 1641df2b4..dca935681 100644 --- a/engine/queue/score-queue-api/src/main/java/io/cloudslang/engine/queue/services/recovery/WorkerRecoveryService.java +++ b/engine/queue/score-queue-api/src/main/java/io/cloudslang/engine/queue/services/recovery/WorkerRecoveryService.java @@ -16,6 +16,8 @@ package io.cloudslang.engine.queue.services.recovery; +import java.util.concurrent.atomic.AtomicBoolean; + /** * Created with IntelliJ IDEA. * User: kravtsov @@ -36,5 +38,5 @@ public interface WorkerRecoveryService { * Recovery will be done if the worker is non responsive or has not acknowledged messages * @param workerUuid - the uuid of worker */ - void doWorkerAndMessageRecovery(String workerUuid, boolean shouldPurgeQueues); + void doWorkerAndMessageRecovery(String workerUuid, AtomicBoolean shouldPurgeQueues); } diff --git a/engine/queue/score-queue-impl/src/main/java/io/cloudslang/engine/queue/services/recovery/ExecutionRecoveryServiceImpl.java b/engine/queue/score-queue-impl/src/main/java/io/cloudslang/engine/queue/services/recovery/ExecutionRecoveryServiceImpl.java index c017e66eb..26db40d17 100644 --- a/engine/queue/score-queue-impl/src/main/java/io/cloudslang/engine/queue/services/recovery/ExecutionRecoveryServiceImpl.java +++ b/engine/queue/score-queue-impl/src/main/java/io/cloudslang/engine/queue/services/recovery/ExecutionRecoveryServiceImpl.java @@ -76,8 +76,7 @@ protected void recoverWorkers() { for (String workerUuid : workersUuids) { try { - workerRecoveryService.doWorkerAndMessageRecovery(workerUuid, - shouldPurgeQueues.compareAndSet(true, false)); + workerRecoveryService.doWorkerAndMessageRecovery(workerUuid, shouldPurgeQueues); } catch (Exception ex) { logger.error("Failed to recover worker [" + workerUuid + "]", ex); } diff --git a/engine/queue/score-queue-impl/src/main/java/io/cloudslang/engine/queue/services/recovery/WorkerRecoveryServiceImpl.java b/engine/queue/score-queue-impl/src/main/java/io/cloudslang/engine/queue/services/recovery/WorkerRecoveryServiceImpl.java index 79dde9136..7a12cdc5d 100644 --- a/engine/queue/score-queue-impl/src/main/java/io/cloudslang/engine/queue/services/recovery/WorkerRecoveryServiceImpl.java +++ b/engine/queue/score-queue-impl/src/main/java/io/cloudslang/engine/queue/services/recovery/WorkerRecoveryServiceImpl.java @@ -67,7 +67,7 @@ public class WorkerRecoveryServiceImpl implements WorkerRecoveryService, LoginLi @Override @Transactional - public void doWorkerAndMessageRecovery(final String workerUuid, boolean shouldPurgeQueues) { + public void doWorkerAndMessageRecovery(final String workerUuid, AtomicBoolean shouldPurgeQueues) { //lock this worker to synchronize with drain action workerLockService.lock(workerUuid); @@ -85,7 +85,8 @@ public void doWorkerAndMessageRecovery(final String workerUuid, boolean shouldPu if (worker.getStatus().equals(WorkerStatus.IN_RECOVERY)) { logger.warn("Worker : " + workerUuid + " is IN_RECOVERY status. Worker recovery is started"); } - doWorkerRecovery(workerUuid, shouldPurgeQueues); + doWorkerRecovery(workerUuid, shouldPurgeQueues.get()); + shouldPurgeQueues.set(false); } else { logger.debug("Worker : " + workerUuid + " is NOT for recovery"); } diff --git a/engine/queue/score-queue-impl/src/test/java/io/cloudslang/engine/queue/services/recovery/ExecutionRecoveryServiceTest.java b/engine/queue/score-queue-impl/src/test/java/io/cloudslang/engine/queue/services/recovery/ExecutionRecoveryServiceTest.java index 30afdcb4a..eed052876 100644 --- a/engine/queue/score-queue-impl/src/test/java/io/cloudslang/engine/queue/services/recovery/ExecutionRecoveryServiceTest.java +++ b/engine/queue/score-queue-impl/src/test/java/io/cloudslang/engine/queue/services/recovery/ExecutionRecoveryServiceTest.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import static org.mockito.Mockito.*; @@ -69,9 +70,9 @@ public void testRecoverWorkers() throws Exception { when(workerNodeService.readAllWorkersUuids()).thenReturn(getWorkers()); executionRecoveryService.recoverWorkers(); - verify(workerRecoveryService, times(1)).doWorkerAndMessageRecovery("123", true); - verify(workerRecoveryService, times(1)).doWorkerAndMessageRecovery("456", false); - verify(workerRecoveryService, times(1)).doWorkerAndMessageRecovery("789", false); + verify(workerRecoveryService, times(1)).doWorkerAndMessageRecovery(eq("123"), any(AtomicBoolean.class)); + verify(workerRecoveryService, times(1)).doWorkerAndMessageRecovery(eq("456"), any(AtomicBoolean.class)); + verify(workerRecoveryService, times(1)).doWorkerAndMessageRecovery(eq("789"), any(AtomicBoolean.class)); } private List getWorkers(){ diff --git a/engine/queue/score-queue-impl/src/test/java/io/cloudslang/engine/queue/services/recovery/WorkerRecoveryServiceTest.java b/engine/queue/score-queue-impl/src/test/java/io/cloudslang/engine/queue/services/recovery/WorkerRecoveryServiceTest.java index 6ef3dff97..242d08857 100644 --- a/engine/queue/score-queue-impl/src/test/java/io/cloudslang/engine/queue/services/recovery/WorkerRecoveryServiceTest.java +++ b/engine/queue/score-queue-impl/src/test/java/io/cloudslang/engine/queue/services/recovery/WorkerRecoveryServiceTest.java @@ -35,10 +35,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -87,7 +89,7 @@ public void testDoWorkerAndMessageRecoveryResponsiveAndNoMessages() throws Excep WorkerNode mockWorker = mock(WorkerNode.class); when(mockWorker.getStatus()).thenReturn(WorkerStatus.RUNNING); when(workerNodeService.findByUuid("123")).thenReturn(mockWorker); - workerRecoveryService.doWorkerAndMessageRecovery("123", false); + workerRecoveryService.doWorkerAndMessageRecovery("123", new AtomicBoolean(false)); //Make sure the methods did not run verify(workerNodeService, never()).updateStatusInSeparateTransaction("123", WorkerStatus.IN_RECOVERY); @@ -101,7 +103,7 @@ public void testDoWorkerAndMessageRecoveryNonResponsive() throws Exception { WorkerNode mockWorker = mock(WorkerNode.class); when(mockWorker.getStatus()).thenReturn(WorkerStatus.RUNNING); when(workerNodeService.findByUuid("123")).thenReturn(mockWorker); - workerRecoveryService.doWorkerAndMessageRecovery("123", false); + workerRecoveryService.doWorkerAndMessageRecovery("123", new AtomicBoolean(false)); //Make sure the methods did run verify(workerNodeService, times(1)).updateStatusInSeparateTransaction("123", WorkerStatus.IN_RECOVERY); } @@ -114,7 +116,7 @@ public void testDoWorkerAndMessageRecoveryResponsiveAndHasMessages() throws Exce WorkerNode mockWorker = mock(WorkerNode.class); when(mockWorker.getStatus()).thenReturn(WorkerStatus.RUNNING); when(workerNodeService.findByUuid("123")).thenReturn(mockWorker); - workerRecoveryService.doWorkerAndMessageRecovery("123", false); + workerRecoveryService.doWorkerAndMessageRecovery("123", new AtomicBoolean(false)); //Make sure the methods did run verify(workerNodeService, times(1)).updateStatusInSeparateTransaction("123", WorkerStatus.IN_RECOVERY); @@ -128,7 +130,7 @@ public void testDoWorkerAndMessageRecoveryResponsiveInRecoveryAndHasMessages() t WorkerNode mockWorker = mock(WorkerNode.class); when(mockWorker.getStatus()).thenReturn(WorkerStatus.IN_RECOVERY); when(workerNodeService.findByUuid("123")).thenReturn(mockWorker); - workerRecoveryService.doWorkerAndMessageRecovery("123", false); + workerRecoveryService.doWorkerAndMessageRecovery("123", new AtomicBoolean(false)); //Make sure the methods did run verify(workerNodeService, times(1)).updateStatusInSeparateTransaction("123", WorkerStatus.IN_RECOVERY); @@ -142,18 +144,22 @@ public void testDoWorkerAndMessageRecoveryNonResponsiveAndHasMessages() throws E WorkerNode mockWorker = mock(WorkerNode.class); when(mockWorker.getStatus()).thenReturn(WorkerStatus.IN_RECOVERY); when(workerNodeService.findByUuid("123")).thenReturn(mockWorker); - workerRecoveryService.doWorkerAndMessageRecovery("123", false); + workerRecoveryService.doWorkerAndMessageRecovery("123", new AtomicBoolean(false)); //Make sure the methods did run verify(workerNodeService, times(1)).updateStatusInSeparateTransaction("123", WorkerStatus.IN_RECOVERY); } @Test public void testDoWorkerRecovery() throws Exception { + when(messageRecoveryService.recoverMessagesBulk("worker1", 100)).thenReturn(true, false); + workerRecoveryService.doWorkerRecovery("worker1", false); verify(workerLockService, times(1)).lock("worker1"); verify(workerNodeService, times(1)).updateStatusInSeparateTransaction("worker1", WorkerStatus.IN_RECOVERY); + verify(messageRecoveryService, times(2)).recoverMessagesBulk(eq("worker1"), eq(100)); verify(workerNodeService, times(1)).updateStatus("worker1", WorkerStatus.RECOVERED); + verify(workerNodeService, times(1)).updateWRV(eq("worker1"), anyString()); } private List getNonResponsiveWorkers() {