Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
351 changes: 351 additions & 0 deletions chasm/chasmtest/test_engine.go
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My only issue with what you've done here is that there's a lot of duplicate logic with the real engine implementation. When we last synced we said you would reuse the "real" engine and mock the persistence interfaces.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just concerned that the implementations will diverge. We really should mimic the real engine's behavior as closely as possible because otherwise that could lead to wrong assumptions and subtle bugs.

Copy link
Copy Markdown
Contributor Author

@awln-temporal awln-temporal Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree diverging implementations is a big concern here. Last discussion, we talked about mocking the persistence layer only and not using MockNodeBackend. The main problem is package cyclic imports, since both the existing chasm_engine and the framework's dependence on NodeBackend are implemented in history. I thought maybe we could try just mocking persistence/execution manager only in the meeting, but I don't see how this is possible after looking at it. Lmk if i'm missing something.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also agree reusing the implementation and mock persistence layer will be the best here.

With that said, practically I am not too worried about code diverging here. Chasm engine logic should be quite stable or we have a problem in the chasm design.

Now that I think about it I actually don't know how mocking persistence layer can solve the IDReuse/ConflictPolicy part, which relies on errors returned from DB.

Original file line number Diff line number Diff line change
@@ -0,0 +1,351 @@
package chasmtest

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.temporal.io/api/serviceerror"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/testing/testlogger"
)

type (
Option[T chasm.RootComponent] func(*Engine[T])
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Call this EngineOption


Engine[T chasm.RootComponent] struct {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the engine need to be typed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't, it was only added for the helper func to get the Root component, but definitely not needed, removing this along with other accessor methods as mentioned below

t *testing.T
registry *chasm.Registry
logger log.Logger
metrics metrics.Handler

rootExecutionKey chasm.ExecutionKey
root T
rootExecution *execution
executions map[executionLookupKey]*execution
}

execution struct {
key chasm.ExecutionKey
node *chasm.Node
backend *chasm.MockNodeBackend
timeSource *clock.EventTimeSource
root chasm.RootComponent
}

executionLookupKey struct {
namespaceID string
businessID string
}
)

func NewEngine[T chasm.RootComponent](
t *testing.T,
registry *chasm.Registry,
opts ...Option[T],
) *Engine[T] {
t.Helper()

e := &Engine[T]{
t: t,
registry: registry,
logger: testlogger.NewTestLogger(t, testlogger.FailOnExpectedErrorOnly),
metrics: metrics.NoopMetricsHandler,
rootExecutionKey: chasm.ExecutionKey{
NamespaceID: "test-namespace-id",
BusinessID: "test-workflow-id",
RunID: "test-run-id",
},
executions: make(map[executionLookupKey]*execution),
}

for _, opt := range opts {
opt(e)
}

return e
}

func WithRoot[T chasm.RootComponent](
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed, StartExecution should be used here instead to create a root.

factory func(chasm.MutableContext) T,
) Option[T] {
return func(e *Engine[T]) {
execution := e.newExecution(e.rootExecutionKey)
ctx := chasm.NewMutableContext(context.Background(), execution.node)
root := factory(ctx)
require.NoError(e.t, execution.node.SetRootComponent(root))
_, err := execution.node.CloseTransaction()
require.NoError(e.t, err)
e.root = root
execution.root = root
e.rootExecution = execution
e.executions[newExecutionLookupKey(execution.key)] = execution
}
}

func WithExecutionKey[T chasm.RootComponent](key chasm.ExecutionKey) Option[T] {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will also be part of StartExecution.

return func(e *Engine[T]) {
e.rootExecutionKey = key
}
}

func (e *Engine[T]) Root() T {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use ReadComponent to extract whatever we need from the engine.

return e.root
}

func (e *Engine[T]) EngineContext() context.Context {
return chasm.NewEngineContext(context.Background(), e)
}

func (e *Engine[T]) Ref(component chasm.Component) chasm.ComponentRef {
for _, execution := range e.executions {
ref, err := execution.node.Ref(component)
if err != nil {
continue
}
structuredRef, err := chasm.DeserializeComponentRef(ref)
require.NoError(e.t, err)
return structuredRef
}

e.t.Fatalf("component %T is not attached to the chasmtest engine", component)
return chasm.ComponentRef{}
}

