From 13717cf32236dc1ff09797c8e3f800fa073abd7f Mon Sep 17 00:00:00 2001 From: Stefan Mirkovic Date: Fri, 22 May 2026 14:08:39 +0200 Subject: [PATCH 1/3] Capture per-entity metrics on Repository and EventStore --- .../api/metrics/EntityStatisticIdentifier.kt | 30 +++++ .../framework/api/metrics/EntityStatistics.kt | 32 +++++ .../metrics/EntityStatisticsWithIdentifier.kt | 26 +++++ .../framework/api/metrics/StatisticReport.kt | 2 + .../AxoniqPlatformConfigurerEnhancer.java | 7 +- .../eventsourcing/AxoniqPlatformEventStore.kt | 102 ++++++++++++++-- .../EventSourcingDecorators.java | 4 +- .../messaging/HandlerMetricsRegistry.kt | 7 +- .../modelling/AxoniqPlatformRepository.kt | 51 +++++++- .../modelling/AxoniqPlatformStateManager.kt | 5 +- .../modelling/CurrentEntityContext.kt | 64 ++++++++++ .../modelling/EntityMetricsRegistry.kt | 110 ++++++++++++++++++ .../modelling/ModellingDecorators.java | 6 +- .../io/axoniq/platform/framework/utils.kt | 17 +++ .../modelling/EntityMetricsRegistryTest.kt | 94 +++++++++++++++ 15 files changed, 534 insertions(+), 23 deletions(-) create mode 100644 framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticIdentifier.kt create mode 100644 framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatistics.kt create mode 100644 framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticsWithIdentifier.kt create mode 100644 framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt create mode 100644 framework-client/src/main/java/io/axoniq/platform/framework/modelling/EntityMetricsRegistry.kt create mode 100644 framework-client/src/test/kotlin/io/axoniq/platform/framework/modelling/EntityMetricsRegistryTest.kt diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticIdentifier.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticIdentifier.kt new file mode 100644 index 00000000..6e466ebc --- /dev/null +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticIdentifier.kt @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2022-2025. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.api.metrics + +import com.fasterxml.jackson.annotation.JsonProperty + +data class EntityStatisticIdentifier( + @JsonProperty("n") + val entityName: String, + @JsonProperty("i") + val entityId: String, + @JsonProperty("mt") + val messageType: String, + @JsonProperty("mn") + val messageName: String, +) diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatistics.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatistics.kt new file mode 100644 index 00000000..51886d50 --- /dev/null +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatistics.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2022-2025. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.api.metrics + +import com.fasterxml.jackson.annotation.JsonProperty + +data class EntityStatistics( + @JsonProperty("c") + val count: Double, + @JsonProperty("cr") + val creations: Double, + @JsonProperty("f") + val failed: Double, + @JsonProperty("t") + val timer: StatisticDistribution?, + @JsonProperty("m") + val metrics: Map, +) diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticsWithIdentifier.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticsWithIdentifier.kt new file mode 100644 index 00000000..107605f4 --- /dev/null +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticsWithIdentifier.kt @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2022-2025. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.api.metrics + +import com.fasterxml.jackson.annotation.JsonProperty + +data class EntityStatisticsWithIdentifier( + @JsonProperty("e") + val entity: EntityStatisticIdentifier, + @JsonProperty("s") + val statistics: EntityStatistics, +) diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/StatisticReport.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/StatisticReport.kt index 192f1e91..68e05b58 100644 --- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/StatisticReport.kt +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/StatisticReport.kt @@ -25,4 +25,6 @@ data class StatisticReport( val dispatchers: List, @JsonProperty("a") val aggregates: List, + @JsonProperty("e") + val entities: List = emptyList(), ) diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java index eb125879..15246907 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java @@ -38,6 +38,7 @@ import io.axoniq.platform.framework.messaging.AxoniqPlatformCommandBus; import io.axoniq.platform.framework.messaging.AxoniqPlatformQueryBus; import io.axoniq.platform.framework.messaging.HandlerMetricsRegistry; +import io.axoniq.platform.framework.modelling.EntityMetricsRegistry; import org.axonframework.common.configuration.ComponentDefinition; import org.axonframework.common.configuration.ComponentRegistry; import org.axonframework.common.configuration.Configuration; @@ -140,12 +141,16 @@ public void enhance(ComponentRegistry registry) { // The start handler will allow for eager creation .onStart(Phase.EXTERNAL_CONNECTIONS, c -> { })) + .registerComponent(ComponentDefinition + .ofType(EntityMetricsRegistry.class) + .withBuilder(c -> new EntityMetricsRegistry())) .registerComponent(ComponentDefinition .ofType(HandlerMetricsRegistry.class) .withBuilder(c -> new HandlerMetricsRegistry( c.getComponent(AxoniqConsoleRSocketClient.class), c.getComponent(PlatformClientConnectionService.class), - c.getComponent(AxoniqPlatformConfiguration.class)))) + c.getComponent(AxoniqPlatformConfiguration.class), + c.getComponent(EntityMetricsRegistry.class)))) .registerComponent(ComponentDefinition .ofType(ApplicationThreadDumpProvider.class) .withBuilder(c -> new ApplicationThreadDumpProvider())) diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt index faae87e4..fb818d7e 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt @@ -16,11 +16,14 @@ package io.axoniq.platform.framework.eventsourcing +import io.axoniq.platform.framework.api.metrics.EntityStatisticIdentifier import io.axoniq.platform.framework.api.metrics.PreconfiguredMetric import io.axoniq.platform.framework.messaging.HandlerMeasurement import io.axoniq.platform.framework.messaging.HandlerMeasurement.Companion.RESOURCE_KEY import io.axoniq.platform.framework.messaging.HandlerMetricsRegistry import io.axoniq.platform.framework.messaging.toInformation +import io.axoniq.platform.framework.modelling.CurrentEntityContext +import io.axoniq.platform.framework.modelling.EntityMetricsRegistry import io.github.oshai.kotlinlogging.KotlinLogging import org.axonframework.common.infra.ComponentDescriptor import org.axonframework.eventsourcing.eventstore.AppendCondition @@ -34,10 +37,13 @@ import org.axonframework.messaging.eventhandling.processing.streaming.token.Trac import org.axonframework.messaging.eventstreaming.StreamingCondition import java.time.Instant import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong class AxoniqPlatformEventStorageEngine( private val delegate: EventStorageEngine, private val registry: HandlerMetricsRegistry, + private val entityMetricsRegistry: EntityMetricsRegistry, ) : EventStorageEngine { override fun appendEvents(condition: AppendCondition, context: ProcessingContext?, events: List>): CompletableFuture> { @@ -59,12 +65,21 @@ class AxoniqPlatformEventStorageEngine( } // Second, measure commit time if measurement is ongoing val startTime = System.nanoTime() + val currentEntity = context.getResource(CurrentEntityContext.RESOURCE_KEY) return delegate.appendEvents(condition, context, events) .whenComplete { _, _ -> val endTime = System.nanoTime() HandlerMeasurement.onContext(context) { it.registerMetricValue(PreconfiguredMetric.EVENT_COMMIT_TIME, endTime - startTime) } + currentEntity?.let { + entityMetricsRegistry.registerAdditionalTimer( + it, + EntityMetricsRegistry.METRIC_EVENT_COMMIT_TIME, + endTime - startTime, + TimeUnit.NANOSECONDS, + ) + } } } @@ -73,26 +88,91 @@ class AxoniqPlatformEventStorageEngine( } override fun source(condition: SourcingCondition): MessageStream { - return delegate.source(condition) + val entity = CurrentEntityContext.getFromThread() + // Capture the loadOrCreate-shared sourced-events counter at source() invocation time, while + // we are still on the calling thread. The actual onNext callbacks may fire later/elsewhere + // but they hold the reference through closure capture, so the count stays accurate. + val sourcedCounter = CurrentEntityContext.getSourcedEventsCounterFromThread() + val stream = delegate.source(condition) + if (entity == null) { + return stream + } + recordCriteriaSize(entity, condition) + return wrapForStreamSize(entity, stream, sourcedCounter) } override fun stream(condition: StreamingCondition): MessageStream { - return delegate.stream(condition) + val entity = CurrentEntityContext.getFromThread() + val sourcedCounter = CurrentEntityContext.getSourcedEventsCounterFromThread() + val stream = delegate.stream(condition) + if (entity == null) { + return stream + } + recordCriteriaSize(entity, condition.criteria().flatten().size) + return wrapForStreamSize(entity, stream, sourcedCounter) } - override fun firstToken(): CompletableFuture { - return delegate.firstToken() + override fun firstToken(): CompletableFuture = delegate.firstToken() + + override fun latestToken(): CompletableFuture = delegate.latestToken() + + override fun tokenAt(at: Instant): CompletableFuture = delegate.tokenAt(at) + + override fun describeTo(descriptor: ComponentDescriptor) { + descriptor.describeWrapperOf(delegate) } - override fun latestToken(): CompletableFuture { - return delegate.latestToken() + private fun recordCriteriaSize(entity: EntityStatisticIdentifier, condition: SourcingCondition) { + recordCriteriaSize(entity, condition.criteria().flatten().size) } - override fun tokenAt(at: Instant): CompletableFuture { - return delegate.tokenAt(at) + private fun recordCriteriaSize(entity: EntityStatisticIdentifier, criteriaSize: Int) { + // Timer is time-typed; for count-style metrics we record in MILLISECONDS so the + // downstream `HistogramSnapshot.toDistribution()` (which reads values in ms) returns + // the raw count verbatim — e.g. 5 criteria → 5.0. The UI overrides the unit label + // to display this as "5" rather than "5ms". + // Skip recording 0: HdrHistogram has no real zero bucket and would report the lowest + // bucket midpoint (~0.001 with high precision) instead, polluting the distribution. + if (criteriaSize <= 0) return + entityMetricsRegistry.registerAdditionalTimer( + entity, + EntityMetricsRegistry.METRIC_CRITERIA_SIZE, + criteriaSize.toLong(), + TimeUnit.MILLISECONDS, + ) } - override fun describeTo(descriptor: ComponentDescriptor) { - descriptor.describeWrapperOf(delegate) + private fun wrapForStreamSize( + entity: EntityStatisticIdentifier, + stream: MessageStream, + sharedSourcedCounter: AtomicLong?, + ): MessageStream { + val counter = AtomicLong(0) + return stream + .onNext { + counter.incrementAndGet() + // Also bump the loadOrCreate-shared counter so the repository can tell load + // (count > 0) from creation (count == 0). May be null for direct event-store + // streaming outside a managed load. + sharedSourcedCounter?.incrementAndGet() + } + .onComplete { + // Count-style metric; see note in recordCriteriaSize for unit choice. + // Skip recording for fresh-entity creations (no events sourced) — that case + // belongs to the Creations counter, not the stream-size distribution. + val size = counter.get() + if (size > 0L) { + entityMetricsRegistry.registerAdditionalTimer( + entity, + EntityMetricsRegistry.METRIC_EVENT_STREAM_SIZE, + size, + TimeUnit.MILLISECONDS, + ) + } + } + } + + companion object { + private val logger = KotlinLogging.logger { } } -} \ No newline at end of file +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/EventSourcingDecorators.java b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/EventSourcingDecorators.java index 5c4d9d88..49e7b468 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/EventSourcingDecorators.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/EventSourcingDecorators.java @@ -17,6 +17,7 @@ package io.axoniq.platform.framework.eventsourcing; import io.axoniq.platform.framework.messaging.HandlerMetricsRegistry; +import io.axoniq.platform.framework.modelling.EntityMetricsRegistry; import org.axonframework.common.configuration.ComponentRegistry; import org.axonframework.common.configuration.DecoratorDefinition; import org.axonframework.eventsourcing.eventstore.EventStorageEngine; @@ -37,7 +38,8 @@ static void apply(ComponentRegistry registry) { DecoratorDefinition.forType(EventStorageEngine.class) .with((c, name, delegate) -> { HandlerMetricsRegistry metricsRegistry = c.getComponent(HandlerMetricsRegistry.class); - return new AxoniqPlatformEventStorageEngine(delegate, metricsRegistry); + EntityMetricsRegistry entityMetricsRegistry = c.getComponent(EntityMetricsRegistry.class); + return new AxoniqPlatformEventStorageEngine(delegate, metricsRegistry, entityMetricsRegistry); }).order(Integer.MAX_VALUE)); } } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt b/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt index 559adde6..3c9a335a 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt @@ -35,6 +35,7 @@ import io.axoniq.platform.framework.client.AxoniqConsoleRSocketClient import io.axoniq.platform.framework.client.PlatformClientConnectionObserver import io.axoniq.platform.framework.client.PlatformClientConnectionService import io.axoniq.platform.framework.computeIfAbsentWithRetry +import io.axoniq.platform.framework.modelling.EntityMetricsRegistry import io.github.oshai.kotlinlogging.KotlinLogging import io.micrometer.core.instrument.Timer import io.micrometer.core.instrument.simple.SimpleMeterRegistry @@ -46,7 +47,8 @@ import java.util.concurrent.TimeUnit class HandlerMetricsRegistry( private val axoniqConsoleRSocketClient: AxoniqConsoleRSocketClient, private val platformClientConnectionService: PlatformClientConnectionService, - private val properties: AxoniqPlatformConfiguration + private val properties: AxoniqPlatformConfiguration, + private val entityMetricsRegistry: EntityMetricsRegistry, ) : PlatformClientConnectionObserver { private val logger = KotlinLogging.logger { } private var reportTask: ScheduledFuture<*>? = null @@ -118,7 +120,8 @@ class HandlerMetricsRegistry( DispatcherStatistics(it.value.count()) ) }, - aggregates = emptyList() + aggregates = emptyList(), + entities = entityMetricsRegistry.getStats(), ) return flow } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt index fa9efad9..1214a861 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt @@ -16,16 +16,20 @@ package io.axoniq.platform.framework.modelling +import io.axoniq.platform.framework.api.metrics.EntityStatisticIdentifier import io.axoniq.platform.framework.api.metrics.LoadModelMetric import io.axoniq.platform.framework.messaging.HandlerMeasurement +import io.axoniq.platform.framework.messaging.toInformation import org.axonframework.common.infra.ComponentDescriptor import org.axonframework.messaging.core.unitofwork.ProcessingContext import org.axonframework.modelling.repository.ManagedEntity import org.axonframework.modelling.repository.Repository import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicLong class AxoniqPlatformRepository( private val delegate: Repository, + private val entityMetricsRegistry: EntityMetricsRegistry, ) : Repository.LifecycleManagement { override fun attach(entity: ManagedEntity, processingContext: ProcessingContext): ManagedEntity { val lfcm = delegate as Repository.LifecycleManagement @@ -41,9 +45,17 @@ class AxoniqPlatformRepository( } override fun load(identifier: ID, processingContext: ProcessingContext): CompletableFuture> { + val entityId = entityIdentifierFor(identifier, processingContext) + processingContext.putResource(CurrentEntityContext.RESOURCE_KEY, entityId) + CurrentEntityContext.setOnThread(entityId) val startTime = System.nanoTime() - return delegate.load(identifier, processingContext).whenComplete { _, _ -> + return try { + delegate.load(identifier, processingContext) + } finally { + CurrentEntityContext.clearOnThread() + }.whenComplete { _, error -> val duration = System.nanoTime() - startTime + entityMetricsRegistry.registerLoad(entityId, duration, success = error == null) HandlerMeasurement.onContext(processingContext) { it.registerMetricValue(LoadModelMetric(identifier = "load:${entityType().simpleName}"), duration) } @@ -51,9 +63,28 @@ class AxoniqPlatformRepository( } override fun loadOrCreate(identifier: ID, processingContext: ProcessingContext): CompletableFuture> { + val entityId = entityIdentifierFor(identifier, processingContext) + // Shared counter the event-store wrapper bumps for each sourced event. After loadOrCreate + // completes, a zero count means no prior events existed → the framework created a fresh + // entity. Counting once here (rather than registering creation unconditionally) keeps the + // Creations/min rate aligned with reality — without this, every redeem/update on an existing + // event-sourced entity would be reported as a creation too. + val sourcedCounter = AtomicLong(0) + processingContext.putResource(CurrentEntityContext.RESOURCE_KEY, entityId) + CurrentEntityContext.setOnThread(entityId) + CurrentEntityContext.setSourcedEventsCounterOnThread(sourcedCounter) val startTime = System.nanoTime() - return delegate.loadOrCreate(identifier, processingContext).whenComplete { _, _ -> + return try { + delegate.loadOrCreate(identifier, processingContext) + } finally { + CurrentEntityContext.clearOnThread() + CurrentEntityContext.clearSourcedEventsCounterOnThread() + }.whenComplete { _, error -> val duration = System.nanoTime() - startTime + entityMetricsRegistry.registerLoad(entityId, duration, success = error == null) + if (error == null && sourcedCounter.get() == 0L) { + entityMetricsRegistry.registerCreation(entityId) + } HandlerMeasurement.onContext(processingContext) { it.registerMetricValue(LoadModelMetric(identifier = "load:${entityType().simpleName}"), duration) } @@ -67,4 +98,18 @@ class AxoniqPlatformRepository( override fun describeTo(descriptor: ComponentDescriptor) { descriptor.describeWrapperOf(delegate) } -} \ No newline at end of file + + private fun entityIdentifierFor(identifier: ID, processingContext: ProcessingContext): EntityStatisticIdentifier { + val messageInfo = HandlerMeasurement.fromContext(processingContext)?.message?.toInformation() + return EntityStatisticIdentifier( + entityName = entityType().simpleName, + entityId = identifier.toString(), + messageType = messageInfo?.type ?: UNKNOWN, + messageName = messageInfo?.name ?: UNKNOWN, + ) + } + + companion object { + private const val UNKNOWN = "Unknown" + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt index f029c5bd..7b5f6db1 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt @@ -23,14 +23,15 @@ import org.axonframework.modelling.repository.Repository import java.util.concurrent.CompletableFuture class AxoniqPlatformStateManager( - private val delegate: StateManager + private val delegate: StateManager, + private val entityMetricsRegistry: EntityMetricsRegistry, ): StateManager { override fun register(repository: Repository): StateManager { if(repository is AxoniqPlatformRepository) { delegate.register(repository) return this } - delegate.register(AxoniqPlatformRepository(repository)) + delegate.register(AxoniqPlatformRepository(repository, entityMetricsRegistry)) return this } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt new file mode 100644 index 00000000..715855dc --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.modelling + +import io.axoniq.platform.framework.api.metrics.EntityStatisticIdentifier +import org.axonframework.messaging.core.Context +import java.util.concurrent.atomic.AtomicLong + +/** + * Carries the entity statistic identifier that is currently being loaded through the [Context] / + * [org.axonframework.messaging.core.unitofwork.ProcessingContext], so downstream decorators (event store, + * lock primitives) can attribute their metrics to the correct entity. + * + * Where [org.axonframework.messaging.core.unitofwork.ProcessingContext] is available (e.g. event-store + * append), prefer [RESOURCE_KEY]. The [EventStorageEngine.source] API does not propagate a context, so + * for that call site the same identifier is also exposed via a thread-local set by + * [AxoniqPlatformRepository] just before delegating into the framework. + * + * The [SOURCED_EVENTS_TL] thread-local lets the repository distinguish a real load from a fresh-entity + * creation: when `loadOrCreate` returns and the counter is still zero, the framework sourced no prior + * events for this identifier, meaning the entity was just created. The counter is shared by reference + * with the event-store wrapper so it stays correct even if events are delivered on a different thread. + */ +object CurrentEntityContext { + val RESOURCE_KEY: Context.ResourceKey = + Context.ResourceKey.withLabel("Axoniq Platform - Current Entity") + + private val threadLocal: ThreadLocal = ThreadLocal() + private val sourcedEventsTL: ThreadLocal = ThreadLocal() + + fun setOnThread(identifier: EntityStatisticIdentifier) { + threadLocal.set(identifier) + } + + fun getFromThread(): EntityStatisticIdentifier? = threadLocal.get() + + fun clearOnThread() { + threadLocal.remove() + } + + fun setSourcedEventsCounterOnThread(counter: AtomicLong) { + sourcedEventsTL.set(counter) + } + + fun getSourcedEventsCounterFromThread(): AtomicLong? = sourcedEventsTL.get() + + fun clearSourcedEventsCounterOnThread() { + sourcedEventsTL.remove() + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/EntityMetricsRegistry.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/EntityMetricsRegistry.kt new file mode 100644 index 00000000..9e5d9f42 --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/EntityMetricsRegistry.kt @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.modelling + +import io.axoniq.platform.framework.api.metrics.EntityStatisticIdentifier +import io.axoniq.platform.framework.api.metrics.EntityStatistics +import io.axoniq.platform.framework.api.metrics.EntityStatisticsWithIdentifier +import io.axoniq.platform.framework.computeIfAbsentWithRetry +import io.axoniq.platform.framework.createCountTimer +import io.axoniq.platform.framework.createTimer +import io.axoniq.platform.framework.messaging.RollingCountMeasure +import io.axoniq.platform.framework.messaging.toDistribution +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Timer +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit + +/** + * Registry collecting metrics per entity load by `(entityName, entityId, messageType, messageName)`. + * + * Drained by [io.axoniq.platform.framework.messaging.HandlerMetricsRegistry] on its periodic reporting + * tick and shipped to the platform as part of [io.axoniq.platform.framework.api.metrics.StatisticReport]. + */ +class EntityMetricsRegistry { + private val meterRegistry = SimpleMeterRegistry() + private val entities: MutableMap = ConcurrentHashMap() + + fun registerLoad(identifier: EntityStatisticIdentifier, durationNs: Long, success: Boolean) { + val stats = entry(identifier) + stats.totalTimer.record(durationNs, TimeUnit.NANOSECONDS) + stats.totalCount.increment() + if (!success) { + stats.failureCount.increment() + } + } + + fun registerCreation(identifier: EntityStatisticIdentifier) { + entry(identifier).creationCount.increment() + } + + fun registerAdditionalTimer(identifier: EntityStatisticIdentifier, name: String, value: Long, unit: TimeUnit) { + entry(identifier).metrics + .computeIfAbsentWithRetry(name) { timerFactory(name)(meterRegistry, "${identifier}_timer_$name") } + .record(value, unit) + } + + /** + * Picks a higher-precision histogram for count-style metrics so reported percentiles round-trip + * small integers cleanly (otherwise `1` shows up as `0.98` and `0` as `0.98` on the Inspector UI). + * Time-typed metrics keep the default precision — their values span many orders of magnitude where + * the extra precision is wasted memory. + */ + private fun timerFactory(name: String): (MeterRegistry, String) -> Timer { + return when (name) { + METRIC_EVENT_STREAM_SIZE, METRIC_CRITERIA_SIZE -> ::createCountTimer + else -> ::createTimer + } + } + + fun getStats(): List { + return entities.entries.map { + EntityStatisticsWithIdentifier( + it.key, + EntityStatistics( + count = it.value.totalCount.count(), + creations = it.value.creationCount.count(), + failed = it.value.failureCount.count(), + timer = it.value.totalTimer.takeSnapshot().toDistribution(), + metrics = it.value.metrics.map { (k, v) -> k to v.takeSnapshot().toDistribution() }.toMap() + ) + ) + } + } + + private fun entry(identifier: EntityStatisticIdentifier): EntityRegistryStatistics { + return entities.computeIfAbsentWithRetry(identifier) { _ -> + EntityRegistryStatistics(createTimer(meterRegistry, "${identifier}_timer_total")) + } + } + + private data class EntityRegistryStatistics( + val totalTimer: Timer, + val totalCount: RollingCountMeasure = RollingCountMeasure(), + val creationCount: RollingCountMeasure = RollingCountMeasure(), + val failureCount: RollingCountMeasure = RollingCountMeasure(), + val metrics: MutableMap = ConcurrentHashMap(), + ) + + companion object { + const val METRIC_LOCK_WAIT_TIME = "lock_wait_time" + const val METRIC_EVENT_COMMIT_TIME = "event_commit_time" + const val METRIC_EVENT_STREAM_SIZE = "event_stream_size" + const val METRIC_CRITERIA_SIZE = "criteria_size" + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java index 3a7c4e59..48db69c3 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java @@ -39,7 +39,7 @@ static void apply(ComponentRegistry registry) { if(delegate instanceof AxoniqPlatformStateManager) { return delegate; } - return new AxoniqPlatformStateManager(delegate); + return new AxoniqPlatformStateManager(delegate, cc.getComponent(EntityMetricsRegistry.class)); }) .order(Integer.MAX_VALUE)); @@ -48,12 +48,12 @@ static void apply(ComponentRegistry registry) { .registerDecorator(DecoratorDefinition.forType(Repository.class) .with((cc, name, delegate) -> delegate instanceof AxoniqPlatformRepository ? delegate : - new AxoniqPlatformRepository<>(delegate)) + new AxoniqPlatformRepository<>(delegate, cc.getComponent(EntityMetricsRegistry.class))) .order(Integer.MIN_VALUE)) .registerDecorator(DecoratorDefinition.forType(StateManager.class) .with((cc, name, delegate) -> delegate instanceof AxoniqPlatformStateManager ? delegate : - new AxoniqPlatformStateManager(delegate)) + new AxoniqPlatformStateManager(delegate, cc.getComponent(EntityMetricsRegistry.class))) .order(Integer.MAX_VALUE)); return null; }, true); diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt b/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt index 2fcfbbc3..3c3e79af 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt @@ -141,6 +141,23 @@ fun createTimer(meterRegistry: MeterRegistry, name: String): Timer { .register(meterRegistry) } +/** + * Same as [createTimer] but with higher percentile precision, suitable for count-style metrics + * (e.g. event_stream_size, criteria_size) where the recorded values are small integers and the + * default HdrHistogram bucketing reports 1.0 as ~0.98 or 0 as ~0.98. Precision 3 gives sub-1% + * relative error which is enough to round-trip small integers cleanly, at the cost of a few KB + * extra heap per Timer — acceptable given the bounded number of count metrics per entity. + */ +fun createCountTimer(meterRegistry: MeterRegistry, name: String): Timer { + return Timer + .builder(name) + .publishPercentiles(1.00, 0.95, 0.90, 0.50, 0.01) + .distributionStatisticExpiry(Duration.ofMinutes(5)) + .distributionStatisticBufferLength(5) + .percentilePrecision(3) + .register(meterRegistry) +} + /** * Truncates the string to ensure it doesn't exceed the specified maximum byte size. * diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/modelling/EntityMetricsRegistryTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/modelling/EntityMetricsRegistryTest.kt new file mode 100644 index 00000000..f14e7ddf --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/modelling/EntityMetricsRegistryTest.kt @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.modelling + +import io.axoniq.platform.framework.api.metrics.EntityStatisticIdentifier +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import java.util.concurrent.TimeUnit + +class EntityMetricsRegistryTest { + + private val orderCreated = EntityStatisticIdentifier( + entityName = "Order", + entityId = "order-1", + messageType = "CommandMessage", + messageName = "CreateOrder", + ) + + private val orderShipped = EntityStatisticIdentifier( + entityName = "Order", + entityId = "order-1", + messageType = "CommandMessage", + messageName = "ShipOrder", + ) + + @Test + fun `separate keys produce separate report entries`() { + val registry = EntityMetricsRegistry() + + registry.registerLoad(orderCreated, 10_000_000, success = true) + registry.registerLoad(orderShipped, 20_000_000, success = true) + registry.registerCreation(orderCreated) + + val stats = registry.getStats().associateBy { it.entity } + assertEquals(2, stats.size) + assertNotNull(stats[orderCreated]) + assertNotNull(stats[orderShipped]) + } + + @Test + fun `load records a total timer with non-empty snapshot`() { + val registry = EntityMetricsRegistry() + + registry.registerLoad(orderCreated, 5_000_000, success = true) + + val stats = registry.getStats().single() + assertNotNull(stats.statistics.timer) + // mean is reported in ms — 5ms input should yield > 0 + assertTrue((stats.statistics.timer?.mean ?: 0.0) > 0.0) + } + + @Test + fun `additional timer is exposed under the requested metric name`() { + val registry = EntityMetricsRegistry() + + registry.registerLoad(orderCreated, 5_000_000, success = true) + registry.registerAdditionalTimer( + orderCreated, + EntityMetricsRegistry.METRIC_CRITERIA_SIZE, + 42L, + TimeUnit.NANOSECONDS, + ) + + val stats = registry.getStats().single() + assertTrue(stats.statistics.metrics.containsKey(EntityMetricsRegistry.METRIC_CRITERIA_SIZE)) + } + + @Test + fun `creation without load still produces a report entry`() { + val registry = EntityMetricsRegistry() + + registry.registerCreation(orderCreated) + + val stats = registry.getStats() + assertEquals(1, stats.size) + assertEquals(orderCreated, stats.single().entity) + } +} From f939673d5eb242d434a1047ee798fc54b1d2285b Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Fri, 22 May 2026 15:48:47 +0100 Subject: [PATCH 2/3] Remove ThreadLocal usage for AxoniqPlatformEventStorageEngine --- .../AxoniqPlatformEventStorageEngine.kt | 104 ++++++++ .../eventsourcing/AxoniqPlatformEventStore.kt | 228 ++++++++---------- .../EventSourcingDecorators.java | 8 + .../modelling/AxoniqPlatformRepository.kt | 30 +-- .../modelling/CurrentEntityContext.kt | 24 +- 5 files changed, 225 insertions(+), 169 deletions(-) create mode 100644 framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStorageEngine.kt diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStorageEngine.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStorageEngine.kt new file mode 100644 index 00000000..36c37a34 --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStorageEngine.kt @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing + +import io.axoniq.platform.framework.api.metrics.PreconfiguredMetric +import io.axoniq.platform.framework.messaging.HandlerMeasurement +import io.axoniq.platform.framework.messaging.HandlerMeasurement.Companion.RESOURCE_KEY +import io.axoniq.platform.framework.messaging.HandlerMetricsRegistry +import io.axoniq.platform.framework.messaging.toInformation +import io.axoniq.platform.framework.modelling.CurrentEntityContext +import io.axoniq.platform.framework.modelling.EntityMetricsRegistry +import org.axonframework.common.infra.ComponentDescriptor +import org.axonframework.eventsourcing.eventstore.AppendCondition +import org.axonframework.eventsourcing.eventstore.EventStorageEngine +import org.axonframework.eventsourcing.eventstore.SourcingCondition +import org.axonframework.eventsourcing.eventstore.TaggedEventMessage +import org.axonframework.messaging.core.MessageStream +import org.axonframework.messaging.core.unitofwork.ProcessingContext +import org.axonframework.messaging.eventhandling.EventMessage +import org.axonframework.messaging.eventhandling.processing.streaming.token.TrackingToken +import org.axonframework.messaging.eventstreaming.StreamingCondition +import java.time.Instant +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit + +class AxoniqPlatformEventStorageEngine( + private val delegate: EventStorageEngine, + private val registry: HandlerMetricsRegistry, + private val entityMetricsRegistry: EntityMetricsRegistry, +) : EventStorageEngine { + + override fun appendEvents(condition: AppendCondition, context: ProcessingContext?, events: List>): CompletableFuture> { + // First, report dispatches + val container = context?.getResource(RESOURCE_KEY) + if (container == null) { + events.forEach { tm -> + val event: EventMessage = tm.event() + registry.registerMessageDispatchedWithoutHandling(event.toInformation()) + } + } else { + events.forEach { tm -> + val event: EventMessage = tm.event() + container.reportMessageDispatched(event.toInformation()) + } + } + if (context == null) { + return delegate.appendEvents(condition, context, events) + } + // Second, measure commit time if measurement is ongoing + val startTime = System.nanoTime() + val currentEntity = context.getResource(CurrentEntityContext.RESOURCE_KEY) + return delegate.appendEvents(condition, context, events) + .whenComplete { _, _ -> + val endTime = System.nanoTime() + HandlerMeasurement.onContext(context) { + it.registerMetricValue(PreconfiguredMetric.EVENT_COMMIT_TIME, endTime - startTime) + } + currentEntity?.let { + entityMetricsRegistry.registerAdditionalTimer( + it, + EntityMetricsRegistry.METRIC_EVENT_COMMIT_TIME, + endTime - startTime, + TimeUnit.NANOSECONDS, + ) + } + } + } + + override fun appendEvents(condition: AppendCondition, context: ProcessingContext?, vararg events: TaggedEventMessage<*>): CompletableFuture> { + return appendEvents(condition, context, events.toList()) + } + + override fun source(condition: SourcingCondition): MessageStream { + return delegate.source(condition) + } + + override fun stream(condition: StreamingCondition): MessageStream { + return delegate.stream(condition) + } + + override fun firstToken(): CompletableFuture = delegate.firstToken() + + override fun latestToken(): CompletableFuture = delegate.latestToken() + + override fun tokenAt(at: Instant): CompletableFuture = delegate.tokenAt(at) + + override fun describeTo(descriptor: ComponentDescriptor) { + descriptor.describeWrapperOf(delegate) + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt index fb818d7e..5dc5f911 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt @@ -17,19 +17,15 @@ package io.axoniq.platform.framework.eventsourcing import io.axoniq.platform.framework.api.metrics.EntityStatisticIdentifier -import io.axoniq.platform.framework.api.metrics.PreconfiguredMetric -import io.axoniq.platform.framework.messaging.HandlerMeasurement -import io.axoniq.platform.framework.messaging.HandlerMeasurement.Companion.RESOURCE_KEY -import io.axoniq.platform.framework.messaging.HandlerMetricsRegistry -import io.axoniq.platform.framework.messaging.toInformation import io.axoniq.platform.framework.modelling.CurrentEntityContext import io.axoniq.platform.framework.modelling.EntityMetricsRegistry -import io.github.oshai.kotlinlogging.KotlinLogging +import org.axonframework.common.Registration import org.axonframework.common.infra.ComponentDescriptor -import org.axonframework.eventsourcing.eventstore.AppendCondition -import org.axonframework.eventsourcing.eventstore.EventStorageEngine +import org.axonframework.eventsourcing.eventstore.ConsistencyMarker +import org.axonframework.eventsourcing.eventstore.EventStore +import org.axonframework.eventsourcing.eventstore.EventStoreTransaction +import org.axonframework.eventsourcing.eventstore.Position import org.axonframework.eventsourcing.eventstore.SourcingCondition -import org.axonframework.eventsourcing.eventstore.TaggedEventMessage import org.axonframework.messaging.core.MessageStream import org.axonframework.messaging.core.unitofwork.ProcessingContext import org.axonframework.messaging.eventhandling.EventMessage @@ -39,140 +35,120 @@ import java.time.Instant import java.util.concurrent.CompletableFuture import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong +import java.util.function.BiFunction +import java.util.function.Consumer -class AxoniqPlatformEventStorageEngine( - private val delegate: EventStorageEngine, - private val registry: HandlerMetricsRegistry, - private val entityMetricsRegistry: EntityMetricsRegistry, -) : EventStorageEngine { - - override fun appendEvents(condition: AppendCondition, context: ProcessingContext?, events: List>): CompletableFuture> { - // First, report dispatches - val container = context?.getResource(RESOURCE_KEY) - if (container == null) { - events.forEach { tm -> - val event: EventMessage = tm.event() - registry.registerMessageDispatchedWithoutHandling(event.toInformation()) - } - } else { - events.forEach { tm -> - val event: EventMessage = tm.event() - container.reportMessageDispatched(event.toInformation()) - } - } - if (context == null) { - return delegate.appendEvents(condition, context, events) - } - // Second, measure commit time if measurement is ongoing - val startTime = System.nanoTime() - val currentEntity = context.getResource(CurrentEntityContext.RESOURCE_KEY) - return delegate.appendEvents(condition, context, events) - .whenComplete { _, _ -> - val endTime = System.nanoTime() - HandlerMeasurement.onContext(context) { - it.registerMetricValue(PreconfiguredMetric.EVENT_COMMIT_TIME, endTime - startTime) - } - currentEntity?.let { - entityMetricsRegistry.registerAdditionalTimer( - it, - EntityMetricsRegistry.METRIC_EVENT_COMMIT_TIME, - endTime - startTime, - TimeUnit.NANOSECONDS, - ) - } - } +class AxoniqPlatformEventStore(private val delegate: EventStore, private val entityMetricsRegistry: EntityMetricsRegistry) : EventStore { + override fun transaction(processingContext: ProcessingContext): EventStoreTransaction { + return AxoniqPlatformEventStoreTransaction( + delegate.transaction(processingContext), processingContext + ) } - override fun appendEvents(condition: AppendCondition, context: ProcessingContext?, vararg events: TaggedEventMessage<*>): CompletableFuture> { - return appendEvents(condition, context, events.toList()) + override fun open(condition: StreamingCondition, context: ProcessingContext?): MessageStream { + return delegate.open(condition, context) } - override fun source(condition: SourcingCondition): MessageStream { - val entity = CurrentEntityContext.getFromThread() - // Capture the loadOrCreate-shared sourced-events counter at source() invocation time, while - // we are still on the calling thread. The actual onNext callbacks may fire later/elsewhere - // but they hold the reference through closure capture, so the count stays accurate. - val sourcedCounter = CurrentEntityContext.getSourcedEventsCounterFromThread() - val stream = delegate.source(condition) - if (entity == null) { - return stream - } - recordCriteriaSize(entity, condition) - return wrapForStreamSize(entity, stream, sourcedCounter) + override fun firstToken(context: ProcessingContext?): CompletableFuture { + return delegate.firstToken(context) } - override fun stream(condition: StreamingCondition): MessageStream { - val entity = CurrentEntityContext.getFromThread() - val sourcedCounter = CurrentEntityContext.getSourcedEventsCounterFromThread() - val stream = delegate.stream(condition) - if (entity == null) { - return stream - } - recordCriteriaSize(entity, condition.criteria().flatten().size) - return wrapForStreamSize(entity, stream, sourcedCounter) + override fun latestToken(context: ProcessingContext?): CompletableFuture { + return delegate.latestToken(context) } - override fun firstToken(): CompletableFuture = delegate.firstToken() + override fun tokenAt(at: Instant, context: ProcessingContext?): CompletableFuture { + return delegate.tokenAt(at, context) + } - override fun latestToken(): CompletableFuture = delegate.latestToken() + override fun subscribe(eventsBatchConsumer: BiFunction, ProcessingContext, CompletableFuture<*>>): Registration { + return delegate.subscribe(eventsBatchConsumer) + } - override fun tokenAt(at: Instant): CompletableFuture = delegate.tokenAt(at) + override fun publish(context: ProcessingContext?, events: List): CompletableFuture { + return delegate.publish(context, events) + } override fun describeTo(descriptor: ComponentDescriptor) { - descriptor.describeWrapperOf(delegate) + delegate.describeTo(descriptor) } - private fun recordCriteriaSize(entity: EntityStatisticIdentifier, condition: SourcingCondition) { - recordCriteriaSize(entity, condition.criteria().flatten().size) - } + inner class AxoniqPlatformEventStoreTransaction( + private val delegate: EventStoreTransaction, + private val processingContext: ProcessingContext): EventStoreTransaction { + override fun source(condition: SourcingCondition, resumePositionCallback: Consumer?): MessageStream { + val entity = processingContext.getResource(CurrentEntityContext.RESOURCE_KEY) + val counter = processingContext.getResource(CurrentEntityContext.COUNTER_KEY) + if(entity == null || counter == null) { + return delegate.source(condition, resumePositionCallback) + } + recordCriteriaSize(entity, condition) + return wrapForStreamSize(entity, delegate.source(condition, resumePositionCallback), counter) + } - private fun recordCriteriaSize(entity: EntityStatisticIdentifier, criteriaSize: Int) { - // Timer is time-typed; for count-style metrics we record in MILLISECONDS so the - // downstream `HistogramSnapshot.toDistribution()` (which reads values in ms) returns - // the raw count verbatim — e.g. 5 criteria → 5.0. The UI overrides the unit label - // to display this as "5" rather than "5ms". - // Skip recording 0: HdrHistogram has no real zero bucket and would report the lowest - // bucket midpoint (~0.001 with high precision) instead, polluting the distribution. - if (criteriaSize <= 0) return - entityMetricsRegistry.registerAdditionalTimer( - entity, - EntityMetricsRegistry.METRIC_CRITERIA_SIZE, - criteriaSize.toLong(), - TimeUnit.MILLISECONDS, - ) - } - private fun wrapForStreamSize( - entity: EntityStatisticIdentifier, - stream: MessageStream, - sharedSourcedCounter: AtomicLong?, - ): MessageStream { - val counter = AtomicLong(0) - return stream - .onNext { - counter.incrementAndGet() - // Also bump the loadOrCreate-shared counter so the repository can tell load - // (count > 0) from creation (count == 0). May be null for direct event-store - // streaming outside a managed load. - sharedSourcedCounter?.incrementAndGet() - } - .onComplete { - // Count-style metric; see note in recordCriteriaSize for unit choice. - // Skip recording for fresh-entity creations (no events sourced) — that case - // belongs to the Creations counter, not the stream-size distribution. - val size = counter.get() - if (size > 0L) { - entityMetricsRegistry.registerAdditionalTimer( - entity, - EntityMetricsRegistry.METRIC_EVENT_STREAM_SIZE, - size, - TimeUnit.MILLISECONDS, - ) + private fun recordCriteriaSize(entity: EntityStatisticIdentifier, condition: SourcingCondition) { + recordCriteriaSize(entity, condition.criteria().flatten().size) + } + + private fun recordCriteriaSize(entity: EntityStatisticIdentifier, criteriaSize: Int) { + // Timer is time-typed; for count-style metrics we record in MILLISECONDS so the + // downstream `HistogramSnapshot.toDistribution()` (which reads values in ms) returns + // the raw count verbatim — e.g. 5 criteria → 5.0. The UI overrides the unit label + // to display this as "5" rather than "5ms". + // Skip recording 0: HdrHistogram has no real zero bucket and would report the lowest + // bucket midpoint (~0.001 with high precision) instead, polluting the distribution. + if (criteriaSize <= 0) return + entityMetricsRegistry.registerAdditionalTimer( + entity, + EntityMetricsRegistry.METRIC_CRITERIA_SIZE, + criteriaSize.toLong(), + TimeUnit.MILLISECONDS, + ) + } + + private fun wrapForStreamSize( + entity: EntityStatisticIdentifier, + stream: MessageStream, + sharedSourcedCounter: AtomicLong?, + ): MessageStream { + val counter = AtomicLong(0) + return stream + .onNext { + counter.incrementAndGet() + // Also bump the loadOrCreate-shared counter so the repository can tell load + // (count > 0) from creation (count == 0). May be null for direct event-store + // streaming outside a managed load. + sharedSourcedCounter?.incrementAndGet() } - } - } + .onComplete { + // Count-style metric; see note in recordCriteriaSize for unit choice. + // Skip recording for fresh-entity creations (no events sourced) — that case + // belongs to the Creations counter, not the stream-size distribution. + val size = counter.get() + if (size > 0L) { + entityMetricsRegistry.registerAdditionalTimer( + entity, + EntityMetricsRegistry.METRIC_EVENT_STREAM_SIZE, + size, + TimeUnit.MILLISECONDS, + ) + } + } + } + + + override fun appendEvent(eventMessage: EventMessage) { + delegate.appendEvent(eventMessage) + } + + override fun onAppend(callback: Consumer) { + delegate.onAppend(callback) + } + + override fun appendPosition(): ConsistencyMarker { + return delegate.appendPosition() + } - companion object { - private val logger = KotlinLogging.logger { } } -} +} \ No newline at end of file diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/EventSourcingDecorators.java b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/EventSourcingDecorators.java index 49e7b468..b72827b6 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/EventSourcingDecorators.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/EventSourcingDecorators.java @@ -21,6 +21,7 @@ import org.axonframework.common.configuration.ComponentRegistry; import org.axonframework.common.configuration.DecoratorDefinition; import org.axonframework.eventsourcing.eventstore.EventStorageEngine; +import org.axonframework.eventsourcing.eventstore.EventStore; /** * Holder of the actual decorator registration against {@code axon-eventsourcing} types. Kept separate from @@ -41,5 +42,12 @@ static void apply(ComponentRegistry registry) { EntityMetricsRegistry entityMetricsRegistry = c.getComponent(EntityMetricsRegistry.class); return new AxoniqPlatformEventStorageEngine(delegate, metricsRegistry, entityMetricsRegistry); }).order(Integer.MAX_VALUE)); + + registry.registerDecorator( + DecoratorDefinition.forType(EventStore.class) + .with((c, name, delegate) -> { + EntityMetricsRegistry entityMetricsRegistry = c.getComponent(EntityMetricsRegistry.class); + return new AxoniqPlatformEventStore(delegate, entityMetricsRegistry); + }).order(Integer.MAX_VALUE)); } } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt index 1214a861..fcdc868d 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt @@ -47,19 +47,15 @@ class AxoniqPlatformRepository( override fun load(identifier: ID, processingContext: ProcessingContext): CompletableFuture> { val entityId = entityIdentifierFor(identifier, processingContext) processingContext.putResource(CurrentEntityContext.RESOURCE_KEY, entityId) - CurrentEntityContext.setOnThread(entityId) val startTime = System.nanoTime() - return try { - delegate.load(identifier, processingContext) - } finally { - CurrentEntityContext.clearOnThread() - }.whenComplete { _, error -> - val duration = System.nanoTime() - startTime - entityMetricsRegistry.registerLoad(entityId, duration, success = error == null) - HandlerMeasurement.onContext(processingContext) { - it.registerMetricValue(LoadModelMetric(identifier = "load:${entityType().simpleName}"), duration) - } - } + return delegate.load(identifier, processingContext) + .whenComplete { _, error -> + val duration = System.nanoTime() - startTime + entityMetricsRegistry.registerLoad(entityId, duration, success = error == null) + HandlerMeasurement.onContext(processingContext) { + it.registerMetricValue(LoadModelMetric(identifier = "load:${entityType().simpleName}"), duration) + } + } } override fun loadOrCreate(identifier: ID, processingContext: ProcessingContext): CompletableFuture> { @@ -71,15 +67,9 @@ class AxoniqPlatformRepository( // event-sourced entity would be reported as a creation too. val sourcedCounter = AtomicLong(0) processingContext.putResource(CurrentEntityContext.RESOURCE_KEY, entityId) - CurrentEntityContext.setOnThread(entityId) - CurrentEntityContext.setSourcedEventsCounterOnThread(sourcedCounter) + processingContext.putResource(CurrentEntityContext.COUNTER_KEY, AtomicLong(0)) val startTime = System.nanoTime() - return try { - delegate.loadOrCreate(identifier, processingContext) - } finally { - CurrentEntityContext.clearOnThread() - CurrentEntityContext.clearSourcedEventsCounterOnThread() - }.whenComplete { _, error -> + return delegate.loadOrCreate(identifier, processingContext).whenComplete { _, error -> val duration = System.nanoTime() - startTime entityMetricsRegistry.registerLoad(entityId, duration, success = error == null) if (error == null && sourcedCounter.get() == 0L) { diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt index 715855dc..2727499a 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt @@ -38,27 +38,5 @@ import java.util.concurrent.atomic.AtomicLong object CurrentEntityContext { val RESOURCE_KEY: Context.ResourceKey = Context.ResourceKey.withLabel("Axoniq Platform - Current Entity") - - private val threadLocal: ThreadLocal = ThreadLocal() - private val sourcedEventsTL: ThreadLocal = ThreadLocal() - - fun setOnThread(identifier: EntityStatisticIdentifier) { - threadLocal.set(identifier) - } - - fun getFromThread(): EntityStatisticIdentifier? = threadLocal.get() - - fun clearOnThread() { - threadLocal.remove() - } - - fun setSourcedEventsCounterOnThread(counter: AtomicLong) { - sourcedEventsTL.set(counter) - } - - fun getSourcedEventsCounterFromThread(): AtomicLong? = sourcedEventsTL.get() - - fun clearSourcedEventsCounterOnThread() { - sourcedEventsTL.remove() - } + val COUNTER_KEY: Context.ResourceKey = Context.ResourceKey.withLabel("Axoniq Platform - Entity source counter") } From a4bf3fe6b7291fd347d53bf66a575516caa7556a Mon Sep 17 00:00:00 2001 From: Stefan Mirkovic Date: Wed, 27 May 2026 10:23:27 +0200 Subject: [PATCH 3/3] Capture per-entity metrics keyed by entity class name EntityMetricsRegistry shipped on each StatisticReport. Repository decorator records load timings; event-store decorator attributes commit time, stream size and criteria size via CurrentEntityContext on the ProcessingContext. Count timers use higher precision and skip recording zero. --- .../api/metrics/EntityStatisticIdentifier.kt | 6 --- .../framework/api/metrics/EntityStatistics.kt | 2 - .../eventsourcing/AxoniqPlatformEventStore.kt | 23 +++------- .../modelling/AxoniqPlatformRepository.kt | 30 ++----------- .../modelling/CurrentEntityContext.kt | 18 ++------ .../modelling/EntityMetricsRegistry.kt | 8 +--- .../modelling/EntityMetricsRegistryTest.kt | 43 +++++-------------- 7 files changed, 23 insertions(+), 107 deletions(-) diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticIdentifier.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticIdentifier.kt index 6e466ebc..9d1335ba 100644 --- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticIdentifier.kt +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticIdentifier.kt @@ -21,10 +21,4 @@ import com.fasterxml.jackson.annotation.JsonProperty data class EntityStatisticIdentifier( @JsonProperty("n") val entityName: String, - @JsonProperty("i") - val entityId: String, - @JsonProperty("mt") - val messageType: String, - @JsonProperty("mn") - val messageName: String, ) diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatistics.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatistics.kt index 51886d50..d8432d3a 100644 --- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatistics.kt +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatistics.kt @@ -21,8 +21,6 @@ import com.fasterxml.jackson.annotation.JsonProperty data class EntityStatistics( @JsonProperty("c") val count: Double, - @JsonProperty("cr") - val creations: Double, @JsonProperty("f") val failed: Double, @JsonProperty("t") diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt index 5dc5f911..8f9fbb94 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt @@ -78,17 +78,11 @@ class AxoniqPlatformEventStore(private val delegate: EventStore, private val ent private val processingContext: ProcessingContext): EventStoreTransaction { override fun source(condition: SourcingCondition, resumePositionCallback: Consumer?): MessageStream { val entity = processingContext.getResource(CurrentEntityContext.RESOURCE_KEY) - val counter = processingContext.getResource(CurrentEntityContext.COUNTER_KEY) - if(entity == null || counter == null) { + if (entity == null) { return delegate.source(condition, resumePositionCallback) } - recordCriteriaSize(entity, condition) - return wrapForStreamSize(entity, delegate.source(condition, resumePositionCallback), counter) - } - - - private fun recordCriteriaSize(entity: EntityStatisticIdentifier, condition: SourcingCondition) { recordCriteriaSize(entity, condition.criteria().flatten().size) + return wrapForStreamSize(entity, delegate.source(condition, resumePositionCallback)) } private fun recordCriteriaSize(entity: EntityStatisticIdentifier, criteriaSize: Int) { @@ -110,21 +104,14 @@ class AxoniqPlatformEventStore(private val delegate: EventStore, private val ent private fun wrapForStreamSize( entity: EntityStatisticIdentifier, stream: MessageStream, - sharedSourcedCounter: AtomicLong?, ): MessageStream { val counter = AtomicLong(0) return stream - .onNext { - counter.incrementAndGet() - // Also bump the loadOrCreate-shared counter so the repository can tell load - // (count > 0) from creation (count == 0). May be null for direct event-store - // streaming outside a managed load. - sharedSourcedCounter?.incrementAndGet() - } + .onNext { counter.incrementAndGet() } .onComplete { // Count-style metric; see note in recordCriteriaSize for unit choice. - // Skip recording for fresh-entity creations (no events sourced) — that case - // belongs to the Creations counter, not the stream-size distribution. + // Skip recording when no events were sourced — a fresh entity has nothing + // meaningful to contribute to the stream-size distribution. val size = counter.get() if (size > 0L) { entityMetricsRegistry.registerAdditionalTimer( diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt index fcdc868d..c1d9d398 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt @@ -19,13 +19,11 @@ package io.axoniq.platform.framework.modelling import io.axoniq.platform.framework.api.metrics.EntityStatisticIdentifier import io.axoniq.platform.framework.api.metrics.LoadModelMetric import io.axoniq.platform.framework.messaging.HandlerMeasurement -import io.axoniq.platform.framework.messaging.toInformation import org.axonframework.common.infra.ComponentDescriptor import org.axonframework.messaging.core.unitofwork.ProcessingContext import org.axonframework.modelling.repository.ManagedEntity import org.axonframework.modelling.repository.Repository import java.util.concurrent.CompletableFuture -import java.util.concurrent.atomic.AtomicLong class AxoniqPlatformRepository( private val delegate: Repository, @@ -45,7 +43,7 @@ class AxoniqPlatformRepository( } override fun load(identifier: ID, processingContext: ProcessingContext): CompletableFuture> { - val entityId = entityIdentifierFor(identifier, processingContext) + val entityId = entityIdentifier() processingContext.putResource(CurrentEntityContext.RESOURCE_KEY, entityId) val startTime = System.nanoTime() return delegate.load(identifier, processingContext) @@ -59,22 +57,12 @@ class AxoniqPlatformRepository( } override fun loadOrCreate(identifier: ID, processingContext: ProcessingContext): CompletableFuture> { - val entityId = entityIdentifierFor(identifier, processingContext) - // Shared counter the event-store wrapper bumps for each sourced event. After loadOrCreate - // completes, a zero count means no prior events existed → the framework created a fresh - // entity. Counting once here (rather than registering creation unconditionally) keeps the - // Creations/min rate aligned with reality — without this, every redeem/update on an existing - // event-sourced entity would be reported as a creation too. - val sourcedCounter = AtomicLong(0) + val entityId = entityIdentifier() processingContext.putResource(CurrentEntityContext.RESOURCE_KEY, entityId) - processingContext.putResource(CurrentEntityContext.COUNTER_KEY, AtomicLong(0)) val startTime = System.nanoTime() return delegate.loadOrCreate(identifier, processingContext).whenComplete { _, error -> val duration = System.nanoTime() - startTime entityMetricsRegistry.registerLoad(entityId, duration, success = error == null) - if (error == null && sourcedCounter.get() == 0L) { - entityMetricsRegistry.registerCreation(entityId) - } HandlerMeasurement.onContext(processingContext) { it.registerMetricValue(LoadModelMetric(identifier = "load:${entityType().simpleName}"), duration) } @@ -89,17 +77,7 @@ class AxoniqPlatformRepository( descriptor.describeWrapperOf(delegate) } - private fun entityIdentifierFor(identifier: ID, processingContext: ProcessingContext): EntityStatisticIdentifier { - val messageInfo = HandlerMeasurement.fromContext(processingContext)?.message?.toInformation() - return EntityStatisticIdentifier( - entityName = entityType().simpleName, - entityId = identifier.toString(), - messageType = messageInfo?.type ?: UNKNOWN, - messageName = messageInfo?.name ?: UNKNOWN, - ) - } - - companion object { - private const val UNKNOWN = "Unknown" + private fun entityIdentifier(): EntityStatisticIdentifier { + return EntityStatisticIdentifier(entityName = entityType().simpleName) } } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt index 2727499a..38cb707e 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt @@ -18,25 +18,13 @@ package io.axoniq.platform.framework.modelling import io.axoniq.platform.framework.api.metrics.EntityStatisticIdentifier import org.axonframework.messaging.core.Context -import java.util.concurrent.atomic.AtomicLong /** - * Carries the entity statistic identifier that is currently being loaded through the [Context] / - * [org.axonframework.messaging.core.unitofwork.ProcessingContext], so downstream decorators (event store, - * lock primitives) can attribute their metrics to the correct entity. - * - * Where [org.axonframework.messaging.core.unitofwork.ProcessingContext] is available (e.g. event-store - * append), prefer [RESOURCE_KEY]. The [EventStorageEngine.source] API does not propagate a context, so - * for that call site the same identifier is also exposed via a thread-local set by - * [AxoniqPlatformRepository] just before delegating into the framework. - * - * The [SOURCED_EVENTS_TL] thread-local lets the repository distinguish a real load from a fresh-entity - * creation: when `loadOrCreate` returns and the counter is still zero, the framework sourced no prior - * events for this identifier, meaning the entity was just created. The counter is shared by reference - * with the event-store wrapper so it stays correct even if events are delivered on a different thread. + * Carries the entity statistic identifier that is currently being loaded through the + * [org.axonframework.messaging.core.unitofwork.ProcessingContext], so downstream decorators + * (event store, lock primitives) can attribute their metrics to the correct entity. */ object CurrentEntityContext { val RESOURCE_KEY: Context.ResourceKey = Context.ResourceKey.withLabel("Axoniq Platform - Current Entity") - val COUNTER_KEY: Context.ResourceKey = Context.ResourceKey.withLabel("Axoniq Platform - Entity source counter") } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/EntityMetricsRegistry.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/EntityMetricsRegistry.kt index 9e5d9f42..b1ceb05c 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/EntityMetricsRegistry.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/EntityMetricsRegistry.kt @@ -31,7 +31,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit /** - * Registry collecting metrics per entity load by `(entityName, entityId, messageType, messageName)`. + * Registry collecting metrics per entity (keyed by entity name only). * * Drained by [io.axoniq.platform.framework.messaging.HandlerMetricsRegistry] on its periodic reporting * tick and shipped to the platform as part of [io.axoniq.platform.framework.api.metrics.StatisticReport]. @@ -49,10 +49,6 @@ class EntityMetricsRegistry { } } - fun registerCreation(identifier: EntityStatisticIdentifier) { - entry(identifier).creationCount.increment() - } - fun registerAdditionalTimer(identifier: EntityStatisticIdentifier, name: String, value: Long, unit: TimeUnit) { entry(identifier).metrics .computeIfAbsentWithRetry(name) { timerFactory(name)(meterRegistry, "${identifier}_timer_$name") } @@ -78,7 +74,6 @@ class EntityMetricsRegistry { it.key, EntityStatistics( count = it.value.totalCount.count(), - creations = it.value.creationCount.count(), failed = it.value.failureCount.count(), timer = it.value.totalTimer.takeSnapshot().toDistribution(), metrics = it.value.metrics.map { (k, v) -> k to v.takeSnapshot().toDistribution() }.toMap() @@ -96,7 +91,6 @@ class EntityMetricsRegistry { private data class EntityRegistryStatistics( val totalTimer: Timer, val totalCount: RollingCountMeasure = RollingCountMeasure(), - val creationCount: RollingCountMeasure = RollingCountMeasure(), val failureCount: RollingCountMeasure = RollingCountMeasure(), val metrics: MutableMap = ConcurrentHashMap(), ) diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/modelling/EntityMetricsRegistryTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/modelling/EntityMetricsRegistryTest.kt index f14e7ddf..420f49f3 100644 --- a/framework-client/src/test/kotlin/io/axoniq/platform/framework/modelling/EntityMetricsRegistryTest.kt +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/modelling/EntityMetricsRegistryTest.kt @@ -25,39 +25,27 @@ import java.util.concurrent.TimeUnit class EntityMetricsRegistryTest { - private val orderCreated = EntityStatisticIdentifier( - entityName = "Order", - entityId = "order-1", - messageType = "CommandMessage", - messageName = "CreateOrder", - ) - - private val orderShipped = EntityStatisticIdentifier( - entityName = "Order", - entityId = "order-1", - messageType = "CommandMessage", - messageName = "ShipOrder", - ) + private val order = EntityStatisticIdentifier(entityName = "Order") + private val reservation = EntityStatisticIdentifier(entityName = "Reservation") @Test - fun `separate keys produce separate report entries`() { + fun `separate entity names produce separate report entries`() { val registry = EntityMetricsRegistry() - registry.registerLoad(orderCreated, 10_000_000, success = true) - registry.registerLoad(orderShipped, 20_000_000, success = true) - registry.registerCreation(orderCreated) + registry.registerLoad(order, 10_000_000, success = true) + registry.registerLoad(reservation, 20_000_000, success = true) val stats = registry.getStats().associateBy { it.entity } assertEquals(2, stats.size) - assertNotNull(stats[orderCreated]) - assertNotNull(stats[orderShipped]) + assertNotNull(stats[order]) + assertNotNull(stats[reservation]) } @Test fun `load records a total timer with non-empty snapshot`() { val registry = EntityMetricsRegistry() - registry.registerLoad(orderCreated, 5_000_000, success = true) + registry.registerLoad(order, 5_000_000, success = true) val stats = registry.getStats().single() assertNotNull(stats.statistics.timer) @@ -69,9 +57,9 @@ class EntityMetricsRegistryTest { fun `additional timer is exposed under the requested metric name`() { val registry = EntityMetricsRegistry() - registry.registerLoad(orderCreated, 5_000_000, success = true) + registry.registerLoad(order, 5_000_000, success = true) registry.registerAdditionalTimer( - orderCreated, + order, EntityMetricsRegistry.METRIC_CRITERIA_SIZE, 42L, TimeUnit.NANOSECONDS, @@ -80,15 +68,4 @@ class EntityMetricsRegistryTest { val stats = registry.getStats().single() assertTrue(stats.statistics.metrics.containsKey(EntityMetricsRegistry.METRIC_CRITERIA_SIZE)) } - - @Test - fun `creation without load still produces a report entry`() { - val registry = EntityMetricsRegistry() - - registry.registerCreation(orderCreated) - - val stats = registry.getStats() - assertEquals(1, stats.size) - assertEquals(orderCreated, stats.single().entity) - } }