-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add chasm test engine for unit tests #9882
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
ea21b9b
171cdf0
267e896
9c6bf52
54ecfd9
ab0e979
2e57fc9
1693f89
66b376f
d56ffa7
3f0fa04
3e419c2
8cee03f
f57bcdd
7654cac
1b15856
931c412
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,351 @@ | ||||||
| package chasmtest | ||||||
|
|
||||||
| import ( | ||||||
| "context" | ||||||
| "fmt" | ||||||
| "testing" | ||||||
| "time" | ||||||
|
|
||||||
| "github.com/stretchr/testify/require" | ||||||
| "go.temporal.io/api/serviceerror" | ||||||
| persistencespb "go.temporal.io/server/api/persistence/v1" | ||||||
| "go.temporal.io/server/chasm" | ||||||
| "go.temporal.io/server/common/clock" | ||||||
| "go.temporal.io/server/common/definition" | ||||||
| "go.temporal.io/server/common/log" | ||||||
| "go.temporal.io/server/common/metrics" | ||||||
| "go.temporal.io/server/common/testing/testlogger" | ||||||
| ) | ||||||
|
|
||||||
| type ( | ||||||
| Option[T chasm.RootComponent] func(*Engine[T]) | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Call this |
||||||
|
|
||||||
| Engine[T chasm.RootComponent] struct { | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does the engine need to be typed?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. doesn't, it was only added for the helper func to get the Root component, but definitely not needed, removing this along with other accessor methods as mentioned below |
||||||
| t *testing.T | ||||||
| registry *chasm.Registry | ||||||
| logger log.Logger | ||||||
| metrics metrics.Handler | ||||||
|
|
||||||
| rootExecutionKey chasm.ExecutionKey | ||||||
| root T | ||||||
| rootExecution *execution | ||||||
| executions map[executionLookupKey]*execution | ||||||
| } | ||||||
|
|
||||||
| execution struct { | ||||||
| key chasm.ExecutionKey | ||||||
| node *chasm.Node | ||||||
| backend *chasm.MockNodeBackend | ||||||
| timeSource *clock.EventTimeSource | ||||||
| root chasm.RootComponent | ||||||
| } | ||||||
|
|
||||||
| executionLookupKey struct { | ||||||
| namespaceID string | ||||||
| businessID string | ||||||
| } | ||||||
| ) | ||||||
|
|
||||||
| func NewEngine[T chasm.RootComponent]( | ||||||
| t *testing.T, | ||||||
| registry *chasm.Registry, | ||||||
| opts ...Option[T], | ||||||
| ) *Engine[T] { | ||||||
| t.Helper() | ||||||
|
|
||||||
| e := &Engine[T]{ | ||||||
| t: t, | ||||||
| registry: registry, | ||||||
| logger: testlogger.NewTestLogger(t, testlogger.FailOnExpectedErrorOnly), | ||||||
| metrics: metrics.NoopMetricsHandler, | ||||||
| rootExecutionKey: chasm.ExecutionKey{ | ||||||
| NamespaceID: "test-namespace-id", | ||||||
| BusinessID: "test-workflow-id", | ||||||
| RunID: "test-run-id", | ||||||
| }, | ||||||
| executions: make(map[executionLookupKey]*execution), | ||||||
| } | ||||||
|
|
||||||
| for _, opt := range opts { | ||||||
| opt(e) | ||||||
| } | ||||||
|
|
||||||
| return e | ||||||
| } | ||||||
|
|
||||||
| func WithRoot[T chasm.RootComponent]( | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not needed, |
||||||
| factory func(chasm.MutableContext) T, | ||||||
| ) Option[T] { | ||||||
| return func(e *Engine[T]) { | ||||||
| execution := e.newExecution(e.rootExecutionKey) | ||||||
| ctx := chasm.NewMutableContext(context.Background(), execution.node) | ||||||
| root := factory(ctx) | ||||||
| require.NoError(e.t, execution.node.SetRootComponent(root)) | ||||||
| _, err := execution.node.CloseTransaction() | ||||||
| require.NoError(e.t, err) | ||||||
| e.root = root | ||||||
| execution.root = root | ||||||
| e.rootExecution = execution | ||||||
| e.executions[newExecutionLookupKey(execution.key)] = execution | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| func WithExecutionKey[T chasm.RootComponent](key chasm.ExecutionKey) Option[T] { | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That will also be part of |
||||||
| return func(e *Engine[T]) { | ||||||
| e.rootExecutionKey = key | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| func (e *Engine[T]) Root() T { | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can use |
||||||
| return e.root | ||||||
| } | ||||||
|
|
||||||
| func (e *Engine[T]) EngineContext() context.Context { | ||||||
| return chasm.NewEngineContext(context.Background(), e) | ||||||
| } | ||||||
|
|
||||||
| func (e *Engine[T]) Ref(component chasm.Component) chasm.ComponentRef { | ||||||
| for _, execution := range e.executions { | ||||||
| ref, err := execution.node.Ref(component) | ||||||
| if err != nil { | ||||||
| continue | ||||||
| } | ||||||
| structuredRef, err := chasm.DeserializeComponentRef(ref) | ||||||
| require.NoError(e.t, err) | ||||||
| return structuredRef | ||||||
| } | ||||||
|
|
||||||
| e.t.Fatalf("component %T is not attached to the chasmtest engine", component) | ||||||
| return chasm.ComponentRef{} | ||||||
| } | ||||||
|
|
||||||
| func (e *Engine[T]) StartExecution( | ||||||
| ctx context.Context, | ||||||
| ref chasm.ComponentRef, | ||||||
| startFn func(chasm.MutableContext) (chasm.RootComponent, error), | ||||||
| _ ...chasm.TransitionOption, | ||||||
| ) (chasm.StartExecutionResult, error) { | ||||||
| if _, ok := e.executionForKey(ref.ExecutionKey); ok { | ||||||
| return chasm.StartExecutionResult{}, chasm.NewExecutionAlreadyStartedErr("already exists", "", ref.RunID) | ||||||
| } | ||||||
|
|
||||||
| execution := e.newExecution(ref.ExecutionKey) | ||||||
| mutableCtx := chasm.NewMutableContext(ctx, execution.node) | ||||||
| root, err := startFn(mutableCtx) | ||||||
| if err != nil { | ||||||
| return chasm.StartExecutionResult{}, err | ||||||
| } | ||||||
| if err := execution.node.SetRootComponent(root); err != nil { | ||||||
| return chasm.StartExecutionResult{}, err | ||||||
| } | ||||||
| _, err = execution.node.CloseTransaction() | ||||||
| if err != nil { | ||||||
| return chasm.StartExecutionResult{}, err | ||||||
| } | ||||||
|
|
||||||
| execution.root = root | ||||||
| e.executions[newExecutionLookupKey(execution.key)] = execution | ||||||
|
|
||||||
| serializedRef, err := execution.node.Ref(root) | ||||||
| if err != nil { | ||||||
| return chasm.StartExecutionResult{}, err | ||||||
| } | ||||||
|
|
||||||
| return chasm.StartExecutionResult{ | ||||||
| ExecutionKey: execution.key, | ||||||
| ExecutionRef: serializedRef, | ||||||
| Created: true, | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be false when using UseExisting policy.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hm right now the implementation just assumes there can be one execution at a time, ig same discussion as below, do we want to handle conflict policies in this engine, or should we just go with a simpler solution? When would unit tests want to test out conflicting execution behavior?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would implement a more complete solution. You basically are implementing a full in-memory CHASM engine. It should have the same semantics as the history chasm engine. |
||||||
| }, nil | ||||||
| } | ||||||
|
|
||||||
| func (e *Engine[T]) UpdateWithStartExecution( | ||||||
| ctx context.Context, | ||||||
| ref chasm.ComponentRef, | ||||||
| startFn func(chasm.MutableContext) (chasm.RootComponent, error), | ||||||
| updateFn func(chasm.MutableContext, chasm.Component) error, | ||||||
| _ ...chasm.TransitionOption, | ||||||
| ) (chasm.EngineUpdateWithStartExecutionResult, error) { | ||||||
| if execution, ok := e.executionForKey(ref.ExecutionKey); ok { | ||||||
| serializedRef, err := e.updateComponentInExecution(ctx, execution, ref, updateFn) | ||||||
| if err != nil { | ||||||
| return chasm.EngineUpdateWithStartExecutionResult{}, err | ||||||
| } | ||||||
| return chasm.EngineUpdateWithStartExecutionResult{ | ||||||
| ExecutionKey: execution.key, | ||||||
| ExecutionRef: serializedRef, | ||||||
| Created: false, | ||||||
| }, nil | ||||||
| } | ||||||
|
|
||||||
| execution := e.newExecution(ref.ExecutionKey) | ||||||
| mutableCtx := chasm.NewMutableContext(ctx, execution.node) | ||||||
| root, err := startFn(mutableCtx) | ||||||
| if err != nil { | ||||||
| return chasm.EngineUpdateWithStartExecutionResult{}, err | ||||||
| } | ||||||
| if err := execution.node.SetRootComponent(root); err != nil { | ||||||
| return chasm.EngineUpdateWithStartExecutionResult{}, err | ||||||
| } | ||||||
| if err := updateFn(mutableCtx, root); err != nil { | ||||||
| return chasm.EngineUpdateWithStartExecutionResult{}, err | ||||||
| } | ||||||
| _, err = execution.node.CloseTransaction() | ||||||
| if err != nil { | ||||||
| return chasm.EngineUpdateWithStartExecutionResult{}, err | ||||||
| } | ||||||
|
|
||||||
| execution.root = root | ||||||
| e.executions[newExecutionLookupKey(execution.key)] = execution | ||||||
|
|
||||||
| serializedRef, err := execution.node.Ref(root) | ||||||
| if err != nil { | ||||||
| return chasm.EngineUpdateWithStartExecutionResult{}, err | ||||||
| } | ||||||
|
|
||||||
| return chasm.EngineUpdateWithStartExecutionResult{ | ||||||
| ExecutionKey: execution.key, | ||||||
| ExecutionRef: serializedRef, | ||||||
| Created: true, | ||||||
| }, nil | ||||||
| } | ||||||
|
|
||||||
| func (e *Engine[T]) UpdateComponent( | ||||||
| ctx context.Context, | ||||||
| ref chasm.ComponentRef, | ||||||
| updateFn func(chasm.MutableContext, chasm.Component) error, | ||||||
| _ ...chasm.TransitionOption, | ||||||
| ) ([]byte, error) { | ||||||
| execution, err := e.mustExecutionForRef(ref) | ||||||
| if err != nil { | ||||||
| return nil, err | ||||||
| } | ||||||
| return e.updateComponentInExecution(ctx, execution, ref, updateFn) | ||||||
| } | ||||||
|
|
||||||
| func (e *Engine[T]) ReadComponent( | ||||||
| ctx context.Context, | ||||||
| ref chasm.ComponentRef, | ||||||
| readFn func(chasm.Context, chasm.Component) error, | ||||||
| _ ...chasm.TransitionOption, | ||||||
| ) error { | ||||||
| execution, err := e.mustExecutionForRef(ref) | ||||||
| if err != nil { | ||||||
| return err | ||||||
| } | ||||||
|
|
||||||
| component, err := execution.node.Component(chasm.NewContext(ctx, execution.node), ref) | ||||||
| if err != nil { | ||||||
| return err | ||||||
| } | ||||||
|
|
||||||
| readCtx := chasm.NewContext(ctx, execution.node) | ||||||
| return readFn(readCtx, component) | ||||||
| } | ||||||
|
|
||||||
| func (e *Engine[T]) PollComponent( | ||||||
| context.Context, | ||||||
| chasm.ComponentRef, | ||||||
| func(chasm.Context, chasm.Component) (bool, error), | ||||||
| ...chasm.TransitionOption, | ||||||
| ) ([]byte, error) { | ||||||
| return nil, serviceerror.NewUnimplemented("chasmtest.Engine.PollComponent") | ||||||
| } | ||||||
|
|
||||||
| func (e *Engine[T]) DeleteExecution( | ||||||
| context.Context, | ||||||
| chasm.ComponentRef, | ||||||
| chasm.DeleteExecutionRequest, | ||||||
| ) error { | ||||||
| return serviceerror.NewUnimplemented("chasmtest.Engine.DeleteExecution") | ||||||
| } | ||||||
|
|
||||||
| func (e *Engine[T]) NotifyExecution(chasm.ExecutionKey) {} | ||||||
|
|
||||||
| func (e *Engine[T]) newExecution(key chasm.ExecutionKey) *execution { | ||||||
| key = normalizeExecutionKey(key) | ||||||
| timeSource := clock.NewEventTimeSource() | ||||||
| timeSource.Update(time.Now()) | ||||||
| backend := &chasm.MockNodeBackend{ | ||||||
| HandleNextTransitionCount: func() int64 { return 2 }, | ||||||
| HandleGetCurrentVersion: func() int64 { return 1 }, | ||||||
| HandleGetWorkflowKey: func() definition.WorkflowKey { | ||||||
| return definition.NewWorkflowKey(key.NamespaceID, key.BusinessID, key.RunID) | ||||||
| }, | ||||||
| HandleIsWorkflow: func() bool { return false }, | ||||||
| HandleCurrentVersionedTransition: func() *persistencespb.VersionedTransition { | ||||||
| return &persistencespb.VersionedTransition{ | ||||||
| NamespaceFailoverVersion: 1, | ||||||
| TransitionCount: 1, | ||||||
| } | ||||||
| }, | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're going to want to increment the current version on every transaction (
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hm ok, yeah I left this part out since the backend is not even accessible right now. Should we discuss what verifications would be useful to check on the MockNodeBackend?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we don't really care about verifications on transition counts and rather that be in scope of functional tests, should we just leave the MockNodeBackend impl as is?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The engine should be as close to the real engine as possible. Ideally it would have the exact same behavior. |
||||||
| } | ||||||
| return &execution{ | ||||||
| key: key, | ||||||
| backend: backend, | ||||||
| timeSource: timeSource, | ||||||
| node: chasm.NewEmptyTree( | ||||||
| e.registry, | ||||||
| timeSource, | ||||||
| backend, | ||||||
| chasm.DefaultPathEncoder, | ||||||
| e.logger, | ||||||
| e.metrics, | ||||||
| ), | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| func (e *Engine[T]) executionForKey(key chasm.ExecutionKey) (*execution, bool) { | ||||||
| execution, ok := e.executions[newExecutionLookupKey(normalizeExecutionKey(key))] | ||||||
| return execution, ok | ||||||
| } | ||||||
|
|
||||||
| func (e *Engine[T]) mustExecutionForRef(ref chasm.ComponentRef) (*execution, error) { | ||||||
| execution, ok := e.executionForKey(ref.ExecutionKey) | ||||||
| if !ok { | ||||||
| return nil, serviceerror.NewNotFound( | ||||||
| fmt.Sprintf("execution not found: namespace=%q business_id=%q run_id=%q", ref.NamespaceID, ref.BusinessID, ref.RunID), | ||||||
| ) | ||||||
| } | ||||||
| return execution, nil | ||||||
| } | ||||||
|
|
||||||
| func (e *Engine[T]) updateComponentInExecution( | ||||||
| ctx context.Context, | ||||||
| execution *execution, | ||||||
| ref chasm.ComponentRef, | ||||||
| updateFn func(chasm.MutableContext, chasm.Component) error, | ||||||
| ) ([]byte, error) { | ||||||
| component, err := execution.node.Component(chasm.NewContext(ctx, execution.node), ref) | ||||||
| if err != nil { | ||||||
| return nil, err | ||||||
| } | ||||||
|
|
||||||
| mutableCtx := chasm.NewMutableContext(ctx, execution.node) | ||||||
| if err := updateFn(mutableCtx, component); err != nil { | ||||||
| return nil, err | ||||||
| } | ||||||
|
|
||||||
| _, err = execution.node.CloseTransaction() | ||||||
| if err != nil { | ||||||
| return nil, err | ||||||
| } | ||||||
|
|
||||||
| return mutableCtx.Ref(component) | ||||||
| } | ||||||
|
|
||||||
| func newExecutionLookupKey(key chasm.ExecutionKey) executionLookupKey { | ||||||
| return executionLookupKey{ | ||||||
| namespaceID: key.NamespaceID, | ||||||
| businessID: key.BusinessID, | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| func normalizeExecutionKey(key chasm.ExecutionKey) chasm.ExecutionKey { | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would just ask test authors to create a valid execution key instead of implicitly adding defaults. |
||||||
| if key.NamespaceID == "" { | ||||||
| key.NamespaceID = "test-namespace-id" | ||||||
| } | ||||||
| if key.BusinessID == "" { | ||||||
| key.BusinessID = "test-workflow-id" | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| } | ||||||
| return key | ||||||
| } | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My only issue with what you've done here is that there's a lot of duplicate logic with the real engine implementation. When we last synced we said you would reuse the "real" engine and mock the persistence interfaces.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm just concerned that the implementations will diverge. We really should mimic the real engine's behavior as closely as possible because otherwise that could lead to wrong assumptions and subtle bugs.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree diverging implementations is a big concern here. Last discussion, we talked about mocking the persistence layer only and not using MockNodeBackend. The main problem is package cyclic imports, since both the existing chasm_engine and the framework's dependence on NodeBackend are implemented in history. I thought maybe we could try just mocking persistence/execution manager only in the meeting, but I don't see how this is possible after looking at it. Lmk if i'm missing something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also agree reusing the implementation and mock persistence layer will be the best here.
With that said, practically I am not too worried about code diverging here. Chasm engine logic should be quite stable or we have a problem in the chasm design.
Now that I think about it I actually don't know how mocking persistence layer can solve the IDReuse/ConflictPolicy part, which relies on errors returned from DB.