func (e *Engine[T]) StartExecution(
ctx context.Context,
ref chasm.ComponentRef,
startFn func(chasm.MutableContext) (chasm.RootComponent, error),
_ ...chasm.TransitionOption,
) (chasm.StartExecutionResult, error) {
if _, ok := e.executionForKey(ref.ExecutionKey); ok {
return chasm.StartExecutionResult{}, chasm.NewExecutionAlreadyStartedErr("already exists", "", ref.RunID)
}

execution := e.newExecution(ref.ExecutionKey)
mutableCtx := chasm.NewMutableContext(ctx, execution.node)
root, err := startFn(mutableCtx)
if err != nil {
return chasm.StartExecutionResult{}, err
}
if err := execution.node.SetRootComponent(root); err != nil {
return chasm.StartExecutionResult{}, err
}
_, err = execution.node.CloseTransaction()
if err != nil {
return chasm.StartExecutionResult{}, err
}

execution.root = root
e.executions[newExecutionLookupKey(execution.key)] = execution

serializedRef, err := execution.node.Ref(root)
if err != nil {
return chasm.StartExecutionResult{}, err
}

return chasm.StartExecutionResult{
ExecutionKey: execution.key,
ExecutionRef: serializedRef,
Created: true,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be false when using UseExisting policy.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm right now the implementation just assumes there can be one execution at a time, ig same discussion as below, do we want to handle conflict policies in this engine, or should we just go with a simpler solution? When would unit tests want to test out conflicting execution behavior?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would implement a more complete solution. You basically are implementing a full in-memory CHASM engine. It should have the same semantics as the history chasm engine.

}, nil
}

func (e *Engine[T]) UpdateWithStartExecution(
ctx context.Context,
ref chasm.ComponentRef,
startFn func(chasm.MutableContext) (chasm.RootComponent, error),
updateFn func(chasm.MutableContext, chasm.Component) error,
_ ...chasm.TransitionOption,
) (chasm.EngineUpdateWithStartExecutionResult, error) {
if execution, ok := e.executionForKey(ref.ExecutionKey); ok {
serializedRef, err := e.updateComponentInExecution(ctx, execution, ref, updateFn)
if err != nil {
return chasm.EngineUpdateWithStartExecutionResult{}, err
}
return chasm.EngineUpdateWithStartExecutionResult{
ExecutionKey: execution.key,
ExecutionRef: serializedRef,
Created: false,
}, nil
}

execution := e.newExecution(ref.ExecutionKey)
mutableCtx := chasm.NewMutableContext(ctx, execution.node)
root, err := startFn(mutableCtx)
if err != nil {
return chasm.EngineUpdateWithStartExecutionResult{}, err
}
if err := execution.node.SetRootComponent(root); err != nil {
return chasm.EngineUpdateWithStartExecutionResult{}, err
}
if err := updateFn(mutableCtx, root); err != nil {
return chasm.EngineUpdateWithStartExecutionResult{}, err
}
_, err = execution.node.CloseTransaction()
if err != nil {
return chasm.EngineUpdateWithStartExecutionResult{}, err
}

execution.root = root
e.executions[newExecutionLookupKey(execution.key)] = execution

serializedRef, err := execution.node.Ref(root)
if err != nil {
return chasm.EngineUpdateWithStartExecutionResult{}, err
}

return chasm.EngineUpdateWithStartExecutionResult{
ExecutionKey: execution.key,
ExecutionRef: serializedRef,
Created: true,
}, nil
}

func (e *Engine[T]) UpdateComponent(
ctx context.Context,
ref chasm.ComponentRef,
updateFn func(chasm.MutableContext, chasm.Component) error,
_ ...chasm.TransitionOption,
) ([]byte, error) {
execution, err := e.mustExecutionForRef(ref)
if err != nil {
return nil, err
}
return e.updateComponentInExecution(ctx, execution, ref, updateFn)
}

func (e *Engine[T]) ReadComponent(
ctx context.Context,
ref chasm.ComponentRef,
readFn func(chasm.Context, chasm.Component) error,
_ ...chasm.TransitionOption,
) error {
execution, err := e.mustExecutionForRef(ref)
if err != nil {
return err
}

component, err := execution.node.Component(chasm.NewContext(ctx, execution.node), ref)
if err != nil {
return err
}

readCtx := chasm.NewContext(ctx, execution.node)
return readFn(readCtx, component)
}

