Skip to content

Commit 539c951

Browse files
authored
Add integration tests module with contract, concurrency, and E2E tests (KOJAK-20) (#12)
## Summary - New `okapi-integration-tests` module (non-published) with comprehensive test suite - Refactored existing E2E tests from BehaviorSpec to FunSpec (fixing fragile isolation) - Refactored `PostgresOutboxStore.claimPending()` from raw SQL to type-safe Exposed DSL with `ForUpdateOption.SKIP_LOCKED` - Added MySQL composite index + `FORCE INDEX` hint to fix InnoDB locking behavior with `SKIP LOCKED` ## What's included ### Test support infrastructure - `PostgresTestSupport`, `MysqlTestSupport`, `KafkaTestSupport` — Testcontainers wrappers - `RecordingMessageDeliverer` — thread-safe test spy with `assertNoAmplification()` ### Store contract tests (16 tests) Shared via FunSpec extension function, run on both Postgres and MySQL: - persist/read roundtrip, ordering, limit, status filtering, update, purge, count, empty edge case ### Concurrency tests (4 tests) - **Deterministic** (CountDownLatch + virtual threads): Thread A holds lock → Thread B gets disjoint entries via SKIP LOCKED - **Realistic** (CyclicBarrier + virtual thread executor): 5 concurrent processors, verify zero delivery amplification ### Kafka transport integration (5 tests) - Real Kafka broker via Testcontainers: topic delivery, headers, partition key, null key, success result ### E2E tests (11 tests) - `HttpEndToEndTest` (Postgres + HTTP): 6 tests including **transaction rollback** and **retry exhaustion** (new scenarios) - `MysqlHttpEndToEndTest` (MySQL + HTTP): 4 tests - `KafkaEndToEndTest` (Postgres + Kafka): full pipeline ### Cleanup - Removed old `OutboxEndToEndTest` and `MysqlOutboxEndToEndTest` (BehaviorSpec, fragile isolation) - Cleaned up test dependencies from `okapi-spring-boot` and `okapi-mysql` ## Key discovery: MySQL FORCE INDEX Concurrency tests revealed that without `FORCE INDEX(idx_outbox_status_created_at)`, InnoDB locks ALL examined rows during `SELECT ... FOR UPDATE SKIP LOCKED` (full table scan), not just the LIMIT'd ones. Added composite index migration + FORCE INDEX hint. ## Test plan - [ ] `./gradlew test -x :okapi-integration-tests:test` — unit tests pass - [ ] `./gradlew :okapi-integration-tests:test` — all 36 integration tests pass (requires Docker) - [ ] `./gradlew ktlintCheck` — formatting clean
1 parent 95885f7 commit 539c951

28 files changed

Lines changed: 1226 additions & 417 deletions

File tree

gradle/libs.versions.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ jacksonDatatypeJsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datat
2929
liquibaseCore = { module = "org.liquibase:liquibase-core", version.ref = "liquibase" }
3030
testcontainersPostgresql = { module = "org.testcontainers:postgresql", version.ref = "testcontainers" }
3131
testcontainersMysql = { module = "org.testcontainers:mysql", version.ref = "testcontainers" }
32+
testcontainersKafka = { module = "org.testcontainers:kafka", version.ref = "testcontainers" }
3233
postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql" }
3334
mysql = { module = "com.mysql:mysql-connector-j", version.ref = "mysql" }
3435
kafkaClients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafkaClients" }
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
plugins {
2+
id("buildsrc.convention.kotlin-jvm")
3+
}
4+
5+
dependencies {
6+
// Okapi modules under test
7+
testImplementation(project(":okapi-core"))
8+
testImplementation(project(":okapi-postgres"))
9+
testImplementation(project(":okapi-mysql"))
10+
testImplementation(project(":okapi-kafka"))
11+
testImplementation(project(":okapi-http"))
12+
testImplementation(project(":okapi-spring-boot"))
13+
14+
// Test framework
15+
testImplementation(libs.kotestRunnerJunit5)
16+
testImplementation(libs.kotestAssertionsCore)
17+
18+
// Testcontainers
19+
testImplementation(libs.testcontainersPostgresql)
20+
testImplementation(libs.testcontainersMysql)
21+
testImplementation(libs.testcontainersKafka)
22+
23+
// DB drivers
24+
testImplementation(libs.postgresql)
25+
testImplementation(libs.mysql)
26+
27+
// Exposed ORM (for transaction blocks and DB queries in tests)
28+
testImplementation(libs.exposedCore)
29+
testImplementation(libs.exposedJdbc)
30+
testImplementation(libs.exposedJson)
31+
testImplementation(libs.exposedJavaTime)
32+
33+
// Liquibase (schema migrations in tests)
34+
testImplementation(libs.liquibaseCore)
35+
36+
// Kafka clients (consumer verification in tests)
37+
testImplementation(libs.kafkaClients)
38+
39+
// SLF4J for Testcontainers logging
40+
testRuntimeOnly("org.slf4j:slf4j-simple:2.0.13")
41+
42+
// WireMock (HTTP E2E tests)
43+
testImplementation(libs.wiremock)
44+
45+
// Spring (for E2E tests that may need Spring context)
46+
testImplementation(libs.springContext)
47+
testImplementation(libs.springTx)
48+
testImplementation(libs.springBootAutoconfigure)
49+
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package com.softwaremill.okapi.test.concurrency
2+
3+
import com.softwaremill.okapi.core.DeliveryInfo
4+
import com.softwaremill.okapi.core.OutboxEntry
5+
import com.softwaremill.okapi.core.OutboxEntryProcessor
6+
import com.softwaremill.okapi.core.OutboxId
7+
import com.softwaremill.okapi.core.OutboxMessage
8+
import com.softwaremill.okapi.core.OutboxProcessor
9+
import com.softwaremill.okapi.core.OutboxStatus
10+
import com.softwaremill.okapi.core.OutboxStore
11+
import com.softwaremill.okapi.core.RetryPolicy
12+
import com.softwaremill.okapi.test.support.RecordingMessageDeliverer
13+
import io.kotest.core.spec.style.FunSpec
14+
import io.kotest.matchers.collections.shouldHaveSize
15+
import io.kotest.matchers.maps.shouldContain
16+
import io.kotest.matchers.shouldBe
17+
import org.jetbrains.exposed.v1.jdbc.transactions.transaction
18+
import java.sql.Connection
19+
import java.time.Clock
20+
import java.time.Instant
21+
import java.time.ZoneOffset
22+
import java.util.concurrent.CompletableFuture
23+
import java.util.concurrent.CountDownLatch
24+
import java.util.concurrent.CyclicBarrier
25+
import java.util.concurrent.Executors
26+
import java.util.concurrent.TimeUnit
27+
28+
private class StubDeliveryInfo(
29+
override val type: String = "recording",
30+
private val metadata: String = """{"type":"recording"}""",
31+
) : DeliveryInfo {
32+
override fun serialize(): String = metadata
33+
}
34+
35+
private fun createTestEntry(index: Int, now: Instant = Instant.parse("2024-01-01T00:00:00Z")): OutboxEntry = OutboxEntry.createPending(
36+
message = OutboxMessage(messageType = "concurrent.test", payload = """{"index":$index}"""),
37+
deliveryInfo = StubDeliveryInfo(),
38+
now = now.plusSeconds(index.toLong()),
39+
)
40+
41+
fun FunSpec.concurrentClaimTests(
42+
dbName: String,
43+
storeFactory: () -> OutboxStore,
44+
startDb: () -> Unit,
45+
stopDb: () -> Unit,
46+
truncate: () -> Unit,
47+
) {
48+
lateinit var store: OutboxStore
49+
50+
beforeSpec {
51+
startDb()
52+
store = storeFactory()
53+
}
54+
55+
afterSpec {
56+
stopDb()
57+
}
58+
59+
beforeEach {
60+
truncate()
61+
}
62+
63+
test("[$dbName] concurrent claimPending with held locks produces disjoint sets") {
64+
// Insert 20 entries
65+
val allIds = transaction {
66+
(0 until 20).map { i ->
67+
val entry = createTestEntry(i)
68+
store.persist(entry)
69+
entry.outboxId
70+
}
71+
}
72+
73+
val lockAcquired = CountDownLatch(1)
74+
val canCommit = CountDownLatch(1)
75+
val claimedByA = CompletableFuture<List<OutboxId>>()
76+
77+
// Thread A: claim entries and hold the transaction open.
78+
// READ_COMMITTED is required for MySQL — under REPEATABLE_READ, InnoDB's
79+
// next-key locks cause SKIP LOCKED to skip more rows than actually locked.
80+
val threadA = Thread.ofVirtual().name("processor-A").start {
81+
try {
82+
transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) {
83+
val claimed = store.claimPending(10)
84+
claimedByA.complete(claimed.map { it.outboxId })
85+
lockAcquired.countDown()
86+
// Hold locks open until main thread signals
87+
canCommit.await(10, TimeUnit.SECONDS)
88+
}
89+
} catch (e: Exception) {
90+
claimedByA.completeExceptionally(e)
91+
}
92+
}
93+
94+
// Wait for Thread A to acquire locks
95+
lockAcquired.await(10, TimeUnit.SECONDS) shouldBe true
96+
97+
// Main thread: claim remaining entries (SKIP LOCKED should skip A's locked rows)
98+
val idsA = claimedByA.get(10, TimeUnit.SECONDS)
99+
val idsB = transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) {
100+
store.claimPending(10)
101+
}.map { it.outboxId }
102+
103+
// Assert disjoint
104+
val intersection = idsA.toSet().intersect(idsB.toSet())
105+
intersection shouldHaveSize 0
106+
107+
// Together they cover all 20 entries
108+
val union = (idsA + idsB).toSet()
109+
union shouldHaveSize 20
110+
union shouldBe allIds.toSet()
111+
112+
// Let Thread A commit and finish
113+
canCommit.countDown()
114+
threadA.join(10_000)
115+
}
116+
117+
test("[$dbName] concurrent processors cause no delivery amplification") {
118+
val fixedClock = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)
119+
120+
// Insert 50 entries
121+
transaction {
122+
(0 until 50).forEach { i -> store.persist(createTestEntry(i)) }
123+
}
124+
125+
val recorder = RecordingMessageDeliverer()
126+
val entryProcessor = OutboxEntryProcessor(recorder, RetryPolicy(maxRetries = 0), fixedClock)
127+
128+
val barrier = CyclicBarrier(5)
129+
val executor = Executors.newVirtualThreadPerTaskExecutor()
130+
131+
val futures = (1..5).map {
132+
CompletableFuture.supplyAsync(
133+
{
134+
barrier.await(10, TimeUnit.SECONDS)
135+
transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) {
136+
OutboxProcessor(store, entryProcessor).processNext(limit = 50)
137+
}
138+
},
139+
executor,
140+
)
141+
}
142+
143+
// Wait for all threads to complete
144+
CompletableFuture.allOf(*futures.toTypedArray()).get(30, TimeUnit.SECONDS)
145+
executor.shutdown()
146+
147+
// Verify no amplification
148+
recorder.assertNoAmplification()
149+
recorder.deliveryCount() shouldBe 50
150+
151+
// Verify DB state
152+
val counts = transaction { store.countByStatuses() }
153+
counts shouldContain (OutboxStatus.DELIVERED to 50L)
154+
counts shouldContain (OutboxStatus.PENDING to 0L)
155+
}
156+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.softwaremill.okapi.test.concurrency
2+
3+
import com.softwaremill.okapi.mysql.MysqlOutboxStore
4+
import com.softwaremill.okapi.test.support.MysqlTestSupport
5+
import io.kotest.core.spec.style.FunSpec
6+
import java.time.Clock
7+
import java.time.Instant
8+
import java.time.ZoneOffset
9+
10+
class MysqlConcurrentClaimTest : FunSpec({
11+
val db = MysqlTestSupport()
12+
13+
concurrentClaimTests(
14+
dbName = "mysql",
15+
storeFactory = { MysqlOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) },
16+
startDb = { db.start() },
17+
stopDb = { db.stop() },
18+
truncate = { db.truncate() },
19+
)
20+
})
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.softwaremill.okapi.test.concurrency
2+
3+
import com.softwaremill.okapi.postgres.PostgresOutboxStore
4+
import com.softwaremill.okapi.test.support.PostgresTestSupport
5+
import io.kotest.core.spec.style.FunSpec
6+
import java.time.Clock
7+
import java.time.Instant
8+
import java.time.ZoneOffset
9+
10+
class PostgresConcurrentClaimTest : FunSpec({
11+
val db = PostgresTestSupport()
12+
13+
concurrentClaimTests(
14+
dbName = "postgres",
15+
storeFactory = { PostgresOutboxStore(Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC)) },
16+
startDb = { db.start() },
17+
stopDb = { db.stop() },
18+
truncate = { db.truncate() },
19+
)
20+
})

0 commit comments

Comments
 (0)