Skip to content

Commit 5bfcff3

Browse files
cpuguy83 authored and runcom committed
Fix log readers can block writes indefinitely
Before this patch, a log reader is able to block all log writes indefinitely (and other operations) by simply opening the log stream and not consuming all the messages. The reason for this is we protect the read stream from corruption by ensuring there are no new writes while the log stream is consumed (and caught up with the live entries). We can get around this issue because log files are append only, so we can limit reads to only the section of the file that was written to when the log stream was first requested. Now logs are only blocked until all files are opened, rather than streamed to the client. Signed-off-by: Brian Goff <cpuguy83@gmail.com>
1 parent 9201b5e commit 5bfcff3

2 files changed

Lines changed: 66 additions & 34 deletions

File tree

daemon/logger/jsonfilelog/jsonfilelog.go

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"bytes"
88
"encoding/json"
99
"fmt"
10+
"io"
1011
"strconv"
1112
"sync"
1213

@@ -15,19 +16,21 @@ import (
1516
"github.com/docker/docker/daemon/logger/loggerutils"
1617
"github.com/docker/docker/pkg/jsonlog"
1718
"github.com/docker/go-units"
19+
"github.com/pkg/errors"
1820
)
1921

2022
// Name is the name of the file that the jsonlogger logs to.
2123
const Name = "json-file"
2224

2325
// JSONFileLogger is Logger implementation for default Docker logging.
2426
type JSONFileLogger struct {
25-
buf *bytes.Buffer
27+
extra []byte // json-encoded extra attributes
28+
29+
mu sync.RWMutex
30+
buf *bytes.Buffer // avoids allocating a new buffer on each call to `Log()`
31+
closed bool
2632
writer *loggerutils.RotateFileWriter
27-
mu sync.Mutex
2833
readers map[*logger.LogWatcher]struct{} // stores the active log followers
29-
extra []byte // json-encoded extra attributes
30-
closed bool
3134
}
3235

3336
func init() {
@@ -86,33 +89,45 @@ func New(info logger.Info) (logger.Logger, error) {
8689

8790
// Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
8891
func (l *JSONFileLogger) Log(msg *logger.Message) error {
92+
l.mu.Lock()
93+
err := writeMessageBuf(l.writer, msg, l.extra, l.buf)
94+
l.buf.Reset()
95+
l.mu.Unlock()
96+
return err
97+
}
98+
99+
func writeMessageBuf(w io.Writer, m *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
100+
if err := marshalMessage(m, extra, buf); err != nil {
101+
logger.PutMessage(m)
102+
return err
103+
}
104+
logger.PutMessage(m)
105+
if _, err := w.Write(buf.Bytes()); err != nil {
106+
return errors.Wrap(err, "error writing log entry")
107+
}
108+
return nil
109+
}
110+
111+
func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
89112
timestamp, err := jsonlog.FastTimeMarshalJSON(msg.Timestamp)
90113
if err != nil {
91114
return err
92115
}
93-
l.mu.Lock()
94-
logline := msg.Line
116+
logLine := msg.Line
95117
if !msg.Partial {
96-
logline = append(msg.Line, '\n')
118+
logLine = append(msg.Line, '\n')
97119
}
98120
err = (&jsonlog.JSONLogs{
99-
Log: logline,
121+
Log: logLine,
100122
Stream: msg.Source,
101123
Created: timestamp,
102-
RawAttrs: l.extra,
103-
}).MarshalJSONBuf(l.buf)
104-
logger.PutMessage(msg)
124+
RawAttrs: extra,
125+
}).MarshalJSONBuf(buf)
105126
if err != nil {
106-
l.mu.Unlock()
107-
return err
127+
return errors.Wrap(err, "error writing log message to buffer")
108128
}
109-
110-
l.buf.WriteByte('\n')
111-
_, err = l.writer.Write(l.buf.Bytes())
112-
l.buf.Reset()
113-
l.mu.Unlock()
114-
115-
return err
129+
err = buf.WriteByte('\n')
130+
return errors.Wrap(err, "error finalizing log buffer")
116131
}
117132

