Skip to content

Commit 95885f7

Browse files
authored
feat: OutboxPurger v2 — configurability, reliability, batched delete (KOJAK-56) (#11)
## Summary Makes OutboxPurger a production-ready, configurable component — establishing a pattern to replicate for OutboxScheduler/ProcessorScheduler. - **Batched delete** with `FOR UPDATE SKIP LOCKED` (Postgres subquery, MySQL derived table wrapper) — prevents full table scans and lock contention on large tables - **Error handling** — `try/catch` in `tick()` prevents `ScheduledExecutorService` from silently dying on exceptions - **Configurability** — `@ConfigurationProperties` binding from `application.yml` (`okapi.purger.retention-days`, `interval-minutes`, `batch-size`, `enabled`) - **SmartLifecycle** — replaces `SmartInitializingSingleton + DisposableBean` for phase-ordered startup/shutdown - **Database index** — `(status, last_attempt)` composite index for efficient purge queries - **SLF4J logging** — INFO on start/stop, DEBUG per tick, ERROR on failures ### Breaking change `OutboxStore.removeDeliveredBefore(Instant)` → `removeDeliveredBefore(Instant, Int): Int` — accepts batch limit, returns count deleted. Pre-1.0, no external consumers. ### Key design decisions | Decision | Rationale | |----------|-----------| | `FOR UPDATE SKIP LOCKED` in purge | Multi-instance safety without distributed locks | | Manual `spring-configuration-metadata.json` | KAPT maintenance mode, KSP unsupported by Spring (2026) | | `implementation` for SLF4J | Types don't leak to public API, consistent with Exposed | | `MAX_BATCHES_PER_TICK = 10` | Safety guard: max 1000 entries per tick, prevents DB monopolization | | `check(!scheduler.isShutdown)` | Prevents cryptic `RejectedExecutionException` on restart after stop | ### JIRA - [KOJAK-56](https://softwaremill.atlassian.net/browse/KOJAK-56) — this PR - Follow-up: [KOJAK-60](https://softwaremill.atlassian.net/browse/KOJAK-60) (Postgres store tests), [KOJAK-61](https://softwaremill.atlassian.net/browse/KOJAK-61) (config extraction to core) ## Test plan - [x] 11 unit tests in `OutboxPurgerTest` (batch loop, error recovery, double-start, restart guard, validation) - [x] 8 Spring integration tests in `OutboxPurgerAutoConfigurationTest` (property binding, conditional beans, lifecycle, stop callback) - [x] MySQL `removeDeliveredBefore` with limit test (Testcontainers) - [x] E2E tests pass (Postgres + WireMock, MySQL + WireMock) - [x] ktlint clean
1 parent c0b1e46 commit 95885f7

21 files changed

Lines changed: 481 additions & 49 deletions

File tree

gradle/libs.versions.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ kafkaClients = "4.2.0"
1212
spring = "7.0.6"
1313
springBoot = "4.0.5"
1414
wiremock = "3.13.2"
15+
slf4j = "2.0.17"
16+
assertj = "3.27.3"
1517

1618
[libraries]
1719
kotlinGradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" }
@@ -33,7 +35,12 @@ kafkaClients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka
3335
springContext = { module = "org.springframework:spring-context", version.ref = "spring" }
3436
springTx = { module = "org.springframework:spring-tx", version.ref = "spring" }
3537
springBootAutoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "springBoot" }
38+
springBootStarterValidation = { module = "org.springframework.boot:spring-boot-starter-validation", version.ref = "springBoot" }
39+
springBootTest = { module = "org.springframework.boot:spring-boot-test", version.ref = "springBoot" }
40+
assertjCore = { module = "org.assertj:assertj-core", version.ref = "assertj" }
3641
wiremock = { module = "org.wiremock:wiremock", version.ref = "wiremock" }
42+
slf4jApi = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
43+
slf4jSimple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
3744

3845
[plugins]
3946
ktlint = { id = "org.jlleitschuh.gradle.ktlint", version.ref = "ktlint" }

