Skip to content

Commit 42172c7

Browse files
authored
feat: multi-datasource transaction validation (KOJAK-63) (#17)
## Summary - `SpringTransactionContextValidator` now checks the specific DataSource via `TransactionSynchronizationManager.getResource(dataSource)` — rejects publishes in transactions on the wrong database - `ExposedTransactionContextValidator` uses `database.transactionManager.currentOrNull()` for per-database validation in Ktor/Exposed setups - New `okapi.datasource-qualifier` property — one line of YAML to configure multi-datasource (e.g. `okapi.datasource-qualifier: storeDataSource`) - Full autocomplete support via `spring-configuration-metadata.json` ## Motivation In multi-datasource setups the previous validator only checked `isActualTransactionActive()` — it accepted ANY active transaction, even from a different DataSource. This silently broke the outbox pattern's atomicity guarantee. ## Changes - `SpringTransactionContextValidator(dataSource)` — adds third check: `getResource(dataSource) != null` - `SpringOutboxPublisher(delegate, dataSource)` — passes DataSource to validator - `OutboxAutoConfiguration` — resolves DataSource by `okapi.datasource-qualifier` or falls back to primary - `OkapiProperties` — new `@ConfigurationProperties(prefix = "okapi")` with `datasourceQualifier` - `ExposedTransactionContextValidator(database)` — Exposed counterpart using per-database `currentOrNull()` - `spring-configuration-metadata.json` — added `okapi.datasource-qualifier` for IDE autocomplete ## Test plan - [x] `SpringTransactionContextValidatorTest` — 5 scenarios (no tx, correct DS, wrong DS, read-only, both DS) - [x] `MultiDataSourceTransactionTest` — integration test with 2 Postgres Testcontainers + 2 TransactionManagers - [x] `ExposedTransactionContextValidatorTest` — 6 scenarios with H2 (no tx, correct DB, wrong DB, nested both ways, read-only) - [x] `DataSourceQualifierAutoConfigurationTest` — 4 scenarios (single DS, qualifier hit, qualifier miss, multiple DS) - [x] All existing tests pass (backward compatible)
1 parent 5a3a77e commit 42172c7

19 files changed

Lines changed: 599 additions & 21 deletions

File tree

gradle/libs.versions.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ vanniktechPublish = "0.36.0"
1515
wiremock = "3.13.2"
1616
slf4j = "2.0.17"
1717
assertj = "3.27.7"
18+
h2 = "2.3.232"
1819

1920
[libraries]
2021
kotlinGradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" }
@@ -26,6 +27,7 @@ exposedCore = { module = "org.jetbrains.exposed:exposed-core", version.ref = "ex
2627
exposedJdbc = { module = "org.jetbrains.exposed:exposed-jdbc", version.ref = "exposed" }
2728
exposedJson = { module = "org.jetbrains.exposed:exposed-json", version.ref = "exposed" }
2829
exposedJavaTime = { module = "org.jetbrains.exposed:exposed-java-time", version.ref = "exposed" }
30+
exposedSpringTransaction = { module = "org.jetbrains.exposed:spring7-transaction", version.ref = "exposed" }
2931
jacksonModuleKotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" }
3032
jacksonDatatypeJsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" }
3133
liquibaseCore = { module = "org.liquibase:liquibase-core", version.ref = "liquibase" }
@@ -37,13 +39,15 @@ mysql = { module = "com.mysql:mysql-connector-j", version.ref = "mysql" }
3739
kafkaClients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafkaClients" }
3840
springContext = { module = "org.springframework:spring-context", version.ref = "spring" }
3941
springTx = { module = "org.springframework:spring-tx", version.ref = "spring" }
42+
springJdbc = { module = "org.springframework:spring-jdbc", version.ref = "spring" }
4043
springBootAutoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "springBoot" }
4144
springBootStarterValidation = { module = "org.springframework.boot:spring-boot-starter-validation", version.ref = "springBoot" }
4245
springBootTest = { module = "org.springframework.boot:spring-boot-test", version.ref = "springBoot" }
4346
assertjCore = { module = "org.assertj:assertj-core", version.ref = "assertj" }
4447
wiremock = { module = "org.wiremock:wiremock", version.ref = "wiremock" }
4548
slf4jApi = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
4649
slf4jSimple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
50+
h2 = { module = "com.h2database:h2", version.ref = "h2" }
4751

4852
[plugins]
4953
ktlint = { id = "org.jlleitschuh.gradle.ktlint", version.ref = "ktlint" }

okapi-core/build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ description = "Core outbox abstractions and processing engine"
77

88
dependencies {
99
implementation(libs.slf4jApi)
10+
compileOnly(libs.exposedJdbc)
1011
testImplementation(libs.kotestRunnerJunit5)
1112
testImplementation(libs.kotestAssertionsCore)
13+
testImplementation(libs.exposedJdbc)
14+
testImplementation(libs.h2)
1215
testRuntimeOnly(libs.slf4jSimple)
1316
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.softwaremill.okapi.core
2+
3+
import org.jetbrains.exposed.v1.jdbc.Database
4+
import org.jetbrains.exposed.v1.jdbc.transactions.currentOrNull
5+
import org.jetbrains.exposed.v1.jdbc.transactions.transactionManager
6+
7+
/**
8+
* Exposed implementation of [TransactionContextValidator].
9+
*
10+
* Validates that the current thread is inside an active, non-read-only
11+
* Exposed transaction on the specified [database] instance (the one
12+
* where the outbox table lives).
13+
*
14+
* Uses the per-database variant of `currentOrNull()` which searches the
15+
* thread-local transaction stack filtered by [Database] instance.
16+
* This correctly handles multiple databases and nested transactions.
17+
*
18+
* @param database The [Database] instance where the outbox table resides.
19+
*/
20+
class ExposedTransactionContextValidator(
21+
private val database: Database,
22+
) : TransactionContextValidator {
23+
override fun isInActiveReadWriteTransaction(): Boolean {
24+
val transaction = database.transactionManager.currentOrNull() ?: return false
25+
return !transaction.readOnly
26+
}
27+
28+
override val failureMessage: String
29+
get() = "No active read-write Exposed transaction on the outbox Database. " +
30+
"Ensure publish() is called within a transaction(database) { } block using the outbox Database instance."
31+
}

okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionContextValidator.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@ package com.softwaremill.okapi.core
44
* Checks whether the current execution context is inside an active read-write transaction.
55
*
66
* Framework-specific modules provide implementations:
7-
* - `okapi-spring`: checks via `TransactionSynchronizationManager`
8-
* - `okapi-ktor`: checks via Exposed's `TransactionManager.currentOrNull()`
7+
* - `okapi-spring`: [SpringTransactionContextValidator][com.softwaremill.okapi.springboot.SpringTransactionContextValidator]
8+
* — checks via `TransactionSynchronizationManager`
9+
* - `okapi-core`: [ExposedTransactionContextValidator] — checks via Exposed's `TransactionManager.currentOrNull()`
910
* - Standalone: no-op (always returns true)
1011
*/
1112
interface TransactionContextValidator {
1213
fun isInActiveReadWriteTransaction(): Boolean
14+
15+
val failureMessage: String
16+
get() = "No active read-write transaction. Ensure that publish() is called within a transactional context."
1317
}

okapi-core/src/main/kotlin/com/softwaremill/okapi/core/TransactionalOutboxPublisher.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@ class TransactionalOutboxPublisher(
1414
private val validator: TransactionContextValidator,
1515
) {
1616
fun publish(outboxMessage: OutboxMessage, deliveryInfo: DeliveryInfo): OutboxId {
17-
check(validator.isInActiveReadWriteTransaction()) {
18-
"No active read-write transaction. Ensure that publish() is called within a transactional context."
19-
}
17+
check(validator.isInActiveReadWriteTransaction()) { validator.failureMessage }
2018
return delegate.publish(outboxMessage, deliveryInfo)
2119
}
2220
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package com.softwaremill.okapi.core
2+
3+
import io.kotest.core.spec.style.BehaviorSpec
4+
import io.kotest.matchers.shouldBe
5+
import org.jetbrains.exposed.v1.jdbc.Database
6+
import org.jetbrains.exposed.v1.jdbc.transactions.transaction
7+
8+
class ExposedTransactionContextValidatorTest : BehaviorSpec({
9+
10+
val outboxDb = Database.connect(
11+
"jdbc:h2:mem:outbox_test_${System.nanoTime()};DB_CLOSE_DELAY=-1",
12+
driver = "org.h2.Driver",
13+
)
14+
val otherDb = Database.connect(
15+
"jdbc:h2:mem:other_test_${System.nanoTime()};DB_CLOSE_DELAY=-1",
16+
driver = "org.h2.Driver",
17+
)
18+
val validator = ExposedTransactionContextValidator(outboxDb)
19+
20+
given("no active Exposed transaction") {
21+
then("returns false") {
22+
validator.isInActiveReadWriteTransaction() shouldBe false
23+
}
24+
}
25+
26+
given("active transaction on outbox Database") {
27+
then("returns true") {
28+
transaction(outboxDb) {
29+
validator.isInActiveReadWriteTransaction() shouldBe true
30+
}
31+
}
32+
}
33+
34+
given("active transaction on OTHER Database") {
35+
then("returns false") {
36+
transaction(otherDb) {
37+
validator.isInActiveReadWriteTransaction() shouldBe false
38+
}
39+
}
40+
}
41+
42+
given("nested transaction: outboxDb outer, otherDb inner") {
43+
then("returns true — outboxDb transaction is still active on the stack") {
44+
transaction(outboxDb) {
45+
transaction(otherDb) {
46+
validator.isInActiveReadWriteTransaction() shouldBe true
47+
}
48+
}
49+
}
50+
}
51+
52+
given("nested transaction: otherDb outer, outboxDb inner") {
53+
then("returns true inside inner transaction") {
54+
transaction(otherDb) {
55+
transaction(outboxDb) {
56+
validator.isInActiveReadWriteTransaction() shouldBe true
57+
}
58+
}
59+
}
60+
}
61+
62+
given("nested: RW outer on outboxDb, read-only inner on outboxDb") {
63+
then("returns true — Exposed currentOrNull() finds the outer RW transaction for same-db nesting") {
64+
transaction(outboxDb) {
65+
// Exposed does not stack same-db transactions independently;
66+
// currentOrNull() finds the outer (RW) transaction, so the validator sees RW.
67+
val innerResult = transaction(db = outboxDb, readOnly = true) {
68+
validator.isInActiveReadWriteTransaction()
69+
}
70+
innerResult shouldBe true
71+
}
72+
}
73+
}
74+
75+
given("read-only transaction on outbox Database") {
76+
then("returns false") {
77+
transaction(db = outboxDb, readOnly = true) {
78+
validator.isInActiveReadWriteTransaction() shouldBe false
79+
}
80+
}
81+
}
82+
})

okapi-integration-tests/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies {
2929
testImplementation(libs.exposedJdbc)
3030
testImplementation(libs.exposedJson)
3131
testImplementation(libs.exposedJavaTime)
32+
testImplementation(libs.exposedSpringTransaction)
3233

3334
// Liquibase (schema migrations in tests)
3435
testImplementation(libs.liquibaseCore)
@@ -46,4 +47,5 @@ dependencies {
4647
testImplementation(libs.springContext)
4748
testImplementation(libs.springTx)
4849
testImplementation(libs.springBootAutoconfigure)
50+
testImplementation(libs.springJdbc)
4951
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package com.softwaremill.okapi.test.transaction
2+
3+
import com.softwaremill.okapi.core.DeliveryInfo
4+
import com.softwaremill.okapi.core.OutboxMessage
5+
import com.softwaremill.okapi.core.OutboxPublisher
6+
import com.softwaremill.okapi.core.OutboxStatus
7+
import com.softwaremill.okapi.postgres.PostgresOutboxStore
8+
import com.softwaremill.okapi.springboot.SpringOutboxPublisher
9+
import io.kotest.assertions.throwables.shouldThrow
10+
import io.kotest.core.spec.style.FunSpec
11+
import io.kotest.matchers.maps.shouldContain
12+
import io.kotest.matchers.shouldNotBe
13+
import liquibase.Liquibase
14+
import liquibase.database.DatabaseFactory
15+
import liquibase.database.jvm.JdbcConnection
16+
import liquibase.resource.ClassLoaderResourceAccessor
17+
import org.jetbrains.exposed.v1.spring7.transaction.SpringTransactionManager
18+
import org.postgresql.ds.PGSimpleDataSource
19+
import org.springframework.jdbc.datasource.DataSourceTransactionManager
20+
import org.springframework.transaction.support.TransactionTemplate
21+
import org.testcontainers.containers.PostgreSQLContainer
22+
import java.sql.DriverManager
23+
import java.time.Clock
24+
import javax.sql.DataSource
25+
26+
/**
27+
* Integration test verifying that [SpringOutboxPublisher] correctly validates
28+
* the DataSource-aware transactional context in a multi-datasource setup.
29+
*
30+
* Uses two separate PostgreSQL containers:
31+
* - **outboxContainer**: hosts the outbox table (Liquibase migration applied)
32+
* - **otherContainer**: a second database with no outbox table
33+
*
34+
* The outbox DataSource uses [SpringTransactionManager] (Exposed-compatible),
35+
* while the other DataSource uses plain [DataSourceTransactionManager].
36+
*/
37+
class MultiDataSourceTransactionTest : FunSpec({
38+
39+
val outboxContainer = PostgreSQLContainer<Nothing>("postgres:16")
40+
val otherContainer = PostgreSQLContainer<Nothing>("postgres:16")
41+
42+
lateinit var outboxDataSource: DataSource
43+
lateinit var otherDataSource: DataSource
44+
lateinit var outboxTxTemplate: TransactionTemplate
45+
lateinit var otherTxTemplate: TransactionTemplate
46+
lateinit var publisher: SpringOutboxPublisher
47+
lateinit var store: PostgresOutboxStore
48+
49+
val clock: Clock = Clock.systemUTC()
50+
51+
val stubDeliveryInfo = object : DeliveryInfo {
52+
override val type = "stub"
53+
override fun serialize(): String = """{"type":"stub"}"""
54+
}
55+
56+
val testMessage = OutboxMessage(messageType = "test.event", payload = """{"key":"value"}""")
57+
58+
beforeSpec {
59+
outboxContainer.start()
60+
otherContainer.start()
61+
62+
outboxDataSource = PGSimpleDataSource().apply {
63+
setURL(outboxContainer.jdbcUrl)
64+
user = outboxContainer.username
65+
password = outboxContainer.password
66+
}
67+
68+
otherDataSource = PGSimpleDataSource().apply {
69+
setURL(otherContainer.jdbcUrl)
70+
user = otherContainer.username
71+
password = otherContainer.password
72+
}
73+
74+
// Run Liquibase migration only on the outbox database
75+
runLiquibase(outboxContainer)
76+
77+
// SpringTransactionManager (Exposed) for the outbox DataSource —
78+
// PostgresOutboxStore uses Exposed internally, so the transaction must be Exposed-compatible
79+
val outboxTxManager = SpringTransactionManager(outboxDataSource)
80+
outboxTxTemplate = TransactionTemplate(outboxTxManager)
81+
82+
// Plain DataSourceTransactionManager for the other DataSource
83+
val otherTxManager = DataSourceTransactionManager(otherDataSource)
84+
otherTxTemplate = TransactionTemplate(otherTxManager)
85+
86+
store = PostgresOutboxStore(clock)
87+
val corePublisher = OutboxPublisher(store, clock)
88+
publisher = SpringOutboxPublisher(delegate = corePublisher, dataSource = outboxDataSource)
89+
}
90+
91+
afterSpec {
92+
outboxContainer.stop()
93+
otherContainer.stop()
94+
}
95+
96+
beforeEach {
97+
outboxDataSource.connection.use { conn ->
98+
conn.createStatement().use { it.execute("TRUNCATE TABLE outbox") }
99+
}
100+
}
101+
102+
test("publish succeeds when in transaction on outbox DataSource") {
103+
val outboxId = outboxTxTemplate.execute {
104+
publisher.publish(testMessage, stubDeliveryInfo)
105+
}
106+
107+
outboxId shouldNotBe null
108+
109+
val counts = outboxTxTemplate.execute { store.countByStatuses() }
110+
counts shouldContain (OutboxStatus.PENDING to 1L)
111+
}
112+
113+
test("publish fails with IllegalStateException when in transaction on OTHER DataSource") {
114+
shouldThrow<IllegalStateException> {
115+
otherTxTemplate.execute {
116+
publisher.publish(testMessage, stubDeliveryInfo)
117+
}
118+
}
119+
}
120+
121+
test("publish fails with IllegalStateException when called outside any transaction") {
122+
shouldThrow<IllegalStateException> {
123+
publisher.publish(testMessage, stubDeliveryInfo)
124+
}
125+
}
126+
127+
test("publish succeeds with nested transaction (savepoint) on outbox DataSource") {
128+
val outboxId = outboxTxTemplate.execute {
129+
// Nested call creates a savepoint (PROPAGATION_REQUIRED by default nests via savepoints)
130+
outboxTxTemplate.execute {
131+
publisher.publish(testMessage, stubDeliveryInfo)
132+
}
133+
}
134+
135+
outboxId shouldNotBe null
136+
137+
val counts = outboxTxTemplate.execute { store.countByStatuses() }
138+
counts shouldContain (OutboxStatus.PENDING to 1L)
139+
}
140+
})
141+
142+
private fun runLiquibase(container: PostgreSQLContainer<Nothing>) {
143+
val connection = DriverManager.getConnection(container.jdbcUrl, container.username, container.password)
144+
val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(connection))
145+
Liquibase("com/softwaremill/okapi/db/changelog.xml", ClassLoaderResourceAccessor(), db).use { it.update("") }
146+
connection.close()
147+
}

okapi-spring-boot/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ dependencies {
2424
testImplementation(libs.kotestAssertionsCore)
2525
testImplementation(libs.springContext)
2626
testImplementation(libs.springTx)
27+
testImplementation(libs.springJdbc)
2728
testImplementation(libs.springBootAutoconfigure)
2829
testImplementation(libs.springBootTest)
2930
testImplementation(libs.assertjCore)
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.softwaremill.okapi.springboot
2+
3+
import org.springframework.boot.context.properties.ConfigurationProperties
4+
import org.springframework.validation.annotation.Validated
5+
6+
@ConfigurationProperties(prefix = "okapi")
7+
@Validated
8+
data class OkapiProperties(
9+
val datasourceQualifier: String? = null,
10+
) {
11+
init {
12+
require(datasourceQualifier == null || datasourceQualifier.isNotBlank()) {
13+
"okapi.datasource-qualifier must not be blank. Set it to the bean name of the outbox DataSource, or remove the property."
14+
}
15+
}
16+
}

0 commit comments

Comments
 (0)