From 9ee6d08d5940e7a25150e4db47dabb99a81703c2 Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Tue, 5 May 2026 13:37:28 +0100 Subject: [PATCH] Drive model inspection through real UnitOfWork and support modules This is a proposal PR to the original PR posed by Mirko, #146. It takes a different approach to the same problem, and functionally nothing has changed ## Why the proposal The original approach was very reflection-heavy, which is partly to be expected, but can also be improved upon. The more inner reflection is present, the more likely it's to break. In addition, the approach didn't account for entites in submodules for hierarchic contexts. As such, only top-level entities would be discovered. As this all requires very deep knowledge about how the Configuration of Axon Framework works, I decided to make this proposal PR to guide by example, instead of separate comments. ## Basics of the rewrite 1. The `UnitOfWorkFeactory.create()` is used during all sourcing and read operations. This way all parameter resolvers will work correctly. This takes away the need of manually invoking methods. 2. Repository instances are discovered at boot time. If it's an EventSourcingRepository, it's registeredd to the `RSocketModelInspectionResponder` 3. The `EventSourcingRepository` is rebuilt using reflection to be able to decorate the `EntityEvolver` with the `AxoniqPlatformEntityEvolver`. 4. Reading the event stream and entity state is now a `Repository.load(...)` operation. Several resources are put in the `ProcessingContext`, which the decorators respond to. a. `AxoniqPlatformEntityEvolver.BEFORE_CONSUMER`: If present, calls with the entity state before any evolve b. `AxoniqPlatformEntityEvolver.AFTER_CONSUMER`: If present, calls with the entity state after any evolve c. AxoniqPlatformEntityEvolver.MAX_INDEX: Stops evolving the entity after a certain index. ## Result With the new code a lot of the reflection-based code could be deleted. The RSocketModelInspectionResponder.kt shrunk by half its size. We have programmed against the interfaces of the framework, which are less likely to change. In addition I added tests that confirm it works. --- .../AxoniqPlatformConfigurerEnhancer.java | 10 +- .../AxoniqPlatformEntityEvolver.kt | 80 ++ ...AxoniqPlatformModelInspectionEnhancer.java | 51 +- .../ModelInspectionDecorators.java | 155 ++++ .../RSocketModelInspectionResponder.kt | 830 ++++-------------- .../framework/messaging/HandlerMeasurement.kt | 8 - .../modelling/AxoniqPlatformStateManager.kt | 6 +- .../modelling/ModellingDecorators.java | 12 +- .../io/axoniq/platform/framework/utils.kt | 15 +- ...oniqPlatformModelInspectionEnhancerTest.kt | 39 +- ...cketModelInspectionResponderHelpersTest.kt | 40 +- ...ModelInspectionResponderIntegrationTest.kt | 333 +++++++ ...ionResponderNestedModuleIntegrationTest.kt | 195 ++++ ...spectionResponderReflectionDispatchTest.kt | 224 ----- 14 files changed, 1014 insertions(+), 984 deletions(-) create mode 100644 framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEntityEvolver.kt create mode 100644 framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/ModelInspectionDecorators.java create mode 100644 framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderIntegrationTest.kt create mode 100644 framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderNestedModuleIntegrationTest.kt delete mode 100644 framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderReflectionDispatchTest.kt diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java index b3846e25..4ff12820 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java @@ -165,6 +165,14 @@ public void enhance(ComponentRegistry registry) { UtilsKt.doOnSubModules(registry, (componentRegistry, module) -> { + // Only event processor modules expose a processorName; the doOnSubModules walker + // visits every sub-module (event-sourced entities, command/query modules, ...), so + // skipping non-processor modules here keeps AxoniqPlatformEventHandlingComponent + // from being constructed with a null processor name. + if (!(module instanceof PooledStreamingEventProcessorModule) + && !(module instanceof SubscribingEventProcessorModule)) { + return null; + } componentRegistry .registerDecorator(DecoratorDefinition.forType(EventHandlingComponent.class) .with((cc, name, delegate) -> @@ -176,7 +184,7 @@ public void enhance(ComponentRegistry registry) { .order(0)); return null; - }); + }, true); } /** diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEntityEvolver.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEntityEvolver.kt new file mode 100644 index 00000000..757a3bf6 --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEntityEvolver.kt @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing + +import org.axonframework.messaging.core.Context +import org.axonframework.messaging.core.unitofwork.ProcessingContext +import org.axonframework.messaging.eventhandling.EventMessage +import org.axonframework.modelling.EntityEvolver +import java.util.function.BiConsumer + +/** + * Wraps the underlying [EntityEvolver] of an [org.axonframework.eventsourcing.EventSourcingRepository] + * so that inspection-time replay can fire BEFORE/AFTER hooks per event without reimplementing + * AF5's dispatch. + * + * The hooks are no-ops unless the matching [Context.ResourceKey] resources are present on the + * [ProcessingContext], so command handling and normal event sourcing go through unchanged. + * + * Constructed by [AxoniqPlatformModelInspectionEnhancer], which detects the inner + * [org.axonframework.eventsourcing.EventSourcingRepository] in the decorator chain and reconstructs + * it with this wrapper substituted for its `entityEvolver` argument. + */ +class AxoniqPlatformEntityEvolver( + private val delegate: EntityEvolver, +) : EntityEvolver { + + companion object { + /** Called before [delegate.evolve]. Receives the event and the pre-evolve entity state. */ + val BEFORE_CONSUMER: Context.ResourceKey> = + Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.BEFORE_CONSUMER") + + /** Called after [delegate.evolve]. Receives the event and the post-evolve entity state. */ + val AFTER_CONSUMER: Context.ResourceKey> = + Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.AFTER_CONSUMER") + + /** + * Zero-based event index — when present, [evolve] returns the current entity unchanged + * once it has applied this many events. Lets inspection reconstruct state up to a given + * sequence without doing the bookkeeping outside the framework. + */ + val MAX_INDEX: Context.ResourceKey = + Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.MAX_INDEX") + + /** Internal counter advanced by [evolve] when [MAX_INDEX] is set. */ + val INDEX_COUNTER: Context.ResourceKey = + Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.INDEX_COUNTER") + } + + /** Tiny mutable holder so we can advance the index without re-putting a Long resource each call. */ + class LongCounter(var value: Long = 0) + + override fun evolve(entity: E, event: EventMessage, context: ProcessingContext): E { + val maxIndex = context.getResource(MAX_INDEX) + if (maxIndex != null) { + val counter = context.computeResourceIfAbsent(INDEX_COUNTER) { LongCounter() } + if (counter.value > maxIndex) { + return entity + } + counter.value++ + } + context.getResource(BEFORE_CONSUMER)?.accept(event, entity) + val result = delegate.evolve(entity, event, context) + context.getResource(AFTER_CONSUMER)?.accept(event, result) + return result + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancer.java b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancer.java index 2d0534ed..07eb91f4 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancer.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancer.java @@ -18,39 +18,54 @@ import io.axoniq.platform.framework.AxoniqPlatformConfigurerEnhancer; import io.axoniq.platform.framework.client.RSocketHandlerRegistrar; -import org.axonframework.common.configuration.ComponentDefinition; import org.axonframework.common.configuration.ComponentRegistry; import org.axonframework.common.configuration.ConfigurationEnhancer; -import org.axonframework.common.lifecycle.Phase; -import org.axonframework.eventsourcing.eventstore.EventStorageEngine; -import org.axonframework.modelling.StateManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Enhancer that registers the {@link RSocketModelInspectionResponder} when both - * {@link StateManager} and {@link EventStorageEngine} are available (AF5 applications). + * Service-loaded enhancer that wires the {@link RSocketModelInspectionResponder} when the + * application has the platform client connected ({@link RSocketHandlerRegistrar} present) and + * {@code axon-eventsourcing} is on the classpath. + * + *

This class is deliberately free of direct references to {@code axon-eventsourcing} types + * so it can be loaded even when event sourcing is absent from the classpath. The actual + * decorator wiring lives in {@link ModelInspectionDecorators} and is only touched after the + * runtime classpath probe succeeds.

+ * + *

We deliberately do not probe for {@code StateManager} either: it's registered by + * {@code ModellingConfigurationDefaults} at {@link Integer#MAX_VALUE}, after this enhancer's + * order, so the probe would falsely return {@code false} during boot.