func (e *Engine[T]) PollComponent(
context.Context,
chasm.ComponentRef,
func(chasm.Context, chasm.Component) (bool, error),
...chasm.TransitionOption,
) ([]byte, error) {
return nil, serviceerror.NewUnimplemented("chasmtest.Engine.PollComponent")
}

func (e *Engine[T]) DeleteExecution(
context.Context,
chasm.ComponentRef,
chasm.DeleteExecutionRequest,
) error {
return serviceerror.NewUnimplemented("chasmtest.Engine.DeleteExecution")
}

func (e *Engine[T]) NotifyExecution(chasm.ExecutionKey) {}

func (e *Engine[T]) newExecution(key chasm.ExecutionKey) *execution {
key = normalizeExecutionKey(key)
timeSource := clock.NewEventTimeSource()
timeSource.Update(time.Now())
backend := &chasm.MockNodeBackend{
HandleNextTransitionCount: func() int64 { return 2 },
HandleGetCurrentVersion: func() int64 { return 1 },
HandleGetWorkflowKey: func() definition.WorkflowKey {
return definition.NewWorkflowKey(key.NamespaceID, key.BusinessID, key.RunID)
},
HandleIsWorkflow: func() bool { return false },
HandleCurrentVersionedTransition: func() *persistencespb.VersionedTransition {
return &persistencespb.VersionedTransition{
NamespaceFailoverVersion: 1,
TransitionCount: 1,
}
},
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're going to want to increment the current version on every transaction (UpdateComponent, StartExecution, UpdateWithStartExecution).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm ok, yeah I left this part out since the backend is not even accessible right now. Should we discuss what verifications would be useful to check on the MockNodeBackend?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't really care about verifications on transition counts and rather that be in scope of functional tests, should we just leave the MockNodeBackend impl as is?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The engine should be as close to the real engine as possible. Ideally it would have the exact same behavior.

}
return &execution{
key: key,
backend: backend,
timeSource: timeSource,
node: chasm.NewEmptyTree(
e.registry,
timeSource,
backend,
chasm.DefaultPathEncoder,
e.logger,
e.metrics,
),
}
}

func (e *Engine[T]) executionForKey(key chasm.ExecutionKey) (*execution, bool) {
execution, ok := e.executions[newExecutionLookupKey(normalizeExecutionKey(key))]
return execution, ok
}

func (e *Engine[T]) mustExecutionForRef(ref chasm.ComponentRef) (*execution, error) {
execution, ok := e.executionForKey(ref.ExecutionKey)
if !ok {
return nil, serviceerror.NewNotFound(
fmt.Sprintf("execution not found: namespace=%q business_id=%q run_id=%q", ref.NamespaceID, ref.BusinessID, ref.RunID),
)
}
return execution, nil
}

func (e *Engine[T]) updateComponentInExecution(
ctx context.Context,
execution *execution,
ref chasm.ComponentRef,
updateFn func(chasm.MutableContext, chasm.Component) error,
) ([]byte, error) {
component, err := execution.node.Component(chasm.NewContext(ctx, execution.node), ref)
if err != nil {
return nil, err
}

mutableCtx := chasm.NewMutableContext(ctx, execution.node)
if err := updateFn(mutableCtx, component); err != nil {
return nil, err
}

_, err = execution.node.CloseTransaction()
if err != nil {
return nil, err
}

return mutableCtx.Ref(component)
}

func newExecutionLookupKey(key chasm.ExecutionKey) executionLookupKey {
return executionLookupKey{
namespaceID: key.NamespaceID,
businessID: key.BusinessID,
}
}

func normalizeExecutionKey(key chasm.ExecutionKey) chasm.ExecutionKey {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just ask test authors to create a valid execution key instead of implicitly adding defaults.

if key.NamespaceID == "" {
key.NamespaceID = "test-namespace-id"
}
if key.BusinessID == "" {
key.BusinessID = "test-workflow-id"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
key.BusinessID = "test-workflow-id"
key.BusinessID = "test-business-id"

}
return key
}
Loading