Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.cloudslang.engine.queue.services.recovery;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* Created with IntelliJ IDEA.
* User: kravtsov
Expand All @@ -36,5 +38,5 @@ public interface WorkerRecoveryService {
* Recovery will be done if the worker is non responsive or has not acknowledged messages
* @param workerUuid - the uuid of worker
*/
void doWorkerAndMessageRecovery(String workerUuid, boolean shouldPurgeQueues);
void doWorkerAndMessageRecovery(String workerUuid, AtomicBoolean shouldPurgeQueues);
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ protected void recoverWorkers() {

for (String workerUuid : workersUuids) {
try {
workerRecoveryService.doWorkerAndMessageRecovery(workerUuid,
shouldPurgeQueues.compareAndSet(true, false));
workerRecoveryService.doWorkerAndMessageRecovery(workerUuid, shouldPurgeQueues);
} catch (Exception ex) {
logger.error("Failed to recover worker [" + workerUuid + "]", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class WorkerRecoveryServiceImpl implements WorkerRecoveryService, LoginLi

@Override
@Transactional
public void doWorkerAndMessageRecovery(final String workerUuid, boolean shouldPurgeQueues) {
public void doWorkerAndMessageRecovery(final String workerUuid, AtomicBoolean shouldPurgeQueues) {

//lock this worker to synchronize with drain action
workerLockService.lock(workerUuid);
Expand All @@ -85,7 +85,8 @@ public void doWorkerAndMessageRecovery(final String workerUuid, boolean shouldPu
if (worker.getStatus().equals(WorkerStatus.IN_RECOVERY)) {
logger.warn("Worker : " + workerUuid + " is IN_RECOVERY status. Worker recovery is started");
}
doWorkerRecovery(workerUuid, shouldPurgeQueues);
doWorkerRecovery(workerUuid, shouldPurgeQueues.get());
shouldPurgeQueues.set(false);
} else {
logger.debug("Worker : " + workerUuid + " is NOT for recovery");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.mockito.Mockito.*;

Expand Down Expand Up @@ -69,9 +70,9 @@ public void testRecoverWorkers() throws Exception {
when(workerNodeService.readAllWorkersUuids()).thenReturn(getWorkers());
executionRecoveryService.recoverWorkers();

verify(workerRecoveryService, times(1)).doWorkerAndMessageRecovery("123", true);
verify(workerRecoveryService, times(1)).doWorkerAndMessageRecovery("456", false);
verify(workerRecoveryService, times(1)).doWorkerAndMessageRecovery("789", false);
verify(workerRecoveryService, times(1)).doWorkerAndMessageRecovery(eq("123"), any(AtomicBoolean.class));
verify(workerRecoveryService, times(1)).doWorkerAndMessageRecovery(eq("456"), any(AtomicBoolean.class));
verify(workerRecoveryService, times(1)).doWorkerAndMessageRecovery(eq("789"), any(AtomicBoolean.class));
}

private List<String> getWorkers(){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -87,7 +89,7 @@ public void testDoWorkerAndMessageRecoveryResponsiveAndNoMessages() throws Excep
WorkerNode mockWorker = mock(WorkerNode.class);
when(mockWorker.getStatus()).thenReturn(WorkerStatus.RUNNING);
when(workerNodeService.findByUuid("123")).thenReturn(mockWorker);
workerRecoveryService.doWorkerAndMessageRecovery("123", false);
workerRecoveryService.doWorkerAndMessageRecovery("123", new AtomicBoolean(false));

//Make sure the methods did not run
verify(workerNodeService, never()).updateStatusInSeparateTransaction("123", WorkerStatus.IN_RECOVERY);
Expand All @@ -101,7 +103,7 @@ public void testDoWorkerAndMessageRecoveryNonResponsive() throws Exception {
WorkerNode mockWorker = mock(WorkerNode.class);
when(mockWorker.getStatus()).thenReturn(WorkerStatus.RUNNING);
when(workerNodeService.findByUuid("123")).thenReturn(mockWorker);
workerRecoveryService.doWorkerAndMessageRecovery("123", false);
workerRecoveryService.doWorkerAndMessageRecovery("123", new AtomicBoolean(false));
//Make sure the methods did run
verify(workerNodeService, times(1)).updateStatusInSeparateTransaction("123", WorkerStatus.IN_RECOVERY);
}
Expand All @@ -114,7 +116,7 @@ public void testDoWorkerAndMessageRecoveryResponsiveAndHasMessages() throws Exce
WorkerNode mockWorker = mock(WorkerNode.class);
when(mockWorker.getStatus()).thenReturn(WorkerStatus.RUNNING);
when(workerNodeService.findByUuid("123")).thenReturn(mockWorker);
workerRecoveryService.doWorkerAndMessageRecovery("123", false);
workerRecoveryService.doWorkerAndMessageRecovery("123", new AtomicBoolean(false));

//Make sure the methods did run
verify(workerNodeService, times(1)).updateStatusInSeparateTransaction("123", WorkerStatus.IN_RECOVERY);
Expand All @@ -128,7 +130,7 @@ public void testDoWorkerAndMessageRecoveryResponsiveInRecoveryAndHasMessages() t
WorkerNode mockWorker = mock(WorkerNode.class);
when(mockWorker.getStatus()).thenReturn(WorkerStatus.IN_RECOVERY);
when(workerNodeService.findByUuid("123")).thenReturn(mockWorker);
workerRecoveryService.doWorkerAndMessageRecovery("123", false);
workerRecoveryService.doWorkerAndMessageRecovery("123", new AtomicBoolean(false));

//Make sure the methods did run
verify(workerNodeService, times(1)).updateStatusInSeparateTransaction("123", WorkerStatus.IN_RECOVERY);
Expand All @@ -142,18 +144,22 @@ public void testDoWorkerAndMessageRecoveryNonResponsiveAndHasMessages() throws E
WorkerNode mockWorker = mock(WorkerNode.class);
when(mockWorker.getStatus()).thenReturn(WorkerStatus.IN_RECOVERY);
when(workerNodeService.findByUuid("123")).thenReturn(mockWorker);
workerRecoveryService.doWorkerAndMessageRecovery("123", false);
workerRecoveryService.doWorkerAndMessageRecovery("123", new AtomicBoolean(false));
//Make sure the methods did run
verify(workerNodeService, times(1)).updateStatusInSeparateTransaction("123", WorkerStatus.IN_RECOVERY);
}

@Test
public void testDoWorkerRecovery() throws Exception {
when(messageRecoveryService.recoverMessagesBulk("worker1", 100)).thenReturn(true, false);

workerRecoveryService.doWorkerRecovery("worker1", false);

verify(workerLockService, times(1)).lock("worker1");
verify(workerNodeService, times(1)).updateStatusInSeparateTransaction("worker1", WorkerStatus.IN_RECOVERY);
verify(messageRecoveryService, times(2)).recoverMessagesBulk(eq("worker1"), eq(100));
verify(workerNodeService, times(1)).updateStatus("worker1", WorkerStatus.RECOVERED);
verify(workerNodeService, times(1)).updateWRV(eq("worker1"), anyString());
}

private List<String> getNonResponsiveWorkers() {
Expand Down