diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticIdentifier.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticIdentifier.kt new file mode 100644 index 00000000..9d1335ba --- /dev/null +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticIdentifier.kt @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2022-2025. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.api.metrics + +import com.fasterxml.jackson.annotation.JsonProperty + +data class EntityStatisticIdentifier( + @JsonProperty("n") + val entityName: String, +) diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatistics.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatistics.kt new file mode 100644 index 00000000..d8432d3a --- /dev/null +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatistics.kt @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2022-2025. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.api.metrics + +import com.fasterxml.jackson.annotation.JsonProperty + +data class EntityStatistics( + @JsonProperty("c") + val count: Double, + @JsonProperty("f") + val failed: Double, + @JsonProperty("t") + val timer: StatisticDistribution?, + @JsonProperty("m") + val metrics: Map, +) diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticsWithIdentifier.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticsWithIdentifier.kt new file mode 100644 index 00000000..107605f4 --- /dev/null +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/EntityStatisticsWithIdentifier.kt @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2022-2025. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.api.metrics + +import com.fasterxml.jackson.annotation.JsonProperty + +data class EntityStatisticsWithIdentifier( + @JsonProperty("e") + val entity: EntityStatisticIdentifier, + @JsonProperty("s") + val statistics: EntityStatistics, +) diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/StatisticReport.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/StatisticReport.kt index 192f1e91..68e05b58 100644 --- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/StatisticReport.kt +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/metrics/StatisticReport.kt @@ -25,4 +25,6 @@ data class StatisticReport( val dispatchers: List, @JsonProperty("a") val aggregates: List, + @JsonProperty("e") + val entities: List = emptyList(), ) diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java index eb125879..15246907 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java @@ -38,6 +38,7 @@ import io.axoniq.platform.framework.messaging.AxoniqPlatformCommandBus; import io.axoniq.platform.framework.messaging.AxoniqPlatformQueryBus; import io.axoniq.platform.framework.messaging.HandlerMetricsRegistry; +import io.axoniq.platform.framework.modelling.EntityMetricsRegistry; import org.axonframework.common.configuration.ComponentDefinition; import org.axonframework.common.configuration.ComponentRegistry; import org.axonframework.common.configuration.Configuration; @@ -140,12 +141,16 @@ public void enhance(ComponentRegistry registry) { // The start handler will allow for eager creation .onStart(Phase.EXTERNAL_CONNECTIONS, c -> { })) + .registerComponent(ComponentDefinition + .ofType(EntityMetricsRegistry.class) + .withBuilder(c -> new EntityMetricsRegistry())) .registerComponent(ComponentDefinition .ofType(HandlerMetricsRegistry.class) .withBuilder(c -> new HandlerMetricsRegistry( c.getComponent(AxoniqConsoleRSocketClient.class), c.getComponent(PlatformClientConnectionService.class), - c.getComponent(AxoniqPlatformConfiguration.class)))) + c.getComponent(AxoniqPlatformConfiguration.class), + c.getComponent(EntityMetricsRegistry.class)))) .registerComponent(ComponentDefinition .ofType(ApplicationThreadDumpProvider.class) .withBuilder(c -> new ApplicationThreadDumpProvider())) diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStorageEngine.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStorageEngine.kt new file mode 100644 index 00000000..36c37a34 --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStorageEngine.kt @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing + +import io.axoniq.platform.framework.api.metrics.PreconfiguredMetric +import io.axoniq.platform.framework.messaging.HandlerMeasurement +import io.axoniq.platform.framework.messaging.HandlerMeasurement.Companion.RESOURCE_KEY +import io.axoniq.platform.framework.messaging.HandlerMetricsRegistry +import io.axoniq.platform.framework.messaging.toInformation +import io.axoniq.platform.framework.modelling.CurrentEntityContext +import io.axoniq.platform.framework.modelling.EntityMetricsRegistry +import org.axonframework.common.infra.ComponentDescriptor +import org.axonframework.eventsourcing.eventstore.AppendCondition +import org.axonframework.eventsourcing.eventstore.EventStorageEngine +import org.axonframework.eventsourcing.eventstore.SourcingCondition +import org.axonframework.eventsourcing.eventstore.TaggedEventMessage +import org.axonframework.messaging.core.MessageStream +import org.axonframework.messaging.core.unitofwork.ProcessingContext +import org.axonframework.messaging.eventhandling.EventMessage +import org.axonframework.messaging.eventhandling.processing.streaming.token.TrackingToken +import org.axonframework.messaging.eventstreaming.StreamingCondition +import java.time.Instant +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit + +class AxoniqPlatformEventStorageEngine( + private val delegate: EventStorageEngine, + private val registry: HandlerMetricsRegistry, + private val entityMetricsRegistry: EntityMetricsRegistry, +) : EventStorageEngine { + + override fun appendEvents(condition: AppendCondition, context: ProcessingContext?, events: List>): CompletableFuture> { + // First, report dispatches + val container = context?.getResource(RESOURCE_KEY) + if (container == null) { + events.forEach { tm -> + val event: EventMessage = tm.event() + registry.registerMessageDispatchedWithoutHandling(event.toInformation()) + } + } else { + events.forEach { tm -> + val event: EventMessage = tm.event() + container.reportMessageDispatched(event.toInformation()) + } + } + if (context == null) { + return delegate.appendEvents(condition, context, events) + } + // Second, measure commit time if measurement is ongoing + val startTime = System.nanoTime() + val currentEntity = context.getResource(CurrentEntityContext.RESOURCE_KEY) + return delegate.appendEvents(condition, context, events) + .whenComplete { _, _ -> + val endTime = System.nanoTime() + HandlerMeasurement.onContext(context) { + it.registerMetricValue(PreconfiguredMetric.EVENT_COMMIT_TIME, endTime - startTime) + } + currentEntity?.let { + entityMetricsRegistry.registerAdditionalTimer( + it, + EntityMetricsRegistry.METRIC_EVENT_COMMIT_TIME, + endTime - startTime, + TimeUnit.NANOSECONDS, + ) + } + } + } + + override fun appendEvents(condition: AppendCondition, context: ProcessingContext?, vararg events: TaggedEventMessage<*>): CompletableFuture> { + return appendEvents(condition, context, events.toList()) + } + + override fun source(condition: SourcingCondition): MessageStream { + return delegate.source(condition) + } + + override fun stream(condition: StreamingCondition): MessageStream { + return delegate.stream(condition) + } + + override fun firstToken(): CompletableFuture = delegate.firstToken() + + override fun latestToken(): CompletableFuture = delegate.latestToken() + + override fun tokenAt(at: Instant): CompletableFuture = delegate.tokenAt(at) + + override fun describeTo(descriptor: ComponentDescriptor) { + descriptor.describeWrapperOf(delegate) + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt index faae87e4..8f9fbb94 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEventStore.kt @@ -16,17 +16,16 @@ package io.axoniq.platform.framework.eventsourcing -import io.axoniq.platform.framework.api.metrics.PreconfiguredMetric -import io.axoniq.platform.framework.messaging.HandlerMeasurement -import io.axoniq.platform.framework.messaging.HandlerMeasurement.Companion.RESOURCE_KEY -import io.axoniq.platform.framework.messaging.HandlerMetricsRegistry -import io.axoniq.platform.framework.messaging.toInformation -import io.github.oshai.kotlinlogging.KotlinLogging +import io.axoniq.platform.framework.api.metrics.EntityStatisticIdentifier +import io.axoniq.platform.framework.modelling.CurrentEntityContext +import io.axoniq.platform.framework.modelling.EntityMetricsRegistry +import org.axonframework.common.Registration import org.axonframework.common.infra.ComponentDescriptor -import org.axonframework.eventsourcing.eventstore.AppendCondition -import org.axonframework.eventsourcing.eventstore.EventStorageEngine +import org.axonframework.eventsourcing.eventstore.ConsistencyMarker +import org.axonframework.eventsourcing.eventstore.EventStore +import org.axonframework.eventsourcing.eventstore.EventStoreTransaction +import org.axonframework.eventsourcing.eventstore.Position import org.axonframework.eventsourcing.eventstore.SourcingCondition -import org.axonframework.eventsourcing.eventstore.TaggedEventMessage import org.axonframework.messaging.core.MessageStream import org.axonframework.messaging.core.unitofwork.ProcessingContext import org.axonframework.messaging.eventhandling.EventMessage @@ -34,65 +33,109 @@ import org.axonframework.messaging.eventhandling.processing.streaming.token.Trac import org.axonframework.messaging.eventstreaming.StreamingCondition import java.time.Instant import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong +import java.util.function.BiFunction +import java.util.function.Consumer -class AxoniqPlatformEventStorageEngine( - private val delegate: EventStorageEngine, - private val registry: HandlerMetricsRegistry, -) : EventStorageEngine { - - override fun appendEvents(condition: AppendCondition, context: ProcessingContext?, events: List>): CompletableFuture> { - // First, report dispatches - val container = context?.getResource(RESOURCE_KEY) - if (container == null) { - events.forEach { tm -> - val event: EventMessage = tm.event() - registry.registerMessageDispatchedWithoutHandling(event.toInformation()) - } - } else { - events.forEach { tm -> - val event: EventMessage = tm.event() - container.reportMessageDispatched(event.toInformation()) - } - } - if (context == null) { - return delegate.appendEvents(condition, context, events) - } - // Second, measure commit time if measurement is ongoing - val startTime = System.nanoTime() - return delegate.appendEvents(condition, context, events) - .whenComplete { _, _ -> - val endTime = System.nanoTime() - HandlerMeasurement.onContext(context) { - it.registerMetricValue(PreconfiguredMetric.EVENT_COMMIT_TIME, endTime - startTime) - } - } +class AxoniqPlatformEventStore(private val delegate: EventStore, private val entityMetricsRegistry: EntityMetricsRegistry) : EventStore { + override fun transaction(processingContext: ProcessingContext): EventStoreTransaction { + return AxoniqPlatformEventStoreTransaction( + delegate.transaction(processingContext), processingContext + ) } - override fun appendEvents(condition: AppendCondition, context: ProcessingContext?, vararg events: TaggedEventMessage<*>): CompletableFuture> { - return appendEvents(condition, context, events.toList()) + override fun open(condition: StreamingCondition, context: ProcessingContext?): MessageStream { + return delegate.open(condition, context) } - override fun source(condition: SourcingCondition): MessageStream { - return delegate.source(condition) + override fun firstToken(context: ProcessingContext?): CompletableFuture { + return delegate.firstToken(context) } - override fun stream(condition: StreamingCondition): MessageStream { - return delegate.stream(condition) + override fun latestToken(context: ProcessingContext?): CompletableFuture { + return delegate.latestToken(context) } - override fun firstToken(): CompletableFuture { - return delegate.firstToken() + override fun tokenAt(at: Instant, context: ProcessingContext?): CompletableFuture { + return delegate.tokenAt(at, context) } - override fun latestToken(): CompletableFuture { - return delegate.latestToken() + override fun subscribe(eventsBatchConsumer: BiFunction, ProcessingContext, CompletableFuture<*>>): Registration { + return delegate.subscribe(eventsBatchConsumer) } - override fun tokenAt(at: Instant): CompletableFuture { - return delegate.tokenAt(at) + override fun publish(context: ProcessingContext?, events: List): CompletableFuture { + return delegate.publish(context, events) } override fun describeTo(descriptor: ComponentDescriptor) { - descriptor.describeWrapperOf(delegate) + delegate.describeTo(descriptor) + } + + inner class AxoniqPlatformEventStoreTransaction( + private val delegate: EventStoreTransaction, + private val processingContext: ProcessingContext): EventStoreTransaction { + override fun source(condition: SourcingCondition, resumePositionCallback: Consumer?): MessageStream { + val entity = processingContext.getResource(CurrentEntityContext.RESOURCE_KEY) + if (entity == null) { + return delegate.source(condition, resumePositionCallback) + } + recordCriteriaSize(entity, condition.criteria().flatten().size) + return wrapForStreamSize(entity, delegate.source(condition, resumePositionCallback)) + } + + private fun recordCriteriaSize(entity: EntityStatisticIdentifier, criteriaSize: Int) { + // Timer is time-typed; for count-style metrics we record in MILLISECONDS so the + // downstream `HistogramSnapshot.toDistribution()` (which reads values in ms) returns + // the raw count verbatim — e.g. 5 criteria → 5.0. The UI overrides the unit label + // to display this as "5" rather than "5ms". + // Skip recording 0: HdrHistogram has no real zero bucket and would report the lowest + // bucket midpoint (~0.001 with high precision) instead, polluting the distribution. + if (criteriaSize <= 0) return + entityMetricsRegistry.registerAdditionalTimer( + entity, + EntityMetricsRegistry.METRIC_CRITERIA_SIZE, + criteriaSize.toLong(), + TimeUnit.MILLISECONDS, + ) + } + + private fun wrapForStreamSize( + entity: EntityStatisticIdentifier, + stream: MessageStream, + ): MessageStream { + val counter = AtomicLong(0) + return stream + .onNext { counter.incrementAndGet() } + .onComplete { + // Count-style metric; see note in recordCriteriaSize for unit choice. + // Skip recording when no events were sourced — a fresh entity has nothing + // meaningful to contribute to the stream-size distribution. + val size = counter.get() + if (size > 0L) { + entityMetricsRegistry.registerAdditionalTimer( + entity, + EntityMetricsRegistry.METRIC_EVENT_STREAM_SIZE, + size, + TimeUnit.MILLISECONDS, + ) + } + } + } + + + override fun appendEvent(eventMessage: EventMessage) { + delegate.appendEvent(eventMessage) + } + + override fun onAppend(callback: Consumer) { + delegate.onAppend(callback) + } + + override fun appendPosition(): ConsistencyMarker { + return delegate.appendPosition() + } + } } \ No newline at end of file diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/EventSourcingDecorators.java b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/EventSourcingDecorators.java index 5c4d9d88..b72827b6 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/EventSourcingDecorators.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/EventSourcingDecorators.java @@ -17,9 +17,11 @@ package io.axoniq.platform.framework.eventsourcing; import io.axoniq.platform.framework.messaging.HandlerMetricsRegistry; +import io.axoniq.platform.framework.modelling.EntityMetricsRegistry; import org.axonframework.common.configuration.ComponentRegistry; import org.axonframework.common.configuration.DecoratorDefinition; import org.axonframework.eventsourcing.eventstore.EventStorageEngine; +import org.axonframework.eventsourcing.eventstore.EventStore; /** * Holder of the actual decorator registration against {@code axon-eventsourcing} types. Kept separate from @@ -37,7 +39,15 @@ static void apply(ComponentRegistry registry) { DecoratorDefinition.forType(EventStorageEngine.class) .with((c, name, delegate) -> { HandlerMetricsRegistry metricsRegistry = c.getComponent(HandlerMetricsRegistry.class); - return new AxoniqPlatformEventStorageEngine(delegate, metricsRegistry); + EntityMetricsRegistry entityMetricsRegistry = c.getComponent(EntityMetricsRegistry.class); + return new AxoniqPlatformEventStorageEngine(delegate, metricsRegistry, entityMetricsRegistry); + }).order(Integer.MAX_VALUE)); + + registry.registerDecorator( + DecoratorDefinition.forType(EventStore.class) + .with((c, name, delegate) -> { + EntityMetricsRegistry entityMetricsRegistry = c.getComponent(EntityMetricsRegistry.class); + return new AxoniqPlatformEventStore(delegate, entityMetricsRegistry); }).order(Integer.MAX_VALUE)); } } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt b/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt index 559adde6..3c9a335a 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMetricsRegistry.kt @@ -35,6 +35,7 @@ import io.axoniq.platform.framework.client.AxoniqConsoleRSocketClient import io.axoniq.platform.framework.client.PlatformClientConnectionObserver import io.axoniq.platform.framework.client.PlatformClientConnectionService import io.axoniq.platform.framework.computeIfAbsentWithRetry +import io.axoniq.platform.framework.modelling.EntityMetricsRegistry import io.github.oshai.kotlinlogging.KotlinLogging import io.micrometer.core.instrument.Timer import io.micrometer.core.instrument.simple.SimpleMeterRegistry @@ -46,7 +47,8 @@ import java.util.concurrent.TimeUnit class HandlerMetricsRegistry( private val axoniqConsoleRSocketClient: AxoniqConsoleRSocketClient, private val platformClientConnectionService: PlatformClientConnectionService, - private val properties: AxoniqPlatformConfiguration + private val properties: AxoniqPlatformConfiguration, + private val entityMetricsRegistry: EntityMetricsRegistry, ) : PlatformClientConnectionObserver { private val logger = KotlinLogging.logger { } private var reportTask: ScheduledFuture<*>? = null @@ -118,7 +120,8 @@ class HandlerMetricsRegistry( DispatcherStatistics(it.value.count()) ) }, - aggregates = emptyList() + aggregates = emptyList(), + entities = entityMetricsRegistry.getStats(), ) return flow } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt index fa9efad9..c1d9d398 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformRepository.kt @@ -16,6 +16,7 @@ package io.axoniq.platform.framework.modelling +import io.axoniq.platform.framework.api.metrics.EntityStatisticIdentifier import io.axoniq.platform.framework.api.metrics.LoadModelMetric import io.axoniq.platform.framework.messaging.HandlerMeasurement import org.axonframework.common.infra.ComponentDescriptor @@ -26,6 +27,7 @@ import java.util.concurrent.CompletableFuture class AxoniqPlatformRepository( private val delegate: Repository, + private val entityMetricsRegistry: EntityMetricsRegistry, ) : Repository.LifecycleManagement { override fun attach(entity: ManagedEntity, processingContext: ProcessingContext): ManagedEntity { val lfcm = delegate as Repository.LifecycleManagement @@ -41,19 +43,26 @@ class AxoniqPlatformRepository( } override fun load(identifier: ID, processingContext: ProcessingContext): CompletableFuture> { + val entityId = entityIdentifier() + processingContext.putResource(CurrentEntityContext.RESOURCE_KEY, entityId) val startTime = System.nanoTime() - return delegate.load(identifier, processingContext).whenComplete { _, _ -> - val duration = System.nanoTime() - startTime - HandlerMeasurement.onContext(processingContext) { - it.registerMetricValue(LoadModelMetric(identifier = "load:${entityType().simpleName}"), duration) - } - } + return delegate.load(identifier, processingContext) + .whenComplete { _, error -> + val duration = System.nanoTime() - startTime + entityMetricsRegistry.registerLoad(entityId, duration, success = error == null) + HandlerMeasurement.onContext(processingContext) { + it.registerMetricValue(LoadModelMetric(identifier = "load:${entityType().simpleName}"), duration) + } + } } override fun loadOrCreate(identifier: ID, processingContext: ProcessingContext): CompletableFuture> { + val entityId = entityIdentifier() + processingContext.putResource(CurrentEntityContext.RESOURCE_KEY, entityId) val startTime = System.nanoTime() - return delegate.loadOrCreate(identifier, processingContext).whenComplete { _, _ -> + return delegate.loadOrCreate(identifier, processingContext).whenComplete { _, error -> val duration = System.nanoTime() - startTime + entityMetricsRegistry.registerLoad(entityId, duration, success = error == null) HandlerMeasurement.onContext(processingContext) { it.registerMetricValue(LoadModelMetric(identifier = "load:${entityType().simpleName}"), duration) } @@ -67,4 +76,8 @@ class AxoniqPlatformRepository( override fun describeTo(descriptor: ComponentDescriptor) { descriptor.describeWrapperOf(delegate) } -} \ No newline at end of file + + private fun entityIdentifier(): EntityStatisticIdentifier { + return EntityStatisticIdentifier(entityName = entityType().simpleName) + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt index f029c5bd..7b5f6db1 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt @@ -23,14 +23,15 @@ import org.axonframework.modelling.repository.Repository import java.util.concurrent.CompletableFuture class AxoniqPlatformStateManager( - private val delegate: StateManager + private val delegate: StateManager, + private val entityMetricsRegistry: EntityMetricsRegistry, ): StateManager { override fun register(repository: Repository): StateManager { if(repository is AxoniqPlatformRepository) { delegate.register(repository) return this } - delegate.register(AxoniqPlatformRepository(repository)) + delegate.register(AxoniqPlatformRepository(repository, entityMetricsRegistry)) return this } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt new file mode 100644 index 00000000..38cb707e --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/CurrentEntityContext.kt @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.modelling + +import io.axoniq.platform.framework.api.metrics.EntityStatisticIdentifier +import org.axonframework.messaging.core.Context + +/** + * Carries the entity statistic identifier that is currently being loaded through the + * [org.axonframework.messaging.core.unitofwork.ProcessingContext], so downstream decorators + * (event store, lock primitives) can attribute their metrics to the correct entity. + */ +object CurrentEntityContext { + val RESOURCE_KEY: Context.ResourceKey = + Context.ResourceKey.withLabel("Axoniq Platform - Current Entity") +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/EntityMetricsRegistry.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/EntityMetricsRegistry.kt new file mode 100644 index 00000000..b1ceb05c --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/EntityMetricsRegistry.kt @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.modelling + +import io.axoniq.platform.framework.api.metrics.EntityStatisticIdentifier +import io.axoniq.platform.framework.api.metrics.EntityStatistics +import io.axoniq.platform.framework.api.metrics.EntityStatisticsWithIdentifier +import io.axoniq.platform.framework.computeIfAbsentWithRetry +import io.axoniq.platform.framework.createCountTimer +import io.axoniq.platform.framework.createTimer +import io.axoniq.platform.framework.messaging.RollingCountMeasure +import io.axoniq.platform.framework.messaging.toDistribution +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Timer +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit + +/** + * Registry collecting metrics per entity (keyed by entity name only). + * + * Drained by [io.axoniq.platform.framework.messaging.HandlerMetricsRegistry] on its periodic reporting + * tick and shipped to the platform as part of [io.axoniq.platform.framework.api.metrics.StatisticReport]. + */ +class EntityMetricsRegistry { + private val meterRegistry = SimpleMeterRegistry() + private val entities: MutableMap = ConcurrentHashMap() + + fun registerLoad(identifier: EntityStatisticIdentifier, durationNs: Long, success: Boolean) { + val stats = entry(identifier) + stats.totalTimer.record(durationNs, TimeUnit.NANOSECONDS) + stats.totalCount.increment() + if (!success) { + stats.failureCount.increment() + } + } + + fun registerAdditionalTimer(identifier: EntityStatisticIdentifier, name: String, value: Long, unit: TimeUnit) { + entry(identifier).metrics + .computeIfAbsentWithRetry(name) { timerFactory(name)(meterRegistry, "${identifier}_timer_$name") } + .record(value, unit) + } + + /** + * Picks a higher-precision histogram for count-style metrics so reported percentiles round-trip + * small integers cleanly (otherwise `1` shows up as `0.98` and `0` as `0.98` on the Inspector UI). + * Time-typed metrics keep the default precision — their values span many orders of magnitude where + * the extra precision is wasted memory. + */ + private fun timerFactory(name: String): (MeterRegistry, String) -> Timer { + return when (name) { + METRIC_EVENT_STREAM_SIZE, METRIC_CRITERIA_SIZE -> ::createCountTimer + else -> ::createTimer + } + } + + fun getStats(): List { + return entities.entries.map { + EntityStatisticsWithIdentifier( + it.key, + EntityStatistics( + count = it.value.totalCount.count(), + failed = it.value.failureCount.count(), + timer = it.value.totalTimer.takeSnapshot().toDistribution(), + metrics = it.value.metrics.map { (k, v) -> k to v.takeSnapshot().toDistribution() }.toMap() + ) + ) + } + } + + private fun entry(identifier: EntityStatisticIdentifier): EntityRegistryStatistics { + return entities.computeIfAbsentWithRetry(identifier) { _ -> + EntityRegistryStatistics(createTimer(meterRegistry, "${identifier}_timer_total")) + } + } + + private data class EntityRegistryStatistics( + val totalTimer: Timer, + val totalCount: RollingCountMeasure = RollingCountMeasure(), + val failureCount: RollingCountMeasure = RollingCountMeasure(), + val metrics: MutableMap = ConcurrentHashMap(), + ) + + companion object { + const val METRIC_LOCK_WAIT_TIME = "lock_wait_time" + const val METRIC_EVENT_COMMIT_TIME = "event_commit_time" + const val METRIC_EVENT_STREAM_SIZE = "event_stream_size" + const val METRIC_CRITERIA_SIZE = "criteria_size" + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java index 3a7c4e59..48db69c3 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java @@ -39,7 +39,7 @@ static void apply(ComponentRegistry registry) { if(delegate instanceof AxoniqPlatformStateManager) { return delegate; } - return new AxoniqPlatformStateManager(delegate); + return new AxoniqPlatformStateManager(delegate, cc.getComponent(EntityMetricsRegistry.class)); }) .order(Integer.MAX_VALUE)); @@ -48,12 +48,12 @@ static void apply(ComponentRegistry registry) { .registerDecorator(DecoratorDefinition.forType(Repository.class) .with((cc, name, delegate) -> delegate instanceof AxoniqPlatformRepository ? delegate : - new AxoniqPlatformRepository<>(delegate)) + new AxoniqPlatformRepository<>(delegate, cc.getComponent(EntityMetricsRegistry.class))) .order(Integer.MIN_VALUE)) .registerDecorator(DecoratorDefinition.forType(StateManager.class) .with((cc, name, delegate) -> delegate instanceof AxoniqPlatformStateManager ? delegate : - new AxoniqPlatformStateManager(delegate)) + new AxoniqPlatformStateManager(delegate, cc.getComponent(EntityMetricsRegistry.class))) .order(Integer.MAX_VALUE)); return null; }, true); diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt b/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt index 2fcfbbc3..3c3e79af 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt @@ -141,6 +141,23 @@ fun createTimer(meterRegistry: MeterRegistry, name: String): Timer { .register(meterRegistry) } +/** + * Same as [createTimer] but with higher percentile precision, suitable for count-style metrics + * (e.g. event_stream_size, criteria_size) where the recorded values are small integers and the + * default HdrHistogram bucketing reports 1.0 as ~0.98 or 0 as ~0.98. Precision 3 gives sub-1% + * relative error which is enough to round-trip small integers cleanly, at the cost of a few KB + * extra heap per Timer — acceptable given the bounded number of count metrics per entity. + */ +fun createCountTimer(meterRegistry: MeterRegistry, name: String): Timer { + return Timer + .builder(name) + .publishPercentiles(1.00, 0.95, 0.90, 0.50, 0.01) + .distributionStatisticExpiry(Duration.ofMinutes(5)) + .distributionStatisticBufferLength(5) + .percentilePrecision(3) + .register(meterRegistry) +} + /** * Truncates the string to ensure it doesn't exceed the specified maximum byte size. * diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/modelling/EntityMetricsRegistryTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/modelling/EntityMetricsRegistryTest.kt new file mode 100644 index 00000000..420f49f3 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/modelling/EntityMetricsRegistryTest.kt @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.modelling + +import io.axoniq.platform.framework.api.metrics.EntityStatisticIdentifier +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import java.util.concurrent.TimeUnit + +class EntityMetricsRegistryTest { + + private val order = EntityStatisticIdentifier(entityName = "Order") + private val reservation = EntityStatisticIdentifier(entityName = "Reservation") + + @Test + fun `separate entity names produce separate report entries`() { + val registry = EntityMetricsRegistry() + + registry.registerLoad(order, 10_000_000, success = true) + registry.registerLoad(reservation, 20_000_000, success = true) + + val stats = registry.getStats().associateBy { it.entity } + assertEquals(2, stats.size) + assertNotNull(stats[order]) + assertNotNull(stats[reservation]) + } + + @Test + fun `load records a total timer with non-empty snapshot`() { + val registry = EntityMetricsRegistry() + + registry.registerLoad(order, 5_000_000, success = true) + + val stats = registry.getStats().single() + assertNotNull(stats.statistics.timer) + // mean is reported in ms — 5ms input should yield > 0 + assertTrue((stats.statistics.timer?.mean ?: 0.0) > 0.0) + } + + @Test + fun `additional timer is exposed under the requested metric name`() { + val registry = EntityMetricsRegistry() + + registry.registerLoad(order, 5_000_000, success = true) + registry.registerAdditionalTimer( + order, + EntityMetricsRegistry.METRIC_CRITERIA_SIZE, + 42L, + TimeUnit.NANOSECONDS, + ) + + val stats = registry.getStats().single() + assertTrue(stats.statistics.metrics.containsKey(EntityMetricsRegistry.METRIC_CRITERIA_SIZE)) + } +}