okapi-core/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ plugins {
33
}
44

55
dependencies {
6+
implementation(libs.slf4jApi)
67
testImplementation(libs.kotestRunnerJunit5)
78
testImplementation(libs.kotestAssertionsCore)
9+
testRuntimeOnly(libs.slf4jSimple)
810
}
Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,86 @@
11
package com.softwaremill.okapi.core
22

3+
import org.slf4j.LoggerFactory
34
import java.time.Clock
45
import java.time.Duration
56
import java.util.concurrent.Executors
67
import java.util.concurrent.ScheduledExecutorService
78
import java.util.concurrent.TimeUnit
9+
import java.util.concurrent.atomic.AtomicBoolean
810

911
/**
1012
* Periodically removes DELIVERED outbox entries older than [retentionDuration].
1113
*
1214
* Runs on a single daemon thread with explicit [start]/[stop] lifecycle.
13-
* Delegates to [OutboxStore.removeDeliveredBefore] — works with any storage adapter.
15+
* [start] and [stop] are single-use -- the internal executor cannot be restarted after shutdown.
16+
* [AtomicBoolean] guards against accidental double-start, not restart.
17+
*
18+
* Delegates to [OutboxStore.removeDeliveredBefore] -- works with any storage adapter.
1419
*/
1520
class OutboxPurger(
1621
private val outboxStore: OutboxStore,
1722
private val retentionDuration: Duration = Duration.ofDays(7),
1823
private val intervalMs: Long = 3_600_000L,
24+
private val batchSize: Int = 100,
1925
private val clock: Clock = Clock.systemUTC(),
2026
) {
27+
init {
28+
require(retentionDuration > Duration.ZERO) { "retentionDuration must be positive, got: $retentionDuration" }
29+
require(intervalMs > 0) { "intervalMs must be positive, got: $intervalMs" }
30+
require(batchSize > 0) { "batchSize must be positive, got: $batchSize" }
31+
}
32+
33+
private val running = AtomicBoolean(false)
34+
2135
private val scheduler: ScheduledExecutorService =
2236
Executors.newSingleThreadScheduledExecutor { r ->
2337
Thread(r, "outbox-purger").apply { isDaemon = true }
2438
}
2539

2640
fun start() {
27-
scheduler.scheduleWithFixedDelay(
28-
::tick,
29-
intervalMs,
41+
check(!scheduler.isShutdown) { "OutboxPurger cannot be restarted after stop()" }
42+
if (!running.compareAndSet(false, true)) return
43+
logger.info(
44+
"Outbox purger started [retention={}, interval={}ms, batchSize={}]",
45+
retentionDuration,
3046
intervalMs,
31-
TimeUnit.MILLISECONDS,
47+
batchSize,
3248
)
49+
scheduler.scheduleWithFixedDelay(::tick, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
3350
}
3451

3552
fun stop() {
53+
if (!running.compareAndSet(true, false)) return
3654
scheduler.shutdown()
3755
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
3856
scheduler.shutdownNow()
3957
}
58+
logger.info("Outbox purger stopped")
4059
}
4160

61+
fun isRunning(): Boolean = running.get()
62+
4263
private fun tick() {
43-
outboxStore.removeDeliveredBefore(clock.instant().minus(retentionDuration))
64+
try {
65+
val cutoff = clock.instant().minus(retentionDuration)
66+
var totalDeleted = 0
67+
var batches = 0
68+
do {
69+
val deleted = outboxStore.removeDeliveredBefore(cutoff, batchSize)
70+
totalDeleted += deleted
71+
batches++
72+
} while (deleted == batchSize && batches < MAX_BATCHES_PER_TICK)
73+
74+
if (totalDeleted > 0) {
75+
logger.debug("Purged {} delivered entries in {} batches", totalDeleted, batches)
76+
}
77+
} catch (e: Exception) {
78+
logger.error("Outbox purge failed, will retry at next scheduled interval", e)
79+
}
80+
}
81+
82+
companion object {
83+
private val logger = LoggerFactory.getLogger(OutboxPurger::class.java)
84+
private const val MAX_BATCHES_PER_TICK = 10
4485
}
4586
}

okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxStore.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@ interface OutboxStore {
1212
/** Updates an entry after a delivery attempt (status change, retries, lastError). */
1313
fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry
1414

15-
/** Removes DELIVERED entries older than [time]. */
16-
fun removeDeliveredBefore(time: Instant)
15+
/**
16+
* Removes up to [limit] DELIVERED entries older than [time].
17+
* @param limit maximum number of entries to delete; must be positive
18+
* @return the number of entries actually deleted, always in `[0, limit]`
19+
*/
20+
fun removeDeliveredBefore(time: Instant, limit: Int): Int
1721

1822
/** Returns the oldest createdAt per status (useful for lag metrics). */
1923
fun findOldestCreatedAt(statuses: Set<OutboxStatus>): Map<OutboxStatus, Instant>

okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxProcessorTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class OutboxProcessorTest :
3636

3737
override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = entry.also { processedEntries += it }
3838

39-
override fun removeDeliveredBefore(time: Instant) = Unit
39+
override fun removeDeliveredBefore(time: Instant, limit: Int): Int = 0
4040

4141
override fun findOldestCreatedAt(statuses: Set<OutboxStatus>) = emptyMap<OutboxStatus, Instant>()
4242

okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPublisherTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class OutboxPublisherTest :
2727

2828
override fun updateAfterProcessing(entry: OutboxEntry) = entry
2929

30-
override fun removeDeliveredBefore(time: Instant) = Unit
30+
override fun removeDeliveredBefore(time: Instant, limit: Int): Int = 0
3131

3232
override fun findOldestCreatedAt(statuses: Set<OutboxStatus>) = emptyMap<OutboxStatus, Instant>()
3333

Lines changed: 140 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,168 @@
11
package com.softwaremill.okapi.core
22

3+
import io.kotest.assertions.throwables.shouldThrow
34
import io.kotest.core.spec.style.FunSpec
45
import io.kotest.matchers.shouldBe
56
import java.time.Clock
67
import java.time.Duration
78
import java.time.Instant
89
import java.time.ZoneOffset
10+
import java.util.concurrent.CountDownLatch
11+
import java.util.concurrent.TimeUnit
12+
import java.util.concurrent.atomic.AtomicInteger
913

1014
private val fixedNow = Instant.parse("2025-03-20T12:00:00Z")
1115
private val fixedClock = Clock.fixed(fixedNow, ZoneOffset.UTC)
1216

1317
class OutboxPurgerTest : FunSpec({
14-
test("tick removes entries older than retention duration") {
18+
19+
test("tick removes entries older than retention duration with correct batch size") {
1520
var capturedCutoff: Instant? = null
16-
val store = object : OutboxStore {
17-
override fun persist(entry: OutboxEntry) = entry
18-
override fun claimPending(limit: Int) = emptyList<OutboxEntry>()
19-
override fun updateAfterProcessing(entry: OutboxEntry) = entry
20-
override fun removeDeliveredBefore(time: Instant) {
21-
capturedCutoff = time
22-
}
23-
override fun findOldestCreatedAt(statuses: Set<OutboxStatus>) = emptyMap<OutboxStatus, Instant>()
24-
override fun countByStatuses() = emptyMap<OutboxStatus, Long>()
25-
}
21+
var capturedLimit: Int? = null
22+
val latch = CountDownLatch(1)
23+
val store = stubStore(onRemove = { time, limit ->
24+
capturedCutoff = time
25+
capturedLimit = limit
26+
latch.countDown()
27+
0
28+
})
2629

2730
val purger = OutboxPurger(
2831
outboxStore = store,
2932
retentionDuration = Duration.ofDays(7),
3033
intervalMs = 50,
34+
batchSize = 100,
3135
clock = fixedClock,
3236
)
3337

3438
purger.start()
35-
Thread.sleep(150)
39+
latch.await(2, TimeUnit.SECONDS) shouldBe true
3640
purger.stop()
3741

3842
capturedCutoff shouldBe fixedNow.minus(Duration.ofDays(7))
43+
capturedLimit shouldBe 100
44+
}
45+
46+
test("batch loop stops when deleted < batchSize") {
47+
val callCount = AtomicInteger(0)
48+
val latch = CountDownLatch(1)
49+
val store = stubStore(onRemove = { _, _ ->
50+
val count = callCount.incrementAndGet()
51+
if (count == 1) {
52+
100 // first batch: full
53+
} else {
54+
latch.countDown()
55+
42 // second batch: partial, loop stops
56+
}
57+
})
58+
59+
val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock)
60+
purger.start()
61+
latch.await(2, TimeUnit.SECONDS) shouldBe true
62+
purger.stop()
63+
64+
callCount.get() shouldBe 2
65+
}
66+
67+
test("batch loop respects MAX_BATCHES_PER_TICK") {
68+
val callCount = AtomicInteger(0)
69+
val latch = CountDownLatch(1)
70+
val store = stubStore(onRemove = { _, _ ->
71+
val count = callCount.incrementAndGet()
72+
if (count >= 10) latch.countDown()
73+
100 // always full, would loop forever without guard
74+
})
75+
76+
val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock)
77+
purger.start()
78+
latch.await(2, TimeUnit.SECONDS) shouldBe true
79+
purger.stop()
80+
81+
callCount.get() shouldBe 10
82+
}
83+
84+
test("exception in tick does not kill scheduler") {
85+
val callCount = AtomicInteger(0)
86+
val latch = CountDownLatch(2)
87+
val store = stubStore(onRemove = { _, _ ->
88+
val count = callCount.incrementAndGet()
89+
latch.countDown()
90+
if (count == 1) throw RuntimeException("db connection lost")
91+
0
92+
})
93+
94+
val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock)
95+
purger.start()
96+
latch.await(2, TimeUnit.SECONDS) shouldBe true
97+
purger.stop()
98+
99+
callCount.get() shouldBe 2
100+
}
101+
102+
test("double start is ignored") {
103+
val callCount = AtomicInteger(0)
104+
val latch = CountDownLatch(1)
105+
val store = stubStore(onRemove = { _, _ ->
106+
callCount.incrementAndGet()
107+
latch.countDown()
108+
0
109+
})
110+
111+
val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock)
112+
purger.start()
113+
purger.start() // second start should be ignored
114+
latch.await(2, TimeUnit.SECONDS) shouldBe true
115+
purger.stop()
116+
117+
purger.isRunning() shouldBe false
118+
}
119+
120+
test("isRunning transitions") {
121+
val store = stubStore(onRemove = { _, _ -> 0 })
122+
val purger = OutboxPurger(store, intervalMs = 60_000, batchSize = 100, clock = fixedClock)
123+
124+
purger.isRunning() shouldBe false
125+
purger.start()
126+
purger.isRunning() shouldBe true
127+
purger.stop()
128+
purger.isRunning() shouldBe false
129+
}
130+
131+
test("constructor rejects invalid batchSize") {
132+
shouldThrow<IllegalArgumentException> {
133+
OutboxPurger(stubStore(), batchSize = 0, clock = fixedClock)
134+
}
135+
}
136+
137+
test("constructor rejects zero retentionDuration") {
138+
shouldThrow<IllegalArgumentException> {
139+
OutboxPurger(stubStore(), retentionDuration = Duration.ZERO, clock = fixedClock)
140+
}
141+
}
142+
143+
test("constructor rejects negative intervalMs") {
144+
shouldThrow<IllegalArgumentException> {
145+
OutboxPurger(stubStore(), intervalMs = -1, clock = fixedClock)
146+
}
147+
}
148+
149+
test("start after stop throws") {
150+
val purger = OutboxPurger(stubStore(), intervalMs = 60_000, batchSize = 100, clock = fixedClock)
151+
152+
purger.start()
153+
purger.stop()
154+
155+
shouldThrow<IllegalStateException> {
156+
purger.start()
157+
}.message shouldBe "OutboxPurger cannot be restarted after stop()"
39158
}
40159
})
160+
161+
private fun stubStore(onRemove: (Instant, Int) -> Int = { _, _ -> 0 }) = object : OutboxStore {
162+
override fun persist(entry: OutboxEntry) = entry
163+
override fun claimPending(limit: Int) = emptyList<OutboxEntry>()
164+
override fun updateAfterProcessing(entry: OutboxEntry) = entry
165+
override fun removeDeliveredBefore(time: Instant, limit: Int): Int = onRemove(time, limit)
166+
override fun findOldestCreatedAt(statuses: Set<OutboxStatus>) = emptyMap<OutboxStatus, Instant>()
167+
override fun countByStatuses() = emptyMap<OutboxStatus, Long>()
168+
}