*/ public class AxoniqPlatformModelInspectionEnhancer implements ConfigurationEnhancer { + private static final Logger logger = LoggerFactory.getLogger(AxoniqPlatformModelInspectionEnhancer.class); + private static final String EVENTSOURCING_PROBE_CLASS = "org.axonframework.eventsourcing.eventstore.EventStorageEngine"; + @Override public void enhance(ComponentRegistry registry) { - if (!registry.hasComponent(StateManager.class) - || !registry.hasComponent(EventStorageEngine.class) - || !registry.hasComponent(RSocketHandlerRegistrar.class)) { + if (!registry.hasComponent(RSocketHandlerRegistrar.class)) { return; } - - registry.registerComponent(ComponentDefinition - .ofType(RSocketModelInspectionResponder.class) - .withBuilder(c -> new RSocketModelInspectionResponder( - c.getComponent(StateManager.class), - c.getComponent(EventStorageEngine.class), - c.getComponent(RSocketHandlerRegistrar.class), - c)) - .onStart(Phase.EXTERNAL_CONNECTIONS, RSocketModelInspectionResponder::start)); + if (!isClasspathAvailable()) { + logger.debug("axon-eventsourcing not on classpath; skipping model inspection wiring."); + return; + } + ModelInspectionDecorators.apply(registry); } @Override public int order() { return AxoniqPlatformConfigurerEnhancer.PLATFORM_ENHANCER_ORDER + 1; } + + private static boolean isClasspathAvailable() { + try { + Class.forName(EVENTSOURCING_PROBE_CLASS, false, + AxoniqPlatformModelInspectionEnhancer.class.getClassLoader()); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/ModelInspectionDecorators.java b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/ModelInspectionDecorators.java new file mode 100644 index 00000000..c1c8d2ea --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/ModelInspectionDecorators.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing; + +import io.axoniq.platform.framework.ReflectionKt; +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar; +import org.axonframework.common.configuration.ComponentDefinition; +import org.axonframework.common.configuration.ComponentRegistry; +import org.axonframework.common.configuration.DecoratorDefinition; +import org.axonframework.common.lifecycle.Phase; +import org.axonframework.eventsourcing.EventSourcedEntityFactory; +import org.axonframework.eventsourcing.EventSourcingRepository; +import org.axonframework.eventsourcing.eventstore.EventStorageEngine; +import org.axonframework.eventsourcing.eventstore.EventStore; +import org.axonframework.eventsourcing.handler.SourcingHandler; +import org.axonframework.modelling.EntityEvolver; +import org.axonframework.modelling.repository.Repository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; + +/** + * Holds the actual decorator and component wiring for model inspection. Kept separate from + * {@link AxoniqPlatformModelInspectionEnhancer} so the enhancer class can be loaded even when + * {@code axon-eventsourcing} is not on the classpath — this class is only touched after a + * {@code Class.forName} probe confirms the module is present. + * + *

We do not walk submodules: AF5's nested module structure shares a single + * {@link org.axonframework.common.configuration.DefaultComponentRegistry} (each {@code BaseModule} + * resolves the parent's {@code ComponentRegistry} component instead of creating its own), so a + * single {@code Repository} decorator at the top covers every event-sourced entity in the + * application — top-level or arbitrarily nested.

+ */ +final class ModelInspectionDecorators { + + private static final Logger logger = LoggerFactory.getLogger(ModelInspectionDecorators.class); + + private ModelInspectionDecorators() { + } + + static void apply(ComponentRegistry registry) { + if (!registry.hasComponent(EventStorageEngine.class)) { + return; + } + // The enhancer pipeline can fire multiple times against the same registry as nested + // module configurations build. Idempotency guard: once the responder is in place, the + // decorator and its lifecycle hook are already registered, so re-running would + // duplicate the wrapping and double-evolve every event. + if (registry.hasComponent(RSocketModelInspectionResponder.class)) { + return; + } + + registry.registerComponent(ComponentDefinition + .ofType(RSocketModelInspectionResponder.class) + .withBuilder(c -> new RSocketModelInspectionResponder( + c.getComponent(EventStorageEngine.class), + c.getComponent(RSocketHandlerRegistrar.class), + c)) + .onStart(Phase.EXTERNAL_CONNECTIONS, RSocketModelInspectionResponder::start)); + + // Single decorator at the top covers every Repository registered in the application, + // top-level or nested — AF5's nested modules share the same component registry. + // + // The decorator reconstructs the underlying EventSourcingRepository with entityEvolver + // wrapped in AxoniqPlatformEntityEvolver. We deliberately do NOT decorate the registered + // EntityMetamodel — AnnotatedEventSourcedEntityModule casts the registered metamodel to + // AnnotatedEntityMetamodel inside its EntityIdResolver builder, and a wrapper would + // make that cast fail at startup. + // + // The .onStart hook then registers the rebuilt repository with the responder so it + // knows about this entity for the registered-entities query. + registry.registerDecorator(DecoratorDefinition + .forType(Repository.class) + .with((config, name, delegate) -> rebuildIfEventSourcingRepository(delegate)) + .onStart(Phase.LOCAL_MESSAGE_HANDLER_REGISTRATIONS, (configuration, component) -> { + configuration.getComponent(RSocketModelInspectionResponder.class) + .registerRepository(component); + return CompletableFuture.completedFuture(null); + })); + } + + /** + * Walks the wrapper chain from {@code delegate} downward to find an + * {@link EventSourcingRepository}, reconstructs that ESR with {@code entityEvolver} wrapped + * in {@link AxoniqPlatformEntityEvolver}, and swaps the wrapping component's {@code delegate} + * field to point at the new ESR. The outer wrapper(s) are kept intact so any platform-side + * decoration (e.g. {@code AxoniqPlatformRepository} for metrics) still applies. + * + *

Why peel rather than match {@code instanceof EventSourcingRepository} on the input: + * by the time this decorator runs, lower-order decorators (notably the metrics-adding + * {@code AxoniqPlatformRepository} from the modelling layer at {@code Integer.MIN_VALUE}) + * have already wrapped the ESR. Matching directly would miss every real configuration.

+ * + *

Logged-and-passthrough on reflection failure: if the field layout shifts in a future + * AF release we don't want to break command handling, just lose inspection hooks.

+ */ + @SuppressWarnings({"rawtypes", "unchecked"}) + private static Repository rebuildIfEventSourcingRepository(Repository delegate) { + Object current = delegate; + Object parent = null; + while (current != null && !(current instanceof EventSourcingRepository)) { + parent = current; + current = ReflectionKt.getPropertyValue(current, "delegate"); + } + if (!(current instanceof EventSourcingRepository esr)) { + return delegate; + } + try { + Class idType = ReflectionKt.getPropertyValue(esr, "idType"); + Class entityType = ReflectionKt.getPropertyValue(esr, "entityType"); + EventStore eventStore = ReflectionKt.getPropertyValue(esr, "eventStore"); + EventSourcedEntityFactory factory = ReflectionKt.getPropertyValue(esr, "entityFactory"); + EntityEvolver evolver = ReflectionKt.getPropertyValue(esr, "entityEvolver"); + SourcingHandler sourcingHandler = ReflectionKt.getPropertyValue(esr, "sourcingHandler"); + + EntityEvolver wrappedEvolver = new AxoniqPlatformEntityEvolver(evolver); + EventSourcingRepository rebuilt = new EventSourcingRepository( + idType, + entityType, + eventStore, + factory, + wrappedEvolver, + sourcingHandler + ); + if (parent == null) { + // No wrapper between us and the ESR — return the rebuilt ESR directly. + return rebuilt; + } + // Swap the parent wrapper's delegate to point at the rebuilt ESR. Keeps any outer + // wrappers (metrics, etc.) intact, just rewires the bottom of the chain. + ReflectionKt.setPropertyValue(parent, "delegate", rebuilt); + return delegate; + } catch (Exception e) { + logger.warn("[ModelInspection] Could not reconstruct EventSourcingRepository for [{}] — " + + "inspection hooks will be unavailable for this entity, but command handling is unaffected: {}", + esr.entityType().getName(), e.getMessage()); + return delegate; + } + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponder.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponder.kt index 18d879fc..0188391e 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponder.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponder.kt @@ -24,179 +24,73 @@ import io.axoniq.platform.framework.api.* import io.axoniq.platform.framework.client.RSocketHandlerRegistrar import io.axoniq.platform.framework.truncateToBytes import org.axonframework.common.configuration.Configuration -import org.axonframework.common.infra.ComponentDescriptor -import org.axonframework.common.infra.DescribableComponent -import org.axonframework.conversion.Converter -import org.axonframework.eventsourcing.CriteriaResolver -import org.axonframework.eventsourcing.EventSourcedEntityFactory -import org.axonframework.eventsourcing.annotation.AnnotationBasedEventCriteriaResolver -import org.axonframework.eventsourcing.annotation.EventSourcingHandler import org.axonframework.eventsourcing.eventstore.EventStorageEngine -import org.axonframework.eventsourcing.eventstore.SourcingCondition -import org.axonframework.eventsourcing.handler.InitializingEntityEvolver import org.axonframework.messaging.core.unitofwork.ProcessingContext +import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory import org.axonframework.messaging.eventhandling.EventMessage -import org.axonframework.messaging.eventhandling.GenericEventMessage -import org.axonframework.messaging.eventhandling.TerminalEventMessage -import org.axonframework.messaging.eventstreaming.EventCriteria -import org.axonframework.modelling.EntityEvolver -import org.axonframework.modelling.StateManager -import java.lang.reflect.Proxy +import org.axonframework.modelling.repository.Repository import org.slf4j.LoggerFactory import java.util.* +import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentHashMap +import java.util.function.BiConsumer open class RSocketModelInspectionResponder( - private val stateManager: StateManager, - private val eventStorageEngine: EventStorageEngine, + @Suppress("unused") private val eventStorageEngine: EventStorageEngine, private val registrar: RSocketHandlerRegistrar, - private val configuration: Configuration + private val configuration: Configuration, ) { private val logger = LoggerFactory.getLogger(this::class.java) + private val objectMapper = ObjectMapper().apply { findAndRegisterModules() disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) - // AF5 entities are typically Kotlin data classes / Java records with private fields and no - // public getters in the bean-getter sense. Enable direct field access so Jackson surfaces - // the entity's actual state instead of emitting `{}` for "no discoverable properties". + // AF5 entities are typically Kotlin data classes / Java records with private fields and + // no public bean-style getters. Field access lets Jackson surface the actual state + // instead of `{}`. setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE) setVisibility(PropertyAccessor.IS_GETTER, JsonAutoDetect.Visibility.NONE) } - /** - * Cache of [CriteriaResolver] instances per (entityType, idType) pair. Resolvers are - * stateless and obtaining one via describe-walking or reflection is not free, so caching - * avoids repeating the work on every query. Keyed by class identity so redeploys with - * different classloaders get fresh entries naturally. - */ - private val criteriaResolverCache = ConcurrentHashMap, Class<*>>, CriteriaResolver>() - - /** - * Cache of [InitializingEntityEvolver] instances per (entityType, idType) pair. The - * initializer combines the entity factory and the raw evolver: if the current state is - * null on a given event, it creates a fresh entity via the factory; otherwise it - * delegates to the evolver. This is exactly what we need for ad-hoc state replay - * starting from a null state. Cache is safe because the initializer is stateless. - */ - private val initializingEvolverCache = ConcurrentHashMap, Class<*>>, InitializingEntityEvolver>() - - /** - * The framework-wide [Converter] used to deserialize raw event payloads (read from the event - * store as `byte[]`) into typed event objects, so the evolver's `@EventSourcingHandler` - * methods can match them by parameter class. Resolved lazily from the [Configuration]. - */ - private val payloadConverter: Converter? by lazy { - try { - configuration.getComponent(Converter::class.java).also { - logger.info("[ModelInspection] Using payload converter [{}] for event deserialization", it.javaClass.simpleName) - } - } catch (e: Exception) { - logger.warn("[ModelInspection] Could not obtain Converter from configuration — events will be passed to evolver with raw payloads (state replay will likely be empty): {}", - e.message) - null - } + private val unitOfWorkFactory: UnitOfWorkFactory by lazy { + configuration.getComponent(UnitOfWorkFactory::class.java) } /** - * Deserializes the event message's raw `byte[]` payload into a typed instance of the event - * class indicated by `message.type().name()`. Without this, the evolver receives `byte[]` - * payloads and no `@EventSourcingHandler(EventClass)` method matches, so state never advances - * past entity creation defaults. - * - * Why not [EventMessage.withConvertedPayload]: that API short-circuits via - * `convertedPayload.class.isAssignableFrom(payloadType())`. When the converter quietly - * returns the same `byte[]` (no registered conversion path for `byte[] -> EventClass`), - * the assignability check passes and the original message is returned untouched — so - * `payloadType()` stays `byte[]` and the evolver can't dispatch to a typed handler. - * - * Instead, we explicitly call [Message.payloadAs] (forces conversion via the converter) - * and reconstruct a [GenericEventMessage] with the typed payload, so the new - * `payloadType()` is the actual event class — exactly what `AnnotatedEntityMetamodel.evolve` - * expects to find a matching `@EventSourcingHandler`. - * - * Returns the original message untouched when the converter is unavailable, the type can't - * be resolved on the classpath, conversion fails, or the result is unexpectedly null. + * Repositories collected at boot from each event-sourced submodule via the decorator hook + * in [AxoniqPlatformModelInspectionEnhancer]. Replaces walking the top-level state manager, + * which only sees the top-level state manager and misses everything registered in submodules. */ - private fun deserializePayload(message: EventMessage): EventMessage { - val converter = payloadConverter ?: return message - val typeName = message.type()?.name() ?: return message - return try { - val cls = Class.forName(typeName) - // Force the converter to produce a typed instance. This bypasses - // withConvertedPayload's assignability short-circuit which keeps payloadType=byte[] - // when the converter returns the input unchanged. - val typedPayload: Any? = message.payloadAs(cls, converter) - if (typedPayload == null || cls.isInstance(typedPayload).not()) { - logger.debug("[ModelInspection] Converter returned unexpected payload for type [{}]: actual=[{}] — keeping original", - typeName, typedPayload?.javaClass?.name) - return message - } - logger.info("[ModelInspection] Converted payload: msgType.name=[{}] msgType.version=[{}] payloadCls=[{}]", - message.type().name(), - message.type()?.version(), - typedPayload.javaClass.name) - // Build a fresh GenericEventMessage whose payloadType() is the typed class. - // GenericMessage's 4-arg constructor derives payloadType from payload.getClass(), - // so the resulting message advertises the correct event class to the evolver. - // Metadata extends Map in AF5 but Kotlin needs an explicit cast - // because of its stricter generics treatment of the Java collection. - @Suppress("UNCHECKED_CAST") - GenericEventMessage( - message.identifier(), - message.type(), - typedPayload, - message.metadata() as Map, - message.timestamp(), - ) - } catch (e: ClassNotFoundException) { - logger.debug("Event type [{}] not on classpath — leaving payload as raw bytes", typeName) - message - } catch (e: Exception) { - logger.debug("Could not convert event payload of type [{}]: {}", typeName, e.message) - message - } - } + private val repositories = ConcurrentHashMap, Class<*>>, Repository>() - /** - * No-op [ProcessingContext] proxy used for ad-hoc evolve calls outside a real unit of work. - * `AnnotatedEntityMetamodel.evolve` requires a non-null context (defensive `Objects.requireNonNull`), - * but during model inspection replay we have no active processing context. The proxy returns - * sane defaults: null for resource lookups, `false` for predicates, `this` for fluent setters, - * and a no-op for void methods. Any method that would actually need a resource will see null - * and either no-op or throw — caught at the [evolveSafely] boundary. - */ - private val noOpProcessingContext: ProcessingContext = Proxy.newProxyInstance( - ProcessingContext::class.java.classLoader, - arrayOf(ProcessingContext::class.java) - ) { proxy, method, _ -> - when (method.returnType) { - java.lang.Boolean.TYPE -> false - ProcessingContext::class.java -> proxy - Void.TYPE -> null - else -> null - } - } as ProcessingContext + @Suppress("UNCHECKED_CAST") + fun registerRepository(repository: Repository<*, *>) { + val key = repository.entityType() to repository.idType() + repositories[key] = repository as Repository + logger.info("[ModelInspection] Registered repository for entity=[{}] id=[{}]", + repository.entityType().name, repository.idType().name) + } fun start() { registrar.registerHandlerWithoutPayload( Routes.Model.REGISTERED_ENTITIES, - this::handleRegisteredEntities + this::handleRegisteredEntities, ) registrar.registerHandlerWithPayload( Routes.Model.DOMAIN_EVENTS, ModelDomainEventsQuery::class.java, - this::handleDomainEvents + this::handleDomainEvents, ) registrar.registerHandlerWithPayload( Routes.Model.ENTITY_STATE_AT_SEQUENCE, ModelEntityStateAtSequenceQuery::class.java, - this::handleEntityStateAtSequence + this::handleEntityStateAtSequence, ) registrar.registerHandlerWithPayload( Routes.Model.REPLAY_TIMELINE, ModelTimelineQuery::class.java, - this::handleTimelineReplay + this::handleTimelineReplay, ) } @@ -204,28 +98,29 @@ open class RSocketModelInspectionResponder( // Registered entities introspection // ------------------------------------------------------------------------------------------ - private fun handleRegisteredEntities(): RegisteredEntitiesResult { + internal fun handleRegisteredEntities(): RegisteredEntitiesResult { logger.debug("Handling Axoniq Platform MODEL_REGISTERED_ENTITIES query") - val entities = stateManager.registeredEntities().map { entityType -> - val idTypeInfos = stateManager.registeredIdsFor(entityType).map { idClass -> - IdType( - type = idClass.name, - idFields = describeIdFields(idClass), - ) - } + val grouped: Map, List>> = repositories.keys + .groupBy({ it.first }, { it.second }) + + val entities = grouped.map { (entityType, idClasses) -> RegisteredEntityInfo( entityType = entityType.name, - idTypes = idTypeInfos, + idTypes = idClasses.map { idClass -> + IdType( + type = idClass.name, + idFields = describeIdFields(idClass), + ) + }, ) } + logger.debug("Found entities: {}", entities) return RegisteredEntitiesResult(entities = entities) } /** - * Returns structural descriptors for the given id class. Empty for "simple" types where - * the frontend should show a single text input (String, primitives, UUID); populated for - * records / data classes / plain objects where each property should get its own input. - * Only 1-deep properties are described — nested types surface as type = "object". + * Returns structural descriptors for the given id class. Empty for "simple" types (single + * text input on the frontend); populated for compound types (one input per descriptor). */ internal fun describeIdFields(idClass: Class<*>): List { if (isSimpleIdType(idClass)) { @@ -236,7 +131,7 @@ open class RSocketModelInspectionResponder( IdFieldDescriptor( name = component.name, type = normalizedType(component.type), - javaType = component.type.name + javaType = component.type.name, ) } } @@ -248,7 +143,7 @@ open class RSocketModelInspectionResponder( IdFieldDescriptor( name = field.name, type = normalizedType(field.type), - javaType = field.type.name + javaType = field.type.name, ) } } @@ -297,39 +192,9 @@ open class RSocketModelInspectionResponder( } // ------------------------------------------------------------------------------------------ - // Criteria resolution + id deserialization + // Id deserialization // ------------------------------------------------------------------------------------------ - /** - * Resolves the [EventCriteria] for a given (entityType, idType, entityId) triple by - * obtaining the registered [CriteriaResolver] for the chosen id type and invoking it - * with the deserialized typed id. Multi-tag and compound-id entities produce the - * correct criteria automatically — no tag-key resolution needed. - */ - private fun resolveCriteria(entityType: Class<*>, idClass: Class<*>, entityId: String): EventCriteria { - val typedId = deserializeEntityId(entityId, idClass) - ?: throw IllegalArgumentException("Could not deserialize id [$entityId] as type [${idClass.name}]") - return resolveCriteriaWithTypedId(entityType, idClass, typedId) - } - - /** - * Same as [resolveCriteria] but skips id deserialization — used by handlers that already - * have the typed id (e.g. for state reconstruction via [InitializingEntityEvolver]). - */ - private fun resolveCriteriaWithTypedId(entityType: Class<*>, idClass: Class<*>, typedId: Any): EventCriteria { - val resolver = obtainCriteriaResolver(entityType, idClass) - // ProcessingContext is @NonNull via JSpecify @NullMarked, but the default - // AnnotationBasedEventCriteriaResolver never reads it. We bypass Kotlin's nullability - // check via reflection; resolvers that actually rely on the context will throw NPE - // which propagates to the handler-level catch and surfaces a clear error. - return invokeResolveWithNullContext(resolver, typedId) - } - - /** - * Parses the incoming [entityId] wire string into the entity's id type. For simple id - * types we parse directly; for compound types the wire format is a JSON object whose - * keys match the id's property names. - */ private fun deserializeEntityId(entityId: String, idClass: Class<*>): Any? { val trimmed = entityId.trim() return when { @@ -367,204 +232,43 @@ open class RSocketModelInspectionResponder( } } - /** - * Obtains a [CriteriaResolver] for the given (entityType, idType) pair. Strategy: - * 1. Walk the registered repository via `describeTo` and pick out the first matching - * [CriteriaResolver] — this honors any custom resolver an application may have wired in. - * 2. Fall back to constructing a fresh [AnnotationBasedEventCriteriaResolver] from the - * available [Configuration], which covers the default annotation-driven setup. - * Results are cached per (entityType, idType) pair. - */ - @Suppress("UNCHECKED_CAST") - private fun obtainCriteriaResolver(entityClass: Class<*>, idClass: Class<*>): CriteriaResolver { - return criteriaResolverCache.getOrPut(entityClass to idClass) { - findInRepository(entityClass, idClass, CriteriaResolver::class.java) as CriteriaResolver? - ?: AnnotationBasedEventCriteriaResolver( - entityClass as Class, - idClass as Class, - configuration - ) as CriteriaResolver - } - } - - /** - * Invokes [CriteriaResolver.resolve] with a no-op [ProcessingContext] via reflection, - * bypassing the JSpecify/Kotlin non-null check on the parameter. The default - * `AnnotationBasedEventCriteriaResolver` doesn't read the context; resolvers that do - * may interact with the proxy (returning null/false defaults) and either no-op or throw, - * which propagates to the handler-level catch. - */ - private fun invokeResolveWithNullContext(resolver: CriteriaResolver, typedId: Any): EventCriteria { - val method = resolver.javaClass.methods.first { it.name == "resolve" && it.parameterCount == 2 } - return method.invoke(resolver, typedId, noOpProcessingContext) as EventCriteria - } - // ------------------------------------------------------------------------------------------ - // Entity evolver lookup + state reconstruction + // UoW-driven inspection load // ------------------------------------------------------------------------------------------ /** - * Obtains an [InitializingEntityEvolver] for the given (entityType, idType). AF5 - * repositories don't pre-instantiate `InitializingEntityEvolver`; they expose the - * `entityFactory` and `entityEvolver` as separate describable properties. We find both - * via the describe tree and construct the initializer manually — same constructor the - * framework uses internally — so null initial state on the first event triggers entity - * creation via the factory rather than no-op'ing. - * - * Returns null when either piece can't be located. + * Resolves the (entityType, idType) pair to the registered repository. Returns null if no + * matching repository was registered at boot — e.g. the entity exists in a non-event-sourced + * module, or the user-supplied class names don't resolve. */ - @Suppress("UNCHECKED_CAST") - private fun obtainInitializingEvolver(entityClass: Class<*>, idClass: Class<*>): InitializingEntityEvolver? { - initializingEvolverCache[entityClass to idClass]?.let { return it } - val factory = findInRepository(entityClass, idClass, EventSourcedEntityFactory::class.java) - as EventSourcedEntityFactory? - val evolver = findInRepository(entityClass, idClass, EntityEvolver::class.java, preferredName = "entityEvolver") - as EntityEvolver? - if (factory == null || evolver == null) { - logger.warn("Could not assemble InitializingEntityEvolver for [{}] / [{}] — factory={}, evolver={}", - entityClass.name, idClass.name, factory?.javaClass?.name, evolver?.javaClass?.name) - return null - } - logger.info("Assembled InitializingEntityEvolver for [{}] / [{}] — factory=[{}], evolver=[{}]", - entityClass.simpleName, idClass.simpleName, factory.javaClass.simpleName, evolver.javaClass.simpleName) - val initializer = InitializingEntityEvolver(factory, evolver) - initializingEvolverCache[entityClass to idClass] = initializer - return initializer + private fun lookupRepository(entityType: Class<*>, idType: Class<*>): Repository? { + return repositories[entityType to idType] } /** - * Invokes [InitializingEntityEvolver.evolve] with a null [ProcessingContext] via reflection - * — same trick as [invokeResolveWithNullContext]. The initializer handles null state by - * delegating to the entity factory before evolving. Custom code that relies on the context - * will throw NPE; we catch at the caller and keep the previous state. - * - * The 4-arg signature is `(I id, E currentState, EventMessage event, ProcessingContext ctx)`. + * Runs [block] inside a real [org.axonframework.messaging.core.unitofwork.UnitOfWork], wiring + * the supplied hooks onto the [ProcessingContext] so the framework's own event-sourcing + * pipeline drives state replay. The repository's load is invoked through the framework path — + * criteria resolution, payload conversion, and `@EventSourcingHandler` dispatch all happen as + * they would in a real command flow. We never append events, so commit is a no-op for storage. */ - private fun invokeEvolveWithNullContext( - evolver: InitializingEntityEvolver, + private fun withInspectionUoW( + repository: Repository, typedId: Any, - currentState: Any?, - message: EventMessage, - ): Any? { - val method = evolver.javaClass.methods.first { it.name == "evolve" && it.parameterCount == 4 } - return method.invoke(evolver, typedId, currentState, message, noOpProcessingContext) - } - - /** - * Walks the repository registered for (entityClass, idClass) via `describeTo` and returns - * the first instance of [target] that surfaces. When [preferredName] is set, the property - * with that exact name wins over any other match (used to prefer raw evolvers over - * lifecycle-wrapped ones). - */ - private fun findInRepository( - entityClass: Class<*>, - idClass: Class<*>, - target: Class, - preferredName: String? = null, - ): T? { - return try { - val repository = stateManager.repository(entityClass, idClass) ?: return null - val repoClass = repository.javaClass.name - // Diagnostic dump (DEBUG): on every lookup, list every describable property — useful - // when debugging an unfamiliar repository implementation, but too noisy for INFO. - if (logger.isDebugEnabled) { - val dump = DescribePropertyDump() - (repository as? DescribableComponent)?.describeTo(dump) - logger.debug("[ModelInspection] Looking for [{}] in repository [{}] for [{}] / [{}]; describe tree:\n{}", - target.simpleName, repoClass, entityClass.simpleName, idClass.simpleName, dump.formatted()) + beforeConsumer: BiConsumer? = null, + afterConsumer: BiConsumer? = null, + maxIndex: Long? = null, + extract: (entity: Any?) -> R, + ): R { + return unitOfWorkFactory.create().executeWithResult { ctx: ProcessingContext -> + beforeConsumer?.let { ctx.putResource(AxoniqPlatformEntityEvolver.BEFORE_CONSUMER, it) } + afterConsumer?.let { ctx.putResource(AxoniqPlatformEntityEvolver.AFTER_CONSUMER, it) } + maxIndex?.let { ctx.putResource(AxoniqPlatformEntityEvolver.MAX_INDEX, it) } + + repository.load(typedId, ctx).thenApply { managed -> + extract(managed?.entity()) } - val finder = NamedDescribablePropertyFinder(target, preferredName) - (repository as? DescribableComponent)?.describeTo(finder) ?: return null - finder.found - } catch (e: Exception) { - logger.debug("describeTo lookup of [{}] for entity [{}] / id [{}] failed: {}", - target.simpleName, entityClass.name, idClass.name, e.message) - null - } - } - - /** Diagnostic descriptor that records every property name + value class seen, recursing into describable nested components. */ - private class DescribePropertyDump : ComponentDescriptor { - private val lines = mutableListOf() - private var depth = 0 - private val visited = mutableSetOf() // identity-based cycle break - - private fun indent() = " ".repeat(depth) - - override fun describeProperty(name: String, value: Any?) { - val cls = value?.javaClass?.name ?: "null" - lines += "${indent()}- $name : $cls" - if (value is DescribableComponent && visited.add(System.identityHashCode(value))) { - depth++ - try { - value.describeTo(this) - } catch (_: Exception) { /* swallow — diagnostic only */ } - depth-- - } - } - - override fun describeProperty(name: String, value: Collection<*>?) { - lines += "${indent()}- $name : Collection(size=${value?.size ?: 0})" - } - - override fun describeProperty(name: String, value: Map<*, *>?) { - lines += "${indent()}- $name : Map(size=${value?.size ?: 0})" - } - - override fun describeProperty(name: String, value: String?) { - lines += "${indent()}- $name : String = $value" - } - - override fun describeProperty(name: String, value: Long?) { - lines += "${indent()}- $name : Long = $value" - } - - override fun describeProperty(name: String, value: Boolean?) { - lines += "${indent()}- $name : Boolean = $value" - } - - override fun describe(): String = "DescribePropertyDump" - - fun formatted(): String = lines.joinToString("\n") - } - - /** - * Drills through a repository's `describeTo` tree looking for a single instance of [target]. - * Property-name preference lets callers pick the canonical match (e.g. "entityEvolver") - * over wrapper variants like "initializingEntityEvolver". - */ - private class NamedDescribablePropertyFinder( - private val target: Class, - private val preferredName: String?, - ) : ComponentDescriptor { - private var preferred: T? = null - private var fallback: T? = null - - @Suppress("UNCHECKED_CAST") - override fun describeProperty(name: String, value: Any?) { - if (value == null) return - if (target.isInstance(value)) { - if (preferredName != null && name == preferredName && preferred == null) { - preferred = value as T - } else if (fallback == null) { - fallback = value as T - } - // Don't recurse into matched component — we already have what we need at this level. - return - } - if (value is DescribableComponent) { - value.describeTo(this) - } - } - - override fun describeProperty(name: String, value: Collection<*>?) { /* not needed */ } - override fun describeProperty(name: String, value: Map<*, *>?) { /* not needed */ } - override fun describeProperty(name: String, value: String?) { /* not needed */ } - override fun describeProperty(name: String, value: Long?) { /* not needed */ } - override fun describeProperty(name: String, value: Boolean?) { /* not needed */ } - override fun describe(): String = "NamedDescribablePropertyFinder<${target.simpleName}>" - - val found: T? get() = preferred ?: fallback + }.get() } // ------------------------------------------------------------------------------------------ @@ -572,9 +276,9 @@ open class RSocketModelInspectionResponder( // ------------------------------------------------------------------------------------------ /** - * Extracts a human-readable type name for the event. Events read directly from the - * [EventStorageEngine] often have a raw byte[] payload whose `payloadType()` returns `[B`. - * In that case the proper event type is available via `message.type().name()`. + * Extracts a human-readable type name. Events read from [EventStorageEngine] often have a + * raw `byte[]` payload whose `payloadType()` returns `[B`; the proper event type is in + * `message.type().name()`. */ private fun extractPayloadTypeName(message: EventMessage): String { return try { @@ -585,9 +289,9 @@ open class RSocketModelInspectionResponder( } /** - * Converts the event payload to a String. When reading directly from [EventStorageEngine], - * payloads are usually raw byte[] containing JSON or CBOR. We try UTF-8 decoding first - * (works for JSON), falling back to Jackson serialization for typed payloads. + * Converts the event payload to a String. Payloads sourced from the event store are + * usually raw `byte[]` containing JSON or CBOR. UTF-8 first (works for JSON) and Jackson as + * fallback for typed payloads. */ private fun extractPayloadAsString(message: EventMessage): String? { val payload = message.payload() ?: return null @@ -621,42 +325,48 @@ open class RSocketModelInspectionResponder( // Query handlers // ------------------------------------------------------------------------------------------ - private fun handleDomainEvents(query: ModelDomainEventsQuery): DomainEventsResult { + internal fun handleDomainEvents(query: ModelDomainEventsQuery): DomainEventsResult { logger.info("Handling Axoniq Platform MODEL_DOMAIN_EVENTS query for entity [{}] id [{}] idType [{}]", query.entityType, query.entityId, query.idType) val entityClass = Class.forName(query.entityType) val idClass = Class.forName(query.idType) - val criteria = resolveCriteria(entityClass, idClass, query.entityId) - val condition = SourcingCondition.conditionFor(criteria) - - val stream = eventStorageEngine.source(condition) - val allEvents = try { - stream.reduce(mutableListOf()) { acc, entry -> - val message = entry.message() - if (message != null && message !is TerminalEventMessage) { - acc.add(DomainEvent( - sequenceNumber = acc.size.toLong(), - timestamp = message.timestamp(), - payloadType = extractPayloadTypeName(message), - payload = extractPayloadAsString(message) - )) - } - acc - }.get() + val repository = lookupRepository(entityClass, idClass) + ?: return DomainEventsResult( + entityId = query.entityId, + entityType = query.entityType, + domainEvents = emptyList(), + page = query.page, + pageSize = query.pageSize, + totalCount = 0L, + ) + val typedId = deserializeEntityId(query.entityId, idClass) + ?: throw IllegalArgumentException("Could not deserialize id [${query.entityId}] as [${query.idType}]") + + val collected = mutableListOf() + try { + withInspectionUoW( + repository = repository, + typedId = typedId, + beforeConsumer = { event, _ -> + collected += DomainEvent( + sequenceNumber = collected.size.toLong(), + timestamp = event.timestamp(), + payloadType = extractPayloadTypeName(event), + payload = extractPayloadAsString(event), + ) + }, + extract = {}, + ) } catch (e: Exception) { logger.error("Error while sourcing events for entity [{}] id [{}]", query.entityType, query.entityId, e) - mutableListOf() } - logger.info("Sourced [{}] events for entity [{}] id [{}]", - allEvents.size, query.entityType, query.entityId) - - val totalCount = allEvents.size.toLong() + val totalCount = collected.size.toLong() val start = query.page * query.pageSize - val end = minOf(start + query.pageSize, allEvents.size) - val pagedEvents = if (start < allEvents.size) allEvents.subList(start, end) else emptyList() + val end = minOf(start + query.pageSize, collected.size) + val pagedEvents = if (start < collected.size) collected.subList(start, end) else emptyList() return DomainEventsResult( entityId = query.entityId, @@ -668,47 +378,30 @@ open class RSocketModelInspectionResponder( ) } - /** - * Reconstructs the entity's state at a specific sequence number by replaying all events - * up to (and including) that sequence through the registered [EntityEvolver]. Returns - * the JSON-serialized state. If no EntityEvolver is registered for the entity, returns - * null state — frontend can treat that as "state reconstruction not available". - */ - private fun handleEntityStateAtSequence(query: ModelEntityStateAtSequenceQuery): EntityStateResult { + internal fun handleEntityStateAtSequence(query: ModelEntityStateAtSequenceQuery): EntityStateResult { logger.info("Handling Axoniq Platform MODEL_ENTITY_STATE_AT_SEQUENCE query for entity [{}] id [{}] idType [{}] seq [{}]", query.entityType, query.entityId, query.idType, query.maxSequenceNumber) val entityClass = Class.forName(query.entityType) val idClass = Class.forName(query.idType) + val repository = lookupRepository(entityClass, idClass) + ?: return EntityStateResult( + type = query.entityType, + entityId = query.entityId, + maxSequenceNumber = query.maxSequenceNumber, + state = null, + ) val typedId = deserializeEntityId(query.entityId, idClass) ?: throw IllegalArgumentException("Could not deserialize id [${query.entityId}] as [${query.idType}]") - val criteria = resolveCriteriaWithTypedId(entityClass, idClass, typedId) - val condition = SourcingCondition.conditionFor(criteria) - val evolver = obtainInitializingEvolver(entityClass, idClass) - - if (evolver == null) { - logger.warn("No InitializingEntityEvolver found for entity [{}] / id [{}] — returning null state", - query.entityType, query.idType) - return EntityStateResult( - type = query.entityType, - entityId = query.entityId, - maxSequenceNumber = query.maxSequenceNumber, - state = null, - ) - } - // Accumulator: (eventsConsumedSoFar, currentState). MessageStream doesn't expose the - // entry index, so we maintain it inline. - val stream = eventStorageEngine.source(condition) val finalState = try { - stream.reduce(StateHolder(0L, null)) { holder, entry -> - val message = entry.message() - if (message == null || message is TerminalEventMessage) return@reduce holder - // Sequence numbers are 0-indexed by event order. Negative maxSequenceNumber = "all events". - val withinWindow = query.maxSequenceNumber < 0 || holder.index <= query.maxSequenceNumber - if (!withinWindow) return@reduce holder - StateHolder(holder.index + 1, evolveSafely(evolver, typedId, holder.state, deserializePayload(message))) - }.get().state + withInspectionUoW( + repository = repository, + typedId = typedId, + // Negative sequence = "all events" — don't set MAX_INDEX so nothing is skipped. + maxIndex = if (query.maxSequenceNumber < 0) null else query.maxSequenceNumber, + extract = { entity -> entity }, + ) } catch (e: Exception) { logger.error("Error while reconstructing state for entity [{}] id [{}]", query.entityType, query.entityId, e) @@ -723,268 +416,77 @@ open class RSocketModelInspectionResponder( ) } - /** Reduce accumulator that tracks the running event index alongside the evolved state. */ - private data class StateHolder(val index: Long, val state: Any?) - - private fun evolveSafely( - evolver: InitializingEntityEvolver, - typedId: Any, - currentState: Any?, - message: EventMessage, - ): Any? { - return try { - // Capture JSON before AF5 dispatch so we can detect whether the metamodel actually - // mutated the entity, and so the [Evolve] log shows the real pre-mutation state. - // Without this snapshot, since entities mutate in place via @EventSourcingHandler, - // re-serializing currentState after evolve would just print the post-mutation state. - val beforeJson = if (currentState != null) safeJson(currentState) else "null" - - val result = invokeEvolveWithNullContext(evolver, typedId, currentState, message) - - val afterJsonInitial = if (result != null) safeJson(result) else "null" - val mutatedByMetamodel = currentState != null && beforeJson != afterJsonInitial - // If state already changed, the metamodel did its job — don't double-apply via - // reflection. If no change AND we have a non-null entity AND a typed payload, try - // direct reflection dispatch on the entity class. - val finalResult = if (!mutatedByMetamodel && result != null && message.payload() != null) { - applyEventViaReflection(result, message) - result - } else { - result - } - - if (logger.isInfoEnabled) { - logger.info("[Evolve] event=[{}] before=[{}] after=[{}] beforeIs=[{}] afterIs=[{}] reflectionFallback=[{}]", - message.payloadType().simpleName, - beforeJson, - safeJson(finalResult), - currentState?.let { System.identityHashCode(it) } ?: 0, - finalResult?.let { System.identityHashCode(it) } ?: 0, - !mutatedByMetamodel && result != null) - } - finalResult - } catch (e: Exception) { - logger.warn("EntityEvolver.evolve threw for event [{}]: {} — keeping previous state", - message.payloadType().name, e.cause?.message ?: e.message, e) - currentState - } - } - - /** - * Reflection-based fallback for entities where AF5's [AnnotationBasedEntityEvolvingComponent] - * dispatch silently no-ops in an ad-hoc inspection context (no real - * [org.axonframework.messaging.core.unitofwork.ProcessingContext], no message handler - * interceptor chain, etc.). - * - * Walks the entity's class hierarchy looking for methods annotated with - * [EventSourcingHandler] whose first parameter type accepts the message payload. Each match - * is invoked directly on the entity. The method is expected to mutate `this` in place - * (typical AF5 pattern: `void on(SomeEvent e) { this.field = e.field(); }`). For methods - * with extra parameters (e.g. for parameter resolvers), passes `null` so we don't break - * compilation, but those handlers may NPE — caught at the boundary. - * - * Idempotent re-invocation of an event handler is the caller's contract; we only invoke this - * path when the AF5 dispatch produced no observable mutation, so we won't re-apply changes. - */ - internal fun applyEventViaReflection(entity: Any, message: EventMessage) { - val payload = message.payload() ?: return - val payloadCls = payload.javaClass - val handlerCandidates = collectEventSourcingHandlers(entity.javaClass) - - // Path A: payload was already deserialized by [deserializePayload] (works when - // event.type().name() is a FQN that Class.forName resolves, e.g. with - // ClassBasedMessageTypeResolver). Match handlers whose first parameter accepts the - // typed payload directly. - if (payloadCls != ByteArray::class.java) { - for ((declaringClass, method) in handlerCandidates) { - val paramType = method.parameterTypes[0] - if (!paramType.isInstance(payload)) continue - invokeHandlerSafely(entity, declaringClass, method, payload, paramType) - return - } - return - } - - // Path B: payload is still raw byte[] because Class.forName couldn't resolve the type - // name (common with @Event(namespace=...) which produces names like - // "quickstart.OrderCreatedEvent" that don't match a real class). Resolve the handler by - // matching the simple class name extracted from message.type().name() against each - // handler's parameter simple name — picking exactly one. Convert byte[] to that - // specific type only. This avoids Jackson's permissive deserialization successfully - // returning a partially-filled instance of the wrong event class (e.g. OrderShippedEvent - // accepting an OrderCreatedEvent JSON because they share an `orderId` field). - val converter = payloadConverter ?: return - val typeName = message.type()?.name() ?: return - val expectedSimpleName = simpleNameFromMessageType(typeName) - val match = handlerCandidates.firstOrNull { (_, method) -> - method.parameterTypes[0].simpleName == expectedSimpleName - } ?: run { - logger.debug("[ModelInspection] No @EventSourcingHandler param simpleName matches [{}] (from type=[{}]) on [{}]", - expectedSimpleName, typeName, entity.javaClass.simpleName) - return - } - val (declaringClass, method) = match - val paramType = method.parameterTypes[0] - val typedArg: Any? = try { - message.payloadAs(paramType, converter) - } catch (e: Exception) { - logger.debug("[ModelInspection] Converter failed for type=[{}] -> param=[{}]: {}", - typeName, paramType.simpleName, e.message) - null - } - if (typedArg == null || !paramType.isInstance(typedArg)) { - logger.debug("[ModelInspection] Converter returned non-matching payload for type=[{}] (target=[{}])", - typeName, paramType.simpleName) - return - } - invokeHandlerSafely(entity, declaringClass, method, typedArg, paramType) - } - - /** - * Extracts the simple Java class name from an [org.axonframework.messaging.core.MessageType] - * name string. Handles both formats: - * - FQN with optional `$` (nested class): `io.axoniq.quickstart.reservation.event.ReservationEvents$SeatReservedEvent` - * → `SeatReservedEvent` - * - Namespaced short form from `@Event(namespace=...)`: `quickstart.OrderCreatedEvent` - * → `OrderCreatedEvent` - */ - internal fun simpleNameFromMessageType(typeName: String): String { - val afterDollar = typeName.substringAfterLast('$') - return afterDollar.substringAfterLast('.') - } - - private fun invokeHandlerSafely( - entity: Any, - declaringClass: Class<*>, - method: java.lang.reflect.Method, - payload: Any, - paramType: Class<*>, - ) { - try { - method.isAccessible = true - val args: Array = if (method.parameterCount == 1) { - arrayOf(payload) - } else { - Array(method.parameterCount) { idx -> if (idx == 0) payload else null } - } - method.invoke(entity, *args) - logger.debug("[ModelInspection] Reflection dispatch invoked [{}#{}] for param=[{}]", - declaringClass.simpleName, method.name, paramType.simpleName) - } catch (e: Exception) { - logger.debug("[ModelInspection] Reflection dispatch failed for [{}#{}]: {}", - declaringClass.simpleName, method.name, e.cause?.message ?: e.message) - } - } - - /** - * Walks the entity's class hierarchy collecting `@EventSourcingHandler` methods with at - * least one parameter (the event). Returned in declaring-class-first order. - */ - private fun collectEventSourcingHandlers(entityClass: Class<*>): List, java.lang.reflect.Method>> { - val result = mutableListOf, java.lang.reflect.Method>>() - var current: Class<*>? = entityClass - while (current != null && current != Any::class.java) { - for (method in current.declaredMethods) { - if (!method.isAnnotationPresent(EventSourcingHandler::class.java)) continue - if (method.parameterCount < 1) continue - result.add(current to method) - } - current = current.superclass - } - return result - } - - private fun safeJson(value: Any?): String = - if (value == null) "null" - else try { - objectMapper.writeValueAsString(value) - } catch (e: Exception) { - "" - } - - /** - * Replays all events for the entity through the registered [EntityEvolver] and emits a - * timeline of `(sequence, event, stateBefore, stateAfter)` entries within the requested window. - * - * State serialization is JSON via Jackson (handles records, Kotlin data classes, POJOs). - * State strings are byte-truncated via [String.truncateToBytes] so a single oversize entity - * cannot blow gRPC/RSocket message size limits. - * - * If no EntityEvolver is registered for the entity (e.g. non-event-sourced repositories), - * `stateBefore`/`stateAfter` are emitted as null and the frontend can collapse that section. - */ - private fun handleTimelineReplay(query: ModelTimelineQuery): ModelTimelineResult { + internal fun handleTimelineReplay(query: ModelTimelineQuery): ModelTimelineResult { logger.info("Handling Axoniq Platform MODEL_REPLAY_TIMELINE query for entity [{}] id [{}] idType [{}] offset [{}] limit [{}]", query.entityType, query.entityId, query.idType, query.offset, query.limit) val entityClass = Class.forName(query.entityType) val idClass = Class.forName(query.idType) + val repository = lookupRepository(entityClass, idClass) + ?: return ModelTimelineResult( + entityType = query.entityType, + entityId = query.entityId, + entries = emptyList(), + offset = query.offset, + totalEvents = 0, + truncated = false, + ) val typedId = deserializeEntityId(query.entityId, idClass) ?: throw IllegalArgumentException("Could not deserialize id [${query.entityId}] as [${query.idType}]") - val criteria = resolveCriteriaWithTypedId(entityClass, idClass, typedId) - val condition = SourcingCondition.conditionFor(criteria) - val evolver = obtainInitializingEvolver(entityClass, idClass) - if (evolver == null) { - logger.warn("No InitializingEntityEvolver found for entity [{}] / id [{}] — timeline state fields will be null", - query.entityType, query.idType) - } val offset = maxOf(0, query.offset) val limit = if (query.limit <= 0) 100 else query.limit - val maxStateSizeBytes = 100 * 1024 // 100 KB per state snapshot before truncation - val maxEventSizeBytes = 50 * 1024 // 50 KB per event payload before truncation + val maxStateSizeBytes = 100 * 1024 + val maxEventSizeBytes = 50 * 1024 val entries = mutableListOf() - var totalEvents = 0 - var currentState: Any? = null + val totalEvents = intArrayOf(0) + // BEFORE/AFTER snapshots have to be paired — capture stateBefore in BEFORE_CONSUMER so it + // reflects the pre-evolve state even when the entity mutates in place. + val pending = arrayOfNulls(2) // [eventMessage, stateBeforeJson] - val stream = eventStorageEngine.source(condition) try { - stream.reduce(Unit) { _, entry -> - val message = entry.message() - if (message == null || message is TerminalEventMessage) return@reduce Unit - - val seq = totalEvents.toLong() - totalEvents++ - - // Capture stateBefore JSON eagerly. Reservation-style entities mutate in place - // via reflection fallback (see [evolveSafely]) so currentState and nextState are - // the same instance. If we serialize stateBefore after evolveSafely, both fields - // would show the post-mutation state and the timeline UI couldn't diff between - // events. Snapshot the JSON now while currentState is still pre-mutation. - val stateBeforeJson = stateAsJson(currentState).truncateToBytes(maxStateSizeBytes) - - val nextState = evolver?.let { evolveSafely(it, typedId, currentState, deserializePayload(message)) } ?: currentState - - if (seq >= offset && entries.size < limit) { - entries.add(ModelTimelineEntry( - sequenceNumber = seq, - timestamp = message.timestamp().toString(), - eventType = extractPayloadTypeName(message), - eventPayload = extractPayloadAsString(message).truncateToBytes(maxEventSizeBytes), - stateBefore = stateBeforeJson, - stateAfter = stateAsJson(nextState).truncateToBytes(maxStateSizeBytes), - )) - } - currentState = nextState - Unit - }.get() + withInspectionUoW( + repository = repository, + typedId = typedId, + beforeConsumer = { event, stateBefore -> + pending[0] = event + pending[1] = stateAsJson(stateBefore).truncateToBytes(maxStateSizeBytes) + }, + afterConsumer = { event, stateAfter -> + val seq = totalEvents[0].toLong() + totalEvents[0]++ + if (seq >= offset && entries.size < limit) { + entries += ModelTimelineEntry( + sequenceNumber = seq, + timestamp = event.timestamp().toString(), + eventType = extractPayloadTypeName(event), + eventPayload = extractPayloadAsString(event).truncateToBytes(maxEventSizeBytes), + stateBefore = pending[1] as String?, + stateAfter = stateAsJson(stateAfter).truncateToBytes(maxStateSizeBytes), + ) + } + pending[0] = null + pending[1] = null + }, + extract = {}, + ) } catch (e: Exception) { logger.error("Error while sourcing events for timeline of entity [{}] id [{}]", query.entityType, query.entityId, e) } - val remainingAfterWindow = maxOf(0, totalEvents - offset - entries.size) + val remainingAfterWindow = maxOf(0, totalEvents[0] - offset - entries.size) val truncated = remainingAfterWindow > 0 logger.info("Sourced [{}] events for timeline of [{}] id [{}] (returning [{}] from offset [{}], truncated={})", - totalEvents, query.entityType, query.entityId, entries.size, offset, truncated) + totalEvents[0], query.entityType, query.entityId, entries.size, offset, truncated) return ModelTimelineResult( entityType = query.entityType, entityId = query.entityId, entries = entries, offset = offset, - totalEvents = totalEvents, + totalEvents = totalEvents[0], truncated = truncated, ) } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMeasurement.kt b/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMeasurement.kt index 4d3241e8..60d5a25e 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMeasurement.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMeasurement.kt @@ -58,10 +58,6 @@ class HandlerMeasurement( } fun registerMetricValue(metric: Metric, timeInNs: Long) { - if (completedTime != null) { - logger.warn { "HandlerMeasurement for handler [${message.type()}] is already completed. Can not register metric [$metric] with value [$timeInNs]. Ignoring." } - return - } registeredMetrics.compute(metric) { _, it -> // Sum the metric if it was already registered (it ?: 0L) + timeInNs @@ -69,10 +65,6 @@ class HandlerMeasurement( } fun reportMessageDispatched(messageIdentifier: MessageIdentifier) { - if (completedTime != null) { - logger.warn { "HandlerMeasurement for handler [${message.type()}] is already completed. Can not report dispatched message [$messageIdentifier]. Ignoring." } - return - } dispatchedMessages.add(messageIdentifier) } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt index 8ac2f69b..f029c5bd 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025. AxonIQ B.V. + * Copyright (c) 2022-2026. AxonIQ B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,10 @@ class AxoniqPlatformStateManager( private val delegate: StateManager ): StateManager { override fun register(repository: Repository): StateManager { + if(repository is AxoniqPlatformRepository) { + delegate.register(repository) + return this + } delegate.register(AxoniqPlatformRepository(repository)) return this } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java index 6c80d206..3a7c4e59 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java @@ -35,21 +35,27 @@ private ModellingDecorators() { static void apply(ComponentRegistry registry) { registry.registerDecorator(DecoratorDefinition.forType(StateManager.class) - .with((cc, name, delegate) -> - new AxoniqPlatformStateManager(delegate)) + .with((cc, name, delegate) -> { + if(delegate instanceof AxoniqPlatformStateManager) { + return delegate; + } + return new AxoniqPlatformStateManager(delegate); + }) .order(Integer.MAX_VALUE)); UtilsKt.doOnSubModules(registry, (componentRegistry, module) -> { componentRegistry .registerDecorator(DecoratorDefinition.forType(Repository.class) .with((cc, name, delegate) -> + delegate instanceof AxoniqPlatformRepository ? delegate : new AxoniqPlatformRepository<>(delegate)) .order(Integer.MIN_VALUE)) .registerDecorator(DecoratorDefinition.forType(StateManager.class) .with((cc, name, delegate) -> + delegate instanceof AxoniqPlatformStateManager ? delegate : new AxoniqPlatformStateManager(delegate)) .order(Integer.MAX_VALUE)); return null; - }); + }, true); } } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt b/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt index 6eefda30..2fcfbbc3 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt @@ -18,6 +18,7 @@ package io.axoniq.platform.framework import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Timer import org.axonframework.common.ReflectionUtils +import org.axonframework.common.configuration.BaseModule import org.axonframework.common.configuration.ComponentRegistry import org.axonframework.common.configuration.Module import java.lang.reflect.Field @@ -188,13 +189,19 @@ fun String?.truncateToBytes(maxBytes: Int): String? { return truncatedContent + truncationMessage } -fun ComponentRegistry.doOnSubModules(block: (ComponentRegistry, Module?) -> Unit) { +fun ComponentRegistry.doOnSubModules(block: (ComponentRegistry, Module?) -> Unit, recursive: Boolean = true) { val modules = this.getPropertyValue>("modules") modules?.forEach { entry -> val module = entry.value - module.getPropertyValue("componentRegistry")?.let { cr -> - block(cr, module) - cr.doOnSubModules(block) + block(this, module) + if (recursive && module is BaseModule<*>) { + // BaseModule's nested registry only materialises when the module is built. Defer + // the inner walk via the public componentRegistry(action) API; the action runs on + // the module's own registry at build time, with arbitrary-depth nesting visible to + // recursive doOnSubModules calls inside. + module.componentRegistry { innerRegistry -> + innerRegistry.doOnSubModules(block, recursive) + } } } } \ No newline at end of file diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancerTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancerTest.kt index d67617de..dca2a611 100644 --- a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancerTest.kt +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancerTest.kt @@ -23,24 +23,32 @@ import io.mockk.verify import org.axonframework.common.configuration.ComponentDefinition import org.axonframework.common.configuration.ComponentRegistry import org.axonframework.eventsourcing.eventstore.EventStorageEngine -import org.axonframework.modelling.StateManager import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test /** - * Verifies the registration guard in [AxoniqPlatformModelInspectionEnhancer]: the - * inspection responder must register only when the application has all three required - * components (StateManager + EventStorageEngine + RSocketHandlerRegistrar). Missing any - * of them is the AF4 / non-event-sourced case and registering would NPE on first request. + * Verifies the guard in [AxoniqPlatformModelInspectionEnhancer]: the responder must register + * only when the platform client is wired ([RSocketHandlerRegistrar] present) and an + * [EventStorageEngine] is available. Missing either is the no-event-sourcing / no-platform-client + * case where registering would have nothing to act on. + * + * The enhancer itself does not probe for [EventStorageEngine] directly — it first probes for + * {@code axon-eventsourcing} on the classpath and delegates the rest of the wiring to + * [ModelInspectionDecorators], which is where the [EventStorageEngine] check lives. In tests + * the classpath probe always succeeds (axon-eventsourcing is a test dependency), so we exercise + * both branches via the registry mocks. + * + * StateManager is intentionally NOT probed: ModellingConfigurationDefaults registers it at + * Integer.MAX_VALUE, after this enhancer's order, so the probe would falsely return false + * during a real boot. */ class AxoniqPlatformModelInspectionEnhancerTest { private val enhancer = AxoniqPlatformModelInspectionEnhancer() @Test - fun `registers the responder when StateManager, EventStorageEngine and RSocketHandlerRegistrar are all present`() { + fun `registers the responder when EventStorageEngine and RSocketHandlerRegistrar are both present`() { val registry = mockk(relaxed = true) - every { registry.hasComponent(StateManager::class.java) } returns true every { registry.hasComponent(EventStorageEngine::class.java) } returns true every { registry.hasComponent(RSocketHandlerRegistrar::class.java) } returns true @@ -50,21 +58,8 @@ class AxoniqPlatformModelInspectionEnhancerTest { } @Test - fun `skips registration when StateManager is missing — typical AF4 application`() { + fun `skips registration when EventStorageEngine is missing — typical AF4 application`() { val registry = mockk(relaxed = true) - every { registry.hasComponent(StateManager::class.java) } returns false - every { registry.hasComponent(EventStorageEngine::class.java) } returns true - every { registry.hasComponent(RSocketHandlerRegistrar::class.java) } returns true - - enhancer.enhance(registry) - - verify(exactly = 0) { registry.registerComponent(any>()) } - } - - @Test - fun `skips registration when EventStorageEngine is missing`() { - val registry = mockk(relaxed = true) - every { registry.hasComponent(StateManager::class.java) } returns true every { registry.hasComponent(EventStorageEngine::class.java) } returns false every { registry.hasComponent(RSocketHandlerRegistrar::class.java) } returns true @@ -76,8 +71,6 @@ class AxoniqPlatformModelInspectionEnhancerTest { @Test fun `skips registration when RSocketHandlerRegistrar is missing — console client not wired`() { val registry = mockk(relaxed = true) - every { registry.hasComponent(StateManager::class.java) } returns true - every { registry.hasComponent(EventStorageEngine::class.java) } returns true every { registry.hasComponent(RSocketHandlerRegistrar::class.java) } returns false enhancer.enhance(registry) diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderHelpersTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderHelpersTest.kt index 32d13030..3152e327 100644 --- a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderHelpersTest.kt +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderHelpersTest.kt @@ -20,7 +20,6 @@ import io.axoniq.platform.framework.client.RSocketHandlerRegistrar import io.mockk.mockk import org.axonframework.common.configuration.Configuration import org.axonframework.eventsourcing.eventstore.EventStorageEngine -import org.axonframework.modelling.StateManager import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertTrue @@ -32,9 +31,8 @@ import java.util.UUID /** * Unit tests for the pure-logic helpers on [RSocketModelInspectionResponder] that don't - * require a live AF5 configuration / event store. These helpers govern public-facing - * behaviour (id-type description for the FE form, MessageType-name parsing for handler - * dispatch) so regressing them silently breaks the inspection UI. + * require a live AF5 configuration / event store. These helpers govern the FE id-type form, + * so regressing them silently breaks the inspection UI. */ class RSocketModelInspectionResponderHelpersTest { @@ -45,46 +43,12 @@ class RSocketModelInspectionResponderHelpersTest { // The helpers under test never reach into these dependencies, so simple unrecorded // mocks are enough — we don't need MockK relaxed mocks elsewhere. responder = RSocketModelInspectionResponder( - stateManager = mockk(), eventStorageEngine = mockk(), registrar = mockk(), configuration = mockk(), ) } - // --------------------------------------------------------------------------------------- - // simpleNameFromMessageType - // - // Drives Path B handler resolution in applyEventViaReflection. Must produce the same - // simple name regardless of whether the MessageType.name() is a fully qualified class - // name (default ClassBasedMessageTypeResolver) or a namespaced short name from - // @Event(namespace = ...). - // --------------------------------------------------------------------------------------- - - @Test - fun `simpleNameFromMessageType strips package for fully qualified class name`() { - val name = "io.axoniq.quickstart.reservation.event.ReservationEvents\$SeatReservedEvent" - assertEquals("SeatReservedEvent", responder.simpleNameFromMessageType(name)) - } - - @Test - fun `simpleNameFromMessageType strips namespace for short namespaced form`() { - // Format produced by @Event(namespace = "quickstart") on a record class - assertEquals("OrderCreatedEvent", responder.simpleNameFromMessageType("quickstart.OrderCreatedEvent")) - } - - @Test - fun `simpleNameFromMessageType is identity for a single segment`() { - assertEquals("Foo", responder.simpleNameFromMessageType("Foo")) - } - - @Test - fun `simpleNameFromMessageType prefers dollar over dot when both present`() { - // A nested class with a namespaced prefix would be a strange case but the heuristic - // still produces the right simple class name. - assertEquals("Inner", responder.simpleNameFromMessageType("quickstart.Outer\$Inner")) - } - // --------------------------------------------------------------------------------------- // isSimpleIdType // diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderIntegrationTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderIntegrationTest.kt new file mode 100644 index 00000000..646f5411 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderIntegrationTest.kt @@ -0,0 +1,333 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing + +import io.axoniq.platform.framework.api.ModelDomainEventsQuery +import io.axoniq.platform.framework.api.ModelEntityStateAtSequenceQuery +import io.axoniq.platform.framework.api.ModelTimelineQuery +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar +import io.axoniq.platform.framework.client.strategy.CborJackson3EncodingStrategy +import org.axonframework.common.configuration.AxonConfiguration +import org.axonframework.common.configuration.BaseModule +import org.axonframework.common.configuration.ComponentDefinition +import org.axonframework.common.configuration.Configuration +import org.axonframework.common.configuration.LifecycleRegistry +import org.axonframework.common.configuration.Module +import org.axonframework.eventsourcing.annotation.EventSourcedEntity +import org.axonframework.eventsourcing.annotation.EventTag +import org.axonframework.eventsourcing.annotation.reflection.EntityCreator +import org.axonframework.eventsourcing.annotation.reflection.InjectEntityId +import org.axonframework.eventsourcing.configuration.EventSourcedEntityModule +import org.axonframework.eventsourcing.configuration.EventSourcingConfigurer +import org.axonframework.eventsourcing.annotation.EventSourcingHandler +import org.axonframework.messaging.commandhandling.configuration.CommandHandlingModule +import org.axonframework.messaging.eventhandling.EventSink +import org.axonframework.messaging.eventhandling.GenericEventMessage +import org.axonframework.messaging.core.MessageType +import org.axonframework.modelling.SimpleStateManager +import org.axonframework.modelling.StateManager +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +/** + * End-to-end test for the model inspection feature: spins up a real + * [EventSourcingConfigurer] with the inspection enhancer wired in, registers an + * event-sourced entity in a sub-module, publishes events through the in-memory event + * store, then drives the responder's query handlers and asserts on their outputs. + * + * The submodule structure is what makes this interesting: the entity is registered as a + * nested module (via [EventSourcedEntityModule.autodetected]) so the enhancer's + * `doOnSubModules` walker has to drill in to find it. A bug there would cause the + * registered-entities query to come back empty. + */ +class RSocketModelInspectionResponderIntegrationTest { + + private lateinit var configuration: AxonConfiguration + private lateinit var responder: RSocketModelInspectionResponder + + @BeforeEach + fun setUp() { + // Build a minimal AF5 application: + // - InMemoryEventStorageEngine (default) + // - one annotated event-sourced entity registered as a sub-module + // - a stub RSocketHandlerRegistrar (the inspection enhancer requires its presence, + // but we don't exercise the RSocket transport — we call responder methods directly) + // - the AxoniqPlatformModelInspectionEnhancer registered manually + // The AxoniqPlatformModelInspectionEnhancer is auto-discovered via the + // META-INF/services SPI registration — no need to register it manually. + configuration = EventSourcingConfigurer.create() + .registerEntity(EventSourcedEntityModule.autodetected(String::class.java, Reservation::class.java)) + .componentRegistry { cr -> + cr.registerComponent(ComponentDefinition.ofType(RSocketHandlerRegistrar::class.java) + .withBuilder { RSocketHandlerRegistrar(CborJackson3EncodingStrategy()) }) + } + .start() + + responder = configuration.getComponent(RSocketModelInspectionResponder::class.java) + + // Publish a known sequence of events for entity "RES-1". + val sink = configuration.getComponent(EventSink::class.java) + sink.publish( + null, + event(ReservationCreated("RES-1", "alice")), + event(ReservationConfirmed("RES-1")), + event(ReservationCancelled("RES-1", "double-booked")), + ).get() + } + + @AfterEach + fun tearDown() { + configuration.shutdown() + } + + private fun event(payload: Any) = GenericEventMessage( + MessageType(payload.javaClass), + payload, + ) + + // ------------------------------------------------------------------------------------------ + // Registered entities + // ------------------------------------------------------------------------------------------ + + @Test + fun `registered entities query discovers entities defined in nested modules`() { + val result = invokeRegisteredEntities() + assertEquals(1, result.entities.size, "expected the Reservation entity to be visible") + + val entity = result.entities.first() + assertEquals(Reservation::class.java.name, entity.entityType) + assertEquals(1, entity.idTypes.size) + assertEquals(String::class.java.name, entity.idTypes.first().type) + // String is a simple id type — no sub-fields surface to the FE. + assertTrue(entity.idTypes.first().idFields.isEmpty()) + } + + // ------------------------------------------------------------------------------------------ + // Domain events listing + // ------------------------------------------------------------------------------------------ + + @Test + fun `domain events query returns the published events in publication order with typed names`() { + val result = responder.handleDomainEvents(domainEventsQuery("RES-1")) + + assertEquals(3, result.totalCount, "all three events should be returned") + assertEquals(3, result.domainEvents.size) + + val payloadTypes = result.domainEvents.map { it.payloadType } + assertEquals( + listOf( + ReservationCreated::class.java.name, + ReservationConfirmed::class.java.name, + ReservationCancelled::class.java.name, + ), + payloadTypes, + ) + + // Sequence numbers are 0-indexed and dense across the listed events. + assertEquals(listOf(0L, 1L, 2L), result.domainEvents.map { it.sequenceNumber }) + } + + @Test + fun `domain events query returns empty result for an unknown entity id without throwing`() { + val result = responder.handleDomainEvents(domainEventsQuery("RES-DOES-NOT-EXIST")) + assertEquals(0, result.totalCount) + assertTrue(result.domainEvents.isEmpty()) + } + + // ------------------------------------------------------------------------------------------ + // Entity state at sequence + // ------------------------------------------------------------------------------------------ + + @Test + fun `entity state at sequence reconstructs intermediate state by replaying through the metamodel`() { + // After event 0 (created): status = CREATED, eventCount = 1 + val afterCreated = responder.handleEntityStateAtSequence(stateQuery("RES-1", 0)) + assertNotNull(afterCreated.state) + assertTrue(afterCreated.state!!.contains("\"status\":\"CREATED\""), afterCreated.state) + assertTrue(afterCreated.state!!.contains("\"eventCount\":1"), afterCreated.state) + + // After event 1 (confirmed): status = CONFIRMED, eventCount = 2 + val afterConfirmed = responder.handleEntityStateAtSequence(stateQuery("RES-1", 1)) + assertNotNull(afterConfirmed.state) + assertTrue(afterConfirmed.state!!.contains("\"status\":\"CONFIRMED\""), afterConfirmed.state) + assertTrue(afterConfirmed.state!!.contains("\"eventCount\":2"), afterConfirmed.state) + + // After event 2 (cancelled): status = CANCELLED, eventCount = 3, reason captured + val afterCancelled = responder.handleEntityStateAtSequence(stateQuery("RES-1", 2)) + assertNotNull(afterCancelled.state) + assertTrue(afterCancelled.state!!.contains("\"status\":\"CANCELLED\""), afterCancelled.state) + assertTrue(afterCancelled.state!!.contains("\"eventCount\":3"), afterCancelled.state) + assertTrue(afterCancelled.state!!.contains("\"cancelReason\":\"double-booked\""), afterCancelled.state) + } + + @Test + fun `entity state at negative sequence replays all events`() { + val result = responder.handleEntityStateAtSequence(stateQuery("RES-1", -1)) + assertNotNull(result.state) + assertTrue(result.state!!.contains("\"status\":\"CANCELLED\"")) + assertTrue(result.state!!.contains("\"eventCount\":3")) + } + + @Test + fun `entity state for unknown id returns null state`() { + val result = responder.handleEntityStateAtSequence(stateQuery("RES-MISSING", -1)) + assertNull(result.state) + } + + // ------------------------------------------------------------------------------------------ + // Timeline replay + // ------------------------------------------------------------------------------------------ + + @Test + fun `timeline replay produces stateBefore and stateAfter pairs for every event`() { + val result = responder.handleTimelineReplay(timelineQuery("RES-1")) + + assertEquals(3, result.totalEvents) + assertEquals(3, result.entries.size) + + // Event 0 — AF5's factory has just created the entity (defaults applied: status=CREATED, + // eventCount=0, customerId=null) BEFORE the @EventSourcingHandler runs. So stateBefore + // captures that just-constructed shape; stateAfter captures the post-handler state with + // customerId set and eventCount=1. + val first = result.entries[0] + assertEquals(0L, first.sequenceNumber) + assertEquals(ReservationCreated::class.java.name, first.eventType) + assertNotNull(first.stateBefore) + assertTrue(first.stateBefore!!.contains("\"eventCount\":0")) + assertTrue(first.stateBefore!!.contains("\"customerId\":null")) + assertNotNull(first.stateAfter) + assertTrue(first.stateAfter!!.contains("\"eventCount\":1")) + assertTrue(first.stateAfter!!.contains("\"customerId\":\"alice\"")) + + // Event 1 — stateBefore = CREATED, stateAfter = CONFIRMED. + val second = result.entries[1] + assertEquals(1L, second.sequenceNumber) + assertTrue(second.stateBefore!!.contains("\"status\":\"CREATED\"")) + assertTrue(second.stateAfter!!.contains("\"status\":\"CONFIRMED\"")) + + // Event 2 — stateBefore = CONFIRMED, stateAfter = CANCELLED. + val third = result.entries[2] + assertEquals(2L, third.sequenceNumber) + assertTrue(third.stateBefore!!.contains("\"status\":\"CONFIRMED\"")) + assertTrue(third.stateAfter!!.contains("\"status\":\"CANCELLED\"")) + } + + @Test + fun `timeline replay honours offset and limit`() { + val result = responder.handleTimelineReplay(timelineQuery("RES-1", offset = 1, limit = 1)) + + // Total still reflects the full stream so the FE can drive pagination. + assertEquals(3, result.totalEvents) + assertEquals(1, result.entries.size) + assertEquals(1L, result.entries.first().sequenceNumber) + assertEquals(ReservationConfirmed::class.java.name, result.entries.first().eventType) + assertTrue(result.truncated, "events remain after the requested window") + } + + // ------------------------------------------------------------------------------------------ + // Helpers + // ------------------------------------------------------------------------------------------ + + private fun invokeRegisteredEntities() = responder.handleRegisteredEntities() + + private fun domainEventsQuery(id: String) = ModelDomainEventsQuery( + entityType = Reservation::class.java.name, + entityId = id, + idType = String::class.java.name, + ) + + private fun stateQuery(id: String, maxSeq: Long) = ModelEntityStateAtSequenceQuery( + entityType = Reservation::class.java.name, + entityId = id, + idType = String::class.java.name, + maxSequenceNumber = maxSeq, + ) + + private fun timelineQuery(id: String, offset: Int = 0, limit: Int = 100) = ModelTimelineQuery( + entityType = Reservation::class.java.name, + entityId = id, + idType = String::class.java.name, + offset = offset, + limit = limit, + ) + + // ------------------------------------------------------------------------------------------ + // Test fixture: entity + events + // ------------------------------------------------------------------------------------------ + + /** + * Status enum (declared as a top-level type within the test file) — using an enum gives us + * a state field whose JSON representation is a clean string we can assert against. + */ + enum class Status { CREATED, CONFIRMED, CANCELLED } + + /** + * Mutable event-sourced entity with `@EventSourcingHandler` methods. We mutate in place + * (the AF5 default for annotated entities) so the test exercises the same dispatch path + * a real application uses. + */ + @EventSourcedEntity(tagKey = "reservationId") + class Reservation @EntityCreator constructor( + // @InjectEntityId disambiguates the id from the payload parameter — without it, + // AF5 treats the first ctor arg as the event payload type and no match exists. + @Suppress("unused") @InjectEntityId val reservationId: String, + ) { + var status: Status = Status.CREATED + var customerId: String? = null + var cancelReason: String? = null + var eventCount: Int = 0 + + @EventSourcingHandler + fun on(event: ReservationCreated) { + customerId = event.customerId + status = Status.CREATED + eventCount++ + } + + @EventSourcingHandler + fun on(@Suppress("unused") event: ReservationConfirmed) { + status = Status.CONFIRMED + eventCount++ + } + + @EventSourcingHandler + fun on(event: ReservationCancelled) { + status = Status.CANCELLED + cancelReason = event.reason + eventCount++ + } + } + + data class ReservationCreated( + @field:EventTag(key = "reservationId") val reservationId: String, + val customerId: String, + ) + + data class ReservationConfirmed( + @field:EventTag(key = "reservationId") val reservationId: String, + ) + + data class ReservationCancelled( + @field:EventTag(key = "reservationId") val reservationId: String, + val reason: String, + ) +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderNestedModuleIntegrationTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderNestedModuleIntegrationTest.kt new file mode 100644 index 00000000..363d4098 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderNestedModuleIntegrationTest.kt @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing + +import io.axoniq.platform.framework.api.ModelEntityStateAtSequenceQuery +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar +import io.axoniq.platform.framework.client.strategy.CborJackson3EncodingStrategy +import org.axonframework.common.configuration.AxonConfiguration +import org.axonframework.common.configuration.BaseModule +import org.axonframework.common.configuration.ComponentDefinition +import org.axonframework.eventsourcing.annotation.EventSourcedEntity +import org.axonframework.eventsourcing.annotation.EventTag +import org.axonframework.eventsourcing.annotation.reflection.EntityCreator +import org.axonframework.eventsourcing.annotation.reflection.InjectEntityId +import org.axonframework.eventsourcing.annotation.EventSourcingHandler +import org.axonframework.eventsourcing.configuration.EventSourcedEntityModule +import org.axonframework.eventsourcing.configuration.EventSourcingConfigurer +import org.axonframework.messaging.core.MessageType +import org.axonframework.messaging.eventhandling.EventSink +import org.axonframework.messaging.eventhandling.GenericEventMessage +import org.axonframework.modelling.SimpleStateManager +import org.axonframework.modelling.StateManager +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +/** + * End-to-end test for the case the existing integration test doesn't cover: an event-sourced + * entity buried inside a custom user-defined [BaseModule]. Without this, you can't tell whether + * inspection works only for the conveniently top-level entity registration that + * `registerEntity(...)` produces, or also for arbitrary user nesting. + * + * The setup: + * - {@link OuterEntity} registered at the top level via the usual `registerEntity(...)` path. + * - [MySubModule] — a hand-rolled [BaseModule] that registers its own [StateManager] and an + * {@link InnerEntity} inside it via `registerModule(EventSourcedEntityModule.autodetected(...))`. + * + * Both entities must surface in the registered-entities query, and queries against the inner one + * must reconstruct state correctly — proving the model-inspection enhancer's submodule walk + * reaches arbitrary depth, not just one level. + */ +class RSocketModelInspectionResponderNestedModuleIntegrationTest { + + private lateinit var configuration: AxonConfiguration + private lateinit var responder: RSocketModelInspectionResponder + + @BeforeEach + fun setUp() { + configuration = EventSourcingConfigurer.create() + .registerEntity(EventSourcedEntityModule.autodetected(String::class.java, OuterEntity::class.java)) + .componentRegistry { cr -> + cr.registerComponent(ComponentDefinition.ofType(RSocketHandlerRegistrar::class.java) + .withBuilder { RSocketHandlerRegistrar(CborJackson3EncodingStrategy()) }) + // The custom BaseModule lives directly under the root component registry. + // Inside it, a further sub-module registers InnerEntity — two levels deep. + cr.registerModule(MySubModule()) + } + .start() + + responder = configuration.getComponent(RSocketModelInspectionResponder::class.java) + + val sink = configuration.getComponent(EventSink::class.java) + sink.publish( + null, + event(OuterCreated("OUTER-1", "blue")), + event(InnerOpened("INNER-1", 42)), + event(InnerClosed("INNER-1")), + ).get() + } + + @AfterEach + fun tearDown() { + configuration.shutdown() + } + + private fun event(payload: Any) = GenericEventMessage(MessageType(payload.javaClass), payload) + + @Test + fun `registered entities query surfaces both top-level and deeply nested entities`() { + val result = responder.handleRegisteredEntities() + val typeNames = result.entities.map { it.entityType }.toSet() + + assertTrue(typeNames.contains(OuterEntity::class.java.name), + "expected OuterEntity (top-level) to be registered") + assertTrue(typeNames.contains(InnerEntity::class.java.name), + "expected InnerEntity (nested inside MySubModule) to be registered — submodule walker must reach it") + } + + @Test + fun `state at sequence reconstructs the inner entity in the nested module`() { + val result = responder.handleEntityStateAtSequence(ModelEntityStateAtSequenceQuery( + entityType = InnerEntity::class.java.name, + entityId = "INNER-1", + idType = String::class.java.name, + maxSequenceNumber = -1, + )) + + assertNotNull(result.state, "state must be reconstructed for the inner entity") + assertTrue(result.state!!.contains("\"open\":false"), result.state) + assertTrue(result.state!!.contains("\"value\":42"), result.state) + } + + @Test + fun `state at sequence reconstructs the outer entity registered at the root`() { + val result = responder.handleEntityStateAtSequence(ModelEntityStateAtSequenceQuery( + entityType = OuterEntity::class.java.name, + entityId = "OUTER-1", + idType = String::class.java.name, + maxSequenceNumber = -1, + )) + + assertNotNull(result.state) + assertTrue(result.state!!.contains("\"colour\":\"blue\""), result.state) + } + + // ------------------------------------------------------------------------------------------ + // Test fixtures + // ------------------------------------------------------------------------------------------ + + /** + * Custom user module that owns its own [StateManager] and registers an event-sourced entity + * as a sub-module. Mirrors how a real application might package a bounded context. + */ + class MySubModule : BaseModule("MySubModule") { + init { + componentRegistry { cr -> + cr.registerComponent(ComponentDefinition.ofType(StateManager::class.java) + .withBuilder { SimpleStateManager.named("MySubModuleStateManager") }) + cr.registerModule(EventSourcedEntityModule.autodetected(String::class.java, InnerEntity::class.java)) + } + } + } + + @EventSourcedEntity(tagKey = "outerId") + class OuterEntity @EntityCreator constructor( + @Suppress("unused") @InjectEntityId val outerId: String, + ) { + var colour: String = "" + + @EventSourcingHandler + fun on(event: OuterCreated) { + colour = event.colour + } + } + + data class OuterCreated( + @field:EventTag(key = "outerId") val outerId: String, + val colour: String, + ) + + @EventSourcedEntity(tagKey = "innerId") + class InnerEntity @EntityCreator constructor( + @Suppress("unused") @InjectEntityId val innerId: String, + ) { + var open: Boolean = false + var value: Int = 0 + + @EventSourcingHandler + fun on(event: InnerOpened) { + open = true + value = event.value + } + + @EventSourcingHandler + fun on(@Suppress("unused") event: InnerClosed) { + open = false + } + } + + data class InnerOpened( + @field:EventTag(key = "innerId") val innerId: String, + val value: Int, + ) + + data class InnerClosed( + @field:EventTag(key = "innerId") val innerId: String, + ) +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderReflectionDispatchTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderReflectionDispatchTest.kt deleted file mode 100644 index ccbafd53..00000000 --- a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderReflectionDispatchTest.kt +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Copyright (c) 2022-2026. AxonIQ B.V. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.axoniq.platform.framework.eventsourcing - -import io.axoniq.platform.framework.client.RSocketHandlerRegistrar -import io.mockk.every -import io.mockk.mockk -import org.axonframework.common.configuration.Configuration -import org.axonframework.conversion.Converter -import org.axonframework.eventsourcing.annotation.EventSourcingHandler -import org.axonframework.eventsourcing.eventstore.EventStorageEngine -import org.axonframework.messaging.core.MessageType -import org.axonframework.messaging.eventhandling.EventMessage -import org.axonframework.modelling.StateManager -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertNull -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test - -/** - * Tests the reflection-based `@EventSourcingHandler` dispatch fallback used when AF5's - * own metamodel dispatch silently no-ops in our ad-hoc inspection context (no real - * `ProcessingContext`, no interceptor chain). - * - * Two dispatch paths are exercised: - * - * - **Path A**: payload is already a typed instance (the FQN [MessageType.name] case where - * [RSocketModelInspectionResponder.deserializePayload] succeeded). The matching handler - * is found by parameter-type assignability. - * - * - **Path B**: payload is still raw `byte[]` because the [MessageType.name] is a short - * namespaced form (e.g. `quickstart.OrderCreatedEvent` from `@Event(namespace=...)`) - * that `Class.forName` can't resolve. The handler is selected by simple-name match - * against [MessageType.name] segments, then the [Converter] is invoked to turn the - * `byte[]` into the handler's parameter type. - * - * Critically: under Path B, Jackson permissive deserialization could happily build any of - * the entity's event classes from any JSON byte sequence (filling whatever fields match, - * defaulting the rest). We must pick the handler **before** invoking the converter, - * otherwise the wrong handler fires (e.g. an `OrderCreated` event mutating the entity as - * if it were `OrderShipped`). - */ -class RSocketModelInspectionResponderReflectionDispatchTest { - - private lateinit var responder: RSocketModelInspectionResponder - private lateinit var configuration: Configuration - private lateinit var converter: Converter - - @BeforeEach - fun setUp() { - configuration = mockk() - converter = mockk() - - // Lazy `payloadConverter` reads `configuration.getComponent(Converter::class.java)`. - // Stub it so Path B can use the converter. - every { configuration.getComponent(Converter::class.java) } returns converter - - responder = RSocketModelInspectionResponder( - stateManager = mockk(), - eventStorageEngine = mockk(), - registrar = mockk(), - configuration = configuration, - ) - } - - // --------------------------------------------------------------------------------------- - // Path A — payload already deserialized - // --------------------------------------------------------------------------------------- - - @Test - fun `Path A invokes the @EventSourcingHandler whose param matches the typed payload`() { - val entity = TestOrder() - val event = TestOrder.OrderCreatedEvent("order-1", "Alice") - val message = mockk() - every { message.payload() } returns event - // Path A doesn't read message.type() at all — we never reach the converter call. - - responder.applyEventViaReflection(entity, message) - - assertEquals("order-1", entity.orderId) - assertEquals("Alice", entity.customerName) - assertEquals(TestOrder.Status.CREATED, entity.status) - } - - @Test - fun `Path A is a no-op when no handler matches the typed payload`() { - val entity = TestOrder() - // Use an event class the entity has no handler for. - val unrelatedEvent = UnrelatedEvent("payload-content") - val message = mockk() - every { message.payload() } returns unrelatedEvent - - responder.applyEventViaReflection(entity, message) - - // Entity untouched because no handler accepted UnrelatedEvent. - assertNull(entity.orderId) - assertEquals(TestOrder.Status.DRAFT, entity.status) - } - - // --------------------------------------------------------------------------------------- - // Path B — raw byte[] payload, simple-name handler resolution - // --------------------------------------------------------------------------------------- - - @Test - fun `Path B selects the handler by simple-name match and converts raw byte payload to it`() { - val entity = TestOrder() - val rawBytes = "{\"orderId\":\"order-2\",\"customerName\":\"Bob\"}".toByteArray() - val typedEvent = TestOrder.OrderCreatedEvent("order-2", "Bob") - val message = mockk() - every { message.payload() } returns rawBytes - every { message.type() } returns MessageType("quickstart.OrderCreatedEvent") - // Converter sees the request for the handler's param type and returns a typed instance. - // Note: the simple-name resolver picks OrderCreatedEvent from `quickstart.OrderCreatedEvent`, - // so the converter is invoked with that type — never with OrderShippedEvent or any other. - every { - message.payloadAs(TestOrder.OrderCreatedEvent::class.java, converter) - } returns typedEvent - - responder.applyEventViaReflection(entity, message) - - assertEquals("order-2", entity.orderId) - assertEquals("Bob", entity.customerName) - assertEquals(TestOrder.Status.CREATED, entity.status) - } - - @Test - fun `Path B does not fire any handler when no entity handler param matches the simple-name`() { - val entity = TestOrder() - val rawBytes = "{\"foo\":\"bar\"}".toByteArray() - val message = mockk() - every { message.payload() } returns rawBytes - // Simple-name "Mystery" doesn't match any of TestOrder's handler param types. - every { message.type() } returns MessageType("some.namespace.Mystery") - - responder.applyEventViaReflection(entity, message) - - // No handler fired → entity stays at defaults. Crucially, the converter was never - // invoked either, because the resolver short-circuited on the missing simple-name. - assertNull(entity.orderId) - assertEquals(TestOrder.Status.DRAFT, entity.status) - } - - @Test - fun `Path B picks correct handler when multiple share an overlapping JSON shape`() { - // Regression guard: under the previous "try every handler with Jackson" approach, - // an OrderCreatedEvent JSON could permissively deserialize to OrderShippedEvent - // (sharing only `orderId`), causing the OrderShipped handler to fire and set - // status=SHIPPED instead of CREATED. The simple-name matcher prevents this. - val entity = TestOrder() - val createdJsonBytes = "{\"orderId\":\"order-3\",\"customerName\":\"Carol\"}".toByteArray() - val message = mockk() - every { message.payload() } returns createdJsonBytes - every { message.type() } returns MessageType("quickstart.OrderCreatedEvent") - - // Only the OrderCreated path should be exercised. We stub it; if the responder - // mistakenly tried OrderShipped, MockK would throw on the unstubbed call. - every { - message.payloadAs(TestOrder.OrderCreatedEvent::class.java, converter) - } returns TestOrder.OrderCreatedEvent("order-3", "Carol") - - responder.applyEventViaReflection(entity, message) - - assertEquals(TestOrder.Status.CREATED, entity.status) // not SHIPPED - assertEquals("order-3", entity.orderId) - assertEquals("Carol", entity.customerName) - } - - // --------------------------------------------------------------------------------------- - // Test fixtures - // --------------------------------------------------------------------------------------- - - /** - * Mirrors the AF5 entity pattern under test: no-arg constructor + several - * `@EventSourcingHandler` methods that mutate `this` in place. - */ - class TestOrder { - var orderId: String? = null - var customerName: String? = null - var carrier: String? = null - var status: Status = Status.DRAFT - - enum class Status { DRAFT, CREATED, SHIPPED } - - @EventSourcingHandler - fun on(event: OrderCreatedEvent) { - this.orderId = event.orderId - this.customerName = event.customerName - this.status = Status.CREATED - } - - @EventSourcingHandler - fun on(event: OrderShippedEvent) { - // If this handler ever fires on an OrderCreated payload (the "Jackson permissive" - // bug we guard against), `status` would jump straight to SHIPPED. - this.orderId = event.orderId - this.carrier = event.carrier - this.status = Status.SHIPPED - } - - @JvmRecord - data class OrderCreatedEvent(val orderId: String, val customerName: String) - - @JvmRecord - data class OrderShippedEvent(val orderId: String, val carrier: String) - } - - /** A class no `TestOrder` handler accepts — used to assert no-op behaviour. */ - @JvmRecord - data class UnrelatedEvent(val payload: String) -}