Skip to content

Commit 084f002

Browse files
igorjava2025sonus21
authored andcommitted
Add HARD_STRICT mode
See details there #276 (cherry picked from commit acc09bf)
1 parent 583538d commit 084f002

6 files changed

Lines changed: 448 additions & 1 deletion

File tree

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
2828
import com.github.sonus21.rqueue.core.middleware.Middleware;
2929
import com.github.sonus21.rqueue.core.support.MessageProcessor;
30+
import com.github.sonus21.rqueue.listener.HardStrictPriorityPollerProperties;
3031
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
3132
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
3233
import com.github.sonus21.rqueue.models.enums.PriorityMode;
@@ -92,6 +93,8 @@ public class SimpleRqueueListenerContainerFactory {
9293

9394
// Set priority mode for the pollers
9495
private PriorityMode priorityMode = PriorityMode.WEIGHTED;
96+
// Set HardStrictPriorityPollerProperties for HARD_STRICT priority mode poller
97+
private HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties;
9598

9699
/**
97100
* Whether all beans of spring application should be inspected to find methods annotated with
@@ -348,6 +351,9 @@ public RqueueMessageListenerContainer createMessageListenerContainer() {
348351
if (messageHeaders != null) {
349352
messageListenerContainer.setMessageHeaders(messageHeaders);
350353
}
354+
if (hardStrictPriorityPollerProperties != null) {
355+
messageListenerContainer.setHardStrictPriorityPollerProperties(hardStrictPriorityPollerProperties);
356+
}
351357
return messageListenerContainer;
352358
}
353359

@@ -486,6 +492,14 @@ public void setMessageHeaders(MessageHeaders messageHeaders) {
486492
this.messageHeaders = messageHeaders;
487493
}
488494

495+
public HardStrictPriorityPollerProperties getHardStrictPriorityPollerProperties() {
496+
return this.hardStrictPriorityPollerProperties;
497+
}
498+
499+
public void setHardStrictPriorityPollerProperties(HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) {
500+
this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties;
501+
}
502+
489503
/**
490504
* Rqueue scans all beans to find method annotated with {@link RqueueListener}.
491505
*
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
* Copyright (c) 2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.listener;
18+
19+
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
20+
import com.github.sonus21.rqueue.core.middleware.Middleware;
21+
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr;
22+
import com.github.sonus21.rqueue.utils.Constants;
23+
import com.github.sonus21.rqueue.utils.QueueThreadPool;
24+
import com.github.sonus21.rqueue.utils.TimeoutUtils;
25+
import java.util.ArrayList;
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.function.Function;
30+
import java.util.stream.Collectors;
31+
import org.slf4j.event.Level;
32+
import org.springframework.messaging.MessageHeaders;
33+
34+
/**
35+
* Use it only with priority queues.
36+
* Message processing can be slow.
37+
* The hard strict priority algorithm is better in HardStrictPriorityPoller than in StrictPriorityPoller
38+
* More details see in <a href="https://github.com/sonus21/rqueue/issues/276">GitHub project issue</a>
39+
*/
40+
class HardStrictPriorityPoller extends RqueueMessagePoller {
41+
42+
private final RqueueBeanProvider rqueueBeanProvider;
43+
44+
private final Map<String, QueueDetail> queueNameToDetail;
45+
private final Map<String, QueueThreadPool> queueNameToThread;
46+
private final Map<String, Long> queueDeactivationTime = new HashMap<>();
47+
private final HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties;
48+
49+
HardStrictPriorityPoller(
50+
String groupName,
51+
final List<QueueDetail> queueDetails,
52+
final Map<String, QueueThreadPool> queueNameToThread,
53+
RqueueBeanProvider rqueueBeanProvider,
54+
QueueStateMgr queueStateMgr,
55+
List<Middleware> middlewares,
56+
long pollingInterval,
57+
long backoffTime,
58+
PostProcessingHandler postProcessingHandler,
59+
MessageHeaders messageHeaders,
60+
HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) {
61+
super(
62+
"HardStrict-" + groupName,
63+
rqueueBeanProvider,
64+
queueStateMgr,
65+
middlewares,
66+
pollingInterval,
67+
backoffTime,
68+
postProcessingHandler,
69+
messageHeaders);
70+
71+
this.rqueueBeanProvider = rqueueBeanProvider;
72+
// Sort queues by priority once during initialization
73+
List<QueueDetail> queueDetailList = new ArrayList<>(queueDetails);
74+
queueDetailList.sort(
75+
(o1, o2) ->
76+
o2.getPriority().get(Constants.DEFAULT_PRIORITY_KEY)
77+
- o1.getPriority().get(Constants.DEFAULT_PRIORITY_KEY));
78+
79+
this.queues = queueDetailList.stream().map(QueueDetail::getName).collect(Collectors.toList());
80+
this.queueNameToDetail =
81+
queueDetailList.stream()
82+
.collect(Collectors.toMap(QueueDetail::getName, Function.identity()));
83+
this.queueNameToThread = queueNameToThread;
84+
this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties != null
85+
? hardStrictPriorityPollerProperties
86+
: new HardStrictPriorityPollerProperties();
87+
}
88+
89+
@Override
90+
public void start() {
91+
log(Level.DEBUG, "Running, Ordered Queues: {}", null, queues);
92+
while (true) {
93+
if (shouldExit()) {
94+
return;
95+
}
96+
97+
boolean messageFoundInAnyQueue = false;
98+
99+
try {
100+
for (String queue : queues) {
101+
if (eligibleForPolling(queue) && !isDeactivated(queue)) {
102+
QueueThreadPool queueThreadPool = queueNameToThread.get(queue);
103+
QueueDetail queueDetail = queueNameToDetail.get(queue);
104+
poll(-1, queue, queueDetail, queueThreadPool);
105+
106+
if (hardStrictPriorityPollerProperties.getAfterPollSleepInterval() != null) {
107+
TimeoutUtils.sleepLog(hardStrictPriorityPollerProperties.getAfterPollSleepInterval(), false);
108+
}
109+
110+
if (existMessagesInCurrentQueueOrHigherPriorityQueue(queue, queues)) {
111+
// break current cycle and start new cycle
112+
// it allow to process queue with the higher priority
113+
messageFoundInAnyQueue = true;
114+
break;
115+
}
116+
}
117+
}
118+
119+
// If no messages were found across all queues, sleep for the polling interval
120+
if (!messageFoundInAnyQueue) {
121+
TimeoutUtils.sleepLog(pollingInterval, false);
122+
}
123+
124+
} catch (Throwable e) {
125+
log(Level.ERROR, "Exception in the poller {}", e, e.getMessage());
126+
if (shouldExit()) {
127+
return;
128+
}
129+
TimeoutUtils.sleepLog(backoffTime, false);
130+
}
131+
}
132+
}
133+
134+
boolean existMessagesInCurrentQueueOrHigherPriorityQueue(String currentQueue, List<String> queues) {
135+
for (String queue : queues) {
136+
if (eligibleForPolling(queue) && !isDeactivated(queue)) {
137+
QueueDetail queueDetail = queueNameToDetail.get(queue);
138+
if (existAvailableMessagesForPoll(queueDetail)) {
139+
// the current or higher priority queue contains messages that need to be processed.
140+
return true;
141+
}
142+
}
143+
// we check all queues from the highest priority to current queue
144+
if (queue.equals(currentQueue)) {
145+
return false;
146+
}
147+
}
148+
// unexpected behavior, need more details if it occurs
149+
log(Level.WARN, "current queue '{}' not found in queues list '{}'", null, currentQueue, queues);
150+
return false;
151+
}
152+
153+
protected boolean existAvailableMessagesForPoll(QueueDetail queueDetail) {
154+
List<?> readyMessages = rqueueBeanProvider
155+
.getRqueueMessageTemplate()
156+
.readFromList(queueDetail.getQueueName(), 0, 0);
157+
158+
if (readyMessages != null && !readyMessages.isEmpty()) {
159+
log(Level.TRACE, "readyMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName());
160+
return true;
161+
}
162+
163+
// Only check delayed messages with score <= current time
164+
long currentTime = System.currentTimeMillis();
165+
List<?> delayedMessages = rqueueBeanProvider
166+
.getRqueueMessageTemplate()
167+
.readFromZsetWithScore(queueDetail.getScheduledQueueName(), 0, currentTime);
168+
169+
if (delayedMessages != null && !delayedMessages.isEmpty()) {
170+
log(Level.TRACE, "delayedMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName());
171+
return true;
172+
}
173+
174+
return false;
175+
}
176+
177+
private boolean isDeactivated(String queue) {
178+
Long deactivationTime = queueDeactivationTime.get(queue);
179+
if (deactivationTime == null) {
180+
return false;
181+
}
182+
if (System.currentTimeMillis() - deactivationTime > pollingInterval) {
183+
queueDeactivationTime.remove(queue);
184+
return false;
185+
}
186+
return true;
187+
}
188+
189+
@Override
190+
long getSemaphoreWaitTime() {
191+
return hardStrictPriorityPollerProperties.getSemaphoreWaitTime() != null
192+
? hardStrictPriorityPollerProperties.getSemaphoreWaitTime()
193+
: 20L;
194+
}
195+
196+
@Override
197+
void deactivate(int index, String queue, DeactivateType deactivateType) {
198+
if (deactivateType == DeactivateType.POLL_FAILED) {
199+
// Pause in case of connection errors or polling failures
200+
TimeoutUtils.sleepLog(backoffTime, false);
201+
} else {
202+
// Mark deactivation time if the queue is empty
203+
queueDeactivationTime.put(queue, System.currentTimeMillis());
204+
}
205+
}
206+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.github.sonus21.rqueue.listener;
2+
3+
public class HardStrictPriorityPollerProperties {
4+
// set such default values for parameters "afterPollSleepInterval" and "semaphoreWaitTime"
5+
// because local load tests have correct strict priority algorithm work and good performance with them
6+
private Long afterPollSleepInterval = 30L;
7+
private Long semaphoreWaitTime = 15L;
8+
9+
public Long getAfterPollSleepInterval() {
10+
return afterPollSleepInterval;
11+
}
12+
13+
public void setAfterPollSleepInterval(Long afterPollSleepInterval) {
14+
this.afterPollSleepInterval = afterPollSleepInterval;
15+
}
16+
17+
public Long getSemaphoreWaitTime() {
18+
return this.semaphoreWaitTime;
19+
}
20+
21+
public void setSemaphoreWaitTime(Long semaphoreWaitTime) {
22+
this.semaphoreWaitTime = semaphoreWaitTime;
23+
}
24+
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public class RqueueMessageListenerContainer
106106
private int phase = Integer.MAX_VALUE;
107107
private PriorityMode priorityMode;
108108
private MessageHeaders messageHeaders;
109+
private HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties;
109110

110111
public RqueueMessageListenerContainer(
111112
RqueueMessageHandler rqueueMessageHandler, RqueueMessageTemplate rqueueMessageTemplate) {
@@ -515,6 +516,21 @@ protected void startGroup(String groupName, List<QueueDetail> queueDetails) {
515516
backOffTime,
516517
postProcessingHandler,
517518
getMessageHeaders()));
519+
} else if (getPriorityMode() == PriorityMode.HARD_STRICT) {
520+
future =
521+
taskExecutor.submit(
522+
new HardStrictPriorityPoller(
523+
StringUtils.groupName(groupName),
524+
queueDetails,
525+
queueThread,
526+
rqueueBeanProvider,
527+
queueStateMgr,
528+
getMiddleWares(),
529+
pollingInterval,
530+
backOffTime,
531+
postProcessingHandler,
532+
getMessageHeaders(),
533+
getHardStrictPriorityPollerProperties()));
518534
} else {
519535
future =
520536
taskExecutor.submit(
@@ -712,6 +728,14 @@ public void setMessageHeaders(MessageHeaders messageHeaders) {
712728
this.messageHeaders = messageHeaders;
713729
}
714730

731+
public HardStrictPriorityPollerProperties getHardStrictPriorityPollerProperties() {
732+
return this.hardStrictPriorityPollerProperties;
733+
}
734+
735+
public void setHardStrictPriorityPollerProperties(HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) {
736+
this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties;
737+
}
738+
715739
class QueueStateMgr {
716740

717741
Set<String> pausedQueues = ConcurrentHashMap.newKeySet();

rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@
1818

1919
public enum PriorityMode {
2020
STRICT,
21-
WEIGHTED
21+
WEIGHTED,
22+
HARD_STRICT
2223
}

0 commit comments

Comments
 (0)