okapi-mysql/src/main/kotlin/com/softwaremill/okapi/mysql/MysqlOutboxStore.kt

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,11 @@ import com.softwaremill.okapi.core.OutboxEntry
44
import com.softwaremill.okapi.core.OutboxId
55
import com.softwaremill.okapi.core.OutboxStatus
66
import com.softwaremill.okapi.core.OutboxStore
7+
import org.jetbrains.exposed.v1.core.IntegerColumnType
78
import org.jetbrains.exposed.v1.core.alias
8-
import org.jetbrains.exposed.v1.core.and
99
import org.jetbrains.exposed.v1.core.count
10-
import org.jetbrains.exposed.v1.core.eq
1110
import org.jetbrains.exposed.v1.core.inList
12-
import org.jetbrains.exposed.v1.core.less
1311
import org.jetbrains.exposed.v1.core.min
14-
import org.jetbrains.exposed.v1.jdbc.deleteWhere
1512
import org.jetbrains.exposed.v1.jdbc.select
1613
import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager
1714
import org.jetbrains.exposed.v1.jdbc.upsert
@@ -57,10 +54,28 @@ class MysqlOutboxStore(
5754

5855
override fun updateAfterProcessing(entry: OutboxEntry): OutboxEntry = persist(entry)
5956

60-
override fun removeDeliveredBefore(time: Instant) {
61-
OutboxTable.deleteWhere {
62-
(OutboxTable.status eq OutboxStatus.DELIVERED.name) and (OutboxTable.lastAttempt less time)
63-
}
57+
override fun removeDeliveredBefore(time: Instant, limit: Int): Int {
58+
val sql = """
59+
DELETE FROM ${OutboxTable.tableName} WHERE ${OutboxTable.id.name} IN (
60+
SELECT ${OutboxTable.id.name} FROM (
61+
SELECT ${OutboxTable.id.name} FROM ${OutboxTable.tableName}
62+
WHERE ${OutboxTable.status.name} = '${OutboxStatus.DELIVERED}'
63+
AND ${OutboxTable.lastAttempt.name} < ?
64+
ORDER BY ${OutboxTable.id.name}
65+
LIMIT ?
66+
FOR UPDATE SKIP LOCKED
67+
) AS batch
68+
)
69+
""".trimIndent()
70+
71+
val statement = TransactionManager.current().connection.prepareStatement(sql, false)
72+
statement.fillParameters(
73+
listOf(
74+
OutboxTable.lastAttempt.columnType to time,
75+
IntegerColumnType() to limit,
76+
),
77+
)
78+
return statement.executeUpdate()
6479
}
6580

6681
override fun findOldestCreatedAt(statuses: Set<OutboxStatus>): Map<OutboxStatus, Instant> {
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
--liquibase formatted sql
2+
--changeset outbox:002
3+
4+
CREATE INDEX idx_outbox_status_last_attempt ON outbox(status, last_attempt);

okapi-mysql/src/main/resources/com/softwaremill/okapi/db/mysql/changelog.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@
44
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
55
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd">
66
<include file="001__create_outbox_table.sql" relativeToChangelogFile="true"/>
7+
<include file="002__add_purger_index.sql" relativeToChangelogFile="true"/>
78
</databaseChangeLog>

0 commit comments

Comments
 (0)