Skip to content

Commit 27de98f

Browse files
JoshVanLcicoyle
andauthored
Add wf purge --all-filter-status flag (#1609)
* Add `wf purge --all-filter-status` flag Allow filtering by runtime status when purging workflow instances with `--all-older-than`. This enables purging only instances in a specific state (e.g., COMPLETED, FAILED, TERMINATED) rather than all terminal instances. The flag is mutually exclusive with `--all` and requires `--all-older-than` to be set. Signed-off-by: joshvanl <me@joshvanl.dev> * Review comments Signed-off-by: joshvanl <me@joshvanl.dev> * Adds review comments Signed-off-by: joshvanl <me@joshvanl.dev> --------- Signed-off-by: joshvanl <me@joshvanl.dev> Co-authored-by: Cassie Coyle <cassie@diagrid.io>
1 parent 4b40955 commit 27de98f

7 files changed

Lines changed: 323 additions & 18 deletions

File tree

cmd/workflow/purge.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,21 @@ package workflow
1515

1616
import (
1717
"errors"
18+
"slices"
19+
"strings"
1820

1921
"github.com/dapr/cli/pkg/workflow"
2022
"github.com/dapr/kit/signals"
2123
"github.com/spf13/cobra"
2224
)
2325

2426
var (
25-
flagPurgeOlderThan string
26-
flagPurgeAll bool
27-
flagPurgeConn *connFlag
28-
flagPurgeForce bool
29-
schedulerNamespace string
27+
flagPurgeOlderThan string
28+
flagPurgeAll bool
29+
flagPurgeConn *connFlag
30+
flagPurgeForce bool
31+
flagPurgeFilterStatus string
32+
schedulerNamespace string
3033
)
3134

3235
var PurgeCmd = &cobra.Command{
@@ -41,6 +44,9 @@ var PurgeCmd = &cobra.Command{
4144
return errors.New("no arguments are accepted when using purge all flags")
4245
}
4346
default:
47+
if cmd.Flags().Changed("all-filter-status") {
48+
return errors.New("--all-filter-status can only be used with --all-older-than")
49+
}
4450
if len(args) == 0 {
4551
return errors.New("one or more workflow instance ID arguments are required when not using purge all flags")
4652
}
@@ -75,14 +81,38 @@ var PurgeCmd = &cobra.Command{
7581
}
7682
}
7783

84+
if cmd.Flags().Changed("all-filter-status") {
85+
opts.AllFilterStatus = &flagPurgeFilterStatus
86+
}
87+
7888
return workflow.Purge(ctx, opts)
7989
},
8090
}
8191

