Skip to content

Commit e6bb80d

Browse files
[rayci] Upload pipeline in batches when job count exceeds Buildkite limit (#483)
Buildkite rejects pipeline uploads with more than 500 jobs: > buildkite-agent: fatal: Failed to upload and process pipeline: Pipeline upload rejected: The number of jobs in this upload exceeds your organization limit of 500. Please break the upload into batches below this limit, or contact support to discuss an increase. When the pipeline exceeds this limit (counting parallelism-expanded jobs), split it into batches by group and upload each batch separately. If any single group exceeds 500 jobs, error with a message to split the group. Topic: upload-batches Signed-off-by: andrew <andrew@anyscale.com> Signed-off-by: andrew <andrew@anyscale.com>
1 parent 1823368 commit e6bb80d

3 files changed

Lines changed: 219 additions & 5 deletions

File tree

raycicmd/bk_pipeline.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,61 @@ func (p *bkPipeline) totalSteps() int {
3030
return total
3131
}
3232

33+
func groupJobCount(g *bkPipelineGroup) int {
34+
total := 0
35+
for _, s := range g.Steps {
36+
m, ok := s.(map[string]any)
37+
if !ok {
38+
total++
39+
continue
40+
}
41+
if p, ok := intInMap(m, "parallelism"); ok && p > 1 {
42+
total += p
43+
} else {
44+
total++
45+
}
46+
}
47+
return total
48+
}
49+
50+
func (p *bkPipeline) totalJobs() int {
51+
total := 0
52+
for _, group := range p.Steps {
53+
total += groupJobCount(group)
54+
}
55+
return total
56+
}
57+
58+
func (p *bkPipeline) splitIntoBatches(
59+
limit int,
60+
) ([]*bkPipeline, error) {
61+
cur := &bkPipeline{Notify: p.Notify}
62+
batches := []*bkPipeline{cur}
63+
curCount := 0
64+
65+
for _, group := range p.Steps {
66+
n := groupJobCount(group)
67+
if n > limit {
68+
return nil, fmt.Errorf(
69+
"group %q has %d jobs, exceeding the limit of %d; "+
70+
"split this group into smaller groups",
71+
group.Group, n, limit,
72+
)
73+
}
74+
75+
if curCount+n > limit {
76+
cur = &bkPipeline{}
77+
batches = append(batches, cur)
78+
curCount = 0
79+
}
80+
81+
cur.Steps = append(cur.Steps, group)
82+
curCount += n
83+
}
84+
85+
return batches, nil
86+
}
87+
3388
// newBkAgents returns a new map with the single entry "queue" set to the
// given queue name.
func newBkAgents(queue string) map[string]any {
	agents := make(map[string]any, 1)
	agents["queue"] = queue
	return agents
}

raycicmd/bk_pipeline_test.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,148 @@ func TestMakeRayDockerPlugin_mountSSHAgent(t *testing.T) {
3232
})
3333
}
3434

