Skip to content

Commit a080d32

Browse files
mhagger authored and znull committed
Add some benchmarks of moving a bunch of data through a pipeline
Add some benchmarks that move MB-scale data through pipelines consisting of alternating commands and functions: one that emits the data in small writes, and one that buffers it into larger writes. In both, the final stage processes the data one line at a time. This is not so efficient, because every transition from `Function` → `Command` requires an extra (hidden) goroutine that copies the data from an `io.Reader` to a `*os.File`. We can make this faster!
1 parent 2b7f0a1 commit a080d32

1 file changed

Lines changed: 91 additions & 0 deletions

File tree

pipe/pipeline_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -946,6 +946,97 @@ func BenchmarkTenMixedStages(b *testing.B) {
946946
}
947947
}
948948

949+
func BenchmarkMoreDataUnbuffered(b *testing.B) {
950+
ctx := context.Background()
951+
952+
cp := func(_ context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) error {
953+
_, err := io.Copy(stdout, stdin)
954+
return err
955+
}
956+
957+
for i := 0; i < b.N; i++ {
958+
count := 0
959+
p := pipe.New()
960+
p.Add(
961+
pipe.Function(
962+
"seq",
963+
func(ctx context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) error {
964+
for i := 1; i <= 100000; i++ {
965+
fmt.Fprintln(stdout, i)
966+
}
967+
return nil
968+
},
969+
),
970+
pipe.Command("cat"),
971+
pipe.Function("copy2", cp),
972+
pipe.Command("cat"),
973+
pipe.Function("copy3", cp),
974+
pipe.Command("cat"),
975+
pipe.Function("copy4", cp),
976+
pipe.Command("cat"),
977+
pipe.Function("copy5", cp),
978+
pipe.Command("cat"),
979+
pipe.LinewiseFunction(
980+
"count",
981+
func(ctx context.Context, _ pipe.Env, line []byte, stdout *bufio.Writer) error {
982+
count++
983+
return nil
984+
},
985+
),
986+
)
987+
err := p.Run(ctx)
988+
if assert.NoError(b, err) {
989+
assert.EqualValues(b, 100000, count)
990+
}
991+
}
992+
}
993+
994+
func BenchmarkMoreDataBuffered(b *testing.B) {
995+
ctx := context.Background()
996+
997+
cp := func(_ context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) error {
998+
_, err := io.Copy(stdout, stdin)
999+
return err
1000+
}
1001+
1002+
for i := 0; i < b.N; i++ {
1003+
count := 0
1004+
p := pipe.New()
1005+
p.Add(
1006+
pipe.Function(
1007+
"seq",
1008+
func(ctx context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) error {
1009+
out := bufio.NewWriter(stdout)
1010+
for i := 1; i <= 1000000; i++ {
1011+
fmt.Fprintln(out, i)
1012+
}
1013+
return out.Flush()
1014+
},
1015+
),
1016+
pipe.Command("cat"),
1017+
pipe.Function("copy2", cp),
1018+
pipe.Command("cat"),
1019+
pipe.Function("copy3", cp),
1020+
pipe.Command("cat"),
1021+
pipe.Function("copy4", cp),
1022+
pipe.Command("cat"),
1023+
pipe.Function("copy5", cp),
1024+
pipe.Command("cat"),
1025+
pipe.LinewiseFunction(
1026+
"count",
1027+
func(ctx context.Context, _ pipe.Env, line []byte, stdout *bufio.Writer) error {
1028+
count++
1029+
return nil
1030+
},
1031+
),
1032+
)
1033+
err := p.Run(ctx)
1034+
if assert.NoError(b, err) {
1035+
assert.EqualValues(b, 1000000, count)
1036+
}
1037+
}
1038+
}
1039+
9491040
func genErr(err error) pipe.StageFunc {
9501041
return func(_ context.Context, _ pipe.Env, _ io.Reader, _ io.Writer) error {
9511042
return err

0 commit comments

Comments
 (0)