
Commit b97f38b

mhagger authored and znull committed
Stage: change the interface to make stdin/stdout handling more flexible
The old `Stage` interface, and in particular its `Start()` method, was not ideal. `Start()` was responsible for creating its own stdout, without knowledge of what would be consuming it.

In practice, there are only two main stages:

* `commandStage` ultimately runs a subprocess, which needs an `*os.File` as both stdin and stdout. The old code created its stdout using `cmd.StdoutPipe()`, which creates an `*os.File`.

* `goStage` runs a Go function, which should be happy with any kind of `io.ReadCloser` / `io.WriteCloser` for its stdin and stdout. The old code created its stdout using `io.Pipe()`, which _doesn't_ return an `*os.File`.

There are some scenarios where the old behavior was not ideal:

1. If a `goStage` was followed by a `commandStage`, the `commandStage` had to consume the non-`*os.File` stdin that was created by the former. But since an external command requires an `*os.File`, `exec.Cmd` had to create an `os.Pipe()` internally and create an extra goroutine to copy from the `io.Reader` to the pipe. This was not only wasteful, but also meant that the `goStage` was not informed when the subprocess terminated or closed its stdin. (For example, the copy goroutine could block waiting to read from the `io.Reader`.)

2. If `Pipeline.stdout` was set, then an extra stage was always needed to copy from the output of the last stage to `Pipeline.stdout`. But:

   * If the last stage was a `commandStage` and `Pipeline.stdout` was an `*os.File`, then this copy was unnecessary; the subprocess could instead have written directly to the corresponding file descriptor. This was wasteful, and also led to cases where the subprocess couldn't detect that `Pipeline.stdout` had been closed.

   * If the last stage was a `goStage`, then the copy was also unnecessary; the stage could have written directly to `Pipeline.stdout` whatever its type.

Problem (1) could have been fixed by changing `goStage` to always use `os.Pipe()` to create its stdout pipe.
But that would be wasteful if two `goStage`s were adjacent, in which case they could use a cheaper `io.Pipe()` instead. And it wouldn't solve problem (2) at all.

Both problems can only be solved by considering both the producer _and_ the consumer of the stdin and stdout of any stage. If either end is a `commandStage`, then it is preferable to use `os.Pipe()`. If both ends are `goStage`s, then it is preferable to use `io.Pipe()`. And if `Pipeline.stdout` is set, the last stage should write directly into it whenever possible.

This PR solves the problem by changing the `Stage` interface to add a `Preferences()` method and change the signature of the `Start()` method:

    Preferences() StagePreferences
    Start(
        ctx context.Context, env Env,
        stdin io.ReadCloser, stdout io.WriteCloser,
    ) error

The first indicates what kind of stdin/stdout the stage prefers, and the second starts up the stage with a `stdin` and `stdout` that are provided by the caller, rather than letting the stage return its own stdout.

Now, once a stage has been added to a `Pipeline`, `Pipeline.Start()` uses the first method to figure out what kind of pipes are preferred between this stage and its neighbors, then calls the second to start the stage with the preferred type of pipe if possible. It also passes `Pipeline.stdout` into the last stage rather than copying the data an extra time.

Note that this is a backwards-incompatible change, and thus will require a change to v2. Any clients that implement their own `Stage` will have to change their stage to conform to the new interface.

However, clients that only create stages using the functions in this package (e.g., `pipe.Command()`, `pipe.CommandStage()`, `pipe.Function()`, `pipe.LinewiseFunction()`, etc.) should continue to work without changes, since those functions' signatures haven't changed. Such clients will get the benefit of the new behavior.
For example, the benchmarks `BenchmarkMoreDataBuffered` and `BenchmarkMoreDataUnbuffered` (admittedly, worst cases for the old code) are sped up by roughly 2.25x and 6.6x, respectively:

```
snare:~/github/proj/go-pipe/git(main-bench)$ /bin/time go test -bench=. -benchtime=10s ./pipe/pipeline_test.go
goos: linux
goarch: amd64
cpu: Intel(R) Xeon(R) W-2255 CPU @ 3.70GHz
BenchmarkSingleProgram-20           8497      1383275 ns/op
BenchmarkTenPrograms-20             2186      5388075 ns/op
BenchmarkTenFunctions-20           37605       324808 ns/op
BenchmarkTenMixedStages-20          3380      3565218 ns/op
BenchmarkMoreDataUnbuffered-20        25    423838490 ns/op
BenchmarkMoreDataBuffered-20          44    261734773 ns/op
PASS
ok      command-line-arguments  76.120s
172.91user 91.15system 1:16.56elapsed 344%CPU (0avgtext+0avgdata 114080maxresident)k
0inputs+7768outputs (40major+3819487minor)pagefaults 0swaps

snare:~/github/proj/go-pipe/git(version-2)$ /bin/time go test -bench=. -benchtime=10s ./pipe/pipeline_test.go
goos: linux
goarch: amd64
cpu: Intel(R) Xeon(R) W-2255 CPU @ 3.70GHz
BenchmarkSingleProgram-20           8458      1366214 ns/op
BenchmarkTenPrograms-20             2233      5296019 ns/op
BenchmarkTenFunctions-20           42453       289761 ns/op
BenchmarkTenMixedStages-20          3398      3497226 ns/op
BenchmarkMoreDataUnbuffered-20       177     64410211 ns/op
BenchmarkMoreDataBuffered-20         100    115728132 ns/op
PASS
ok      command-line-arguments  82.751s
175.42user 142.81system 1:23.21elapsed 382%CPU (0avgtext+0avgdata 114080maxresident)k
0inputs+7776outputs (42major+3883888minor)pagefaults 0swaps
```

Also, look how much simpler `testMemoryLimit()` has become, since it doesn't need the awkward workaround that was previously required.

In terms of backwards compatibility, some applications might notice a difference with the new pipe structure. The difference should usually be an improvement, for example lower resource consumption and less risk of deadlock.
It is conceivable that some applications were in some way relying on the delayed completion of pipelines when an `io.Pipe` was closed, though I'm having trouble imagining scenarios like that in the real world.

# Conflicts:
#	pipe/function.go
#	pipe/iocopier.go
#	pipe/memorylimit_test.go
#	pipe/pipeline.go
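
The pipe-selection rule from the commit message (use `os.Pipe()` if either end is a command, otherwise the cheaper `io.Pipe()`) can be sketched roughly as follows. This is only an illustration: `pipe/pipeline.go` is not shown in this excerpt, so `newPipe` and `isFile` are hypothetical helpers, and the `IOPreference` type here is a local stand-in that borrows its constant names from the diff. The `IOPreferenceNil` case is left out for brevity.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// IOPreference is a local stand-in for the package's preference type.
// The constant names follow the diff; their definitions are assumed.
type IOPreference int

const (
	IOPreferenceUndefined IOPreference = iota // any reader/writer will do
	IOPreferenceFile                          // wants an *os.File
)

// newPipe (hypothetical) creates the pipe that connects a producer's
// stdout to a consumer's stdin. If either end prefers a file, it uses
// os.Pipe(); otherwise it uses the in-process io.Pipe().
func newPipe(producerStdout, consumerStdin IOPreference) (io.ReadCloser, io.WriteCloser, error) {
	if producerStdout == IOPreferenceFile || consumerStdin == IOPreferenceFile {
		r, w, err := os.Pipe()
		if err != nil {
			return nil, nil, err
		}
		return r, w, nil
	}
	r, w := io.Pipe()
	return r, w, nil
}

// isFile reports whether rc is backed by an *os.File.
func isFile(rc io.ReadCloser) bool {
	_, ok := rc.(*os.File)
	return ok
}

func main() {
	cases := []struct {
		name               string
		producer, consumer IOPreference
	}{
		{"command -> command", IOPreferenceFile, IOPreferenceFile},
		{"function -> command", IOPreferenceUndefined, IOPreferenceFile},
		{"function -> function", IOPreferenceUndefined, IOPreferenceUndefined},
	}
	for _, c := range cases {
		r, w, err := newPipe(c.producer, c.consumer)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("%-22s os.Pipe=%v\n", c.name, isFile(r))
		_ = w.Close()
		_ = r.Close()
	}
}
```

Only the function-to-function case should end up with an `io.Pipe()`; any pairing that involves a command gets a real file descriptor, so `exec.Cmd` does not need to insert its own copy goroutine.
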
1 parent ff75205 commit b97f38b

11 files changed

Lines changed: 473 additions & 676 deletions

pipe/command.go

Lines changed: 80 additions & 22 deletions
@@ -20,9 +20,13 @@ var errProcessInfoMissing = errors.New("cmd.Process is nil")
 // commandStage is a pipeline `Stage` based on running an external
 // command and piping the data through its stdin and stdout.
 type commandStage struct {
-	name  string
-	stdin io.Closer
-	cmd   *exec.Cmd
+	name string
+	cmd  *exec.Cmd
+
+	// lateClosers is a list of things that have to be closed once the
+	// command has finished.
+	lateClosers []io.Closer
+
 	done   chan struct{}
 	wg     errgroup.Group
 	stderr bytes.Buffer
@@ -32,6 +36,10 @@ type commandStage struct {
 	ctxErr atomic.Value
 }

+var (
+	_ Stage = (*commandStage)(nil)
+)
+
 // Command returns a pipeline `Stage` based on the specified external
 // `command`, run with the given command-line `args`. Its stdin and
 // stdout are handled as usual, and its stderr is collected and
@@ -61,33 +69,80 @@ func (s *commandStage) Name() string {
 	return s.name
 }

+func (s *commandStage) Preferences() StagePreferences {
+	prefs := StagePreferences{
+		StdinPreference:  IOPreferenceFile,
+		StdoutPreference: IOPreferenceFile,
+	}
+	if s.cmd.Stdin != nil {
+		prefs.StdinPreference = IOPreferenceNil
+	}
+	if s.cmd.Stdout != nil {
+		prefs.StdoutPreference = IOPreferenceNil
+	}
+
+	return prefs
+}
+
 func (s *commandStage) Start(
-	ctx context.Context, env Env, stdin io.ReadCloser,
-) (io.ReadCloser, error) {
+	ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser,
+) error {
 	if s.cmd.Dir == "" {
 		s.cmd.Dir = env.Dir
 	}

 	s.setupEnv(ctx, env)

+	// Things that have to be closed as soon as the command has
+	// started:
+	var earlyClosers []io.Closer
+
+	// See the type command for `Stage` and the long comment in
+	// `Pipeline.WithStdin()` for the explanation of this unwrapping
+	// and closing behavior.
+
 	if stdin != nil {
-		// See the long comment in `Pipeline.Start()` for the
-		// explanation of this special case.
 		switch stdin := stdin.(type) {
 		case readerNopCloser:
+			// In this case, we shouldn't close it. But unwrap it for
+			// efficiency's sake:
 			s.cmd.Stdin = stdin.Reader
 		case readerWriterToNopCloser:
+			// In this case, we shouldn't close it. But unwrap it for
+			// efficiency's sake:
 			s.cmd.Stdin = stdin.Reader
+		case *os.File:
+			// In this case, we can close stdin as soon as the command
+			// has started:
+			s.cmd.Stdin = stdin
+			earlyClosers = append(earlyClosers, stdin)
 		default:
+			// In this case, we need to close `stdin`, but we should
+			// only do so after the command has finished:
 			s.cmd.Stdin = stdin
+			s.lateClosers = append(s.lateClosers, stdin)
 		}
-		// Also keep a copy so that we can close it when the command exits:
-		s.stdin = stdin
 	}

-	stdout, err := s.cmd.StdoutPipe()
-	if err != nil {
-		return nil, err
+	if stdout != nil {
+		// See the long comment in `Pipeline.Start()` for the
+		// explanation of this special case.
+		switch stdout := stdout.(type) {
+		case writerNopCloser:
+			// In this case, we shouldn't close it. But unwrap it for
+			// efficiency's sake:
+			s.cmd.Stdout = stdout.Writer
+		case *os.File:
+			// In this case, we can close stdout as soon as the command
+			// has started:
+			s.cmd.Stdout = stdout
+			earlyClosers = append(earlyClosers, stdout)
+		default:
+			// In this case, we need to close `stdout`, but we should
+			// only do so after the command has finished:
+			s.cmd.Stdout = stdout
+			s.lateClosers = append(s.lateClosers, stdout)
+		}
 	}

 	// If the caller hasn't arranged otherwise, read the command's
@@ -99,7 +154,7 @@ func (s *commandStage) Start(
 		// can be sure.
 		p, err := s.cmd.StderrPipe()
 		if err != nil {
-			return nil, err
+			return err
 		}
 		s.wg.Go(func() error {
 			_, err := io.Copy(&s.stderr, p)
@@ -116,7 +171,11 @@ func (s *commandStage) Start(
 	s.runInOwnProcessGroup()

 	if err := s.cmd.Start(); err != nil {
-		return nil, err
+		return err
+	}
+
+	for _, closer := range earlyClosers {
+		_ = closer.Close()
 	}

 	// Arrange for the process to be killed (gently) if the context
@@ -130,7 +189,7 @@ func (s *commandStage) Start(
 		}
 	}()

-	return stdout, nil
+	return nil
 }

 // setupEnv sets or modifies the environment that will be passed to
@@ -219,19 +278,18 @@ func (s *commandStage) Wait() error {

 	// Make sure that any stderr is copied before `s.cmd.Wait()`
 	// closes the read end of the pipe:
-	wErr := s.wg.Wait()
+	wgErr := s.wg.Wait()

 	err := s.cmd.Wait()
 	err = s.filterCmdError(err)

-	if err == nil && wErr != nil {
-		err = wErr
+	if err == nil && wgErr != nil {
+		err = wgErr
 	}

-	if s.stdin != nil {
-		cErr := s.stdin.Close()
-		if cErr != nil && err == nil {
-			return cErr
+	for _, closer := range s.lateClosers {
+		if closeErr := closer.Close(); closeErr != nil && err == nil {
+			err = closeErr
 		}
 	}

pipe/command_nil_panic_test.go

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ func TestKillWithFailedStart(t *testing.T) {

 	stage := Command("/this/path/does/not/exist/invalid_command_12345")

-	_, err := stage.Start(ctx, Env{}, nil)
+	err := stage.Start(ctx, Env{}, nil, nil)
 	if err == nil {
 		t.Fatal("Expected start to fail, but it succeeded")
 	}

pipe/command_test.go

Lines changed: 2 additions & 1 deletion
@@ -78,7 +78,8 @@ func TestCopyEnvWithOverride(t *testing.T) {
 	for _, ex := range examples {
 		t.Run(ex.label, func(t *testing.T) {
 			assert.ElementsMatch(t, ex.expectedResult,
-				copyEnvWithOverrides(ex.env, ex.overrides))
+				copyEnvWithOverrides(ex.env, ex.overrides),
+			)
 		})
 	}
 }

pipe/function.go

Lines changed: 39 additions & 34 deletions
@@ -9,7 +9,7 @@ import (
 // StageFunc is a function that can be used to power a `goStage`. It
 // should read its input from `stdin` and write its output to
 // `stdout`. `stdin` and `stdout` will be closed automatically (if
-// necessary) once the function returns.
+// non-nil) once the function returns.
 //
 // Neither `stdin` nor `stdout` are necessarily buffered. If the
 // `StageFunc` requires buffering, it needs to arrange that itself.
@@ -32,57 +32,62 @@ func Function(name string, f StageFunc) Stage {
 // goStage is a `Stage` that does its work by running an arbitrary
 // `stageFunc` in a goroutine.
 type goStage struct {
-	name         string
-	f            StageFunc
-	done         chan struct{}
-	err          error
-	panicHandler StagePanicHandler
+	name string
+	f    StageFunc
+	done chan struct{}
+	err  error
 }

+var (
+	_ Stage = (*goStage)(nil)
+)
+
 func (s *goStage) Name() string {
 	return s.name
 }

-func (s *goStage) SetPanicHandler(ph StagePanicHandler) {
-	s.panicHandler = ph
+func (s *goStage) Preferences() StagePreferences {
+	return StagePreferences{
+		StdinPreference:  IOPreferenceUndefined,
+		StdoutPreference: IOPreferenceUndefined,
+	}
 }

-func (s *goStage) Start(ctx context.Context, env Env, stdin io.ReadCloser) (io.ReadCloser, error) {
-	r, w := io.Pipe()
+func (s *goStage) Start(
+	ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser,
+) error {
+	var r io.Reader = stdin
+	if stdin, ok := stdin.(readerNopCloser); ok {
+		r = stdin.Reader
+	}
+
+	var w io.Writer = stdout
+	if stdout, ok := stdout.(writerNopCloser); ok {
+		w = stdout.Writer
+	}

 	go func() {
-		defer func() {
-			// Cleanup resources on exit
-			if err := w.Close(); err != nil && s.err == nil {
-				s.err = fmt.Errorf("error closing output pipe for stage %q: %w", s.Name(), err)
-			}
-			if stdin != nil {
-				if err := stdin.Close(); err != nil && s.err == nil {
-					s.err = fmt.Errorf("error closing stdin for stage %q: %w", s.Name(), err)
-				}
+		s.err = s.f(ctx, env, r, w)
+
+		if stdout != nil {
+			if err := stdout.Close(); err != nil && s.err == nil {
+				s.err = fmt.Errorf("error closing stdout for stage %q: %w", s.Name(), err)
 			}
-			close(s.done)
-		}()
+		}

-		defer s.recoverPanic()
+		if stdin != nil {
+			if err := stdin.Close(); err != nil && s.err == nil {
+				s.err = fmt.Errorf("error closing stdin for stage %q: %w", s.Name(), err)
+			}
+		}

-		s.err = s.f(ctx, env, stdin, w)
+		close(s.done)
 	}()

-	return r, nil
+	return nil
 }

 func (s *goStage) Wait() error {
 	<-s.done
 	return s.err
 }
-
-func (s *goStage) recoverPanic() {
-	if s.panicHandler == nil {
-		return
-	}
-
-	if p := recover(); p != nil {
-		s.err = s.panicHandler(p)
-	}
-}
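
For clients that implement their own `Stage`, the migration described in the commit message might look roughly like the sketch below. Everything here is an assumption made to keep the example self-contained: `Env`, `IOPreference`, `StagePreferences`, and `Stage` are local stand-ins inferred from the methods visible in the diff above (the real definitions are not part of this excerpt), and `upperStage`, `bufCloser`, and `runUpper` are hypothetical. The sketch also assumes that `stdin` and `stdout` are non-nil, which the real stages do not.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"strings"
)

// Local stand-ins for the package's types (assumed shapes).
type Env struct{ Dir string }

type IOPreference int

const (
	IOPreferenceUndefined IOPreference = iota
	IOPreferenceFile
	IOPreferenceNil
)

type StagePreferences struct {
	StdinPreference  IOPreference
	StdoutPreference IOPreference
}

type Stage interface {
	Name() string
	Preferences() StagePreferences
	Start(ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser) error
	Wait() error
}

// upperStage (hypothetical) upper-cases everything it reads.
type upperStage struct {
	done chan struct{}
	err  error
}

var _ Stage = (*upperStage)(nil)

func (s *upperStage) Name() string { return "upper" }

// Preferences reports that any reader/writer is acceptable.
func (s *upperStage) Preferences() StagePreferences {
	return StagePreferences{
		StdinPreference:  IOPreferenceUndefined,
		StdoutPreference: IOPreferenceUndefined,
	}
}

// Start receives both ends from the caller instead of creating its
// own stdout. It closes stdout first, then stdin, as goStage does.
func (s *upperStage) Start(
	_ context.Context, _ Env, stdin io.ReadCloser, stdout io.WriteCloser,
) error {
	go func() {
		defer close(s.done)
		data, err := io.ReadAll(stdin)
		if err == nil {
			_, err = stdout.Write(bytes.ToUpper(data))
		}
		s.err = err
		if cerr := stdout.Close(); cerr != nil && s.err == nil {
			s.err = cerr
		}
		if cerr := stdin.Close(); cerr != nil && s.err == nil {
			s.err = cerr
		}
	}()
	return nil
}

func (s *upperStage) Wait() error {
	<-s.done
	return s.err
}

// bufCloser is an in-memory io.WriteCloser used as the stage's stdout.
type bufCloser struct{ bytes.Buffer }

func (*bufCloser) Close() error { return nil }

// runUpper drives the stage the way a pipeline would: it supplies both
// stdin and stdout, starts the stage, and waits for it to finish.
func runUpper(input string) (string, error) {
	s := &upperStage{done: make(chan struct{})}
	out := &bufCloser{}
	in := io.NopCloser(strings.NewReader(input))
	if err := s.Start(context.Background(), Env{}, in, out); err != nil {
		return "", err
	}
	if err := s.Wait(); err != nil {
		return "", err
	}
	return out.String(), nil
}

func main() {
	out, err := runUpper("hello\n")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(out)
}
```

The main difference from the old interface is that `Start()` no longer returns a reader: the caller owns pipe creation, and the stage is responsible only for closing what it was handed once it is done.
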
