Skip to content

Commit 8ef44b4

Browse files
Add MemoryLimitWithObserver to combine limit and observer in one goroutine
MemoryLimitWithObserver provides the same functionality as composing MemoryLimit(MemoryObserver(stage, h), limit, h) but uses a single goroutine instead of two. The combined watcher polls GetRSSAnon once per second, enforces the byte limit, tracks peak RSS, and logs peak memory usage on exit. In gitrpcd, each upload-pack request wraps its git subprocess with MemoryLimit(MemoryObserver(...)), spawning two goroutines that both poll the same PID's RSS every second. During a goroutine spike on github-dfs-dfee0d0 (2026-03-20 ~08:04 UTC), this contributed ~415 goroutines out of 4,430 total. Using the combined API eliminates ~200 of those goroutines. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 903aad4 commit 8ef44b4

2 files changed

Lines changed: 209 additions & 4 deletions

File tree

pipe/memorylimit.go

Lines changed: 98 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,102 @@ func killAtLimit(byteLimit uint64, eventHandler func(e *Event)) memoryWatchFunc
8989
}
9090
}
9191

92+
// MemoryLimitWithObserver combines MemoryLimit and MemoryObserver into a single
93+
// stage that uses one goroutine instead of two. It watches the memory usage of
94+
// the stage, kills the process if it exceeds byteLimit, and logs peak memory
95+
// usage when the stage exits.
96+
//
97+
// Use this instead of MemoryLimit(MemoryObserver(stage, h), limit, h) to save
98+
// one goroutine per pipeline stage.
99+
func MemoryLimitWithObserver(stage Stage, byteLimit uint64, eventHandler func(e *Event)) Stage {
100+
limitableStage, ok := stage.(LimitableStage)
101+
if !ok {
102+
eventHandler(&Event{
103+
Command: stage.Name(),
104+
Msg: "invalid pipe.MemoryLimitWithObserver usage",
105+
Err: fmt.Errorf("invalid pipe.MemoryLimitWithObserver usage"),
106+
})
107+
return stage
108+
}
109+
110+
return &memoryWatchStage{
111+
nameSuffix: " with memory limit",
112+
stage: limitableStage,
113+
watch: killAtLimitAndObserve(byteLimit, eventHandler),
114+
}
115+
}
116+
117+
func killAtLimitAndObserve(byteLimit uint64, eventHandler func(e *Event)) memoryWatchFunc {
118+
return func(ctx context.Context, stage LimitableStage) {
119+
var (
120+
maxRSS uint64
121+
samples, errCount, consecutiveErrors int
122+
killed bool
123+
)
124+
125+
t := time.NewTicker(memoryPollInterval)
126+
defer t.Stop()
127+
128+
for {
129+
select {
130+
case <-ctx.Done():
131+
eventHandler(&Event{
132+
Command: stage.Name(),
133+
Msg: "peak memory usage",
134+
Context: map[string]interface{}{
135+
"max_rss_bytes": maxRSS,
136+
"samples": samples,
137+
"errors": errCount,
138+
},
139+
})
140+
return
141+
case <-t.C:
142+
if killed {
143+
continue
144+
}
145+
146+
rss, err := stage.GetRSSAnon(ctx)
147+
if err != nil {
148+
if !errors.Is(err, errProcessInfoMissing) {
149+
errCount++
150+
consecutiveErrors++
151+
if consecutiveErrors == 2 {
152+
eventHandler(&Event{
153+
Command: stage.Name(),
154+
Msg: "error getting RSS",
155+
Err: err,
156+
})
157+
}
158+
} else {
159+
consecutiveErrors = 0
160+
}
161+
continue
162+
}
163+
164+
consecutiveErrors = 0
165+
samples++
166+
if rss > maxRSS {
167+
maxRSS = rss
168+
}
169+
170+
if rss >= byteLimit {
171+
eventHandler(&Event{
172+
Command: stage.Name(),
173+
Msg: "stage exceeded allowed memory use",
174+
Err: fmt.Errorf("stage exceeded allowed memory use"),
175+
Context: map[string]interface{}{
176+
"limit": byteLimit,
177+
"used": rss,
178+
},
179+
})
180+
stage.Kill(ErrMemoryLimitExceeded)
181+
killed = true
182+
}
183+
}
184+
}
185+
}
186+
}
187+
92188
// MemoryObserver watches memory use of the stage and logs the maximum
93189
// value when the stage exits.
94190
func MemoryObserver(stage Stage, eventHandler func(e *Event)) Stage {
@@ -194,11 +290,9 @@ func (m *memoryWatchStage) Start(ctx context.Context, env Env, stdin io.ReadClos
194290
}
195291

196292
func (m *memoryWatchStage) Wait() error {
197-
if err := m.stage.Wait(); err != nil {
198-
return err
199-
}
293+
err := m.stage.Wait()
200294
m.stopWatching()
201-
return nil
295+
return err
202296
}
203297

204298
func (m *memoryWatchStage) GetRSSAnon(ctx context.Context) (uint64, error) {

pipe/memorylimit_test.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,117 @@ func (w closeWrapper) Close() error {
121121
return w.close()
122122
}
123123

124+
func TestMemoryLimitWithObserverSimple(t *testing.T) {
125+
t.Parallel()
126+
msg, err := testMemoryLimitWithObserver(t, 400, 10_000_000, pipe.Command("less"))
127+
assert.Contains(t, msg, "exceeded allowed memory")
128+
assert.Contains(t, msg, "limit=10000000")
129+
require.ErrorContains(t, err, "memory limit exceeded")
130+
}
131+
132+
func TestMemoryLimitWithObserverTreeMem(t *testing.T) {
133+
t.Parallel()
134+
msg, err := testMemoryLimitWithObserver(t, 400, 10_000_000, pipe.Command("sh", "-c", "less; :"))
135+
assert.Contains(t, msg, "exceeded allowed memory")
136+
assert.Contains(t, msg, "limit=10000000")
137+
require.ErrorContains(t, err, "memory limit exceeded")
138+
}
139+
140+
func TestMemoryLimitWithObserverBelowLimit(t *testing.T) {
141+
t.Parallel()
142+
rss := testMemoryLimitWithObserverBelowLimit(t, 400, pipe.Command("less"))
143+
require.Greater(t, rss, 400_000_000)
144+
}
145+
146+
func TestMemoryLimitWithObserverBelowLimitTreeMem(t *testing.T) {
147+
t.Parallel()
148+
rss := testMemoryLimitWithObserverBelowLimit(t, 400, pipe.Command("sh", "-c", "less; :"))
149+
require.Greater(t, rss, 400_000_000)
150+
}
151+
152+
func TestMemoryLimitWithObserverLogsPeakOnKill(t *testing.T) {
153+
t.Parallel()
154+
msg, err := testMemoryLimitWithObserver(t, 400, 10_000_000, pipe.Command("less"))
155+
// Verify both limit-exceeded AND peak memory are logged (matching
156+
// the behavior of MemoryLimit(MemoryObserver(...)))
157+
assert.Contains(t, msg, "exceeded allowed memory")
158+
assert.Contains(t, msg, "peak memory usage")
159+
require.ErrorContains(t, err, "memory limit exceeded")
160+
}
161+
162+
func testMemoryLimitWithObserverBelowLimit(t *testing.T, mbs int, stage pipe.Stage) int {
163+
ctx := context.Background()
164+
165+
stdinReader, stdinWriter := io.Pipe()
166+
167+
devNull, err := os.OpenFile("/dev/null", os.O_WRONLY, 0)
168+
require.NoError(t, err)
169+
170+
buf := &bytes.Buffer{}
171+
logger := log.New(buf, "testMemoryLimitWithObserver", log.Ldate|log.Ltime)
172+
173+
// Use a high limit so it won't be hit — we want to verify the observer part
174+
p := pipe.New(pipe.WithDir("/"), pipe.WithStdin(stdinReader), pipe.WithStdout(devNull))
175+
p.Add(pipe.MemoryLimitWithObserver(stage, 100*1024*1024*1024, LogEventHandler(logger)))
176+
require.NoError(t, p.Start(ctx))
177+
178+
var bytes [1_000_000]byte
179+
for i := 0; i < mbs; i++ {
180+
n, err := stdinWriter.Write(bytes[:])
181+
require.NoError(t, err)
182+
require.Equal(t, len(bytes), n)
183+
}
184+
185+
time.Sleep(2 * time.Second)
186+
187+
require.NoError(t, stdinWriter.Close())
188+
require.NoError(t, p.Wait())
189+
190+
// Verify that peak memory usage was logged (the observer part)
191+
output := buf.String()
192+
assert.Contains(t, output, "peak memory usage")
193+
194+
return maxBytes(output)
195+
}
196+
197+
func testMemoryLimitWithObserver(t *testing.T, mbs int, limit uint64, stage pipe.Stage) (string, error) {
198+
ctx := context.Background()
199+
200+
stdinReader, stdinWriter := io.Pipe()
201+
202+
devNull, err := os.OpenFile("/dev/null", os.O_WRONLY, 0)
203+
require.NoError(t, err)
204+
205+
closedErr := fmt.Errorf("stdout was closed")
206+
stdout := closeWrapper{
207+
Writer: devNull,
208+
close: func() error {
209+
require.NoError(t, stdinReader.CloseWithError(closedErr))
210+
return nil
211+
},
212+
}
213+
214+
buf := &bytes.Buffer{}
215+
logger := log.New(buf, "testMemoryLimitWithObserver", log.Ldate|log.Ltime)
216+
217+
p := pipe.New(pipe.WithDir("/"), pipe.WithStdin(stdinReader), pipe.WithStdoutCloser(stdout))
218+
p.Add(pipe.MemoryLimitWithObserver(stage, limit, LogEventHandler(logger)))
219+
require.NoError(t, p.Start(ctx))
220+
221+
var bytes [1_000_000]byte
222+
for i := 0; i < mbs; i++ {
223+
_, err := stdinWriter.Write(bytes[:])
224+
if err != nil {
225+
require.ErrorIs(t, err, closedErr)
226+
}
227+
}
228+
229+
require.NoError(t, stdinWriter.Close())
230+
err = p.Wait()
231+
232+
return buf.String(), err
233+
}
234+
124235
func testMemoryLimit(t *testing.T, mbs int, limit uint64, stage pipe.Stage) (string, error) {
125236
ctx := context.Background()
126237

0 commit comments

Comments
 (0)