Skip to content

Commit ea21b9b

Browse files
committed
Add chasm test engine for engine-backed tests
1 parent a133503 commit ea21b9b

3 files changed

Lines changed: 434 additions & 90 deletions

File tree

chasm/chasmtest/test_engine.go

Lines changed: 351 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,351 @@
1+
package chasmtest
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/require"
10+
"go.temporal.io/api/serviceerror"
11+
persistencespb "go.temporal.io/server/api/persistence/v1"
12+
"go.temporal.io/server/chasm"
13+
"go.temporal.io/server/common/clock"
14+
"go.temporal.io/server/common/definition"
15+
"go.temporal.io/server/common/log"
16+
"go.temporal.io/server/common/metrics"
17+
"go.temporal.io/server/common/testing/testlogger"
18+
)
19+
20+
type (
21+
Option[T chasm.Component] func(*Engine[T])
22+
23+
Engine[T chasm.Component] struct {
24+
t *testing.T
25+
registry *chasm.Registry
26+
logger log.Logger
27+
metrics metrics.Handler
28+
29+
rootExecutionKey chasm.ExecutionKey
30+
root T
31+
rootRuntime *runtime
32+
executions map[executionLookupKey]*runtime
33+
}
34+
35+
runtime struct {
36+
key chasm.ExecutionKey
37+
node *chasm.Node
38+
backend *chasm.MockNodeBackend
39+
timeSource *clock.EventTimeSource
40+
root chasm.Component
41+
}
42+
43+
executionLookupKey struct {
44+
namespaceID string
45+
businessID string
46+
}
47+
)
48+
49+
func NewEngine[T chasm.Component](
50+
t *testing.T,
51+
registry *chasm.Registry,
52+
opts ...Option[T],
53+
) *Engine[T] {
54+
t.Helper()
55+
56+
e := &Engine[T]{
57+
t: t,
58+
registry: registry,
59+
logger: testlogger.NewTestLogger(t, testlogger.FailOnExpectedErrorOnly),
60+
metrics: metrics.NoopMetricsHandler,
61+
rootExecutionKey: chasm.ExecutionKey{
62+
NamespaceID: "test-namespace-id",
63+
BusinessID: "test-workflow-id",
64+
RunID: "test-run-id",
65+
},
66+
executions: make(map[executionLookupKey]*runtime),
67+
}
68+
69+
for _, opt := range opts {
70+
opt(e)
71+
}
72+
73+
return e
74+
}
75+
76+
func WithRoot[T chasm.Component](
77+
factory func(chasm.MutableContext) T,
78+
) Option[T] {
79+
return func(e *Engine[T]) {
80+
runtime := e.newRuntime(e.rootExecutionKey)
81+
ctx := chasm.NewMutableContext(context.Background(), runtime.node)
82+
root := factory(ctx)
83+
require.NoError(e.t, runtime.node.SetRootComponent(root))
84+
_, err := runtime.node.CloseTransaction()
85+
require.NoError(e.t, err)
86+
e.root = root
87+
runtime.root = root
88+
e.rootRuntime = runtime
89+
e.executions[newExecutionLookupKey(runtime.key)] = runtime
90+
}
91+
}
92+
93+
func WithExecutionKey[T chasm.Component](key chasm.ExecutionKey) Option[T] {
94+
return func(e *Engine[T]) {
95+
e.rootExecutionKey = key
96+
}
97+
}
98+
99+
func (e *Engine[T]) Root() T {
100+
return e.root
101+
}
102+
103+
func (e *Engine[T]) EngineContext() context.Context {
104+
return chasm.NewEngineContext(context.Background(), e)
105+
}
106+
107+
func (e *Engine[T]) Ref(component chasm.Component) chasm.ComponentRef {
108+
for _, runtime := range e.executions {
109+
ref, err := runtime.node.Ref(component)
110+
if err != nil {
111+
continue
112+
}
113+
structuredRef, err := chasm.DeserializeComponentRef(ref)
114+
require.NoError(e.t, err)
115+
return structuredRef
116+
}
117+
118+
e.t.Fatalf("component %T is not attached to the chasmtest engine", component)
119+
return chasm.ComponentRef{}
120+
}
121+
122+
func (e *Engine[T]) StartExecution(
123+
ctx context.Context,
124+
ref chasm.ComponentRef,
125+
startFn func(chasm.MutableContext) (chasm.RootComponent, error),
126+
_ ...chasm.TransitionOption,
127+
) (chasm.StartExecutionResult, error) {
128+
if _, ok := e.executionForKey(ref.ExecutionKey); ok {
129+
return chasm.StartExecutionResult{}, chasm.NewExecutionAlreadyStartedErr("already exists", "", ref.RunID)
130+
}
131+
132+
runtime := e.newRuntime(ref.ExecutionKey)
133+
mutableCtx := chasm.NewMutableContext(ctx, runtime.node)
134+
root, err := startFn(mutableCtx)
135+
if err != nil {
136+
return chasm.StartExecutionResult{}, err
137+
}
138+
if err := runtime.node.SetRootComponent(root); err != nil {
139+
return chasm.StartExecutionResult{}, err
140+
}
141+
_, err = runtime.node.CloseTransaction()
142+
if err != nil {
143+
return chasm.StartExecutionResult{}, err
144+
}
145+
146+
runtime.root = root
147+
e.executions[newExecutionLookupKey(runtime.key)] = runtime
148+
149+
serializedRef, err := runtime.node.Ref(root)
150+
if err != nil {
151+
return chasm.StartExecutionResult{}, err
152+
}
153+
154+
return chasm.StartExecutionResult{
155+
ExecutionKey: runtime.key,
156+
ExecutionRef: serializedRef,
157+
Created: true,
158+
}, nil
159+
}
160+
161+
func (e *Engine[T]) UpdateWithStartExecution(
162+
ctx context.Context,
163+
ref chasm.ComponentRef,
164+
startFn func(chasm.MutableContext) (chasm.RootComponent, error),
165+
updateFn func(chasm.MutableContext, chasm.Component) error,
166+
_ ...chasm.TransitionOption,
167+
) (chasm.EngineUpdateWithStartExecutionResult, error) {
168+
if runtime, ok := e.executionForKey(ref.ExecutionKey); ok {
169+
serializedRef, err := e.updateComponentInRuntime(ctx, runtime, ref, updateFn)
170+
if err != nil {
171+
return chasm.EngineUpdateWithStartExecutionResult{}, err
172+
}
173+
return chasm.EngineUpdateWithStartExecutionResult{
174+
ExecutionKey: runtime.key,
175+
ExecutionRef: serializedRef,
176+
Created: false,
177+
}, nil
178+
}
179+
180+
runtime := e.newRuntime(ref.ExecutionKey)
181+
mutableCtx := chasm.NewMutableContext(ctx, runtime.node)
182+
root, err := startFn(mutableCtx)
183+
if err != nil {
184+
return chasm.EngineUpdateWithStartExecutionResult{}, err
185+
}
186+
if err := runtime.node.SetRootComponent(root); err != nil {
187+
return chasm.EngineUpdateWithStartExecutionResult{}, err
188+
}
189+
if err := updateFn(mutableCtx, root); err != nil {
190+
return chasm.EngineUpdateWithStartExecutionResult{}, err
191+
}
192+
_, err = runtime.node.CloseTransaction()
193+
if err != nil {
194+
return chasm.EngineUpdateWithStartExecutionResult{}, err
195+
}
196+
197+
runtime.root = root
198+
e.executions[newExecutionLookupKey(runtime.key)] = runtime
199+
200+
serializedRef, err := runtime.node.Ref(root)
201+
if err != nil {
202+
return chasm.EngineUpdateWithStartExecutionResult{}, err
203+
}
204+
205+
return chasm.EngineUpdateWithStartExecutionResult{
206+
ExecutionKey: runtime.key,
207+
ExecutionRef: serializedRef,
208+
Created: true,
209+
}, nil
210+
}
211+
212+
func (e *Engine[T]) UpdateComponent(
213+
ctx context.Context,
214+
ref chasm.ComponentRef,
215+
updateFn func(chasm.MutableContext, chasm.Component) error,
216+
_ ...chasm.TransitionOption,
217+
) ([]byte, error) {
218+
runtime, err := e.mustExecutionForRef(ref)
219+
if err != nil {
220+
return nil, err
221+
}
222+
return e.updateComponentInRuntime(ctx, runtime, ref, updateFn)
223+
}
224+
225+
func (e *Engine[T]) ReadComponent(
226+
ctx context.Context,
227+
ref chasm.ComponentRef,
228+
readFn func(chasm.Context, chasm.Component) error,
229+
_ ...chasm.TransitionOption,
230+
) error {
231+
runtime, err := e.mustExecutionForRef(ref)
232+
if err != nil {
233+
return err
234+
}
235+
236+
component, err := runtime.node.Component(chasm.NewContext(ctx, runtime.node), ref)
237+
if err != nil {
238+
return err
239+
}
240+
241+
readCtx := chasm.NewContext(ctx, runtime.node)
242+
return readFn(readCtx, component)
243+
}
244+
245+
func (e *Engine[T]) PollComponent(
246+
context.Context,
247+
chasm.ComponentRef,
248+
func(chasm.Context, chasm.Component) (bool, error),
249+
...chasm.TransitionOption,
250+
) ([]byte, error) {
251+
return nil, serviceerror.NewUnimplemented("chasmtest.Engine.PollComponent")
252+
}
253+
254+
func (e *Engine[T]) DeleteExecution(
255+
context.Context,
256+
chasm.ComponentRef,
257+
chasm.DeleteExecutionRequest,
258+
) error {
259+
return serviceerror.NewUnimplemented("chasmtest.Engine.DeleteExecution")
260+
}
261+
262+
func (e *Engine[T]) NotifyExecution(chasm.ExecutionKey) {}
263+
264+
func (e *Engine[T]) newRuntime(key chasm.ExecutionKey) *runtime {
265+
key = normalizeExecutionKey(key)
266+
timeSource := clock.NewEventTimeSource()
267+
timeSource.Update(time.Now())
268+
backend := &chasm.MockNodeBackend{
269+
HandleNextTransitionCount: func() int64 { return 2 },
270+
HandleGetCurrentVersion: func() int64 { return 1 },
271+
HandleGetWorkflowKey: func() definition.WorkflowKey {
272+
return definition.NewWorkflowKey(key.NamespaceID, key.BusinessID, key.RunID)
273+
},
274+
HandleIsWorkflow: func() bool { return false },
275+
HandleCurrentVersionedTransition: func() *persistencespb.VersionedTransition {
276+
return &persistencespb.VersionedTransition{
277+
NamespaceFailoverVersion: 1,
278+
TransitionCount: 1,
279+
}
280+
},
281+
}
282+
return &runtime{
283+
key: key,
284+
backend: backend,
285+
timeSource: timeSource,
286+
node: chasm.NewEmptyTree(
287+
e.registry,
288+
timeSource,
289+
backend,
290+
chasm.DefaultPathEncoder,
291+
e.logger,
292+
e.metrics,
293+
),
294+
}
295+
}
296+
297+
func (e *Engine[T]) executionForKey(key chasm.ExecutionKey) (*runtime, bool) {
298+
runtime, ok := e.executions[newExecutionLookupKey(normalizeExecutionKey(key))]
299+
return runtime, ok
300+
}
301+
302+
func (e *Engine[T]) mustExecutionForRef(ref chasm.ComponentRef) (*runtime, error) {
303+
runtime, ok := e.executionForKey(ref.ExecutionKey)
304+
if !ok {
305+
return nil, serviceerror.NewNotFound(
306+
fmt.Sprintf("execution not found: namespace=%q business_id=%q run_id=%q", ref.NamespaceID, ref.BusinessID, ref.RunID),
307+
)
308+
}
309+
return runtime, nil
310+
}
311+
312+
func (e *Engine[T]) updateComponentInRuntime(
313+
ctx context.Context,
314+
runtime *runtime,
315+
ref chasm.ComponentRef,
316+
updateFn func(chasm.MutableContext, chasm.Component) error,
317+
) ([]byte, error) {
318+
component, err := runtime.node.Component(chasm.NewContext(ctx, runtime.node), ref)
319+
if err != nil {
320+
return nil, err
321+
}
322+
323+
mutableCtx := chasm.NewMutableContext(ctx, runtime.node)
324+
if err := updateFn(mutableCtx, component); err != nil {
325+
return nil, err
326+
}
327+
328+
_, err = runtime.node.CloseTransaction()
329+
if err != nil {
330+
return nil, err
331+
}
332+
333+
return mutableCtx.Ref(component)
334+
}
335+
336+
func newExecutionLookupKey(key chasm.ExecutionKey) executionLookupKey {
337+
return executionLookupKey{
338+
namespaceID: key.NamespaceID,
339+
businessID: key.BusinessID,
340+
}
341+
}
342+
343+
func normalizeExecutionKey(key chasm.ExecutionKey) chasm.ExecutionKey {
344+
if key.NamespaceID == "" {
345+
key.NamespaceID = "test-namespace-id"
346+
}
347+
if key.BusinessID == "" {
348+
key.BusinessID = "test-workflow-id"
349+
}
350+
return key
351+
}

0 commit comments

Comments
 (0)