118133
// ValidateLogOpt looks for json specific log options max-file & max-size.

daemon/logger/jsonfilelog/read.go

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package jsonfilelog
33
import (
44
"bytes"
55
"encoding/json"
6-
"errors"
76
"fmt"
87
"io"
98
"os"
@@ -18,6 +17,7 @@ import (
1817
"github.com/docker/docker/pkg/ioutils"
1918
"github.com/docker/docker/pkg/jsonlog"
2019
"github.com/docker/docker/pkg/tailfile"
20+
"github.com/pkg/errors"
2121
)
2222

2323
const maxJSONDecodeRetry = 20000
@@ -48,36 +48,48 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
4848
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
4949
defer close(logWatcher.Msg)
5050

51-
// lock so the read stream doesn't get corrupted due to rotations or other log data written while we read
51+
// lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files
5252
// This will block writes!!!
53-
l.mu.Lock()
53+
l.mu.RLock()
5454

55+
// TODO it would be nice to move a lot of this reader implementation to the rotate logger object
5556
pth := l.writer.LogPath()
5657
var files []io.ReadSeeker
5758
for i := l.writer.MaxFiles(); i > 1; i-- {
5859
f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
5960
if err != nil {
6061
if !os.IsNotExist(err) {
6162
logWatcher.Err <- err
62-
break
63+
l.mu.RUnlock()
64+
return
6365
}
6466
continue
6567
}
6668
defer f.Close()
67-
6869
files = append(files, f)
6970
}
7071

7172
latestFile, err := os.Open(pth)
7273
if err != nil {
73-
logWatcher.Err <- err
74-
l.mu.Unlock()
74+
logWatcher.Err <- errors.Wrap(err, "error opening latest log file")
75+
l.mu.RUnlock()
7576
return
7677
}
7778
defer latestFile.Close()
7879

80+
latestChunk, err := newSectionReader(latestFile)
81+
82+
// Now we have the reader sectioned, all fd's opened, we can unlock.
83+
// New writes/rotates will not affect seeking through these files
84+
l.mu.RUnlock()
85+
86+
if err != nil {
87+
logWatcher.Err <- err
88+
return
89+
}
90+
7991
if config.Tail != 0 {
80-
tailer := ioutils.MultiReadSeeker(append(files, latestFile)...)
92+
tailer := ioutils.MultiReadSeeker(append(files, latestChunk)...)
8193
tailFile(tailer, logWatcher, config.Tail, config.Since)
8294
}
8395

@@ -89,19 +101,14 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
89101
}
90102

91103
if !config.Follow || l.closed {
92-
l.mu.Unlock()
93104
return
94105
}
95106

96-
if config.Tail >= 0 {
97-
latestFile.Seek(0, os.SEEK_END)
98-
}
99-
100107
notifyRotate := l.writer.NotifyRotate()
101108
defer l.writer.NotifyRotateEvict(notifyRotate)
102109

110+
l.mu.Lock()
103111
l.readers[logWatcher] = struct{}{}
104-
105112
l.mu.Unlock()
106113

107114
followLogs(latestFile, logWatcher, notifyRotate, config.Since)
@@ -111,6 +118,16 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
111118
l.mu.Unlock()
112119
}
113120

121+
func newSectionReader(f *os.File) (*io.SectionReader, error) {
122+
// seek to the end to get the size
123+
// we'll leave this at the end of the file since section reader does not advance the reader
124+
size, err := f.Seek(0, os.SEEK_END)
125+
if err != nil {
126+
return nil, errors.Wrap(err, "error getting current file size")
127+
}
128+
return io.NewSectionReader(f, 0, size), nil
129+
}
130+
114131
func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
115132
var rdr io.Reader
116133
rdr = f

0 commit comments

Comments (0)