Skip to content

Commit f2213e5

Browse files
authored
csbufio: Respect contexts in NewReader/NewWriter (lytics/lio#28503) (PR #100)
1 parent b80c772 commit f2213e5

8 files changed

Lines changed: 167 additions & 33 deletions

File tree

awss3/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ func (f *FS) NewWriterWithContext(ctx context.Context, objectName string, metada
450450
uploader := s3manager.NewUploader(f.sess)
451451

452452
pr, pw := io.Pipe()
453-
bw := csbufio.NewWriter(pw)
453+
bw := csbufio.NewWriter(ctx, pw)
454454

455455
go func() {
456456
// TODO: this needs to be managed, ie shutdown signals, close, handler err etc.

csbufio/reader.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,39 @@ package csbufio
22

33
import (
44
"bufio"
5+
"context"
56
"io"
67
"os"
78
)
89

9-
func OpenReader(name string) (io.ReadCloser, error) {
10+
func OpenReader(ctx context.Context, name string) (io.ReadCloser, error) {
1011
f, err := os.Open(name)
1112
if err != nil {
1213
return nil, err
1314
}
14-
return NewReader(f), nil
15+
return NewReader(ctx, f), nil
1516
}
1617

17-
func NewReader(rc io.ReadCloser) io.ReadCloser {
18-
return bufReadCloser{bufio.NewReader(rc), rc}
18+
func NewReader(ctx context.Context, rc io.ReadCloser) io.ReadCloser {
19+
return &bufReadCloser{ctx, bufio.NewReader(rc), rc}
1920
}
2021

2122
type bufReadCloser struct {
22-
io.Reader
23-
c io.Closer
23+
ctx context.Context
24+
r io.Reader
25+
c io.Closer
2426
}
2527

26-
func (bc bufReadCloser) Close() error { return bc.c.Close() }
28+
func (b *bufReadCloser) Read(p []byte) (int, error) {
29+
if err := b.ctx.Err(); err != nil {
30+
return 0, err
31+
}
32+
return b.r.Read(p)
33+
}
34+
35+
func (b *bufReadCloser) Close() error {
36+
if err := b.ctx.Err(); err != nil {
37+
return err
38+
}
39+
return b.c.Close()
40+
}

csbufio/reader_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package csbufio
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestReaderContextDone(t *testing.T) {
11+
t.Parallel()
12+
13+
ctx, cancel := context.WithCancel(context.Background())
14+
cancel()
15+
16+
m := memRWC([]byte("some-data"))
17+
rc := NewReader(ctx, &m)
18+
19+
var p []byte
20+
n, err := rc.Read(p)
21+
require.ErrorIs(t, err, context.Canceled)
22+
require.Equal(t, 0, n)
23+
require.Len(t, p, 0)
24+
25+
err = rc.Close()
26+
require.ErrorIs(t, err, context.Canceled)
27+
}
28+
29+
type memRWC []byte
30+
31+
func (m memRWC) Read(p []byte) (int, error) {
32+
n := len(p)
33+
if n > len(m) {
34+
n = len(m)
35+
}
36+
copy(p, m)
37+
return n, nil
38+
}
39+
40+
func (m *memRWC) Write(p []byte) (int, error) {
41+
*m = append(*m, p...)
42+
return len(p), nil
43+
}
44+
45+
func (m memRWC) Close() error {
46+
return nil
47+
}

csbufio/writer.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,43 @@ package csbufio
22

33
import (
44
"bufio"
5+
"context"
56
"io"
67
"os"
78
)
89

9-
type (
10-
bufWriteCloser struct {
11-
*bufio.Writer
12-
c io.Closer
13-
}
14-
)
10+
type bufWriteCloser struct {
11+
ctx context.Context
12+
w *bufio.Writer
13+
c io.Closer
14+
}
1515

16-
func OpenWriter(name string) (io.WriteCloser, error) {
16+
func OpenWriter(ctx context.Context, name string) (io.WriteCloser, error) {
1717
f, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE, 0665)
1818
if err != nil {
1919
return nil, err
2020
}
21-
return NewWriter(f), nil
21+
return NewWriter(ctx, f), nil
2222
}
2323

2424
// NewWriter is a io.WriteCloser.
25-
func NewWriter(rc io.WriteCloser) io.WriteCloser {
26-
return bufWriteCloser{bufio.NewWriter(rc), rc}
25+
func NewWriter(ctx context.Context, rc io.WriteCloser) io.WriteCloser {
26+
return &bufWriteCloser{ctx, bufio.NewWriter(rc), rc}
27+
}
28+
29+
func (b *bufWriteCloser) Write(p []byte) (int, error) {
30+
if err := b.ctx.Err(); err != nil {
31+
return 0, err
32+
}
33+
return b.w.Write(p)
2734
}
2835

29-
func (bc bufWriteCloser) Close() error {
30-
if err := bc.Flush(); err != nil {
36+
func (b *bufWriteCloser) Close() error {
37+
if err := b.ctx.Err(); err != nil {
38+
return err
39+
}
40+
if err := b.w.Flush(); err != nil {
3141
return err
3242
}
33-
return bc.c.Close()
43+
return b.c.Close()
3444
}

csbufio/writer_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package csbufio
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestWriterContextDone(t *testing.T) {
11+
t.Parallel()
12+
13+
ctx, cancel := context.WithCancel(context.Background())
14+
cancel()
15+
16+
var m memRWC
17+
wc := NewWriter(ctx, &m)
18+
19+
n, err := wc.Write([]byte("some-data"))
20+
require.ErrorIs(t, err, context.Canceled)
21+
require.Equal(t, 0, n)
22+
require.Len(t, m, 0)
23+
24+
err = wc.Close()
25+
require.ErrorIs(t, err, context.Canceled)
26+
}

go.mod

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,54 @@
11
module github.com/lytics/cloudstorage
22

3-
go 1.15
3+
go 1.18
44

55
require (
66
cloud.google.com/go/storage v1.15.0
77
github.com/Azure/azure-sdk-for-go v40.5.0+incompatible
8-
github.com/Azure/go-autorest/autorest v0.11.10 // indirect
9-
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
108
github.com/araddon/gou v0.0.0-20190110011759-c797efecbb61
119
github.com/aws/aws-sdk-go v1.29.34
12-
github.com/dnaeon/go-vcr v1.1.0 // indirect
1310
github.com/pborman/uuid v1.2.1
1411
github.com/pkg/sftp v1.11.0
15-
github.com/satori/go.uuid v1.2.0 // indirect
16-
github.com/stretchr/testify v1.6.1
12+
github.com/stretchr/testify v1.8.0
1713
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897
1814
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4
1915
golang.org/x/oauth2 v0.0.0-20210413134643-5e61552d6c78
2016
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
2117
google.golang.org/api v0.45.0
2218
)
19+
20+
require (
21+
cloud.google.com/go v0.81.0 // indirect
22+
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
23+
github.com/Azure/go-autorest/autorest v0.11.10 // indirect
24+
github.com/Azure/go-autorest/autorest/adal v0.9.5 // indirect
25+
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
26+
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
27+
github.com/Azure/go-autorest/logger v0.2.0 // indirect
28+
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
29+
github.com/davecgh/go-spew v1.1.1 // indirect
30+
github.com/dnaeon/go-vcr v1.1.0 // indirect
31+
github.com/form3tech-oss/jwt-go v3.2.2+incompatible // indirect
32+
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
33+
github.com/golang/protobuf v1.5.2 // indirect
34+
github.com/google/uuid v1.1.2 // indirect
35+
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
36+
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
37+
github.com/jstemmer/go-junit-report v0.9.1 // indirect
38+
github.com/kr/fs v0.1.0 // indirect
39+
github.com/pkg/errors v0.9.1 // indirect
40+
github.com/pmezard/go-difflib v1.0.0 // indirect
41+
github.com/satori/go.uuid v1.2.0 // indirect
42+
go.opencensus.io v0.23.0 // indirect
43+
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
44+
golang.org/x/mod v0.4.1 // indirect
45+
golang.org/x/sys v0.0.0-20210412220455-f1c623a9e750 // indirect
46+
golang.org/x/text v0.3.5 // indirect
47+
golang.org/x/tools v0.1.0 // indirect
48+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
49+
google.golang.org/appengine v1.6.7 // indirect
50+
google.golang.org/genproto v0.0.0-20210420162539-3c870d7478d2 // indirect
51+
google.golang.org/grpc v1.37.0 // indirect
52+
google.golang.org/protobuf v1.26.0 // indirect
53+
gopkg.in/yaml.v3 v3.0.1 // indirect
54+
)

go.sum

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
7171
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
7272
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
7373
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
74-
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
7574
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
75+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
76+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
7677
github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c=
7778
github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko=
7879
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -188,10 +189,13 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
188189
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
189190
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
190191
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
192+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
191193
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
192194
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
193-
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
194195
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
196+
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
197+
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
198+
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
195199
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
196200
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
197201
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -521,8 +525,9 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
521525
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
522526
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
523527
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
524-
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
525528
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
529+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
530+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
526531
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
527532
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
528533
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

localfs/store.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,14 +237,13 @@ func (l *LocalStore) NewReaderWithContext(ctx context.Context, o string) (io.Rea
237237
if err != nil {
238238
return nil, err
239239
}
240-
return csbufio.OpenReader(fo)
240+
return csbufio.OpenReader(ctx, fo)
241241
}
242242

243243
func (l *LocalStore) NewWriter(o string, metadata map[string]string) (io.WriteCloser, error) {
244244
return l.NewWriterWithContext(context.Background(), o, metadata)
245245
}
246246
func (l *LocalStore) NewWriterWithContext(ctx context.Context, o string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) {
247-
248247
fo := path.Join(l.storepath, o)
249248

250249
err := cloudstorage.EnsureDir(fo)
@@ -269,7 +268,8 @@ func (l *LocalStore) NewWriterWithContext(ctx context.Context, o string, metadat
269268
if err != nil {
270269
return nil, err
271270
}
272-
return csbufio.NewWriter(f), nil
271+
272+
return csbufio.NewWriter(ctx, f), nil
273273
}
274274

275275
func (l *LocalStore) Get(ctx context.Context, o string) (cloudstorage.Object, error) {

0 commit comments

Comments
 (0)