35+
func makeGroup(name string, n int) *bkPipelineGroup {
36+
steps := make([]any, n)
37+
for i := range steps {
38+
steps[i] = map[string]any{"command": "echo"}
39+
}
40+
return &bkPipelineGroup{Group: name, Steps: steps}
41+
}
42+
43+
func TestSplitIntoBatches(t *testing.T) {
44+
t.Run("all groups fit in one batch", func(t *testing.T) {
45+
p := &bkPipeline{
46+
Steps: []*bkPipelineGroup{
47+
makeGroup("a", 100),
48+
makeGroup("b", 200),
49+
},
50+
Notify: []*bkNotify{{Email: "a@b.com"}},
51+
}
52+
batches, err := p.splitIntoBatches(500)
53+
if err != nil {
54+
t.Fatalf("splitIntoBatches() error = %v", err)
55+
}
56+
if got := len(batches); got != 1 {
57+
t.Fatalf("len(batches) = %d, want 1", got)
58+
}
59+
if got := batches[0].totalJobs(); got != 300 {
60+
t.Errorf("batch[0].totalSteps() = %d, want 300", got)
61+
}
62+
if len(batches[0].Notify) != 1 {
63+
t.Errorf("batch[0].Notify = %v, want 1 entry", batches[0].Notify)
64+
}
65+
})
66+
67+
t.Run("splits across multiple batches", func(t *testing.T) {
68+
p := &bkPipeline{
69+
Steps: []*bkPipelineGroup{
70+
makeGroup("a", 300),
71+
makeGroup("b", 300),
72+
makeGroup("c", 100),
73+
},
74+
Notify: []*bkNotify{{Email: "a@b.com"}},
75+
}
76+
batches, err := p.splitIntoBatches(500)
77+
if err != nil {
78+
t.Fatalf("splitIntoBatches() error = %v", err)
79+
}
80+
if got := len(batches); got != 2 {
81+
t.Fatalf("len(batches) = %d, want 2", got)
82+
}
83+
if got := batches[0].totalJobs(); got != 300 {
84+
t.Errorf("batch[0].totalSteps() = %d, want 300", got)
85+
}
86+
if got := batches[1].totalJobs(); got != 400 {
87+
t.Errorf("batch[1].totalSteps() = %d, want 400", got)
88+
}
89+
if len(batches[0].Notify) != 1 {
90+
t.Errorf("batch[0] should have Notify")
91+
}
92+
if batches[1].Notify != nil {
93+
t.Errorf("batch[1] should not have Notify")
94+
}
95+
})
96+
97+
t.Run("group exceeds limit", func(t *testing.T) {
98+
p := &bkPipeline{
99+
Steps: []*bkPipelineGroup{
100+
makeGroup("big", 501),
101+
},
102+
}
103+
_, err := p.splitIntoBatches(500)
104+
if err == nil {
105+
t.Fatal("splitIntoBatches() expected error for oversized group")
106+
}
107+
})
108+
109+
t.Run("empty pipeline", func(t *testing.T) {
110+
p := &bkPipeline{}
111+
batches, err := p.splitIntoBatches(500)
112+
if err != nil {
113+
t.Fatalf("splitIntoBatches() error = %v", err)
114+
}
115+
if got := len(batches); got != 1 {
116+
t.Fatalf("len(batches) = %d, want 1", got)
117+
}
118+
})
119+
120+
t.Run("parallelism counted as jobs", func(t *testing.T) {
121+
p := &bkPipeline{
122+
Steps: []*bkPipelineGroup{
123+
{
124+
Group: "a",
125+
Steps: []any{
126+
map[string]any{"command": "echo", "parallelism": 4},
127+
map[string]any{"command": "echo"},
128+
},
129+
},
130+
{
131+
Group: "b",
132+
Steps: []any{
133+
map[string]any{"command": "echo", "parallelism": 3},
134+
},
135+
},
136+
},
137+
}
138+
// Group a = 4 + 1 = 5 jobs, group b = 3 jobs, total = 8
139+
batches, err := p.splitIntoBatches(7)
140+
if err != nil {
141+
t.Fatalf("splitIntoBatches() error = %v", err)
142+
}
143+
if got := len(batches); got != 2 {
144+
t.Fatalf("len(batches) = %d, want 2", got)
145+
}
146+
if got := batches[0].totalJobs(); got != 5 {
147+
t.Errorf("batch[0].totalJobs() = %d, want 5", got)
148+
}
149+
if got := batches[1].totalJobs(); got != 3 {
150+
t.Errorf("batch[1].totalJobs() = %d, want 3", got)
151+
}
152+
})
153+
154+
t.Run("exact limit boundary", func(t *testing.T) {
155+
p := &bkPipeline{
156+
Steps: []*bkPipelineGroup{
157+
makeGroup("a", 500),
158+
makeGroup("b", 1),
159+
},
160+
}
161+
batches, err := p.splitIntoBatches(500)
162+
if err != nil {
163+
t.Fatalf("splitIntoBatches() error = %v", err)
164+
}
165+
if got := len(batches); got != 2 {
166+
t.Fatalf("len(batches) = %d, want 2", got)
167+
}
168+
if got := batches[0].totalJobs(); got != 500 {
169+
t.Errorf("batch[0].totalSteps() = %d, want 500", got)
170+
}
171+
if got := batches[1].totalJobs(); got != 1 {
172+
t.Errorf("batch[1].totalSteps() = %d, want 1", got)
173+
}
174+
})
175+
}
176+
35177
func TestBkPipelineTotalSteps(t *testing.T) {
36178
p := &bkPipeline{
37179
Steps: []*bkPipelineGroup{

raycicmd/main.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -239,13 +239,30 @@ func Main(args []string, envs Envs) error {
239239
}
240240
}
241241
} else {
242-
// Prints out the pipeline content to logs.
243-
log.Printf("%s", bs)
242+
const maxUploadJobs = 500
243+
batches, err := pipeline.splitIntoBatches(maxUploadJobs)
244+
if err != nil {
245+
return fmt.Errorf("split pipeline into batches: %w", err)
246+
}
244247

245-
args := []string{"pipeline", "upload"}
246248
agent := flags.BuildkiteAgent
247-
if err := execWithInput(agent, args, bs, nil); err != nil {
248-
return fmt.Errorf("upload pipeline: %w", err)
249+
args := []string{"pipeline", "upload"}
250+
for i, batch := range batches {
251+
bs, err := yaml.Marshal(batch)
252+
if err != nil {
253+
return fmt.Errorf("marshal batch %d: %w", i, err)
254+
}
255+
if len(batches) > 1 {
256+
log.Printf("uploading batch %d/%d (%d jobs)",
257+
i+1, len(batches), batch.totalJobs())
258+
}
259+
log.Printf("%s", bs)
260+
261+
if err := execWithInput(agent, args, bs, nil); err != nil {
262+
return fmt.Errorf(
263+
"upload batch %d/%d: %w", i+1, len(batches), err,
264+
)
265+
}
249266
}
250267
}
251268

0 commit comments

Comments
 (0)