Skip to content

Commit c5d7ecc

Browse files
authored
feat: OutboxProcessorScheduler — configurability, reliability, SmartLifecycle (KOJAK-62) (#14)
## Summary Apply the same production-ready pattern from KOJAK-56 (OutboxPurger v2) to OutboxScheduler and OutboxProcessorScheduler: - **Fix silent death bug**: `OutboxScheduler.tick()` had no try/catch — `ScheduledExecutorService` silently stops on uncaught exception. Added `catch(Exception)` with SLF4J error logging. - **Add lifecycle guards**: `AtomicBoolean` running flag prevents double-start, `check(!scheduler.isShutdown)` prevents restart-after-stop with a clear error message, `isRunning()` method. - **Extract config value object**: `OutboxSchedulerConfig` data class with `require()` validation in init block (make illegal states unrepresentable). - **Migrate to SmartLifecycle**: Replace `SmartInitializingSingleton + DisposableBean` with `SmartLifecycle`. Phase ordering: processor (`MAX_VALUE - 2048`) starts before purger (`MAX_VALUE - 1024`) and stops after it. `stop(callback)` with `try-finally`. - **Spring properties binding**: `OutboxProcessorProperties` with `@ConfigurationProperties(prefix = "okapi.processor")`, `@Validated`, `@field:Min(1)`. `@ConditionalOnProperty` for `enabled` toggle. - **Database index**: `(status, created_at)` composite index for `claimPending()` query performance (Postgres + MySQL). - **Configuration metadata**: Processor properties added to `spring-configuration-metadata.json`. ### Configuration ```yaml okapi: processor: enabled: true # default interval-ms: 1000 # default batch-size: 10 # default ``` ### Commits (8) | Commit | Scope | |--------|-------| | `feat(core): add OutboxSchedulerConfig value object with validation` | okapi-core | | `feat(core): rewrite OutboxScheduler with error handling, guards, logging` | okapi-core | | `feat(spring): add OutboxProcessorProperties` | okapi-spring-boot | | `feat(spring): migrate OutboxProcessorScheduler to SmartLifecycle` | okapi-spring-boot | | `feat(spring): bind OutboxProcessorProperties in OutboxAutoConfiguration` | okapi-spring-boot | | `test(spring): add OutboxProcessorAutoConfigurationTest` | okapi-spring-boot | | `feat(db): add index (status, created_at) for processor claimPending query` | okapi-postgres, okapi-mysql | | `feat(spring): add processor properties to spring-configuration-metadata.json` | okapi-spring-boot | ## Test plan - [x] `OutboxSchedulerConfigTest` — 6 tests (defaults, custom values, validation) - [x] `OutboxSchedulerTest` — 7 tests (batchSize forwarding, error recovery, double-start, isRunning transitions, restart-after-stop, transactionRunner wrapping, no transactionRunner) - [x] `OutboxProcessorAutoConfigurationTest` — 7 tests (bean creation, disabled toggle, properties binding, defaults, SmartLifecycle isRunning, validation, stop callback) - [x] All existing tests pass (purger, publisher, E2E) - [x] `./gradlew clean check` — BUILD SUCCESSFUL - [x] `./gradlew ktlintCheck` — clean ## Related - Depends on: #11 (KOJAK-56, OutboxPurger v2) — base branch is `feat102` - Related: KOJAK-61 (OutboxPurgerConfig extraction — separate task, uses `OutboxSchedulerConfig` as reference pattern) - JIRA: [KOJAK-62](https://softwaremill.atlassian.net/browse/KOJAK-62)
1 parent 539c951 commit c5d7ecc

9 files changed

Lines changed: 412 additions & 30 deletions

File tree

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,64 @@
11
package com.softwaremill.okapi.core
22

3+
import org.slf4j.LoggerFactory
34
import java.util.concurrent.Executors
45
import java.util.concurrent.ScheduledExecutorService
56
import java.util.concurrent.TimeUnit
7+
import java.util.concurrent.atomic.AtomicBoolean
68

79
/**
810
* Standalone scheduler that periodically calls [OutboxProcessor.processNext].
911
*
1012
* Each tick is optionally wrapped in a transaction via [transactionRunner].
11-
* Runs on a single daemon thread and provides explicit [start]/[stop] lifecycle.
13+
* Runs on a single daemon thread with explicit [start]/[stop] lifecycle.
14+
* [start] and [stop] are single-use -- the internal executor cannot be restarted after shutdown.
15+
* [AtomicBoolean] guards against accidental double-start, not restart.
1216
*
1317
* Framework-specific modules hook into their own lifecycle events:
14-
* - `okapi-spring`: `SmartInitializingSingleton` / `DisposableBean`
18+
* - `okapi-spring-boot`: `SmartLifecycle`
1519
* - `okapi-ktor`: `ApplicationStarted` / `ApplicationStopped`
1620
*/
1721
class OutboxScheduler(
1822
private val outboxProcessor: OutboxProcessor,
1923
private val transactionRunner: TransactionRunner? = null,
20-
private val intervalMs: Long = 1_000L,
21-
private val batchSize: Int = 10,
24+
private val config: OutboxSchedulerConfig = OutboxSchedulerConfig(),
2225
) {
26+
private val running = AtomicBoolean(false)
27+
2328
private val scheduler: ScheduledExecutorService =
2429
Executors.newSingleThreadScheduledExecutor { r ->
2530
Thread(r, "outbox-processor").apply { isDaemon = true }
2631
}
2732

2833
fun start() {
29-
scheduler.scheduleWithFixedDelay(
30-
::tick,
31-
0L,
32-
intervalMs,
33-
TimeUnit.MILLISECONDS,
34-
)
34+
check(!scheduler.isShutdown) { "OutboxScheduler cannot be restarted after stop()" }
35+
if (!running.compareAndSet(false, true)) return
36+
logger.info("Outbox processor started [interval={}ms, batchSize={}]", config.intervalMs, config.batchSize)
37+
scheduler.scheduleWithFixedDelay(::tick, 0L, config.intervalMs, TimeUnit.MILLISECONDS)
3538
}
3639

3740
fun stop() {
41+
if (!running.compareAndSet(true, false)) return
3842
scheduler.shutdown()
3943
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
4044
scheduler.shutdownNow()
4145
}
46+
logger.info("Outbox processor stopped")
4247
}
4348

49+
fun isRunning(): Boolean = running.get()
50+
4451
private fun tick() {
45-
if (transactionRunner != null) {
46-
transactionRunner.runInTransaction { outboxProcessor.processNext(batchSize) }
47-
} else {
48-
outboxProcessor.processNext(batchSize)
52+
try {
53+
transactionRunner?.runInTransaction { outboxProcessor.processNext(config.batchSize) }
54+
?: outboxProcessor.processNext(config.batchSize)
55+
logger.debug("Outbox processor tick completed [batchSize={}]", config.batchSize)
56+
} catch (e: Exception) {
57+
logger.error("Outbox processor tick failed, will retry at next scheduled interval", e)
4958
}
5059
}
60+
61+
companion object {
62+
private val logger = LoggerFactory.getLogger(OutboxScheduler::class.java)
63+
}
5164
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.softwaremill.okapi.core
2+
3+
data class OutboxSchedulerConfig(
4+
val intervalMs: Long = 1_000L,
5+
val batchSize: Int = 10,
6+
) {
7+
init {
8+
require(intervalMs > 0) { "intervalMs must be positive, got: $intervalMs" }
9+
require(batchSize > 0) { "batchSize must be positive, got: $batchSize" }
10+
}
11+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.softwaremill.okapi.core
2+
3+
import io.kotest.assertions.throwables.shouldThrow
4+
import io.kotest.core.spec.style.FunSpec
5+
import io.kotest.matchers.shouldBe
6+
7+
class OutboxSchedulerConfigTest : FunSpec({
8+
9+
test("default config has valid values") {
10+
val config = OutboxSchedulerConfig()
11+
config.intervalMs shouldBe 1_000L
12+
config.batchSize shouldBe 10
13+
}
14+
15+
test("accepts custom valid values") {
16+
val config = OutboxSchedulerConfig(intervalMs = 500, batchSize = 50)
17+
config.intervalMs shouldBe 500L
18+
config.batchSize shouldBe 50
19+
}
20+
21+
test("rejects zero intervalMs") {
22+
shouldThrow<IllegalArgumentException> {
23+
OutboxSchedulerConfig(intervalMs = 0)
24+
}
25+
}
26+
27+
test("rejects negative intervalMs") {
28+
shouldThrow<IllegalArgumentException> {
29+
OutboxSchedulerConfig(intervalMs = -1)
30+
}
31+
}
32+
33+
test("rejects zero batchSize") {
34+
shouldThrow<IllegalArgumentException> {
35+
OutboxSchedulerConfig(batchSize = 0)
36+
}
37+
}
38+
39+
test("rejects negative batchSize") {
40+
shouldThrow<IllegalArgumentException> {
41+
OutboxSchedulerConfig(batchSize = -5)
42+
}
43+
}
44+
})
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package com.softwaremill.okapi.core
2+
3+
import io.kotest.assertions.throwables.shouldThrow
4+
import io.kotest.core.spec.style.FunSpec
5+
import io.kotest.matchers.shouldBe
6+
import java.util.concurrent.CountDownLatch
7+
import java.util.concurrent.TimeUnit
8+
import java.util.concurrent.atomic.AtomicBoolean
9+
import java.util.concurrent.atomic.AtomicInteger
10+
11+
class OutboxSchedulerTest : FunSpec({
12+
13+
test("tick calls processNext with configured batchSize") {
14+
var capturedLimit: Int? = null
15+
val latch = CountDownLatch(1)
16+
val processor = stubProcessor { limit ->
17+
capturedLimit = limit
18+
latch.countDown()
19+
}
20+
21+
val scheduler = OutboxScheduler(
22+
outboxProcessor = processor,
23+
config = OutboxSchedulerConfig(intervalMs = 50, batchSize = 25),
24+
)
25+
26+
scheduler.start()
27+
latch.await(2, TimeUnit.SECONDS) shouldBe true
28+
scheduler.stop()
29+
30+
capturedLimit shouldBe 25
31+
}
32+
33+
test("exception in tick does not kill scheduler") {
34+
val callCount = AtomicInteger(0)
35+
val latch = CountDownLatch(2)
36+
val processor = stubProcessor { _ ->
37+
val count = callCount.incrementAndGet()
38+
latch.countDown()
39+
if (count == 1) throw RuntimeException("db connection lost")
40+
}
41+
42+
val scheduler = OutboxScheduler(
43+
outboxProcessor = processor,
44+
config = OutboxSchedulerConfig(intervalMs = 50),
45+
)
46+
47+
scheduler.start()
48+
latch.await(2, TimeUnit.SECONDS) shouldBe true
49+
scheduler.stop()
50+
51+
callCount.get() shouldBe 2
52+
}
53+
54+
test("double start is ignored") {
55+
val callCount = AtomicInteger(0)
56+
val latch = CountDownLatch(1)
57+
val processor = stubProcessor { _ ->
58+
callCount.incrementAndGet()
59+
latch.countDown()
60+
}
61+
62+
val scheduler = OutboxScheduler(
63+
outboxProcessor = processor,
64+
config = OutboxSchedulerConfig(intervalMs = 50),
65+
)
66+
67+
scheduler.start()
68+
scheduler.start()
69+
latch.await(2, TimeUnit.SECONDS) shouldBe true
70+
scheduler.stop()
71+
72+
scheduler.isRunning() shouldBe false
73+
}
74+
75+
test("isRunning transitions") {
76+
val processor = stubProcessor { _ -> }
77+
val scheduler = OutboxScheduler(
78+
outboxProcessor = processor,
79+
config = OutboxSchedulerConfig(intervalMs = 60_000),
80+
)
81+
82+
scheduler.isRunning() shouldBe false
83+
scheduler.start()
84+
scheduler.isRunning() shouldBe true
85+
scheduler.stop()
86+
scheduler.isRunning() shouldBe false
87+
}
88+
89+
test("start after stop throws") {
90+
val processor = stubProcessor { _ -> }
91+
val scheduler = OutboxScheduler(
92+
outboxProcessor = processor,
93+
config = OutboxSchedulerConfig(intervalMs = 60_000),
94+
)
95+
96+
scheduler.start()
97+
scheduler.stop()
98+
99+
shouldThrow<IllegalStateException> {
100+
scheduler.start()
101+
}.message shouldBe "OutboxScheduler cannot be restarted after stop()"
102+
}
103+
104+
test("transactionRunner wraps tick when provided") {
105+
val txInvoked = AtomicBoolean(false)
106+
val latch = CountDownLatch(1)
107+
val processor = stubProcessor { _ -> latch.countDown() }
108+
val txRunner = object : TransactionRunner {
109+
override fun <T> runInTransaction(block: () -> T): T {
110+
txInvoked.set(true)
111+
return block()
112+
}
113+
}
114+
115+
val scheduler = OutboxScheduler(
116+
outboxProcessor = processor,
117+
transactionRunner = txRunner,
118+
config = OutboxSchedulerConfig(intervalMs = 50),
119+
)
120+
121+
scheduler.start()
122+
latch.await(2, TimeUnit.SECONDS) shouldBe true
123+
scheduler.stop()
124+
125+
txInvoked.get() shouldBe true
126+
}
127+
128+
test("tick runs without transactionRunner") {
129+
val latch = CountDownLatch(1)
130+
val processor = stubProcessor { _ -> latch.countDown() }
131+
132+
val scheduler = OutboxScheduler(
133+
outboxProcessor = processor,
134+
transactionRunner = null,
135+
config = OutboxSchedulerConfig(intervalMs = 50),
136+
)
137+
138+
scheduler.start()
139+
latch.await(2, TimeUnit.SECONDS) shouldBe true
140+
scheduler.stop()
141+
}
142+
})
143+
144+
private fun stubProcessor(onProcessNext: (Int) -> Unit): OutboxProcessor {
145+
val store = object : OutboxStore {
146+
override fun persist(entry: OutboxEntry) = entry
147+
override fun claimPending(limit: Int): List<OutboxEntry> {
148+
onProcessNext(limit)
149+
return emptyList()
150+
}
151+
override fun updateAfterProcessing(entry: OutboxEntry) = entry
152+
override fun removeDeliveredBefore(time: java.time.Instant, limit: Int) = 0
153+
override fun findOldestCreatedAt(statuses: Set<OutboxStatus>) = emptyMap<OutboxStatus, java.time.Instant>()
154+
override fun countByStatuses() = emptyMap<OutboxStatus, Long>()
155+
}
156+
val entryProcessor = OutboxEntryProcessor(
157+
deliverer = object : MessageDeliverer {
158+
override val type = "stub"
159+
override fun deliver(entry: OutboxEntry) = DeliveryResult.Success
160+
},
161+
retryPolicy = RetryPolicy(maxRetries = 3),
162+
clock = java.time.Clock.systemUTC(),
163+
)
164+
return OutboxProcessor(store = store, entryProcessor = entryProcessor)
165+
}

okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import com.softwaremill.okapi.core.MessageDeliverer
55
import com.softwaremill.okapi.core.OutboxEntryProcessor
66
import com.softwaremill.okapi.core.OutboxProcessor
77
import com.softwaremill.okapi.core.OutboxPublisher
8+
import com.softwaremill.okapi.core.OutboxSchedulerConfig
89
import com.softwaremill.okapi.core.OutboxStore
910
import com.softwaremill.okapi.core.RetryPolicy
1011
import com.softwaremill.okapi.mysql.MysqlOutboxStore
@@ -42,7 +43,7 @@ import javax.sql.DataSource
4243
* - [PlatformTransactionManager] — if absent, each store call runs in its own transaction
4344
*/
4445
@AutoConfiguration
45-
@EnableConfigurationProperties(OutboxPurgerProperties::class)
46+
@EnableConfigurationProperties(OutboxPurgerProperties::class, OutboxProcessorProperties::class)
4647
class OutboxAutoConfiguration {
4748
@Bean
4849
@ConditionalOnMissingBean
@@ -82,13 +83,19 @@ class OutboxAutoConfiguration {
8283

8384
@Bean
8485
@ConditionalOnMissingBean
86+
@ConditionalOnProperty(prefix = "okapi.processor", name = ["enabled"], havingValue = "true", matchIfMissing = true)
8587
fun outboxProcessorScheduler(
88+
props: OutboxProcessorProperties,
8689
outboxProcessor: OutboxProcessor,
8790
transactionManager: ObjectProvider<PlatformTransactionManager>,
8891
): OutboxProcessorScheduler {
8992
return OutboxProcessorScheduler(
9093
outboxProcessor = outboxProcessor,
9194
transactionTemplate = transactionManager.getIfAvailable()?.let { TransactionTemplate(it) },
95+
config = OutboxSchedulerConfig(
96+
intervalMs = props.intervalMs,
97+
batchSize = props.batchSize,
98+
),
9299
)
93100
}
94101

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.softwaremill.okapi.springboot
2+
3+
import jakarta.validation.constraints.Min
4+
import org.springframework.boot.context.properties.ConfigurationProperties
5+
import org.springframework.validation.annotation.Validated
6+
7+
@ConfigurationProperties(prefix = "okapi.processor")
8+
@Validated
9+
data class OutboxProcessorProperties(
10+
@field:Min(1) val intervalMs: Long = 1_000,
11+
@field:Min(1) val batchSize: Int = 10,
12+
)

0 commit comments

Comments
 (0)