Skip to content

Commit 613d215

Browse files
committed
add functional tests for history_node cleanup
1 parent f9b2f6d commit 613d215

1 file changed

Lines changed: 278 additions & 0 deletions

File tree

tests/history_node_cleanup_test.go

Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
package tests
2+
3+
// This file contains two functional tests to make sure that history_tree and
4+
// history_node rows are cleaned up correctly after a workflow deletion.
5+
6+
import (
7+
"context"
8+
"testing"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
"github.com/stretchr/testify/suite"
13+
commandpb "go.temporal.io/api/command/v1"
14+
commonpb "go.temporal.io/api/common/v1"
15+
enumspb "go.temporal.io/api/enums/v1"
16+
"go.temporal.io/api/workflowservice/v1"
17+
"go.temporal.io/server/api/adminservice/v1"
18+
"go.temporal.io/server/chasm"
19+
"go.temporal.io/server/common"
20+
"go.temporal.io/server/common/persistence"
21+
"go.temporal.io/server/common/persistence/versionhistory"
22+
"go.temporal.io/server/common/testing/taskpoller"
23+
"go.temporal.io/server/common/testing/testvars"
24+
"go.temporal.io/server/tests/testcore"
25+
"google.golang.org/protobuf/types/known/durationpb"
26+
)
27+
28+
type HistoryNodeCleanupSuite struct {
29+
testcore.FunctionalTestBase
30+
}
31+
32+
func TestHistoryNodeCleanupSuite(t *testing.T) {
33+
t.Parallel()
34+
suite.Run(t, new(HistoryNodeCleanupSuite))
35+
}
36+
37+
func (s *HistoryNodeCleanupSuite) SetupSuite() {
38+
s.SetupSuiteWithCluster(testcore.WithSharedCluster())
39+
}
40+
41+
// TestDeletionOfSingleWorkflow runs a single workflow, force-deletes it via the
42+
// admin API, then asserts that all history_tree and history_node rows are removed.
43+
func (s *HistoryNodeCleanupSuite) TestDeletionOfSingleWorkflow() {
44+
tv := testvars.New(s.T())
45+
ctx := testcore.NewContext()
46+
47+
shardID := common.WorkflowIDToHistoryShard(
48+
s.NamespaceID().String(),
49+
tv.WorkflowID(),
50+
s.GetTestClusterConfig().HistoryConfig.NumHistoryShards,
51+
)
52+
execMgr := s.GetTestCluster().TestBase().ExecutionManager
53+
poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace().String())
54+
55+
startResp, err := s.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{
56+
RequestId: uuid.NewString(),
57+
Namespace: s.Namespace().String(),
58+
WorkflowId: tv.WorkflowID(),
59+
WorkflowType: tv.WorkflowType(),
60+
TaskQueue: tv.TaskQueue(),
61+
})
62+
s.NoError(err)
63+
runID := startResp.RunId
64+
65+
s.completeWorkflowWithActivities(tv, poller)
66+
67+
branchToken := s.captureCurrentBranchToken(ctx, tv.WorkflowID(), runID)
68+
69+
// The admin force-delete and the DeleteHistoryEventTask retention timer both
70+
// reach the same persistence.ExecutionManager.DeleteHistoryBranch call, which
71+
// is the operation that removes history_tree and history_node rows.
72+
_, err = s.AdminClient().DeleteWorkflowExecution(ctx, &adminservice.DeleteWorkflowExecutionRequest{
73+
Namespace: s.Namespace().String(),
74+
Execution: &commonpb.WorkflowExecution{WorkflowId: tv.WorkflowID(), RunId: runID},
75+
Archetype: chasm.WorkflowArchetype,
76+
})
77+
s.NoError(err)
78+
s.waitForMutableStateGone(ctx, shardID, execMgr, tv.WorkflowID(), runID)
79+
80+
resp, err := execMgr.ReadHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
81+
ShardID: shardID,
82+
BranchToken: branchToken,
83+
MinEventID: common.FirstEventID,
84+
MaxEventID: common.EndEventID,
85+
PageSize: 1000,
86+
})
87+
if err == nil {
88+
s.Empty(resp.HistoryEvents, "history_node rows should be gone after deletion")
89+
}
90+
// A NotFound/InvalidArgument error is also acceptable — it means the branch is gone.
91+
}
92+
93+
// TestDeletionOfWorkflowAfterReset runs a workflow, resets it to create a new
94+
// run, force-deletes both runs via the admin API, then asserts that no
95+
// history_node rows remain for either branch.
96+
func (s *HistoryNodeCleanupSuite) TestDeletionOfWorkflowAfterReset() {
97+
tv := testvars.New(s.T())
98+
ctx := testcore.NewContext()
99+
100+
shardID := common.WorkflowIDToHistoryShard(
101+
s.NamespaceID().String(),
102+
tv.WorkflowID(),
103+
s.GetTestClusterConfig().HistoryConfig.NumHistoryShards,
104+
)
105+
execMgr := s.GetTestCluster().TestBase().ExecutionManager
106+
poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace().String())
107+
108+
// ── Step 1: start and complete run A ─────────────────────────────────────
109+
startResp, err := s.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{
110+
RequestId: uuid.NewString(),
111+
Namespace: s.Namespace().String(),
112+
WorkflowId: tv.WorkflowID(),
113+
WorkflowType: tv.WorkflowType(),
114+
TaskQueue: tv.TaskQueue(),
115+
})
116+
s.NoError(err)
117+
runIDA := startResp.RunId
118+
119+
s.completeWorkflowWithActivities(tv, poller)
120+
121+
branchTokenA := s.captureCurrentBranchToken(ctx, tv.WorkflowID(), runIDA)
122+
123+
// Find the first WorkflowTaskCompleted event to use as the reset point.
124+
// B inherits A's opening events and forks from there.
125+
var resetEventID int64
126+
var histPageToken []byte
127+
resetSearch:
128+
for {
129+
histResp, err := s.FrontendClient().GetWorkflowExecutionHistory(ctx, &workflowservice.GetWorkflowExecutionHistoryRequest{
130+
Namespace: s.Namespace().String(),
131+
Execution: &commonpb.WorkflowExecution{WorkflowId: tv.WorkflowID(), RunId: runIDA},
132+
NextPageToken: histPageToken,
133+
MaximumPageSize: 100,
134+
})
135+
s.NoError(err)
136+
for _, event := range histResp.GetHistory().GetEvents() {
137+
if event.EventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
138+
resetEventID = event.EventId
139+
break resetSearch
140+
}
141+
}
142+
histPageToken = histResp.GetNextPageToken()
143+
if len(histPageToken) == 0 {
144+
break
145+
}
146+
}
147+
s.NotZero(resetEventID)
148+
149+
// ── Step 2: reset A → run B ───────────────────────────────────────────────
150+
resetResp, err := s.FrontendClient().ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{
151+
Namespace: s.Namespace().String(),
152+
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: tv.WorkflowID(), RunId: runIDA},
153+
Reason: "test",
154+
RequestId: uuid.NewString(),
155+
WorkflowTaskFinishEventId: resetEventID,
156+
})
157+
s.NoError(err)
158+
runIDB := resetResp.RunId
159+
160+
tvB := tv.WithRunID(runIDB)
161+
s.completeWorkflowWithActivities(tvB, poller)
162+
163+
branchTokenB := s.captureCurrentBranchToken(ctx, tv.WorkflowID(), runIDB)
164+
165+
// ── Step 3: force-delete run A ────────────────────────────────────────────
166+
// Both the admin force-delete and the DeleteHistoryEventTask retention timer
167+
// ultimately call persistence.ExecutionManager.DeleteHistoryBranch, the same
168+
// operation that removes history_tree and history_node rows.
169+
_, err = s.AdminClient().DeleteWorkflowExecution(ctx, &adminservice.DeleteWorkflowExecutionRequest{
170+
Namespace: s.Namespace().String(),
171+
Execution: &commonpb.WorkflowExecution{WorkflowId: tv.WorkflowID(), RunId: runIDA},
172+
Archetype: chasm.WorkflowArchetype,
173+
})
174+
s.NoError(err)
175+
s.waitForMutableStateGone(ctx, shardID, execMgr, tv.WorkflowID(), runIDA)
176+
177+
// ── Step 4: force-delete run B ────────────────────────────────────────────
178+
_, err = s.AdminClient().DeleteWorkflowExecution(ctx, &adminservice.DeleteWorkflowExecutionRequest{
179+
Namespace: s.Namespace().String(),
180+
Execution: &commonpb.WorkflowExecution{WorkflowId: tv.WorkflowID(), RunId: runIDB},
181+
Archetype: chasm.WorkflowArchetype,
182+
})
183+
s.NoError(err)
184+
s.waitForMutableStateGone(ctx, shardID, execMgr, tv.WorkflowID(), runIDB)
185+
186+
// ── Assertions ────────────────────────────────────────────────────────────
187+
for _, tc := range []struct {
188+
label string
189+
token []byte
190+
}{
191+
{"run A (original)", branchTokenA},
192+
{"run B (reset)", branchTokenB},
193+
} {
194+
resp, err := execMgr.ReadHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
195+
ShardID: shardID,
196+
BranchToken: tc.token,
197+
MinEventID: common.FirstEventID,
198+
MaxEventID: common.EndEventID,
199+
PageSize: 1000,
200+
})
201+
if err == nil {
202+
s.Empty(resp.HistoryEvents,
203+
"history_node rows for %s should be gone after deletion", tc.label)
204+
}
205+
// A NotFound/InvalidArgument error is acceptable — it means the branch is gone.
206+
}
207+
}
208+
209+
// completeWorkflowWithActivities drives a workflow through a single activity then completes it.
210+
func (s *HistoryNodeCleanupSuite) completeWorkflowWithActivities(
211+
tv *testvars.TestVars,
212+
poller *taskpoller.TaskPoller,
213+
) {
214+
activityScheduled := false
215+
wtHandler := func(_ *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
216+
if !activityScheduled {
217+
activityScheduled = true
218+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
219+
Commands: []*commandpb.Command{{
220+
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
221+
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{
222+
ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
223+
ActivityId: "act",
224+
ActivityType: tv.ActivityType(),
225+
TaskQueue: tv.TaskQueue(),
226+
ScheduleToCloseTimeout: durationpb.New(30 * time.Second),
227+
StartToCloseTimeout: durationpb.New(10 * time.Second),
228+
},
229+
},
230+
}},
231+
}, nil
232+
}
233+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
234+
Commands: []*commandpb.Command{{
235+
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
236+
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}},
237+
}},
238+
}, nil
239+
}
240+
241+
_, err := poller.PollAndHandleWorkflowTask(tv, wtHandler)
242+
s.NoError(err)
243+
_, err = poller.PollAndHandleActivityTask(tv, taskpoller.CompleteActivityTask(tv))
244+
s.NoError(err)
245+
_, err = poller.PollAndHandleWorkflowTask(tv, wtHandler)
246+
s.NoError(err)
247+
}
248+
249+
// captureCurrentBranchToken extracts the current branch token from a workflow's mutable state.
250+
func (s *HistoryNodeCleanupSuite) captureCurrentBranchToken(ctx context.Context, workflowID, runID string) []byte {
251+
descResp, err := s.AdminClient().DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{
252+
Namespace: s.Namespace().String(),
253+
Execution: &commonpb.WorkflowExecution{WorkflowId: workflowID, RunId: runID},
254+
Archetype: chasm.WorkflowArchetype,
255+
})
256+
s.NoError(err)
257+
vh := descResp.GetDatabaseMutableState().GetExecutionInfo().GetVersionHistories()
258+
currentVH, err := versionhistory.GetCurrentVersionHistory(vh)
259+
s.NoError(err)
260+
token := currentVH.GetBranchToken()
261+
s.NotEmpty(token)
262+
return token
263+
}
264+
265+
// waitForMutableStateGone polls until GetWorkflowExecution returns NotFound for the given runID.
266+
func (s *HistoryNodeCleanupSuite) waitForMutableStateGone(ctx context.Context, shardID int32, execMgr persistence.ExecutionManager, workflowID, runID string) {
267+
s.Eventually(func() bool {
268+
_, err := execMgr.GetWorkflowExecution(ctx, &persistence.GetWorkflowExecutionRequest{
269+
ShardID: shardID,
270+
NamespaceID: s.NamespaceID().String(),
271+
WorkflowID: workflowID,
272+
RunID: runID,
273+
ArchetypeID: chasm.WorkflowArchetypeID,
274+
})
275+
return common.IsNotFoundError(err)
276+
}, 10*time.Second, 100*time.Millisecond,
277+
"timed out waiting for mutable state of run %s to be deleted", runID)
278+
}

0 commit comments

Comments
 (0)