92+
var purgeFilterStatuses = workflow.RuntimeStatuses
93+
8294
func init() {
8395
PurgeCmd.Flags().StringVar(&flagPurgeOlderThan, "all-older-than", "", "Purge workflow instances older than the specified Go duration or timestamp, e.g., '24h' or '2023-01-02T15:04:05Z'.")
8496
PurgeCmd.Flags().BoolVar(&flagPurgeAll, "all", false, "Purge all workflow instances in a terminal state. Use with caution.")
97+
PurgeCmd.Flags().StringVar(&flagPurgeFilterStatus, "all-filter-status", "", "Filter purge to only workflow instances with the given runtime status. Must be used with --all-older-than. One of "+strings.Join(purgeFilterStatuses, ", "))
8598
PurgeCmd.MarkFlagsMutuallyExclusive("all-older-than", "all")
99+
PurgeCmd.MarkFlagsMutuallyExclusive("all-filter-status", "all")
100+
101+
pre := PurgeCmd.PreRunE
102+
PurgeCmd.PreRunE = func(cmd *cobra.Command, args []string) error {
103+
if cmd.Flags().Changed("all-filter-status") {
104+
if !slices.Contains(purgeFilterStatuses, flagPurgeFilterStatus) {
105+
return errors.New("invalid value for --all-filter-status. Supported values are " + strings.Join(purgeFilterStatuses, ", "))
106+
}
107+
if !slices.Contains(workflow.TerminalStatuses, flagPurgeFilterStatus) && !flagPurgeForce {
108+
return errors.New("--force is required when using --all-filter-status with a non-terminal status (" + flagPurgeFilterStatus + ")")
109+
}
110+
}
111+
if pre != nil {
112+
return pre(cmd, args)
113+
}
114+
return nil
115+
}
86116
PurgeCmd.Flags().BoolVar(&flagPurgeForce, "force", false, "force will force a purge of a workflow, regardless of its current runtime state, or whether an active worker can process it, the backend will attempt to delete it anyway. This necessarily means the purging is executed out side of the workflow state machine, and therefore, can lead to corrupt state or broken workflow execution. Usage of this should _only_ be used when you know the workflow is not being currently processed. It is highly recommended to avoid using this flag unless absolutely necessary.")
87117

88118
PurgeCmd.Flags().StringVar(&schedulerNamespace, "scheduler-namespace", "dapr-system", "Kubernetes namespace where the scheduler is deployed, only relevant if --kubernetes is set")

cmd/workflow/purge_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
Copyright 2026 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package workflow
15+
16+
import (
17+
"testing"
18+
19+
"github.com/dapr/cli/pkg/workflow"
20+
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
func TestPurgeFilterStatuses(t *testing.T) {
25+
assert.Equal(t, workflow.RuntimeStatuses, purgeFilterStatuses)
26+
}
27+
28+
func TestPurgeCmdFlags(t *testing.T) {
29+
t.Run("all-filter-status flag is registered", func(t *testing.T) {
30+
f := PurgeCmd.Flags().Lookup("all-filter-status")
31+
assert.NotNil(t, f)
32+
assert.Equal(t, "string", f.Value.Type())
33+
assert.Contains(t, f.Usage, "Must be used with --all-older-than")
34+
})
35+
36+
t.Run("all-filter-status and all are mutually exclusive", func(t *testing.T) {
37+
WorkflowCmd.SetArgs([]string{"purge", "--all", "--all-filter-status", "COMPLETED"})
38+
err := WorkflowCmd.Execute()
39+
require.Error(t, err)
40+
assert.Contains(t, err.Error(), "if any flags in the group [all-filter-status all] are set none of the others can be")
41+
})
42+
43+
t.Run("all-older-than flag is registered", func(t *testing.T) {
44+
f := PurgeCmd.Flags().Lookup("all-older-than")
45+
assert.NotNil(t, f)
46+
})
47+
48+
t.Run("non-terminal status without force errors", func(t *testing.T) {
49+
WorkflowCmd.SetArgs([]string{"purge", "--all-older-than", "1s", "--all-filter-status", "RUNNING"})
50+
err := WorkflowCmd.Execute()
51+
require.Error(t, err)
52+
assert.Contains(t, err.Error(), "--force is required when using --all-filter-status with a non-terminal status")
53+
})
54+
}

cmd/workflow/workflow.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -148,16 +148,7 @@ func filterCmd(cmd *cobra.Command) *workflow.Filter {
148148
status string
149149
maxAge string
150150

151-
listStatuses = []string{
152-
"RUNNING",
153-
"COMPLETED",
154-
"CONTINUED_AS_NEW",
155-
"FAILED",
156-
"CANCELED",
157-
"TERMINATED",
158-
"PENDING",
159-
"SUSPENDED",
160-
}
151+
listStatuses = workflow.RuntimeStatuses
161152
)
162153

163154
cmd.Flags().StringVarP(&name, "filter-name", "w", "", "Filter only the workflows with the given name")

pkg/workflow/list.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,28 @@ type Filter struct {
4444
Terminal bool
4545
}
4646

47+
// RuntimeStatuses is the canonical list of workflow runtime statuses.
48+
var RuntimeStatuses = []string{
49+
"RUNNING",
50+
"COMPLETED",
51+
"CONTINUED_AS_NEW",
52+
"FAILED",
53+
"CANCELED",
54+
"TERMINATED",
55+
"PENDING",
56+
"SUSPENDED",
57+
}
58+
59+
// TerminalStatuses is the subset of RuntimeStatuses that represent terminal
60+
// (completed) workflow states.
61+
var TerminalStatuses = []string{
62+
"COMPLETED",
63+
"CONTINUED_AS_NEW",
64+
"FAILED",
65+
"CANCELED",
66+
"TERMINATED",
67+
}
68+
4769
type ListOutputShort struct {
4870
Namespace string `csv:"-" json:"namespace" yaml:"namespace"`
4971
AppID string `csv:"-" json:"appID" yaml:"appID"`

pkg/workflow/purge.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,36 @@ type PurgeOptions struct {
3232
AppID string
3333
InstanceIDs []string
3434
AllOlderThan *time.Time
35+
AllFilterStatus *string
3536
All bool
3637
Force bool
3738

3839
ConnectionString *string
3940
TableName *string
4041
}
4142

43+
// BuildPurgeFilter constructs the Filter used when listing workflow instances
44+
// for bulk purge. When AllFilterStatus is set, it filters by that status
45+
// instead of using the default terminal-only filter.
46+
func BuildPurgeFilter(allFilterStatus *string) Filter {
47+
filter := Filter{
48+
Terminal: true,
49+
}
50+
if allFilterStatus != nil {
51+
filter.Terminal = false
52+
filter.Status = allFilterStatus
53+
}
54+
return filter
55+
}
56+
4257
func Purge(ctx context.Context, opts PurgeOptions) error {
4358
var toPurge []string
4459

4560
if len(opts.InstanceIDs) > 0 {
4661
toPurge = opts.InstanceIDs
4762
} else {
63+
filter := BuildPurgeFilter(opts.AllFilterStatus)
64+
4865
var list []*ListOutputWide
4966
var err error
5067
list, err = ListWide(ctx, ListOptions{
@@ -53,9 +70,7 @@ func Purge(ctx context.Context, opts PurgeOptions) error {
5370
AppID: opts.AppID,
5471
ConnectionString: opts.ConnectionString,
5572
TableName: opts.TableName,
56-
Filter: Filter{
57-
Terminal: true,
58-
},
73+
Filter: filter,
5974
})
6075
if err != nil {
6176
return err

pkg/workflow/purge_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
Copyright 2026 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package workflow
15+
16+
import (
17+
"testing"
18+
19+
"github.com/dapr/kit/ptr"
20+
"github.com/stretchr/testify/assert"
21+
)
22+
23+
func TestBuildPurgeFilter(t *testing.T) {
24+
t.Run("nil status uses terminal filter", func(t *testing.T) {
25+
filter := BuildPurgeFilter(nil)
26+
assert.True(t, filter.Terminal)
27+
assert.Nil(t, filter.Status)
28+
})
29+
30+
t.Run("with status uses status filter instead of terminal", func(t *testing.T) {
31+
filter := BuildPurgeFilter(ptr.Of("FAILED"))
32+
assert.False(t, filter.Terminal)
33+
assert.NotNil(t, filter.Status)
34+
assert.Equal(t, "FAILED", *filter.Status)
35+
})
36+
37+
t.Run("with COMPLETED status", func(t *testing.T) {
38+
filter := BuildPurgeFilter(ptr.Of("COMPLETED"))
39+
assert.False(t, filter.Terminal)
40+
assert.Equal(t, "COMPLETED", *filter.Status)
41+
})
42+
43+
t.Run("with RUNNING status", func(t *testing.T) {
44+
filter := BuildPurgeFilter(ptr.Of("RUNNING"))
45+
assert.False(t, filter.Terminal)
46+
assert.Equal(t, "RUNNING", *filter.Status)
47+
})
48+
}

0 commit comments

Comments
 (0)